From c8c8a2968d95539eccd4964a6a5effc7fbff1031 Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Thu, 2 Apr 2026 13:15:47 -0700 Subject: [PATCH 1/3] Fix DuckLake transaction conflict and SSL connection loss handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two distinct failure modes were causing elevated commit failures after the DuckDB 1.5.1/DuckLake 0.4 upgrade: 1. SSL connection loss (35 occurrences): isTransientDuckLakeError checked for "server closed the connection unexpectedly" but the actual error says "SSL connection has been closed unexpectedly" — the SSL variant was never detected, so retryOnTransient never fired. 2. Transaction conflicts: concurrent DuckLake model builds conflict on commit due to global snapshot IDs. Add retryOnConflict with exponential backoff + jitter (50ms initial, 5 retries, 2s cap) for autocommit queries in both control-plane and standalone modes. Also maps transaction conflicts to SQLSTATE 40001 (serialization_failure) so PG-aware clients know to retry, and adds Prometheus counters for conflict tracking. Co-Authored-By: Claude Opus 4.6 --- duckdbservice/flight_handler.go | 18 +++++++ duckdbservice/metrics.go | 28 +++++++++++ duckdbservice/transient.go | 60 ++++++++++++++++++++++ duckdbservice/transient_test.go | 79 +++++++++++++++++++++++++++++ server/conn.go | 89 +++++++++++++++++++++++++-------- server/server.go | 20 ++++++++ server/transient.go | 48 ++++++++++++++++++ server/transient_test.go | 75 +++++++++++++++++++++++++++ 8 files changed, 397 insertions(+), 20 deletions(-) create mode 100644 duckdbservice/metrics.go diff --git a/duckdbservice/flight_handler.go b/duckdbservice/flight_handler.go index 8e070974..3fcf2bf6 100644 --- a/duckdbservice/flight_handler.go +++ b/duckdbservice/flight_handler.go @@ -287,6 +287,12 @@ func (h *FlightSQLHandler) GetFlightInfoStatement(ctx context.Context, cmd fligh schema, err := retryOnTransient(func() (*arrow.Schema, error) { return GetQuerySchema(ctx, session.Conn, query, tx) }) + if err != nil && tx == nil && isDuckLakeTransactionConflict(err) { + ducklakeConflictTotal.Inc() + schema, err = retryOnConflict(func() (*arrow.Schema, error) { + return GetQuerySchema(ctx, session.Conn, query, tx) + }) + } if err != nil { return nil, status.Errorf(codes.InvalidArgument, "failed to prepare query: %v", err) } @@ -367,6 +373,12 @@ func (h *FlightSQLHandler) DoGetStatement(ctx context.Context, ticket flightsql. } return session.Conn.QueryContext(ctx, handle.Query) }) + if qerr != nil && tx == nil && isDuckLakeTransactionConflict(qerr) { + ducklakeConflictTotal.Inc() + rows, qerr = retryOnConflict(func() (*sql.Rows, error) { + return session.Conn.QueryContext(ctx, handle.Query) + }) + } if qerr != nil { ch <- flight.StreamChunk{Err: qerr} return @@ -419,6 +431,12 @@ func (h *FlightSQLHandler) DoPutCommandStatementUpdate(ctx context.Context, } return session.Conn.ExecContext(ctx, query) }) + if execErr != nil && tx == nil && isDuckLakeTransactionConflict(execErr) { + ducklakeConflictTotal.Inc() + result, execErr = retryOnConflict(func() (sql.Result, error) { + return session.Conn.ExecContext(ctx, query) + }) + } if execErr != nil { return 0, status.Errorf(codes.InvalidArgument, "failed to execute update: %v", execErr) } diff --git a/duckdbservice/metrics.go b/duckdbservice/metrics.go new file mode 100644 index 00000000..52efac80 --- /dev/null +++ b/duckdbservice/metrics.go @@ -0,0 +1,28 @@ +package duckdbservice + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// Metrics for DuckLake transaction conflict tracking in the Flight SQL worker. +// These use the "duckgres_worker_" prefix to distinguish from standalone-mode +// metrics defined in server/server.go (which use "duckgres_ducklake_"). +var ( + ducklakeConflictTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "duckgres_worker_ducklake_conflict_total", + Help: "Total number of DuckLake transaction conflicts encountered (worker)", + }) + ducklakeConflictRetriesTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "duckgres_worker_ducklake_conflict_retries_total", + Help: "Total number of DuckLake transaction conflict retry attempts (worker)", + }) + ducklakeConflictRetrySuccessesTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "duckgres_worker_ducklake_conflict_retry_successes_total", + Help: "Total number of DuckLake transaction conflict retries that succeeded (worker)", + }) + ducklakeConflictRetriesExhaustedTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "duckgres_worker_ducklake_conflict_retries_exhausted_total", + Help: "Total number of DuckLake transaction conflicts where all retries were exhausted (worker)", + }) +) diff --git a/duckdbservice/transient.go b/duckdbservice/transient.go index 8c896b4d..11238178 100644 --- a/duckdbservice/transient.go +++ b/duckdbservice/transient.go @@ -1,7 +1,9 @@ package duckdbservice import ( + "fmt" "log/slog" + "math/rand/v2" "strings" "time" ) @@ -21,6 +23,8 @@ func isTransientDuckLakeError(err error) bool { strings.Contains(msg, "connection reset by peer") || strings.Contains(msg, "connection timed out") || strings.Contains(msg, "server closed the connection unexpectedly") || + strings.Contains(msg, "SSL connection has been closed unexpectedly") || + strings.Contains(msg, "Current transaction is aborted") || strings.Contains(msg, "no route to host") || strings.Contains(msg, "network is unreachable") } @@ -59,3 +63,59 @@ func retryOnTransient[T any](fn func() (T, error)) (T, error) { slog.Error("DuckLake retries exhausted.", "attempts", transientMaxRetries+1, "error", err) return result, err } + +// isDuckLakeTransactionConflict returns true if the error is a DuckLake +// transaction conflict. These occur when concurrent DuckLake transactions +// try to commit overlapping changes. DuckLake uses global snapshot IDs, so +// even writes to unrelated tables can conflict under concurrency. +func isDuckLakeTransactionConflict(err error) bool { + if err == nil { + return false + } + return strings.Contains(err.Error(), "Transaction conflict") +} + +const ( + conflictMaxRetries = 5 + conflictInitialBackoff = 50 * time.Millisecond + conflictMaxBackoff = 2 * time.Second +) + +// retryOnConflict retries fn on DuckLake transaction conflicts with +// exponential backoff and jitter (50-100% of backoff interval). +// Only used for autocommit queries — user-managed transactions propagate +// the error since the entire transaction is invalid after a conflict. +func retryOnConflict[T any](fn func() (T, error)) (T, error) { + backoff := conflictInitialBackoff + for attempt := 1; attempt <= conflictMaxRetries; attempt++ { + ducklakeConflictRetriesTotal.Inc() + + // Jitter: 50-100% of backoff to decorrelate retry storms. + jittered := time.Duration(float64(backoff) * (0.5 + rand.Float64()*0.5)) + slog.Warn("DuckLake transaction conflict, retrying.", + "attempt", attempt, "max_retries", conflictMaxRetries, + "backoff", jittered) + + time.Sleep(jittered) + + result, err := fn() + if err == nil { + ducklakeConflictRetrySuccessesTotal.Inc() + slog.Info("DuckLake conflict retry succeeded.", "attempt", attempt) + return result, err + } + if !isDuckLakeTransactionConflict(err) { + return result, err + } + + backoff *= 2 + if backoff > conflictMaxBackoff { + backoff = conflictMaxBackoff + } + } + + ducklakeConflictRetriesExhaustedTotal.Inc() + var zero T + slog.Error("DuckLake conflict retries exhausted.", "attempts", conflictMaxRetries) + return zero, fmt.Errorf("DuckLake transaction conflict: retries exhausted after %d attempts", conflictMaxRetries) +} diff --git a/duckdbservice/transient_test.go b/duckdbservice/transient_test.go index 7b824d5b..69857cd4 100644 --- a/duckdbservice/transient_test.go +++ b/duckdbservice/transient_test.go @@ -19,10 +19,13 @@ func TestIsTransientDuckLakeError(t *testing.T) { {"connection reset", errors.New("read tcp: connection reset by peer"), true}, {"connection timed out", errors.New("connection timed out"), true}, {"server closed", errors.New("server closed the connection unexpectedly"), true}, + {"SSL closed", errors.New(`Failed to execute query "COMMIT": SSL connection has been closed unexpectedly`), true}, + {"current transaction aborted", errors.New("Current transaction is aborted, commands ignored until end of transaction block"), true}, {"no route", errors.New("no route to host"), true}, {"network unreachable", errors.New("network is unreachable"), true}, {"auth error", errors.New("password authentication failed for user"), false}, {"table not found", errors.New("Table with name foo does not exist"), false}, + {"transaction conflict is not transient", errors.New(`Transaction conflict - attempting to insert into table with index "29784"`), false}, } for _, tt := range tests { @@ -69,3 +72,79 @@ func TestRetryOnTransientNoRetryForNonTransient(t *testing.T) { t.Fatalf("expected 1 call (no retry for non-transient), got %d", calls) } } + +func TestIsDuckLakeTransactionConflict(t *testing.T) { + tests := []struct { + name string + err error + expected bool + }{ + {"nil", nil, false}, + {"generic error", errors.New("syntax error"), false}, + {"transaction conflict", errors.New(`Transaction conflict - attempting to insert into table with index "29784"`), true}, + {"transaction conflict variant", errors.New("Transaction conflict on commit"), true}, + {"SSL closed is not conflict", errors.New("SSL connection has been closed unexpectedly"), false}, + {"connection refused is not conflict", errors.New("Connection refused"), false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := isDuckLakeTransactionConflict(tt.err); got != tt.expected { + t.Errorf("isDuckLakeTransactionConflict(%v) = %v, want %v", tt.err, got, tt.expected) + } + }) + } +} + +func TestRetryOnConflictSucceedsAfterRetry(t *testing.T) { + calls := 0 + result, err := retryOnConflict(func() (string, error) { + calls++ + if calls <= 2 { + return "", errors.New(`Transaction conflict - attempting to insert into table with index "29784"`) + } + return "ok", nil + }) + + if err != nil { + t.Fatalf("expected success, got error: %v", err) + } + if result != "ok" { + t.Fatalf("expected result 'ok', got %q", result) + } + // Initial call fails (handled externally), then retryOnConflict is called: + // attempt 1 fails (calls=2), attempt 2 succeeds (calls=3) + if calls != 3 { + t.Fatalf("expected 3 calls, got %d", calls) + } +} + +func TestRetryOnConflictExhaustsRetries(t *testing.T) { + calls := 0 + _, err := retryOnConflict(func() (string, error) { + calls++ + return "", errors.New("Transaction conflict on commit") + }) + + if err == nil { + t.Fatal("expected error after exhausting retries") + } + if calls != conflictMaxRetries { + t.Fatalf("expected %d calls, got %d", conflictMaxRetries, calls) + } +} + +func TestRetryOnConflictNoRetryForNonConflict(t *testing.T) { + calls := 0 + _, err := retryOnConflict(func() (string, error) { + calls++ + return "", errors.New("syntax error at position 42") + }) + + if err == nil { + t.Fatal("expected error") + } + if calls != 1 { + t.Fatalf("expected 1 call (no retry for non-conflict), got %d", calls) + } +} diff --git a/server/conn.go b/server/conn.go index cacf1cf5..9b5d7dc4 100644 --- a/server/conn.go +++ b/server/conn.go @@ -359,6 +359,19 @@ func isDuckLakeMetadataConnectionLost(err error) bool { strings.Contains(msg, "SSL connection has been closed unexpectedly") } +// classifyErrorCode returns the most appropriate PostgreSQL SQLSTATE for a +// DuckDB error. Transaction conflicts get 40001 (serialization_failure), which +// signals PG-aware clients to retry. Query cancellations get 57014. +func classifyErrorCode(err error) string { + if isQueryCancelled(err) { + return "57014" + } + if isDuckLakeTransactionConflict(err) { + return "40001" // serialization_failure — client should retry + } + return "42000" +} + // logQueryError logs a query execution failure with additional context for // DuckLake-specific errors (transaction conflicts and metadata connection loss). func logQueryError(user, query string, err error) { @@ -1079,17 +1092,22 @@ func (c *clientConn) handleQuery(body []byte) error { execResult, err = c.executor.ExecContext(ctx, alteredQuery) } } + // Autocommit retry on DuckLake transaction conflicts + if err != nil && c.txStatus == txStatusIdle && isDuckLakeTransactionConflict(err) { + ducklakeConflictTotal.Inc() + execResult, err = retryOnConflict(func() (ExecResult, error) { + return c.executor.ExecContext(ctx, query) + }) + } if err != nil { - errCode := "42000" + errCode := classifyErrorCode(err) errMsg := err.Error() if isQueryCancelled(err) { - errCode = "57014" errMsg = "canceling statement due to user request" - c.sendError("ERROR", errCode, errMsg) } else { logQueryError(c.username, query, err) - c.sendError("ERROR", errCode, errMsg) } + c.sendError("ERROR", errCode, errMsg) c.setTxError() c.logQuery(start, originalQuery, query, cmdType, 0, 0, errCode, errMsg, "simple") _ = writeReadyForQuery(c.writer, c.txStatus) @@ -1136,13 +1154,22 @@ func (c *clientConn) executeQueryDirect(query, cmdType string) error { defer cleanup() result, err := c.executor.ExecContext(ctx, query) + // Autocommit retry on DuckLake transaction conflicts + if err != nil && c.txStatus == txStatusIdle && isDuckLakeTransactionConflict(err) { + ducklakeConflictTotal.Inc() + result, err = retryOnConflict(func() (ExecResult, error) { + return c.executor.ExecContext(ctx, query) + }) + } if err != nil { + errCode := classifyErrorCode(err) + errMsg := err.Error() if isQueryCancelled(err) { - c.sendError("ERROR", "57014", "canceling statement due to user request") + errMsg = "canceling statement due to user request" } else { logQueryError(c.username, query, err) - c.sendError("ERROR", "42000", err.Error()) } + c.sendError("ERROR", errCode, errMsg) c.setTxError() _ = writeReadyForQuery(c.writer, c.txStatus) _ = c.writer.Flush() @@ -1171,16 +1198,14 @@ func (c *clientConn) executeSelectQuery(query string, cmdType string) (int64, st rows, err := c.executor.QueryContext(ctx, query) if err != nil { - errCode := "42000" + errCode := classifyErrorCode(err) errMsg := err.Error() if isQueryCancelled(err) { - errCode = "57014" errMsg = "canceling statement due to user request" - c.sendError("ERROR", errCode, errMsg) } else { logQueryError(c.username, query, err) - c.sendError("ERROR", errCode, errMsg) } + c.sendError("ERROR", errCode, errMsg) c.setTxError() _ = writeReadyForQuery(c.writer, c.txStatus) _ = c.writer.Flush() @@ -1522,11 +1547,17 @@ func (c *clientConn) executeSingleStatement(query string) (errSent bool, fatalEr execResult, err = c.executor.ExecContext(ctx, alteredQuery) } } + // Autocommit retry on DuckLake transaction conflicts + if err != nil && c.txStatus == txStatusIdle && isDuckLakeTransactionConflict(err) { + ducklakeConflictTotal.Inc() + execResult, err = retryOnConflict(func() (ExecResult, error) { + return c.executor.ExecContext(ctx, executedQuery) + }) + } if err != nil { - errCode := "42000" + errCode := classifyErrorCode(err) errMsg := err.Error() if isQueryCancelled(err) { - errCode = "57014" errMsg = "canceling statement due to user request" } else { logQueryError(c.username, executedQuery, err) @@ -1555,10 +1586,9 @@ func (c *clientConn) executeSingleStatement(query string) (errSent bool, fatalEr rows, err := c.executor.QueryContext(ctx, executedQuery) if err != nil { - errCode := "42000" + errCode := classifyErrorCode(err) errMsg := err.Error() if isQueryCancelled(err) { - errCode = "57014" errMsg = "canceling statement due to user request" } else { logQueryError(c.username, executedQuery, err) @@ -4838,11 +4868,24 @@ func (c *clientConn) handleExecute(body []byte) { result, err = c.executor.Exec(alteredQuery, args...) } } + // Autocommit retry on DuckLake transaction conflicts + if err != nil && c.txStatus == txStatusIdle && isDuckLakeTransactionConflict(err) { + ducklakeConflictTotal.Inc() + result, err = retryOnConflict(func() (ExecResult, error) { + return c.executor.Exec(convertedQuery, args...) + }) + } if err != nil { - logQueryError(c.username, convertedQuery, err) - c.sendError("ERROR", "42000", err.Error()) + errCode := classifyErrorCode(err) + errMsg := err.Error() + if isQueryCancelled(err) { + errMsg = "canceling statement due to user request" + } else { + logQueryError(c.username, convertedQuery, err) + } + c.sendError("ERROR", errCode, errMsg) c.setTxError() - c.logQuery(start, originalQuery, convertedQuery, cmdType, 0, 0, "42000", err.Error(), "extended") + c.logQuery(start, originalQuery, convertedQuery, cmdType, 0, 0, errCode, errMsg, "extended") return } } @@ -4860,10 +4903,16 @@ func (c *clientConn) handleExecute(body []byte) { // Result-returning query: use Query with converted query rows, err := c.executor.Query(convertedQuery, args...) if err != nil { - logQueryError(c.username, convertedQuery, err) - c.sendError("ERROR", "42000", err.Error()) + errCode := classifyErrorCode(err) + errMsg := err.Error() + if isQueryCancelled(err) { + errMsg = "canceling statement due to user request" + } else { + logQueryError(c.username, convertedQuery, err) + } + c.sendError("ERROR", errCode, errMsg) c.setTxError() - c.logQuery(start, originalQuery, convertedQuery, cmdType, 0, 0, "42000", err.Error(), "extended") + c.logQuery(start, originalQuery, convertedQuery, cmdType, 0, 0, errCode, errMsg, "extended") return } defer func() { _ = rows.Close() }() diff --git a/server/server.go b/server/server.go index 8f037a00..fff4ac28 100644 --- a/server/server.go +++ b/server/server.go @@ -91,6 +91,26 @@ var queryCancellationsCounter = promauto.NewCounter(prometheus.CounterOpts{ Help: "Total number of queries cancelled via cancel request", }) +var ducklakeConflictTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "duckgres_ducklake_conflict_total", + Help: "Total number of DuckLake transaction conflicts encountered", +}) + +var ducklakeConflictRetriesTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "duckgres_ducklake_conflict_retries_total", + Help: "Total number of DuckLake transaction conflict retry attempts", +}) + +var ducklakeConflictRetrySuccessesTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "duckgres_ducklake_conflict_retry_successes_total", + Help: "Total number of DuckLake transaction conflict retries that succeeded", +}) + +var ducklakeConflictRetriesExhaustedTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "duckgres_ducklake_conflict_retries_exhausted_total", + Help: "Total number of DuckLake transaction conflicts where all retries were exhausted", +}) + // BackendKey uniquely identifies a backend connection for cancel requests type BackendKey struct { Pid int32 diff --git a/server/transient.go b/server/transient.go index efa02fc9..e80340cd 100644 --- a/server/transient.go +++ b/server/transient.go @@ -1,7 +1,9 @@ package server import ( + "fmt" "log/slog" + "math/rand/v2" "strings" "time" ) @@ -21,6 +23,8 @@ func isTransientDuckLakeError(err error) bool { strings.Contains(msg, "connection reset by peer") || strings.Contains(msg, "connection timed out") || strings.Contains(msg, "server closed the connection unexpectedly") || + strings.Contains(msg, "SSL connection has been closed unexpectedly") || + strings.Contains(msg, "Current transaction is aborted") || strings.Contains(msg, "no route to host") || strings.Contains(msg, "network is unreachable") } @@ -58,3 +62,47 @@ func retryOnTransientAttach(fn func() error) error { slog.Error("DuckLake attach retries exhausted.", "attempts", transientMaxRetries+1, "error", err) return err } + +const ( + conflictMaxRetries = 5 + conflictInitialBackoff = 50 * time.Millisecond + conflictMaxBackoff = 2 * time.Second +) + +// retryOnConflict retries fn on DuckLake transaction conflicts with +// exponential backoff and jitter (50-100% of backoff interval). +// Only used for autocommit queries — user-managed transactions propagate +// the error since the entire transaction is invalid after a conflict. +func retryOnConflict[T any](fn func() (T, error)) (T, error) { + backoff := conflictInitialBackoff + for attempt := 1; attempt <= conflictMaxRetries; attempt++ { + ducklakeConflictRetriesTotal.Inc() + + jittered := time.Duration(float64(backoff) * (0.5 + rand.Float64()*0.5)) + slog.Warn("DuckLake transaction conflict, retrying.", + "attempt", attempt, "max_retries", conflictMaxRetries, + "backoff", jittered) + + time.Sleep(jittered) + + result, err := fn() + if err == nil { + ducklakeConflictRetrySuccessesTotal.Inc() + slog.Info("DuckLake conflict retry succeeded.", "attempt", attempt) + return result, err + } + if !isDuckLakeTransactionConflict(err) { + return result, err + } + + backoff *= 2 + if backoff > conflictMaxBackoff { + backoff = conflictMaxBackoff + } + } + + ducklakeConflictRetriesExhaustedTotal.Inc() + var zero T + slog.Error("DuckLake conflict retries exhausted.", "attempts", conflictMaxRetries) + return zero, fmt.Errorf("DuckLake transaction conflict: retries exhausted after %d attempts", conflictMaxRetries) +} diff --git a/server/transient_test.go b/server/transient_test.go index 056fcbb7..c5bf42ab 100644 --- a/server/transient_test.go +++ b/server/transient_test.go @@ -19,10 +19,13 @@ func TestIsTransientDuckLakeError(t *testing.T) { {"connection reset", errors.New("read tcp: connection reset by peer"), true}, {"connection timed out", errors.New("connection timed out"), true}, {"server closed", errors.New("server closed the connection unexpectedly"), true}, + {"SSL closed", errors.New(`Failed to execute query "COMMIT": SSL connection has been closed unexpectedly`), true}, + {"current transaction aborted", errors.New("Current transaction is aborted, commands ignored until end of transaction block"), true}, {"no route", errors.New("no route to host"), true}, {"network unreachable", errors.New("network is unreachable"), true}, {"auth error", errors.New("password authentication failed for user"), false}, {"table not found", errors.New("Table with name foo does not exist"), false}, + {"transaction conflict is not transient", errors.New(`Transaction conflict - attempting to insert into table with index "29784"`), false}, } for _, tt := range tests { @@ -81,3 +84,75 @@ func TestRetryOnTransientAttachNoRetryForNonTransient(t *testing.T) { t.Fatalf("expected 1 call (no retry for non-transient), got %d", calls) } } + +func TestRetryOnConflictSucceedsAfterRetry(t *testing.T) { + calls := 0 + result, err := retryOnConflict(func() (string, error) { + calls++ + if calls <= 2 { + return "", errors.New(`Transaction conflict - attempting to insert into table with index "29784"`) + } + return "ok", nil + }) + + if err != nil { + t.Fatalf("expected success, got error: %v", err) + } + if result != "ok" { + t.Fatalf("expected result 'ok', got %q", result) + } + if calls != 3 { + t.Fatalf("expected 3 calls, got %d", calls) + } +} + +func TestRetryOnConflictExhaustsRetries(t *testing.T) { + calls := 0 + _, err := retryOnConflict(func() (string, error) { + calls++ + return "", errors.New("Transaction conflict on commit") + }) + + if err == nil { + t.Fatal("expected error after exhausting retries") + } + if calls != conflictMaxRetries { + t.Fatalf("expected %d calls, got %d", conflictMaxRetries, calls) + } +} + +func TestRetryOnConflictNoRetryForNonConflict(t *testing.T) { + calls := 0 + _, err := retryOnConflict(func() (string, error) { + calls++ + return "", errors.New("syntax error at position 42") + }) + + if err == nil { + t.Fatal("expected error") + } + if calls != 1 { + t.Fatalf("expected 1 call (no retry for non-conflict), got %d", calls) + } +} + +func TestClassifyErrorCode(t *testing.T) { + tests := []struct { + name string + err error + expected string + }{ + {"transaction conflict", errors.New("Transaction conflict on commit"), "40001"}, + {"query cancelled", errors.New("context canceled"), "57014"}, + {"generic error", errors.New("syntax error"), "42000"}, + {"SSL closed is not conflict", errors.New("SSL connection has been closed unexpectedly"), "42000"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := classifyErrorCode(tt.err); got != tt.expected { + t.Errorf("classifyErrorCode(%v) = %q, want %q", tt.err, got, tt.expected) + } + }) + } +} From c0190550ce385d27571fede179f2f84154269045 Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Thu, 2 Apr 2026 13:46:19 -0700 Subject: [PATCH 2/3] Address code review feedback on conflict retry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix indentation in executeQueryDirect retryOnConflict closure - Preserve last DuckLake error in retryOnConflict exhaustion message (wraps with %w so callers see the actual conflict details) - Add conflict retry to executeSelectQuery for symmetry with Flight SQL handler's GetFlightInfoStatement (SELECT can conflict during schema probing) - Add comments explaining transient→conflict retry chaining behavior - Fix misleading test names and comments for non-conflict error case - Update exhaustion tests for the extra final fn() call Co-Authored-By: Claude Opus 4.6 (1M context) --- duckdbservice/flight_handler.go | 5 +++++ duckdbservice/transient.go | 10 ++++++++-- duckdbservice/transient_test.go | 16 ++++++++++++---- server/conn.go | 12 ++++++++++-- server/transient.go | 10 ++++++++-- server/transient_test.go | 16 ++++++++++++---- 6 files changed, 55 insertions(+), 14 deletions(-) diff --git a/duckdbservice/flight_handler.go b/duckdbservice/flight_handler.go index 3fcf2bf6..752b9348 100644 --- a/duckdbservice/flight_handler.go +++ b/duckdbservice/flight_handler.go @@ -287,6 +287,9 @@ func (h *FlightSQLHandler) GetFlightInfoStatement(ctx context.Context, cmd fligh schema, err := retryOnTransient(func() (*arrow.Schema, error) { return GetQuerySchema(ctx, session.Conn, query, tx) }) + // Conflict retry for autocommit only. Note: if retryOnTransient exhausted on a + // transient error that also matches "Transaction conflict", this chains into + // conflict retry — acceptable since the error patterns are distinct in practice. if err != nil && tx == nil && isDuckLakeTransactionConflict(err) { ducklakeConflictTotal.Inc() schema, err = retryOnConflict(func() (*arrow.Schema, error) { @@ -373,6 +376,7 @@ func (h *FlightSQLHandler) DoGetStatement(ctx context.Context, ticket flightsql. } return session.Conn.QueryContext(ctx, handle.Query) }) + // Conflict retry for autocommit only (see GetFlightInfoStatement comment). if qerr != nil && tx == nil && isDuckLakeTransactionConflict(qerr) { ducklakeConflictTotal.Inc() rows, qerr = retryOnConflict(func() (*sql.Rows, error) { @@ -431,6 +435,7 @@ func (h *FlightSQLHandler) DoPutCommandStatementUpdate(ctx context.Context, } return session.Conn.ExecContext(ctx, query) }) + // Conflict retry for autocommit only (see GetFlightInfoStatement comment). if execErr != nil && tx == nil && isDuckLakeTransactionConflict(execErr) { ducklakeConflictTotal.Inc() result, execErr = retryOnConflict(func() (sql.Result, error) { diff --git a/duckdbservice/transient.go b/duckdbservice/transient.go index 11238178..f5ec7aeb 100644 --- a/duckdbservice/transient.go +++ b/duckdbservice/transient.go @@ -116,6 +116,12 @@ func retryOnConflict[T any](fn func() (T, error)) (T, error) { ducklakeConflictRetriesExhaustedTotal.Inc() var zero T - slog.Error("DuckLake conflict retries exhausted.", "attempts", conflictMaxRetries) - return zero, fmt.Errorf("DuckLake transaction conflict: retries exhausted after %d attempts", conflictMaxRetries) + // Re-run fn one last time to capture the final error for the caller. + result, lastErr := fn() + if lastErr == nil { + ducklakeConflictRetrySuccessesTotal.Inc() + return result, nil + } + slog.Error("DuckLake conflict retries exhausted.", "attempts", conflictMaxRetries, "error", lastErr) + return zero, fmt.Errorf("DuckLake transaction conflict: retries exhausted after %d attempts: %w", conflictMaxRetries, lastErr) } diff --git a/duckdbservice/transient_test.go b/duckdbservice/transient_test.go index 69857cd4..a5106235 100644 --- a/duckdbservice/transient_test.go +++ b/duckdbservice/transient_test.go @@ -2,6 +2,7 @@ package duckdbservice import ( "errors" + "strings" "testing" ) @@ -129,12 +130,17 @@ func TestRetryOnConflictExhaustsRetries(t *testing.T) { if err == nil { t.Fatal("expected error after exhausting retries") } - if calls != conflictMaxRetries { - t.Fatalf("expected %d calls, got %d", conflictMaxRetries, calls) + // conflictMaxRetries attempts in the loop + 1 final attempt to capture the error. + expected := conflictMaxRetries + 1 + if calls != expected { + t.Fatalf("expected %d calls (%d retries + 1 final), got %d", expected, conflictMaxRetries, calls) + } + if !strings.Contains(err.Error(), "Transaction conflict on commit") { + t.Fatalf("expected wrapped original error, got: %v", err) } } -func TestRetryOnConflictNoRetryForNonConflict(t *testing.T) { +func TestRetryOnConflictStopsOnNonConflictError(t *testing.T) { calls := 0 _, err := retryOnConflict(func() (string, error) { calls++ @@ -144,7 +150,9 @@ func TestRetryOnConflictNoRetryForNonConflict(t *testing.T) { if err == nil { t.Fatal("expected error") } + // retryOnConflict always makes one attempt (the first retry); when the + // error is not a transaction conflict it stops immediately. if calls != 1 { - t.Fatalf("expected 1 call (no retry for non-conflict), got %d", calls) + t.Fatalf("expected 1 call (stop on non-conflict error), got %d", calls) } } diff --git a/server/conn.go b/server/conn.go index 9b5d7dc4..da439000 100644 --- a/server/conn.go +++ b/server/conn.go @@ -1158,8 +1158,8 @@ func (c *clientConn) executeQueryDirect(query, cmdType string) error { if err != nil && c.txStatus == txStatusIdle && isDuckLakeTransactionConflict(err) { ducklakeConflictTotal.Inc() result, err = retryOnConflict(func() (ExecResult, error) { - return c.executor.ExecContext(ctx, query) - }) + return c.executor.ExecContext(ctx, query) + }) } if err != nil { errCode := classifyErrorCode(err) @@ -1197,6 +1197,14 @@ func (c *clientConn) executeSelectQuery(query string, cmdType string) (int64, st defer cleanup() rows, err := c.executor.QueryContext(ctx, query) + // Autocommit retry on DuckLake transaction conflicts (SELECT can conflict + // during DuckLake schema probing, symmetric with Flight SQL handler). + if err != nil && c.txStatus == txStatusIdle && isDuckLakeTransactionConflict(err) { + ducklakeConflictTotal.Inc() + rows, err = retryOnConflict(func() (RowSet, error) { + return c.executor.QueryContext(ctx, query) + }) + } if err != nil { errCode := classifyErrorCode(err) errMsg := err.Error() diff --git a/server/transient.go b/server/transient.go index e80340cd..9cedb0c8 100644 --- a/server/transient.go +++ b/server/transient.go @@ -103,6 +103,12 @@ func retryOnConflict[T any](fn func() (T, error)) (T, error) { ducklakeConflictRetriesExhaustedTotal.Inc() var zero T - slog.Error("DuckLake conflict retries exhausted.", "attempts", conflictMaxRetries) - return zero, fmt.Errorf("DuckLake transaction conflict: retries exhausted after %d attempts", conflictMaxRetries) + // Re-run fn one last time to capture the final error for the caller. + result, lastErr := fn() + if lastErr == nil { + ducklakeConflictRetrySuccessesTotal.Inc() + return result, nil + } + slog.Error("DuckLake conflict retries exhausted.", "attempts", conflictMaxRetries, "error", lastErr) + return zero, fmt.Errorf("DuckLake transaction conflict: retries exhausted after %d attempts: %w", conflictMaxRetries, lastErr) } diff --git a/server/transient_test.go b/server/transient_test.go index c5bf42ab..ed851dd6 100644 --- a/server/transient_test.go +++ b/server/transient_test.go @@ -2,6 +2,7 @@ package server import ( "errors" + "strings" "testing" ) @@ -116,12 +117,17 @@ func TestRetryOnConflictExhaustsRetries(t *testing.T) { if err == nil { t.Fatal("expected error after exhausting retries") } - if calls != conflictMaxRetries { - t.Fatalf("expected %d calls, got %d", conflictMaxRetries, calls) + // conflictMaxRetries attempts in the loop + 1 final attempt to capture the error. + expected := conflictMaxRetries + 1 + if calls != expected { + t.Fatalf("expected %d calls (%d retries + 1 final), got %d", expected, conflictMaxRetries, calls) + } + if !strings.Contains(err.Error(), "Transaction conflict on commit") { + t.Fatalf("expected wrapped original error, got: %v", err) } } -func TestRetryOnConflictNoRetryForNonConflict(t *testing.T) { +func TestRetryOnConflictStopsOnNonConflictError(t *testing.T) { calls := 0 _, err := retryOnConflict(func() (string, error) { calls++ @@ -131,8 +137,10 @@ func TestRetryOnConflictNoRetryForNonConflict(t *testing.T) { if err == nil { t.Fatal("expected error") } + // retryOnConflict always makes one attempt (the first retry); when the + // error is not a transaction conflict it stops immediately. if calls != 1 { - t.Fatalf("expected 1 call (no retry for non-conflict), got %d", calls) + t.Fatalf("expected 1 call (stop on non-conflict error), got %d", calls) } } From 74d3d33fb3fcef6c9cbaad56f4385be9d35abfbe Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Thu, 2 Apr 2026 14:01:07 -0700 Subject: [PATCH 3/3] Track lastErr from loop instead of extra fn() call retryOnConflict now captures the last error from within the retry loop rather than making a hidden 6th attempt after exhaustion. The conflictMaxRetries constant now accurately reflects the actual number of retry attempts. Co-Authored-By: Claude Opus 4.6 (1M context) --- duckdbservice/transient.go | 10 +++------- duckdbservice/transient_test.go | 6 ++---- server/transient.go | 10 +++------- server/transient_test.go | 6 ++---- 4 files changed, 10 insertions(+), 22 deletions(-) diff --git a/duckdbservice/transient.go b/duckdbservice/transient.go index f5ec7aeb..0569d5e4 100644 --- a/duckdbservice/transient.go +++ b/duckdbservice/transient.go @@ -86,6 +86,7 @@ const ( // Only used for autocommit queries — user-managed transactions propagate // the error since the entire transaction is invalid after a conflict. func retryOnConflict[T any](fn func() (T, error)) (T, error) { + var lastErr error backoff := conflictInitialBackoff for attempt := 1; attempt <= conflictMaxRetries; attempt++ { ducklakeConflictRetriesTotal.Inc() @@ -102,8 +103,9 @@ func retryOnConflict[T any](fn func() (T, error)) (T, error) { if err == nil { ducklakeConflictRetrySuccessesTotal.Inc() slog.Info("DuckLake conflict retry succeeded.", "attempt", attempt) - return result, err + return result, nil } + lastErr = err if !isDuckLakeTransactionConflict(err) { return result, err } @@ -116,12 +118,6 @@ func retryOnConflict[T any](fn func() (T, error)) (T, error) { ducklakeConflictRetriesExhaustedTotal.Inc() var zero T - // Re-run fn one last time to capture the final error for the caller. - result, lastErr := fn() - if lastErr == nil { - ducklakeConflictRetrySuccessesTotal.Inc() - return result, nil - } slog.Error("DuckLake conflict retries exhausted.", "attempts", conflictMaxRetries, "error", lastErr) return zero, fmt.Errorf("DuckLake transaction conflict: retries exhausted after %d attempts: %w", conflictMaxRetries, lastErr) } diff --git a/duckdbservice/transient_test.go b/duckdbservice/transient_test.go index a5106235..8294e988 100644 --- a/duckdbservice/transient_test.go +++ b/duckdbservice/transient_test.go @@ -130,10 +130,8 @@ func TestRetryOnConflictExhaustsRetries(t *testing.T) { if err == nil { t.Fatal("expected error after exhausting retries") } - // conflictMaxRetries attempts in the loop + 1 final attempt to capture the error. - expected := conflictMaxRetries + 1 - if calls != expected { - t.Fatalf("expected %d calls (%d retries + 1 final), got %d", expected, conflictMaxRetries, calls) + if calls != conflictMaxRetries { + t.Fatalf("expected %d calls, got %d", conflictMaxRetries, calls) } if !strings.Contains(err.Error(), "Transaction conflict on commit") { t.Fatalf("expected wrapped original error, got: %v", err) diff --git a/server/transient.go b/server/transient.go index 9cedb0c8..7c4d4512 100644 --- a/server/transient.go +++ b/server/transient.go @@ -74,6 +74,7 @@ const ( // Only used for autocommit queries — user-managed transactions propagate // the error since the entire transaction is invalid after a conflict. func retryOnConflict[T any](fn func() (T, error)) (T, error) { + var lastErr error backoff := conflictInitialBackoff for attempt := 1; attempt <= conflictMaxRetries; attempt++ { ducklakeConflictRetriesTotal.Inc() @@ -89,8 +90,9 @@ func retryOnConflict[T any](fn func() (T, error)) (T, error) { if err == nil { ducklakeConflictRetrySuccessesTotal.Inc() slog.Info("DuckLake conflict retry succeeded.", "attempt", attempt) - return result, err + return result, nil } + lastErr = err if !isDuckLakeTransactionConflict(err) { return result, err } @@ -103,12 +105,6 @@ func retryOnConflict[T any](fn func() (T, error)) (T, error) { ducklakeConflictRetriesExhaustedTotal.Inc() var zero T - // Re-run fn one last time to capture the final error for the caller. - result, lastErr := fn() - if lastErr == nil { - ducklakeConflictRetrySuccessesTotal.Inc() - return result, nil - } slog.Error("DuckLake conflict retries exhausted.", "attempts", conflictMaxRetries, "error", lastErr) return zero, fmt.Errorf("DuckLake transaction conflict: retries exhausted after %d attempts: %w", conflictMaxRetries, lastErr) } diff --git a/server/transient_test.go b/server/transient_test.go index ed851dd6..502350a4 100644 --- a/server/transient_test.go +++ b/server/transient_test.go @@ -117,10 +117,8 @@ func TestRetryOnConflictExhaustsRetries(t *testing.T) { if err == nil { t.Fatal("expected error after exhausting retries") } - // conflictMaxRetries attempts in the loop + 1 final attempt to capture the error. - expected := conflictMaxRetries + 1 - if calls != expected { - t.Fatalf("expected %d calls (%d retries + 1 final), got %d", expected, conflictMaxRetries, calls) + if calls != conflictMaxRetries { + t.Fatalf("expected %d calls, got %d", conflictMaxRetries, calls) } if !strings.Contains(err.Error(), "Transaction conflict on commit") { t.Fatalf("expected wrapped original error, got: %v", err)