Skip to content

TPC-DS parallel generation + io_uring streaming + tpch cleanup#13

Open
tsafin wants to merge 10 commits intomasterfrom
tsafin/parallel_tpcds
Open

TPC-DS parallel generation + io_uring streaming + tpch cleanup#13
tsafin wants to merge 10 commits intomasterfrom
tsafin/parallel_tpcds

Conversation

@tsafin
Copy link
Owner

@tsafin tsafin commented Mar 17, 2026

  • DS-10.1: Fork-after-init parallel generation for TPC-DS (--parallel, --parallel-tables N) with rolling N-slot window — all 24 tables generated concurrently with a single dsdgen init
  • DS-10.2/10.3: IoUringPool (anchor ring + ATTACH_WQ) and IoUringOutputStream (Arrow OutputStream backed by io_uring worker thread) injected into Parquet streaming path in parallel children
  • TPC-H port: ported the generic parallel + io_uring approach from tpcds_main.cpp to src/main.cpp — removes ad-hoc --async-io / --lance-io-uring flags, replaces flat fork loop with rolling N-slot window, adds --parallel-tables N and explicit --io-uring flag, wires single-table path too
  • CLI cleanup: removed 7 low-level --lance-* tuning options that don't belong in a benchmark CLI
  • README rewrite: accurate CLI reference for both binaries, Docker quick-start, removed stale optimization histories and project structure tree

Key flags added

Binary Flag Description
both --parallel Fork-after-init parallel generation
both --parallel-tables N Max concurrent children (default: all)
both --io-uring Explicit opt-in for io_uring writes
tpch (already had) --zero-copy Now also required reminder: use at SF≥5 with --parallel

Test plan

  • tpch_benchmark --parallel --zero-copy --scale-factor 1 --max-rows 0 — all 8 tables, no crash
  • tpch_benchmark --parallel --zero-copy --io-uring --scale-factor 1 --max-rows 0 — io_uring=yes in banner
  • tpcds_benchmark --parallel --zero-copy --scale-factor 1 --max-rows 0 — all 24 tables
  • tpch_benchmark --help — no stale flags (--async-io, --lance-* gone)
  • Build with TPCH_ENABLE_LANCE=OFF and TPCH_ENABLE_LANCE=ON both succeed

tsafin and others added 10 commits March 16, 2026 23:34
…plan

Describes DS-10 design:
- Fork-after-init for all 24 TPC-DS tables (Phase DS-10.1)
- io_uring anchor ring as both ATTACH_WQ source and process scheduler
- pidfd + IORING_OP_POLL_ADD as the N-slot semaphore (rolling fork, no spin)
- Format-agnostic IoUringOutputStream injected at create_writer() factory
- Lessons generalised from Lance io_uring (persistent ring, no SQPOLL/O_DIRECT,
  sysfs QD calibration, ATTACH_WQ shared worker pool)
- Rationale for rejecting pre-forked job-server pattern (static work set)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add --parallel flag to generate all 24 TPC-DS tables simultaneously using
fork-after-init: distributions loaded once in parent, children inherit via
COW with no re-initialisation.  Rolling N-slot window via waitpid(-1) keeps
at most --parallel-tables N children active (default: all tables).

Key changes:
- DSDGenWrapper: add prepare_for_fork() and clear_tmp_path() for safe fork
  (parent owns tmp dist file; children clear the path before exiting)
- tpcds_main: extract dispatch_generation() — eliminates 24-entry if-else
  duplication between single-table and parallel paths
- ALL_TPCDS_TABLES ordered: small dims first → slots open quickly for facts
- Each child prints its own timing line; parent prints wall-time summary

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…eam injection

DS-10.2 — IoUringPool + IoUringOutputStream (format-agnostic async I/O):

  include/tpch/io_uring_pool.hpp
  src/async/io_uring_pool.cpp
    • IoUringPool singleton: init() creates anchor io_uring ring with sysfs
      queue-depth calibration (nr_requests/2, clamped [8,128]).
    • watch_child(pidfd, user_data): submits POLL_ADD on the anchor ring —
      in-flight count IS the N-slot semaphore (replaces waitpid/-1 in DS-10.2+).
    • wait_any(): blocks until ≥1 child exits; returns batch of user_data values.
    • create_child_ring_struct(): allocates a new io_uring ring with
      IORING_SETUP_ATTACH_WQ → all child rings share one kernel worker pool.
    • free_ring(): releases child ring (called by IoUringOutputStream dtor).
    • Stub (TPCH_ENABLE_ASYNC_IO=OFF): available() returns false; all calls no-op.

  include/tpch/io_uring_output_stream.hpp
  src/async/io_uring_output_stream.cpp
    • arrow::io::OutputStream backed by io_uring.
    • Worker thread owns the ring for the file lifetime (persistent ring).
    • Write(): pre-claims offset atomically (fetch_add), copies to WriteJob,
      enqueues to MPSC queue, blocks on future until worker drains CQEs.
    • Worker loop: 512 KB SQE chunks; submit when SQ full; drain all in-flight.
    • No SQPOLL, no O_DIRECT (both hurt on WSL2/VirtIO).
    • Sync fallback (ring==nullptr): pwrite(2) directly in Write(), no thread.

