Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
471 changes: 247 additions & 224 deletions src/adapter/src/active_compute_sink.rs

Large diffs are not rendered by default.

286 changes: 150 additions & 136 deletions src/adapter/src/coord/peek.rs

Large diffs are not rendered by default.

84 changes: 83 additions & 1 deletion src/adapter/src/frontend_peek.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use mz_ore::task::JoinHandle;
use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
use mz_repr::role_id::RoleId;
use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp};
use mz_repr::{Datum, GlobalId, IntoRowIterator, RelationDesc, Timestamp};
use mz_sql::ast::Raw;
use mz_sql::catalog::CatalogCluster;
use mz_sql::plan::Params;
Expand Down Expand Up @@ -1123,11 +1123,26 @@ impl PeekClient {
let local_mir_plan =
global_mir_plan.resolve(Antichain::from_elem(as_of));

if let Some(up_to) = plan.up_to
&& let Some((fast_path, _id, sink, df_meta)) =
local_mir_plan.try_create_fast_path_plan(view_id)?
{
return Ok(Execution::SubscribeFastPath {
subscribe_plan: plan,
fast_path_plan: fast_path,
typ: sink.from_desc,
up_to,
df_meta,
optimization_finished_at: now(),
});
};

let global_lir_plan =
optimizer.catch_unwind_optimize(local_mir_plan)?;
let optimization_finished_at = now();

let (df_desc, df_meta) = global_lir_plan.unapply();

Ok(Execution::Subscribe {
subscribe_plan: plan,
df_desc,
Expand Down Expand Up @@ -1417,6 +1432,62 @@ impl PeekClient {
.await?;
Ok(Some(response))
}
Execution::SubscribeFastPath {
subscribe_plan,
fast_path_plan,
typ,
up_to,
df_meta,
optimization_finished_at: _finished_at,
} => {
let row_set_finishing_seconds =
session.metrics().row_set_finishing_seconds().clone();

let peek_stash_read_batch_size_bytes =
mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
.get(catalog.system_config().dyncfgs());
let peek_stash_read_memory_budget_bytes =
mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
.get(catalog.system_config().dyncfgs());
let max_result_size = catalog.system_config().max_result_size();

coord::sequencer::emit_optimizer_notices(
&*catalog,
session,
&df_meta.optimizer_notices,
);

let watch_set = statement_logging_id.map(|logging_id| {
WatchSetCreation::new(
logging_id,
catalog.state(),
&input_id_bundle,
determination.timestamp_context.timestamp_or_default(),
)
});

let response = self
.implement_fast_path_subscribe_plan(
subscribe_plan,
fast_path_plan,
determination.timestamp_context.timestamp_or_default(),
up_to,
target_cluster_id,
target_replica,
typ.typ().clone(),
max_result_size,
max_query_result_size,
row_set_finishing_seconds,
read_holds,
peek_stash_read_batch_size_bytes,
peek_stash_read_memory_budget_bytes,
session.conn_id().clone(),
source_ids,
watch_set,
)
.await?;
Ok(Some(response))
}
Execution::CopyToS3 {
global_lir_plan,
source_ids,
Expand Down Expand Up @@ -1651,6 +1722,9 @@ impl PeekClient {
"Subscribe",
)
}
Execution::SubscribeFastPath { .. } => {
return;
}
};

// Assert that we have some read holds for all the imports of the dataflow.
Expand Down Expand Up @@ -1728,6 +1802,14 @@ enum Execution {
df_meta: DataflowMetainfo,
optimization_finished_at: EpochMillis,
},
SubscribeFastPath {
subscribe_plan: SubscribePlan,
fast_path_plan: FastPathPlan,
typ: RelationDesc,
up_to: Timestamp,
df_meta: DataflowMetainfo,
optimization_finished_at: EpochMillis,
},
CopyToS3 {
global_lir_plan: optimize::copy_to::GlobalLirPlan,
source_ids: BTreeSet<GlobalId>,
Expand Down
4 changes: 2 additions & 2 deletions src/adapter/src/optimize/peek.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
// `create_fast_path_plan` later again, because, e.g., running `LiteralConstraints` is still
// ahead of us.)
let use_fast_path_optimizer = match create_fast_path_plan(
&mut df_desc,
&df_desc,
self.select_id,
Some(&self.finishing),
self.config.features.persist_fast_path_limit,
Expand Down Expand Up @@ -384,7 +384,7 @@ impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
// .expect("GlobalMirPlan type");

let peek_plan = match create_fast_path_plan(
&mut df_desc,
&df_desc,
self.select_id,
Some(&self.finishing),
self.config.features.persist_fast_path_limit,
Expand Down
36 changes: 26 additions & 10 deletions src/adapter/src/optimize/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,21 @@ use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, Instant};

use crate::CollectionIdBundle;
use crate::coord::peek::{FastPathPlan, create_fast_path_plan};
use crate::optimize::dataflows::{
ComputeInstanceSnapshot, DataflowBuilder, ExprPrep, ExprPrepMaintained,
dataflow_import_id_bundle,
};
use crate::optimize::{
LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
OptimizerConfig, OptimizerError, optimize_mir_local, trace_plan,
};
use differential_dataflow::lattice::Lattice;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::plan::Plan;
use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, SubscribeSinkConnection};
use mz_ore::collections::CollectionExt;
use mz_ore::soft_assert_or_log;
use mz_repr::{GlobalId, Timestamp};
use mz_sql::optimizer_metrics::OptimizerMetrics;
Expand All @@ -27,16 +38,6 @@ use mz_transform::normalize_lets::normalize_lets;
use mz_transform::typecheck::{SharedTypecheckingContext, empty_typechecking_context};
use timely::progress::Antichain;

use crate::CollectionIdBundle;
use crate::optimize::dataflows::{
ComputeInstanceSnapshot, DataflowBuilder, ExprPrep, ExprPrepMaintained,
dataflow_import_id_bundle,
};
use crate::optimize::{
LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
OptimizerConfig, OptimizerError, optimize_mir_local, trace_plan,
};

pub struct Optimizer {
/// A representation typechecking context to use throughout the optimizer pipeline.
typecheck_ctx: SharedTypecheckingContext,
Expand Down Expand Up @@ -133,6 +134,21 @@ impl<T: Clone> GlobalMirPlan<T> {
/// Returns the bundle of collection ids that this optimized dataflow imports
/// when it is installed on the given compute instance.
pub fn id_bundle(&self, compute_instance_id: ComputeInstanceId) -> CollectionIdBundle {
    // Delegate to the shared helper that walks the dataflow's imports.
    let df_desc = &self.df_desc;
    dataflow_import_id_bundle(df_desc, compute_instance_id)
}

/// Attempt to extract a fast-path plan from the optimized subscribe.
///
/// Returns `Ok(None)` when no fast path exists for this dataflow. On success,
/// returns the fast-path plan together with the sink export's id, its sink
/// description, and a clone of the dataflow metainfo.
pub fn try_create_fast_path_plan(
    &self,
    id: GlobalId,
) -> Result<Option<(FastPathPlan, GlobalId, ComputeSinkDesc, DataflowMetainfo)>, OptimizerError>
{
    // Reuse the standard peek fast-path logic. A subscribe has no row-set
    // finishing and cannot use the persist fast path, so we pass `None` for
    // the finishing, `0` for the persist fast-path limit, and `false` to
    // disable the persist path.
    match create_fast_path_plan(&self.df_desc, id, None, 0, false)? {
        None => Ok(None),
        Some(plan) => {
            // NOTE(review): assumes the subscribe dataflow has exactly one
            // sink export — `into_element` would panic otherwise; confirm.
            let (sink_id, sink_desc) = self.df_desc.sink_exports.iter().into_element();
            Ok(Some((plan, *sink_id, sink_desc.clone(), self.df_meta.clone())))
        }
    }
}
}

/// The (final) result after MIR ⇒ LIR lowering and optimizing the resulting
Expand Down
Loading
Loading