diff --git a/duckdbservice/flight_handler.go b/duckdbservice/flight_handler.go index 8e07097..752b934 100644 --- a/duckdbservice/flight_handler.go +++ b/duckdbservice/flight_handler.go @@ -287,6 +287,15 @@ func (h *FlightSQLHandler) GetFlightInfoStatement(ctx context.Context, cmd fligh schema, err := retryOnTransient(func() (*arrow.Schema, error) { return GetQuerySchema(ctx, session.Conn, query, tx) }) + // Conflict retry for autocommit only. Note: if retryOnTransient exhausted on a + // transient error that also matches "Transaction conflict", this chains into + // conflict retry — acceptable since the error patterns are distinct in practice. + if err != nil && tx == nil && isDuckLakeTransactionConflict(err) { + ducklakeConflictTotal.Inc() + schema, err = retryOnConflict(func() (*arrow.Schema, error) { + return GetQuerySchema(ctx, session.Conn, query, tx) + }) + } if err != nil { return nil, status.Errorf(codes.InvalidArgument, "failed to prepare query: %v", err) } @@ -367,6 +376,13 @@ func (h *FlightSQLHandler) DoGetStatement(ctx context.Context, ticket flightsql. } return session.Conn.QueryContext(ctx, handle.Query) }) + // Conflict retry for autocommit only (see GetFlightInfoStatement comment). + if qerr != nil && tx == nil && isDuckLakeTransactionConflict(qerr) { + ducklakeConflictTotal.Inc() + rows, qerr = retryOnConflict(func() (*sql.Rows, error) { + return session.Conn.QueryContext(ctx, handle.Query) + }) + } if qerr != nil { ch <- flight.StreamChunk{Err: qerr} return @@ -419,6 +435,13 @@ func (h *FlightSQLHandler) DoPutCommandStatementUpdate(ctx context.Context, } return session.Conn.ExecContext(ctx, query) }) + // Conflict retry for autocommit only (see GetFlightInfoStatement comment). + if execErr != nil && tx == nil && isDuckLakeTransactionConflict(execErr) { + ducklakeConflictTotal.Inc() + result, execErr = retryOnConflict(func() (sql.Result, error) { + return session.Conn.ExecContext(ctx, query) + }) + } if execErr != nil { return 0, status.Errorf(codes.InvalidArgument, "failed to execute update: %v", execErr) } diff --git a/duckdbservice/metrics.go b/duckdbservice/metrics.go new file mode 100644 index 0000000..52efac8 --- /dev/null +++ b/duckdbservice/metrics.go @@ -0,0 +1,28 @@ +package duckdbservice + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// Metrics for DuckLake transaction conflict tracking in the Flight SQL worker. +// These use the "duckgres_worker_" prefix to distinguish from standalone-mode +// metrics defined in server/server.go (which use "duckgres_ducklake_"). +var ( + ducklakeConflictTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "duckgres_worker_ducklake_conflict_total", + Help: "Total number of DuckLake transaction conflicts encountered (worker)", + }) + ducklakeConflictRetriesTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "duckgres_worker_ducklake_conflict_retries_total", + Help: "Total number of DuckLake transaction conflict retry attempts (worker)", + }) + ducklakeConflictRetrySuccessesTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "duckgres_worker_ducklake_conflict_retry_successes_total", + Help: "Total number of DuckLake transaction conflict retries that succeeded (worker)", + }) + ducklakeConflictRetriesExhaustedTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "duckgres_worker_ducklake_conflict_retries_exhausted_total", + Help: "Total number of DuckLake transaction conflicts where all retries were exhausted (worker)", + }) +) diff --git a/duckdbservice/transient.go b/duckdbservice/transient.go index 8c896b4..0569d5e 100644 --- a/duckdbservice/transient.go +++ b/duckdbservice/transient.go @@ -1,7 +1,9 @@ package duckdbservice import ( + "fmt" "log/slog" + "math/rand/v2" "strings" "time" ) @@ -21,6 +23,8 @@ func isTransientDuckLakeError(err error) bool { strings.Contains(msg, "connection reset by peer") || strings.Contains(msg, "connection timed out") || strings.Contains(msg, "server closed the connection unexpectedly") || + strings.Contains(msg, "SSL connection has been closed unexpectedly") || + strings.Contains(msg, "Current transaction is aborted") || strings.Contains(msg, "no route to host") || strings.Contains(msg, "network is unreachable") } @@ -59,3 +63,61 @@ func retryOnTransient[T any](fn func() (T, error)) (T, error) { slog.Error("DuckLake retries exhausted.", "attempts", transientMaxRetries+1, "error", err) return result, err } + +// isDuckLakeTransactionConflict returns true if the error is a DuckLake +// transaction conflict. These occur when concurrent DuckLake transactions +// try to commit overlapping changes. DuckLake uses global snapshot IDs, so +// even writes to unrelated tables can conflict under concurrency. +func isDuckLakeTransactionConflict(err error) bool { + if err == nil { + return false + } + return strings.Contains(err.Error(), "Transaction conflict") +} + +const ( + conflictMaxRetries = 5 + conflictInitialBackoff = 50 * time.Millisecond + conflictMaxBackoff = 2 * time.Second +) + +// retryOnConflict retries fn on DuckLake transaction conflicts with +// exponential backoff and jitter (50-100% of backoff interval). +// Only used for autocommit queries — user-managed transactions propagate +// the error since the entire transaction is invalid after a conflict. +func retryOnConflict[T any](fn func() (T, error)) (T, error) { + var lastErr error + backoff := conflictInitialBackoff + for attempt := 1; attempt <= conflictMaxRetries; attempt++ { + ducklakeConflictRetriesTotal.Inc() + + // Jitter: 50-100% of backoff to decorrelate retry storms. + jittered := time.Duration(float64(backoff) * (0.5 + rand.Float64()*0.5)) + slog.Warn("DuckLake transaction conflict, retrying.", + "attempt", attempt, "max_retries", conflictMaxRetries, + "backoff", jittered) + + time.Sleep(jittered) + + result, err := fn() + if err == nil { + ducklakeConflictRetrySuccessesTotal.Inc() + slog.Info("DuckLake conflict retry succeeded.", "attempt", attempt) + return result, nil + } + lastErr = err + if !isDuckLakeTransactionConflict(err) { + return result, err + } + + backoff *= 2 + if backoff > conflictMaxBackoff { + backoff = conflictMaxBackoff + } + } + + ducklakeConflictRetriesExhaustedTotal.Inc() + var zero T + slog.Error("DuckLake conflict retries exhausted.", "attempts", conflictMaxRetries, "error", lastErr) + return zero, fmt.Errorf("DuckLake transaction conflict: retries exhausted after %d attempts: %w", conflictMaxRetries, lastErr) +} diff --git a/duckdbservice/transient_test.go b/duckdbservice/transient_test.go index 7b824d5..8294e98 100644 --- a/duckdbservice/transient_test.go +++ b/duckdbservice/transient_test.go @@ -2,6 +2,7 @@ package duckdbservice import ( "errors" + "strings" "testing" ) @@ -19,10 +20,13 @@ func TestIsTransientDuckLakeError(t *testing.T) { {"connection reset", errors.New("read tcp: connection reset by peer"), true}, {"connection timed out", errors.New("connection timed out"), true}, {"server closed", errors.New("server closed the connection unexpectedly"), true}, + {"SSL closed", errors.New(`Failed to execute query "COMMIT": SSL connection has been closed unexpectedly`), true}, + {"current transaction aborted", errors.New("Current transaction is aborted, commands ignored until end of transaction block"), true}, {"no route", errors.New("no route to host"), true}, {"network unreachable", errors.New("network is unreachable"), true}, {"auth error", errors.New("password authentication failed for user"), false}, {"table not found", errors.New("Table with name foo does not exist"), false}, + {"transaction conflict is not transient", errors.New(`Transaction conflict - attempting to insert into table with index "29784"`), false}, } for _, tt := range tests { @@ -69,3 +73,84 @@ func TestRetryOnTransientNoRetryForNonTransient(t *testing.T) { t.Fatalf("expected 1 call (no retry for non-transient), got %d", calls) } } + +func TestIsDuckLakeTransactionConflict(t *testing.T) { + tests := []struct { + name string + err error + expected bool + }{ + {"nil", nil, false}, + {"generic error", errors.New("syntax error"), false}, + {"transaction conflict", errors.New(`Transaction conflict - attempting to insert into table with index "29784"`), true}, + {"transaction conflict variant", errors.New("Transaction conflict on commit"), true}, + {"SSL closed is not conflict", errors.New("SSL connection has been closed unexpectedly"), false}, + {"connection refused is not conflict", errors.New("Connection refused"), false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := isDuckLakeTransactionConflict(tt.err); got != tt.expected { + t.Errorf("isDuckLakeTransactionConflict(%v) = %v, want %v", tt.err, got, tt.expected) + } + }) + } +} + +func TestRetryOnConflictSucceedsAfterRetry(t *testing.T) { + calls := 0 + result, err := retryOnConflict(func() (string, error) { + calls++ + if calls <= 2 { + return "", errors.New(`Transaction conflict - attempting to insert into table with index "29784"`) + } + return "ok", nil + }) + + if err != nil { + t.Fatalf("expected success, got error: %v", err) + } + if result != "ok" { + t.Fatalf("expected result 'ok', got %q", result) + } + // Initial call fails (handled externally), then retryOnConflict is called: + // attempt 1 fails (calls=2), attempt 2 succeeds (calls=3) + if calls != 3 { + t.Fatalf("expected 3 calls, got %d", calls) + } +} + +func TestRetryOnConflictExhaustsRetries(t *testing.T) { + calls := 0 + _, err := retryOnConflict(func() (string, error) { + calls++ + return "", errors.New("Transaction conflict on commit") + }) + + if err == nil { + t.Fatal("expected error after exhausting retries") + } + if calls != conflictMaxRetries { + t.Fatalf("expected %d calls, got %d", conflictMaxRetries, calls) + } + if !strings.Contains(err.Error(), "Transaction conflict on commit") { + t.Fatalf("expected wrapped original error, got: %v", err) + } +} + +func TestRetryOnConflictStopsOnNonConflictError(t *testing.T) { + calls := 0 + _, err := retryOnConflict(func() (string, error) { + calls++ + return "", errors.New("syntax error at position 42") + }) + + if err == nil { + t.Fatal("expected error") + } + // retryOnConflict always makes one attempt (the first retry); when the + // error is not a transaction conflict it stops immediately. + if calls != 1 { + t.Fatalf("expected 1 call (stop on non-conflict error), got %d", calls) + } +} diff --git a/server/conn.go b/server/conn.go index cacf1cf..da43900 100644 --- a/server/conn.go +++ b/server/conn.go @@ -359,6 +359,19 @@ func isDuckLakeMetadataConnectionLost(err error) bool { strings.Contains(msg, "SSL connection has been closed unexpectedly") } +// classifyErrorCode returns the most appropriate PostgreSQL SQLSTATE for a +// DuckDB error. Transaction conflicts get 40001 (serialization_failure), which +// signals PG-aware clients to retry. Query cancellations get 57014. +func classifyErrorCode(err error) string { + if isQueryCancelled(err) { + return "57014" + } + if isDuckLakeTransactionConflict(err) { + return "40001" // serialization_failure — client should retry + } + return "42000" +} + // logQueryError logs a query execution failure with additional context for // DuckLake-specific errors (transaction conflicts and metadata connection loss). func logQueryError(user, query string, err error) { @@ -1079,17 +1092,22 @@ func (c *clientConn) handleQuery(body []byte) error { execResult, err = c.executor.ExecContext(ctx, alteredQuery) } } + // Autocommit retry on DuckLake transaction conflicts + if err != nil && c.txStatus == txStatusIdle && isDuckLakeTransactionConflict(err) { + ducklakeConflictTotal.Inc() + execResult, err = retryOnConflict(func() (ExecResult, error) { + return c.executor.ExecContext(ctx, query) + }) + } if err != nil { - errCode := "42000" + errCode := classifyErrorCode(err) errMsg := err.Error() if isQueryCancelled(err) { - errCode = "57014" errMsg = "canceling statement due to user request" - c.sendError("ERROR", errCode, errMsg) } else { logQueryError(c.username, query, err) - c.sendError("ERROR", errCode, errMsg) } + c.sendError("ERROR", errCode, errMsg) c.setTxError() c.logQuery(start, originalQuery, query, cmdType, 0, 0, errCode, errMsg, "simple") _ = writeReadyForQuery(c.writer, c.txStatus) @@ -1136,13 +1154,22 @@ func (c *clientConn) executeQueryDirect(query, cmdType string) error { defer cleanup() result, err := c.executor.ExecContext(ctx, query) + // Autocommit retry on DuckLake transaction conflicts + if err != nil && c.txStatus == txStatusIdle && isDuckLakeTransactionConflict(err) { + ducklakeConflictTotal.Inc() + result, err = retryOnConflict(func() (ExecResult, error) { + return c.executor.ExecContext(ctx, query) + }) + } if err != nil { + errCode := classifyErrorCode(err) + errMsg := err.Error() if isQueryCancelled(err) { - c.sendError("ERROR", "57014", "canceling statement due to user request") + errMsg = "canceling statement due to user request" } else { logQueryError(c.username, query, err) - c.sendError("ERROR", "42000", err.Error()) } + c.sendError("ERROR", errCode, errMsg) c.setTxError() _ = writeReadyForQuery(c.writer, c.txStatus) _ = c.writer.Flush() @@ -1170,17 +1197,23 @@ func (c *clientConn) executeSelectQuery(query string, cmdType string) (int64, st defer cleanup() rows, err := c.executor.QueryContext(ctx, query) + // Autocommit retry on DuckLake transaction conflicts (SELECT can conflict + // during DuckLake schema probing, symmetric with Flight SQL handler). + if err != nil && c.txStatus == txStatusIdle && isDuckLakeTransactionConflict(err) { + ducklakeConflictTotal.Inc() + rows, err = retryOnConflict(func() (RowSet, error) { + return c.executor.QueryContext(ctx, query) + }) + } if err != nil { - errCode := "42000" + errCode := classifyErrorCode(err) errMsg := err.Error() if isQueryCancelled(err) { - errCode = "57014" errMsg = "canceling statement due to user request" - c.sendError("ERROR", errCode, errMsg) } else { logQueryError(c.username, query, err) - c.sendError("ERROR", errCode, errMsg) } + c.sendError("ERROR", errCode, errMsg) c.setTxError() _ = writeReadyForQuery(c.writer, c.txStatus) _ = c.writer.Flush() @@ -1522,11 +1555,17 @@ func (c *clientConn) executeSingleStatement(query string) (errSent bool, fatalEr execResult, err = c.executor.ExecContext(ctx, alteredQuery) } } + // Autocommit retry on DuckLake transaction conflicts + if err != nil && c.txStatus == txStatusIdle && isDuckLakeTransactionConflict(err) { + ducklakeConflictTotal.Inc() + execResult, err = retryOnConflict(func() (ExecResult, error) { + return c.executor.ExecContext(ctx, executedQuery) + }) + } if err != nil { - errCode := "42000" + errCode := classifyErrorCode(err) errMsg := err.Error() if isQueryCancelled(err) { - errCode = "57014" errMsg = "canceling statement due to user request" } else { logQueryError(c.username, executedQuery, err) @@ -1555,10 +1594,9 @@ func (c *clientConn) executeSingleStatement(query string) (errSent bool, fatalEr rows, err := c.executor.QueryContext(ctx, executedQuery) if err != nil { - errCode := "42000" + errCode := classifyErrorCode(err) errMsg := err.Error() if isQueryCancelled(err) { - errCode = "57014" errMsg = "canceling statement due to user request" } else { logQueryError(c.username, executedQuery, err) @@ -4838,11 +4876,24 @@ func (c *clientConn) handleExecute(body []byte) { result, err = c.executor.Exec(alteredQuery, args...) } } + // Autocommit retry on DuckLake transaction conflicts + if err != nil && c.txStatus == txStatusIdle && isDuckLakeTransactionConflict(err) { + ducklakeConflictTotal.Inc() + result, err = retryOnConflict(func() (ExecResult, error) { + return c.executor.Exec(convertedQuery, args...) + }) + } if err != nil { - logQueryError(c.username, convertedQuery, err) - c.sendError("ERROR", "42000", err.Error()) + errCode := classifyErrorCode(err) + errMsg := err.Error() + if isQueryCancelled(err) { + errMsg = "canceling statement due to user request" + } else { + logQueryError(c.username, convertedQuery, err) + } + c.sendError("ERROR", errCode, errMsg) c.setTxError() - c.logQuery(start, originalQuery, convertedQuery, cmdType, 0, 0, "42000", err.Error(), "extended") + c.logQuery(start, originalQuery, convertedQuery, cmdType, 0, 0, errCode, errMsg, "extended") return } } @@ -4860,10 +4911,16 @@ func (c *clientConn) handleExecute(body []byte) { // Result-returning query: use Query with converted query rows, err := c.executor.Query(convertedQuery, args...) if err != nil { - logQueryError(c.username, convertedQuery, err) - c.sendError("ERROR", "42000", err.Error()) + errCode := classifyErrorCode(err) + errMsg := err.Error() + if isQueryCancelled(err) { + errMsg = "canceling statement due to user request" + } else { + logQueryError(c.username, convertedQuery, err) + } + c.sendError("ERROR", errCode, errMsg) c.setTxError() - c.logQuery(start, originalQuery, convertedQuery, cmdType, 0, 0, "42000", err.Error(), "extended") + c.logQuery(start, originalQuery, convertedQuery, cmdType, 0, 0, errCode, errMsg, "extended") return } defer func() { _ = rows.Close() }() diff --git a/server/server.go b/server/server.go index 8f037a0..fff4ac2 100644 --- a/server/server.go +++ b/server/server.go @@ -91,6 +91,26 @@ var queryCancellationsCounter = promauto.NewCounter(prometheus.CounterOpts{ Help: "Total number of queries cancelled via cancel request", }) +var ducklakeConflictTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "duckgres_ducklake_conflict_total", + Help: "Total number of DuckLake transaction conflicts encountered", +}) + +var ducklakeConflictRetriesTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "duckgres_ducklake_conflict_retries_total", + Help: "Total number of DuckLake transaction conflict retry attempts", +}) + +var ducklakeConflictRetrySuccessesTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "duckgres_ducklake_conflict_retry_successes_total", + Help: "Total number of DuckLake transaction conflict retries that succeeded", +}) + +var ducklakeConflictRetriesExhaustedTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "duckgres_ducklake_conflict_retries_exhausted_total", + Help: "Total number of DuckLake transaction conflicts where all retries were exhausted", +}) + // BackendKey uniquely identifies a backend connection for cancel requests type BackendKey struct { Pid int32 diff --git a/server/transient.go b/server/transient.go index efa02fc..7c4d451 100644 --- a/server/transient.go +++ b/server/transient.go @@ -1,7 +1,9 @@ package server import ( + "fmt" "log/slog" + "math/rand/v2" "strings" "time" ) @@ -21,6 +23,8 @@ func isTransientDuckLakeError(err error) bool { strings.Contains(msg, "connection reset by peer") || strings.Contains(msg, "connection timed out") || strings.Contains(msg, "server closed the connection unexpectedly") || + strings.Contains(msg, "SSL connection has been closed unexpectedly") || + strings.Contains(msg, "Current transaction is aborted") || strings.Contains(msg, "no route to host") || strings.Contains(msg, "network is unreachable") } @@ -58,3 +62,49 @@ func retryOnTransientAttach(fn func() error) error { slog.Error("DuckLake attach retries exhausted.", "attempts", transientMaxRetries+1, "error", err) return err } + +const ( + conflictMaxRetries = 5 + conflictInitialBackoff = 50 * time.Millisecond + conflictMaxBackoff = 2 * time.Second +) + +// retryOnConflict retries fn on DuckLake transaction conflicts with +// exponential backoff and jitter (50-100% of backoff interval). +// Only used for autocommit queries — user-managed transactions propagate +// the error since the entire transaction is invalid after a conflict. +func retryOnConflict[T any](fn func() (T, error)) (T, error) { + var lastErr error + backoff := conflictInitialBackoff + for attempt := 1; attempt <= conflictMaxRetries; attempt++ { + ducklakeConflictRetriesTotal.Inc() + + jittered := time.Duration(float64(backoff) * (0.5 + rand.Float64()*0.5)) + slog.Warn("DuckLake transaction conflict, retrying.", + "attempt", attempt, "max_retries", conflictMaxRetries, + "backoff", jittered) + + time.Sleep(jittered) + + result, err := fn() + if err == nil { + ducklakeConflictRetrySuccessesTotal.Inc() + slog.Info("DuckLake conflict retry succeeded.", "attempt", attempt) + return result, nil + } + lastErr = err + if !isDuckLakeTransactionConflict(err) { + return result, err + } + + backoff *= 2 + if backoff > conflictMaxBackoff { + backoff = conflictMaxBackoff + } + } + + ducklakeConflictRetriesExhaustedTotal.Inc() + var zero T + slog.Error("DuckLake conflict retries exhausted.", "attempts", conflictMaxRetries, "error", lastErr) + return zero, fmt.Errorf("DuckLake transaction conflict: retries exhausted after %d attempts: %w", conflictMaxRetries, lastErr) +} diff --git a/server/transient_test.go b/server/transient_test.go index 056fcbb..502350a 100644 --- a/server/transient_test.go +++ b/server/transient_test.go @@ -2,6 +2,7 @@ package server import ( "errors" + "strings" "testing" ) @@ -19,10 +20,13 @@ func TestIsTransientDuckLakeError(t *testing.T) { {"connection reset", errors.New("read tcp: connection reset by peer"), true}, {"connection timed out", errors.New("connection timed out"), true}, {"server closed", errors.New("server closed the connection unexpectedly"), true}, + {"SSL closed", errors.New(`Failed to execute query "COMMIT": SSL connection has been closed unexpectedly`), true}, + {"current transaction aborted", errors.New("Current transaction is aborted, commands ignored until end of transaction block"), true}, {"no route", errors.New("no route to host"), true}, {"network unreachable", errors.New("network is unreachable"), true}, {"auth error", errors.New("password authentication failed for user"), false}, {"table not found", errors.New("Table with name foo does not exist"), false}, + {"transaction conflict is not transient", errors.New(`Transaction conflict - attempting to insert into table with index "29784"`), false}, } for _, tt := range tests { @@ -81,3 +85,80 @@ func TestRetryOnTransientAttachNoRetryForNonTransient(t *testing.T) { t.Fatalf("expected 1 call (no retry for non-transient), got %d", calls) } } + +func TestRetryOnConflictSucceedsAfterRetry(t *testing.T) { + calls := 0 + result, err := retryOnConflict(func() (string, error) { + calls++ + if calls <= 2 { + return "", errors.New(`Transaction conflict - attempting to insert into table with index "29784"`) + } + return "ok", nil + }) + + if err != nil { + t.Fatalf("expected success, got error: %v", err) + } + if result != "ok" { + t.Fatalf("expected result 'ok', got %q", result) + } + if calls != 3 { + t.Fatalf("expected 3 calls, got %d", calls) + } +} + +func TestRetryOnConflictExhaustsRetries(t *testing.T) { + calls := 0 + _, err := retryOnConflict(func() (string, error) { + calls++ + return "", errors.New("Transaction conflict on commit") + }) + + if err == nil { + t.Fatal("expected error after exhausting retries") + } + if calls != conflictMaxRetries { + t.Fatalf("expected %d calls, got %d", conflictMaxRetries, calls) + } + if !strings.Contains(err.Error(), "Transaction conflict on commit") { + t.Fatalf("expected wrapped original error, got: %v", err) + } +} + +func TestRetryOnConflictStopsOnNonConflictError(t *testing.T) { + calls := 0 + _, err := retryOnConflict(func() (string, error) { + calls++ + return "", errors.New("syntax error at position 42") + }) + + if err == nil { + t.Fatal("expected error") + } + // retryOnConflict always makes one attempt (the first retry); when the + // error is not a transaction conflict it stops immediately. + if calls != 1 { + t.Fatalf("expected 1 call (stop on non-conflict error), got %d", calls) + } +} + +func TestClassifyErrorCode(t *testing.T) { + tests := []struct { + name string + err error + expected string + }{ + {"transaction conflict", errors.New("Transaction conflict on commit"), "40001"}, + {"query cancelled", errors.New("context canceled"), "57014"}, + {"generic error", errors.New("syntax error"), "42000"}, + {"SSL closed is not conflict", errors.New("SSL connection has been closed unexpectedly"), "42000"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := classifyErrorCode(tt.err); got != tt.expected { + t.Errorf("classifyErrorCode(%v) = %q, want %q", tt.err, got, tt.expected) + } + }) + } +}