Skip to content

feat(distributed): wire coordinator-side distributed execution for scan queries#4

Open
vamsimanohar wants to merge 11 commits intomainfrom
distributed-refactor-and-test-fixes
Open

feat(distributed): wire coordinator-side distributed execution for scan queries#4
vamsimanohar wants to merge 11 commits intomainfrom
distributed-refactor-and-test-fixes

Conversation

@vamsimanohar
Copy link
Owner

Summary

  • Renames split package to dataunit and removes unused SourceOperatorFactory/OperatorFactory from ComputeStage
  • Adds RelNodeAnalyzer to extract query metadata (index, fields, limit, filters) from Calcite RelNode trees
  • Adds OpenSearchDataUnitSource for shard discovery from cluster routing table
  • Adds LocalityAwareDataUnitAssignment to group shards by preferred node
  • Adds SimplePlanFragmenter and OpenSearchFragmentationContext for 2-stage plan creation (leaf scan + root merge)
  • Adds DistributedQueryCoordinator that assigns shards to nodes, sends transport requests, and merges results
  • Wires DistributedExecutionEngine to use the coordinator when plugins.ppl.distributed.enabled=true
  • Rejects unsupported operations (aggregation, sort, window) with clear error messages instead of silently producing wrong results

Supported query patterns

  • search source=<index>
  • search source=<index> | head N
  • search source=<index> | where <condition>
  • search source=<index> | where <condition> | head N
  • search source=<index> | fields f1, f2, ...

Test plan

  • Unit tests pass for all new classes (RelNodeAnalyzerTest, OpenSearchDataUnitSourceTest, LocalityAwareDataUnitAssignmentTest, SimplePlanFragmenterTest, DistributedQueryCoordinatorTest, DistributedExecutionEngineTest)
  • End-to-end tested on single-node cluster with setup-distributed-ppl.sh
  • Verified distributed path in logs: DistributedExecutionEngineRelNodeAnalyzerSimplePlanFragmenterDistributedQueryCoordinatorTransportExecuteDistributedTaskAction → rows merged
  • Verified unsupported queries (stats, sort) are rejected with descriptive errors
  • Verified legacy engine still works when plugins.ppl.distributed.enabled=false
  • Multi-node cluster testing (single-node only verified so far)

…eline

Implement a distributed MPP query engine for PPL that executes queries
across multiple OpenSearch nodes in parallel using direct Lucene access.

Key components:
- DistributedExecutionEngine: routes queries between legacy and distributed paths
- DistributedQueryPlanner: converts Calcite RelNode trees to multi-stage plans
- DistributedTaskScheduler: coordinates operator pipeline across cluster nodes
- TransportExecuteDistributedTaskAction: executes pipelines on data nodes
- LuceneScanOperator/LimitOperator: direct Lucene _source reads per shard
- Coordinator-side Calcite execution for complex operations (stats, eval, joins)
- Hash join support with parallel distributed table scans
- Filter pushdown, sort, rename, and limit in operator pipeline
- Phase 5A core operator framework (Page, Pipeline, ComputeStage, StagedPlan)
- Explain API showing distributed plan stages via _plugins/_ppl/_explain
- Architecture documentation with class hierarchy and execution plan details
- Comprehensive test coverage including integration tests

Architecture: two execution paths controlled by plugins.ppl.distributed.enabled
- Legacy (off): existing Calcite-based OpenSearchExecutionEngine
- Distributed (on): operator pipeline with no fallback
- Rename Split → DataUnit (abstract class), SplitSource → DataUnitSource,
  SplitAssignment → DataUnitAssignment
- Add Block interface (columnar, Arrow-aligned)
- Add PlanFragmenter, FragmentationContext, SubPlan for automatic stage creation
- Add OutputBuffer for exchange back-pressure
- Add execution lifecycle: QueryExecution, StageExecution, TaskExecution
- Add planFragment field to ComputeStage for query pushdown
- Extend Page with getBlock() and getRetainedSizeBytes() defaults
- Create OpenSearchDataUnit (index + shard, not remotely accessible)
- Delete H1 types: DistributedPhysicalPlan, ExecutionStage, WorkUnit,
  DataPartition, DistributedQueryPlanner, DistributedPlanAnalyzer,
  RelNodeAnalysis, PartitionDiscovery
- Delete execution code: DistributedTaskScheduler, HashJoinExecutor,
  InMemoryScannableTable, QueryResponseBuilder, TemporalValueNormalizer,
  RelNodeAnalyzer, FieldMapping, JoinInfo, SortKey,
  OpenSearchPartitionDiscovery
