Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions controlplane/k8s_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,12 +814,14 @@ func (p *K8sWorkerPool) RetireWorker(id int) {
p.retireWorkerWithReason(id, RetireReasonNormal)
}

func (p *K8sWorkerPool) retireWorkerWithReason(id int, reason string) {
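// retireWorkerWithReason marks the worker as retired with the given reason,
// removes it from the pool, and starts deleting its pod in a background goroutine.
// Returns true if the worker was found and retired, false if id is not in the pool.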
func (p *K8sWorkerPool) retireWorkerWithReason(id int, reason string) bool {
p.mu.Lock()
w, ok := p.workers[id]
if !ok {
p.mu.Unlock()
return
return false
}
p.markWorkerRetiredLocked(w, reason)
delete(p.workers, id)
Expand All @@ -828,6 +830,7 @@ func (p *K8sWorkerPool) retireWorkerWithReason(id int, reason string) {
observeControlPlaneWorkers(workerCount)

go p.retireWorkerPod(id, w)
return true
}

// RetireWorkerIfNoSessions retires a worker only if it has no active sessions.
Expand Down Expand Up @@ -1655,17 +1658,21 @@ func (p *K8sWorkerPool) ShutdownAll() {

// retireWorkerPod closes the gRPC client, then deletes the worker pod and its RPC secret.
func (p *K8sWorkerPool) retireWorkerPod(id int, w *ManagedWorker) {
slog.Info("Retiring K8s worker.", "id", id)
podName := p.workerPodName(w)
slog.Info("Retiring K8s worker.", "id", id, "pod", podName)
if w.client != nil {
_ = w.client.Close()
}
podName := p.workerPodName(w)
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
_ = p.clientset.CoreV1().Pods(p.namespace).Delete(ctx, podName, metav1.DeleteOptions{
if err := p.clientset.CoreV1().Pods(p.namespace).Delete(ctx, podName, metav1.DeleteOptions{
GracePeriodSeconds: int64Ptr(10),
})
_ = p.deleteWorkerRPCSecret(ctx, podName)
}); err != nil {
slog.Warn("Failed to delete worker pod.", "id", id, "pod", podName, "error", err)
}
if err := p.deleteWorkerRPCSecret(ctx, podName); err != nil {
slog.Warn("Failed to delete worker RPC secret.", "id", id, "pod", podName, "error", err)
}
}

// idleReaper periodically retires workers that have been idle too long and
Expand Down
9 changes: 1 addition & 8 deletions controlplane/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,7 @@ func SetupMultiTenant(
router.sharedPool.retireClaimedWorker(&record, reason)
}
janitor.retireLocalWorker = func(workerID int, reason string) bool {
router.sharedPool.mu.Lock()
_, local := router.sharedPool.workers[workerID]
router.sharedPool.mu.Unlock()
if !local {
return false
}
router.sharedPool.retireWorkerWithReason(workerID, reason)
return true
return router.sharedPool.retireWorkerWithReason(workerID, reason)
}
janitor.reconcileWarmCapacity = func() {
target := router.sharedPool.WarmCapacityTarget()
Expand Down
Loading