From ef985361c1ee1d73a17511eb2524f74bd99e5054 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Tue, 17 Feb 2026 17:12:14 -0500 Subject: [PATCH 1/6] Add an upper bound to peeks --- src/adapter/src/coord/peek.rs | 51 ++++---- src/compute-client/src/controller.rs | 6 +- src/compute-client/src/controller/instance.rs | 8 +- .../src/controller/instance_client.rs | 6 +- src/compute-client/src/protocol/command.rs | 86 +++++++++++- src/compute-client/src/protocol/response.rs | 10 +- src/compute-client/src/service.rs | 3 +- src/compute/src/compute_state.rs | 123 ++++++++++-------- .../src/compute_state/peek_result_iterator.rs | 85 ++++++------ src/compute/src/compute_state/peek_stash.rs | 94 ++++++------- src/expr/src/row/collection.rs | 27 ++++ src/persist-client/src/lib.rs | 4 +- src/persist-client/src/read.rs | 9 +- src/repr/src/update.rs | 25 +++- src/sql/src/plan.rs | 2 - 15 files changed, 341 insertions(+), 198 deletions(-) diff --git a/src/adapter/src/coord/peek.rs b/src/adapter/src/coord/peek.rs index 757a872709994..ab215d266352c 100644 --- a/src/adapter/src/coord/peek.rs +++ b/src/adapter/src/coord/peek.rs @@ -19,7 +19,6 @@ use std::ops::Deref; use std::sync::Arc; use differential_dataflow::consolidation::consolidate; -use itertools::Itertools; use mz_adapter_types::compaction::CompactionWindow; use mz_adapter_types::connection::ConnectionId; use mz_cluster_client::ReplicaId; @@ -45,7 +44,7 @@ use mz_repr::explain::text::DisplayText; use mz_repr::explain::{CompactScalars, IndexUsageType, PlanRenderingContext, UsedIndexes}; use mz_repr::{ Diff, GlobalId, IntoRowIterator, RelationDesc, Row, RowIterator, SqlRelationType, - preserves_order, + UpdateCollection, preserves_order, }; use mz_storage_types::sources::SourceData; use serde::{Deserialize, Serialize}; @@ -932,7 +931,17 @@ impl crate::coord::Coordinator { match rows { PeekResponse::Rows(rows) => { - let rows = RowCollection::merge_sorted(&rows, &finishing.order_by); + let rows: Result, _> = rows + .into_iter() + 
.map(RowCollection::from_updates) + .collect(); + let rows = match rows { + Ok(ref rows) => RowCollection::merge_sorted(rows, &finishing.order_by), + Err(e) => { + yield PeekResponseUnary::Error(e.to_string()); + return; + } + }; match finishing.finish( rows, max_result_size, @@ -1012,25 +1021,18 @@ impl crate::coord::Coordinator { } } - let mut current_batch = Vec::new(); + let mut current_batch = UpdateCollection::builder(0, 0); let mut current_batch_size: usize = 0; 'outer: while let Some(rows) = row_cursor.next().await { - for ((source_data, _val), _ts, diff) in rows { + for ((source_data, _val), ts, diff) in rows { let row = source_data .0 .expect("we are not sending errors on this code path"); - let diff = usize::try_from(diff) - .expect("peek responses cannot have negative diffs"); - - if diff > 0 { - let diff = - NonZeroUsize::new(diff).expect("checked to be non-zero"); - current_batch_size = - current_batch_size.saturating_add(row.byte_len()); - current_batch.push((row, diff)); - } + current_batch_size = + current_batch_size.saturating_add(row.byte_len()); + current_batch.push((row.as_row_ref(), &ts, Diff::from(diff))); if current_batch_size > peek_stash_read_batch_size_bytes { // We're re-encoding the rows as a RowCollection @@ -1038,12 +1040,8 @@ impl crate::coord::Coordinator { // slow path already, since we're returning a big // stashed result so this is worth the convenience // of that for now. 
- let result = tx - .send(RowCollection::new( - current_batch.drain(..).collect_vec(), - &[], - )) - .await; + let result = tx.send(current_batch.build()).await; + current_batch = UpdateCollection::builder(0, 0); if result.is_err() { tracing::debug!("receiver went away"); // Don't return but break so we fall out to the @@ -1056,8 +1054,9 @@ impl crate::coord::Coordinator { } } + let current_batch = current_batch.build(); if current_batch.len() > 0 { - let result = tx.send(RowCollection::new(current_batch, &[])).await; + let result = tx.send(current_batch).await; if result.is_err() { tracing::debug!("receiver went away"); } @@ -1088,7 +1087,13 @@ impl crate::coord::Coordinator { let mut got_zero_rows = true; while let Some(rows) = rx.recv().await { got_zero_rows = false; - + let rows = match RowCollection::from_updates(rows) { + Ok(rows) => rows, + Err(e) => { + yield PeekResponseUnary::Error(e); + return; + } + }; let result_rows = incremental_finishing.finish_incremental( rows, max_result_size, diff --git a/src/compute-client/src/controller.rs b/src/compute-client/src/controller.rs index 9d05ba9d2d251..2e60eaf4e1b03 100644 --- a/src/compute-client/src/controller.rs +++ b/src/compute-client/src/controller.rs @@ -77,7 +77,7 @@ use crate::controller::introspection::{IntrospectionUpdates, spawn_introspection use crate::controller::replica::ReplicaConfig; use crate::logging::{LogVariant, LoggingConfig}; use crate::metrics::ComputeControllerMetrics; -use crate::protocol::command::{ComputeParameters, PeekTarget}; +use crate::protocol::command::{ComputeParameters, PeekDescription, PeekTarget}; use crate::protocol::response::{PeekResponse, SubscribeBatch}; mod instance; @@ -152,7 +152,7 @@ impl PeekNotification { match peek_response { PeekResponse::Rows(rows) => { let num_rows = u64::cast_from(RowCollection::offset_limit( - rows.iter().map(|r| r.count()).sum(), + rows.iter().map(|r| r.count().unwrap_or(0)).sum(), offset, limit, )); @@ -907,7 +907,7 @@ where peek_target, 
literal_constraints, uuid, - timestamp, + PeekDescription::select(timestamp), result_desc, finishing, map_filter_project, diff --git a/src/compute-client/src/controller/instance.rs b/src/compute-client/src/controller/instance.rs index 4fb44eac36494..4e6a5264bfd0f 100644 --- a/src/compute-client/src/controller/instance.rs +++ b/src/compute-client/src/controller/instance.rs @@ -60,7 +60,7 @@ use crate::logging::LogVariant; use crate::metrics::IntCounter; use crate::metrics::{InstanceMetrics, ReplicaCollectionMetrics, ReplicaMetrics, UIntGauge}; use crate::protocol::command::{ - ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget, + ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekDescription, PeekTarget, }; use crate::protocol::history::ComputeCommandHistory; use crate::protocol::response::{ @@ -1605,7 +1605,7 @@ where peek_target: PeekTarget, literal_constraints: Option>, uuid: Uuid, - timestamp: T, + timestamps: PeekDescription, result_desc: RelationDesc, finishing: RowSetFinishing, map_filter_project: mz_expr::SafeMfpPlan, @@ -1622,7 +1622,7 @@ where return Err(ReadHoldIdMismatch(read_hold.id())); } read_hold - .try_downgrade(Antichain::from_elem(timestamp.clone())) + .try_downgrade(timestamps.downgrade_to()) .map_err(|_| ReadHoldInsufficient(target_id))?; if let Some(target) = target_replica { @@ -1650,7 +1650,7 @@ where let peek = Peek { literal_constraints, uuid, - timestamp, + timestamps, finishing, map_filter_project, // Obtain an `OpenTelemetryContext` from the thread-local tracing diff --git a/src/compute-client/src/controller/instance_client.rs b/src/compute-client/src/controller/instance_client.rs index f9dffa1def429..ee75447e23cbe 100644 --- a/src/compute-client/src/controller/instance_client.rs +++ b/src/compute-client/src/controller/instance_client.rs @@ -40,7 +40,7 @@ use crate::controller::{ }; use crate::logging::LogVariant; use crate::metrics::InstanceMetrics; -use crate::protocol::command::PeekTarget; +use 
crate::protocol::command::{PeekDescription, PeekTarget}; use crate::protocol::response::PeekResponse; /// Error indicating the instance has shut down. @@ -229,7 +229,7 @@ impl InstanceClient { peek_target: PeekTarget, literal_constraints: Option>, uuid: Uuid, - timestamp: T, + timestamps: PeekDescription, result_desc: RelationDesc, finishing: RowSetFinishing, map_filter_project: mz_expr::SafeMfpPlan, @@ -242,7 +242,7 @@ impl InstanceClient { peek_target, literal_constraints, uuid, - timestamp, + timestamps, result_desc, finishing, map_filter_project, diff --git a/src/compute-client/src/protocol/command.rs b/src/compute-client/src/protocol/command.rs index 59b13ee6ff4a4..8ca5e49abc17c 100644 --- a/src/compute-client/src/protocol/command.rs +++ b/src/compute-client/src/protocol/command.rs @@ -18,12 +18,12 @@ use mz_dyncfg::ConfigUpdates; use mz_expr::RowSetFinishing; use mz_ore::tracing::OpenTelemetryContext; use mz_persist_types::PersistLocation; -use mz_repr::{GlobalId, RelationDesc, Row}; +use mz_repr::{GlobalId, RelationDesc, Row, TimestampManipulation}; use mz_service::params::GrpcClientParameters; use mz_storage_types::controller::CollectionMetadata; use mz_tracing::params::TracingParameters; use serde::{Deserialize, Serialize}; -use timely::progress::frontier::Antichain; +use timely::progress::frontier::{Antichain, AntichainRef}; use uuid::Uuid; use crate::logging::LoggingConfig; @@ -404,6 +404,84 @@ impl PeekTarget { } } +/// A [differential_dataflow::trace::Description], but specialized for our totally-ordered times. +#[derive(Copy, Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct PeekDescription { + lower: Option, + upper: Option, + since: T, +} + +impl PeekDescription { + /// Peek the data at a specific timestamp. + pub fn select(as_of: T) -> Self { + Self { + lower: Some(T::minimum()), + upper: as_of.try_step_forward(), + since: as_of, + } + } + + /// Peek the data covering a given subscribe window. 
+ pub fn subscribe(as_of: T, up_to: Option, with_snapshot: bool) -> Self { + let lower = if with_snapshot { + Some(T::minimum()) + } else { + as_of.try_step_forward() + }; + Self { + lower, + upper: up_to, + since: as_of, + } + } + + /// Advance the given timestamp by the since. + pub fn advance(&self, time: T) -> T { + if self.since > time { + self.since.clone() + } else { + time + } + } + + /// True iff the timestamp lies between the lower and upper. + pub fn contains(&self, time: &T) -> bool { + self.lower.as_ref().is_some_and(|l| l <= time) + && self.upper.as_ref().is_none_or(|u| time < u) + } + + /// We can allow our input trace to advance up to this frontier without losing our ability to + /// read. + pub fn downgrade_to(&self) -> Antichain { + let Some(lower) = self.lower.as_ref() else { + return Antichain::new(); + }; + let before_lower = lower.step_back().unwrap_or_else(T::minimum); + Antichain::from_elem(before_lower.max(self.since.clone())) + } + + /// Retrieve the lower as a frontier. + pub fn lower(&self) -> AntichainRef<'_, T> { + AntichainRef::new(self.lower.as_slice()) + } + + /// Retrieve the upper as a frontier. + pub fn upper(&self) -> AntichainRef<'_, T> { + AntichainRef::new(self.upper.as_slice()) + } + + /// Retrieve the since as a frontier. + pub fn since(&self) -> AntichainRef<'_, T> { + AntichainRef::new(std::slice::from_ref(&self.since)) + } + + /// Return the since as a bare timestamp, representing the overall read. + pub fn timestamp(&self) -> T { + self.since.clone() + } +} + /// Peek a collection, either in an arrangement or Persist. /// /// This request elicits data from the worker, by naming the @@ -431,8 +509,8 @@ pub struct Peek { /// /// Used in responses and cancellation requests. pub uuid: Uuid, - /// The logical timestamp at which the collection is queried. - pub timestamp: T, + /// The logical timestamps at which the collection is queried. 
+ pub timestamps: PeekDescription, /// Actions to apply to the result set before returning them. pub finishing: RowSetFinishing, /// Linear operation to apply in-line on each result. diff --git a/src/compute-client/src/protocol/response.rs b/src/compute-client/src/protocol/response.rs index 3fd3a321c5944..918fd95ed9d75 100644 --- a/src/compute-client/src/protocol/response.rs +++ b/src/compute-client/src/protocol/response.rs @@ -190,7 +190,7 @@ impl FrontiersResponse { #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum PeekResponse { /// Returned rows of a successful peek. - Rows(Vec), + Rows(Vec), /// Results of the peek were stashed in persist batches. Stashed(Box), /// Error of an unsuccessful peek. @@ -232,7 +232,7 @@ pub struct StashedPeekResponse { /// /// We will have a mix of stashed responses and inline responses because the /// result sizes across different workers can and will vary. - pub inline_rows: Vec, + pub inline_rows: Vec, } impl StashedPeekResponse { @@ -240,7 +240,11 @@ impl StashedPeekResponse { /// possible `OFFSET` and `LIMIT`. 
pub fn num_rows(&self, offset: usize, limit: Option) -> usize { let num_stashed_rows: usize = usize::cast_from(self.num_rows_batches); - let num_inline_rows: usize = self.inline_rows.iter().map(|r| r.count()).sum(); + let num_inline_rows: usize = self + .inline_rows + .iter() + .map(|r| r.count().unwrap_or(0)) + .sum(); RowCollection::offset_limit(num_stashed_rows + num_inline_rows, offset, limit) } diff --git a/src/compute-client/src/service.rs b/src/compute-client/src/service.rs index c700236857799..990d286ca5c89 100644 --- a/src/compute-client/src/service.rs +++ b/src/compute-client/src/service.rs @@ -15,7 +15,6 @@ use std::mem; use async_trait::async_trait; use bytesize::ByteSize; use differential_dataflow::lattice::Lattice; -use mz_expr::row::RowCollection; use mz_ore::cast::CastInto; use mz_ore::soft_panic_or_log; use mz_ore::tracing::OpenTelemetryContext; @@ -235,7 +234,7 @@ where otel_ctx: OpenTelemetryContext, ) -> Option> { let (merged, ready_shards) = self.peek_responses.entry(uuid).or_insert(( - PeekResponse::Rows(vec![RowCollection::default()]), + PeekResponse::Rows(vec![UpdateCollection::default()]), BTreeSet::new(), )); diff --git a/src/compute/src/compute_state.rs b/src/compute/src/compute_state.rs index b90fec173bf52..d5625a3a35c97 100644 --- a/src/compute/src/compute_state.rs +++ b/src/compute/src/compute_state.rs @@ -20,7 +20,7 @@ use differential_dataflow::lattice::Lattice; use differential_dataflow::trace::{Cursor, TraceReader}; use mz_compute_client::logging::LoggingConfig; use mz_compute_client::protocol::command::{ - ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget, + ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekDescription, PeekTarget, }; use mz_compute_client::protocol::history::ComputeCommandHistory; use mz_compute_client::protocol::response::{ @@ -33,10 +33,10 @@ use mz_compute_types::dyncfgs::{ }; use mz_compute_types::plan::render_plan::RenderPlan; use mz_dyncfg::ConfigSet; -use 
mz_expr::row::RowCollection; use mz_expr::{RowComparator, SafeMfpPlan}; use mz_ore::cast::CastFrom; use mz_ore::collections::CollectionExt; +use mz_ore::iter::consolidate_update_iter; use mz_ore::metrics::{MetricsRegistry, UIntGauge}; use mz_ore::now::EpochMillis; use mz_ore::soft_panic_or_log; @@ -49,7 +49,7 @@ use mz_persist_client::read::ReadHandle; use mz_persist_types::PersistLocation; use mz_persist_types::codec_impls::UnitSchema; use mz_repr::fixed_length::ToDatumIter; -use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp}; +use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp, UpdateCollection}; use mz_storage_operators::stats::StatsCursor; use mz_storage_types::StorageDiff; use mz_storage_types::controller::CollectionMetadata; @@ -1130,7 +1130,7 @@ impl PendingPeek { let uuid = peek.uuid.into_bytes(); ComputeEvent::Peek(PeekEvent { id, - time: peek.timestamp, + time: peek.timestamps.timestamp(), uuid, peek_type, installed, @@ -1139,7 +1139,7 @@ impl PendingPeek { fn index(peek: Peek, mut trace_bundle: TraceBundle) -> Self { let empty_frontier = Antichain::new(); - let timestamp_frontier = Antichain::from_elem(peek.timestamp); + let timestamp_frontier = peek.timestamps.downgrade_to(); trace_bundle .oks_mut() .set_logical_compaction(timestamp_frontier.borrow()); @@ -1176,14 +1176,12 @@ impl PendingPeek { let peek_uuid = peek.uuid; let (result_tx, result_rx) = oneshot::channel(); - let timestamp = peek.timestamp; + let timestamps = peek.timestamps.clone(); let mfp_plan = peek.map_filter_project.clone(); let max_results_needed = peek .finishing .limit - .map(|l| usize::cast_from(u64::from(l))) - .unwrap_or(usize::MAX) - + peek.finishing.offset; + .map(|l| usize::cast_from(u64::from(l)).saturating_add(peek.finishing.offset)); let order_by = peek.finishing.order_by.clone(); // Persist peeks can include at most one literal constraint. 
@@ -1194,11 +1192,12 @@ impl PendingPeek { let task_handle = mz_ore::task::spawn(|| "persist::peek", async move { let start = Instant::now(); + let comparator = RowComparator::new(order_by); let result = if active_worker { PersistPeek::do_peek( &persist_clients, metadata, - timestamp, + timestamps, literal_constraint, mfp_plan, max_result_size, @@ -1209,7 +1208,17 @@ impl PendingPeek { Ok(vec![]) }; let result = match result { - Ok(rows) => PeekResponse::Rows(vec![RowCollection::new(rows, &order_by)]), + Ok(mut rows) => { + rows.sort_by(|(r0, t0, _), (r1, t1, _)| { + t0.cmp(t1).then_with(|| { + comparator.compare_rows(r0.as_row_ref(), r1.as_row_ref(), || r0.cmp(r1)) + }) + }); + let collection = UpdateCollection::from_iter(consolidate_update_iter( + rows.iter().map(|(r, t, d)| (r.as_row_ref(), t, *d)), + )); + PeekResponse::Rows(vec![collection]) + } Err(e) => PeekResponse::Error(e.to_string()), }; match result_tx.send((result, start.elapsed())) { @@ -1269,12 +1278,12 @@ impl PersistPeek { async fn do_peek( persist_clients: &PersistClientCache, metadata: CollectionMetadata, - as_of: Timestamp, + timestamps: PeekDescription, literal_constraint: Option, mfp_plan: SafeMfpPlan, max_result_size: usize, - mut limit_remaining: usize, - ) -> Result, String> { + mut limit_remaining: Option, + ) -> Result, String> { let client = persist_clients .open(metadata.persist_location) .await @@ -1311,11 +1320,14 @@ impl PersistPeek { metrics, &mfp_plan, &metadata.relation_desc, - Antichain::from_elem(as_of), + timestamps.since().to_owned(), ) .await .map_err(|since| { - format!("attempted to peek at {as_of}, but the since has advanced to {since:?}") + format!( + "attempted to peek at {as_of:?}, but the since has advanced to {since:?}", + as_of = timestamps.since() + ) })?; // Re-used state for processing and building rows. 
@@ -1330,11 +1342,11 @@ impl PersistPeek { Some(row) => row.iter().count(), }; - 'collect: while limit_remaining > 0 { + 'collect: while limit_remaining.is_none_or(|limit| limit > 0) { let Some(batch) = cursor.next().await else { break; }; - for (data, _, d) in batch { + for (data, time, diff) in batch { let row = data.map_err(|e| e.to_string())?; if let Some(literal) = &literal_constraint { @@ -1345,20 +1357,6 @@ impl PersistPeek { } } - let count: usize = d.try_into().map_err(|_| { - error!( - shard = %metadata.data_shard, diff = d, ?row, - "persist peek encountered negative multiplicities", - ); - format!( - "Invalid data in source, \ - saw retractions ({}) for row that does not exist: {:?}", - -d, row, - ) - })?; - let Some(count) = NonZeroUsize::new(count) else { - continue; - }; let mut datum_local = datum_vec.borrow_with(&row); let eval_result = mfp_plan .evaluate_into(&mut datum_local, &arena, &mut row_builder) @@ -1374,10 +1372,15 @@ impl PersistPeek { ByteSize::b(u64::cast_from(max_result_size)) )); } - result.push((row, count)); - limit_remaining = limit_remaining.saturating_sub(count.get()); - if limit_remaining == 0 { - break; + result.push((row, time, diff.into())); + if let Some(limit_remaining) = &mut limit_remaining { + let Ok(count) = usize::try_from(diff) else { + return Err(format!("negative diff in collection with limit: {diff}")); + }; + *limit_remaining = limit_remaining.saturating_sub(count); + if *limit_remaining == 0 { + break; + } } } } @@ -1434,20 +1437,20 @@ impl IndexPeek { let method_start = Instant::now(); self.trace_bundle.oks_mut().read_upper(upper); - if upper.less_equal(&self.peek.timestamp) { + if PartialOrder::less_than(&upper.borrow(), &self.peek.timestamps.upper()) { return PeekStatus::NotReady; } self.trace_bundle.errs_mut().read_upper(upper); - if upper.less_equal(&self.peek.timestamp) { + if PartialOrder::less_than(&upper.borrow(), &self.peek.timestamps.upper()) { return PeekStatus::NotReady; } let read_frontier = 
self.trace_bundle.compaction_frontier(); - if !read_frontier.less_equal(&self.peek.timestamp) { + if !PartialOrder::less_equal(&read_frontier, &self.peek.timestamps.downgrade_to()) { let error = format!( "Arrangement compaction frontier ({:?}) is beyond the time of the attempted read ({})", read_frontier.elements(), - self.peek.timestamp, + self.peek.timestamps.timestamp(), ); return PeekStatus::Ready(PeekResponse::Error(error)); } @@ -1486,7 +1489,7 @@ impl IndexPeek { while cursor.key_valid(&storage) { let mut copies = Diff::ZERO; cursor.map_times(&storage, |time, diff| { - if time.less_equal(&self.peek.timestamp) { + if self.peek.timestamps.contains(time) { copies += diff; } }); @@ -1536,7 +1539,7 @@ impl IndexPeek { Key<'a>: ToDatumIter + Eq, KeyOwn = Row, Val<'a>: ToDatumIter, - TimeGat<'a>: PartialOrder, + TimeGat<'a> = &'a mz_repr::Timestamp, DiffGat<'a> = &'a Diff, >, { @@ -1551,7 +1554,7 @@ impl IndexPeek { let mut peek_iterator = peek_result_iterator::PeekResultIterator::new( peek.target.id().clone(), peek.map_filter_project.clone(), - peek.timestamp, + peek.timestamps.clone(), peek.literal_constraints.clone().as_deref_mut(), oks_handle, ); @@ -1582,9 +1585,7 @@ impl IndexPeek { Ok(row) => row, Err(err) => return PeekStatus::Ready(PeekResponse::Error(err)), }; - let (row, copies) = row; - let copies: NonZeroUsize = NonZeroUsize::try_from(copies).expect("fits into usize"); - + let (row, timestamp, copies) = row; total_size = total_size .saturating_add(row.byte_len()) .saturating_add(count_byte_size); @@ -1598,7 +1599,7 @@ impl IndexPeek { ))); } - results.push((row, copies)); + results.push((row, timestamp, copies)); // If we hold many more than `max_results` records, we can thin down // `results` using `self.finishing.ordering`. 
@@ -1616,7 +1617,14 @@ impl IndexPeek { .result_sort_seconds .observe(sort_time_accum.as_secs_f64()); let row_collection_start = Instant::now(); - let collection = RowCollection::new(results, &peek.finishing.order_by); + results.sort_by(|(r0, t0, _), (r1, t1, _)| { + t0.cmp(t1) + .then_with(|| comparator.compare_rows(r0, r1, || r0.cmp(r1))) + }); + let collection = UpdateCollection::from_iter(consolidate_update_iter( + results.iter().map(|(r, t, d)| (r.as_row_ref(), t, *d)), + )); + results.clear(); metrics .row_collection_seconds .observe(row_collection_start.elapsed().as_secs_f64()); @@ -1637,9 +1645,13 @@ impl IndexPeek { sort_time_accum += sort_start.elapsed(); let dropped = results.drain(max_results..); let dropped_size = - dropped.into_iter().fold(0, |acc: usize, (row, _count)| { - acc.saturating_add(row.byte_len().saturating_add(count_byte_size)) - }); + dropped + .into_iter() + .fold(0, |acc: usize, (row, _time, _count)| { + acc.saturating_add( + row.byte_len().saturating_add(count_byte_size), + ) + }); total_size = total_size.saturating_sub(dropped_size); } } @@ -1654,7 +1666,13 @@ impl IndexPeek { .observe(sort_time_accum.as_secs_f64()); let row_collection_start = Instant::now(); - let collection = RowCollection::new(results, &peek.finishing.order_by); + results.sort_by(|left, right| { + comparator.compare_rows(&left.0, &right.0, || left.0.cmp(&right.0)) + }); + let collection = UpdateCollection::from_iter(mz_ore::iter::consolidate_update_iter( + results.iter().map(|(r, t, d)| (r.as_row_ref(), t, *d)), + )); + results.clear(); metrics .row_collection_seconds .observe(row_collection_start.elapsed().as_secs_f64()); @@ -1664,6 +1682,7 @@ impl IndexPeek { /// For keeping track of the state of pending or ready peeks, and managing /// control flow. +#[derive(Debug)] enum PeekStatus { /// The frontiers of objects are not yet advanced enough, peek is still /// pending. 
diff --git a/src/compute/src/compute_state/peek_result_iterator.rs b/src/compute/src/compute_state/peek_result_iterator.rs index cb67d04dbd4af..43110304c30f1 100644 --- a/src/compute/src/compute_state/peek_result_iterator.rs +++ b/src/compute/src/compute_state/peek_result_iterator.rs @@ -5,30 +5,31 @@ //! Code for extracting a peek result out of compute state/an arrangement. -use std::iter::FusedIterator; -use std::num::NonZeroI64; -use std::ops::Range; - use differential_dataflow::trace::implementations::BatchContainer; use differential_dataflow::trace::{Cursor, TraceReader}; +use mz_compute_client::protocol::command::PeekDescription; use mz_ore::result::ResultExt; use mz_repr::fixed_length::ToDatumIter; use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena}; -use timely::order::PartialOrder; +use std::collections::VecDeque; +use std::iter::FusedIterator; +use std::ops::{AddAssign, Range}; pub struct PeekResultIterator where Tr: TraceReader, { // For debug/trace logging. + #[allow(unused)] target_id: GlobalId, cursor: Tr::Cursor, storage: Tr::Storage, map_filter_project: mz_expr::SafeMfpPlan, - peek_timestamp: mz_repr::Timestamp, + peek_timestamps: PeekDescription, row_builder: Row, datum_vec: DatumVec, literals: Option>, + extracted_updates: VecDeque<(Row, mz_repr::Timestamp, Diff)>, } /// Helper to handle literals in peeks @@ -99,14 +100,14 @@ where Key<'a>: ToDatumIter + Eq, KeyOwn = Row, Val<'a>: ToDatumIter, - TimeGat<'a>: PartialOrder, + TimeGat<'a> = &'a mz_repr::Timestamp, DiffGat<'a> = &'a Diff, >, { pub fn new( target_id: GlobalId, map_filter_project: mz_expr::SafeMfpPlan, - peek_timestamp: mz_repr::Timestamp, + peek_timestamps: PeekDescription, literal_constraints: Option<&mut [Row]>, trace_reader: &mut Tr, ) -> Self { @@ -119,14 +120,15 @@ where cursor, storage, map_filter_project, - peek_timestamp, + peek_timestamps, row_builder: Row::default(), datum_vec: DatumVec::new(), literals, + extracted_updates: VecDeque::with_capacity(1), } } - /// Returns 
`true` if the iterator has no more literals to process, or if there are no literals at all. + /// Returns `true` if the iterator has no more literals to process. fn literals_exhausted(&self) -> bool { self.literals.as_ref().map_or(false, Literals::is_exhausted) } @@ -137,7 +139,7 @@ impl FusedIterator for PeekResultIterator where Key<'a>: ToDatumIter + Eq, KeyOwn = Row, Val<'a>: ToDatumIter, - TimeGat<'a>: PartialOrder, + TimeGat<'a> = &'a mz_repr::Timestamp, DiffGat<'a> = &'a Diff, > { @@ -149,14 +151,18 @@ where Key<'a>: ToDatumIter + Eq, KeyOwn = Row, Val<'a>: ToDatumIter, - TimeGat<'a>: PartialOrder, + TimeGat<'a> = &'a mz_repr::Timestamp, DiffGat<'a> = &'a Diff, >, { - type Item = Result<(Row, NonZeroI64), String>; + type Item = Result<(Row, mz_repr::Timestamp, Diff), String>; fn next(&mut self) -> Option { let result = loop { + if let Some(result) = self.extracted_updates.pop_front() { + break Ok(result); + } + if self.literals_exhausted() { return None; } @@ -173,17 +179,11 @@ where } match self.extract_current_row() { - Ok(Some(row)) => break Ok(row), - Ok(None) => { - // Have to keep stepping and try with the next val. - self.cursor.step_val(&self.storage); - } + Ok(()) => self.cursor.step_val(&self.storage), Err(err) => break Err(err), } }; - self.cursor.step_val(&self.storage); - Some(result) } } @@ -194,14 +194,14 @@ where Key<'a>: ToDatumIter + Eq, KeyOwn = Row, Val<'a>: ToDatumIter, - TimeGat<'a>: PartialOrder, + TimeGat<'a> = &'a mz_repr::Timestamp, DiffGat<'a> = &'a Diff, >, { /// Extracts and returns the row currently pointed at by our cursor. Returns /// `Ok(None)` if our MapFilterProject evaluates to `None`. Also returns any /// errors that arise from evaluating the MapFilterProject. 
- fn extract_current_row(&mut self) -> Result, String> { + fn extract_current_row(&mut self) -> Result<(), String> { // TODO: This arena could be maintained and reused for longer, // but it wasn't clear at what interval we should flush // it to ensure we don't accidentally spike our memory use. @@ -235,34 +235,27 @@ where .map(|row| row.cloned()) .map_err_to_string_with_causes()? { - let mut copies = Diff::ZERO; self.cursor.map_times(&self.storage, |time, diff| { - if time.less_equal(&self.peek_timestamp) { - copies += diff; + if diff.is_zero() || !self.peek_timestamps.contains(time) { + return; + } + let time = self.peek_timestamps.advance(*time); + if let Some((_, last_time, last_diff)) = self.extracted_updates.back_mut() + && *last_time == time + { + last_diff.add_assign(*diff); + if last_diff.is_zero() { + self.extracted_updates.pop_back(); + } + } else { + self.extracted_updates + .push_back((result.clone(), time, *diff)) } }); - let copies: i64 = if copies.is_negative() { - let row = &*borrow; - tracing::error!( - target = %self.target_id, diff = %copies, ?row, - "index peek encountered negative multiplicities in ok trace", - ); - return Err(format!( - "Invalid data in source, \ - saw retractions ({}) for row that does not exist: {:?}", - -copies, row, - )); - } else { - copies.into_inner() - }; - // if copies > 0 ... otherwise skip - if let Some(copies) = NonZeroI64::new(copies) { - Ok(Some((result, copies))) - } else { - Ok(None) - } + + Ok(()) } else { - Ok(None) + Ok(()) } } diff --git a/src/compute/src/compute_state/peek_stash.rs b/src/compute/src/compute_state/peek_stash.rs index 22f6c8dec8648..372560bda7a06 100644 --- a/src/compute/src/compute_state/peek_stash.rs +++ b/src/compute/src/compute_state/peek_stash.rs @@ -6,25 +6,22 @@ //! For eligible peeks, we send the result back via the peek stash (aka persist //! blob), instead of inline in `ComputeResponse`. 
-use std::num::{NonZeroI64, NonZeroU64}; use std::sync::Arc; use std::time::{Duration, Instant}; +use differential_dataflow::difference::IsZero; use mz_compute_client::protocol::command::Peek; use mz_compute_client::protocol::response::{PeekResponse, StashedPeekResponse}; -use mz_expr::row::RowCollection; use mz_ore::cast::CastFrom; use mz_ore::task::AbortOnDropHandle; use mz_persist_client::Schemas; use mz_persist_client::cache::PersistClientCache; use mz_persist_types::codec_impls::UnitSchema; use mz_persist_types::{PersistLocation, ShardId}; -use mz_repr::{Diff, RelationDesc, Row, Timestamp}; +use mz_repr::{Diff, RelationDesc, Row, Timestamp, UpdateCollection}; use mz_storage_types::sources::SourceData; -use timely::progress::Antichain; use tokio::sync::oneshot; use tracing::debug; -use uuid::Uuid; use crate::arrangement::manager::{PaddedTrace, TraceBundle}; use crate::compute_state::peek_result_iterator; @@ -45,7 +42,7 @@ pub struct StashingPeek { /// We can't give a PeekResultIterator to our async upload task because the /// underlying trace reader is not Send/Sync. So we need to use a channel to /// send result rows from the worker thread to the async background task. - rows_tx: Option, String>>>, + rows_tx: Option, String>>>, /// The result of the background task, eventually. 
pub result: oneshot::Receiver<(PeekResponse, Duration)>, /// The `tracing::Span` tracking this peek's operation @@ -77,7 +74,7 @@ impl StashingPeek { let peek_iterator = peek_result_iterator::PeekResultIterator::new( peek.target.id(), peek.map_filter_project.clone(), - peek.timestamp, + peek.timestamps.clone(), peek.literal_constraints.as_deref_mut(), oks_handle, ); @@ -86,28 +83,31 @@ impl StashingPeek { let task_handle = mz_ore::task::spawn( || format!("peek_stash::stash_peek_response({peek_uuid})"), - async move { - let start = Instant::now(); - - let result = Self::do_upload( - &persist_clients, - persist_location, - batch_max_runs, - peek.uuid, - relation_desc, - rows_needed_by_finishing, - rows_rx, - ) - .await; - - let result = match result { - Ok(peek_response) => peek_response, - Err(e) => PeekResponse::Error(e.to_string()), - }; - match result_tx.send((result, start.elapsed())) { - Ok(()) => {} - Err((_result, elapsed)) => { - debug!(duration = ?elapsed, "dropping result for cancelled peek {}", peek_uuid) + { + let peek = peek.clone(); + async move { + let start = Instant::now(); + + let result = Self::do_upload( + &persist_clients, + persist_location, + batch_max_runs, + peek, + relation_desc, + rows_needed_by_finishing, + rows_rx, + ) + .await; + + let result = match result { + Ok(peek_response) => peek_response, + Err(e) => PeekResponse::Error(e.to_string()), + }; + match result_tx.send((result, start.elapsed())) { + Ok(()) => {} + Err((_result, elapsed)) => { + debug!(duration = ?elapsed, "dropping result for cancelled peek {}", peek_uuid) + } } } }, @@ -127,17 +127,17 @@ impl StashingPeek { persist_clients: &PersistClientCache, persist_location: PersistLocation, batch_max_runs: usize, - peek_uuid: Uuid, + peek: Peek, relation_desc: RelationDesc, max_rows: Option, // The number of rows needed by the RowSetFinishing's offset + limit - mut rows_rx: tokio::sync::mpsc::Receiver, String>>, + mut rows_rx: tokio::sync::mpsc::Receiver, String>>, ) -> Result { 
let client = persist_clients .open(persist_location) .await .map_err(|e| e.to_string())?; - let shard_id = format!("s{}", peek_uuid); + let shard_id = format!("s{}", peek.uuid); let shard_id = ShardId::try_from(shard_id).expect("can parse"); let write_schemas: Schemas = Schemas { id: None, @@ -145,10 +145,6 @@ impl StashingPeek { val: Arc::new(UnitSchema), }; - let result_ts = Timestamp::default(); - let lower = Antichain::from_elem(result_ts); - let upper = Antichain::from_elem(result_ts.step_forward()); - // We have to use SourceData, which is a wrapper around a Result, because the bare columnar Row encoder doesn't support // encoding rows with zero columns. @@ -159,30 +155,31 @@ impl StashingPeek { .batch_builder::( shard_id, write_schemas, - lower, + peek.timestamps.lower().to_owned(), Some(batch_max_runs), ) .await; - let mut num_rows: u64 = 0; + let mut num_rows: usize = 0; loop { let row = rows_rx.recv().await; match row { Some(Ok(rows)) => { - for (row, diff) in rows { - num_rows += - u64::from(NonZeroU64::try_from(diff).expect("diff fits into u64")); - let diff: i64 = diff.into(); - + for (row, timestamp, diff) in rows { + let diff: i64 = diff.into_inner(); + if diff.is_zero() { + continue; + } batch_builder - .add(&SourceData(Ok(row)), &(), &Timestamp::default(), &diff) + .add(&SourceData(Ok(row)), &(), ×tamp, &diff) .await .expect("invalid usage"); // Stop if we have enough rows to satisfy the RowSetFinishing's offset + limit. 
                            if let Some(max_rows) = max_rows {
-                                if num_rows >= u64::cast_from(max_rows) {
+                                num_rows += usize::try_from(diff).expect("diff fits into u64");
+                                if num_rows >= max_rows {
                                     break;
                                 }
                             }
@@ -195,7 +192,10 @@ impl StashingPeek {
             }
         }

-        let batch = batch_builder.finish(upper).await.expect("invalid usage");
+        let batch = batch_builder
+            .finish(peek.timestamps.upper().to_owned())
+            .await
+            .expect("invalid usage");

         let stashed_response = StashedPeekResponse {
             num_rows_batches: u64::cast_from(num_rows),
@@ -203,7 +203,7 @@ impl StashingPeek {
             relation_desc,
             shard_id,
             batches: vec![batch.into_transmittable_batch()],
-            inline_rows: vec![RowCollection::new(vec![], &[])],
+            inline_rows: vec![UpdateCollection::default()],
         };
         let result = PeekResponse::Stashed(Box::new(stashed_response));
         Ok(result)
diff --git a/src/expr/src/row/collection.rs b/src/expr/src/row/collection.rs
index 53fe97dbfb65c..e2ff8d3abcd99 100644
--- a/src/expr/src/row/collection.rs
+++ b/src/expr/src/row/collection.rs
@@ -14,6 +14,7 @@ use std::num::NonZeroUsize;
 use itertools::Itertools;
 use mz_repr::{
     DatumVec, IntoRowIterator, Row, RowIterator, RowRef, Rows, RowsBuilder, SharedSlice,
+    UpdateCollection,
 };
 use serde::{Deserialize, Serialize};

@@ -62,6 +63,32 @@ impl RowCollection {
         }
     }

+    /// Create a new row collection from an update collection. If any update has a negative
+    /// multiplicity, this will return an error; zero diffs are assumed to be consolidated away.
+ pub fn from_updates(updates: UpdateCollection) -> Result { + let rows = updates.rows().clone(); + let mut diffs = Vec::with_capacity(rows.len()); + for (row, time, diff) in updates.iter() { + let Ok(count) = usize::try_from(diff.into_inner()) else { + tracing::error!( + ?row, + ?time, + ?diff, + "encountered negative multiplicities in ok trace" + ); + return Err(format!( + "Invalid data in source, saw retractions ({count}) for row that does not exist: {row:?}", + count = -diff, + )); + }; + diffs.push(NonZeroUsize::new(count).expect("diffs should be consolidated")); + } + Ok(Self { + rows, + diffs: diffs.into(), + }) + } + /// Create a new [`RowCollection`] from a collection of [`Row`]s. Sorts data by `order_by`. /// /// Note that all row collections to be merged must be constructed with the same `order_by` diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs index 342fba325252e..77f0e57707555 100644 --- a/src/persist-client/src/lib.rs +++ b/src/persist-client/src/lib.rs @@ -39,7 +39,7 @@ use crate::cache::{PersistClientCache, StateCache}; use crate::cfg::PersistConfig; use crate::critical::{CriticalReaderId, Opaque, SinceHandle}; use crate::error::InvalidUsage; -use crate::fetch::{BatchFetcher, BatchFetcherConfig}; +use crate::fetch::{BatchFetcher, BatchFetcherConfig, FetchBatchFilter}; use crate::internal::compact::{CompactConfig, Compactor}; use crate::internal::encoding::parse_id; use crate::internal::gc::GarbageCollector; @@ -650,7 +650,7 @@ impl PersistClient { self.metrics.read.snapshot.clone(), Arc::clone(&self.blob), shard_id, - as_of, + FetchBatchFilter::Compaction { since: as_of }, read_schemas, &hollow_batches, batches, diff --git a/src/persist-client/src/read.rs b/src/persist-client/src/read.rs index a9c5dca8a199a..30a32d8395f0d 100644 --- a/src/persist-client/src/read.rs +++ b/src/persist-client/src/read.rs @@ -1187,7 +1187,7 @@ where self.metrics.read.snapshot.clone(), Arc::clone(&self.blob), self.shard_id(), - as_of, + 
FetchBatchFilter::Snapshot { as_of }, self.read_schemas.clone(), &batches, lease, @@ -1203,17 +1203,14 @@ where read_metrics: ReadMetrics, blob: Arc, shard_id: ShardId, - as_of: Antichain, + filter: FetchBatchFilter, schemas: Schemas, batches: &[HollowBatch], lease: L, should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool, memory_budget_bytes: usize, ) -> Result, Since> { - let context = format!("{}[as_of={:?}]", shard_id, as_of.elements()); - let filter = FetchBatchFilter::Snapshot { - as_of: as_of.clone(), - }; + let context = format!("{}[filter={:?}]", shard_id, filter); let mut consolidator = Consolidator::new( context, diff --git a/src/repr/src/update.rs b/src/repr/src/update.rs index e89360b082cb9..1aea8d88984b8 100644 --- a/src/repr/src/update.rs +++ b/src/repr/src/update.rs @@ -10,6 +10,7 @@ use bytes::Bytes; use itertools::Itertools; use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use std::fmt::Debug; use std::ops::{Deref, Range}; use std::sync::Arc; use timely::progress::Timestamp; @@ -18,7 +19,7 @@ use crate::{Diff, RowRef}; /// An immutable, shared slice. Morally, this is [bytes::Bytes] but with fewer features /// and supporting arbitrary types. -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct SharedSlice { /// The range of offsets in the backing data that are present in the slice. /// (This allows us to subset the slice without reallocating.) @@ -26,6 +27,12 @@ pub struct SharedSlice { data: Arc<[T]>, } +impl Debug for SharedSlice { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("SharedSlice").field(&&self[..]).finish() + } +} + impl SharedSlice { /// Split this slice in half at the provided offset. 
pub fn split_at(self, offset: usize) -> (Self, Self) { @@ -284,10 +291,26 @@ impl UpdateCollection { ) } + pub fn rows(&self) -> &Rows { + &self.rows + } + pub fn times(&self) -> &[T] { &*self.times } + pub fn diffs(&self) -> &[Diff] { + &self.diffs + } + + pub fn count(&self) -> Result { + let mut sum = 0usize; + for diff in self.diffs.iter() { + sum += usize::try_from(diff.into_inner())?; + } + Ok(sum) + } + pub fn byte_len(&self) -> usize { self.rows.byte_len() } diff --git a/src/sql/src/plan.rs b/src/sql/src/plan.rs index 20fe2d0012825..05410f6ae84e6 100644 --- a/src/sql/src/plan.rs +++ b/src/sql/src/plan.rs @@ -1929,8 +1929,6 @@ pub enum QueryWhen { FreshestTableWrite, /// The peek should occur at the timestamp described by the specified /// expression. - /// - /// The expression may have any type. AtTimestamp(Timestamp), /// Same as Immediately, but will also advance to at least the specified /// expression. From ab668072dd3e154f5188356a46250c0ad69d6445 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Tue, 31 Mar 2026 13:09:02 -0400 Subject: [PATCH 2/6] Extract a method for fetching stashed peek data --- src/adapter/src/coord/peek.rs | 257 ++++++++++++++++++---------------- 1 file changed, 133 insertions(+), 124 deletions(-) diff --git a/src/adapter/src/coord/peek.rs b/src/adapter/src/coord/peek.rs index ab215d266352c..ca53e473aa20a 100644 --- a/src/adapter/src/coord/peek.rs +++ b/src/adapter/src/coord/peek.rs @@ -24,7 +24,7 @@ use mz_adapter_types::connection::ConnectionId; use mz_cluster_client::ReplicaId; use mz_compute_client::controller::PeekNotification; use mz_compute_client::protocol::command::PeekTarget; -use mz_compute_client::protocol::response::PeekResponse; +use mz_compute_client::protocol::response::{PeekResponse, StashedPeekResponse}; use mz_compute_types::ComputeInstanceId; use mz_compute_types::dataflows::{DataflowDescription, IndexImport}; use mz_controller_types::ClusterId; @@ -38,7 +38,7 @@ use mz_ore::cast::CastFrom; use 
mz_ore::str::{StrExt, separated}; use mz_ore::task; use mz_ore::tracing::OpenTelemetryContext; -use mz_persist_client::Schemas; +use mz_persist_client::{PersistClient, Schemas}; use mz_persist_types::codec_impls::UnitSchema; use mz_repr::explain::text::DisplayText; use mz_repr::explain::{CompactScalars, IndexUsageType, PlanRenderingContext, UsedIndexes}; @@ -904,6 +904,126 @@ impl crate::coord::Coordinator { }) } + pub(crate) fn create_stash_stream( + mut persist_client: PersistClient, + response: StashedPeekResponse, + peek_stash_read_batch_size_bytes: usize, + peek_stash_read_memory_budget_bytes: usize, + ) -> tokio::sync::mpsc::Receiver { + let shard_id = response.shard_id; + + let mut batches = Vec::new(); + for proto_batch in response.batches.into_iter() { + let batch = persist_client.batch_from_transmittable_batch(&shard_id, proto_batch); + + batches.push(batch); + } + tracing::trace!(?batches, "stashed peek response"); + + let as_of = Antichain::from_elem(mz_repr::Timestamp::default()); + let read_schemas: Schemas = Schemas { + id: None, + key: Arc::new(response.relation_desc.clone()), + val: Arc::new(UnitSchema), + }; + + // NOTE: Using the cursor creates Futures that are not Sync, + // so we can't drive them on the main Coordinator loop. + // Spawning a task has the additional benefit that we get to + // delete batches once we're done. + // + // Batch deletion is best-effort, though, and there are + // multiple known ways in which they can leak, among them: + // + // - ProtoBatch is lost in flight + // - ProtoBatch is lost because when combining PeekResponse + // from workers a cancellation or error "overrides" other + // results, meaning we drop them + // - This task here is not run to completion before it can + // delete all batches + // + // This is semi-ok, because persist needs a reaper of leaked + // batches already, and so we piggy-back on that, even if it + // might not exist as of today. 
+ let (tx, rx) = tokio::sync::mpsc::channel(1); + mz_ore::task::spawn(|| "read_peek_batches", async move { + let mut row_cursor = persist_client + .read_batches_consolidated::<_, _, _, i64>( + response.shard_id, + as_of, + read_schemas, + batches, + |_stats| true, + peek_stash_read_memory_budget_bytes, + ) + .await + .expect("invalid usage"); + + // We always send our inline rows first. Ordering + // doesn't matter because we can only be in this case + // when there is no ORDER BY. + // + // We _could_ write these out as a Batch, and include it + // in the batches we read via the Consolidator. If we + // wanted to get a consistent ordering. That's not + // needed for correctness! But might be nice for more + // aesthetic reasons. + for rows in response.inline_rows { + let result = tx.send(rows).await; + if result.is_err() { + tracing::debug!("receiver went away"); + } + } + + let mut current_batch = UpdateCollection::builder(0, 0); + let mut current_batch_size: usize = 0; + + 'outer: while let Some(rows) = row_cursor.next().await { + for ((source_data, _val), ts, diff) in rows { + let row = source_data + .0 + .expect("we are not sending errors on this code path"); + + current_batch_size = current_batch_size.saturating_add(row.byte_len()); + current_batch.push((row.as_row_ref(), &ts, Diff::from(diff))); + + if current_batch_size > peek_stash_read_batch_size_bytes { + // We're re-encoding the rows as a RowCollection + // here, for which we pay in CPU time. We're in a + // slow path already, since we're returning a big + // stashed result so this is worth the convenience + // of that for now. + let result = tx.send(current_batch.build()).await; + current_batch = UpdateCollection::builder(0, 0); + if result.is_err() { + tracing::debug!("receiver went away"); + // Don't return but break so we fall out to the + // batch delete logic below. 
+ break 'outer; + } + + current_batch_size = 0; + } + } + } + + let current_batch = current_batch.build(); + if current_batch.len() > 0 { + let result = tx.send(current_batch).await; + if result.is_err() { + tracing::debug!("receiver went away"); + } + } + + let batches = row_cursor.into_lease(); + tracing::trace!(?response.shard_id, "cleaning up batches of peek result"); + for batch in batches { + batch.delete().await; + } + }); + rx + } + /// Creates an async stream that processes peek responses and yields rows. /// /// TODO(peek-seq): Move this out of `coord` once we delete the old peek sequencing. @@ -914,7 +1034,7 @@ impl crate::coord::Coordinator { max_result_size: u64, max_returned_query_size: Option, duration_histogram: prometheus::Histogram, - mut persist_client: mz_persist_client::PersistClient, + persist_client: mz_persist_client::PersistClient, peek_stash_read_batch_size_bytes: usize, peek_stash_read_memory_budget_bytes: usize, ) -> impl futures::Stream { @@ -931,10 +1051,8 @@ impl crate::coord::Coordinator { match rows { PeekResponse::Rows(rows) => { - let rows: Result, _> = rows - .into_iter() - .map(RowCollection::from_updates) - .collect(); + let rows: Result, _> = + rows.into_iter().map(RowCollection::from_updates).collect(); let rows = match rows { Ok(ref rows) => RowCollection::merge_sorted(rows, &finishing.order_by), Err(e) => { @@ -953,130 +1071,21 @@ impl crate::coord::Coordinator { } } PeekResponse::Stashed(response) => { - let response = *response; - - let shard_id = response.shard_id; - - let mut batches = Vec::new(); - for proto_batch in response.batches.into_iter() { - let batch = - persist_client.batch_from_transmittable_batch(&shard_id, proto_batch); - - batches.push(batch); - } - tracing::trace!(?batches, "stashed peek response"); - - let as_of = Antichain::from_elem(mz_repr::Timestamp::default()); - let read_schemas: Schemas = Schemas { - id: None, - key: Arc::new(response.relation_desc.clone()), - val: Arc::new(UnitSchema), - }; - - 
let mut row_cursor = persist_client - .read_batches_consolidated::<_, _, _, i64>( - response.shard_id, - as_of, - read_schemas, - batches, - |_stats| true, - peek_stash_read_memory_budget_bytes, - ) - .await - .expect("invalid usage"); - - // NOTE: Using the cursor creates Futures that are not Sync, - // so we can't drive them on the main Coordinator loop. - // Spawning a task has the additional benefit that we get to - // delete batches once we're done. - // - // Batch deletion is best-effort, though, and there are - // multiple known ways in which they can leak, among them: - // - // - ProtoBatch is lost in flight - // - ProtoBatch is lost because when combining PeekResponse - // from workers a cancellation or error "overrides" other - // results, meaning we drop them - // - This task here is not run to completion before it can - // delete all batches - // - // This is semi-ok, because persist needs a reaper of leaked - // batches already, and so we piggy-back on that, even if it - // might not exist as of today. - let (tx, mut rx) = tokio::sync::mpsc::channel(1); - mz_ore::task::spawn(|| "read_peek_batches", async move { - // We always send our inline rows first. Ordering - // doesn't matter because we can only be in this case - // when there is no ORDER BY. - // - // We _could_ write these out as a Batch, and include it - // in the batches we read via the Consolidator. If we - // wanted to get a consistent ordering. That's not - // needed for correctness! But might be nice for more - // aesthetic reasons. 
- for rows in response.inline_rows { - let result = tx.send(rows).await; - if result.is_err() { - tracing::debug!("receiver went away"); - } - } - - let mut current_batch = UpdateCollection::builder(0, 0); - let mut current_batch_size: usize = 0; - - 'outer: while let Some(rows) = row_cursor.next().await { - for ((source_data, _val), ts, diff) in rows { - let row = source_data - .0 - .expect("we are not sending errors on this code path"); - - current_batch_size = - current_batch_size.saturating_add(row.byte_len()); - current_batch.push((row.as_row_ref(), &ts, Diff::from(diff))); - - if current_batch_size > peek_stash_read_batch_size_bytes { - // We're re-encoding the rows as a RowCollection - // here, for which we pay in CPU time. We're in a - // slow path already, since we're returning a big - // stashed result so this is worth the convenience - // of that for now. - let result = tx.send(current_batch.build()).await; - current_batch = UpdateCollection::builder(0, 0); - if result.is_err() { - tracing::debug!("receiver went away"); - // Don't return but break so we fall out to the - // batch delete logic below. 
- break 'outer; - } - - current_batch_size = 0; - } - } - } - - let current_batch = current_batch.build(); - if current_batch.len() > 0 { - let result = tx.send(current_batch).await; - if result.is_err() { - tracing::debug!("receiver went away"); - } - } - - let batches = row_cursor.into_lease(); - tracing::trace!(?response.shard_id, "cleaning up batches of peek result"); - for batch in batches { - batch.delete().await; - } - }); + let is_streamable = finishing.is_streamable(response.relation_desc.arity()); + let mut rx = Self::create_stash_stream( + persist_client, + *response, + peek_stash_read_batch_size_bytes, + peek_stash_read_memory_budget_bytes, + ); assert!( - finishing.is_streamable(response.relation_desc.arity()), + is_streamable, "can only get stashed responses when the finishing is streamable" ); tracing::trace!("query result is streamable!"); - assert!(finishing.is_streamable(response.relation_desc.arity())); let mut incremental_finishing = RowSetFinishingIncremental::new( finishing.offset, finishing.limit, From b9c77dfbdbc87eef87a782cbe64f6f4201521e80 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Tue, 31 Mar 2026 13:09:02 -0400 Subject: [PATCH 3/6] Split out a batch-rendering method for subscribe --- src/adapter/src/active_compute_sink.rs | 471 +++++++++++++------------ 1 file changed, 247 insertions(+), 224 deletions(-) diff --git a/src/adapter/src/active_compute_sink.rs b/src/adapter/src/active_compute_sink.rs index f7c0186b88b51..69d9f6e21fb41 100644 --- a/src/adapter/src/active_compute_sink.rs +++ b/src/adapter/src/active_compute_sink.rs @@ -17,7 +17,7 @@ use anyhow::anyhow; use mz_adapter_types::connection::ConnectionId; use mz_compute_client::protocol::response::SubscribeBatch; use mz_controller_types::ClusterId; -use mz_expr::row::RowCollection; +use mz_expr::row::{RowCollection, RowCollectionBuilder}; use mz_expr::{RowComparator, compare_columns}; use mz_ore::cast::CastFrom; use mz_ore::now::EpochMillis; @@ -25,7 +25,6 @@ use 
mz_repr::adt::numeric; use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, Row, RowRef, Timestamp}; use mz_sql::plan::SubscribeOutput; use mz_storage_types::instances::StorageInstanceId; -use timely::progress::Antichain; use tokio::sync::{mpsc, oneshot}; use uuid::Uuid; @@ -118,263 +117,287 @@ pub struct ActiveSubscribe { pub output: SubscribeOutput, } -impl ActiveSubscribe { - /// Initializes the subscription. - /// - /// This method must be called exactly once, after constructing an - /// `ActiveSubscribe` and before calling `process_response`. - pub fn initialize(&self) { - // Always emit progress message indicating snapshot timestamp. - self.send_progress_message(&Antichain::from_elem(self.as_of)); +fn format_progress( + builder: &mut RowCollectionBuilder, + arity: usize, + output: &SubscribeOutput, + upper: Timestamp, +) { + let mut row_buf = Row::default(); + let mut packer = row_buf.packer(); + packer.push(Datum::from(numeric::Numeric::from(upper))); + packer.push(Datum::True); + + // Fill in the mz_diff or mz_state column + packer.push(Datum::Null); + + // Fill all table columns with NULL. + for _ in 0..arity { + packer.push(Datum::Null); } - fn send_progress_message(&self, upper: &Antichain) { - if !self.emit_progress { - return; - } - if let Some(upper) = upper.as_option() { - let mut row_buf = Row::default(); - let mut packer = row_buf.packer(); - packer.push(Datum::from(numeric::Numeric::from(*upper))); - packer.push(Datum::True); - - // Fill in the mz_diff or mz_state column + if let SubscribeOutput::EnvelopeDebezium { order_by_keys } = output { + for _ in 0..(arity - order_by_keys.len()) { packer.push(Datum::Null); - - // Fill all table columns with NULL. 
- for _ in 0..self.arity { - packer.push(Datum::Null); - } - - if let SubscribeOutput::EnvelopeDebezium { order_by_keys } = &self.output { - for _ in 0..(self.arity - order_by_keys.len()) { - packer.push(Datum::Null); - } - } - - let row_iter = Box::new(row_buf.into_row_iter()); - self.send(PeekResponseUnary::Rows(row_iter)); } } + builder.push(&row_buf, NonZeroUsize::MIN); +} - /// Processes a subscribe response from the controller. - /// - /// Returns `true` if the subscribe is finished. - pub fn process_response(&self, batch: SubscribeBatch) -> bool { - let comparator = RowComparator::new(self.output.row_order()); - let rows = match batch.updates { - Ok(ref rows) => { - let iters = rows.iter().map(|r| r.iter()); - let merged = mz_ore::iter::merge_iters_by( - iters, - |(left_row, left_time, _), (right_row, right_time, _)| { - left_time.cmp(right_time).then_with(|| { - comparator.compare_rows(left_row, right_row, || left_row.cmp(right_row)) - }) - }, - ); - mz_ore::iter::consolidate_update_iter(merged) - } - Err(s) => { - self.send(PeekResponseUnary::Error(s)); - return true; - } - }; - - // Sort results by time. We use stable sort here because it will produce - // deterministic results since the cursor will always produce rows in - // the same order. Compute doesn't guarantee that the results are sorted - // (materialize#18936) - let mut output_buf = Row::default(); - let mut output_builder = RowCollection::builder(0, 0); - let mut left_datum_vec = mz_repr::DatumVec::new(); - let mut right_datum_vec = mz_repr::DatumVec::new(); - let mut push_row = |row: &RowRef, time: Timestamp, diff: Diff| { - assert!(self.as_of <= time); - let mut packer = output_buf.packer(); - // TODO: Change to MzTimestamp. - packer.push(Datum::from(numeric::Numeric::from(time))); - if self.emit_progress { - // When sinking with PROGRESS, the output includes an - // additional column that indicates whether a timestamp is - // complete. For regular "data" updates this is always - // `false`. 
- packer.push(Datum::False); - } +/// Generate the appropriate unary peek response for the given subscribe batch. +pub fn format_subscribe_response( + batch: SubscribeBatch, + arity: usize, + as_of: Timestamp, + output: &SubscribeOutput, + emit_progress: bool, +) -> PeekResponseUnary { + let comparator = RowComparator::new(output.row_order()); + let rows = match batch.updates { + Ok(ref rows) => { + let iters = rows.iter().map(|r| r.iter()); + let merged = mz_ore::iter::merge_iters_by( + iters, + |(left_row, left_time, _), (right_row, right_time, _)| { + left_time.cmp(right_time).then_with(|| { + comparator.compare_rows(left_row, right_row, || left_row.cmp(right_row)) + }) + }, + ); + mz_ore::iter::consolidate_update_iter(merged) + } + Err(s) => { + return PeekResponseUnary::Error(s); + } + }; + + // Sort results by time. We use stable sort here because it will produce + // deterministic results since the cursor will always produce rows in + // the same order. Compute doesn't guarantee that the results are sorted + // (materialize#18936) + let mut output_buf = Row::default(); + let mut output_builder = RowCollection::builder(0, 0); + let mut left_datum_vec = mz_repr::DatumVec::new(); + let mut right_datum_vec = mz_repr::DatumVec::new(); + let mut push_row = |row: &RowRef, time: Timestamp, diff: Diff| { + assert!(as_of <= time); + let mut packer = output_buf.packer(); + // TODO: Change to MzTimestamp. + packer.push(Datum::from(numeric::Numeric::from(time))); + if emit_progress { + // When sinking with PROGRESS, the output includes an + // additional column that indicates whether a timestamp is + // complete. For regular "data" updates this is always + // `false`. + packer.push(Datum::False); + } - match &self.output { - SubscribeOutput::EnvelopeUpsert { .. } - | SubscribeOutput::EnvelopeDebezium { .. } => {} - SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. 
} => { - packer.push(Datum::Int64(diff.into_inner())); - } + match output { + SubscribeOutput::EnvelopeUpsert { .. } | SubscribeOutput::EnvelopeDebezium { .. } => {} + SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => { + packer.push(Datum::Int64(diff.into_inner())); } + } - packer.extend_by_row_ref(row); - - output_builder.push(output_buf.as_row_ref(), NonZeroUsize::MIN); - }; - - match &self.output { - SubscribeOutput::WithinTimestampOrderBy { order_by } => { - let mut rows: Vec<_> = rows.collect(); - // Since the diff is inserted as the first column, we can't take advantage of the - // known ordering. (Aside from timestamp, I suppose.) - rows.sort_by( - |(left_row, left_time, left_diff), (right_row, right_time, right_diff)| { - left_time.cmp(right_time).then_with(|| { - let mut left_datums = left_datum_vec.borrow(); - left_datums.extend(&[Datum::Int64(left_diff.into_inner())]); - left_datums.extend(left_row.iter()); - let mut right_datums = right_datum_vec.borrow(); - right_datums.extend(&[Datum::Int64(right_diff.into_inner())]); - right_datums.extend(right_row.iter()); - compare_columns(order_by, &left_datums, &right_datums, || { - left_row.cmp(right_row).then(left_diff.cmp(right_diff)) - }) + packer.extend_by_row_ref(row); + + output_builder.push(output_buf.as_row_ref(), NonZeroUsize::MIN); + }; + + match output { + SubscribeOutput::WithinTimestampOrderBy { order_by } => { + let mut rows: Vec<_> = rows.collect(); + // Since the diff is inserted as the first column, we can't take advantage of the + // known ordering. (Aside from timestamp, I suppose.) 
+ rows.sort_by( + |(left_row, left_time, left_diff), (right_row, right_time, right_diff)| { + left_time.cmp(right_time).then_with(|| { + let mut left_datums = left_datum_vec.borrow(); + left_datums.extend(&[Datum::Int64(left_diff.into_inner())]); + left_datums.extend(left_row.iter()); + let mut right_datums = right_datum_vec.borrow(); + right_datums.extend(&[Datum::Int64(right_diff.into_inner())]); + right_datums.extend(right_row.iter()); + compare_columns(order_by, &left_datums, &right_datums, || { + left_row.cmp(right_row).then(left_diff.cmp(right_diff)) }) - }, - ); - for (row, time, diff) in rows { - push_row(row, *time, diff); - } + }) + }, + ); + for (row, time, diff) in rows { + push_row(row, *time, diff); } - SubscribeOutput::EnvelopeUpsert { order_by_keys } - | SubscribeOutput::EnvelopeDebezium { order_by_keys } => { - let debezium = matches!(self.output, SubscribeOutput::EnvelopeDebezium { .. }); - let mut it = rows.peekable(); - let mut datum_vec = mz_repr::DatumVec::new(); - let mut old_datum_vec = mz_repr::DatumVec::new(); - let comparator = RowComparator::new(order_by_keys.as_slice()); - let mut group = Vec::with_capacity(2); - let mut row_buf = Row::default(); - // The iterator is sorted by time and key, so elements in the same group should be - // adjacent already. - while let Some(start) = it.next() { - group.clear(); - group.push(start); - while let Some(row) = it.peek() - && start.1 == row.1 - && { - comparator - .compare_rows(start.0, row.0, || Ordering::Equal) - .is_eq() - } - { - group.extend(it.next()); + } + SubscribeOutput::EnvelopeUpsert { order_by_keys } + | SubscribeOutput::EnvelopeDebezium { order_by_keys } => { + let debezium = matches!(output, SubscribeOutput::EnvelopeDebezium { .. 
}); + let mut it = rows.peekable(); + let mut datum_vec = mz_repr::DatumVec::new(); + let mut old_datum_vec = mz_repr::DatumVec::new(); + let comparator = RowComparator::new(order_by_keys.as_slice()); + let mut group = Vec::with_capacity(2); + let mut row_buf = Row::default(); + // The iterator is sorted by time and key, so elements in the same group should be + // adjacent already. + while let Some(start) = it.next() { + group.clear(); + group.push(start); + while let Some(row) = it.peek() + && start.1 == row.1 + && { + comparator + .compare_rows(start.0, row.0, || Ordering::Equal) + .is_eq() } - group.sort_by_key(|(_, _, d)| *d); - - // Four cases: - // [(key, value, +1)] => ("insert", key, NULL, value) - // [(key, v1, -1), (key, v2, +1)] => ("upsert", key, v1, v2) - // [(key, value, -1)] => ("delete", key, value, NULL) - // everything else => ("key_violation", key, NULL, NULL) - let value_columns = self.arity - order_by_keys.len(); - let mut packer = row_buf.packer(); - match &group[..] { - [(row, _, Diff::ONE)] => { - packer.push(if debezium { - Datum::String("insert") - } else { - Datum::String("upsert") - }); - let datums = datum_vec.borrow_with(row); - for column_order in order_by_keys { - packer.push(datums[column_order.column]); + { + group.extend(it.next()); + } + group.sort_by_key(|(_, _, d)| *d); + + // Four cases: + // [(key, value, +1)] => ("insert", key, NULL, value) + // [(key, v1, -1), (key, v2, +1)] => ("upsert", key, v1, v2) + // [(key, value, -1)] => ("delete", key, value, NULL) + // everything else => ("key_violation", key, NULL, NULL) + let value_columns = arity - order_by_keys.len(); + let mut packer = row_buf.packer(); + match &group[..] 
{ + [(row, _, Diff::ONE)] => { + packer.push(if debezium { + Datum::String("insert") + } else { + Datum::String("upsert") + }); + let datums = datum_vec.borrow_with(row); + for column_order in order_by_keys { + packer.push(datums[column_order.column]); + } + if debezium { + for _ in 0..value_columns { + packer.push(Datum::Null); } - if debezium { - for _ in 0..value_columns { - packer.push(Datum::Null); - } + } + for idx in 0..arity { + if !order_by_keys.iter().any(|co| co.column == idx) { + packer.push(datums[idx]); } - for idx in 0..self.arity { + } + push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO) + } + [(_, _, Diff::MINUS_ONE)] => { + packer.push(Datum::String("delete")); + let datums = datum_vec.borrow_with(start.0); + for column_order in order_by_keys { + packer.push(datums[column_order.column]); + } + if debezium { + for idx in 0..arity { if !order_by_keys.iter().any(|co| co.column == idx) { packer.push(datums[idx]); } } - push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO) } - [(_, _, Diff::MINUS_ONE)] => { - packer.push(Datum::String("delete")); - let datums = datum_vec.borrow_with(start.0); - for column_order in order_by_keys { - packer.push(datums[column_order.column]); - } - if debezium { - for idx in 0..self.arity { - if !order_by_keys.iter().any(|co| co.column == idx) { - packer.push(datums[idx]); - } - } - } - for _ in 0..self.arity - order_by_keys.len() { - packer.push(Datum::Null); - } - push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO) + for _ in 0..arity - order_by_keys.len() { + packer.push(Datum::Null); } - [(old_row, _, Diff::MINUS_ONE), (row, _, Diff::ONE)] => { - packer.push(Datum::String("upsert")); - let datums = datum_vec.borrow_with(row); - let old_datums = old_datum_vec.borrow_with(old_row); + push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO) + } + [(old_row, _, Diff::MINUS_ONE), (row, _, Diff::ONE)] => { + packer.push(Datum::String("upsert")); + let datums = datum_vec.borrow_with(row); + let old_datums = 
old_datum_vec.borrow_with(old_row); - for column_order in order_by_keys { - packer.push(datums[column_order.column]); - } - if debezium { - for idx in 0..self.arity { - if !order_by_keys.iter().any(|co| co.column == idx) { - packer.push(old_datums[idx]); - } - } - } - for idx in 0..self.arity { + for column_order in order_by_keys { + packer.push(datums[column_order.column]); + } + if debezium { + for idx in 0..arity { if !order_by_keys.iter().any(|co| co.column == idx) { - packer.push(datums[idx]); + packer.push(old_datums[idx]); } } - push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO) } - _ => { - packer.push(Datum::String("key_violation")); - let datums = datum_vec.borrow_with(start.0); - for column_order in order_by_keys { - packer.push(datums[column_order.column]); - } - if debezium { - for _ in 0..(self.arity - order_by_keys.len()) { - packer.push(Datum::Null); - } + for idx in 0..arity { + if !order_by_keys.iter().any(|co| co.column == idx) { + packer.push(datums[idx]); } - for _ in 0..(self.arity - order_by_keys.len()) { + } + push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO) + } + _ => { + packer.push(Datum::String("key_violation")); + let datums = datum_vec.borrow_with(start.0); + for column_order in order_by_keys { + packer.push(datums[column_order.column]); + } + if debezium { + for _ in 0..(arity - order_by_keys.len()) { packer.push(Datum::Null); } - push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO) } - }; - } + for _ in 0..(arity - order_by_keys.len()) { + packer.push(Datum::Null); + } + push_row(row_buf.as_row_ref(), *start.1, Diff::ZERO) + } + }; } - SubscribeOutput::Diffs => { - // Diffs output is sorted by time and row, so it can be pushed directly. - for (row, time, diff) in rows { - push_row(row, *time, diff) - } + } + SubscribeOutput::Diffs => { + // Diffs output is sorted by time and row, so it can be pushed directly. 
+ for (row, time, diff) in rows { + push_row(row, *time, diff) } - }; + } + }; + + if emit_progress && !batch.upper.less_equal(&as_of) { + if let Some(upper) = batch.upper.as_option() { + format_progress(&mut output_builder, arity, output, *upper); + } + } - let rows = output_builder.build(); - let rows = Box::new(rows.into_row_iter()); - self.send(PeekResponseUnary::Rows(rows)); + let rows = output_builder.build(); + let rows = Box::new(rows.into_row_iter()); - // Emit progress message if requested. Don't emit progress for the first - // batch if the upper is exactly `as_of` (we're guaranteed it is not - // less than `as_of`, but it might be exactly `as_of`) as we've already - // emitted that progress message in `initialize`. - if !batch.upper.less_equal(&self.as_of) { - self.send_progress_message(&batch.upper); + PeekResponseUnary::Rows(rows) +} + +impl ActiveSubscribe { + /// Initializes the subscription. + /// + /// This method must be called exactly once, after constructing an + /// `ActiveSubscribe` and before calling `process_response`. + pub fn initialize(&self) { + if self.emit_progress { + // A single, all-null row except for the progress timestamp. + let mut output_builder = RowCollection::builder(8, 1); + format_progress(&mut output_builder, self.arity, &self.output, self.as_of); + self.send(PeekResponseUnary::Rows(Box::new( + output_builder.build().into_row_iter(), + ))); } + } + + /// Processes a subscribe response from the controller. + /// + /// Returns `true` if the subscribe is finished. + pub fn process_response(&self, batch: SubscribeBatch) -> bool { + let is_finished = match &batch.updates { + Ok(_) => batch.upper.is_empty(), + Err(_) => true, + }; + let response = format_subscribe_response( + batch, + self.arity, + self.as_of, + &self.output, + self.emit_progress, + ); + + self.send(response); - batch.upper.is_empty() + is_finished } /// Retires the subscribe with the specified reason. 
From e995932259c4b21356961607acf129a6b42b6d3c Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Tue, 31 Mar 2026 13:09:02 -0400 Subject: [PATCH 4/6] Don't require an unnecessary mut --- src/adapter/src/coord/peek.rs | 4 ++-- src/adapter/src/optimize/peek.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/adapter/src/coord/peek.rs b/src/adapter/src/coord/peek.rs index ca53e473aa20a..939a6d5e5ce78 100644 --- a/src/adapter/src/coord/peek.rs +++ b/src/adapter/src/coord/peek.rs @@ -428,7 +428,7 @@ fn permute_oneshot_mfp_around_index( /// we can avoid building a dataflow (and either just return the results, or peek /// out of the arrangement, respectively). pub fn create_fast_path_plan( - dataflow_plan: &mut DataflowDescription, + dataflow_plan: &DataflowDescription, view_id: GlobalId, finishing: Option<&RowSetFinishing>, persist_fast_path_limit: usize, @@ -441,7 +441,7 @@ pub fn create_fast_path_plan( // to build (no dependent views). There is likely an index to build as well, but we may not be sure. if dataflow_plan.objects_to_build.len() >= 1 && dataflow_plan.objects_to_build[0].id == view_id { - let mut mir = &*dataflow_plan.objects_to_build[0].plan.as_inner_mut(); + let mut mir = &*dataflow_plan.objects_to_build[0].plan.as_inner(); if let Some((rows, found_typ)) = mir.as_const() { // In the case of a constant, we can return the result now. let plan = FastPathPlan::Constant( diff --git a/src/adapter/src/optimize/peek.rs b/src/adapter/src/optimize/peek.rs index 1a718f134597a..8a4a573de0634 100644 --- a/src/adapter/src/optimize/peek.rs +++ b/src/adapter/src/optimize/peek.rs @@ -349,7 +349,7 @@ impl<'s> Optimize>> for Optimizer { // `create_fast_path_plan` later again, because, e.g., running `LiteralConstraints` is still // ahead of us.) 
let use_fast_path_optimizer = match create_fast_path_plan( - &mut df_desc, + &df_desc, self.select_id, Some(&self.finishing), self.config.features.persist_fast_path_limit, @@ -384,7 +384,7 @@ impl<'s> Optimize>> for Optimizer { // .expect("GlobalMirPlan type"); let peek_plan = match create_fast_path_plan( - &mut df_desc, + &df_desc, self.select_id, Some(&self.finishing), self.config.features.persist_fast_path_limit, From f4abe7668e9f067e6d58314c91e7272b7dfb4f8f Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Wed, 1 Apr 2026 17:24:08 -0400 Subject: [PATCH 5/6] WIP --- src/adapter/src/frontend_peek.rs | 84 +++++++- src/adapter/src/optimize/subscribe.rs | 36 +++- src/adapter/src/peek_client.rs | 281 ++++++++++++++++++++++++-- 3 files changed, 377 insertions(+), 24 deletions(-) diff --git a/src/adapter/src/frontend_peek.rs b/src/adapter/src/frontend_peek.rs index 336ff34b6f948..16cd187c8e733 100644 --- a/src/adapter/src/frontend_peek.rs +++ b/src/adapter/src/frontend_peek.rs @@ -24,7 +24,7 @@ use mz_ore::task::JoinHandle; use mz_ore::{soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log}; use mz_repr::optimize::{OptimizerFeatures, OverrideFrom}; use mz_repr::role_id::RoleId; -use mz_repr::{Datum, GlobalId, IntoRowIterator, Timestamp}; +use mz_repr::{Datum, GlobalId, IntoRowIterator, RelationDesc, Timestamp}; use mz_sql::ast::Raw; use mz_sql::catalog::CatalogCluster; use mz_sql::plan::Params; @@ -1123,11 +1123,26 @@ impl PeekClient { let local_mir_plan = global_mir_plan.resolve(Antichain::from_elem(as_of)); + if let Some(up_to) = plan.up_to + && let Some((fast_path, _id, sink, df_meta)) = + local_mir_plan.try_create_fast_path_plan(view_id)? 
+ { + return Ok(Execution::SubscribeFastPath { + subscribe_plan: plan, + fast_path_plan: fast_path, + typ: sink.from_desc, + up_to, + df_meta, + optimization_finished_at: now(), + }); + }; + let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?; let optimization_finished_at = now(); let (df_desc, df_meta) = global_lir_plan.unapply(); + Ok(Execution::Subscribe { subscribe_plan: plan, df_desc, @@ -1417,6 +1432,62 @@ impl PeekClient { .await?; Ok(Some(response)) } + Execution::SubscribeFastPath { + subscribe_plan, + fast_path_plan, + typ, + up_to, + df_meta, + optimization_finished_at: _finished_at, + } => { + let row_set_finishing_seconds = + session.metrics().row_set_finishing_seconds().clone(); + + let peek_stash_read_batch_size_bytes = + mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES + .get(catalog.system_config().dyncfgs()); + let peek_stash_read_memory_budget_bytes = + mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES + .get(catalog.system_config().dyncfgs()); + let max_result_size = catalog.system_config().max_result_size(); + + coord::sequencer::emit_optimizer_notices( + &*catalog, + session, + &df_meta.optimizer_notices, + ); + + let watch_set = statement_logging_id.map(|logging_id| { + WatchSetCreation::new( + logging_id, + catalog.state(), + &input_id_bundle, + determination.timestamp_context.timestamp_or_default(), + ) + }); + + let response = self + .implement_fast_path_subscribe_plan( + subscribe_plan, + fast_path_plan, + determination.timestamp_context.timestamp_or_default(), + up_to, + target_cluster_id, + target_replica, + typ.typ().clone(), + max_result_size, + max_query_result_size, + row_set_finishing_seconds, + read_holds, + peek_stash_read_batch_size_bytes, + peek_stash_read_memory_budget_bytes, + session.conn_id().clone(), + source_ids, + watch_set, + ) + .await?; + Ok(Some(response)) + } Execution::CopyToS3 { global_lir_plan, source_ids, @@ -1651,6 +1722,9 @@ impl PeekClient { 
"Subscribe", ) } + Execution::SubscribeFastPath { .. } => { + return; + } }; // Assert that we have some read holds for all the imports of the dataflow. @@ -1728,6 +1802,14 @@ enum Execution { df_meta: DataflowMetainfo, optimization_finished_at: EpochMillis, }, + SubscribeFastPath { + subscribe_plan: SubscribePlan, + fast_path_plan: FastPathPlan, + typ: RelationDesc, + up_to: Timestamp, + df_meta: DataflowMetainfo, + optimization_finished_at: EpochMillis, + }, CopyToS3 { global_lir_plan: optimize::copy_to::GlobalLirPlan, source_ids: BTreeSet, diff --git a/src/adapter/src/optimize/subscribe.rs b/src/adapter/src/optimize/subscribe.rs index 618e3a533cc68..196d75c68d80f 100644 --- a/src/adapter/src/optimize/subscribe.rs +++ b/src/adapter/src/optimize/subscribe.rs @@ -13,10 +13,21 @@ use std::marker::PhantomData; use std::sync::Arc; use std::time::{Duration, Instant}; +use crate::CollectionIdBundle; +use crate::coord::peek::{FastPathPlan, create_fast_path_plan}; +use crate::optimize::dataflows::{ + ComputeInstanceSnapshot, DataflowBuilder, ExprPrep, ExprPrepMaintained, + dataflow_import_id_bundle, +}; +use crate::optimize::{ + LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog, + OptimizerConfig, OptimizerError, optimize_mir_local, trace_plan, +}; use differential_dataflow::lattice::Lattice; use mz_compute_types::ComputeInstanceId; use mz_compute_types::plan::Plan; use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, SubscribeSinkConnection}; +use mz_ore::collections::CollectionExt; use mz_ore::soft_assert_or_log; use mz_repr::{GlobalId, Timestamp}; use mz_sql::optimizer_metrics::OptimizerMetrics; @@ -27,16 +38,6 @@ use mz_transform::normalize_lets::normalize_lets; use mz_transform::typecheck::{SharedTypecheckingContext, empty_typechecking_context}; use timely::progress::Antichain; -use crate::CollectionIdBundle; -use crate::optimize::dataflows::{ - ComputeInstanceSnapshot, DataflowBuilder, ExprPrep, 
ExprPrepMaintained, - dataflow_import_id_bundle, -}; -use crate::optimize::{ - LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog, - OptimizerConfig, OptimizerError, optimize_mir_local, trace_plan, -}; - pub struct Optimizer { /// A representation typechecking context to use throughout the optimizer pipeline. typecheck_ctx: SharedTypecheckingContext, @@ -133,6 +134,21 @@ impl GlobalMirPlan { pub fn id_bundle(&self, compute_instance_id: ComputeInstanceId) -> CollectionIdBundle { dataflow_import_id_bundle(&self.df_desc, compute_instance_id) } + + /// Attempt to extract a fast-path plan from the optimized subscribe. + pub fn try_create_fast_path_plan( + &self, + id: GlobalId, + ) -> Result, OptimizerError> + { + // Use the standard peek logic for the fast path plan, disabling the persist fast path + // and the finishing since those aren't applicable for subscribes. + let Some(plan) = create_fast_path_plan(&self.df_desc, id, None, 0, false)? else { + return Ok(None); + }; + let (id, sink) = self.df_desc.sink_exports.iter().into_element(); + Ok(Some((plan, *id, sink.clone(), self.df_meta.clone()))) + } } /// The (final) result after MIR ⇒ LIR lowering and optimizing the resulting diff --git a/src/adapter/src/peek_client.rs b/src/adapter/src/peek_client.rs index 231c8ecd9196f..eb777cde7befe 100644 --- a/src/adapter/src/peek_client.rs +++ b/src/adapter/src/peek_client.rs @@ -10,12 +10,29 @@ use std::collections::BTreeMap; use std::sync::Arc; +use crate::active_compute_sink::format_subscribe_response; +use crate::catalog::Catalog; +use crate::command::{CatalogSnapshot, Command}; +use crate::coord::Coordinator; +use crate::coord::peek::FastPathPlan; +use crate::statement_logging::WatchSetCreation; +use crate::statement_logging::{ + FrontendStatementLoggingEvent, PreparedStatementEvent, StatementLoggingFrontend, + StatementLoggingId, +}; +use crate::{ + AdapterError, Client, CollectionIdBundle, ExecuteResponse, PeekResponseUnary, 
ReadHolds, + statement_logging, +}; use differential_dataflow::consolidation::consolidate; +use futures::StreamExt; use mz_compute_client::controller::error::{CollectionMissing, InstanceMissing}; use mz_compute_client::controller::instance_client::InstanceClient; use mz_compute_client::controller::instance_client::{AcquireReadHoldsError, InstanceShutDown}; -use mz_compute_client::protocol::command::PeekTarget; +use mz_compute_client::protocol::command::{PeekDescription, PeekTarget}; +use mz_compute_client::protocol::response::{PeekResponse, SubscribeBatch}; use mz_compute_types::ComputeInstanceId; +use mz_expr::RowSetFinishing; use mz_expr::row::RowCollection; use mz_ore::cast::CastFrom; use mz_persist_client::PersistClient; @@ -24,25 +41,16 @@ use mz_repr::Timestamp; use mz_repr::global_id::TransientIdGen; use mz_repr::{RelationDesc, Row}; use mz_sql::optimizer_metrics::OptimizerMetrics; +use mz_sql::plan::SubscribePlan; use mz_storage_types::sources::Timeline; use mz_timestamp_oracle::TimestampOracle; use prometheus::Histogram; use thiserror::Error; use timely::progress::Antichain; use tokio::sync::oneshot; +use tokio_stream::wrappers::ReceiverStream; use uuid::Uuid; -use crate::catalog::Catalog; -use crate::command::{CatalogSnapshot, Command}; -use crate::coord::Coordinator; -use crate::coord::peek::FastPathPlan; -use crate::statement_logging::WatchSetCreation; -use crate::statement_logging::{ - FrontendStatementLoggingEvent, PreparedStatementEvent, StatementLoggingFrontend, - StatementLoggingId, -}; -use crate::{AdapterError, Client, CollectionIdBundle, ReadHolds, statement_logging}; - /// Storage collections trait alias we need to consult for since/frontiers. 
pub type StorageCollectionsHandle = Arc< dyn mz_storage_client::storage_collections::StorageCollections @@ -378,7 +386,7 @@ impl PeekClient { peek_target, literal_constraints, uuid, - timestamp, + PeekDescription::select(timestamp), result_desc, finishing_for_instance, mfp, @@ -419,6 +427,253 @@ impl PeekClient { }) } + /// Implement a fast-path peek plan. + /// This is similar to `Coordinator::implement_peek_plan`, but only for fast path peeks. + /// + /// Note: self is taken &mut because of the lazy fetching in `get_compute_instance_client`. + /// + /// Note: `input_read_holds` has holds for all inputs. For fast-path peeks, this includes the + /// peek target. For slow-path peeks (to be implemented later), we'll need to additionally call + /// into the Controller to acquire a hold on the peek target after we create the dataflow. + pub async fn implement_fast_path_subscribe_plan( + &mut self, + subscribe_plan: SubscribePlan, + fast_path: FastPathPlan, + as_of: Timestamp, + up_to: Timestamp, + compute_instance: ComputeInstanceId, + target_replica: Option, + intermediate_result_type: mz_repr::SqlRelationType, + max_result_size: u64, + max_returned_query_size: Option, + row_set_finishing_seconds: Histogram, + input_read_holds: ReadHolds, + peek_stash_read_batch_size_bytes: usize, + peek_stash_read_memory_budget_bytes: usize, + conn_id: mz_adapter_types::connection::ConnectionId, + depends_on: std::collections::BTreeSet, + watch_set: Option, + ) -> Result { + let arity = intermediate_result_type.arity(); + let finishing = RowSetFinishing::trivial(arity); + let (rows_tx, rows_rx) = oneshot::channel(); + + let (subscribe_tx, subscribe_rx) = tokio::sync::mpsc::unbounded_channel(); + + let persist_client = self.persist_client.clone(); + mz_ore::task::spawn(|| "todo: make this method take a stream", async move { + let response = match rows_rx.await { + Err(recv_error) => PeekResponseUnary::Error(recv_error.to_string()), + Ok(PeekResponse::Rows(rows)) => 
format_subscribe_response( + SubscribeBatch { + lower: Antichain::from_elem(as_of), + upper: Antichain::from_elem(up_to), + updates: Ok(rows), + }, + arity, + as_of, + &subscribe_plan.output, + subscribe_plan.emit_progress, + ), + Ok(PeekResponse::Stashed(response)) => { + let rx = Coordinator::create_stash_stream( + persist_client, + *response, + peek_stash_read_batch_size_bytes, + peek_stash_read_memory_budget_bytes, + ); + let rows: Vec<_> = ReceiverStream::new(rx).collect().await; + format_subscribe_response( + SubscribeBatch { + lower: Antichain::from_elem(as_of), + upper: Antichain::from_elem(up_to), + updates: Ok(rows), + }, + arity, + as_of, + &subscribe_plan.output, + subscribe_plan.emit_progress, + ) + } + Ok(PeekResponse::Error(e)) => PeekResponseUnary::Error(e), + Ok(PeekResponse::Canceled) => PeekResponseUnary::Canceled, + }; + let _ = subscribe_tx.send(response); + }); + + // If the dataflow optimizes to a constant expression, we can immediately return the result. + if let FastPathPlan::Constant(rows_res, _) = fast_path { + // For constant queries with statement logging, immediately log that + // dependencies are "ready" (trivially, because there are none). 
+ if let Some(ref ws) = watch_set { + self.log_lifecycle_event( + ws.logging_id, + statement_logging::StatementLifecycleEvent::StorageDependenciesFinished, + ); + self.log_lifecycle_event( + ws.logging_id, + statement_logging::StatementLifecycleEvent::ComputeDependenciesFinished, + ); + } + + let mut rows = match rows_res { + Ok(rows) => rows, + Err(e) => return Err(e.into()), + }; + consolidate(&mut rows); + + let mut results = Vec::new(); + for (row, count) in rows { + let count = match u64::try_from(count.into_inner()) { + Ok(u) => usize::cast_from(u), + Err(_) => { + return Err(AdapterError::Unstructured(anyhow::anyhow!( + "Negative multiplicity in constant result: {}", + count + ))); + } + }; + match std::num::NonZeroUsize::new(count) { + Some(nzu) => { + results.push((row, nzu)); + } + None => { + // No need to retain 0 diffs. + } + }; + } + let row_collection = RowCollection::new(results, &finishing.order_by); + return match finishing.finish( + row_collection, + max_result_size, + max_returned_query_size, + &row_set_finishing_seconds, + ) { + Ok((rows, _bytes)) => Ok(Coordinator::send_immediate_rows(rows)), + // TODO(peek-seq): make this a structured error. 
(also in the old sequencing) + Err(e) => Err(AdapterError::ResultSize(e)), + }; + } + + let (peek_target, target_read_hold, literal_constraints, mfp, _strategy) = match fast_path { + FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, mfp) => { + let peek_target = PeekTarget::Index { id: idx_id }; + let target_read_hold = input_read_holds + .compute_holds + .get(&(compute_instance, idx_id)) + .expect("missing compute read hold on PeekExisting peek target") + .clone(); + let strategy = statement_logging::StatementExecutionStrategy::FastPath; + ( + peek_target, + target_read_hold, + literal_constraints, + mfp, + strategy, + ) + } + FastPathPlan::PeekPersist(coll_id, literal_constraint, mfp) => { + let literal_constraints = literal_constraint.map(|r| vec![r]); + let metadata = self + .storage_collections + .collection_metadata(coll_id) + .map_err(AdapterError::concurrent_dependency_drop_from_collection_missing)? + .clone(); + let peek_target = PeekTarget::Persist { + id: coll_id, + metadata, + }; + let target_read_hold = input_read_holds + .storage_holds + .get(&coll_id) + .expect("missing storage read hold on PeekPersist peek target") + .clone(); + let strategy = statement_logging::StatementExecutionStrategy::PersistFastPath; + ( + peek_target, + target_read_hold, + literal_constraints, + mfp, + strategy, + ) + } + FastPathPlan::Constant(..) => { + // FastPathPlan::Constant handled above. + unreachable!() + } + }; + + let uuid = Uuid::new_v4(); + + // At this stage we don't know column names for the result because we + // only know the peek's result type as a bare SqlRelationType. + let cols = (0..intermediate_result_type.arity()).map(|i| format!("peek_{i}")); + let result_desc = RelationDesc::new(intermediate_result_type.clone(), cols); + + let client = self + .ensure_compute_instance_client(compute_instance) + .await + .map_err(AdapterError::concurrent_dependency_drop_from_instance_missing)?; + + // Register coordinator tracking of this peek. 
This has to complete before issuing the peek. + // + // Warning: If we fail to actually issue the peek after this point, then we need to + // unregister it to avoid an orphaned registration. + self.call_coordinator(|tx| Command::RegisterFrontendPeek { + uuid, + conn_id: conn_id.clone(), + cluster_id: compute_instance, + depends_on, + is_fast_path: true, + watch_set, + tx, + }) + .await?; + + let finishing_for_instance = finishing.clone(); + let peek_result = client + .peek( + peek_target, + literal_constraints, + uuid, + PeekDescription::subscribe(as_of, Some(up_to), subscribe_plan.with_snapshot), + result_desc, + finishing_for_instance, + mfp, + target_read_hold, + target_replica, + rows_tx, + ) + .await; + + if let Err(err) = peek_result { + // Clean up the registered peek since the peek failed to issue. + // The frontend will handle statement logging for the error. + self.call_coordinator(|tx| Command::UnregisterFrontendPeek { uuid, tx }) + .await; + return Err( + AdapterError::concurrent_dependency_drop_from_instance_peek_error( + err, + compute_instance, + ), + ); + } + + let resp = ExecuteResponse::Subscribing { + rx: subscribe_rx, + instance_id: compute_instance, + ctx_extra: Default::default(), + }; + let resp = match subscribe_plan.copy_to { + None => resp, + Some(format) => ExecuteResponse::CopyTo { + format, + resp: Box::new(resp), + }, + }; + Ok(resp) + } + // Statement logging helper methods /// Log the beginning of statement execution. 
From f503287674b3cbe73dcc93f06d36e8f5ade32ef8 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Wed, 1 Apr 2026 20:36:44 -0400 Subject: [PATCH 6/6] Run-end encoding for timestamps --- src/repr/proptest-regressions/update.txt | 7 + src/repr/src/update.rs | 225 ++++++++++++++++++++++- 2 files changed, 224 insertions(+), 8 deletions(-) create mode 100644 src/repr/proptest-regressions/update.txt diff --git a/src/repr/proptest-regressions/update.txt b/src/repr/proptest-regressions/update.txt new file mode 100644 index 0000000000000..94614540febe3 --- /dev/null +++ b/src/repr/proptest-regressions/update.txt @@ -0,0 +1,7 @@ +# Seeds for failure cases proptest has generated in the past. It is +# automatically read and these particular cases re-run before any +# novel cases are generated. +# +# It is recommended to check this file in to source control so that +# everyone who runs the test benefits from these saved cases. +cc 87763977e393988c0931c4b9731a82f3f22f508ac8f6ec7a66927f0ad1d9169e # shrinks to mut data = [3], needle = 0 diff --git a/src/repr/src/update.rs b/src/repr/src/update.rs index 1aea8d88984b8..a48e99228567b 100644 --- a/src/repr/src/update.rs +++ b/src/repr/src/update.rs @@ -19,7 +19,6 @@ use crate::{Diff, RowRef}; /// An immutable, shared slice. Morally, this is [bytes::Bytes] but with fewer features /// and supporting arbitrary types. -#[derive(Clone)] pub struct SharedSlice { /// The range of offsets in the backing data that are present in the slice. /// (This allows us to subset the slice without reallocating.) @@ -33,11 +32,24 @@ impl Debug for SharedSlice { } } +impl Clone for SharedSlice { + fn clone(&self) -> Self { + Self { + range: self.range.clone(), + data: Arc::clone(&self.data), + } + } +} + impl SharedSlice { /// Split this slice in half at the provided offset. 
pub fn split_at(self, offset: usize) -> (Self, Self) { let Self { range, data } = self; - assert!(offset <= range.len()); + assert!( + offset <= range.len(), + "offset out of bounds: {offset} !<= {len}", + len = range.len() + ); let offset = range.start + offset; ( Self { @@ -194,10 +206,158 @@ impl Rows { } } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct RunEncoded { + /// Actual data, one per run. + data: SharedSlice, + /// Endpoints of individual runs of data. Should be equal length with the data itself. + run_ends: SharedSlice, + /// The range of valid endpoints for the runs themeselves; the endpoints above are offset from + /// the start of the range and clamped at the end. This allows us to support splitting by adjusting + /// these endpoints without rewriting the endpoint array, which is linear time and may be shared. + run_range: Range, +} + +impl Default for RunEncoded { + fn default() -> Self { + Self { + data: Default::default(), + run_ends: Default::default(), + run_range: Default::default(), + } + } +} + +#[derive(Debug)] +struct RunEncodedBuilder { + data: Vec, + run_ends: Vec, +} + +impl RunEncodedBuilder { + fn len(&self) -> usize { + self.run_ends.last().copied().unwrap_or(0) + } + + fn push(&mut self, value: T) { + if self.data.last().is_some_and(|t| *t == value) { + *self.run_ends.last_mut().expect("same length") += 1 + } else { + self.data.push(value); + self.run_ends.push(self.len() + 1); + } + } + + fn build(self) -> RunEncoded { + let len = self.len(); + RunEncoded { + data: self.data.into(), + run_ends: self.run_ends.into(), + run_range: 0..len, + } + } +} + +impl RunEncoded { + fn builder() -> RunEncodedBuilder { + RunEncodedBuilder { + data: Vec::new(), + run_ends: Vec::new(), + } + } + + /// Similar to [slice::partition_point], but searching over the run ends directly. 
+ pub fn partition_point(&self, partition_by: impl Fn(&T) -> bool) -> usize { + let index = self.data.partition_point(partition_by); + if index == 0 { + 0 + } else if index == self.len() { + self.run_range.len() + } else { + self.run_ends[index - 1] + } + } + + pub fn len(&self) -> usize { + self.run_range.len() + } + + pub fn get(&self, index: usize) -> Option<&T> { + let target = self.run_range.start + index; + let index = self.run_ends.partition_point(|i| target < *i); + self.data.get(index) + } + + pub fn iter(&self) -> impl Iterator { + [self.run_range.start] + .into_iter() + .chain(self.run_ends.iter().copied()) + .map(|i| i.min(self.run_range.end) - self.run_range.start) + .tuple_windows() + .enumerate() + .flat_map(|(idx, (lo, hi))| { + let value = &self.data[idx]; + (lo..hi).map(move |_| value) + }) + } + + pub fn split_at(self, index: usize) -> (Self, Self) { + assert!( + index <= self.len(), + "index out of bounds: {index} > {len}", + len = self.len() + ); + // Suppose our sequence is AAABBBCCC, with run ends [3, 6, 9] and data [A, B, C]. + // If we split at index 3, at a run boundary, each unique + // value will only end up in one of the two runs. However, if we split at index 5, + // the first half will need to include [A, B] and [3, 6] (with a run_range 0..5), + // and the second half will include [B, C] and [6, 9] (with run_range 5..9). + + let index = self.run_range.start + index; + let run_index = if index == self.run_range.start { + Ok(0) + } else if index == self.run_range.end { + Ok(self.run_ends.len()) + } else { + self.run_ends.binary_search(&index).map(|n| n + 1) + }; + let ((before_data, after_data), (before_ends, after_ends)) = match run_index { + Ok(endpoint) => { + // Great news: we're splitting at one of the endpoints of a run. + ( + self.data.split_at(endpoint), + self.run_ends.split_at(endpoint), + ) + } + Err(midpoint) => { + // Bad news - splitting in the middle of a run, which means we need to preserve elements on both sides. 
+ let after_split = midpoint; + let before_split = midpoint + 1; + let (before_data, _) = self.data.clone().split_at(before_split); + let (_, after_data) = self.data.split_at(after_split); + let (before_ends, _) = self.run_ends.clone().split_at(before_split); + let (_, after_ends) = self.run_ends.split_at(after_split); + ((before_data, after_data), (before_ends, after_ends)) + } + }; + let before = Self { + data: before_data, + run_range: self.run_range.start..index, + run_ends: before_ends, + }; + let after = Self { + data: after_data, + run_range: index..self.run_range.end, + run_ends: after_ends, + }; + (before, after) + } +} + #[derive(Debug)] pub struct UpdateCollectionBuilder { rows: RowsBuilder, - times: Vec, + times: RunEncodedBuilder, diffs: Vec, } @@ -211,7 +371,7 @@ impl UpdateCollectionBuilder { pub fn build(self) -> UpdateCollection { UpdateCollection { rows: self.rows.build(), - times: self.times.into(), + times: self.times.build(), diffs: self.diffs.into(), } } @@ -221,7 +381,7 @@ impl UpdateCollectionBuilder { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct UpdateCollection { rows: Rows, - times: SharedSlice, + times: RunEncoded, diffs: SharedSlice, } @@ -252,7 +412,7 @@ impl UpdateCollection { pub fn builder(byte_size_hint: usize, row_size_hint: usize) -> UpdateCollectionBuilder { UpdateCollectionBuilder { rows: Rows::builder(byte_size_hint, row_size_hint), - times: Vec::with_capacity(row_size_hint), + times: RunEncoded::builder(), diffs: Vec::with_capacity(row_size_hint), } } @@ -295,8 +455,8 @@ impl UpdateCollection { &self.rows } - pub fn times(&self) -> &[T] { - &*self.times + pub fn times(&self) -> &RunEncoded { + &self.times } pub fn diffs(&self) -> &[Diff] { @@ -375,4 +535,53 @@ mod tests { assert_eq!(rows_1.0.len(), mid); }); } + + #[mz_ore::test] + #[cfg_attr(miri, ignore)] + fn test_run_splits() { + proptest!(|(data in vec(0u32..4, 0..8), [a, b] in uniform(0usize..8))| { + let mut runs = RunEncoded::builder(); + 
for x in &data { + runs.push(*x); + } + let runs = runs.build(); + + let mid = a.clamp(0, data.len()); + let data = data.split_at(mid); + let runs = runs.split_at(mid); + assert!(data.0.iter().eq(runs.0.iter())); + assert_eq!(runs.0.len(), mid); + + let mid = b.clamp(0, data.0.len()); + let data_0 = data.0.split_at(mid); + let runs_0 = runs.0.clone().split_at(mid); + assert!(data_0.0.iter().eq(runs_0.0.iter())); + assert_eq!(runs_0.0.len(), mid); + assert_eq!(runs_0.0.len() + runs_0.1.len(), runs.0.len()); + + let mid = b.clamp(0, data.1.len()); + let data_1 = data.1.split_at(mid); + let runs_1 = runs.1.clone().split_at(mid); + assert!(data_1.0.iter().eq(runs_1.0.iter())); + assert_eq!(runs_1.0.len(), mid); + }); + } + + #[mz_ore::test] + #[cfg_attr(miri, ignore)] + fn test_partition_point() { + proptest!(|(mut data in vec(0u32..4, 0..8), needle in 0u32..5)| { + data.sort(); + let mut builder = RunEncoded::builder(); + for value in &data { + builder.push(*value); + } + let runs = builder.build(); + + assert_eq!( + data.partition_point(|x| *x < needle), + runs.partition_point(|x| *x < needle), + ); + }); + } }