- Gut DistributedExecutionEngine to routing shell (throws when enabled)
- Simplify OpenSearchPluginModule constructor
- Default PPL_DISTRIBUTED_ENABLED to false
- Remove assumeFalse(isDistributedEnabled()) from integ tests
- Update architecture documentation
…vent infinite loop

The processOnce() loop only passed output between adjacent operator pairs
(i to i+1), never calling getOutput() on the last operator. Operators that
buffer pages (e.g., PassThroughOperator) would never have their buffer
drained, causing isFinished() to never return true and an infinite loop
in run().
…used operator factories

- Rename split/ package to dataunit/ in both core and opensearch modules
- Delete SourceOperatorFactory, OperatorFactory, and Pipeline (unused)
- Simplify ComputeStage constructor by removing factory fields
- Update all imports across 10+ files
…y-aware assignment

- RelNodeAnalyzer: walks Calcite RelNode tree to extract index name,
  field names, query limit, and filter conditions
- OpenSearchDataUnitSource: discovers shards from ClusterState routing
  table, creates OpenSearchDataUnit per shard with preferred nodes
- LocalityAwareDataUnitAssignment: assigns data units to nodes by
  matching preferred nodes to available nodes (groupBy locality)
…scan queries

- SimplePlanFragmenter: creates 2-stage plan (leaf scan + root merge)
  from a Calcite RelNode tree for single-table scan queries
- OpenSearchFragmentationContext: provides cluster topology (data node
  IDs, shard discovery) from ClusterState to the fragmenter
…engine

- DistributedQueryCoordinator: orchestrates distributed execution by
  assigning shards to nodes, sending transport requests, collecting
  responses async, merging rows, and applying coordinator-side limit
- DistributedExecutionEngine: when distributed enabled, fragments
  RelNode into staged plan and delegates to coordinator instead of
  throwing UnsupportedOperationException
- OpenSearchPluginModule: pass ClusterService and TransportService
  to DistributedExecutionEngine constructor
- Explain path: formats staged plan with stage details when
  distributed is enabled
Aggregation, sort, and window queries were silently producing wrong
results because RelNodeAnalyzer walked past unrecognized single-input
nodes. Now throws UnsupportedOperationException with clear messages
for LogicalAggregate, LogicalSort with collation, and Window nodes.
…l planning

Replace the ad-hoc RelNodeAnalyzer pattern matching system with proper MPP
architecture using H2 interfaces. This eliminates hardcoded query analysis
and enables intelligent multi-stage planning.

**Major Changes:**

• **CalciteDistributedPhysicalPlanner** - Replaces RelNodeAnalyzer
  - Proper Calcite visitor pattern for RelNode traversal
  - Implements PhysicalPlanner interface with plan(RelNode) method
  - Converts logical operators to typed physical operators

• **Physical Operator Hierarchy** - Type-safe intermediate representation
  - PhysicalOperatorTree, ScanPhysicalOperator, FilterPhysicalOperator
  - ProjectionPhysicalOperator, LimitPhysicalOperator
  - Bridge between Calcite RelNodes and runtime operators

• **ProjectionOperator** - New runtime operator for field selection
  - Handles field projection and nested field access
  - Page-based columnar data processing
  - Standard operator lifecycle (needsInput/addInput/getOutput)

• **IntelligentPlanFragmenter** - Replaces SimplePlanFragmenter
  - Smart stage boundary decisions based on operator types
  - Cost-driven fragmentation using real estimates
  - Eliminates hardcoded 2-stage assumptions

• **DynamicPipelineBuilder** - Dynamic operator construction
  - Builds pipelines from ComputeStage physical operators
  - Replaces hardcoded LuceneScan→Limit→Collect pattern
  - Supports filter pushdown and operator chaining

• **OpenSearchCostEstimator** - Real cost estimation
  - Uses Lucene index statistics and cluster metadata
  - Replaces stub cost estimator with actual data
  - Enables cost-based optimization decisions

• **Simplified Architecture** - Removed feature flag complexity
  - Single execution path using new physical planner
  - Eliminated legacy SimplePlanFragmenter
  - Streamlined DistributedExecutionEngine integration

**Enhanced Explain Output:**
- Shows physical operators in each stage
- Displays cost estimates and data size projections
- Operator-level execution details

This change establishes proper MPP foundations for complex query support
while maintaining full backward compatibility for supported query patterns.
The SimplePlanFragmenterTest was referencing the deleted SimplePlanFragmenter
class, causing compilation failures after merging origin/main. Removed the
obsolete test as SimplePlanFragmenter has been replaced with
IntelligentPlanFragmenter.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant