From 183f2488c971baf2bc92a825d8654e073fcb336e Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Wed, 1 Apr 2026 09:59:49 -0700 Subject: [PATCH 1/4] Add metadata latency sensitivity analysis to DuckLake concurrency benchmarks Introduces a TCP latency proxy that injects configurable one-way delay between DuckDB/DuckLake and the metadata PostgreSQL, enabling sensitivity analysis of how network latency affects transaction conflict rates. Key changes: - TCP latency proxy (latency_proxy.go) with bidirectional delay injection - DUCKGRES_BENCH_LATENCIES env var to sweep latencies (e.g. 0ms,25ms,50ms,100ms) - Per-latency duckgres harnesses with proxy in front of metadata PostgreSQL - JSON report includes metadata_latency_ms per metric and latencies_tested_ms - justfile recipes: bench-ducklake-latency, bench-ducklake-full-matrix - Version matrix script passes through latency config for 2D (version x latency) analysis Co-Authored-By: Claude Opus 4.6 (1M context) --- justfile | 10 + scripts/ducklake_version_matrix.sh | 30 ++- .../integration/ducklake_concurrency_test.go | 236 ++++++++++++------ tests/integration/harness.go | 43 +++- tests/integration/latency_proxy.go | 130 ++++++++++ tests/integration/latency_proxy_test.go | 131 ++++++++++ 6 files changed, 486 insertions(+), 94 deletions(-) create mode 100644 tests/integration/latency_proxy.go create mode 100644 tests/integration/latency_proxy_test.go diff --git a/justfile b/justfile index b73ca7f..ae60853 100644 --- a/justfile +++ b/justfile @@ -311,6 +311,16 @@ bench-ducklake: bench-ducklake-matrix: ./scripts/ducklake_version_matrix.sh +# Run DuckLake concurrency benchmarks with metadata latency sensitivity analysis +[group('test')] +bench-ducklake-latency latencies="0ms,25ms,50ms,100ms": + DUCKGRES_BENCH_LATENCIES={{latencies}} go test -v -run TestDuckLakeConcurrentTransactions -timeout 600s ./tests/integration/... + +# Run full version x latency matrix +[group('test')] +bench-ducklake-full-matrix latencies="0ms,50ms,100ms": + DUCKGRES_BENCH_LATENCIES={{latencies}} ./scripts/ducklake_version_matrix.sh + # Run extension loading tests [group('test')] test-extensions: diff --git a/scripts/ducklake_version_matrix.sh b/scripts/ducklake_version_matrix.sh index c401e02..5b0152d 100755 --- a/scripts/ducklake_version_matrix.sh +++ b/scripts/ducklake_version_matrix.sh @@ -9,6 +9,7 @@ # ./scripts/ducklake_version_matrix.sh # run matrix # ./scripts/ducklake_version_matrix.sh --current-only # benchmark current version only # DUCKLAKE_VERSIONS="v2.10501.0" ./scripts/ducklake_version_matrix.sh # custom versions +# DUCKGRES_BENCH_LATENCIES=0ms,50ms,100ms ./scripts/ducklake_version_matrix.sh # latency sweep # # Requires: Docker running (for DuckLake infra), go, git, jq (for comparison) @@ -70,11 +71,12 @@ run_benchmark() { ( cd "$work_dir" DUCKGRES_BENCH_OUT="$out_file" \ + DUCKGRES_BENCH_LATENCIES="${DUCKGRES_BENCH_LATENCIES:-}" \ go test -v -count=1 \ -run TestDuckLakeConcurrentTransactions \ -timeout "$TEST_TIMEOUT" \ ./tests/integration/ 2>&1 \ - | grep -E '(--- PASS|--- FAIL|FAIL|^ok|ducklake_concurrency_test.go.*:|DuckDB|DuckLake)' \ + | grep -E '(--- PASS|--- FAIL|FAIL|^ok|ducklake_concurrency_test.go.*:|DuckDB|DuckLake|latency)' \ || true ) @@ -149,19 +151,29 @@ compare_results() { echo "" printf '%0.s-' $(seq 1 $((45 + ${#files[@]} * 24))); echo "" - # Get all test names from first file + # Get unique (test, latency) pairs from first file local tests - tests=$(jq -r '.metrics[].test' "${files[0]}") - - while IFS= read -r test; do - printf "%-45s" "$test" + tests=$(jq -r '.metrics[] | "\(.test)\t\(.metadata_latency_ms // 0)"' "${files[0]}") + + while IFS=$'\t' read -r test lat; do + local label="$test" + if [[ "$lat" != "0" ]]; then + label="${test} (${lat}ms)" + fi + printf "%-45s" "$label" for f in "${files[@]}"; do local rate - rate=$(jq -r --arg t "$test" '.metrics[] | select(.test == $t) | .conflict_rate_pct // 0 | . * 10 | round / 10' "$f" 2>/dev/null || echo "n/a") + rate=$(jq -r --arg t "$test" --argjson l "$lat" \ + '.metrics[] | select(.test == $t and (.metadata_latency_ms // 0) == $l) | .conflict_rate_pct // 0 | . * 10 | round / 10' \ + "$f" 2>/dev/null || echo "n/a") local succ - succ=$(jq -r --arg t "$test" '.metrics[] | select(.test == $t) | .successes // 0' "$f" 2>/dev/null || echo "n/a") + succ=$(jq -r --arg t "$test" --argjson l "$lat" \ + '.metrics[] | select(.test == $t and (.metadata_latency_ms // 0) == $l) | .successes // 0' \ + "$f" 2>/dev/null || echo "n/a") local conf - conf=$(jq -r --arg t "$test" '.metrics[] | select(.test == $t) | .conflicts // 0' "$f" 2>/dev/null || echo "n/a") + conf=$(jq -r --arg t "$test" --argjson l "$lat" \ + '.metrics[] | select(.test == $t and (.metadata_latency_ms // 0) == $l) | .conflicts // 0' \ + "$f" 2>/dev/null || echo "n/a") printf " %5s ok %4s err %4s%%" "$succ" "$conf" "$rate" done echo "" diff --git a/tests/integration/ducklake_concurrency_test.go b/tests/integration/ducklake_concurrency_test.go index dc57caa..03afa37 100644 --- a/tests/integration/ducklake_concurrency_test.go +++ b/tests/integration/ducklake_concurrency_test.go @@ -6,6 +6,7 @@ import ( "fmt" "math/rand" "os" + "strconv" "strings" "sync" "sync/atomic" @@ -17,19 +18,21 @@ import ( // When DUCKGRES_BENCH_OUT is set, all metrics are written as JSON to that // file so the matrix runner can compare across DuckLake versions. type concurrencyMetric struct { - Test string `json:"test"` - Successes int64 `json:"successes"` - Conflicts int64 `json:"conflicts"` - Errors int64 `json:"errors,omitempty"` - ConflictRate float64 `json:"conflict_rate_pct"` - Duration float64 `json:"duration_sec"` - Throughput float64 `json:"throughput_ops_sec,omitempty"` + Test string `json:"test"` + MetadataLatencyMs int `json:"metadata_latency_ms"` + Successes int64 `json:"successes"` + Conflicts int64 `json:"conflicts"` + Errors int64 `json:"errors,omitempty"` + ConflictRate float64 `json:"conflict_rate_pct"` + Duration float64 `json:"duration_sec"` + Throughput float64 `json:"throughput_ops_sec,omitempty"` } // concurrencyReport is the top-level JSON output for the matrix runner. type concurrencyReport struct { DuckDBVersion string `json:"duckdb_version"` DuckLakeVersion string `json:"ducklake_version"` + LatenciesTested []int `json:"latencies_tested_ms"` Timestamp string `json:"timestamp"` Metrics []concurrencyMetric `json:"metrics"` } @@ -55,12 +58,13 @@ func recordMetric(m concurrencyMetric) { // Duration, throughput, and conflict rate are computed automatically. type metric = concurrencyMetric -func benchSub(t *testing.T, name string, body func(t *testing.T, m *metric)) { +func benchSub(t *testing.T, name string, latencyMs int, body func(t *testing.T, m *metric)) { t.Helper() t.Run(name, func(t *testing.T) { start := time.Now() var m metric m.Test = name + m.MetadataLatencyMs = latencyMs body(t, &m) m.Duration = time.Since(start).Seconds() if m.Duration > 0 && m.Successes > 0 { @@ -70,6 +74,47 @@ func benchSub(t *testing.T, name string, body func(t *testing.T, m *metric)) { }) } +// parseLatencies parses DUCKGRES_BENCH_LATENCIES env var into a sorted list of +// millisecond values. Default: [0] (no artificial latency). +// Format: comma-separated durations, e.g. "0ms,10ms,50ms,100ms" +func parseLatencies() []int { + raw := os.Getenv("DUCKGRES_BENCH_LATENCIES") + if raw == "" { + return []int{0} + } + var latencies []int + for _, s := range strings.Split(raw, ",") { + s = strings.TrimSpace(s) + s = strings.TrimSuffix(s, "ms") + ms, err := strconv.Atoi(s) + if err != nil || ms < 0 { + continue + } + latencies = append(latencies, ms) + } + if len(latencies) == 0 { + return []int{0} + } + return latencies +} + +// openConnToPort opens a fresh connection to a duckgres server on the given port. +func openConnToPort(t *testing.T, port int) *sql.DB { + t.Helper() + connStr := fmt.Sprintf("host=127.0.0.1 port=%d user=testuser password=testpass dbname=test sslmode=require", port) + db, err := sql.Open("postgres", connStr) + if err != nil { + t.Fatalf("Failed to open connection: %v", err) + } + db.SetMaxOpenConns(1) + db.SetMaxIdleConns(1) + if err := db.Ping(); err != nil { + _ = db.Close() + t.Fatalf("Failed to ping port %d: %v", port, err) + } + return db +} + // TestDuckLakeConcurrentTransactions is an extensive concurrency test suite for // DuckLake transactions. It exercises concurrent inserts, updates, deletes, // mixed DDL/DML, and transaction conflicts across multiple connections to @@ -80,17 +125,16 @@ func benchSub(t *testing.T, name string, body func(t *testing.T, m *metric)) { // stress that behavior. // // Set DUCKGRES_BENCH_OUT=path.json to write structured metrics for comparison. +// Set DUCKGRES_BENCH_LATENCIES=0ms,10ms,50ms,100ms to run a latency sensitivity +// analysis (adds artificial one-way latency to the metadata PostgreSQL connection). func TestDuckLakeConcurrentTransactions(t *testing.T) { if !testHarness.useDuckLake { t.Skip("DuckLake mode not enabled (set DUCKGRES_TEST_NO_DUCKLAKE= to enable)") } // Log version info for traceability. - // DuckDB version functions are transpiled to PG-compat strings, so we query - // the extension table for DuckLake and use the library_version pragma for DuckDB. conn := openDuckgresConn(t) var duckdbVer, ducklakeVer string - // SHOW duckdb.library_version is not transpiled and returns the actual engine version if err := conn.QueryRow("SELECT library_version FROM pragma_version()").Scan(&duckdbVer); err != nil { duckdbVer = "unknown" } @@ -104,6 +148,8 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { _ = conn.Close() t.Logf("DuckDB %s, DuckLake extension %s", duckdbVer, ducklakeVer) + latencies := parseLatencies() + // Write report on cleanup t.Cleanup(func() { outPath := os.Getenv("DUCKGRES_BENCH_OUT") @@ -114,6 +160,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { report := concurrencyReport{ DuckDBVersion: duckdbVer, DuckLakeVersion: ducklakeVer, + LatenciesTested: latencies, Timestamp: time.Now().UTC().Format(time.RFC3339), Metrics: metricsCollector.metrics, } @@ -131,8 +178,45 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { t.Logf("metrics written to %s", outPath) }) - benchSub(t, "concurrent_inserts_same_table", func(t *testing.T, m *metric) { - conn := openDuckgresConn(t) + for _, latencyMs := range latencies { + latencyMs := latencyMs // capture loop var + name := fmt.Sprintf("latency_%dms", latencyMs) + t.Run(name, func(t *testing.T) { + // For non-zero latency, spin up a dedicated duckgres with a latency + // proxy in front of the metadata PostgreSQL. For 0ms, use the + // default test harness (no proxy overhead). + var openConn func(t *testing.T) *sql.DB + if latencyMs == 0 { + openConn = openDuckgresConn + } else { + cfg := DefaultConfig() + cfg.SkipPostgres = true + cfg.MetadataLatency = time.Duration(latencyMs) * time.Millisecond + h, err := NewTestHarness(cfg) + if err != nil { + t.Fatalf("failed to create latency harness (%dms): %v", latencyMs, err) + } + t.Cleanup(func() { _ = h.Close() }) + port := h.dgPort + t.Logf("latency harness on port %d (metadata latency: %dms one-way, ~%dms RTT)", port, latencyMs, latencyMs*2) + openConn = func(t *testing.T) *sql.DB { + return openConnToPort(t, port) + } + } + + runConcurrencyBenchmarks(t, latencyMs, openConn) + }) + } +} + +// runConcurrencyBenchmarks contains all the concurrency benchmark sub-tests. +// It is parameterized by a connection opener (openConn) so it can run against +// different duckgres instances (with/without metadata latency proxy). +func runConcurrencyBenchmarks(t *testing.T, latencyMs int, openConn func(*testing.T) *sql.DB) { + t.Helper() + + benchSub(t, "concurrent_inserts_same_table", latencyMs, func(t *testing.T, m *metric) { + conn := openConn(t) defer func() { _ = conn.Close() }() mustExec(t, conn, "CREATE TABLE dl_conc_insert (id INTEGER, worker INTEGER, val TEXT)") @@ -149,7 +233,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func(workerID int) { defer wg.Done() - wconn := openDuckgresConn(t) + wconn := openConn(t) defer func() { _ = wconn.Close() }() for i := range rowsPerWorker { @@ -189,8 +273,8 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { } }) - benchSub(t, "concurrent_inserts_with_transactions", func(t *testing.T, m *metric) { - conn := openDuckgresConn(t) + benchSub(t, "concurrent_inserts_with_transactions", latencyMs, func(t *testing.T, m *metric) { + conn := openConn(t) defer func() { _ = conn.Close() }() mustExec(t, conn, "CREATE TABLE dl_conc_tx_insert (id INTEGER, batch INTEGER, val TEXT)") @@ -208,7 +292,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func(workerID int) { defer wg.Done() - wconn := openDuckgresConn(t) + wconn := openConn(t) defer func() { _ = wconn.Close() }() for b := range batchesPerWorker { @@ -274,8 +358,8 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { } }) - benchSub(t, "concurrent_updates_same_rows", func(t *testing.T, m *metric) { - conn := openDuckgresConn(t) + benchSub(t, "concurrent_updates_same_rows", latencyMs, func(t *testing.T, m *metric) { + conn := openConn(t) defer func() { _ = conn.Close() }() mustExec(t, conn, "CREATE TABLE dl_conc_update (id INTEGER, counter INTEGER, last_writer INTEGER)") @@ -296,7 +380,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func(workerID int) { defer wg.Done() - wconn := openDuckgresConn(t) + wconn := openConn(t) defer func() { _ = wconn.Close() }() for i := range updatesPerWorker { @@ -338,8 +422,8 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { } }) - benchSub(t, "concurrent_deletes_and_inserts", func(t *testing.T, m *metric) { - conn := openDuckgresConn(t) + benchSub(t, "concurrent_deletes_and_inserts", latencyMs, func(t *testing.T, m *metric) { + conn := openConn(t) defer func() { _ = conn.Close() }() mustExec(t, conn, "CREATE TABLE dl_conc_del (id INTEGER, val TEXT)") @@ -363,7 +447,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func(workerID int) { defer wg.Done() - wconn := openDuckgresConn(t) + wconn := openConn(t) defer func() { _ = wconn.Close() }() for i := range opsPerWorker { @@ -389,7 +473,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func(workerID int) { defer wg.Done() - wconn := openDuckgresConn(t) + wconn := openConn(t) defer func() { _ = wconn.Close() }() for i := range opsPerWorker { @@ -433,8 +517,8 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { } }) - benchSub(t, "concurrent_multi_table_transactions", func(t *testing.T, m *metric) { - conn := openDuckgresConn(t) + benchSub(t, "concurrent_multi_table_transactions", latencyMs, func(t *testing.T, m *metric) { + conn := openConn(t) defer func() { _ = conn.Close() }() mustExec(t, conn, "CREATE TABLE dl_conc_orders (id INTEGER, customer TEXT, total DOUBLE)") @@ -455,7 +539,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func(workerID int) { defer wg.Done() - wconn := openDuckgresConn(t) + wconn := openConn(t) defer func() { _ = wconn.Close() }() for o := range ordersPerWorker { @@ -541,8 +625,8 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { } }) - benchSub(t, "concurrent_read_write_isolation", func(t *testing.T, m *metric) { - conn := openDuckgresConn(t) + benchSub(t, "concurrent_read_write_isolation", latencyMs, func(t *testing.T, m *metric) { + conn := openConn(t) defer func() { _ = conn.Close() }() mustExec(t, conn, "CREATE TABLE dl_conc_rw (id INTEGER, val INTEGER)") @@ -564,7 +648,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func(writerID int) { defer wg.Done() - wconn := openDuckgresConn(t) + wconn := openConn(t) defer func() { _ = wconn.Close() }() for i := range opsPerWorker { @@ -588,7 +672,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func(readerID int) { defer wg.Done() - rconn := openDuckgresConn(t) + rconn := openConn(t) defer func() { _ = rconn.Close() }() for i := range opsPerWorker { @@ -637,8 +721,8 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { t.Logf("read/write isolation: %d write conflicts, %d read errors", writeConflicts.Load(), readErrors.Load()) }) - benchSub(t, "concurrent_upsert_storm", func(t *testing.T, m *metric) { - conn := openDuckgresConn(t) + benchSub(t, "concurrent_upsert_storm", latencyMs, func(t *testing.T, m *metric) { + conn := openConn(t) defer func() { _ = conn.Close() }() // DuckLake rewrites ON CONFLICT to MERGE — test it under concurrency @@ -660,7 +744,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func(workerID int) { defer wg.Done() - wconn := openDuckgresConn(t) + wconn := openConn(t) defer func() { _ = wconn.Close() }() for i := range opsPerWorker { @@ -703,8 +787,8 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { t.Logf("upsert storm: %d succeeded, %d conflicts", m.Successes, m.Conflicts) }) - benchSub(t, "concurrent_ddl_while_writing", func(t *testing.T, m *metric) { - conn := openDuckgresConn(t) + benchSub(t, "concurrent_ddl_while_writing", latencyMs, func(t *testing.T, m *metric) { + conn := openConn(t) defer func() { _ = conn.Close() }() mustExec(t, conn, "CREATE TABLE dl_conc_ddl_write (id INTEGER, val TEXT)") @@ -718,7 +802,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - wconn := openDuckgresConn(t) + wconn := openConn(t) defer func() { _ = wconn.Close() }() for i := range 60 { @@ -744,7 +828,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - dconn := openDuckgresConn(t) + dconn := openConn(t) defer func() { _ = dconn.Close() }() time.Sleep(10 * time.Millisecond) // Let some writes land first @@ -775,8 +859,8 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { t.Logf("ddl+write: %d conflicts", m.Conflicts) }) - benchSub(t, "concurrent_large_batch_inserts", func(t *testing.T, m *metric) { - conn := openDuckgresConn(t) + benchSub(t, "concurrent_large_batch_inserts", latencyMs, func(t *testing.T, m *metric) { + conn := openConn(t) defer func() { _ = conn.Close() }() mustExec(t, conn, "CREATE TABLE dl_conc_batch (id INTEGER, worker INTEGER, payload TEXT)") @@ -794,7 +878,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func(workerID int) { defer wg.Done() - wconn := openDuckgresConn(t) + wconn := openConn(t) defer func() { _ = wconn.Close() }() for b := range batchesPerWorker { @@ -855,7 +939,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { } }) - benchSub(t, "concurrent_create_drop_tables", func(t *testing.T, m *metric) { + benchSub(t, "concurrent_create_drop_tables", latencyMs, func(t *testing.T, m *metric) { const numWorkers = 4 const cyclesPerWorker = 8 var wg sync.WaitGroup @@ -867,7 +951,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func(workerID int) { defer wg.Done() - wconn := openDuckgresConn(t) + wconn := openConn(t) defer func() { _ = wconn.Close() }() for c := range cyclesPerWorker { @@ -940,10 +1024,10 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { t.Logf("create/drop cycles: %d completed, %d conflicts", m.Successes, m.Conflicts) }) - benchSub(t, "concurrent_inserts_separate_tables", func(t *testing.T, m *metric) { + benchSub(t, "concurrent_inserts_separate_tables", latencyMs, func(t *testing.T, m *metric) { // DuckLake uses global snapshot IDs, so even writes to separate tables // can conflict. This test specifically targets that behavior. - conn := openDuckgresConn(t) + conn := openConn(t) defer func() { _ = conn.Close() }() const numTables = 6 @@ -966,7 +1050,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func(tableIdx int) { defer wg.Done() - wconn := openDuckgresConn(t) + wconn := openConn(t) defer func() { _ = wconn.Close() }() tableName := fmt.Sprintf("dl_conc_sep_%d", tableIdx) @@ -1002,9 +1086,9 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { } }) - benchSub(t, "rapid_autocommit_inserts", func(t *testing.T, m *metric) { + benchSub(t, "rapid_autocommit_inserts", latencyMs, func(t *testing.T, m *metric) { // Fivetran-like pattern: many small autocommit inserts in rapid succession - conn := openDuckgresConn(t) + conn := openConn(t) defer func() { _ = conn.Close() }() mustExec(t, conn, "CREATE TABLE dl_conc_rapid (id INTEGER, ts BIGINT, data TEXT)") @@ -1021,7 +1105,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func(workerID int) { defer wg.Done() - wconn := openDuckgresConn(t) + wconn := openConn(t) defer func() { _ = wconn.Close() }() for i := range insertsPerWorker { @@ -1070,8 +1154,8 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { } }) - benchSub(t, "concurrent_tx_rollback_stress", func(t *testing.T, m *metric) { - conn := openDuckgresConn(t) + benchSub(t, "concurrent_tx_rollback_stress", latencyMs, func(t *testing.T, m *metric) { + conn := openConn(t) defer func() { _ = conn.Close() }() mustExec(t, conn, "CREATE TABLE dl_conc_rollback (id INTEGER, val TEXT)") @@ -1089,7 +1173,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func(workerID int) { defer wg.Done() - wconn := openDuckgresConn(t) + wconn := openConn(t) defer func() { _ = wconn.Close() }() for i := range txPerWorker { @@ -1150,9 +1234,9 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { } }) - benchSub(t, "sustained_concurrent_load", func(t *testing.T, m *metric) { + benchSub(t, "sustained_concurrent_load", latencyMs, func(t *testing.T, m *metric) { // Simulate sustained concurrent load over a time window (like Fivetran sync) - conn := openDuckgresConn(t) + conn := openConn(t) defer func() { _ = conn.Close() }() mustExec(t, conn, "CREATE TABLE dl_conc_sustained (id BIGINT, worker INTEGER, ts BIGINT, payload TEXT)") @@ -1169,7 +1253,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func(workerID int) { defer wg.Done() - wconn := openDuckgresConn(t) + wconn := openConn(t) defer func() { _ = wconn.Close() }() deadline := time.Now().Add(duration) @@ -1220,12 +1304,12 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { t.Logf("throughput: %.0f inserts/sec, conflict rate: %.1f%%", throughput, conflictRate) }) - benchSub(t, "concurrent_ctas", func(t *testing.T, m *metric) { + benchSub(t, "concurrent_ctas", latencyMs, func(t *testing.T, m *metric) { // CREATE TABLE AS SELECT (CTAS) is a combined DDL+DML operation that // creates a new table and populates it in a single transaction. Under // concurrency this is especially conflict-prone because the DDL catalog // write and the data write both go through the DuckLake metadata store. - conn := openDuckgresConn(t) + conn := openConn(t) defer func() { _ = conn.Close() }() // Seed a source table @@ -1253,7 +1337,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func(workerID int) { defer wg.Done() - wconn := openDuckgresConn(t) + wconn := openConn(t) defer func() { _ = wconn.Close() }() for c := range ctasPerWorker { @@ -1304,7 +1388,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { t.Logf("concurrent CTAS: %d created, %d conflicts", m.Successes, m.Conflicts) }) - benchSub(t, "sqlmesh_ctas_distinct_targets", func(t *testing.T, m *metric) { + benchSub(t, "sqlmesh_ctas_distinct_targets", latencyMs, func(t *testing.T, m *metric) { // Reproduces the exact SQLMesh production failure pattern: // - Multiple models run concurrently via ThreadPoolExecutor // - Each model does CREATE OR REPLACE TABLE AS SELECT ... FROM @@ -1313,7 +1397,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { // // Prod error: "Transaction conflict - attempting to insert into table with // index "25667" - but another transaction has altered it" - conn := openDuckgresConn(t) + conn := openConn(t) defer func() { _ = conn.Close() }() numModels := 10 @@ -1365,7 +1449,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func(modelID, roundID int) { defer wg.Done() - mconn := openDuckgresConn(t) + mconn := openConn(t) defer func() { _ = mconn.Close() }() cat := fmt.Sprintf("cat_%d", modelID) @@ -1421,11 +1505,11 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { } }) - benchSub(t, "sqlmesh_ctas_with_deps", func(t *testing.T, m *metric) { + benchSub(t, "sqlmesh_ctas_with_deps", latencyMs, func(t *testing.T, m *metric) { // Extended SQLMesh pattern: models have dependency tiers (DAG levels). // Tier 1 models run first, tier 2 models depend on tier 1 outputs. // Within each tier, models run concurrently. - conn := openDuckgresConn(t) + conn := openConn(t) defer func() { _ = conn.Close() }() // Raw source @@ -1476,7 +1560,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func(modelID int) { defer wg.Done() - mconn := openDuckgresConn(t) + mconn := openConn(t) defer func() { _ = mconn.Close() }() _, err := mconn.Exec(fmt.Sprintf( @@ -1511,7 +1595,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func(modelID int) { defer wg.Done() - mconn := openDuckgresConn(t) + mconn := openConn(t) defer func() { _ = mconn.Close() }() src := modelID % numTier1 @@ -1544,11 +1628,11 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { t.Logf("SQLMesh DAG pattern: %d succeeded, %d conflicts total", m.Successes, m.Conflicts) }) - benchSub(t, "concurrent_create_or_replace_as_select", func(t *testing.T, m *metric) { + benchSub(t, "concurrent_create_or_replace_as_select", latencyMs, func(t *testing.T, m *metric) { // CREATE OR REPLACE TABLE AS SELECT is the most conflict-prone pattern: // it drops the existing table and recreates it atomically, so concurrent // writers all race to replace the same target table. - conn := openDuckgresConn(t) + conn := openConn(t) defer func() { _ = conn.Close() }() // Seed a source table @@ -1580,7 +1664,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func(workerID int) { defer wg.Done() - wconn := openDuckgresConn(t) + wconn := openConn(t) defer func() { _ = wconn.Close() }() for r := range replacesPerWorker { @@ -1630,10 +1714,10 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { } }) - benchSub(t, "ctas_while_writing_source", func(t *testing.T, m *metric) { + benchSub(t, "ctas_while_writing_source", latencyMs, func(t *testing.T, m *metric) { // CTAS reading from a table while other connections are actively // writing to it — tests snapshot isolation of the source read. - conn := openDuckgresConn(t) + conn := openConn(t) defer func() { _ = conn.Close() }() mustExec(t, conn, "CREATE TABLE dl_ctas_live_source (id INTEGER, val TEXT)") @@ -1660,7 +1744,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func(writerID int) { defer wg.Done() - wconn := openDuckgresConn(t) + wconn := openConn(t) defer func() { _ = wconn.Close() }() for i := range writesPerWorker { @@ -1690,7 +1774,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func(snapID int) { defer wg.Done() - sconn := openDuckgresConn(t) + sconn := openConn(t) defer func() { _ = sconn.Close() }() for i := range snapshotsPerWorker { @@ -1743,10 +1827,10 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { m.Successes, writeConflicts.Load(), ctasConflicts.Load()) }) - benchSub(t, "concurrent_replace_while_reading", func(t *testing.T, m *metric) { + benchSub(t, "concurrent_replace_while_reading", latencyMs, func(t *testing.T, m *metric) { // CREATE OR REPLACE while other connections are reading the same table. // Readers should either see the old or new data, never a partial state. - conn := openDuckgresConn(t) + conn := openConn(t) defer func() { _ = conn.Close() }() mustExec(t, conn, "CREATE TABLE dl_replace_source_a (id INTEGER, val TEXT)") @@ -1780,7 +1864,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func(workerID int) { defer wg.Done() - wconn := openDuckgresConn(t) + wconn := openConn(t) defer func() { _ = wconn.Close() }() for r := range replacesPerWorker { @@ -1813,7 +1897,7 @@ func TestDuckLakeConcurrentTransactions(t *testing.T) { wg.Add(1) go func(readerID int) { defer wg.Done() - rconn := openDuckgresConn(t) + rconn := openConn(t) defer func() { _ = rconn.Close() }() for i := range readsPerWorker { diff --git a/tests/integration/harness.go b/tests/integration/harness.go index 064f129..7eefdf5 100644 --- a/tests/integration/harness.go +++ b/tests/integration/harness.go @@ -22,14 +22,15 @@ var duckLakeInfraServices = []string{"ducklake-metadata", "minio", "minio-init"} // TestHarness manages PostgreSQL and Duckgres instances for side-by-side testing type TestHarness struct { - PostgresDB *sql.DB - DuckgresDB *sql.DB - duckgresSrv *server.Server - tmpDir string - pgPort int - dgPort int - useDuckLake bool - mu sync.Mutex + PostgresDB *sql.DB + DuckgresDB *sql.DB + duckgresSrv *server.Server + latencyProxy *LatencyProxy + tmpDir string + pgPort int + dgPort int + useDuckLake bool + mu sync.Mutex } // HarnessConfig configures the test harness @@ -46,6 +47,10 @@ type HarnessConfig struct { DuckLakeMetadataPort int // MinIOPort is the port for MinIO S3 API (default: 39000) MinIOPort int + // MetadataLatency adds artificial one-way latency between DuckDB/DuckLake + // and the metadata PostgreSQL via a TCP proxy. Total RTT overhead = 2x this value. + // Zero means no proxy (direct connection). + MetadataLatency time.Duration } // DefaultConfig returns the default harness configuration @@ -143,8 +148,22 @@ func (h *TestHarness) startDuckgres(harnessCfg HarnessConfig) error { // Configure DuckLake if enabled if harnessCfg.UseDuckLake { + metadataPort := harnessCfg.DuckLakeMetadataPort + + // If latency injection is requested, start a TCP proxy in front of the + // metadata PostgreSQL and point DuckLake at the proxy port instead. + if harnessCfg.MetadataLatency > 0 { + target := fmt.Sprintf("127.0.0.1:%d", harnessCfg.DuckLakeMetadataPort) + proxy, err := NewLatencyProxy(target, harnessCfg.MetadataLatency) + if err != nil { + return fmt.Errorf("failed to start latency proxy: %w", err) + } + h.latencyProxy = proxy + metadataPort = proxy.Port() + } + cfg.DuckLake = server.DuckLakeConfig{ - MetadataStore: fmt.Sprintf("postgres:host=127.0.0.1 port=%d user=ducklake password=ducklake dbname=ducklake", harnessCfg.DuckLakeMetadataPort), + MetadataStore: fmt.Sprintf("postgres:host=127.0.0.1 port=%d user=ducklake password=ducklake dbname=ducklake", metadataPort), ObjectStore: "s3://ducklake/data/", S3Provider: "config", S3Endpoint: fmt.Sprintf("127.0.0.1:%d", harnessCfg.MinIOPort), @@ -547,6 +566,12 @@ func (h *TestHarness) Close() error { } } + if h.latencyProxy != nil { + if err := h.latencyProxy.Close(); err != nil { + errs = append(errs, fmt.Errorf("failed to close latency proxy: %w", err)) + } + } + if h.tmpDir != "" { if err := os.RemoveAll(h.tmpDir); err != nil { errs = append(errs, fmt.Errorf("failed to remove temp dir: %w", err)) diff --git a/tests/integration/latency_proxy.go b/tests/integration/latency_proxy.go new file mode 100644 index 0000000..7556ef6 --- /dev/null +++ b/tests/integration/latency_proxy.go @@ -0,0 +1,130 @@ +package integration + +import ( + "io" + "net" + "sync" + "time" +) + +// LatencyProxy is a TCP proxy that adds configurable bidirectional latency +// between a client (DuckDB/DuckLake) and a target (metadata PostgreSQL). +// Each direction gets the configured delay, so total RTT overhead = 2 * Latency. +type LatencyProxy struct { + listener net.Listener + target string + latency time.Duration + + mu sync.Mutex + closed bool + conns []net.Conn +} + +// NewLatencyProxy creates a latency proxy listening on a random port, +// forwarding to the given target address with the specified one-way latency. +func NewLatencyProxy(target string, latency time.Duration) (*LatencyProxy, error) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return nil, err + } + p := &LatencyProxy{ + listener: ln, + target: target, + latency: latency, + } + go p.acceptLoop() + return p, nil +} + +// Port returns the local port the proxy is listening on. +func (p *LatencyProxy) Port() int { + return p.listener.Addr().(*net.TCPAddr).Port +} + +// Close shuts down the proxy and all active connections. +func (p *LatencyProxy) Close() error { + p.mu.Lock() + p.closed = true + conns := p.conns + p.conns = nil + p.mu.Unlock() + + for _, c := range conns { + _ = c.Close() + } + return p.listener.Close() +} + +func (p *LatencyProxy) acceptLoop() { + for { + client, err := p.listener.Accept() + if err != nil { + return + } + + p.mu.Lock() + if p.closed { + p.mu.Unlock() + _ = client.Close() + return + } + p.conns = append(p.conns, client) + p.mu.Unlock() + + go p.handleConn(client) + } +} + +func (p *LatencyProxy) handleConn(client net.Conn) { + upstream, err := net.DialTimeout("tcp", p.target, 5*time.Second) + if err != nil { + _ = client.Close() + return + } + + p.mu.Lock() + p.conns = append(p.conns, upstream) + p.mu.Unlock() + + var wg sync.WaitGroup + wg.Add(2) + + // client → upstream (with latency) + go func() { + defer wg.Done() + copyWithLatency(upstream, client, p.latency) + _ = upstream.Close() + }() + + // upstream → client (with latency) + go func() { + defer wg.Done() + copyWithLatency(client, upstream, p.latency) + _ = client.Close() + }() + + wg.Wait() +} + +// copyWithLatency copies data from src to dst, injecting a delay before each +// write. For zero latency this degrades to a plain io.Copy. +func copyWithLatency(dst, src net.Conn, latency time.Duration) { + if latency <= 0 { + _, _ = io.Copy(dst, src) + return + } + buf := make([]byte, 32*1024) + for { + n, readErr := src.Read(buf) + if n > 0 { + time.Sleep(latency) + _, writeErr := dst.Write(buf[:n]) + if writeErr != nil { + return + } + } + if readErr != nil { + return + } + } +} diff --git a/tests/integration/latency_proxy_test.go b/tests/integration/latency_proxy_test.go new file mode 100644 index 0000000..1e38667 --- /dev/null +++ b/tests/integration/latency_proxy_test.go @@ -0,0 +1,131 @@ +package integration + +import ( + "fmt" + "net" + "testing" + "time" +) + +func TestLatencyProxy(t *testing.T) { + // Start an echo server as the "upstream". + echo, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer echo.Close() + go func() { + for { + c, err := echo.Accept() + if err != nil { + return + } + go func(c net.Conn) { + defer c.Close() + buf := make([]byte, 1024) + for { + n, err := c.Read(buf) + if err != nil { + return + } + _, _ = c.Write(buf[:n]) + } + }(c) + } + }() + + echoAddr := echo.Addr().String() + + t.Run("zero_latency_forwards_data", func(t *testing.T) { + proxy, err := NewLatencyProxy(echoAddr, 0) + if err != nil { + t.Fatal(err) + } + defer proxy.Close() + + conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", proxy.Port())) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + msg := []byte("hello proxy") + _, err = conn.Write(msg) + if err != nil { + t.Fatal(err) + } + + buf := make([]byte, 64) + conn.SetReadDeadline(time.Now().Add(2 * time.Second)) + n, err := conn.Read(buf) + if err != nil { + t.Fatal(err) + } + if string(buf[:n]) != "hello proxy" { + t.Errorf("got %q, want %q", buf[:n], "hello proxy") + } + }) + + t.Run("adds_measurable_latency", func(t *testing.T) { + latency := 50 * time.Millisecond + proxy, err := NewLatencyProxy(echoAddr, latency) + if err != nil { + t.Fatal(err) + } + defer proxy.Close() + + conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", proxy.Port())) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + msg := []byte("ping") + start := time.Now() + _, _ = conn.Write(msg) + + buf := make([]byte, 64) + conn.SetReadDeadline(time.Now().Add(2 * time.Second)) + n, err := conn.Read(buf) + elapsed := time.Since(start) + if err != nil { + t.Fatal(err) + } + if string(buf[:n]) != "ping" { + t.Errorf("got %q, want %q", buf[:n], "ping") + } + + // Expect at least 2x latency (one delay per direction). + // Use 1.5x as lower bound to account for scheduling jitter. + minExpected := time.Duration(float64(latency) * 1.5) + if elapsed < minExpected { + t.Errorf("round-trip %v faster than expected minimum %v (latency=%v per direction)", elapsed, minExpected, latency) + } + t.Logf("round-trip with %v one-way latency: %v", latency, elapsed) + }) + + t.Run("close_terminates_connections", func(t *testing.T) { + proxy, err := NewLatencyProxy(echoAddr, 0) + if err != nil { + t.Fatal(err) + } + + conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", proxy.Port())) + if err != nil { + t.Fatal(err) + } + + _ = proxy.Close() + + // After close, writes should eventually fail. + conn.SetDeadline(time.Now().Add(time.Second)) + _, err = conn.Write([]byte("after close")) + if err == nil { + // Read should fail since upstream is gone + buf := make([]byte, 64) + _, err = conn.Read(buf) + } + // Either write or read should have errored + conn.Close() + }) +} From f685ef37db64010f009a87fee2c7a8fdaf969e2b Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Wed, 1 Apr 2026 11:22:51 -0700 Subject: [PATCH 2/4] Add sqlmesh_ctas_with_comments benchmark to test COMMENT ON TABLE conflicts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reproduces the full SQLMesh model execution pattern: each concurrent model does CREATE OR REPLACE TABLE ... AS SELECT followed by COMMENT ON TABLE to set the model description. The COMMENT is a separate DDL hitting the DuckLake metadata store, widening the conflict window — matching the prod log pattern: /* SQLMESH_PLAN: ... */ COMMENT ON TABLE "schema"."model" IS 'description' Co-Authored-By: Claude Opus 4.6 (1M context) --- .../integration/ducklake_concurrency_test.go | 116 ++++++++++++++++++ tests/integration/harness.go | 5 +- 2 files changed, 120 insertions(+), 1 deletion(-) diff --git a/tests/integration/ducklake_concurrency_test.go b/tests/integration/ducklake_concurrency_test.go index 03afa37..d9b4123 100644 --- a/tests/integration/ducklake_concurrency_test.go +++ b/tests/integration/ducklake_concurrency_test.go @@ -1505,6 +1505,122 @@ func runConcurrencyBenchmarks(t *testing.T, latencyMs int, openConn func(*testin } }) + benchSub(t, "sqlmesh_ctas_with_comments", latencyMs, func(t *testing.T, m *metric) { + // Reproduces the full SQLMesh model execution pattern more accurately: + // each model does CREATE OR REPLACE TABLE ... AS SELECT, then immediately + // runs COMMENT ON TABLE to set the model description. The COMMENT is a + // separate DDL that hits the DuckLake metadata store, widening the + // conflict window. This matches the prod log pattern: + // /* SQLMESH_PLAN: ... */ COMMENT ON TABLE "schema"."model" IS 'description' + conn := openConn(t) + defer func() { _ = conn.Close() }() + + numModels := 10 + if os.Getenv("DUCKGRES_STRESS") != "" { + numModels = 30 + } + + // Create shared source table + mustExec(t, conn, "CREATE TABLE dl_sqlmesh_comment_src (id INTEGER, ts BIGINT, category TEXT, val DOUBLE)") + for i := range numModels * 50 { + cat := fmt.Sprintf("cat_%d", i%numModels) + mustExec(t, conn, fmt.Sprintf( + "INSERT INTO dl_sqlmesh_comment_src VALUES (%d, %d, '%s', %f)", + i, int64(i)*1000, cat, float64(i)*1.5, + )) + } + // Pre-create target tables + for i := range numModels { + mustExec(t, conn, fmt.Sprintf( + "CREATE TABLE dl_sqlmesh_cmt_%d AS SELECT * FROM dl_sqlmesh_comment_src WHERE category = 'cat_%d'", + i, i, + )) + } + defer func() { + _, _ = conn.Exec("DROP TABLE IF EXISTS dl_sqlmesh_comment_src") + for i := range 30 { + _, _ = conn.Exec(fmt.Sprintf("DROP TABLE IF EXISTS dl_sqlmesh_cmt_%d", i)) + } + }() + + numRounds := 3 + if os.Getenv("DUCKGRES_STRESS") != "" { + numRounds = 5 + } + var totalConflicts, totalSuccesses, commentConflicts atomic.Int64 + + for round := range numRounds { + var wg sync.WaitGroup + errs := make(chan error, numModels) + + for model := range numModels { + wg.Add(1) + go func(modelID, roundID int) { + defer wg.Done() + mconn := openConn(t) + defer func() { _ = mconn.Close() }() + + cat := fmt.Sprintf("cat_%d", modelID) + tableName := fmt.Sprintf("dl_sqlmesh_cmt_%d", modelID) + + // Step 1: CREATE OR REPLACE TABLE AS SELECT (the model evaluation) + _, err := mconn.Exec(fmt.Sprintf( + "CREATE OR REPLACE TABLE %s AS SELECT * FROM dl_sqlmesh_comment_src WHERE category = '%s'", + tableName, cat, + )) + if err != nil { + if isTransactionConflict(err) || isAbortedTransaction(err) { + totalConflicts.Add(1) + recoverConnection(mconn) + return + } + errs <- fmt.Errorf("round %d model %d CTAS: %w", roundID, modelID, err) + return + } + totalSuccesses.Add(1) + + // Step 2: COMMENT ON TABLE (sets model description, just like SQLMesh does) + comment := fmt.Sprintf("Model %d: filtered by %s (round %d)", modelID, cat, roundID) + _, err = mconn.Exec(fmt.Sprintf( + "COMMENT ON TABLE %s IS '%s'", + tableName, comment, + )) + if err != nil { + if isTransactionConflict(err) || isAbortedTransaction(err) { + commentConflicts.Add(1) + recoverConnection(mconn) + return + } + // COMMENT ON TABLE may not be supported in DuckLake — log and continue + if strings.Contains(err.Error(), "Not implemented") || + strings.Contains(err.Error(), "not supported") { + t.Logf("COMMENT ON TABLE not supported: %v", err) + return + } + errs <- fmt.Errorf("round %d model %d COMMENT: %w", roundID, modelID, err) + return + } + }(model, round) + } + + wg.Wait() + close(errs) + for err := range errs { + t.Error(err) + } + } + + m.Successes, m.Conflicts = totalSuccesses.Load(), totalConflicts.Load()+commentConflicts.Load() + t.Logf("SQLMesh CTAS+COMMENT pattern: %d CTAS succeeded, %d CTAS conflicts, %d COMMENT conflicts across %d rounds of %d models", + totalSuccesses.Load(), totalConflicts.Load(), commentConflicts.Load(), numRounds, numModels) + + conflictRate := float64(m.Conflicts) / float64(m.Successes+m.Conflicts) * 100 + t.Logf("conflict rate: %.1f%% (CTAS: %.1f%%, COMMENT: %.1f%%)", + conflictRate, + float64(totalConflicts.Load())/float64(totalSuccesses.Load()+totalConflicts.Load())*100, + float64(commentConflicts.Load())/float64(totalSuccesses.Load()+commentConflicts.Load())*100) + }) + benchSub(t, "sqlmesh_ctas_with_deps", latencyMs, func(t *testing.T, m *metric) { // Extended SQLMesh pattern: models have dependency tiers (DAG levels). // Tier 1 models run first, tier 2 models depend on tier 1 outputs. diff --git a/tests/integration/harness.go b/tests/integration/harness.go index 7eefdf5..0ee8eed 100644 --- a/tests/integration/harness.go +++ b/tests/integration/harness.go @@ -519,7 +519,10 @@ func (h *TestHarness) cleanupDuckLakeTables() error { tables = append(tables, fmt.Sprintf("dl_cortas_target_%d", i)) } // SQLMesh CTAS reproduction tables - tables = append(tables, "dl_sqlmesh_source", "dl_sqlmesh_raw") + tables = append(tables, "dl_sqlmesh_source", "dl_sqlmesh_raw", "dl_sqlmesh_comment_src") + for i := range 30 { + tables = append(tables, fmt.Sprintf("dl_sqlmesh_cmt_%d", i)) + } for i := range 30 { tables = append(tables, fmt.Sprintf("dl_sqlmesh_model_%d", i)) } From 7f708b0eecc99cedd36b69a6dc2e2ca033d1c807 Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Wed, 1 Apr 2026 21:08:28 -0700 Subject: [PATCH 3/4] Fix lint: handle error returns in latency proxy tests Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/integration/latency_proxy_test.go | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/tests/integration/latency_proxy_test.go b/tests/integration/latency_proxy_test.go index 1e38667..a3afe0c 100644 --- a/tests/integration/latency_proxy_test.go +++ b/tests/integration/latency_proxy_test.go @@ -13,7 +13,7 @@ func TestLatencyProxy(t *testing.T) { if err != nil { t.Fatal(err) } - defer echo.Close() + defer func() { _ = echo.Close() }() go func() { for { c, err := echo.Accept() @@ -21,7 +21,7 @@ func TestLatencyProxy(t *testing.T) { return } go func(c net.Conn) { - defer c.Close() + defer func() { _ = c.Close() }() buf := make([]byte, 1024) for { n, err := c.Read(buf) @@ -41,13 +41,13 @@ func TestLatencyProxy(t *testing.T) { if err != nil { t.Fatal(err) } - defer proxy.Close() + defer func() { _ = proxy.Close() }() conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", proxy.Port())) if err != nil { t.Fatal(err) } - defer conn.Close() + defer func() { _ = conn.Close() }() msg := []byte("hello proxy") _, err = conn.Write(msg) @@ -56,7 +56,7 @@ func TestLatencyProxy(t *testing.T) { } buf := make([]byte, 64) - conn.SetReadDeadline(time.Now().Add(2 * time.Second)) + _ = conn.SetReadDeadline(time.Now().Add(2 * time.Second)) n, err := conn.Read(buf) if err != nil { t.Fatal(err) @@ -72,20 +72,20 @@ func TestLatencyProxy(t *testing.T) { if err != nil { t.Fatal(err) } - defer proxy.Close() + defer func() { _ = proxy.Close() }() conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", proxy.Port())) if err != nil { t.Fatal(err) } - defer conn.Close() + defer func() { _ = conn.Close() }() msg := []byte("ping") start := time.Now() _, _ = conn.Write(msg) buf := make([]byte, 64) - conn.SetReadDeadline(time.Now().Add(2 * time.Second)) + _ = conn.SetReadDeadline(time.Now().Add(2 * time.Second)) n, err := conn.Read(buf) elapsed := time.Since(start) if err != nil { @@ -118,14 +118,13 @@ func TestLatencyProxy(t *testing.T) { _ = proxy.Close() // After close, writes should eventually fail. - conn.SetDeadline(time.Now().Add(time.Second)) + _ = conn.SetDeadline(time.Now().Add(time.Second)) _, err = conn.Write([]byte("after close")) if err == nil { // Read should fail since upstream is gone buf := make([]byte, 64) - _, err = conn.Read(buf) + _, _ = conn.Read(buf) } - // Either write or read should have errored - conn.Close() + _ = conn.Close() }) } From 5242f85a03c78deca504a09ff62df12002d862bb Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Wed, 1 Apr 2026 21:10:29 -0700 Subject: [PATCH 4/4] Add DuckLake concurrency & latency benchmark docs to integration README Documents how to run the concurrency benchmarks, latency sensitivity analysis, version matrix, JSON output schema, and environment variables. Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/integration/README.md | 102 ++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/tests/integration/README.md b/tests/integration/README.md index 83ef996..4b68492 100644 --- a/tests/integration/README.md +++ b/tests/integration/README.md @@ -212,6 +212,108 @@ opts := CompareOptions{ } ``` +## DuckLake Concurrency & Latency Benchmarks + +The test suite includes benchmarks that measure DuckLake transaction conflict rates under concurrent load, and a latency sensitivity analysis that injects artificial metadata store latency to simulate remote RDS configurations. + +### Prerequisites + +The DuckLake benchmarks require the metadata PostgreSQL and MinIO infrastructure: + +```bash +# Start DuckLake infrastructure +docker compose -f tests/integration/docker-compose.yml up -d ducklake-metadata minio minio-init +``` + +### Running concurrency benchmarks + +```bash +# Run all concurrency tests (default: 0ms latency) +just test-ducklake-concurrency + +# Or directly: +go test -v -run TestDuckLakeConcurrentTransactions -timeout 300s ./tests/integration/ +``` + +### Running latency sensitivity analysis + +The `DUCKGRES_BENCH_LATENCIES` environment variable controls which latency levels to sweep. Each value is a one-way latency injected via a TCP proxy between DuckDB/DuckLake and the metadata PostgreSQL (total RTT overhead = 2x the configured value). + +```bash +# Sweep multiple latency levels +just bench-ducklake-latency 0ms,10ms,25ms,50ms + +# Or directly: +DUCKGRES_BENCH_LATENCIES=0ms,10ms,50ms \ + go test -v -run TestDuckLakeConcurrentTransactions -timeout 3600s ./tests/integration/ + +# Write structured JSON results for comparison +DUCKGRES_BENCH_LATENCIES=0ms,10ms \ +DUCKGRES_BENCH_OUT=results.json \ + go test -v -run TestDuckLakeConcurrentTransactions -timeout 3600s ./tests/integration/ +``` + +**Important:** Higher latency levels make tests significantly slower since every metadata round-trip pays the extra RTT. Budget roughly: +- `0ms`: ~2 minutes for all 21 tests +- `10ms`: ~25 minutes +- `20ms`: ~45 minutes +- `50ms`: may exceed 1 hour + +### Version matrix + +Compare conflict rates across DuckDB/DuckLake versions, optionally combined with latency: + +```bash +# Version matrix (current version vs others) +just bench-ducklake-matrix + +# Full version × latency matrix +DUCKGRES_BENCH_LATENCIES=0ms,10ms just bench-ducklake-matrix +``` + +### How the latency proxy works + +For non-zero latency, a TCP proxy sits between DuckDB's DuckLake extension and the metadata PostgreSQL: + +``` +DuckDB → DuckLake ext → [TCP Proxy (+Xms per direction)] → Metadata PostgreSQL (port 35433) +``` + +Each read/write through the proxy gets a `time.Sleep(latency)` before forwarding. For `0ms`, no proxy is used (zero overhead). Each latency level gets its own dedicated duckgres server instance. + +### JSON output schema + +When `DUCKGRES_BENCH_OUT` is set, the test writes a JSON report: + +```json +{ + "duckdb_version": "v1.5.1", + "ducklake_version": "67480b1d", + "latencies_tested_ms": [0, 10], + "timestamp": "2026-04-01T19:12:47Z", + "metrics": [ + { + "test": "concurrent_updates_same_rows", + "metadata_latency_ms": 0, + "successes": 103, + "conflicts": 77, + "conflict_rate_pct": 42.8, + "duration_sec": 5.2, + "throughput_ops_sec": 19.8 + } + ] +} +``` + +### Environment variables + +| Variable | Description | Default | +|----------|-------------|---------| +| `DUCKGRES_BENCH_LATENCIES` | Comma-separated latency levels (e.g. `0ms,10ms,50ms`) | `0ms` | +| `DUCKGRES_BENCH_OUT` | Path to write JSON results | *(none, no file written)* | +| `DUCKGRES_STRESS` | Set to any value to increase SQLMesh model count (10→30) | *(unset)* | +| `DUCKGRES_TEST_NO_DUCKLAKE` | Set to `1` to disable DuckLake mode | *(unset)* | + ## Troubleshooting ### PostgreSQL container won't start