diff --git a/controlplane/control.go b/controlplane/control.go
index 195573b..c8d871a 100644
--- a/controlplane/control.go
+++ b/controlplane/control.go
@@ -603,6 +603,8 @@ func sessionCreationErrorResponse(err error) (code string, message string) {
 func (cp *ControlPlane) handleConnection(conn net.Conn) {
 	remoteAddr := conn.RemoteAddr()
 	slog.Info("Connection accepted.", "remote_addr", remoteAddr)
+	server.IncrementOpenConnections() // counted as open until this handler returns (see deferred decrement below)
+	defer server.DecrementOpenConnections()
 
 	releaseRateLimit, msg := server.BeginRateLimitedAuthAttempt(cp.rateLimiter, remoteAddr)
 	if msg != "" {
@@ -864,7 +866,15 @@ func (cp *ControlPlane) handleConnection(conn net.Conn) {
 		_ = writer.Flush()
 		return
 	}
-	defer sessions.DestroySession(pid)
+	if orgID != "" { // connections with an empty orgID skip the per-org observation
+		observeOrgSessionsActive(orgID, sessions.SessionCount())
+	}
+	defer func() {
+		sessions.DestroySession(pid)
+		if orgID != "" { // observe again on exit; DestroySession has already run at this point
+			observeOrgSessionsActive(orgID, sessions.SessionCount())
+		}
+	}()
 
 	// Register the TCP connection so OnWorkerCrash can close it to unblock
 	// the message loop if the backing worker dies.
diff --git a/controlplane/flight_ingress_metrics_stub.go b/controlplane/flight_ingress_metrics_stub.go
new file mode 100644
index 0000000..ebaad3d
--- /dev/null
+++ b/controlplane/flight_ingress_metrics_stub.go
@@ -0,0 +1,5 @@
+//go:build !kubernetes
+
+package controlplane
+
+func observeOrgSessionsActive(string, int) {} // no-op stand-in compiled when the kubernetes build tag is absent
diff --git a/controlplane/warm_pool_metrics.go b/controlplane/warm_pool_metrics.go
index 9d35415..f45fbe1 100644
--- a/controlplane/warm_pool_metrics.go
+++ b/controlplane/warm_pool_metrics.go
@@ -31,6 +31,11 @@ var hotWorkersGauge = promauto.NewGauge(prometheus.GaugeOpts{
 	Help: "Number of activated, tenant-bound workers serving sessions",
 })
 
+var hotIdleWorkersGauge = promauto.NewGauge(prometheus.GaugeOpts{
+	Name: "duckgres_hot_idle_workers",
+	Help: "Number of activated workers retaining org assignment between sessions",
+})
+
 var drainingWorkersGauge = promauto.NewGauge(prometheus.GaugeOpts{
 	Name: "duckgres_draining_workers",
 	Help: "Number of workers currently draining sessions before retirement",
@@ -76,7 +81,7 @@ const (
 // observeWarmPoolLifecycleGauges recalculates all lifecycle gauges from the
 // current worker map. Must be called with p.mu held (at least RLock).
 func observeWarmPoolLifecycleGauges(workers map[int]*ManagedWorker) {
-	var idle, reserved, activating, hot, draining int
+	var idle, reserved, activating, hot, hotIdle, draining int
 	for _, w := range workers {
 		select {
 		case <-w.done:
@@ -92,6 +97,8 @@ func observeWarmPoolLifecycleGauges(workers map[int]*ManagedWorker) {
 			activating++
 		case WorkerLifecycleHot:
 			hot++
+		case WorkerLifecycleHotIdle:
+			hotIdle++
 		case WorkerLifecycleDraining:
 			draining++
 		}
@@ -100,6 +107,7 @@ func observeWarmPoolLifecycleGauges(workers map[int]*ManagedWorker) {
 	reservedWorkersGauge.Set(float64(reserved))
 	activatingWorkersGauge.Set(float64(activating))
 	hotWorkersGauge.Set(float64(hot))
+	hotIdleWorkersGauge.Set(float64(hotIdle))
 	drainingWorkersGauge.Set(float64(draining))
 }
 
diff --git a/server/server.go b/server/server.go
index e96f025..8f037a0 100644
--- a/server/server.go
+++ b/server/server.go
@@ -53,6 +53,13 @@ var connectionsGauge = promauto.NewGauge(prometheus.GaugeOpts{
 	Help: "Number of currently open client connections",
 })
 
+// IncrementOpenConnections increments the open connections gauge.
+// Used by the control plane, which handles connections separately from the standalone server.
+func IncrementOpenConnections() { connectionsGauge.Inc() }
+
+// DecrementOpenConnections decrements the open connections gauge; pair each call with a prior IncrementOpenConnections.
+func DecrementOpenConnections() { connectionsGauge.Dec() }
+
 var queryDurationHistogram = promauto.NewHistogram(prometheus.HistogramOpts{
 	Name: "duckgres_query_duration_seconds",
 	Help: "Query execution duration in seconds",