Shuffle V2: Coordinated disk-backed shuffle protocol (part one)#2702

Open
jgraettinger wants to merge 8 commits into master from johnny/shuffle-v2-pt1

Conversation


@jgraettinger jgraettinger commented Feb 23, 2026

Summary

  • Introduces crates/shuffle/, a new Rust crate implementing the V2 shuffle protocol as a three-level RPC hierarchy (Session → Slice → Queue) that replaces the in-memory, per-shard go/shuffle/ system
  • Adds go/protocols/shuffle/shuffle.proto defining the wire protocol and generated Rust/gRPC bindings
  • Includes flowctl raw shuffle, a development client that runs the full pipeline locally against a live collection

Motivation

The legacy go/shuffle/ system has scaling and correctness limitations that block further progress:

  1. In-memory document staging limits how far reads can progress ahead of downstream processing, creating memory pressure vs throughput tradeoffs when processing larger documents
  2. Per-shard independent checkpoints prevent coordinated multi-shard transactions — there's no single frontier representing "data ready to process across all shards"
  3. Per-shard, per-journal RPCs don't scale: M shards × N journals = up to M×N streams (e.g. 1M streams for 10 shards × 100k journals)

The V2 design addresses all three: documents are routed to on-disk queue files (eliminating memory pressure), a single Session coordinates a unified checkpoint frontier across all members, and M + M² streams replace M×N (110 streams for 10 members, regardless of journal count).
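As a sanity check on the stream-count arithmetic above, here is a throwaway sketch (function names are illustrative, not from the crate): V1 opens one stream per (shard, journal) pair, while V2 opens one Slice stream per member plus an M×M Queue fan-out, independent of journal count.

```rust
// Illustrative only: back-of-envelope stream counts for V1 vs. V2.
fn v1_streams(shards: u64, journals: u64) -> u64 {
    // One per-shard, per-journal read stream.
    shards * journals
}

fn v2_streams(members: u64) -> u64 {
    // M Slice streams plus the M×M Slice-to-Queue fan-out.
    members + members * members
}

fn main() {
    // 10 shards × 100k journals under V1, vs. 10 members under V2.
    assert_eq!(v1_streams(10, 100_000), 1_000_000);
    assert_eq!(v2_streams(10), 110);
    println!("v1={} v2={}", v1_streams(10, 100_000), v2_streams(10));
}
```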

Architecture

Coordinator
  └─ Session (one, on member 0)
       ├─ Slice 0 ──┬── Queue 0  (in-process)
       │             ├── Queue 1  (remote)
       │             └── Queue 2  (remote)
       ├─ Slice 1 ──┬── Queue 0  (remote)
       │             ├── Queue 1  (in-process)
       │             └── Queue 2  (remote)
       └─ Slice 2 ──┬── Queue 0  (remote)
                     ├── Queue 1  (remote)
                     └── Queue 2  (in-process)

Session — Coordinates the shuffle. Receives journal discoveries from Slices, assigns journal reads via range-overlap routing, pulls progress deltas, aggregates them into a four-stage checkpoint pipeline (progressed → unresolved → ready → consumed), and serves NextCheckpoint to the Coordinator.

Slice — Reads assigned journals, sequences documents with per-producer clock tracking (filtering duplicates from conservative reads and at-least-once delivery), orders them by priority then adjusted clock via a max-heap, routes each document to the owning Queue(s) by key hash and r-clock, and autonomously flushes Queues after observed commits.

Queue — Receives routed documents from all Slices, merges them into a single ordered stream via a priority heap (best-effort read leveling), and writes to disk. Responds to Flush once all preceding documents are durable.
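The PR doesn't spell out the routing arithmetic, but range routing by key hash can be sketched roughly as follows. This is a simplified illustration with made-up names: each member owns a contiguous span of the u32 hash space, and a document goes to the member whose span covers its packed-key hash. The actual routing module also applies r-clock rotation, which is omitted here.

```rust
// Simplified sketch of key-hash range routing (names are illustrative).
fn owning_member(key_hash: u32, members: u32) -> u32 {
    // Split the hash space into `members` equal spans; the final member
    // absorbs any remainder at the top of the range.
    let span = u32::MAX / members;
    (key_hash / span.max(1)).min(members - 1)
}

fn main() {
    assert_eq!(owning_member(0, 3), 0);
    assert_eq!(owning_member(u32::MAX / 2, 3), 1);
    assert_eq!(owning_member(u32::MAX, 3), 2);
    println!("ok");
}
```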

What's in this PR

Protocol (shuffle.proto)

  • Three bidirectional streaming RPCs: Session, Slice, Queue
  • Wire types for frontiers (FrontierChunk, JournalFrontier, ProducerFrontier) with delta-encoded journal names
  • Enqueue carries routed documents with priority, adjusted clock, packed key, and archived document bytes

crates/shuffle/

  • binding: Extracts and validates per-binding shuffle configuration from task specs (derivation transforms, materialization bindings, ad-hoc reads). Assigns cohorts by unique (priority, read_delay) tuples.
  • frontier: Sorted frontier data structure with reduce (sorted merge), resolve_hints (causal hint resolution for cross-journal transactions), project_unresolved_hints (recovery projection), and chunked Drain for streaming over gRPC.
  • session/: Handler opens Slice RPCs and reads the resume checkpoint. Actor runs the select! loop dispatching Session and Slice messages. State implements Topology (routing journal reads to members), CheckpointPipeline (four-stage state machine that gates on causal hint resolution).
  • slice/: Handler opens Queue RPCs. Actor runs the select! loop over listing watches, journal probes, journal reads, and the ready-read heap. State implements FlushState (autonomous flush pipelining), ProgressState (pull-based progress reporting), and sequence_document (per-producer duplicate filtering with rollback detection). Sub-modules: listing (Gazette journal watch), producer (ProducerState with dual settled/pending maps), routing (r-clock rotation, key hash + range routing), read (ReadState with journal probing), heap (max-heap by priority DESC, adjusted clock ASC).
  • queue/: Handler coordinates QueueJoin synchronization (defers actor spawn until all Slices connect). Actor runs the select! loop merging Enqueues from all Slices. Heap matches Slice ordering for best-effort read leveling.
  • service: gRPC server entry point with peer channel caching, spawn_{session,slice,queue}, and HTTP/2 keep-alive tuning.
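The heap ordering described above ("priority DESC, adjusted clock ASC") can be illustrated with a small standalone sketch. The types here are made up for illustration; the crate's heap module differs, but the comparator idea is the same: Rust's BinaryHeap is a max-heap, so the comparator makes higher priority greater, and reverses the clock so the smallest adjusted clock surfaces first within a priority.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Illustrative type; not the crate's actual ReadyDoc.
#[derive(PartialEq, Eq)]
struct ReadyDoc {
    priority: u32,
    adjusted_clock: u64,
}

impl Ord for ReadyDoc {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Higher priority wins; at equal priority, the smaller clock wins.
        (self.priority, Reverse(self.adjusted_clock))
            .cmp(&(other.priority, Reverse(other.adjusted_clock)))
    }
}

impl PartialOrd for ReadyDoc {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut heap = BinaryHeap::new();
    heap.push(ReadyDoc { priority: 1, adjusted_clock: 10 });
    heap.push(ReadyDoc { priority: 2, adjusted_clock: 20 });
    heap.push(ReadyDoc { priority: 2, adjusted_clock: 5 });

    let order: Vec<(u32, u64)> = std::iter::from_fn(|| heap.pop())
        .map(|d| (d.priority, d.adjusted_clock))
        .collect();
    // Priority 2 drains first (clock ascending within it), then priority 1.
    assert_eq!(order, vec![(2, 5), (2, 20), (1, 10)]);
    println!("{:?}", order);
}
```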

Changes to existing crates

  • proto-gazette/uuid: Ord/Hash on Producer (for use as map keys with passthrough hashers), Clock arithmetic and accessors, custom Debug impls for readability
  • proto-flow: Generated Rust bindings and serde impls for shuffle.proto
  • proto-grpc: Generated gRPC client/server stubs for the three shuffle RPCs
  • labels/partition: decode_field_range now takes explicit field names and validates coverage (needed for shuffle key partition field extraction)
  • doc/validation: Custom Debug on Validator showing schema URI instead of full schema

Development tooling

  • flowctl raw shuffle: Runs the full Session → Slice → Queue pipeline locally against a live production collection, useful for end-to-end validation

Not yet implemented

  • Queue disk writes: Currently emulated with a 50ms sleep stub; actual disk-backed queue file format is deferred to a follow-up
  • Queue dequeue: Consumers will read queue files directly via file access; the dequeue protocol is out of scope for this PR
  • Lambda shuffle keys: Binding tracks uses_lambda but actual lambda-based key extraction is not yet wired (and may be removed eventually? TBD).
  • Integration with reactor runtime: This PR builds the shuffle crate and development client; wiring into the Gazette consumer framework reactor is a follow-up

Testing

Extensive new unit tests cover the business logic, which is extracted into pure functions and state machines wherever possible.

E2E test via `RUST_LOG=shuffle=info cargo run -p flowctl -- raw shuffle --name <collection> --members 3`, which runs the full pipeline against a live collection.

A future PR will introduce integration tests which exercise shuffled reads using real Etcd & Gazette, as we have with the legacy go/shuffle/ package.

Implementation of the Shuffle V2 gRPC protocol.

Also add README for legacy shuffle implementation.

Add impls for Producer, Clock, and Flag, including custom Debug and
Hash, plus assorted helpers.

Add sequence() which implements the core logic of sequencing Producer
messages, to understand pending versus committed spans and tracked
offsets.
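The clock-monotonicity core of producer sequencing can be sketched as follows. This is a toy illustration with hypothetical names; the real sequence() also tracks pending versus committed spans, offsets, and rollback detection, which are omitted here.

```rust
use std::collections::HashMap;

// Illustrative only: keep a document iff its clock advances past the
// producer's highest observed clock; stale clocks are duplicates from
// conservative re-reads or at-least-once redelivery.
struct Sequencer {
    // Highest clock observed per producer.
    last_clock: HashMap<u64, u64>,
}

impl Sequencer {
    /// Returns true if the document should be kept, false if it's a duplicate.
    fn sequence(&mut self, producer: u64, clock: u64) -> bool {
        let last = self.last_clock.entry(producer).or_insert(0);
        if clock <= *last {
            false // Duplicate: clock did not advance.
        } else {
            *last = clock;
            true
        }
    }
}

fn main() {
    let mut s = Sequencer { last_clock: HashMap::new() };
    assert!(s.sequence(1, 10)); // first document from producer 1
    assert!(!s.sequence(1, 10)); // exact duplicate is filtered
    assert!(!s.sequence(1, 5)); // stale clock is filtered
    assert!(s.sequence(2, 5)); // independent producer advances on its own
    println!("ok");
}
```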

`Binding` represents extracted shuffle configuration for a single binding
of a task.

Frontier is a delta of progress available for consumption.
It's closely related to the existing Gazette checkpoint concept,
but additionally represents "causal" hints that are tracked and resolved
internal to the shuffle protocol.
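A frontier reduce (sorted merge) can be illustrated as combining per-journal progress from two frontiers and keeping the furthest offset for each journal. This toy sketch uses a BTreeMap to stand in for the sorted structure; the crate's Frontier also carries producer state and causal hints, which are omitted.

```rust
use std::collections::BTreeMap;

// Illustrative reduce: merge two sorted per-journal progress maps,
// keeping the maximum offset for each journal.
fn reduce(
    mut left: BTreeMap<String, i64>,
    right: BTreeMap<String, i64>,
) -> BTreeMap<String, i64> {
    for (journal, offset) in right {
        left.entry(journal)
            .and_modify(|o| *o = (*o).max(offset))
            .or_insert(offset);
    }
    left
}

fn main() {
    let a = BTreeMap::from([("j1".to_string(), 100), ("j2".to_string(), 50)]);
    let b = BTreeMap::from([("j2".to_string(), 75), ("j3".to_string(), 10)]);
    let merged = reduce(a, b);
    assert_eq!(merged.get("j1"), Some(&100));
    assert_eq!(merged.get("j2"), Some(&75)); // furthest offset wins
    assert_eq!(merged.get("j3"), Some(&10));
    println!("ok");
}
```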

The Session RPC is the entrypoint of a distributed shuffle.

It spawns and coordinates subordinate Slice RPCs, assigns journals to
slices for reading, and aggregates & reports available progress
upwards to the coordinator client.

Slices are scale-out RPCs (one per member) that do the heavy lifting
of monitoring journal listings and reading across assigned journals.

They sequence documents across their assigned journals, track producer
states, and route & enqueue documents to Queue RPCs. They initiate flush
barriers which allow the Slice to report deltas of progress upwards to
the Session.

Each Slice RPC maintains a Queue RPC to each member queue (M×M fan-out).

Queue RPCs join across Slices and receive Enqueue and Flush requests
from them. They merge and order received Enqueues, write to an on-disk
queue file (still TODO), and respond to Slice Flush barriers.

Queues build the on-disk queued state, which will later be dequeued
using the checkpoint that the Session RPC presents to the shuffle
coordinator.

This subcommand is for development of the v2 distributed shuffle
implementation. It's hidden and not intended as user-facing.
@jgraettinger jgraettinger changed the title shuffle V2 part 1 (placeholder) Shuffle V2: Coordinated disk-backed shuffle protocol (part one) Feb 24, 2026
@jgraettinger jgraettinger marked this pull request as ready for review February 24, 2026 22:00

@williamhbaker williamhbaker left a comment


LGTM! Lots to digest here.

Since there is more to come on this and it isn't actually hooked up to anything production-related yet, none of these comments are blocking. The most substantive thing I found is that running flowctl raw shuffle on bindings with suspended journals does not work, so that will need to be taken care of either here or in a follow-up if that makes more sense.

// available. The client requests a next checkpoint at times of its choosing
// (e.g., after completing processing of the previous checkpoint).
message NextCheckpoint {}
NextCheckpoint next_checkpoint = 4;

Is skipping field 3 intentional here?

}

impl<'p> Verify<'p> {
#[must_use]

Is this #[must_use] redundant for an anyhow::Result return type?

/// Pre-parsed schema for validation and inference.
pub schema: doc::Validator,
/// True if documents should be validated on read (derivation transforms only).
pub validate_on_read: bool,

Just for my own understanding, I wanted to ask about what this means. At a very superficial level my thought is that materializations do (schema) validation on reads as well, but clearly this must be referring to something a little different than that, and I wasn't able to deduce it.


/// Parse a collection schema bundle into a Validator and inferred Shape.
/// Prefers the read schema; falls back to the write schema.
fn build_schema(

nit: I'm noting that we do this pattern of "build schema, validator, and shape" in at least one other place in the code base, and elsewhere too like materialize-kafka. It would be nice to have a helper maybe like doc::validation::build_validator_and_shape(bundle) -> Result<(Validator, Shape)> for this repeated pattern.

} = added;

let binding = &self.bindings[binding as usize];
let journal_name = spec.as_ref().map(|s| s.name.as_str()).unwrap_or("");

Would it make sense to error here, instead of using an empty journal name? My understanding is that it shouldn't be possible to not have a name here.

block: false,
do_not_proxy: true,
header,
..Default::default()

Does setting the metadata_only flag here do any good? Looking at the broker code here I suppose the req.EndOffset != 0 && resp.Offset >= req.EndOffset would always end up being true with a request offset of -1 so perhaps it is redundant.

}

/// Probe the current write head of a journal via a non-blocking read at offset -1.
/// Returns `(write_head, header)`. JournalNotFound yields `(0, None)`.

The "JournalNotFound yields (0, None)" part from the comment doesn't seem to track, unless I'm missing something: Seems like it would hit the map_read_error and return an Err(<something>).

/// Returns the completed frontier when all members have flushed.
pub fn on_flushed(&mut self, member_index: usize) -> Option<crate::Frontier> {
let Some(in_flight) = self.in_flight.get_mut(member_index) else {
return None;

Is this a normally possible condition to happen? I'm thinking it would take some kind of application error, for a flushed to be sent twice for the same member, and we might want to error instead of silently dropping it.

Ok(Binding {
index: 0,
filter_r_clocks: false,
journal_read_suffix: String::new(), // No suffix for ad-hoc reads.

Discussed in VC and will be taken care of in a later PR, but just noting here:

For an ad-hoc collection read with flowctl raw shuffle --name=<collection_name>, I found this not to work with an error of Journal.metadata path segment: invalid length (0; expected 1 <= length <= 512). If I put a dummy suffix here though, it worked.

attempt,
inner: err,
}) => match err {
gazette::Error::BrokerStatus(broker::Status::JournalNotFound) => {

I tested flowctl raw shuffle on a materialization task reading from collections with suspended journals, and got an error unexpected broker status: Suspended. I think there will need to be some additional handling for suspended journals.
