Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions config_resolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type configCLIInputs struct {
DataDir string
CertFile string
KeyFile string
FilePersistence bool
ProcessIsolation bool
IdleTimeout string
MemoryLimit string
Expand Down Expand Up @@ -284,6 +285,7 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
}
}

cfg.FilePersistence = fileCfg.FilePersistence
cfg.ProcessIsolation = fileCfg.ProcessIsolation
if fileCfg.IdleTimeout != "" {
if d, err := time.ParseDuration(fileCfg.IdleTimeout); err == nil {
Expand Down Expand Up @@ -526,6 +528,13 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
warn("Invalid DUCKGRES_DUCKLAKE_DATA_INLINING_ROW_LIMIT: " + err.Error())
}
}
if v := getenv("DUCKGRES_FILE_PERSISTENCE"); v != "" {
if b, err := strconv.ParseBool(v); err == nil {
cfg.FilePersistence = b
} else {
warn("Invalid DUCKGRES_FILE_PERSISTENCE: " + err.Error())
}
}
if v := getenv("DUCKGRES_PROCESS_ISOLATION"); v != "" {
if b, err := strconv.ParseBool(v); err == nil {
cfg.ProcessIsolation = b
Expand Down Expand Up @@ -782,6 +791,9 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
if cli.Set["key"] {
cfg.TLSKeyFile = cli.KeyFile
}
if cli.Set["file-persistence"] {
cfg.FilePersistence = cli.FilePersistence
}
if cli.Set["process-isolation"] {
cfg.ProcessIsolation = cli.ProcessIsolation
}
Expand Down Expand Up @@ -902,6 +914,11 @@ func resolveEffectiveConfig(fileCfg *FileConfig, cli configCLIInputs, getenv fun
cfg.QueryLog.Enabled = cli.QueryLog
}

if cfg.FilePersistence && cfg.DataDir == "" {
warn("file_persistence is enabled but data_dir is empty; disabling file persistence")
cfg.FilePersistence = false
}

if cfg.ACMEDNSProvider != "" {
provider := strings.ToLower(cfg.ACMEDNSProvider)
if provider != "route53" {
Expand Down
4 changes: 4 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type FileConfig struct {
RateLimit RateLimitFileConfig `yaml:"rate_limit"`
Extensions []string `yaml:"extensions"`
DuckLake DuckLakeFileConfig `yaml:"ducklake"`
FilePersistence bool `yaml:"file_persistence"` // Persist DuckDB to <data_dir>/<username>.duckdb instead of :memory:
ProcessIsolation bool `yaml:"process_isolation"` // Enable process isolation per connection
IdleTimeout string `yaml:"idle_timeout"` // e.g., "24h", "1h", "-1" to disable
MemoryLimit string `yaml:"memory_limit"` // DuckDB memory_limit per session (e.g., "4GB")
Expand Down Expand Up @@ -206,6 +207,7 @@ func main() {
dataDir := flag.String("data-dir", "", "Directory for DuckDB files (env: DUCKGRES_DATA_DIR)")
certFile := flag.String("cert", "", "TLS certificate file (env: DUCKGRES_CERT)")
keyFile := flag.String("key", "", "TLS private key file (env: DUCKGRES_KEY)")
filePersistence := flag.Bool("file-persistence", false, "Persist DuckDB to <data-dir>/<username>.duckdb instead of in-memory (env: DUCKGRES_FILE_PERSISTENCE)")
processIsolation := flag.Bool("process-isolation", false, "Enable process isolation (spawn child process per connection)")
idleTimeout := flag.String("idle-timeout", "", "Connection idle timeout (e.g., '30m', '1h', '-1' to disable) (env: DUCKGRES_IDLE_TIMEOUT)")
memoryLimit := flag.String("memory-limit", "", "DuckDB memory_limit per session (e.g., '4GB') (env: DUCKGRES_MEMORY_LIMIT)")
Expand Down Expand Up @@ -280,6 +282,7 @@ func main() {
fmt.Fprintf(os.Stderr, " DUCKGRES_DATA_DIR Directory for DuckDB files (default: ./data)\n")
fmt.Fprintf(os.Stderr, " DUCKGRES_CERT TLS certificate file (default: ./certs/server.crt)\n")
fmt.Fprintf(os.Stderr, " DUCKGRES_KEY TLS private key file (default: ./certs/server.key)\n")
fmt.Fprintf(os.Stderr, " DUCKGRES_FILE_PERSISTENCE Persist DuckDB to <data_dir>/<username>.duckdb (1 or true)\n")
fmt.Fprintf(os.Stderr, " DUCKGRES_PROCESS_ISOLATION Enable process isolation (1 or true)\n")
fmt.Fprintf(os.Stderr, " DUCKGRES_IDLE_TIMEOUT Connection idle timeout (e.g., 30m, 1h, -1 to disable)\n")
fmt.Fprintf(os.Stderr, " DUCKGRES_MEMORY_LIMIT DuckDB memory_limit per session (e.g., 4GB)\n")
Expand Down Expand Up @@ -386,6 +389,7 @@ func main() {
DataDir: *dataDir,
CertFile: *certFile,
KeyFile: *keyFile,
FilePersistence: *filePersistence,
ProcessIsolation: *processIsolation,
IdleTimeout: *idleTimeout,
MemoryLimit: *memoryLimit,
Expand Down
85 changes: 85 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,60 @@ func TestResolveEffectiveConfigACMEDNSProviderValidation(t *testing.T) {
}
}

func TestResolveEffectiveConfigFilePersistenceFromFile(t *testing.T) {
fileCfg := &FileConfig{
FilePersistence: true,
DataDir: "/tmp/data",
}
resolved := resolveEffectiveConfig(fileCfg, configCLIInputs{}, envFromMap(nil), nil)
if !resolved.Server.FilePersistence {
t.Fatal("expected file_persistence from YAML to be true")
}
}

func TestResolveEffectiveConfigFilePersistenceFromEnv(t *testing.T) {
env := map[string]string{
"DUCKGRES_FILE_PERSISTENCE": "true",
}
resolved := resolveEffectiveConfig(nil, configCLIInputs{}, envFromMap(env), nil)
if !resolved.Server.FilePersistence {
t.Fatal("expected file_persistence from env to be true")
}
}

func TestResolveEffectiveConfigFilePersistenceEnvOverridesFile(t *testing.T) {
fileCfg := &FileConfig{
FilePersistence: true,
}
env := map[string]string{
"DUCKGRES_FILE_PERSISTENCE": "false",
}
resolved := resolveEffectiveConfig(fileCfg, configCLIInputs{}, envFromMap(env), nil)
if resolved.Server.FilePersistence {
t.Fatal("expected env false to override file true")
}
}

func TestResolveEffectiveConfigFilePersistenceCLIOverridesEnv(t *testing.T) {
env := map[string]string{
"DUCKGRES_FILE_PERSISTENCE": "false",
}
resolved := resolveEffectiveConfig(nil, configCLIInputs{
Set: map[string]bool{"file-persistence": true},
FilePersistence: true,
}, envFromMap(env), nil)
if !resolved.Server.FilePersistence {
t.Fatal("expected CLI true to override env false")
}
}

func TestResolveEffectiveConfigFilePersistenceDefaultFalse(t *testing.T) {
resolved := resolveEffectiveConfig(nil, configCLIInputs{}, envFromMap(nil), nil)
if resolved.Server.FilePersistence {
t.Fatal("expected file_persistence to default to false")
}
}

func TestResolveEffectiveConfigACMEDNSRequiresDomain(t *testing.T) {
fileCfg := &FileConfig{
TLS: TLSConfig{
Expand Down Expand Up @@ -767,3 +821,34 @@ func TestResolveEffectiveConfigACMEDNSRequiresDomain(t *testing.T) {
t.Fatalf("expected warning about missing ACME domain for DNS mode, warnings: %v", warns)
}
}

func TestFilePersistenceRequiresDataDir(t *testing.T) {
var warns []string
// Use CLI to explicitly set data-dir to empty, overriding the default.
resolved := resolveEffectiveConfig(
&FileConfig{
FilePersistence: true,
},
configCLIInputs{
Set: map[string]bool{"data-dir": true},
DataDir: "",
},
nil,
func(msg string) { warns = append(warns, msg) },
)

if resolved.Server.FilePersistence {
t.Fatal("expected FilePersistence to be disabled when DataDir is empty")
}

found := false
for _, w := range warns {
if strings.Contains(w, "file_persistence is enabled but data_dir is empty") {
found = true
break
}
}
if !found {
t.Fatalf("expected warning about empty data_dir, warnings: %v", warns)
}
}
73 changes: 58 additions & 15 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ type clientConn struct {
ctx context.Context // connection context, cancelled when connection is closed
cancel context.CancelFunc // cancels the connection context

// sharedDB is true when this connection uses a shared file-persistence DB pool.
// Cleanup differs: we return the pinned conn to the pool instead of closing the DB.
sharedDB bool

// pg_stat_activity fields
backendStart time.Time // when this connection started
applicationName string // from startup params
Expand Down Expand Up @@ -415,6 +419,28 @@ func (c *clientConn) safeCleanupDB() {
}()

cleanupTimeout := 5 * time.Second

if c.sharedDB {
// Shared file-persistence pool: ROLLBACK any open transaction on the
// pinned connection, then return it to the pool. Skip DuckLake DETACH
// since the underlying DB is shared across connections.
if c.txStatus == txStatusTransaction || c.txStatus == txStatusError {
ctx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
_, err := c.executor.ExecContext(ctx, "ROLLBACK")
cancel()
if err != nil {
slog.Warn("Failed to rollback transaction during cleanup.",
"user", c.username, "error", err)
}
}
// Close returns the pinned *sql.Conn to the pool (does not close the DB).
if err := c.executor.Close(); err != nil {
slog.Warn("Failed to return connection to pool.", "user", c.username, "error", err)
}
c.server.releaseFileDB(c.username)
return
}

connHealthy := true

// Check connection health. For DuckLake, we need to actually run a query that
Expand Down Expand Up @@ -618,23 +644,40 @@ func (c *clientConn) serve() error {
// Create a DuckDB connection for this client session (unless pre-created by caller)
var stopRefresh func()
if c.executor == nil {
var db *sql.DB
var err error
if c.passthrough {
db, err = CreatePassthroughDBConnection(c.server.cfg, c.server.duckLakeSem, c.username, processStartTime, processVersion)
if c.server.cfg.FilePersistence {
db, err := c.server.acquireFileDB(c.username, c.passthrough)
if err != nil {
c.sendError("FATAL", "28000", fmt.Sprintf("failed to open database: %v", err))
return err
}
conn, err := db.Conn(c.ctx)
if err != nil {
c.server.releaseFileDB(c.username)
c.sendError("FATAL", "28000", fmt.Sprintf("failed to get pooled connection: %v", err))
return err
}
c.executor = NewPinnedExecutor(conn, db)
c.sharedDB = true
// Don't start per-connection credential refresh; the pool manages it.
} else {
db, err = c.server.createDBConnection(c.username)
}
if err != nil {
c.sendError("FATAL", "28000", fmt.Sprintf("failed to open database: %v", err))
return err
}
c.executor = NewLocalExecutor(db)
var db *sql.DB
var err error
if c.passthrough {
db, err = CreatePassthroughDBConnection(c.server.cfg, c.server.duckLakeSem, c.username, processStartTime, processVersion)
} else {
db, err = c.server.createDBConnection(c.username)
}
if err != nil {
c.sendError("FATAL", "28000", fmt.Sprintf("failed to open database: %v", err))
return err
}
c.executor = NewLocalExecutor(db)

// Start background credential refresh for long-lived connections.
// Only needed when we create the DB here; the control plane manages
// refresh for pre-created connections via DBPool.
stopRefresh = StartCredentialRefresh(db, c.server.cfg.DuckLake)
// Start background credential refresh for long-lived connections.
// Only needed when we create the DB here; the control plane manages
// refresh for pre-created connections via DBPool.
stopRefresh = StartCredentialRefresh(db, c.server.cfg.DuckLake)
}
}
// Defers run LIFO: close cursors first (they hold open RowSets), then stop
// credential refresh, then clean up the database connection.
Expand Down
53 changes: 53 additions & 0 deletions server/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,59 @@ func (e *LocalExecutor) Close() error {
return e.db.Close()
}

// PinnedExecutor wraps a pinned *sql.Conn from a shared *sql.DB pool
// to implement QueryExecutor for file-persistence mode.
type PinnedExecutor struct {
conn *sql.Conn
db *sql.DB
}

func NewPinnedExecutor(conn *sql.Conn, db *sql.DB) *PinnedExecutor {
return &PinnedExecutor{conn: conn, db: db}
}

// DB returns the underlying *sql.DB (for credential refresh and other direct access).
func (e *PinnedExecutor) DB() *sql.DB {
return e.db
}

func (e *PinnedExecutor) QueryContext(ctx context.Context, query string, args ...any) (RowSet, error) {
rows, err := e.conn.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
return &LocalRowSet{rows: rows}, nil
}

func (e *PinnedExecutor) ExecContext(ctx context.Context, query string, args ...any) (ExecResult, error) {
return e.conn.ExecContext(ctx, query, args...)
}

func (e *PinnedExecutor) Query(query string, args ...any) (RowSet, error) {
rows, err := e.conn.QueryContext(context.Background(), query, args...)
if err != nil {
return nil, err
}
return &LocalRowSet{rows: rows}, nil
}

func (e *PinnedExecutor) Exec(query string, args ...any) (ExecResult, error) {
return e.conn.ExecContext(context.Background(), query, args...)
}

func (e *PinnedExecutor) ConnContext(ctx context.Context) (RawConn, error) {
return e.db.Conn(ctx)
}

func (e *PinnedExecutor) PingContext(ctx context.Context) error {
return e.conn.PingContext(ctx)
}

// Close returns the pinned connection to the pool; it does not close the underlying DB.
func (e *PinnedExecutor) Close() error {
return e.conn.Close()
}

// LocalRowSet wraps *sql.Rows to implement RowSet.
type LocalRowSet struct {
rows *sql.Rows
Expand Down
Loading