Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions duckdbservice/flight_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,15 @@ func (h *FlightSQLHandler) GetFlightInfoStatement(ctx context.Context, cmd fligh
schema, err := retryOnTransient(func() (*arrow.Schema, error) {
return GetQuerySchema(ctx, session.Conn, query, tx)
})
// Conflict retry for autocommit only. Note: if retryOnTransient exhausted on a
// transient error that also matches "Transaction conflict", this chains into
// conflict retry — acceptable since the error patterns are distinct in practice.
if err != nil && tx == nil && isDuckLakeTransactionConflict(err) {
ducklakeConflictTotal.Inc()
schema, err = retryOnConflict(func() (*arrow.Schema, error) {
return GetQuerySchema(ctx, session.Conn, query, tx)
})
}
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failed to prepare query: %v", err)
}
Expand Down Expand Up @@ -367,6 +376,13 @@ func (h *FlightSQLHandler) DoGetStatement(ctx context.Context, ticket flightsql.
}
return session.Conn.QueryContext(ctx, handle.Query)
})
// Conflict retry for autocommit only (see GetFlightInfoStatement comment).
if qerr != nil && tx == nil && isDuckLakeTransactionConflict(qerr) {
ducklakeConflictTotal.Inc()
rows, qerr = retryOnConflict(func() (*sql.Rows, error) {
return session.Conn.QueryContext(ctx, handle.Query)
})
}
if qerr != nil {
ch <- flight.StreamChunk{Err: qerr}
return
Expand Down Expand Up @@ -419,6 +435,13 @@ func (h *FlightSQLHandler) DoPutCommandStatementUpdate(ctx context.Context,
}
return session.Conn.ExecContext(ctx, query)
})
// Conflict retry for autocommit only (see GetFlightInfoStatement comment).
if execErr != nil && tx == nil && isDuckLakeTransactionConflict(execErr) {
ducklakeConflictTotal.Inc()
result, execErr = retryOnConflict(func() (sql.Result, error) {
return session.Conn.ExecContext(ctx, query)
})
}
if execErr != nil {
return 0, status.Errorf(codes.InvalidArgument, "failed to execute update: %v", execErr)
}
Expand Down
28 changes: 28 additions & 0 deletions duckdbservice/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package duckdbservice

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// Metrics for DuckLake transaction conflict tracking in the Flight SQL worker.
// These use the "duckgres_worker_" prefix to distinguish from standalone-mode
// metrics defined in server/server.go (which use "duckgres_ducklake_").
var (
// ducklakeConflictTotal counts autocommit statements whose execution failed
// with a DuckLake transaction conflict. The Flight SQL handlers increment it
// once per statement, right before handing the statement to retryOnConflict.
ducklakeConflictTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "duckgres_worker_ducklake_conflict_total",
Help: "Total number of DuckLake transaction conflicts encountered (worker)",
})
// ducklakeConflictRetriesTotal counts individual retry attempts; it is
// incremented at the start of every retryOnConflict loop iteration.
ducklakeConflictRetriesTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "duckgres_worker_ducklake_conflict_retries_total",
Help: "Total number of DuckLake transaction conflict retry attempts (worker)",
})
// ducklakeConflictRetrySuccessesTotal counts retryOnConflict calls in which
// one of the retry attempts returned without an error.
ducklakeConflictRetrySuccessesTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "duckgres_worker_ducklake_conflict_retry_successes_total",
Help: "Total number of DuckLake transaction conflict retries that succeeded (worker)",
})
// ducklakeConflictRetriesExhaustedTotal counts retryOnConflict calls in which
// every attempt failed with a transaction conflict. A retry that stops early
// on a non-conflict error is counted neither here nor as a success.
ducklakeConflictRetriesExhaustedTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "duckgres_worker_ducklake_conflict_retries_exhausted_total",
Help: "Total number of DuckLake transaction conflicts where all retries were exhausted (worker)",
})
)
62 changes: 62 additions & 0 deletions duckdbservice/transient.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package duckdbservice