DS-10.3 — inject IoUringOutputStream into Parquet streaming path:

  include/tpch/parquet_writer.hpp, src/writers/parquet_writer.cpp
    • ParquetWriter::set_output_stream(shared_ptr<OutputStream>): inject external
      stream; init_file_writer() uses it instead of FileOutputStream::Open().

  src/tpcds_main.cpp
    • In generate_all_tables_parallel(): call IoUringPool::init(output_dir) before
      fork so children inherit the anchor ring fd for ATTACH_WQ.
    • In run_table_child(): when IoUringPool::available() && parquet+zero-copy,
      create child ring, wrap in IoUringOutputStream, inject into ParquetWriter.

  CMakeLists.txt
    • Add io_uring_pool.cpp, io_uring_output_stream.cpp unconditionally (both
      handle #ifdef TPCH_ENABLE_ASYNC_IO internally; stubs when flag is OFF).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Replace ad-hoc --async-io and --lance-io-uring flags with the generic
IoUringPool+IoUringOutputStream infrastructure already used by tpcds.

Changes:
- Remove Options::async_io, Options::lance_io_uring
- Remove AsyncIOContext single-table path (dead code since DS-10.3)
- Remove #ifdef TPCH_ENABLE_ASYNC_IO guards in parse_args
- Add Options::parallel_tables + --parallel-tables <N> flag
- Replace generate_all_tables_parallel_v2 flat-fork loop with rolling
  N-slot window matching tpcds_main.cpp: run_table_child + generate_all_tables_parallel
- Inject IoUringOutputStream into Parquet streaming path in children
  (same as tpcds DS-10.3)
- Print per-table completion lines to stdout (matches tpcds style)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Replace implicit io_uring auto-detection (format==parquet && zero_copy)
with an explicit --io-uring CLI flag, matching the user-controlled
approach in tpcds.

- Add Options::io_uring + OPT_IO_URING + --io-uring to parse_args/usage
- Extract wire_io_uring() helper: Lance → enable_io_uring(true);
  Parquet → inject IoUringOutputStream; other formats → no-op
- Gate IoUringPool::init() in generate_all_tables_parallel on opts.io_uring
- Wire single-table main() path: IoUringPool::init() + wire_io_uring()
  (was completely missing before)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Drop seven research knobs that don't belong in a benchmark CLI:
--lance-rows-per-file, --lance-rows-per-group, --lance-max-bytes-per-file,
--lance-skip-auto-cleanup, --lance-stream-queue, --lance-stats-level,
--lance-cardinality-sample-rate (and corresponding Options fields,
OPT_LANCE_* constants, parse_args cases, and the writer-setup block).

The single-table Lance path now mirrors run_table_child: only wires
enable_streaming_write() based on --zero-copy / --zero-copy-mode.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Remove: project structure tree, Phase 12/14 optimization histories,
  future enhancements, architecture notes, outdated benchmark table,
  references to removed flags (--use-dbgen, --async-io, --lance-*)
- Add: Docker quick-start with ghcr.io/tsafin/tpch-cpp-all image
- Add: complete, accurate tpch_benchmark and tpcds_benchmark CLI reference
  reflecting current flags (--parallel-tables, --io-uring, etc.)
- Add: TPC-DS tables reference table
- Update: build options table (add TPCDS_ENABLE, remove stale entries)
- Update: dependencies table (add liburing, Rust entries)
- Add: performance notes (zero-copy requirement, io_uring trade-offs, ASAN warning)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
enable_io_uring() is only declared when TPCH_LANCE_IO_URING is defined.
The CI lance build (TPCH_ENABLE_LANCE=ON, TPCH_LANCE_IO_URING not set)
hit a compile error. Gate the call on both defines.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@tsafin
Copy link
Owner Author

tsafin commented Mar 17, 2026

@codex review

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds a generalized io_uring write backend and extends the benchmark drivers to support fork-after-init parallel generation (TPC-DS: all 24 tables; TPC-H: improved parallel controls), including Parquet streaming integration via injected arrow::io::OutputStream.

Changes:

  • Add IoUringPool + IoUringOutputStream and compile them into the core library (with runtime fallback when TPCH_ENABLE_ASYNC_IO is off/unavailable).
  • Enable Parquet streaming writer to accept an injected arrow::io::OutputStream (used for io_uring).
  • Implement TPC-DS parallel generation (--parallel, --parallel-tables) and update TPC-H CLI to support --parallel-tables and a unified --io-uring flag.

Reviewed changes

Copilot reviewed 13 out of 13 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
src/writers/parquet_writer.cpp Uses an injected Arrow OutputStream when provided for streaming Parquet writes.
include/tpch/parquet_writer.hpp Declares set_output_stream() and stores injected_stream_.
src/tpcds_main.cpp Adds parallel all-tables generation + io_uring stream injection for Parquet zero-copy children.
src/main.cpp Reworks CLI flags and wiring for io_uring + parallel generation (TPC-H).
src/dsdgen/dsdgen_wrapper.cpp Adds fork-safety helpers (prepare_for_fork, clear_tmp_path).
include/tpch/dsdgen_wrapper.hpp Declares fork-safety helpers for TPC-DS generator.
include/tpch/io_uring_pool.hpp New API for initializing/creating rings (anchor + child rings).
src/async/io_uring_pool.cpp Implements io_uring ring init + child ring creation (stubbed when disabled).
include/tpch/io_uring_output_stream.hpp New Arrow OutputStream abstraction backed by io_uring (or sync fallback).
src/async/io_uring_output_stream.cpp Implements io_uring-backed writes with a worker thread and chunked SQEs.
CMakeLists.txt Adds new io_uring sources to core build.
README.md Updates project scope (TPC-H + TPC-DS), flags, and usage examples.
docs/parallel_uring_arch.md New design/architecture writeup for parallel generation + io_uring integration.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +23 to +31
IoUringOutputStream::IoUringOutputStream(const std::string& path,
void* ring_struct)
: ring_(ring_struct) {
file_fd_ = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (file_fd_ < 0) {
throw std::runtime_error(
"IoUringOutputStream: cannot open '" + path +
"': " + strerror(errno));
}
Comment on lines +196 to +205
io_uring_prep_write(sqe, file_fd_, ptr,
static_cast<unsigned>(chunk),
static_cast<off_t>(off));
sqe->user_data = 0;
++inflight;

ptr += chunk;
off += static_cast<int64_t>(chunk);
rem -= chunk;
}
Comment on lines +209 to +220
io_uring_submit(ring);
while (inflight > 0) {
struct io_uring_cqe* cqe = nullptr;
int r = io_uring_wait_cqe(ring, &cqe);
if (r == 0) {
if (cqe->res < 0 && st.ok())
st = arrow::Status::IOError(
"io_uring write: ", strerror(-cqe->res));
io_uring_cqe_seen(ring, cqe);
}
--inflight;
}
Comment on lines +59 to +70
/** Pre-claim offset, copy data, enqueue to worker, wait for completion. */
arrow::Status Write(const void* data, int64_t nbytes) override;

/** Convenience overload accepting a Buffer. */
arrow::Status Write(const std::shared_ptr<arrow::Buffer>& data) override;

/** No-op: Write() already blocks until data is on disk. */
arrow::Status Flush() override;

/** Stop worker thread (if running), fsync, close file fd. */
arrow::Status Close() override;

use_threads_ = use_threads;
}

void ParquetWriter::set_output_stream(std::shared_ptr<arrow::io::OutputStream> stream) {
Comment on lines +573 to +585
} catch (const std::exception& e) {
fprintf(stderr, "[%s] generation error: %s\n", tname.c_str(), e.what());
return 1;
}
writer->close();

double elapsed = std::chrono::duration<double>(
std::chrono::steady_clock::now() - t0).count();
printf("tpcds_benchmark: %-28s SF=%ld rows=%zu elapsed=%.2fs rate=%.0f rows/s\n",
tname.c_str(), opts.scale_factor, rows,
elapsed, elapsed > 0 ? rows / elapsed : 0.0);
printf(" output: %s\n", filepath.c_str());
fflush(stdout);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants