diff --git a/CMakeLists.txt b/CMakeLists.txt index 9c02e30..d752dc4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -402,6 +402,12 @@ set(TPCH_CORE_SOURCES ${DBGEN_OBJECTS} ) +# DS-10.2: io_uring pool + output stream (handle #ifdef internally, always compile) +list(APPEND TPCH_CORE_SOURCES + src/async/io_uring_pool.cpp + src/async/io_uring_output_stream.cpp +) + # Add async IO sources only if enabled if(TPCH_ENABLE_ASYNC_IO) list(APPEND TPCH_CORE_SOURCES diff --git a/README.md b/README.md index 6fb21e1..0fc040b 100644 --- a/README.md +++ b/README.md @@ -1,473 +1,201 @@ -# TPC-H C++ Data Generator +# TPC-H / TPC-DS C++ Data Generator -A high-performance TPC-H data generator with multiple output format support (Parquet, ORC, CSV, Paimon, Iceberg, Lance) and optional asynchronous I/O capabilities using Linux io_uring. +High-performance TPC-H and TPC-DS data generators with multiple output format support (Parquet, ORC, CSV, Paimon, Iceberg, Lance) and optional asynchronous I/O via Linux io_uring. 
## Features -- **Multiple Output Formats** - - Apache Parquet (columnar, compressed) - - Apache ORC (columnar, optimized for Hive/Spark) - - CSV (row-oriented, human-readable) - - Apache Paimon (lakehouse table format with metadata) - - Apache Iceberg (industry-standard lakehouse format, compatible with Spark/Trino/DuckDB) - - Lance (modern columnar format with native indexing and versioning) - -- **Apache Arrow Integration** - - Central in-memory columnar representation - - Unified API for all output formats - - Zero-copy conversions between formats - -- **Optional Async I/O** - - Linux io_uring support for high-throughput writes - - Optional feature (graceful fallback to synchronous I/O) - - 20-50% throughput improvement over synchronous writes - -- **TPC-H Reference Implementation** - - Official dbgen integration via git submodule - - All 8 TPC-H tables supported (lineitem, orders, customer, part, partsupp, supplier, nation, region) - - Configurable scale factors (1, 10, 100, 1000, ...) 
- -- **Performance-Focused** - - Target: 1M+ rows/second for lineitem table - - Cross-architecture support considerations - - Benchmarking harness included +- **Multiple Output Formats**: Parquet, ORC, CSV, Apache Paimon, Apache Iceberg, Lance +- **Apache Arrow Integration**: central in-memory columnar representation, unified API across all formats +- **Zero-Copy Streaming Writes**: O(batch) peak RAM regardless of scale factor — essential at SF≥5 +- **Parallel Generation**: fork-after-init with rolling N-slot window — all tables concurrently, one init cost +- **io_uring Support**: kernel async I/O for Parquet (IoUringOutputStream) and Lance (Rust runtime) +- **Both TPC-H and TPC-DS**: all 8 TPC-H tables and 24 TPC-DS tables implemented ## Quick Start -### Prerequisites +### Docker (recommended — no build required) -- **OS**: Linux (WSL2 supported) -- **Compiler**: GCC 11+ or Clang 13+ -- **CMake**: 3.22+ -- **Packages**: libarrow-dev, libparquet-dev, liborc-dev +Pre-built images are available for every platform: -### Installation +```bash +docker pull ghcr.io/tsafin/tpch-cpp-all:latest -Install system dependencies: +# Generate all TPC-H tables at SF=10, Parquet, parallel, zero-copy +docker run --rm -v /data:/data ghcr.io/tsafin/tpch-cpp-all:latest \ + tpch_benchmark --scale-factor 10 --format parquet --output-dir /data \ + --parallel --zero-copy --max-rows 0 -```bash -./scripts/install_deps.sh +# Generate all TPC-DS tables at SF=5, Parquet, parallel, zero-copy +docker run --rm -v /data:/data ghcr.io/tsafin/tpch-cpp-all:latest \ + tpcds_benchmark --scale-factor 5 --format parquet --output-dir /data \ + --parallel --zero-copy --max-rows 0 ``` -### Build +Image registry: https://github.com/tsafin/tpch-cpp/pkgs/container/tpch-cpp-all -```bash -# Configure with default options (Parquet, CSV only) -mkdir build && cd build -cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo .. 
+### Build from Source -# Build -make -j$(nproc) +#### Prerequisites -# Optional: Install -make install -``` +- Linux (WSL2 supported) — io_uring requires kernel ≥ 5.11 +- GCC 11+ or Clang 13+, CMake 3.22+ +- `libarrow-dev`, `libparquet-dev` +- Rust ≥ 1.85 (only for Lance format) -#### Building with Optional Formats +Install system dependencies: -Enable Paimon table format: ```bash -cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo -DTPCH_ENABLE_PAIMON=ON .. +./scripts/install_deps.sh ``` -Enable Iceberg table format: -```bash -cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo -DTPCH_ENABLE_ICEBERG=ON .. -``` +#### Build -Enable both Paimon and Iceberg: ```bash -cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo -DTPCH_ENABLE_PAIMON=ON -DTPCH_ENABLE_ICEBERG=ON .. -``` +mkdir build && cd build -Enable ORC format (requires liborc-dev): -```bash -cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo -DTPCH_ENABLE_ORC=ON .. -``` +# Minimal build (Parquet + CSV only, includes tpch_benchmark) +cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo .. +cmake --build . -j$(nproc) -Enable Lance format (requires Rust): -```bash -cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo -DTPCH_ENABLE_LANCE=ON .. -``` +# With TPC-DS: +cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo -DTPCDS_ENABLE=ON .. +cmake --build . --target tpcds_benchmark -j$(nproc) -### Usage +# With ORC: +cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo -DTPCH_ENABLE_ORC=ON .. -Generate TPC-H customer table in Parquet format: +# With Lance (requires Rust): +cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo -DTPCH_ENABLE_LANCE=ON .. -```bash -./tpch_benchmark --scale-factor 1 --format parquet --output-dir data/ --use-dbgen --table customer +# Everything: +cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo \ + -DTPCDS_ENABLE=ON \ + -DTPCH_ENABLE_ORC=ON \ + -DTPCH_ENABLE_LANCE=ON \ + .. +cmake --build . 
-j$(nproc) ``` -Generate customer table in Iceberg format (with TPCH_ENABLE_ICEBERG=ON): +## Usage -```bash -./tpch_benchmark --scale-factor 1 --format iceberg --output-dir data/ --use-dbgen --table customer -``` - -Generate customer table in Paimon format (with TPCH_ENABLE_PAIMON=ON): +### tpch_benchmark -```bash -./tpch_benchmark --scale-factor 1 --format paimon --output-dir data/ --use-dbgen --table customer ``` +Usage: tpch_benchmark [options] -Generate customer table in Lance format (with TPCH_ENABLE_LANCE=ON): - -```bash -./tpch_benchmark --scale-factor 1 --format lance --output-dir data/ --use-dbgen --table customer + --scale-factor TPC-H scale factor (default: 1) + --format Output format: parquet, csv, orc, paimon, iceberg, lance + (default: parquet) + --output-dir Output directory (default: /tmp) + --max-rows Max rows to generate (default: 1000; use 0 for all rows) + --table Single table: lineitem, orders, customer, part, partsupp, + supplier, nation, region (default: lineitem) + --parallel Generate all 8 tables in parallel (fork-after-init) + --parallel-tables Max concurrent child processes (default: all 8) + --zero-copy Streaming writes — O(batch) RAM; required at SF≥5 with --parallel + --zero-copy-mode Lance streaming variant: sync (default), auto, async + --compression Parquet compression: zstd (default), snappy, none + --io-uring Kernel async I/O: IoUringOutputStream for Parquet, + delegated to Rust runtime for Lance + --verbose Verbose output ``` -With async I/O (if enabled): +**Common invocations:** ```bash -./tpch_benchmark --scale-factor 1 --format parquet --output-dir data/ --async-io --use-dbgen --table customer -``` +# Single table, all rows, Parquet +./tpch_benchmark --scale-factor 5 --format parquet --output-dir /data \ + --table lineitem --max-rows 0 -See `./tpch_benchmark --help` for all options. 
+# All 8 tables in parallel, Parquet, zero-copy (recommended for SF≥5) +./tpch_benchmark --scale-factor 10 --format parquet --output-dir /data \ + --parallel --zero-copy --max-rows 0 -## Project Structure +# Limit to 4 concurrent children (reduces peak RAM further) +./tpch_benchmark --scale-factor 10 --format parquet --output-dir /data \ + --parallel --parallel-tables 4 --zero-copy --max-rows 0 -``` -tpch-cpp/ -├── CMakeLists.txt # Root build configuration -├── README.md # This file -├── .gitignore # Git ignore patterns -├── cmake/ # CMake modules -│ ├── FindArrow.cmake # Apache Arrow discovery -│ ├── FindORC.cmake # Apache ORC discovery -│ ├── FindPaimon.cmake # Apache Paimon discovery -│ ├── FindUring.cmake # liburing discovery (async I/O) -│ ├── FindThrift.cmake # Apache Thrift discovery -│ └── CompilerWarnings.cmake # Compiler configuration -├── include/tpch/ # Public headers -│ ├── writer_interface.hpp -│ ├── parquet_writer.hpp -│ ├── csv_writer.hpp -│ ├── orc_writer.hpp -│ ├── paimon_writer.hpp -│ ├── iceberg_writer.hpp -│ ├── lance_writer.hpp -│ ├── async_io.hpp -│ └── dbgen_wrapper.hpp -├── src/ # Implementation -│ ├── writers/ -│ │ ├── parquet_writer.cpp -│ │ ├── csv_writer.cpp -│ │ ├── orc_writer.cpp -│ │ ├── paimon_writer.cpp -│ │ ├── iceberg_writer.cpp -│ │ └── lance_writer.cpp -│ ├── async/ -│ │ └── io_uring_context.cpp -│ ├── dbgen/ -│ │ └── dbgen_wrapper.cpp -│ └── main.cpp # Benchmark driver -├── examples/ # Standalone examples -│ ├── simple_arrow_parquet.cpp -│ ├── simple_csv.cpp -│ ├── simple_orc.cpp -│ ├── async_io_demo.cpp -│ ├── multi_table_benchmark.cpp -│ └── CMakeLists.txt -├── third_party/ # External dependencies -│ ├── dbgen/ # TPC-H dbgen (git submodule) -│ ├── lance-ffi/ # Lance FFI bridge (Rust) -│ ├── googletest/ # Google Test framework -│ ├── arrow/ # Arrow (optional vendored) -│ └── orc/ # ORC (optional vendored) -├── tests/ # Unit tests -└── scripts/ # Helper scripts - ├── install_deps.sh # Dependency installation - └── benchmark.sh 
# Benchmarking harness -``` - -## Build Options +# With io_uring for kernel-async disk writes +./tpch_benchmark --scale-factor 10 --format parquet --output-dir /data \ + --parallel --zero-copy --io-uring --max-rows 0 -Configure with CMake: - -```bash -cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo \ - -DTPCH_BUILD_EXAMPLES=ON \ - -DTPCH_ENABLE_ASAN=OFF \ - -DTPCH_ENABLE_ASYNC_IO=ON \ - .. +# Lance format, streaming mode +./tpch_benchmark --scale-factor 5 --format lance --output-dir /data \ + --parallel --zero-copy --max-rows 0 ``` -| Option | Default | Description | -|--------|---------|-------------| -| `TPCH_BUILD_EXAMPLES` | ON | Build example applications | -| `TPCH_BUILD_TESTS` | OFF | Build unit tests | -| `TPCH_ENABLE_ASAN` | OFF | Enable AddressSanitizer | -| `TPCH_ENABLE_ASYNC_IO` | OFF | Enable async I/O with io_uring | -| `TPCH_ENABLE_ORC` | OFF | Enable ORC format support | -| `TPCH_ENABLE_PAIMON` | OFF | Enable Apache Paimon format support | -| `TPCH_ENABLE_ICEBERG` | OFF | Enable Apache Iceberg format support | -| `TPCH_ENABLE_LANCE` | OFF | Enable Lance format support (requires Rust) | - -## Dependencies - -### Required - -| Library | Version | Ubuntu Package | -|---------|---------|----------------| -| Apache Arrow | >= 10.0 | libarrow-dev | -| Apache Parquet | >= 10.0 | libparquet-dev | -| CMake | >= 3.22 | cmake | -| GCC/Clang | >= 11 | build-essential | - -### Optional - -| Library | Version | Ubuntu Package | Purpose | -|---------|---------|----------------|---------| -| Apache ORC | >= 1.8 | liborc-dev | ORC format support (enable with TPCH_ENABLE_ORC=ON) | -| liburing | >= 2.1 | liburing-dev | Async I/O support | - -## Performance Targets - -- **Lineitem (largest table)**: 1M+ rows/second -- **All tables average**: 500K+ rows/second -- **Parquet write rate**: > 100 MB/second -- **ORC write rate**: > 100 MB/second -- **CSV write rate**: > 50 MB/second -- **Async I/O improvement**: 20-50% over synchronous - -## Benchmark Results (SF=5, Zero-Copy, 
Maximum Values) - -Comprehensive benchmark of all 6 supported formats across 6 TPC-H tables with 3 runs each: +### tpcds_benchmark -| Format | lineitem | customer | orders | partsupp | part | supplier | **Avg Max** | -|--------|----------|----------|--------|----------|------|----------|-------------| -| **ORC** | 973,893 | 997,340 | 622,252 | 1,371,272 | 422,476 | 1,041,667 | **904,817** 🥇 | -| **PARQUET** | 944,132 | 966,495 | 567,494 | 1,110,186 | 396,983 | 641,026 | **771,053** 🥈 | -| **PAIMON** | 124,716 | 1,082,251 | 277,778 | 101,618 | 373,692 | 531,915 | **415,328** | -| **LANCE** | 130,647 | 1,308,901 | 73,204 | 154,613 | 422,297 | 92,251 | **363,652** | -| **ICEBERG** | 235,531 | 1,001,335 | 204,968 | 135,217 | 286,287 | 245,098 | **351,406** | -| **CSV** | 329,824 | 191,034 | 297,030 | 350,939 | 213,538 | 28,588 | **235,159** | - -*All values in rows/second (maximum of 3 runs). ORC wins with 905K rows/sec average, 17% faster than Parquet.* - -![Performance Comparison](benchmark-results/benchmark_performance_chart.png) - -**Key Takeaways:** -- 🏆 **ORC**: Fastest (905K r/s avg), most stable (9.8% variance) -- ⭐ **Parquet**: Excellent performance (771K r/s), good stability (34% variance) -- ⚠️ **Lance/Paimon/Iceberg**: High variance (60-177%), inconsistent performance -- 📉 **CSV**: Slowest (235K r/s), I/O bound format - -For detailed performance benchmarks across all formats, see **[PERFORMANCE_CONSOLIDATED.md](benchmark-results/PERFORMANCE_CONSOLIDATED.md)** and **[BENCHMARK_COMPREHENSIVE_RESULTS_MAX.md](benchmark-results/BENCHMARK_COMPREHENSIVE_RESULTS_MAX.md)**. 
- - -## Development - -### Running Examples - -After building with `TPCH_BUILD_EXAMPLES=ON`: - -```bash -# Parquet example -./examples/simple_arrow_parquet - -# CSV example -./examples/simple_csv - -# ORC example -./examples/simple_orc - -# Async I/O demo (if enabled) -./examples/async_io_demo ``` +Usage: tpcds_benchmark [OPTIONS] -### Benchmarking - -Use the included benchmarking harness: - -```bash -./scripts/benchmark.sh + --format Output format: parquet, csv, paimon, lance (default: parquet) + --table Single TPC-DS table (default: store_sales) + --scale-factor Scale factor (default: 1) + --output-dir Output directory (default: /tmp) + --max-rows Max rows to generate (0=all, default: 1000) + --compression Parquet compression: zstd (default), snappy, none + --zero-copy Streaming mode — O(batch) RAM; required at SF≥5 with --parallel + --zero-copy-mode Lance streaming variant: sync, auto, async (default: sync) + --parallel Generate all 24 tables in parallel (fork-after-init) + --parallel-tables Max concurrent child processes (default: all) + --verbose Verbose output ``` -This runs comprehensive benchmarks across all scale factors and formats. - -## Validation - -- Output files readable by standard tools: - - Parquet: `pyarrow.parquet.read_table()` - - ORC: Apache Spark, `orc.read()` - - CSV: pandas, Excel, awk, etc. 
-- Round-trip testing: write → read → verify data integrity -- Performance benchmarks: throughput and scalability analysis - -## Architecture Notes - -### Apache Arrow Central Format - -Uses Arrow as the central in-memory columnar format: -- Unified API across all output formats -- Zero-copy conversions -- Industry standard for analytics -- Better memory efficiency than row-oriented - -### C++20 Standard - -- C++20 required for std::span in zero-copy optimizations -- Modern features including concepts, ranges, and coroutines -- Smart pointers, optional, structured bindings - -### CMake Build System - -- Multiple external dependencies -- Cross-platform potential -- Clean configuration management - -### Modular Writer Interface +**Common invocations:** -- Abstract `WriterInterface` base class -- Format-specific implementations (Parquet, ORC, CSV, Paimon, Iceberg, Lance) -- Easy to extend with new formats -- Runtime polymorphism for format selection - -### Optional Async I/O - -- Linux io_uring support for high-throughput I/O -- Graceful fallback to synchronous I/O -- Compile-time flag for portability -- 20-50% throughput improvement target - -## Phase 12: Async I/O Performance Optimization - -**Status**: ✅ PARTIALLY COMPLETE - -### Achievements - -**✅ Phase 12.1: Fixed critical 2GB offset truncation bug** -- Root cause: io_uring_prep_write() using 32-bit unsigned for byte count -- Solution: Chunked writes at 2GB boundary -- Impact: Prevents silent data loss on large files (lineitem SF10) - -**✅ Phase 12.2: Profiling identified actual bottlenecks** -- Parquet generation is CPU-bound (serialization), not I/O-bound -- CSV generation is I/O-bound (many small writes) - async I/O helps here -- CPU usage identical in both sync and async modes -- Recommendation: Async I/O beneficial for I/O-heavy workloads - -**✅ Phase 12.5: Multi-file async I/O architecture** -- Shared AsyncIOContext for concurrent writes to multiple files -- Per-file offset tracking and automatic 
advancement -- Production-ready, fully benchmarked -- Integrated with multi-table generation -- 7.8% improvement for Parquet, 32% for CSV (I/O-bound workloads) - -**❌ Phase 12.3: Parallel generation - BROKEN (do not use)** -- Performance: 16x SLOWER (2 minutes vs 9 seconds) -- Consistent "part" table generation failures -- Root cause: dbgen uses global variables (Seed[], scale, etc.) that conflict in parallel -- Context switches: 1.4M (normal = 1-10), pathological overhead -- CPU utilization: Only 8-9% (shows processes serializing despite fork/execv) - -### Recommendations - -1. **Use `--async-io` flag** for I/O-bound workloads (CSV, streaming) -2. **Do NOT use `--parallel` flag** - it makes performance worse -3. For multi-table generation: Use sequential `--table` calls with `--async-io` -4. Future redesign needed for true parallelization (requires addressing dbgen globals) - -### Documentation - -See `/home/tsafin/.claude/plans/async-io-performance-fixes.md` for comprehensive analysis including: -- Detailed profiling results -- Root cause analysis for parallel failures -- Integration testing results -- Design options for future improvements - -## Phase 14: Zero-Copy Performance Optimizations - -### Phase 14.1: Batch-Level Zero-Copy ✅ COMPLETE - -**Status**: Production-ready, recommended default +```bash +# All TPC-DS tables in parallel, Parquet, zero-copy +./tpcds_benchmark --scale-factor 5 --format parquet --output-dir /data \ + --parallel --zero-copy --max-rows 0 -Eliminates per-row function call overhead by batching data extraction: +# Single table smoke test +./tpcds_benchmark --format parquet --table store_sales --scale-factor 1 -```bash -./tpch_benchmark --use-dbgen --table lineitem --max-rows 100000 \ - --zero-copy --format parquet --output-dir data/ +# Limit parallelism +./tpcds_benchmark --scale-factor 10 --format parquet --output-dir /data \ + --parallel --parallel-tables 6 --zero-copy --max-rows 0 ``` -**Performance**: 2.1× speedup over baseline -- 
Reduces function call overhead from O(n) to O(1) -- All data types supported -- Byte-for-byte identical output to non-optimized path +**TPC-DS tables:** -**Numeric Performance**: -| Table | Baseline | With `--zero-copy` | Speedup | -|-------|----------|-------------------|---------| -| lineitem | 316K rows/sec | 627K rows/sec | 1.98× | -| partsupp | 476K rows/sec | 678K rows/sec | 1.43× | -| customer | 242K rows/sec | 349K rows/sec | 1.44× | +| Category | Tables | +|----------|--------| +| Fact | store_sales, inventory, catalog_sales, web_sales, store_returns, catalog_returns, web_returns | +| Dimension | customer, item, date_dim, call_center, catalog_page, web_page, web_site, warehouse, ship_mode, household_demographics, customer_demographics, customer_address, income_band, reason, time_dim, promotion, store | -### Phase 14.2.3: Zero-Copy with Buffer::Wrap ✅ PRODUCTION READY +## Build Options -**Status**: Significant performance improvement confirmed (merged into `--zero-copy`) +| CMake Option | Default | Description | +|---|---|---| +| `TPCDS_ENABLE` | OFF | Build `tpcds_benchmark` (TPC-DS) | +| `TPCH_ENABLE_ORC` | OFF | ORC format support | +| `TPCH_ENABLE_PAIMON` | OFF | Apache Paimon format support | +| `TPCH_ENABLE_ICEBERG` | OFF | Apache Iceberg format support | +| `TPCH_ENABLE_LANCE` | OFF | Lance format support (requires Rust ≥ 1.85) | +| `TPCH_ENABLE_ASYNC_IO` | OFF | Build io_uring pool (auto-detected at runtime; needed for `--io-uring`) | +| `TPCH_ENABLE_ASAN` | OFF | AddressSanitizer (for development only — do not benchmark with ASAN) | +| `TPCH_BUILD_EXAMPLES` | ON | Build example applications | +| `TPCH_BUILD_TESTS` | OFF | Build unit tests | -**Note**: The `--true-zero-copy` flag was removed and its optimizations were merged into the standard `--zero-copy` flag. 
+## Dependencies -Eliminates numeric data memcpy by wrapping vector memory with `arrow::Buffer::Wrap()`: +| Library | Required | Ubuntu Package | Notes | +|---------|----------|----------------|-------| +| Apache Arrow + Parquet | Yes | `libarrow-dev libparquet-dev` | ≥ 10.0 | +| CMake | Yes | `cmake` | ≥ 3.22 | +| GCC/Clang | Yes | `build-essential` | GCC ≥ 11 | +| Apache ORC | No | `liborc-dev` | Needed for ORC format | +| liburing | No | `liburing-dev` | Needed for `--io-uring` (`TPCH_ENABLE_ASYNC_IO=ON`) | +| Rust | No | via rustup | ≥ 1.85, needed for Lance format | -```bash -./tpch_benchmark --use-dbgen --table lineitem --max-rows 100000 \ - --zero-copy --format parquet --output-dir data/ -``` +## Performance Notes -**Performance Results** (no ASAN overhead): -| Table | Baseline | With --zero-copy | Improvement | -|-------|----------|------------------|-------------| -| lineitem | 872K | 1,037K rows/sec | **+19.0%** 🔥 | -| orders | 385K | 429K rows/sec | **+11.4%** | -| part | 308K | 328K rows/sec | **+6.6%** | -| customer | 652K | 652K rows/sec | 0.0% (ceiling) | -| Average | 457K | 486K rows/sec | **+4.6%** ✅ | - -**Important Notes**: -- Requires streaming write mode (constant memory usage) -- String data still requires memcpy (non-contiguous in dbgen) -- Bonus: 10× lower peak memory usage -- **Performance**: +4.6% average, up to +19% for numeric-heavy tables - -**When to use**: -- ✅ **Lineitem and numeric-heavy tables** (50%+ numeric columns) - 15-19% speedup -- ✅ **General-purpose use** (recommended default) - consistent 4-11% improvement -- ✅ **Memory-constrained systems** - 10× lower peak memory usage -- ⚠️ String-heavy tables (71%+ strings) - marginal benefit - -**Technical Details**: -- Uses `BufferLifetimeManager` to manage shared_ptr lifetimes -- Safe from use-after-free via reference counting -- All memory safety tests passing (AddressSanitizer) -- See `PHASE14_2_3_PERFORMANCE_REPORT_UPDATED.md` and `BENCHMARK_ASAN_COMPARISON.md` for detailed 
analysis - -### Recommendation - -**Use `--zero-copy` by default**: -- **4.6% average speedup** (real-world performance) -- **19% speedup for numeric-heavy tables** (lineitem) -- 10× lower peak memory usage -- Proven safe (all tests passing) - -## Future Enhancements - -- Additional formats: Avro, Arrow IPC, Protobuf -- True parallel data generation (requires dbgen refactoring) -- Query integration with DuckDB/Polars -- Direct I/O (O_DIRECT) support -- Advanced observability and metrics -- String data contiguity for full zero-copy benefit -- Performance profiling integration - -## Contributing - -See the main monorepo documentation for contribution guidelines. +- Always use `--zero-copy` at SF≥5 — without it each child accumulates all batches in RAM before writing, which OOMs at scale. +- `--parallel` forks children after one shared dbgen initialization (COW), giving full CPU utilization with a single init cost. +- `--io-uring` offloads write syscalls to the kernel async worker pool. Useful when disk I/O is the bottleneck; has no effect on CPU-bound workloads (e.g. heavy ZSTD compression). +- Do not use `TPCH_ENABLE_ASAN` for performance measurement — ASAN adds 30–50% overhead and distorts comparisons. ## License -See LICENSE file (inherits from monorepo) - -## Contact - -For questions or issues, contact the maintainers at the Database Internals meetups. +See LICENSE file. 
diff --git a/docs/parallel_uring_arch.md b/docs/parallel_uring_arch.md new file mode 100644 index 0000000..56b8b5b --- /dev/null +++ b/docs/parallel_uring_arch.md @@ -0,0 +1,575 @@ +# Parallel TPC-DS Generation with General io_uring Layer + +**Status**: DS-10.1/10.2/10.3 complete; DS-10.4 optional next +**Target branch**: `tsafin/parallel_tpcds` +**Phase label**: DS-10 + +| Phase | Status | Commit | +|-------|--------|--------| +| DS-10.1 | ✅ done | `81c7539` | +| DS-10.2 | ✅ done | `141624a` | +| DS-10.3 | ✅ done | `141624a` | +| DS-10.4 | 🔲 optional | — | + +--- + +## Motivation + +TPC-DS has 24 tables. The current `tpcds_benchmark` generates them one at a time, +single-threaded. At SF=10 the slowest table (`store_sales`, 28.8 M rows) takes ~144 s +alone; generating all 24 tables sequentially takes ~1 h. The goal is to close that gap +by two orthogonal improvements: + +1. **Parallel table generation** — all 24 tables generated simultaneously (fork-after-init). +2. **General async write layer** — a format-agnostic `io_uring`-backed Arrow + `OutputStream` shared by all writers, activated automatically when `--parallel` is used. + +--- + +## Lessons from Past Experiments + +### TPC-H Phase 12.3 — per-child re-init (broken) + +Each forked child called `dbgen_init_global()` independently. +Problem: re-initialization reset the table-partitioned seed arrays, producing wrong row +counts and duplicate data. +Fix (Phase 12.6): init once in the parent, fork children that call `set_skip_init(true)`. + +### TPC-H Phase 12.6 — fork-after-init (working) + +``` +parent: dbgen_init_global() + └── fork × 8 + child: set_skip_init(true) → generate one table → exit +parent: waitpid × 8 +``` + +Seeds are partitioned by table in `Seed[]`; children never touch sibling seeds. +Result: ~8× throughput for all-tables generation. + +### Lance io_uring (Phase 3.4 / io_uring v3, working) + +Implemented in `third_party/lance-ffi/src/io_uring_store.rs`. 
+Key benchmark (SF=10 lineitem, 60 M rows): + +| | Wall time | Avg bandwidth | Stall time | +|---|---|---|---| +| Baseline | 183.9 s | 43.9 MB/s | 144.6 s (78%) | +| io_uring v3 | **85.7 s** | **188.5 MB/s** | 57.5 s (67%) | +| **Speedup** | **2.15×** | **4.3×** | **2.5× less stalled** | + +Discoveries that must be generalised (see next section): + +| # | Discovery | Rationale | +|---|-----------|-----------| +| 1 | `IORING_SETUP_ATTACH_WQ` shared kernel worker pool | Reduces scheduler pressure; prevents host stalls on WSL2 | +| 2 | Persistent ring per file | Eliminates ~800 `io_uring_setup()` syscalls for a 4 GB file | +| 3 | sysfs queue-depth calibration | Reads `/sys/block/*/queue/nr_requests`, clamps to [8, 128] | +| 4 | No `SQPOLL` | WSL2: SQPOLL creates busy-polling kernel thread → Windows scheduler freeze | +| 5 | No `O_DIRECT` | WSL2/VirtIO: each O_DIRECT write waits for disk ACK, kills pipelining | +| 6 | Worker thread owns ring for file lifetime | No per-write ring setup; amortises setup cost | +| 7 | Async MPSC channel for write dispatch | Decouples generation from disk latency | +| 8 | Atomic offset pre-claiming | Lock-free: `fetch_add(len)` before async write, enables out-of-order completion | + +### Parquet `AsyncIOContext` (C++ side, partial) + +`include/tpch/async_io.hpp` has a complete io_uring API +(`queue_write`, `register_buffers`, `submit_queued`, …) gated by +`#ifdef TPCH_ENABLE_ASYNC_IO`. +The Parquet writer has `set_async_context()` but it is wired only to the old +batch-accumulation path — **not** to the streaming (`--zero-copy`) path which calls +`FileWriter::WriteRecordBatch()` directly on an `arrow::io::FileOutputStream`. + +The async context API is a good foundation but the integration layer is missing. + +--- + +## Why Not a Pre-forked Job Server? + +Make's `-j N` job server pre-forks workers because jobs are discovered dynamically +as the build graph expands. 
Workers stay alive across many short-lived tasks to amortise +the fork cost and avoid repeated initialisation. + +For TPC-DS: the work set is **entirely static** — exactly 24 tables, all known before +any fork. A pre-forked pool would require IPC (pipe/socket) to assign table IDs to +workers, a protocol for "done, give me more", and careful handling of the case where a +worker inherits the wrong dsdgen stream state. That is 100+ lines of IPC plumbing +solving a problem that does not exist here. + +The only desirable property borrowed from the job-server pattern is the **N-slot rolling +window** — don't start table N+1 until one of the N active slots is free. This is +naturally expressed with io_uring as described below. + +--- + +## Architecture + +### Guiding principles + +> 1. io_uring is activated by `--parallel`, not by `--io-uring`. +> Format writers are oblivious to the I/O backend. +> 2. The parent's anchor ring is the **scheduler** (process lifecycle) AND the +> **I/O pool anchor** (ATTACH_WQ). One mechanism, two roles. + +### io_uring as the concurrency semaphore + +Rather than a POSIX semaphore, pipe-token pool, or `WNOHANG` poll loop, the parent +uses `pidfd` + `IORING_OP_POLL_ADD` to implement the N-slot rolling window directly +in the ring: + +- `pidfd_open(child_pid, 0)` (Linux ≥ 5.3) returns an fd that becomes readable when + the child exits. +- Submit `IORING_OP_POLL_ADD(pidfd, POLLIN)` to the anchor ring — one SQE per live + child. +- **The number of in-flight POLL_ADD entries IS the semaphore count**: never more than + N SQEs live in the ring, so never more than N tables running. +- When a CQE fires: reap the child (`waitid(P_PIDFD, ...)`), close the pidfd, + decrement active count, fork the next table if any remain. + +No separate semaphore object. No busy-waiting. `io_uring_enter(min_complete=1)` blocks +until a slot opens. 
+ +``` +// Scheduler loop in parent (pseudocode) + +ring = io_uring_setup(QD, flags=0) // anchor ring +init_dsdgen() // distributions loaded once + +// Fill initial N slots +for i in 0 .. min(N, tables.size()): + pid = fork_child(tables[i]) + pidfds[i] = pidfd_open(pid) + ring.submit(POLL_ADD, pidfds[i], POLLIN, user_data=i) + active++ + +// Rolling scheduler loop +while active > 0: + cqe = ring.wait(min_complete=1) // block — no spin + for each cqe in ring.peek_batch(): + idx = cqe.user_data + waitid(P_PIDFD, pidfds[idx], ...) // reap zombie + close(pidfds[idx]) + active-- + if next_table < tables.size(): + pid = fork_child(tables[next_table]) + pidfds[idx] = pidfd_open(pid) + ring.submit(POLL_ADD, pidfds[idx], POLLIN, user_data=idx) + next_table++; active++ + +unlink(tmp_dist_path) // parent owns cleanup +``` + +The same `ring` fd is inherited by all children as the `ATTACH_WQ` anchor — children +create their I/O rings with `IORING_SETUP_ATTACH_WQ, wq_fd=ring_fd`, sharing the +kernel async-worker pool with the parent scheduler. + +### Full picture + +``` +parent +├── init_dsdgen() ← one-time; all streams ready +├── ring = io_uring_setup(QD, 0) ← anchor: scheduler + ATTACH_WQ source +│ +├── [scheduler loop, N slots active] +│ POLL_ADD(pidfd[i]) → CQE when child[i] exits +│ on CQE: reap, fork next, re-submit POLL_ADD +│ +└── fork() × 24 (rolling, ≤ N at a time) + │ + ├── child [store_sales] + │ child_ring = io_uring_setup(QD, ATTACH_WQ, wq_fd=parent_ring_fd) + │ stream = IoUringOutputStream(path, child_ring) + │ writer = create_writer(format, stream) ← injected, format-agnostic + │ generate → write_batch() → stream.Write() → SQE on child_ring + │ close() → drain CQEs → join worker thread → exit(0) + │ + ├── child [inventory] (same pattern) + └── ... 
+ all children share one kernel async-worker pool via ATTACH_WQ ``` + +### Kernel version requirements + +| Feature | Minimum kernel | Available on 6.6 WSL2 | +|---------|---------------|----------------------| +| `io_uring` | 5.1 | ✅ | +| `pidfd_open` | 5.3 | ✅ | +| `IORING_OP_POLL_ADD` on pidfd | 5.3 | ✅ | +| `IORING_SETUP_ATTACH_WQ` | 5.6 | ✅ | +| `IORING_OP_WAITID` (alternative) | 6.7 | ❌ (not needed; pidfd approach used) | + +### `IoUringPool` — dual-role anchor ring manager + +The anchor ring serves two independent roles after `init()`: + +1. **Scheduler ring** (parent only): submits `POLL_ADD(pidfd)` SQEs and waits for + CQEs to detect child exits. Ring depth = `--parallel-tables N` = semaphore count. +2. **ATTACH_WQ source** (inherited by children): each child calls `create_child_ring()` + which creates its I/O ring with `IORING_SETUP_ATTACH_WQ` pointing at the anchor fd. + One kernel async-worker pool serves all I/O across all children. + +```cpp +// include/tpch/io_uring_pool.hpp +class IoUringPool { +public: + // Called once in parent before any fork. + // Creates anchor ring with QD calibrated from sysfs. + // output_dir used for device detection. + static void init(const std::string& output_dir, uint32_t max_parallel); + + // Parent scheduler: submit POLL_ADD for a child pidfd. + // user_data is returned verbatim in the CQE. + static void watch_child(int pidfd, uint64_t user_data); + + // Parent scheduler: block until at least one child exits. + // Returns completed user_data values (one per exited child in this batch). + static std::vector<uint64_t> wait_any(); + + // Called in each child after fork. + // Creates a new ring attached to the anchor (ATTACH_WQ). + // Falls back to plain ring if anchor unavailable. + static int create_child_ring(); + + // True if io_uring is available and anchor was initialised. + static bool available(); + + // fd of anchor ring (needed by Lance FFI for ATTACH_WQ, optional). 
+ static int anchor_fd(); + +private: + static int anchor_fd_; + static uint32_t calibrated_qd_; // sysfs nr_requests/2, clamped [8, 128] +}; +``` + +`wait_any()` calls `io_uring_enter(ring, 0, 1, IORING_ENTER_GETEVENTS)` then drains +the CQ with `io_uring_peek_batch_cqe`. It does **not** call `waitpid` — the caller +is responsible for reaping via `waitid(P_PIDFD, ...)` using the returned user_data +to look up the correct pidfd. + +Queue-depth calibration is the same algorithm as `io_uring_store.rs`: +`stat(output_dir)` → `st_dev` → walk `/sys/block/*/dev` → read `queue/nr_requests` → +`nr_requests / 2`, clamped to `[8, 128]`. + +### `IoUringOutputStream` — format-agnostic Arrow stream + +```cpp +// include/tpch/io_uring_output_stream.hpp +class IoUringOutputStream : public arrow::io::OutputStream { +public: + // ring_fd: io_uring fd created by IoUringPool::create_child_ring() + // Spawns worker thread that owns the ring for the file's lifetime. + IoUringOutputStream(const std::string& path, int ring_fd); + + arrow::Status Write(const void* data, int64_t nbytes) override; + arrow::Status Flush() override; // waits for all in-flight CQEs + arrow::Status Close() override; // Flush() + join worker thread + close fd + + arrow::Result Tell() const override; + bool closed() const override; + +private: + struct WriteJob { + std::vector data; + int64_t offset; + std::promise done; + }; + + std::atomic write_offset_{0}; // lock-free offset pre-claiming + std::thread worker_; + // MPSC: producer = Arrow caller, consumer = worker thread + std::mutex mu_; + std::condition_variable cv_; + std::queue queue_; + bool closed_ = false; +}; +``` + +Worker thread loop (mirrors `uring_write` in `io_uring_store.rs`): + +``` +loop: + job = queue.pop_blocking() + for chunk in job.data (chunk_size = 512 KB): + fill SQ up to ring QD + if SQ full: submit_and_wait(1), drain CQEs + submit_and_wait(in_flight) // drain remaining + job.done.set_value(ok) +``` + +No SQPOLL. No O_DIRECT. 
512 KB SQE chunks. Same rules as Rust implementation. + +### Writer factory injection + +Writers already accept an `arrow::io::OutputStream` for initialisation (Parquet, CSV). +The factory function changes from: + +```cpp +// Before +ARROW_ASSIGN_OR_RAISE(stream, arrow::io::FileOutputStream::Open(path)); +``` + +to: + +```cpp +// After — when io_uring is available (parallel mode) +if (IoUringPool::available()) { + int ring_fd = IoUringPool::create_child_ring(); + stream = std::make_shared(path, ring_fd); +} else { + ARROW_ASSIGN_OR_RAISE(stream, arrow::io::FileOutputStream::Open(path)); +} +``` + +**Format writers have zero knowledge of io_uring.** The injection point is +`create_writer()` in `tpcds_main.cpp`, not inside each writer class. + +### Lance integration + +Lance already has its own Rust-side io_uring (`io_uring_store.rs`) with an internal +`ANCHOR` lazy_static. When Lance runs in a child process that inherited the C++ anchor +fd, the Rust `ANCHOR` creates a separate pool by default. + +To share one pool: add a Rust FFI function that accepts the parent's anchor fd and +uses it as `wq_fd` for all Lance rings: + +```rust +// third_party/lance-ffi/src/lib.rs +#[no_mangle] +pub extern "C" fn lance_writer_attach_io_uring_pool(anchor_fd: i32) { … } +``` + +Called from `tpcds_main.cpp` in each child before the Lance writer is constructed. +This is **optional** — Lance works correctly without it; sharing just avoids spawning +a second set of kernel async-worker threads. + +--- + +## DSDGen Fork Safety + +### The `tmp_dist_path_` problem + +`DSDGenWrapper::init_dsdgen()` writes a temp file (`/tmp/tpcds_idx_XXXXXX`) and stores +its path in `tmp_dist_path_`. The destructor calls `unlink(tmp_dist_path_)`. +After fork, every child inherits the same path. The first child to exit deletes the +file. This is benign (all children already loaded distributions into memory via +`init_rand()` before fork) but causes spurious `unlink` failures for later children. 
+ +**Fix**: add two methods to `DSDGenWrapper`: + +```cpp +void set_skip_init(bool skip); // child: skip init_dsdgen() entirely +void clear_tmp_path(); // child: clear path so destructor won't unlink +``` + +Child pattern after fork: + +```cpp +// In child process, immediately after fork(): +dsdgen_child.set_skip_init(true); +dsdgen_child.clear_tmp_path(); +``` + +The parent (not any child) unlinks the temp file after `waitpid` for all children. + +### Seed isolation + +TPC-DS dsdgen uses `Streams[]` partitioned by table ID (same design as TPC-H `Seed[]`). +`init_rand()` initialises all streams; each table's generation reads only its own +stream slots. Children never touch sibling streams. Fork-after-init is safe. + +**Required verification** (after DS-10.1 implementation): + +```bash +# Sequential reference +./tpcds_benchmark --format parquet --table store_sales --sf 1 --max-rows 0 +# Record row count per table + +# Parallel run +./tpcds_benchmark --parallel --format parquet --sf 1 --max-rows 0 +# Verify each table's row count matches the sequential reference +``` + +--- + +## CLI Changes + +``` +tpcds_benchmark [existing flags] [new flags] + + --parallel Generate all implemented tables in parallel. + Activates io_uring (if available) automatically. + --parallel-tables Limit concurrency to N tables at a time + (default: all tables; useful on memory-constrained hosts). +``` + +No `--lance-io-uring`, no `--parquet-io-uring` — the I/O backend is an +implementation detail of `--parallel`. + +--- + +## Implementation Phases + +### DS-10.1 — Fork-after-init parallel generation ✅ `81c7539` + +**Files changed**: `src/tpcds_main.cpp`, `include/tpch/dsdgen_wrapper.hpp`, +`src/dsdgen/dsdgen_wrapper.cpp` +**Net change**: +347 / -98 lines + +What was built: +1. `DSDGenWrapper::prepare_for_fork()` — public wrapper around `init_dsdgen()`; call + in parent before any `fork()`. +2. 
`DSDGenWrapper::clear_tmp_path()` — call immediately after `fork()` in each child; + prevents child destructor from unlinking the parent's temp dist file. +3. `dispatch_generation()` — extracted the 24-entry if-else table dispatch; used by + both the single-table path and the parallel path (no duplication). +4. `ALL_TPCDS_TABLES` — ordered: tiny dims → small dims → large dims → fact tables, + so small tables complete quickly and free slots for the heavier fact tables. +5. `generate_all_tables_parallel()` — rolling N-slot fork loop using `waitpid(-1)`: + forks up to `--parallel-tables N` children simultaneously (default: all 24), + refills freed slots immediately; parent unlinks temp dist file on exit. +6. `run_table_child()` — per-child setup: clears tmp path, creates writer, calls + dispatch_generation, prints per-table timing line, exits. +7. `main()`: if `opts.parallel`, delegates to `generate_all_tables_parallel` and + returns; single-table path unchanged. + +**Observed performance** (SF=1, default 1000-row smoke test): +- Sequential (24 tables × ~0.24s each): ~5.8s +- `--parallel` (24 slots): **0.14s wall** — ~40× faster on smoke workload +- `--parallel --parallel-tables 4` (rolling 4-slot window): 0.08s wall + +### DS-10.2 + DS-10.3 — `IoUringPool` + `IoUringOutputStream` + Parquet injection ✅ `141624a` + +**Files**: `include/tpch/io_uring_pool.hpp`, `src/async/io_uring_pool.cpp`, +`include/tpch/io_uring_output_stream.hpp`, `src/async/io_uring_output_stream.cpp`, +`include/tpch/parquet_writer.hpp`, `src/writers/parquet_writer.cpp`, +`src/tpcds_main.cpp` (parallel init + child injection), `CMakeLists.txt` +**Net change**: +664 lines + +What was built: + +1. **`IoUringPool`** — dual-role anchor ring manager: + - `init(output_dir)`: sysfs QD calibration + `io_uring_queue_init`. + - `watch_child(pidfd, user_data)`: submits `POLL_ADD` on anchor ring. + - `wait_any()`: blocks on `io_uring_wait_cqe`, drains CQ batch. 
+ - `create_child_ring_struct()`: allocates child ring with `IORING_SETUP_ATTACH_WQ`. + - `free_ring(ring)`: `io_uring_queue_exit` + `delete`. + - Stub when `TPCH_ENABLE_ASYNC_IO=OFF`: `available()` returns false, all no-ops. + +2. **`IoUringOutputStream`** — format-agnostic `arrow::io::OutputStream`: + - Opens file in constructor; spawns worker thread when ring != nullptr. + - `Write()`: atomic offset pre-claim (`fetch_add`), copy to `WriteJob`, + enqueue MPSC, block on `std::future` until worker drains CQEs. + - Worker: 512 KB SQE chunks; submit when SQ full; drain all CQEs per job. + - Sync fallback (ring == nullptr): `pwrite(2)` directly, no worker thread. + - No SQPOLL, no O_DIRECT (both hurt on WSL2/VirtIO). + +3. **Parquet stream injection**: + - `ParquetWriter::set_output_stream(stream)`: store injected stream. + - `init_file_writer()`: uses injected stream if set, else `FileOutputStream::Open`. + +4. **Parallel path wiring** (`tpcds_main.cpp`): + - `generate_all_tables_parallel()`: calls `IoUringPool::init(output_dir)` before fork. + - `run_table_child()`: when `IoUringPool::available() && parquet && --zero-copy`, + creates child ring, wraps in `IoUringOutputStream`, injects into `ParquetWriter`. + +**Activation**: `--parallel --format parquet --zero-copy` with `TPCH_ENABLE_ASYNC_IO=ON`. +**Fallback**: when `ASYNC_IO=OFF` or `available()==false`, uses `FileOutputStream` as before. + +### DS-10.4 — Lance: share kernel worker pool (optional) + +**Files**: `third_party/lance-ffi/src/lib.rs`, `include/tpch/lance_ffi.h`, +`src/tpcds_main.cpp` +**New lines**: ~30 Rust + 10 C++ + +Add `lance_writer_attach_io_uring_pool(anchor_fd: i32)` FFI. +Called in each child before the Lance writer is created; routes all Lance rings through +the C++ anchor's kernel worker pool. + +--- + +## Expected Performance + +### DS-10.1 alone (parallel generation, no io_uring) + +All 24 tables overlap on the VirtIO-blk queue naturally — no io_uring needed. 
+Total wall time ≈ time of the slowest table (likely `store_sales` or `catalog_sales`).
+
+| SF | Sequential (est.) | Parallel (est.) | Speedup |
+|----|-------------------|-----------------|---------|
+| 1 | ~90 s | ~15 s | ~6× |
+| 5 | ~450 s | ~70 s | ~6× |
+| 10 | ~1800 s | ~150 s | ~12× |
+
+### DS-10.1 + DS-10.2 (parallel + io_uring)
+
+Each child's writes are pipelined via `ATTACH_WQ` → shared kernel pool drains faster.
+Based on Lance io_uring experiments (+2.15× per table), combined effect:
+
+| SF | Parallel only | Parallel + io_uring | Total vs sequential |
+|----|---------------|---------------------|---------------------|
+| 10 | ~150 s | ~70 s (est.) | ~25× |
+
+---
+
+## Fallback Strategy
+
+| Condition | Behaviour |
+|-----------|-----------|
+| Kernel < 5.1 (no io_uring) | `IoUringPool::available()` returns false; `FileOutputStream` used |
+| `TPCH_ENABLE_ASYNC_IO=OFF` | Stubs compile; `--parallel` still works, just without io_uring |
+| io_uring_setup fails at runtime | Log warning; fall back to `FileOutputStream` |
+| Lance without `attach_io_uring_pool` | Lance uses its own independent anchor; correct, slightly less efficient |
+
+---
+
+## File Map
+
+```
+include/tpch/
+  io_uring_pool.hpp ← new (DS-10.2)
+  io_uring_output_stream.hpp ← new (DS-10.2)
+  dsdgen_wrapper.hpp ← add set_skip_init / clear_tmp_path (DS-10.1)
+
+src/
+  tpcds_main.cpp ← parallel fork loop + create_writer injection (DS-10.1, DS-10.3)
+  async/
+    io_uring_pool.cpp ← new (DS-10.2)
+    io_uring_output_stream.cpp ← new (DS-10.2)
+  dsdgen/
+    dsdgen_wrapper.cpp ← set_skip_init / clear_tmp_path impl (DS-10.1)
+  writers/
+    parquet_writer.cpp ← accept injected stream (DS-10.3, minimal)
+    csv_writer.cpp ← accept injected stream (DS-10.3, minimal)
+
+third_party/lance-ffi/src/
+  lib.rs ← lance_writer_attach_io_uring_pool FFI (DS-10.4)
+
+CMakeLists.txt ← add src/async/*.cpp to tpch_core sources
+```
+
+---
+
+## Open Questions
+
+1. 
**Rolling fork vs batch**: DS-10.1 uses `waitpid(-1)` for the rolling window — + a new child is forked immediately when any child exits, keeping N slots busy. + DS-10.2 will upgrade this to the `pidfd` + `IORING_OP_POLL_ADD` event loop so the + same anchor ring drives both scheduling and I/O. ✅ Resolved (DS-10.1 interim). + +2. **Lance `--zero-copy` in parallel mode**: Each child uses its own Lance writer + independently. Streaming mode (`zero_copy_mode=async`) spawns a Tokio runtime per + child. With 24 children this is 24 Tokio runtimes — acceptable since they are in + separate processes (no shared address space). + +3. **`max_rows` per child**: In parallel mode `--max-rows 0` should be the default + (generating partial tables in parallel is only useful for benchmarking). Consider + warning if `--parallel` is used without `--max-rows 0`. + +4. **Output file collision**: All children write to `output_dir/.`. + Since table names are unique, no collision is possible. Document explicitly. + +5. **pidfd reuse slot vs linear array**: The scheduler loop reuses the `idx` slot + (user_data) of the completed child for the next fork. This requires a fixed-size + array of `pidfds[N]`. Simpler than a free-list; correct because exactly one slot + frees per CQE before being refilled. diff --git a/include/tpch/dsdgen_wrapper.hpp b/include/tpch/dsdgen_wrapper.hpp index c04086b..93fe8b5 100644 --- a/include/tpch/dsdgen_wrapper.hpp +++ b/include/tpch/dsdgen_wrapper.hpp @@ -209,6 +209,20 @@ class DSDGenWrapper { long scale_factor() const { return scale_factor_; } + /** + * Load distributions and seed RNG in the parent process before fork(). + * Children inherit the fully-initialised state via COW. + * Must be called before any fork(); call clear_tmp_path() in each child. + */ + void prepare_for_fork(); + + /** + * Clear the temp-file path so this process's destructor does not unlink it. + * Call immediately after fork() in each child process. 
+ * The parent retains the path and unlinks the file after all children exit. + */ + void clear_tmp_path(); + /** * Return the Arrow schema for a table type. */ diff --git a/include/tpch/io_uring_output_stream.hpp b/include/tpch/io_uring_output_stream.hpp new file mode 100644 index 0000000..6ba8282 --- /dev/null +++ b/include/tpch/io_uring_output_stream.hpp @@ -0,0 +1,101 @@ +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace tpch { + +/** + * Format-agnostic Arrow OutputStream backed by io_uring. + * + * Designed for the parallel TPC-DS generation pipeline (DS-10): + * - One instance per output file, owned by a single child process. + * - A worker thread owns the io_uring ring for the file's lifetime, + * eliminating per-write ring setup overhead. + * - Write() pre-claims the file offset atomically (fetch_add), copies + * data into a WriteJob, enqueues it to the worker, and blocks until + * the worker has drained all CQEs — preserving the sequential ordering + * that Arrow format writers expect. + * - Large writes are broken into 512 KB SQEs for pipelining. + * - No SQPOLL, no O_DIRECT (both hurt on WSL2/VirtIO). + * + * Sync fallback: when ring_struct == nullptr (io_uring unavailable), + * Write() falls back to pwrite(2) with no worker thread. + * + * Thread safety: NOT thread-safe (single producer assumed — Arrow writers + * call Write() from a single thread). + */ +class IoUringOutputStream : public arrow::io::OutputStream { +public: + /** + * Open path and prepare the stream. + * + * @param path File to create (O_WRONLY | O_CREAT | O_TRUNC). + * @param ring_struct Opaque io_uring* from IoUringPool::create_child_ring_struct(), + * or nullptr for synchronous pwrite fallback. + * The stream takes ownership; destructor calls + * IoUringPool::free_ring(ring_struct). 
+ */
+  explicit IoUringOutputStream(const std::string& path,
+                               void* ring_struct = nullptr);
+  ~IoUringOutputStream() override;
+
+  // Not copyable or movable (owns file fd + thread)
+  IoUringOutputStream(const IoUringOutputStream&) = delete;
+  IoUringOutputStream& operator=(const IoUringOutputStream&) = delete;
+
+  // ---- arrow::io::OutputStream ----
+
+  /** Pre-claim offset, copy data, enqueue to worker, wait for completion. */
+  arrow::Status Write(const void* data, int64_t nbytes) override;
+
+  /** Convenience overload accepting a Buffer. */
+  arrow::Status Write(const std::shared_ptr<arrow::Buffer>& data) override;
+
+  /** No-op: Write() already blocks until data is on disk. */
+  arrow::Status Flush() override;
+
+  /** Stop worker thread (if running), then close the file fd. */
+  arrow::Status Close() override;
+
+  arrow::Result<int64_t> Tell() const override;
+  bool closed() const override;
+
+private:
+  // Each Write() call creates one WriteJob.
+  struct WriteJob {
+    std::vector<uint8_t> data;
+    int64_t offset;
+    std::promise<arrow::Status> done;
+  };
+
+  // Synchronous pwrite path (used when ring_ == nullptr).
+  arrow::Status write_sync(const void* data, int64_t nbytes, int64_t offset);
+
+  // Worker thread: pop jobs, submit io_uring SQEs, drain CQEs, signal done.
+  void worker_loop();
+
+  int file_fd_ = -1;
+  void* ring_ = nullptr;  // io_uring* or nullptr (sync mode)
+  bool closed_ = false;
+
+  std::atomic<int64_t> write_offset_{0};  // next byte offset to claim
+
+  std::thread worker_;
+  std::mutex mu_;
+  std::condition_variable cv_;
+  std::queue<std::unique_ptr<WriteJob>> queue_;
+  bool stop_ = false;
+};
+
+} // namespace tpch
diff --git a/include/tpch/io_uring_pool.hpp b/include/tpch/io_uring_pool.hpp
new file mode 100644
index 0000000..be56c62
--- /dev/null
+++ b/include/tpch/io_uring_pool.hpp
@@ -0,0 +1,93 @@
+#pragma once
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+namespace tpch {
+
+/**
+ * IoUringPool — dual-role anchor ring manager for parallel TPC-DS generation.
+ *
+ * Roles:
+ * 1. 
Scheduler (parent): POLL_ADD on child pidfds → CQE per child exit.
+ *    Replaces waitpid(-1) with a non-blocking event loop.
+ * 2. ATTACH_WQ anchor (children): create_child_ring_struct() returns a new
+ *    io_uring ring whose kernel async-worker pool is shared with the anchor.
+ *    All child I/O uses one shared kernel thread pool.
+ *
+ * Usage — parent:
+ *   IoUringPool::init(output_dir);
+ *   // for each fork():
+ *   int pidfd = pidfd_open(child_pid, 0);
+ *   IoUringPool::watch_child(pidfd, slot_index);
+ *   // to reap:
+ *   for (uint64_t idx : IoUringPool::wait_any()) { ... }
+ *
+ * Usage — child (immediately after fork):
+ *   void* ring = IoUringPool::create_child_ring_struct();
+ *   auto stream = std::make_shared<IoUringOutputStream>(path, ring);
+ *   // stream destructor calls IoUringPool::free_ring(ring)
+ *
+ * Thread safety: NOT thread-safe. All calls must be from the same thread.
+ * (Parent scheduler is single-threaded; children are separate processes.)
+ */
+class IoUringPool {
+public:
+  /**
+   * Initialise anchor ring. Must be called in parent before any fork().
+   * @param output_dir Path used for sysfs queue-depth calibration.
+   * @return true if io_uring is available and anchor was created.
+   */
+  static bool init(const std::string& output_dir);
+
+  /**
+   * (Parent) Submit POLL_ADD on a child pidfd.
+   * user_data is returned verbatim in wait_any() when this child exits.
+   * Requires pidfd_open(2) (Linux ≥ 5.3).
+   */
+  static void watch_child(int pidfd, uint64_t user_data);
+
+  /**
+   * (Parent) Block until at least one POLL_ADD completes (child exits).
+   * Returns user_data values of all completed events in this batch.
+   */
+  static std::vector<uint64_t> wait_any();
+
+  /**
+   * (Child, after fork) Allocate a new io_uring ring attached to the anchor
+   * via IORING_SETUP_ATTACH_WQ, so all child I/O shares one kernel thread pool.
+   * Falls back to a plain ring if ATTACH_WQ fails. 
+ * Returns opaque heap-allocated io_uring* (to avoid exposing liburing.h), + * or nullptr if io_uring is entirely unavailable. + * Caller should pass this to IoUringOutputStream; the stream owns lifetime. + */ + static void* create_child_ring_struct(); + + /** + * Release a ring created by create_child_ring_struct(). + * Called by IoUringOutputStream destructor. + */ + static void free_ring(void* ring_struct); + + /** True if anchor ring was successfully initialised. */ + static bool available(); + + /** + * fd of anchor ring. + * Useful for Lance FFI ATTACH_WQ (DS-10.4): pass to + * lance_writer_attach_io_uring_pool() so the Rust side can share the pool. + */ + static int anchor_fd(); + + /** Calibrated queue depth (sysfs nr_requests/2, clamped [8, 128]). */ + static uint32_t queue_depth(); + +private: + static void* anchor_ring_; // io_uring* cast to void* + static int anchor_fd_; // ring->ring_fd of the anchor + static uint32_t calibrated_qd_; // sysfs-calibrated QD + static bool available_; +}; + +} // namespace tpch diff --git a/include/tpch/parquet_writer.hpp b/include/tpch/parquet_writer.hpp index ea35132..facf99a 100644 --- a/include/tpch/parquet_writer.hpp +++ b/include/tpch/parquet_writer.hpp @@ -5,6 +5,7 @@ #include #include #include +#include #include "writer_interface.hpp" #include "buffer_lifetime_manager.hpp" @@ -97,6 +98,13 @@ class ParquetWriter : public WriterInterface { */ void set_compression(const std::string& codec); + /** + * Inject an external output stream (e.g. IoUringOutputStream). + * When set, init_file_writer() uses this stream instead of opening filepath_. + * Must be called before the first write_batch() and with streaming_mode_ enabled. 
+ */ + void set_output_stream(std::shared_ptr stream); + private: std::string filepath_; std::shared_ptr first_batch_; @@ -117,6 +125,9 @@ class ParquetWriter : public WriterInterface { std::unique_ptr parquet_file_writer_; std::string compression_codec_ = "zstd"; // snappy, zstd, none + // DS-10.3: injected output stream (io_uring or other backend) + std::shared_ptr injected_stream_; + // Initialize the Parquet FileWriter for streaming mode void init_file_writer(); }; diff --git a/src/async/io_uring_output_stream.cpp b/src/async/io_uring_output_stream.cpp new file mode 100644 index 0000000..b3a49ed --- /dev/null +++ b/src/async/io_uring_output_stream.cpp @@ -0,0 +1,229 @@ +#include "tpch/io_uring_output_stream.hpp" +#include "tpch/io_uring_pool.hpp" + +#include +#include +#include +#include +#include +#include + +#include + +#ifdef TPCH_ENABLE_ASYNC_IO +#include +#endif + +namespace tpch { + +// --------------------------------------------------------------------------- +// Constructor / Destructor +// --------------------------------------------------------------------------- + +IoUringOutputStream::IoUringOutputStream(const std::string& path, + void* ring_struct) + : ring_(ring_struct) { + file_fd_ = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644); + if (file_fd_ < 0) { + throw std::runtime_error( + "IoUringOutputStream: cannot open '" + path + + "': " + strerror(errno)); + } + +#ifdef TPCH_ENABLE_ASYNC_IO + if (ring_ != nullptr) { + worker_ = std::thread([this] { worker_loop(); }); + } +#endif +} + +IoUringOutputStream::~IoUringOutputStream() { + if (!closed_) { + (void)Close(); + } + IoUringPool::free_ring(ring_); + ring_ = nullptr; +} + +// --------------------------------------------------------------------------- +// OutputStream interface +// --------------------------------------------------------------------------- + +arrow::Result IoUringOutputStream::Tell() const { + return write_offset_.load(std::memory_order_relaxed); +} + +bool 
IoUringOutputStream::closed() const { return closed_; }
+
+arrow::Status IoUringOutputStream::Write(const void* data, int64_t nbytes) {
+  if (closed_)
+    return arrow::Status::IOError("IoUringOutputStream: stream is closed");
+  if (nbytes <= 0) return arrow::Status::OK();
+
+  // Pre-claim offset atomically so multiple future async writes don't overlap
+  int64_t off =
+      write_offset_.fetch_add(nbytes, std::memory_order_relaxed);
+
+#ifdef TPCH_ENABLE_ASYNC_IO
+  if (ring_ != nullptr) {
+    auto job = std::make_unique<WriteJob>();
+    job->data.assign(static_cast<const uint8_t*>(data),
+                     static_cast<const uint8_t*>(data) + nbytes);
+    job->offset = off;
+    auto fut = job->done.get_future();
+
+    {
+      std::lock_guard lk(mu_);
+      queue_.push(std::move(job));
+    }
+    cv_.notify_one();
+
+    // Block until worker has submitted + drained all CQEs for this job.
+    // Preserves the sequential write ordering that Arrow writers expect.
+    return fut.get();
+  }
+#endif
+
+  return write_sync(data, nbytes, off);
+}
+
+arrow::Status IoUringOutputStream::Write(
+    const std::shared_ptr<arrow::Buffer>& data) {
+  return Write(data->data(), data->size());
+}
+
+// Write() already blocks until the worker drains CQEs, so Flush is a no-op. 
+arrow::Status IoUringOutputStream::Flush() { return arrow::Status::OK(); } + +arrow::Status IoUringOutputStream::Close() { + if (closed_) return arrow::Status::OK(); + closed_ = true; + +#ifdef TPCH_ENABLE_ASYNC_IO + if (ring_ != nullptr && worker_.joinable()) { + { + std::lock_guard lk(mu_); + stop_ = true; + } + cv_.notify_one(); + worker_.join(); + } +#endif + + if (file_fd_ >= 0) { + ::close(file_fd_); + file_fd_ = -1; + } + return arrow::Status::OK(); +} + +// --------------------------------------------------------------------------- +// Synchronous fallback (ring_ == nullptr) +// --------------------------------------------------------------------------- + +arrow::Status IoUringOutputStream::write_sync(const void* data, int64_t nbytes, + int64_t offset) { + const uint8_t* ptr = static_cast(data); + int64_t remaining = nbytes; + while (remaining > 0) { + ssize_t n = + pwrite(file_fd_, ptr, static_cast(remaining), offset); + if (n < 0) { + if (errno == EINTR) continue; + return arrow::Status::IOError("IoUringOutputStream: pwrite: ", + strerror(errno)); + } + ptr += n; + offset += n; + remaining -= n; + } + return arrow::Status::OK(); +} + +// --------------------------------------------------------------------------- +// io_uring worker thread +// --------------------------------------------------------------------------- + +#ifdef TPCH_ENABLE_ASYNC_IO + +void IoUringOutputStream::worker_loop() { + static constexpr size_t CHUNK_SIZE = 512UL * 1024; // 512 KB per SQE + auto* ring = static_cast(ring_); + + while (true) { + // --- wait for a job --- + std::unique_ptr job; + { + std::unique_lock lk(mu_); + cv_.wait(lk, [this] { return !queue_.empty() || stop_; }); + if (queue_.empty()) break; // stop_ set and queue drained + job = std::move(queue_.front()); + queue_.pop(); + } + + const uint8_t* ptr = job->data.data(); + size_t rem = job->data.size(); + int64_t off = job->offset; + int inflight = 0; + arrow::Status st = arrow::Status::OK(); + + // --- fill SQ 
with 512 KB chunks --- + while (rem > 0) { + size_t chunk = std::min(CHUNK_SIZE, rem); + + struct io_uring_sqe* sqe = io_uring_get_sqe(ring); + if (!sqe) { + // SQ full: submit what we have, wait for one CQE to free a slot + io_uring_submit(ring); + struct io_uring_cqe* cqe = nullptr; + int r = io_uring_wait_cqe(ring, &cqe); + if (r == 0) { + if (cqe->res < 0 && st.ok()) + st = arrow::Status::IOError( + "io_uring write: ", strerror(-cqe->res)); + io_uring_cqe_seen(ring, cqe); + --inflight; + } + sqe = io_uring_get_sqe(ring); + if (!sqe) { + if (st.ok()) + st = arrow::Status::IOError( + "io_uring: cannot get SQE after flush"); + break; + } + } + + io_uring_prep_write(sqe, file_fd_, ptr, + static_cast(chunk), + static_cast(off)); + sqe->user_data = 0; + ++inflight; + + ptr += chunk; + off += static_cast(chunk); + rem -= chunk; + } + + // --- submit remainder + drain all in-flight CQEs --- + if (inflight > 0) { + io_uring_submit(ring); + while (inflight > 0) { + struct io_uring_cqe* cqe = nullptr; + int r = io_uring_wait_cqe(ring, &cqe); + if (r == 0) { + if (cqe->res < 0 && st.ok()) + st = arrow::Status::IOError( + "io_uring write: ", strerror(-cqe->res)); + io_uring_cqe_seen(ring, cqe); + } + --inflight; + } + } + + job->done.set_value(st); + } +} + +#endif // TPCH_ENABLE_ASYNC_IO + +} // namespace tpch diff --git a/src/async/io_uring_pool.cpp b/src/async/io_uring_pool.cpp new file mode 100644 index 0000000..cc26888 --- /dev/null +++ b/src/async/io_uring_pool.cpp @@ -0,0 +1,188 @@ +#include "tpch/io_uring_pool.hpp" + +#include +#include +#include +#include +#include +#include + +#ifdef TPCH_ENABLE_ASYNC_IO +#include +#include // POLLIN +#endif + +namespace tpch { + +// Static member definitions +void* IoUringPool::anchor_ring_ = nullptr; +int IoUringPool::anchor_fd_ = -1; +uint32_t IoUringPool::calibrated_qd_ = 32; +bool IoUringPool::available_ = false; + +// ---- real implementation ------------------------------------------------ +#ifdef TPCH_ENABLE_ASYNC_IO + 
+static uint32_t sysfs_queue_depth(const std::string& dir) { + struct stat st; + if (stat(dir.c_str(), &st) != 0) return 32; + + unsigned dev_major = major(st.st_dev); + unsigned dev_minor_val = minor(st.st_dev); + + DIR* d = opendir("/sys/block"); + if (!d) return 32; + + uint32_t result = 32; + struct dirent* ent; + while ((ent = readdir(d)) != nullptr) { + if (ent->d_name[0] == '.') continue; + + char dev_path[512]; + snprintf(dev_path, sizeof(dev_path), "/sys/block/%s/dev", ent->d_name); + + FILE* f = fopen(dev_path, "r"); + if (!f) continue; + + unsigned maj = 0, min_val = 0; + bool matched = (fscanf(f, "%u:%u", &maj, &min_val) == 2 && + maj == dev_major && min_val == dev_minor_val); + fclose(f); + if (!matched) continue; + + char nr_path[512]; + snprintf(nr_path, sizeof(nr_path), + "/sys/block/%s/queue/nr_requests", ent->d_name); + FILE* nf = fopen(nr_path, "r"); + if (nf) { + unsigned nr = 0; + if (fscanf(nf, "%u", &nr) == 1 && nr > 0) { + result = nr / 2; + if (result < 8) result = 8; + if (result > 128) result = 128; + } + fclose(nf); + } + break; + } + closedir(d); + return result; +} + +bool IoUringPool::init(const std::string& output_dir) { + if (available_) return true; + + calibrated_qd_ = sysfs_queue_depth(output_dir); + + auto* ring = new io_uring{}; + int ret = io_uring_queue_init(calibrated_qd_, ring, 0); + if (ret < 0) { + delete ring; + fprintf(stderr, + "IoUringPool: io_uring_queue_init(QD=%u) failed: %s\n", + calibrated_qd_, strerror(-ret)); + return false; + } + + anchor_ring_ = ring; + anchor_fd_ = ring->ring_fd; + available_ = true; + + fprintf(stderr, + "IoUringPool: anchor ring initialised (QD=%u, ring_fd=%d)\n", + calibrated_qd_, anchor_fd_); + return true; +} + +void IoUringPool::watch_child(int pidfd, uint64_t user_data) { + if (!available_) return; + auto* ring = static_cast(anchor_ring_); + + struct io_uring_sqe* sqe = io_uring_get_sqe(ring); + if (!sqe) { + // SQ is full — flush to make room (shouldn't happen for small N) + 
io_uring_submit(ring); + sqe = io_uring_get_sqe(ring); + if (!sqe) + throw std::runtime_error("IoUringPool::watch_child: SQ full"); + } + + io_uring_prep_poll_add(sqe, pidfd, POLLIN); + sqe->user_data = user_data; + io_uring_submit(ring); +} + +std::vector IoUringPool::wait_any() { + if (!available_) return {}; + auto* ring = static_cast(anchor_ring_); + + // Block until at least one CQE arrives + struct io_uring_cqe* cqe = nullptr; + int ret = io_uring_wait_cqe(ring, &cqe); + if (ret < 0) + throw std::runtime_error( + std::string("IoUringPool::wait_any: ") + strerror(-ret)); + + // Drain all available CQEs in one batch + std::vector results; + unsigned head = 0; + struct io_uring_cqe* c = nullptr; + io_uring_for_each_cqe(ring, head, c) { + results.push_back(c->user_data); + } + io_uring_cq_advance(ring, static_cast(results.size())); + return results; +} + +void* IoUringPool::create_child_ring_struct() { + auto* ring = new io_uring{}; + + struct io_uring_params params{}; + if (anchor_fd_ >= 0) { + params.flags |= IORING_SETUP_ATTACH_WQ; + params.wq_fd = static_cast(anchor_fd_); + } + + int ret = io_uring_queue_init_params(calibrated_qd_, ring, ¶ms); + if (ret < 0) { + fprintf(stderr, + "IoUringPool::create_child_ring_struct: ATTACH_WQ failed: %s" + " — retrying as plain ring\n", + strerror(-ret)); + params = {}; + ret = io_uring_queue_init_params(calibrated_qd_, ring, ¶ms); + if (ret < 0) { + delete ring; + fprintf(stderr, + "IoUringPool::create_child_ring_struct: plain ring also failed: %s\n", + strerror(-ret)); + return nullptr; + } + } + return ring; +} + +void IoUringPool::free_ring(void* ring_struct) { + if (!ring_struct) return; + auto* ring = static_cast(ring_struct); + io_uring_queue_exit(ring); + delete ring; +} + +// ---- stub (no io_uring) ------------------------------------------------- +#else + +bool IoUringPool::init(const std::string& /*output_dir*/) { return false; } +void IoUringPool::watch_child(int /*pidfd*/, uint64_t /*user_data*/) {} 
+std::vector<uint64_t> IoUringPool::wait_any() { return {}; }
+void* IoUringPool::create_child_ring_struct() { return nullptr; }
+void IoUringPool::free_ring(void* /*ring*/) {}
+
+#endif // TPCH_ENABLE_ASYNC_IO
+
+// ---- always-available accessors -----------------------------------------
+bool IoUringPool::available() { return available_; }
+int IoUringPool::anchor_fd() { return anchor_fd_; }
+uint32_t IoUringPool::queue_depth() { return calibrated_qd_; }
+
+} // namespace tpch
diff --git a/src/dsdgen/dsdgen_wrapper.cpp b/src/dsdgen/dsdgen_wrapper.cpp
index 9024e90..b5933cc 100644
--- a/src/dsdgen/dsdgen_wrapper.cpp
+++ b/src/dsdgen/dsdgen_wrapper.cpp
@@ -679,6 +679,14 @@ DSDGenWrapper::~DSDGenWrapper() {
     }
 }
 
+void DSDGenWrapper::prepare_for_fork() {
+    init_dsdgen();
+}
+
+void DSDGenWrapper::clear_tmp_path() {
+    tmp_dist_path_.clear();
+}
+
 // ---------------------------------------------------------------------------
 // Initialization
 // ---------------------------------------------------------------------------
diff --git a/src/main.cpp b/src/main.cpp
index 379e020..ae38d22 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -22,8 +22,9 @@
 #include "tpch/dbgen_wrapper.hpp"
 #include "tpch/dbgen_converter.hpp"
 #include "tpch/zero_copy_converter.hpp" // Phase 13.4: Zero-copy optimizations
-#include "tpch/async_io.hpp"
 #include "tpch/performance_counters.hpp"
+#include "tpch/io_uring_pool.hpp"
+#include "tpch/io_uring_output_stream.hpp"
 #ifdef TPCH_ENABLE_ORC
 #include "tpch/orc_writer.hpp"
 #endif
@@ -44,33 +45,20 @@ struct Options {
     std::string format = "parquet";
     std::string output_dir = "/tmp";
     long max_rows = 1000;
-    bool async_io = false;
     bool verbose = false;
     bool parallel = false;
+    int parallel_tables = 0; // max concurrent children; 0 = all
     bool zero_copy = false;
     std::string zero_copy_mode = "sync"; // sync, auto, async (Lance-specific)
     std::string compression = "zstd"; // snappy, zstd, none
     std::string table = "lineitem";
-    long lance_rows_per_file = 0;
-    long
lance_rows_per_group = 0; - long lance_max_bytes_per_file = 0; - bool lance_skip_auto_cleanup = false; - bool lance_io_uring = false; - long lance_stream_queue = 16; - std::string lance_stats_level; - double lance_cardinality_sample_rate = 1.0; // Phase 3.1: Sampling-based cardinality + bool io_uring = false; // use io_uring for disk writes (Parquet: IoUringOutputStream; Lance: Rust io_uring) }; -constexpr int OPT_LANCE_ROWS_PER_FILE = 1000; -constexpr int OPT_LANCE_ROWS_PER_GROUP = 1001; -constexpr int OPT_LANCE_MAX_BYTES_PER_FILE = 1002; -constexpr int OPT_LANCE_SKIP_AUTO_CLEANUP = 1003; -constexpr int OPT_LANCE_STREAM_QUEUE = 1004; -constexpr int OPT_LANCE_STATS_LEVEL = 1005; -constexpr int OPT_LANCE_CARDINALITY_SAMPLE_RATE = 1006; // Phase 3.1 -constexpr int OPT_LANCE_IO_URING = 1007; +constexpr int OPT_PARALLEL_TABLES = 1007; constexpr int OPT_ZERO_COPY_MODE = 1008; constexpr int OPT_COMPRESSION = 1009; +constexpr int OPT_IO_URING = 1010; constexpr size_t DBGEN_BATCH_SIZE = 8192; // aligned with Lance max_rows_per_group @@ -96,25 +84,13 @@ void print_usage(const char* prog) { << " --max-rows Maximum rows to generate (default: 1000, 0=all)\n" << " --table TPC-H table: lineitem, orders, customer, part,\n" << " partsupp, supplier, nation, region (default: lineitem)\n" - << " --parallel Generate all 8 tables in parallel (Phase 12.6)\n" + << " --parallel Generate all 8 tables in parallel\n" + << " --parallel-tables Max concurrent table children (default: all)\n" << " --zero-copy Enable zero-copy streaming writes (O(batch) RAM)\n" << " --zero-copy-mode Zero-copy mode for Lance: sync (default), auto, async\n" << " --compression Parquet compression: zstd (default), snappy, none\n" -#ifdef TPCH_ENABLE_ASYNC_IO - << " --async-io Enable async I/O with io_uring\n" -#endif -#ifdef TPCH_ENABLE_LANCE - << " --lance-rows-per-file Lance max rows per file (default: use Lance defaults)\n" - << " --lance-rows-per-group Lance max rows per group (default: use Lance defaults)\n" - 
<< " --lance-max-bytes-per-file Lance max bytes per file (default: use Lance defaults)\n" - << " --lance-skip-auto-cleanup Skip Lance auto cleanup during commit\n" - << " --lance-stream-queue Lance streaming queue depth (default: 16)\n" - << " --lance-stats-level Lance stats level (default: full)\n" - << " --lance-cardinality-sample-rate <0.0-1.0> Cardinality sampling rate (Phase 3.1)\n" - << " Controls HyperLogLog sampling: 1.0=100% (default),\n" - << " 0.5=50%, 0.1=10%. Smaller rates = faster writes.\n" - << " --lance-io-uring Use io_uring for Lance disk writes (Linux only)\n" -#endif + << " --io-uring Use io_uring for disk writes (Parquet: kernel async I/O;\n" + << " Lance: delegated to Rust runtime)\n" << " --verbose Verbose output\n" << " --help Show this help message\n"; } @@ -128,34 +104,19 @@ Options parse_args(int argc, char* argv[]) { {"output-dir", required_argument, nullptr, 'o'}, {"max-rows", required_argument, nullptr, 'm'}, {"table", required_argument, nullptr, 't'}, - {"parallel", no_argument, nullptr, 'p'}, // Phase 12.6: Fork-after-init + {"parallel", no_argument, nullptr, 'p'}, + {"parallel-tables", required_argument, nullptr, OPT_PARALLEL_TABLES}, {"zero-copy", no_argument, nullptr, 'z'}, {"zero-copy-mode", required_argument, nullptr, OPT_ZERO_COPY_MODE}, {"compression", required_argument, nullptr, OPT_COMPRESSION}, -#ifdef TPCH_ENABLE_LANCE - {"lance-rows-per-file", required_argument, nullptr, OPT_LANCE_ROWS_PER_FILE}, - {"lance-rows-per-group", required_argument, nullptr, OPT_LANCE_ROWS_PER_GROUP}, - {"lance-max-bytes-per-file", required_argument, nullptr, OPT_LANCE_MAX_BYTES_PER_FILE}, - {"lance-skip-auto-cleanup", no_argument, nullptr, OPT_LANCE_SKIP_AUTO_CLEANUP}, - {"lance-stream-queue", required_argument, nullptr, OPT_LANCE_STREAM_QUEUE}, - {"lance-stats-level", required_argument, nullptr, OPT_LANCE_STATS_LEVEL}, - {"lance-cardinality-sample-rate", required_argument, nullptr, OPT_LANCE_CARDINALITY_SAMPLE_RATE}, - {"lance-io-uring", 
no_argument, nullptr, OPT_LANCE_IO_URING}, -#endif -#ifdef TPCH_ENABLE_ASYNC_IO - {"async-io", no_argument, nullptr, 'a'}, -#endif + {"io-uring", no_argument, nullptr, OPT_IO_URING}, {"verbose", no_argument, nullptr, 'v'}, {"help", no_argument, nullptr, 'h'}, {nullptr, 0, nullptr, 0} }; int c; -#ifdef TPCH_ENABLE_ASYNC_IO - while ((c = getopt_long(argc, argv, "s:f:o:m:t:pzavh", long_options, nullptr)) != -1) { -#else while ((c = getopt_long(argc, argv, "s:f:o:m:t:pzvh", long_options, nullptr)) != -1) { -#endif switch (c) { case 's': opts.scale_factor = std::stol(optarg); @@ -175,6 +136,13 @@ Options parse_args(int argc, char* argv[]) { case 'p': opts.parallel = true; break; + case OPT_PARALLEL_TABLES: + opts.parallel_tables = std::stoi(optarg); + if (opts.parallel_tables <= 0) { + std::cerr << "Error: --parallel-tables must be > 0\n"; + exit(1); + } + break; case 'z': opts.zero_copy = true; break; @@ -188,39 +156,9 @@ Options parse_args(int argc, char* argv[]) { case OPT_COMPRESSION: opts.compression = optarg; break; - case OPT_LANCE_ROWS_PER_FILE: - opts.lance_rows_per_file = std::stol(optarg); - break; - case OPT_LANCE_ROWS_PER_GROUP: - opts.lance_rows_per_group = std::stol(optarg); - break; - case OPT_LANCE_MAX_BYTES_PER_FILE: - opts.lance_max_bytes_per_file = std::stol(optarg); - break; - case OPT_LANCE_SKIP_AUTO_CLEANUP: - opts.lance_skip_auto_cleanup = true; + case OPT_IO_URING: + opts.io_uring = true; break; - case OPT_LANCE_STREAM_QUEUE: - opts.lance_stream_queue = std::stol(optarg); - break; - case OPT_LANCE_STATS_LEVEL: - opts.lance_stats_level = optarg; - break; - case OPT_LANCE_CARDINALITY_SAMPLE_RATE: - opts.lance_cardinality_sample_rate = std::stod(optarg); - if (opts.lance_cardinality_sample_rate < 0.01 || opts.lance_cardinality_sample_rate > 1.0) { - std::cerr << "Error: cardinality-sample-rate must be between 0.01 and 1.0\n"; - exit(1); - } - break; - case OPT_LANCE_IO_URING: - opts.lance_io_uring = true; - break; -#ifdef TPCH_ENABLE_ASYNC_IO - 
case 'a':
-                opts.async_io = true;
-                break;
-#endif
             case 'v':
                 opts.verbose = true;
                 break;
@@ -1115,211 +1053,203 @@ void generate_region_true_zero_copy(
  *
  * This eliminates the 8× initialization overhead that made Phase 12.3 broken.
  */
-int generate_all_tables_parallel_v2(const Options& opts) {
-    static const std::vector<std::string> tables = {
-        "region", "nation", "supplier", "part",
-        "partsupp", "customer", "orders", "lineitem"
-    };
+// Wire io_uring into a writer after IoUringPool::init() has been called.
+// For Lance: delegates to the Rust runtime via enable_io_uring().
+// For Parquet (and future formats): injects IoUringOutputStream.
+// No-op if opts.io_uring is false or IoUringPool is unavailable.
+static void wire_io_uring(const Options& opts, const std::string& path,
+                          tpch::WriterInterface* writer) {
+    if (!opts.io_uring || !tpch::IoUringPool::available())
+        return;
+
+#if defined(TPCH_ENABLE_LANCE) && defined(TPCH_LANCE_IO_URING)
+    if (auto* lw = dynamic_cast<tpch::LanceWriter*>(writer)) {
+        lw->enable_io_uring(true);
+        return;
+    }
+#endif
+    if (auto* pw = dynamic_cast<tpch::ParquetWriter*>(writer)) {
+        void* ring = tpch::IoUringPool::create_child_ring_struct();
+        pw->set_output_stream(std::make_shared<tpch::IoUringOutputStream>(path, ring));
+    }
+    // Other formats (csv, orc, …) don't yet support stream injection — silently skip.
+}
-
-    // === PARENT: Heavy initialization ONCE ===
-    std::cout << "Initializing dbgen (loading distributions)...\n";
-    auto init_start = std::chrono::high_resolution_clock::now();
+
+// Run one table in a child process. Called after fork() — must not return to parent.
+static void run_table_child(const Options& opts, const std::string& table) {
+    try {
+        std::string output_path = get_output_filename(opts.output_dir, opts.format, table);
-
-    tpch::dbgen_init_global(opts.scale_factor, opts.verbose);
+        tpch::DBGenWrapper dbgen(opts.scale_factor, opts.verbose);
+        dbgen.set_skip_init(true); // distributions already loaded by parent (COW)
-
-    auto init_end = std::chrono::high_resolution_clock::now();
-    auto init_duration = std::chrono::duration<double>(init_end - init_start).count();
-    std::cout << "Initialization complete in " << std::fixed << std::setprecision(3)
-              << init_duration << "s. Forking " << tables.size() << " children...\n";
+        std::shared_ptr<arrow::Schema> schema;
+        if (table == "lineitem") schema = tpch::DBGenWrapper::get_schema(tpch::TableType::LINEITEM, opts.scale_factor);
+        else if (table == "orders") schema = tpch::DBGenWrapper::get_schema(tpch::TableType::ORDERS, opts.scale_factor);
+        else if (table == "customer") schema = tpch::DBGenWrapper::get_schema(tpch::TableType::CUSTOMER, opts.scale_factor);
+        else if (table == "part") schema = tpch::DBGenWrapper::get_schema(tpch::TableType::PART, opts.scale_factor);
+        else if (table == "partsupp") schema = tpch::DBGenWrapper::get_schema(tpch::TableType::PARTSUPP, opts.scale_factor);
+        else if (table == "supplier") schema = tpch::DBGenWrapper::get_schema(tpch::TableType::SUPPLIER, opts.scale_factor);
+        else if (table == "nation") schema = tpch::DBGenWrapper::get_schema(tpch::TableType::NATION, opts.scale_factor);
+        else if (table == "region") schema = tpch::DBGenWrapper::get_schema(tpch::TableType::REGION, opts.scale_factor);
+        else { fprintf(stderr, "tpch_benchmark: unknown table %s\n", table.c_str()); exit(1); }
-
-    std::vector<pid_t> children;
-    std::map<pid_t, std::string> pid_to_table;
-    auto start_time = std::chrono::high_resolution_clock::now();
+        auto writer = create_writer(opts.format, output_path, opts.compression, opts.zero_copy);
-
-    for (const auto& table : tables) {
-        pid_t pid = fork();
+
+#ifdef TPCH_ENABLE_LANCE
+        if (auto* lw = dynamic_cast<tpch::LanceWriter*>(writer.get())) {
+            if (opts.zero_copy) {
+                bool use_async = (opts.zero_copy_mode == "async") || (opts.zero_copy_mode == "auto");
+                lw->enable_streaming_write(!use_async);
+            }
+        }
+#endif
-
-        if (pid == -1) {
-            perror("fork");
-            return 1;
+
+        wire_io_uring(opts, output_path, writer.get());
+
+        size_t total_rows = 0;
+        Options child_opts = opts;
+        child_opts.table = table;
+
+        auto t0 = std::chrono::steady_clock::now();
+
+        if (table == "lineitem") {
+            if (opts.zero_copy) generate_lineitem_zero_copy(dbgen, child_opts, schema, writer, total_rows);
+            else generate_with_dbgen(dbgen, child_opts, schema, writer,
+                [&](auto& g, auto& cb) { g.generate_lineitem(cb, opts.max_rows); }, total_rows);
+        } else if (table == "orders") {
+            if (opts.zero_copy) generate_orders_zero_copy(dbgen, child_opts, schema, writer, total_rows);
+            else generate_with_dbgen(dbgen, child_opts, schema, writer,
+                [&](auto& g, auto& cb) { g.generate_orders(cb, opts.max_rows); }, total_rows);
+        } else if (table == "customer") {
+            if (opts.zero_copy) generate_customer_zero_copy(dbgen, child_opts, schema, writer, total_rows);
+            else generate_with_dbgen(dbgen, child_opts, schema, writer,
+                [&](auto& g, auto& cb) { g.generate_customer(cb, opts.max_rows); }, total_rows);
+        } else if (table == "part") {
+            if (opts.zero_copy) generate_part_zero_copy(dbgen, child_opts, schema, writer, total_rows);
+            else generate_with_dbgen(dbgen, child_opts, schema, writer,
+                [&](auto& g, auto& cb) { g.generate_part(cb, opts.max_rows); }, total_rows);
+        } else if (table == "partsupp") {
+            if (opts.zero_copy) generate_partsupp_zero_copy(dbgen, child_opts, schema, writer, total_rows);
+            else generate_with_dbgen(dbgen, child_opts, schema, writer,
+                [&](auto& g, auto& cb) { g.generate_partsupp(cb, opts.max_rows); }, total_rows);
+        } else if (table == "supplier") {
+            if (opts.zero_copy) generate_supplier_zero_copy(dbgen, child_opts, schema, writer, total_rows);
+            else generate_with_dbgen(dbgen, child_opts,
schema, writer, + [&](auto& g, auto& cb) { g.generate_supplier(cb, opts.max_rows); }, total_rows); + } else if (table == "nation") { + if (opts.zero_copy) generate_nation_zero_copy(dbgen, child_opts, schema, writer, total_rows); + else generate_with_dbgen(dbgen, child_opts, schema, writer, + [&](auto& g, auto& cb) { g.generate_nation(cb); }, total_rows); + } else if (table == "region") { + if (opts.zero_copy) generate_region_zero_copy(dbgen, child_opts, schema, writer, total_rows); + else generate_with_dbgen(dbgen, child_opts, schema, writer, + [&](auto& g, auto& cb) { g.generate_region(cb); }, total_rows); } - if (pid == 0) { - // === CHILD: Inherits all init via COW === - // Seed[] is pristine, distributions loaded, dates cached - - try { - std::string output_path = get_output_filename(opts.output_dir, opts.format, table); - - // Create DBGenWrapper and tell it to skip init - tpch::DBGenWrapper dbgen(opts.scale_factor, opts.verbose); - dbgen.set_skip_init(true); // Don't re-initialize! 
-
-            // Get schema for this table
-            std::shared_ptr<arrow::Schema> schema;
-            if (table == "lineitem") {
-                schema = tpch::DBGenWrapper::get_schema(tpch::TableType::LINEITEM, opts.scale_factor);
-            } else if (table == "orders") {
-                schema = tpch::DBGenWrapper::get_schema(tpch::TableType::ORDERS, opts.scale_factor);
-            } else if (table == "customer") {
-                schema = tpch::DBGenWrapper::get_schema(tpch::TableType::CUSTOMER, opts.scale_factor);
-            } else if (table == "part") {
-                schema = tpch::DBGenWrapper::get_schema(tpch::TableType::PART, opts.scale_factor);
-            } else if (table == "partsupp") {
-                schema = tpch::DBGenWrapper::get_schema(tpch::TableType::PARTSUPP, opts.scale_factor);
-            } else if (table == "supplier") {
-                schema = tpch::DBGenWrapper::get_schema(tpch::TableType::SUPPLIER, opts.scale_factor);
-            } else if (table == "nation") {
-                schema = tpch::DBGenWrapper::get_schema(tpch::TableType::NATION, opts.scale_factor);
-            } else if (table == "region") {
-                schema = tpch::DBGenWrapper::get_schema(tpch::TableType::REGION, opts.scale_factor);
-            } else {
-                std::cerr << "Unknown table: " << table << "\n";
-                exit(1);
-            }
+        writer->close();
-
-            // Create writer
-            auto writer = create_writer(opts.format, output_path, opts.compression, opts.zero_copy);
+
+        double elapsed = std::chrono::duration<double>(
+            std::chrono::steady_clock::now() - t0).count();
+        printf("tpch_benchmark: %-12s SF=%ld rows=%zu elapsed=%.2fs rate=%.0f rows/s\n",
+               table.c_str(), opts.scale_factor, total_rows,
+               elapsed, elapsed > 0 ? total_rows / elapsed : 0.0);
+        printf("  output: %s\n", output_path.c_str());
+        fflush(stdout);
+        exit(0);
+    } catch (const std::exception& e) {
+        fprintf(stderr, "tpch_benchmark: [%s] failed: %s\n", table.c_str(), e.what());
+        exit(1);
+    }
+}
-
-#ifdef TPCH_ENABLE_LANCE
-            if (auto lance_writer = dynamic_cast<tpch::LanceWriter*>(writer.get())) {
-                if (opts.zero_copy) {
-                    bool use_async = (opts.zero_copy_mode == "async") ||
-                                     (opts.zero_copy_mode == "auto");
-                    lance_writer->enable_streaming_write(!use_async);
-                }
-    #ifdef TPCH_LANCE_IO_URING
-                if (opts.lance_io_uring) {
-                    lance_writer->enable_io_uring(true);
-                }
-    #endif
-            }
-#endif
+
+// Fork-after-init parallel generation with rolling N-slot window.
+int generate_all_tables_parallel(const Options& opts) {
+    static const std::vector<std::string> tables = {
+        "region", "nation", "supplier", "part",
+        "partsupp", "customer", "orders", "lineitem"
+    };
+    const size_t ntables = tables.size();
+    const size_t slot_limit = (opts.parallel_tables > 0)
+        ? static_cast<size_t>(opts.parallel_tables)
+        : ntables;
-
-            // Generate table
-            size_t total_rows = 0;
-            Options child_opts = opts;
-            child_opts.table = table;
-
-            if (table == "lineitem") {
-                if (opts.zero_copy) generate_lineitem_zero_copy(dbgen, child_opts, schema, writer, total_rows);
-                else generate_with_dbgen(dbgen, child_opts, schema, writer,
-                    [&](auto& g, auto& cb) { g.generate_lineitem(cb, opts.max_rows); }, total_rows);
-            } else if (table == "orders") {
-                if (opts.zero_copy) generate_orders_zero_copy(dbgen, child_opts, schema, writer, total_rows);
-                else generate_with_dbgen(dbgen, child_opts, schema, writer,
-                    [&](auto& g, auto& cb) { g.generate_orders(cb, opts.max_rows); }, total_rows);
-            } else if (table == "customer") {
-                if (opts.zero_copy) generate_customer_zero_copy(dbgen, child_opts, schema, writer, total_rows);
-                else generate_with_dbgen(dbgen, child_opts, schema, writer,
-                    [&](auto& g, auto& cb) { g.generate_customer(cb, opts.max_rows); }, total_rows);
-            } else if (table == "part") {
-                if
(opts.zero_copy) generate_part_zero_copy(dbgen, child_opts, schema, writer, total_rows); - else generate_with_dbgen(dbgen, child_opts, schema, writer, - [&](auto& g, auto& cb) { g.generate_part(cb, opts.max_rows); }, total_rows); - } else if (table == "partsupp") { - if (opts.zero_copy) generate_partsupp_zero_copy(dbgen, child_opts, schema, writer, total_rows); - else generate_with_dbgen(dbgen, child_opts, schema, writer, - [&](auto& g, auto& cb) { g.generate_partsupp(cb, opts.max_rows); }, total_rows); - } else if (table == "supplier") { - if (opts.zero_copy) generate_supplier_zero_copy(dbgen, child_opts, schema, writer, total_rows); - else generate_with_dbgen(dbgen, child_opts, schema, writer, - [&](auto& g, auto& cb) { g.generate_supplier(cb, opts.max_rows); }, total_rows); - } else if (table == "nation") { - if (opts.zero_copy) generate_nation_zero_copy(dbgen, child_opts, schema, writer, total_rows); - else generate_with_dbgen(dbgen, child_opts, schema, writer, - [&](auto& g, auto& cb) { g.generate_nation(cb); }, total_rows); - } else if (table == "region") { - if (opts.zero_copy) generate_region_zero_copy(dbgen, child_opts, schema, writer, total_rows); - else generate_with_dbgen(dbgen, child_opts, schema, writer, - [&](auto& g, auto& cb) { g.generate_region(cb); }, total_rows); - } + // Initialize dbgen ONCE in the parent — all children inherit via COW. + fprintf(stderr, "tpch_benchmark: initializing dbgen (SF=%ld)...\n", opts.scale_factor); + tpch::dbgen_init_global(opts.scale_factor, opts.verbose); - writer->close(); + // Initialize anchor io_uring ring before fork so children can + // attach via IORING_SETUP_ATTACH_WQ and share one kernel worker pool. 
+    bool io_uring_ready = opts.io_uring && tpch::IoUringPool::init(opts.output_dir);
-
-            exit(0); // Success
-        } catch (const std::exception& e) {
-            std::cerr << "Child process for table " << table << " failed: " << e.what() << "\n";
-            exit(1);
-        }
-    }
+
+    auto t_wall = std::chrono::steady_clock::now();
-
-        // Parent continues
-        children.push_back(pid);
-        pid_to_table[pid] = table;
-        std::cout << "  Forked " << table << " (PID " << pid << ")\n";
-    }
+
+    fprintf(stderr,
+            "tpch_benchmark: parallel SF=%ld tables=%zu slots=%zu format=%s io_uring=%s\n",
+            opts.scale_factor, ntables, slot_limit, opts.format.c_str(),
+            io_uring_ready ? "yes" : "no");
-
-    // Wait for all children
-    int failed = 0;
-    std::map<std::string, int> table_status;
+    std::vector<pid_t> pids(ntables, -1);
+    std::vector<size_t> slot_table(slot_limit, SIZE_MAX);
+    size_t next = 0;
+    size_t active = 0;
+    int failed = 0;
-
-    for (size_t i = 0; i < children.size(); ++i) {
-        int status;
-        pid_t finished_pid = waitpid(-1, &status, 0); // Wait for any child
+    auto fork_next = [&](size_t slot) {
+        if (next >= ntables) return;
+        const std::string& tname = tables[next];
-
-        if (finished_pid == -1) {
-            perror("waitpid");
-            continue;
+        pid_t pid = ::fork();
+        if (pid < 0) { perror("fork"); ++failed; ++next; return; }
+        if (pid == 0) {
+            run_table_child(opts, tname); // never returns
+        }
+        pids[next] = pid;
+        slot_table[slot] = next;
+        ++next;
+        ++active;
+    };
-
-        std::string table_name = pid_to_table[finished_pid];
+    // Fill initial slots
+    for (size_t s = 0; s < slot_limit && next < ntables; ++s)
+        fork_next(s);
-
-        if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
-            std::cout << "  " << table_name << " FAILED (status=" << WEXITSTATUS(status) << ")\n";
-            table_status[table_name] = 1;
-            failed++;
-        } else {
-            std::cout << "  " << table_name << " completed successfully\n";
-            table_status[table_name] = 0;
+    // Rolling wait: as each child finishes, fork the next table into its slot
+    while (active > 0) {
+        int status;
+        pid_t done = ::waitpid(-1, &status, 0);
+        if
(done < 0) { perror("waitpid"); break; } + + size_t freed_slot = SIZE_MAX; + for (size_t s = 0; s < slot_limit; ++s) { + if (slot_table[s] < ntables && pids[slot_table[s]] == done) { + freed_slot = s; + break; + } } - } - auto end_time = std::chrono::high_resolution_clock::now(); - auto duration = std::chrono::duration(end_time - start_time).count(); - - std::cout << "\n=== Parallel Generation Summary ===\n"; - std::cout << "Total time (excluding init): " << std::fixed << std::setprecision(3) - << duration << "s\n"; - std::cout << "Total time (including init): " << std::fixed << std::setprecision(3) - << (duration + init_duration) << "s\n"; - - // Calculate total rows across all tables - long total_rows_all_tables = 0; - for (const auto& table_name : tables) { - // Skip failed tables - if (table_status.count(table_name) && table_status[table_name] != 0) { - continue; + if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) { + const char* tname = (freed_slot < slot_limit && slot_table[freed_slot] < ntables) + ? 
tables[slot_table[freed_slot]].c_str()
+                : "unknown";
+            fprintf(stderr, "tpch_benchmark: [%s] child failed (pid=%d status=%d)\n",
+                    tname, done, status);
+            ++failed;
         }
-
-        // Get expected row count for this table
-        tpch::TableType table_type;
-        if (table_name == "lineitem") table_type = tpch::TableType::LINEITEM;
-        else if (table_name == "orders") table_type = tpch::TableType::ORDERS;
-        else if (table_name == "customer") table_type = tpch::TableType::CUSTOMER;
-        else if (table_name == "part") table_type = tpch::TableType::PART;
-        else if (table_name == "partsupp") table_type = tpch::TableType::PARTSUPP;
-        else if (table_name == "supplier") table_type = tpch::TableType::SUPPLIER;
-        else if (table_name == "nation") table_type = tpch::TableType::NATION;
-        else if (table_name == "region") table_type = tpch::TableType::REGION;
-        else continue;
-
-        total_rows_all_tables += tpch::get_row_count(table_type, opts.scale_factor);
-    }
+        --active;
-
-    // Output throughput for all successfully generated tables
-    if (total_rows_all_tables > 0 && duration > 0) {
-        double throughput = static_cast<double>(total_rows_all_tables) / duration;
-        std::cout << "Throughput: " << std::fixed << std::setprecision(0)
-                  << throughput << " rows/sec\n";
+
+        if (freed_slot != SIZE_MAX)
+            fork_next(freed_slot);
     }
-
-    if (failed > 0) {
-        std::cout << "Failed tables: " << failed << "/" << tables.size() << "\n";
-        return 1;
-    } else {
-        std::cout << "All tables generated successfully!\n";
-        return 0;
-    }
+    double wall = std::chrono::duration<double>(
+        std::chrono::steady_clock::now() - t_wall).count();
+    fprintf(stderr,
+            "tpch_benchmark: parallel done SF=%ld %zu tables wall=%.2fs %s\n",
+            opts.scale_factor, ntables, wall,
+            failed ? "SOME TABLES FAILED" : "all ok");
+
+    return failed ?
1 : 0; } } // anonymous namespace @@ -1328,9 +1258,8 @@ int main(int argc, char* argv[]) { try { auto opts = parse_args(argc, argv); - // Phase 12.6: Fork-after-init parallel generation if (opts.parallel) { - return generate_all_tables_parallel_v2(opts); + return generate_all_tables_parallel(opts); } // Validate format @@ -1393,80 +1322,22 @@ int main(int argc, char* argv[]) { std::cout << "Schema: " << schema->ToString() << "\n"; } - // Create async I/O context if enabled - std::shared_ptr async_context; -#ifdef TPCH_ENABLE_ASYNC_IO - if (opts.async_io) { - try { - async_context = std::make_shared(256); - if (opts.verbose) { - std::cout << "Async I/O enabled (io_uring queue depth: 256)\n"; - } - } catch (const std::exception& e) { - std::cerr << "Warning: Failed to initialize async I/O: " << e.what() << "\n"; - std::cerr << "Falling back to synchronous I/O\n"; - } - } -#endif - // Create writer + if (opts.io_uring) + tpch::IoUringPool::init(opts.output_dir); + auto writer = create_writer(opts.format, output_path, opts.compression, opts.zero_copy); + wire_io_uring(opts, output_path, writer.get()); #ifdef TPCH_ENABLE_LANCE - if (auto lance_writer = dynamic_cast(writer.get())) { - if (!opts.lance_stats_level.empty()) { - setenv("LANCE_STATS_LEVEL", opts.lance_stats_level.c_str(), 1); - if (opts.verbose) { - std::cout << "Lance stats level set to: " << opts.lance_stats_level << "\n"; - } - } - // Phase 3.1: Set cardinality sampling rate via environment variable - if (opts.lance_cardinality_sample_rate < 1.0) { - std::string rate_str = std::to_string(opts.lance_cardinality_sample_rate); - setenv("LANCE_CARDINALITY_SAMPLE_RATE", rate_str.c_str(), 1); - if (opts.verbose) { - std::cout << "Lance cardinality sample rate set to: " << opts.lance_cardinality_sample_rate << "\n"; - } - } - lance_writer->set_write_params( - opts.lance_rows_per_file, - opts.lance_rows_per_group, - opts.lance_max_bytes_per_file, - opts.lance_skip_auto_cleanup); - if (opts.lance_stream_queue > 0) 
{
-            lance_writer->set_stream_queue_depth(static_cast<size_t>(opts.lance_stream_queue));
-        }
-
+    if (auto* lw = dynamic_cast<tpch::LanceWriter*>(writer.get())) {
         if (opts.zero_copy) {
-            bool use_async = (opts.zero_copy_mode == "async") ||
-                             (opts.zero_copy_mode == "auto");
-            lance_writer->enable_streaming_write(!use_async);
-            if (opts.verbose) {
-                std::cout << "Lance streaming write mode enabled (zero-copy, mode="
-                          << opts.zero_copy_mode << ")\n";
-            }
-        }
-
-        // Enable io_uring write path if requested
-#ifdef TPCH_LANCE_IO_URING
-        if (opts.lance_io_uring) {
-            lance_writer->enable_io_uring(true);
-            if (opts.verbose) {
-                std::cout << "Lance io_uring write path enabled\n";
-            }
+            bool use_async = (opts.zero_copy_mode == "async") || (opts.zero_copy_mode == "auto");
+            lw->enable_streaming_write(!use_async);
         }
-#endif
     }
 #endif
 
-    // Set async context if available
-    if (async_context) {
-        writer->set_async_context(async_context);
-        if (opts.verbose) {
-            std::cout << "Async I/O context configured for writer\n";
-        }
-    }
-
     // Start timing
     auto start_time = std::chrono::high_resolution_clock::now();
diff --git a/src/tpcds_main.cpp b/src/tpcds_main.cpp
index b692552..80eaf72 100644
--- a/src/tpcds_main.cpp
+++ b/src/tpcds_main.cpp
@@ -17,9 +17,13 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
+#include
+#include
+#include
 #include
 #include
@@ -29,6 +33,8 @@
 #include "tpch/parquet_writer.hpp"
 #include "tpch/dsdgen_wrapper.hpp"
 #include "tpch/dsdgen_converter.hpp"
+#include "tpch/io_uring_pool.hpp"
+#include "tpch/io_uring_output_stream.hpp"
 
 #ifdef TPCH_ENABLE_ORC
 #include "tpch/orc_writer.hpp"
@@ -46,15 +52,17 @@ namespace {
 
 struct Options {
-    long scale_factor = 1;
-    std::string format = "parquet";
-    std::string output_dir = "/tmp";
-    long max_rows = 1000;
-    std::string table = "store_sales";
-    std::string compression = "zstd"; // snappy, zstd, none
-    bool verbose = false;
-    bool zero_copy = false; // streaming mode: O(batch) memory instead of O(total)
-    std::string zero_copy_mode =
"sync"; // sync, auto, async (lance-specific selection) + long scale_factor = 1; + std::string format = "parquet"; + std::string output_dir = "/tmp"; + long max_rows = 1000; + std::string table = "store_sales"; + std::string compression = "zstd"; // snappy, zstd, none + bool verbose = false; + bool zero_copy = false; // streaming mode: O(batch) memory instead of O(total) + std::string zero_copy_mode = "sync"; // sync, auto, async (lance-specific selection) + bool parallel = false; // generate all tables in parallel + int parallel_tables = 0; // max concurrent tables; 0 = all }; void print_usage(const char* prog) { @@ -85,6 +93,8 @@ void print_usage(const char* prog) { " --zero-copy-mode Zero-copy mode for Lance: sync, auto, async (default: sync)\n" #ifdef TPCH_ENABLE_LANCE #endif + " --parallel Generate all tables in parallel (fork-after-init)\n" + " --parallel-tables Max concurrent tables (default: all)\n" " --verbose Verbose output\n" " --help Show this help\n" "\n" @@ -103,21 +113,25 @@ Options parse_args(int argc, char* argv[]) { Options opts; enum { - OPT_COMPRESSION = 1000, + OPT_COMPRESSION = 1000, OPT_ZERO_COPY, - OPT_ZERO_COPY_MODE + OPT_ZERO_COPY_MODE, + OPT_PARALLEL, + OPT_PARALLEL_TABLES }; static struct option long_opts[] = { - {"format", required_argument, nullptr, 'f'}, - {"table", required_argument, nullptr, 't'}, - {"scale-factor", required_argument, nullptr, 's'}, - {"output-dir", required_argument, nullptr, 'o'}, - {"max-rows", required_argument, nullptr, 'm'}, - {"compression", required_argument, nullptr, OPT_COMPRESSION}, - {"zero-copy", no_argument, nullptr, OPT_ZERO_COPY}, - {"zero-copy-mode", required_argument, nullptr, OPT_ZERO_COPY_MODE}, - {"verbose", no_argument, nullptr, 'v'}, - {"help", no_argument, nullptr, 'h'}, + {"format", required_argument, nullptr, 'f'}, + {"table", required_argument, nullptr, 't'}, + {"scale-factor", required_argument, nullptr, 's'}, + {"output-dir", required_argument, nullptr, 'o'}, + {"max-rows", 
required_argument, nullptr, 'm'}, + {"compression", required_argument, nullptr, OPT_COMPRESSION}, + {"zero-copy", no_argument, nullptr, OPT_ZERO_COPY}, + {"zero-copy-mode", required_argument, nullptr, OPT_ZERO_COPY_MODE}, + {"parallel", no_argument, nullptr, OPT_PARALLEL}, + {"parallel-tables", required_argument, nullptr, OPT_PARALLEL_TABLES}, + {"verbose", no_argument, nullptr, 'v'}, + {"help", no_argument, nullptr, 'h'}, {nullptr, 0, nullptr, 0} }; @@ -129,9 +143,15 @@ Options parse_args(int argc, char* argv[]) { case 's': opts.scale_factor = std::stol(optarg); break; case 'o': opts.output_dir = optarg; break; case 'm': opts.max_rows = std::stol(optarg); break; - case OPT_COMPRESSION: opts.compression = optarg; break; - case OPT_ZERO_COPY: opts.zero_copy = true; break; - case OPT_ZERO_COPY_MODE: opts.zero_copy_mode = optarg; break; + case OPT_COMPRESSION: opts.compression = optarg; break; + case OPT_ZERO_COPY: opts.zero_copy = true; break; + case OPT_ZERO_COPY_MODE: opts.zero_copy_mode = optarg; break; + case OPT_PARALLEL: opts.parallel = true; break; + case OPT_PARALLEL_TABLES: + opts.parallel_tables = std::stoi(optarg); + if (opts.parallel_tables <= 0) + throw std::invalid_argument("--parallel-tables must be > 0"); + break; case 'z': opts.zero_copy = true; break; case 'v': opts.verbose = true; break; case 'h': print_usage(argv[0]); exit(0); @@ -371,6 +391,301 @@ std::string file_extension(const std::string& fmt) { return "." 
+ fmt;
 }
 
+// ---------------------------------------------------------------------------
+// dispatch_generation — maps TableType to the correct DSDGenWrapper method
+// ---------------------------------------------------------------------------
+
+size_t dispatch_generation(
+    const Options& opts,
+    tpcds::TableType table_type,
+    std::shared_ptr<arrow::Schema> schema,
+    std::unique_ptr<tpch::WriterInterface>& writer,
+    tpcds::DSDGenWrapper& dsdgen)
+{
+    if (table_type == tpcds::TableType::StoreSales)
+        return run_generation(opts, schema, writer,
+            [&](auto cb) { dsdgen.generate_store_sales(cb, opts.max_rows); });
+    if (table_type == tpcds::TableType::Inventory)
+        return run_generation(opts, schema, writer,
+            [&](auto cb) { dsdgen.generate_inventory(cb, opts.max_rows); });
+    if (table_type == tpcds::TableType::CatalogSales)
+        return run_generation(opts, schema, writer,
+            [&](auto cb) { dsdgen.generate_catalog_sales(cb, opts.max_rows); });
+    if (table_type == tpcds::TableType::WebSales)
+        return run_generation(opts, schema, writer,
+            [&](auto cb) { dsdgen.generate_web_sales(cb, opts.max_rows); });
+    if (table_type == tpcds::TableType::Customer)
+        return run_generation(opts, schema, writer,
+            [&](auto cb) { dsdgen.generate_customer(cb, opts.max_rows); });
+    if (table_type == tpcds::TableType::Item)
+        return run_generation(opts, schema, writer,
+            [&](auto cb) { dsdgen.generate_item(cb, opts.max_rows); });
+    if (table_type == tpcds::TableType::DateDim)
+        return run_generation(opts, schema, writer,
+            [&](auto cb) { dsdgen.generate_date_dim(cb, opts.max_rows); });
+    if (table_type == tpcds::TableType::StoreReturns)
+        return run_generation(opts, schema, writer,
+            [&](auto cb) { dsdgen.generate_store_returns(cb, opts.max_rows); });
+    if (table_type == tpcds::TableType::CatalogReturns)
+        return run_generation(opts, schema, writer,
+            [&](auto cb) { dsdgen.generate_catalog_returns(cb, opts.max_rows); });
+    if (table_type == tpcds::TableType::WebReturns)
+        return run_generation(opts, schema, writer,
+            [&](auto cb)
{ dsdgen.generate_web_returns(cb, opts.max_rows); }); + if (table_type == tpcds::TableType::CallCenter) + return run_generation(opts, schema, writer, + [&](auto cb) { dsdgen.generate_call_center(cb, opts.max_rows); }); + if (table_type == tpcds::TableType::CatalogPage) + return run_generation(opts, schema, writer, + [&](auto cb) { dsdgen.generate_catalog_page(cb, opts.max_rows); }); + if (table_type == tpcds::TableType::WebPage) + return run_generation(opts, schema, writer, + [&](auto cb) { dsdgen.generate_web_page(cb, opts.max_rows); }); + if (table_type == tpcds::TableType::WebSite) + return run_generation(opts, schema, writer, + [&](auto cb) { dsdgen.generate_web_site(cb, opts.max_rows); }); + if (table_type == tpcds::TableType::Warehouse) + return run_generation(opts, schema, writer, + [&](auto cb) { dsdgen.generate_warehouse(cb, opts.max_rows); }); + if (table_type == tpcds::TableType::ShipMode) + return run_generation(opts, schema, writer, + [&](auto cb) { dsdgen.generate_ship_mode(cb, opts.max_rows); }); + if (table_type == tpcds::TableType::HouseholdDemographics) + return run_generation(opts, schema, writer, + [&](auto cb) { dsdgen.generate_household_demographics(cb, opts.max_rows); }); + if (table_type == tpcds::TableType::CustomerDemographics) + return run_generation(opts, schema, writer, + [&](auto cb) { dsdgen.generate_customer_demographics(cb, opts.max_rows); }); + if (table_type == tpcds::TableType::CustomerAddress) + return run_generation(opts, schema, writer, + [&](auto cb) { dsdgen.generate_customer_address(cb, opts.max_rows); }); + if (table_type == tpcds::TableType::IncomeBand) + return run_generation(opts, schema, writer, + [&](auto cb) { dsdgen.generate_income_band(cb, opts.max_rows); }); + if (table_type == tpcds::TableType::Reason) + return run_generation(opts, schema, writer, + [&](auto cb) { dsdgen.generate_reason(cb, opts.max_rows); }); + if (table_type == tpcds::TableType::TimeDim) + return run_generation(opts, schema, writer, + [&](auto 
cb) { dsdgen.generate_time_dim(cb, opts.max_rows); }); + if (table_type == tpcds::TableType::Promotion) + return run_generation(opts, schema, writer, + [&](auto cb) { dsdgen.generate_promotion(cb, opts.max_rows); }); + if (table_type == tpcds::TableType::Store) + return run_generation(opts, schema, writer, + [&](auto cb) { dsdgen.generate_store(cb, opts.max_rows); }); + throw std::invalid_argument("dispatch_generation: unhandled table type"); +} + +// --------------------------------------------------------------------------- +// Parallel generation — DS-10.1 +// --------------------------------------------------------------------------- + +// All 24 implemented TPC-DS tables, ordered: small dimensions first so they +// complete quickly and their slots open for the heavier fact tables. +static const std::vector<std::pair<std::string, tpcds::TableType>> ALL_TPCDS_TABLES = { + // tiny dimensions (< 100 rows at any SF) + {"income_band", tpcds::TableType::IncomeBand}, + {"ship_mode", tpcds::TableType::ShipMode}, + {"warehouse", tpcds::TableType::Warehouse}, + {"reason", tpcds::TableType::Reason}, + {"call_center", tpcds::TableType::CallCenter}, + // small dimensions + {"web_site", tpcds::TableType::WebSite}, + {"web_page", tpcds::TableType::WebPage}, + {"catalog_page", tpcds::TableType::CatalogPage}, + {"household_demographics", tpcds::TableType::HouseholdDemographics}, + {"promotion", tpcds::TableType::Promotion}, + {"store", tpcds::TableType::Store}, + // medium dimensions + {"item", tpcds::TableType::Item}, + {"date_dim", tpcds::TableType::DateDim}, + {"time_dim", tpcds::TableType::TimeDim}, + {"customer_demographics", tpcds::TableType::CustomerDemographics}, + // large dimensions + {"customer_address", tpcds::TableType::CustomerAddress}, + {"customer", tpcds::TableType::Customer}, + // fact tables (heaviest last so all slots are warm when they start) + {"inventory", tpcds::TableType::Inventory}, + {"web_returns", tpcds::TableType::WebReturns}, + {"catalog_returns", tpcds::TableType::CatalogReturns}, + 
{"store_returns", tpcds::TableType::StoreReturns}, + {"web_sales", tpcds::TableType::WebSales}, + {"catalog_sales", tpcds::TableType::CatalogSales}, + {"store_sales", tpcds::TableType::StoreSales}, +}; + +// Child process: generate one table, write output, exit. +// dsdgen is already initialised (inherited via COW from parent). +// Returns exit code (0 = success). +static int run_table_child( + const Options& opts, + tpcds::TableType table_type, + tpcds::DSDGenWrapper& dsdgen) +{ + const std::string tname = tpcds::DSDGenWrapper::table_name(table_type); + const std::string filepath = opts.output_dir + "/" + tname + file_extension(opts.format); + + bool lance_async = (opts.format == "lance" && opts.zero_copy && + opts.zero_copy_mode == "async"); + + std::unique_ptr<TableWriter> writer; + try { + writer = create_writer(opts.format, filepath, opts.compression, + opts.zero_copy, lance_async); + } catch (const std::exception& e) { + fprintf(stderr, "[%s] failed to create writer: %s\n", tname.c_str(), e.what()); + return 1; + } + +#ifdef TPCH_ENABLE_LANCE + if (opts.format == "lance") { + if (auto* lw = dynamic_cast<LanceWriter*>(writer.get())) { + if (opts.zero_copy && !lance_async) + lw->set_buffered_flush_config(128, 1'048'576); + } + } +#endif + + // DS-10.3: inject IoUringOutputStream into Parquet streaming path when available. + // Works in child processes after IoUringPool::init() was called in the parent. + if (tpch::IoUringPool::available() && + opts.format == "parquet" && opts.zero_copy) { + void* ring = tpch::IoUringPool::create_child_ring_struct(); + // IoUringOutputStream takes ownership of ring; stream owns the file fd.
+ auto io_stream = std::make_shared<tpch::IoUringOutputStream>( + filepath, ring); + if (auto* pw = dynamic_cast<ParquetWriter*>(writer.get())) { + pw->set_output_stream(std::move(io_stream)); + } + } + + // run_generation uses opts.table for append_dsdgen_row_to_builders dispatch + Options child_opts = opts; + child_opts.table = tname; + + auto schema = tpcds::DSDGenWrapper::get_schema(table_type, opts.scale_factor); + + auto t0 = std::chrono::steady_clock::now(); + size_t rows = 0; + try { + rows = dispatch_generation(child_opts, table_type, schema, writer, dsdgen); + } catch (const std::exception& e) { + fprintf(stderr, "[%s] generation error: %s\n", tname.c_str(), e.what()); + return 1; + } + writer->close(); + + double elapsed = std::chrono::duration<double>( + std::chrono::steady_clock::now() - t0).count(); + printf("tpcds_benchmark: %-28s SF=%ld rows=%zu elapsed=%.2fs rate=%.0f rows/s\n", + tname.c_str(), opts.scale_factor, rows, + elapsed, elapsed > 0 ? rows / elapsed : 0.0); + printf(" output: %s\n", filepath.c_str()); + fflush(stdout); + return 0; +} + +// Fork-after-init parallel generation with rolling N-slot window. +// Returns 0 if all children succeeded, 1 if any failed. +static int generate_all_tables_parallel(const Options& opts) +{ + const size_t ntables = ALL_TPCDS_TABLES.size(); + const size_t slot_limit = (opts.parallel_tables > 0) + ? static_cast<size_t>(opts.parallel_tables) + : ntables; + + // Initialise dsdgen ONCE in the parent. All children inherit the loaded + // distributions and seeded RNG streams via COW — no re-init needed. + tpcds::DSDGenWrapper parent_dsdgen(opts.scale_factor, opts.verbose); + parent_dsdgen.prepare_for_fork(); + + // DS-10.2: Initialise anchor io_uring ring before fork so children can + // attach via IORING_SETUP_ATTACH_WQ and share one kernel worker pool.
+ bool io_uring_ready = tpch::IoUringPool::init(opts.output_dir); + + auto t_wall = std::chrono::steady_clock::now(); + + fprintf(stderr, + "tpcds_benchmark: parallel SF=%ld tables=%zu slots=%zu format=%s io_uring=%s\n", + opts.scale_factor, ntables, slot_limit, opts.format.c_str(), + io_uring_ready ? "yes" : "no"); + + // pid → table index map so we can report which table finished + std::vector<pid_t> pids(ntables, -1); + std::vector<size_t> slot_table(slot_limit, SIZE_MAX); // slot → table index + size_t next = 0; // index of next table to fork + size_t active = 0; // number of live children + int failed = 0; + + auto fork_next = [&](size_t slot) { + if (next >= ntables) return; + const auto& [tname, ttype] = ALL_TPCDS_TABLES[next]; + + pid_t pid = ::fork(); + if (pid < 0) { + perror("fork"); + ++failed; + ++next; + return; + } + if (pid == 0) { + // Child: temp file belongs to parent — don't unlink on exit. + parent_dsdgen.clear_tmp_path(); + int rc = run_table_child(opts, ttype, parent_dsdgen); + std::exit(rc); + } + // Parent + pids[next] = pid; + slot_table[slot] = next; + ++next; + ++active; + }; + + // Fill initial slots + for (size_t s = 0; s < slot_limit && next < ntables; ++s) + fork_next(s); + + // Rolling wait: as each child finishes, fork the next table into its slot + while (active > 0) { + int status; + pid_t done = ::waitpid(-1, &status, 0); + if (done < 0) { perror("waitpid"); break; } + + // Find which slot this pid occupied + size_t freed_slot = SIZE_MAX; + for (size_t s = 0; s < slot_limit; ++s) { + if (slot_table[s] < ntables && pids[slot_table[s]] == done) { + freed_slot = s; + break; + } + } + + if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) { + const char* tname = (freed_slot < slot_limit && slot_table[freed_slot] < ntables) + ? 
ALL_TPCDS_TABLES[slot_table[freed_slot]].first.c_str() + : "unknown"; + fprintf(stderr, "tpcds_benchmark: [%s] child failed (pid=%d status=%d)\n", + tname, done, status); + ++failed; + } + --active; + + if (freed_slot != SIZE_MAX) + fork_next(freed_slot); + } + + // Parent owns the temp distribution file — destructor unlinks it. + double wall = std::chrono::duration<double>( + std::chrono::steady_clock::now() - t_wall).count(); + fprintf(stderr, + "tpcds_benchmark: parallel done SF=%ld %zu tables wall=%.2fs %s\n", + opts.scale_factor, ntables, wall, + failed ? "SOME TABLES FAILED" : "all ok"); + + return failed ? 1 : 0; +} + } // namespace int main(int argc, char* argv[]) { @@ -396,7 +711,11 @@ int main(int argc, char* argv[]) { return 1; } - // Resolve table + // Parallel mode: generate all tables and return immediately + if (opts.parallel) + return generate_all_tables_parallel(opts); + + // Resolve table (single-table path) tpcds::TableType table_type; try { table_type = parse_table(opts.table); @@ -460,79 +779,7 @@ int main(int argc, char* argv[]) { // Generate size_t actual_rows = 0; try { - if (table_type == tpcds::TableType::StoreSales) { - actual_rows = run_generation(opts, schema, writer, - [&](auto cb) { dsdgen.generate_store_sales(cb, opts.max_rows); }); - } else if (table_type == tpcds::TableType::Inventory) { - actual_rows = run_generation(opts, schema, writer, - [&](auto cb) { dsdgen.generate_inventory(cb, opts.max_rows); }); - } else if (table_type == tpcds::TableType::CatalogSales) { - actual_rows = run_generation(opts, schema, writer, - [&](auto cb) { dsdgen.generate_catalog_sales(cb, opts.max_rows); }); - } else if (table_type == tpcds::TableType::WebSales) { - actual_rows = run_generation(opts, schema, writer, - [&](auto cb) { dsdgen.generate_web_sales(cb, opts.max_rows); }); - } else if (table_type == tpcds::TableType::Customer) { - actual_rows = run_generation(opts, schema, writer, - [&](auto cb) { dsdgen.generate_customer(cb, opts.max_rows); }); - } 
else if (table_type == tpcds::TableType::Item) { - actual_rows = run_generation(opts, schema, writer, - [&](auto cb) { dsdgen.generate_item(cb, opts.max_rows); }); - } else if (table_type == tpcds::TableType::DateDim) { - actual_rows = run_generation(opts, schema, writer, - [&](auto cb) { dsdgen.generate_date_dim(cb, opts.max_rows); }); - } else if (table_type == tpcds::TableType::StoreReturns) { - actual_rows = run_generation(opts, schema, writer, - [&](auto cb) { dsdgen.generate_store_returns(cb, opts.max_rows); }); - } else if (table_type == tpcds::TableType::CatalogReturns) { - actual_rows = run_generation(opts, schema, writer, - [&](auto cb) { dsdgen.generate_catalog_returns(cb, opts.max_rows); }); - } else if (table_type == tpcds::TableType::WebReturns) { - actual_rows = run_generation(opts, schema, writer, - [&](auto cb) { dsdgen.generate_web_returns(cb, opts.max_rows); }); - } else if (table_type == tpcds::TableType::CallCenter) { - actual_rows = run_generation(opts, schema, writer, - [&](auto cb) { dsdgen.generate_call_center(cb, opts.max_rows); }); - } else if (table_type == tpcds::TableType::CatalogPage) { - actual_rows = run_generation(opts, schema, writer, - [&](auto cb) { dsdgen.generate_catalog_page(cb, opts.max_rows); }); - } else if (table_type == tpcds::TableType::WebPage) { - actual_rows = run_generation(opts, schema, writer, - [&](auto cb) { dsdgen.generate_web_page(cb, opts.max_rows); }); - } else if (table_type == tpcds::TableType::WebSite) { - actual_rows = run_generation(opts, schema, writer, - [&](auto cb) { dsdgen.generate_web_site(cb, opts.max_rows); }); - } else if (table_type == tpcds::TableType::Warehouse) { - actual_rows = run_generation(opts, schema, writer, - [&](auto cb) { dsdgen.generate_warehouse(cb, opts.max_rows); }); - } else if (table_type == tpcds::TableType::ShipMode) { - actual_rows = run_generation(opts, schema, writer, - [&](auto cb) { dsdgen.generate_ship_mode(cb, opts.max_rows); }); - } else if (table_type == 
tpcds::TableType::HouseholdDemographics) { - actual_rows = run_generation(opts, schema, writer, - [&](auto cb) { dsdgen.generate_household_demographics(cb, opts.max_rows); }); - } else if (table_type == tpcds::TableType::CustomerDemographics) { - actual_rows = run_generation(opts, schema, writer, - [&](auto cb) { dsdgen.generate_customer_demographics(cb, opts.max_rows); }); - } else if (table_type == tpcds::TableType::CustomerAddress) { - actual_rows = run_generation(opts, schema, writer, - [&](auto cb) { dsdgen.generate_customer_address(cb, opts.max_rows); }); - } else if (table_type == tpcds::TableType::IncomeBand) { - actual_rows = run_generation(opts, schema, writer, - [&](auto cb) { dsdgen.generate_income_band(cb, opts.max_rows); }); - } else if (table_type == tpcds::TableType::Reason) { - actual_rows = run_generation(opts, schema, writer, - [&](auto cb) { dsdgen.generate_reason(cb, opts.max_rows); }); - } else if (table_type == tpcds::TableType::TimeDim) { - actual_rows = run_generation(opts, schema, writer, - [&](auto cb) { dsdgen.generate_time_dim(cb, opts.max_rows); }); - } else if (table_type == tpcds::TableType::Promotion) { - actual_rows = run_generation(opts, schema, writer, - [&](auto cb) { dsdgen.generate_promotion(cb, opts.max_rows); }); - } else if (table_type == tpcds::TableType::Store) { - actual_rows = run_generation(opts, schema, writer, - [&](auto cb) { dsdgen.generate_store(cb, opts.max_rows); }); - } + actual_rows = dispatch_generation(opts, table_type, schema, writer, dsdgen); } catch (const std::exception& e) { fprintf(stderr, "tpcds_benchmark: generation error: %s\n", e.what()); return 1; diff --git a/src/writers/parquet_writer.cpp b/src/writers/parquet_writer.cpp index 574fca0..6490f5e 100644 --- a/src/writers/parquet_writer.cpp +++ b/src/writers/parquet_writer.cpp @@ -115,6 +115,10 @@ void ParquetWriter::enable_streaming_write(bool use_threads) { use_threads_ = use_threads; } +void ParquetWriter::set_output_stream(std::shared_ptr 
<arrow::io::OutputStream> stream) { + injected_stream_ = std::move(stream); +} + void ParquetWriter::write_managed_batch(const ManagedRecordBatch& managed_batch) { if (closed_) { throw std::runtime_error("Cannot write to a closed Parquet writer"); @@ -211,12 +215,17 @@ void ParquetWriter::init_file_writer() { .set_use_threads(use_threads_) ->build(); - // Create output stream - auto outfile_result = arrow::io::FileOutputStream::Open(filepath_); - if (!outfile_result.ok()) { - throw std::runtime_error("Failed to open file: " + outfile_result.status().message()); + // Create output stream — use injected stream (e.g. IoUringOutputStream) if provided + std::shared_ptr<arrow::io::OutputStream> outfile; + if (injected_stream_) { + outfile = injected_stream_; + } else { + auto outfile_result = arrow::io::FileOutputStream::Open(filepath_); + if (!outfile_result.ok()) { + throw std::runtime_error("Failed to open file: " + outfile_result.status().message()); + } + outfile = outfile_result.ValueOrDie(); } - auto outfile = outfile_result.ValueOrDie(); // Create FileWriter for streaming RecordBatches auto writer_result = parquet::arrow::FileWriter::Open(