import (
"fmt"
"log/slog"
"math/rand/v2"
"strings"
"time"
)
Expand All @@ -21,6 +23,8 @@ func isTransientDuckLakeError(err error) bool {
strings.Contains(msg, "connection reset by peer") ||
strings.Contains(msg, "connection timed out") ||
strings.Contains(msg, "server closed the connection unexpectedly") ||
strings.Contains(msg, "SSL connection has been closed unexpectedly") ||
strings.Contains(msg, "Current transaction is aborted") ||
strings.Contains(msg, "no route to host") ||
strings.Contains(msg, "network is unreachable")
}
Expand Down Expand Up @@ -59,3 +63,61 @@ func retryOnTransient[T any](fn func() (T, error)) (T, error) {
slog.Error("DuckLake retries exhausted.", "attempts", transientMaxRetries+1, "error", err)
return result, err
}

// isDuckLakeTransactionConflict reports whether err is a DuckLake transaction
// conflict, recognised by the case-sensitive substring "Transaction conflict"
// in the error message. A nil error is never a conflict.
//
// Such conflicts occur when concurrent DuckLake transactions try to commit
// overlapping changes. DuckLake uses global snapshot IDs, so even writes to
// unrelated tables can conflict under concurrency.
func isDuckLakeTransactionConflict(err error) bool {
	return err != nil && strings.Contains(err.Error(), "Transaction conflict")
}

// Tuning knobs for retryOnConflict.
const (
// conflictMaxRetries is the maximum number of times retryOnConflict
// invokes the supplied function before giving up.
conflictMaxRetries = 5
// conflictInitialBackoff is the un-jittered wait before the first retry
// attempt; the wait doubles after each attempt that hits another conflict.
conflictInitialBackoff = 50 * time.Millisecond
// conflictMaxBackoff caps the un-jittered wait between attempts.
conflictMaxBackoff = 2 * time.Second
)

// retryOnConflict re-runs fn after a DuckLake transaction conflict, making at
// most conflictMaxRetries attempts. Every attempt is preceded by a sleep: the
// base wait starts at conflictInitialBackoff, doubles after each conflicting
// attempt up to conflictMaxBackoff, and is scaled by a random factor in
// [0.5, 1.0) so that concurrent retriers do not wake in lockstep.
//
// Because the sleep comes first, fn is expected to have been tried (and to
// have failed with a conflict) by the caller already. It is only used for
// autocommit queries — user-managed transactions propagate the error since
// the entire transaction is invalid after a conflict.
//
// Returns:
//   - fn's result and a nil error as soon as an attempt succeeds;
//   - fn's result and error unchanged if an attempt fails with anything other
//     than a transaction conflict (no further attempts are made);
//   - the zero value of T and an error wrapping the last conflict once all
//     attempts have been used.
func retryOnConflict[T any](fn func() (T, error)) (T, error) {
	var conflictErr error
	wait := conflictInitialBackoff

	for n := 1; n <= conflictMaxRetries; n++ {
		ducklakeConflictRetriesTotal.Inc()

		// Sleep for 50-100% of the current wait to decorrelate retry storms.
		pause := time.Duration(float64(wait) * (0.5 + rand.Float64()*0.5))
		slog.Warn("DuckLake transaction conflict, retrying.",
			"attempt", n, "max_retries", conflictMaxRetries,
			"backoff", pause)
		time.Sleep(pause)

		out, err := fn()
		switch {
		case err == nil:
			ducklakeConflictRetrySuccessesTotal.Inc()
			slog.Info("DuckLake conflict retry succeeded.", "attempt", n)
			return out, nil
		case !isDuckLakeTransactionConflict(err):
			// A different failure: retrying will not help, surface it as-is.
			return out, err
		}

		// Another conflict: remember it and lengthen the next wait.
		conflictErr = err
		wait = min(wait*2, conflictMaxBackoff)
	}

	ducklakeConflictRetriesExhaustedTotal.Inc()
	slog.Error("DuckLake conflict retries exhausted.", "attempts", conflictMaxRetries, "error", conflictErr)

	var none T
	return none, fmt.Errorf("DuckLake transaction conflict: retries exhausted after %d attempts: %w", conflictMaxRetries, conflictErr)
}
85 changes: 85 additions & 0 deletions duckdbservice/transient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package duckdbservice

import (
"errors"
"strings"
"testing"
)

Expand All @@ -19,10 +20,13 @@ func TestIsTransientDuckLakeError(t *testing.T) {
{"connection reset", errors.New("read tcp: connection reset by peer"), true},
{"connection timed out", errors.New("connection timed out"), true},
{"server closed", errors.New("server closed the connection unexpectedly"), true},
{"SSL closed", errors.New(`Failed to execute query "COMMIT": SSL connection has been closed unexpectedly`), true},
{"current transaction aborted", errors.New("Current transaction is aborted, commands ignored until end of transaction block"), true},
{"no route", errors.New("no route to host"), true},
{"network unreachable", errors.New("network is unreachable"), true},
{"auth error", errors.New("password authentication failed for user"), false},
{"table not found", errors.New("Table with name foo does not exist"), false},
{"transaction conflict is not transient", errors.New(`Transaction conflict - attempting to insert into table with index "29784"`), false},
}

for _, tt := range tests {
Expand Down Expand Up @@ -69,3 +73,84 @@ func TestRetryOnTransientNoRetryForNonTransient(t *testing.T) {
t.Fatalf("expected 1 call (no retry for non-transient), got %d", calls)
}
}

// TestIsDuckLakeTransactionConflict checks the classifier against a nil error,
// unrelated errors, and two wordings of the conflict message.
func TestIsDuckLakeTransactionConflict(t *testing.T) {
	type testCase struct {
		label string
		in    error
		want  bool
	}

	cases := []testCase{
		{label: "nil", in: nil, want: false},
		{label: "generic error", in: errors.New("syntax error"), want: false},
		{label: "transaction conflict", in: errors.New(`Transaction conflict - attempting to insert into table with index "29784"`), want: true},
		{label: "transaction conflict variant", in: errors.New("Transaction conflict on commit"), want: true},
		{label: "SSL closed is not conflict", in: errors.New("SSL connection has been closed unexpectedly"), want: false},
		{label: "connection refused is not conflict", in: errors.New("Connection refused"), want: false},
	}

	for _, tc := range cases {
		t.Run(tc.label, func(t *testing.T) {
			got := isDuckLakeTransactionConflict(tc.in)
			if got != tc.want {
				t.Errorf("isDuckLakeTransactionConflict(%v) = %v, want %v", tc.in, got, tc.want)
			}
		})
	}
}

// TestRetryOnConflictSucceedsAfterRetry verifies that retryOnConflict keeps
// retrying through conflicts and returns the first successful result.
func TestRetryOnConflictSucceedsAfterRetry(t *testing.T) {
	var invocations int
	op := func() (string, error) {
		invocations++
		if invocations > 2 {
			return "ok", nil
		}
		return "", errors.New(`Transaction conflict - attempting to insert into table with index "29784"`)
	}

	got, err := retryOnConflict(op)

	if err != nil {
		t.Fatalf("expected success, got error: %v", err)
	}
	if got != "ok" {
		t.Fatalf("expected result 'ok', got %q", got)
	}
	// Every invocation happens inside retryOnConflict: attempts 1 and 2 report
	// a conflict, attempt 3 succeeds.
	if invocations != 3 {
		t.Fatalf("expected 3 calls, got %d", invocations)
	}
}

// TestRetryOnConflictExhaustsRetries verifies that a function which always
// conflicts is invoked exactly conflictMaxRetries times and that the returned
// error still carries the original conflict message.
func TestRetryOnConflictExhaustsRetries(t *testing.T) {
	var invocations int
	alwaysConflict := func() (string, error) {
		invocations++
		return "", errors.New("Transaction conflict on commit")
	}

	_, err := retryOnConflict(alwaysConflict)

	if err == nil {
		t.Fatal("expected error after exhausting retries")
	}
	if invocations != conflictMaxRetries {
		t.Fatalf("expected %d calls, got %d", conflictMaxRetries, invocations)
	}
	if msg := err.Error(); !strings.Contains(msg, "Transaction conflict on commit") {
		t.Fatalf("expected wrapped original error, got: %v", err)
	}
}

// TestRetryOnConflictStopsOnNonConflictError verifies that an error which is
// not a transaction conflict ends the retry loop after a single invocation.
func TestRetryOnConflictStopsOnNonConflictError(t *testing.T) {
	var invocations int
	syntaxFailure := func() (string, error) {
		invocations++
		return "", errors.New("syntax error at position 42")
	}

	_, err := retryOnConflict(syntaxFailure)

	if err == nil {
		t.Fatal("expected error")
	}
	// retryOnConflict always performs its first attempt; since that attempt
	// fails with something other than a conflict, no further attempts follow.
	if invocations != 1 {
		t.Fatalf("expected 1 call (stop on non-conflict error), got %d", invocations)
	}
}
Loading
Loading