Pull request overview
This PR introduces a continuous-batching inference path for llama.cpp by replacing the previous per-slot Actix SyncArbiter model with a single continuous-batch scheduler thread that multiplexes multiple concurrent sequences, and updates the test suite accordingly.
Changes:
- Replace legacy `LlamaCppSlot`/`LlamaCppArbiterHandle`-based execution with the `ContinuousBatchArbiter` + `ContinuousBatchScheduler` command loop.
- Improve client-disconnect handling in the balancer response forwarding path and adjust WebSocket request handling to be fully asynchronous.
- Expand/adjust model and integration tests to cover continuous batching behavior (stop signals, disconnects, slot exhaustion, shutdown) and add explicit `slots` configuration.
Reviewed changes
Copilot reviewed 56 out of 57 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| paddler/src/slot_status.rs | Removed legacy slot status wrapper (replaced by aggregated status usage). |
| paddler/src/slot_request_drop_guard.rs | Refactored guard to operate directly on SlotAggregatedStatus. |
| paddler/src/slot_aggregated_status_manager.rs | Removed bind_slot_status helper no longer needed with new scheduler. |
| paddler/src/lib.rs | Removed exports for legacy slot modules. |
| paddler/src/cmd/agent.rs | Wire agent service to use continuous_batch_arbiter_handle field. |
| paddler/src/balancer/request_from_agent.rs | Stop streaming loop if sending to client fails; return send success boolean. |
| paddler/src/balancer/inference_service/http_route/api/ws_inference_socket/mod.rs | Spawn request processing tasks to avoid awaiting within message handler. |
| paddler/src/agent/sequence_id_pool.rs | New pool to manage sequence IDs for concurrent batching. |
| paddler/src/agent/mod.rs | Add continuous batching modules; remove legacy arbiter/slot modules. |
| paddler/src/agent/llamacpp_slot.rs | Deleted legacy per-slot Actix worker implementation. |
| paddler/src/agent/llamacpp_arbiter_service.rs | Switch service to manage ContinuousBatchArbiterHandle and forward scheduler commands. |
| paddler/src/agent/llamacpp_arbiter_handle.rs | Deleted legacy handle type (Actix addr + shutdown). |
| paddler/src/agent/kv_cache_repair_action.rs | Deleted legacy KV-cache repair enum (old slot implementation only). |
| paddler/src/agent/generate_embedding_batch_request.rs | Remove Actix Message derive; used as plain command payload. |
| paddler/src/agent/continuous_batch_scheduler.rs | New continuous batching scheduler implementation (decode loop, sampling, embeddings, cleanup). |
| paddler/src/agent/continuous_batch_scheduler_context.rs | New scheduler context type (renamed from old slot context). |
| paddler/src/agent/continuous_batch_scheduler_command.rs | New command enum for scheduler control plane. |
| paddler/src/agent/continuous_batch_request_phase.rs | New request state enum for ingest/generate/complete phases. |
| paddler/src/agent/continuous_batch_arbiter.rs | New arbiter that loads model/context and runs scheduler thread. |
| paddler/src/agent/continuous_batch_arbiter_handle.rs | New handle: command_tx + joinable scheduler thread handle. |
| paddler/src/agent/continuous_batch_active_request.rs | New per-request state struct tracked by scheduler. |
| paddler/src/agent/continue_from_raw_prompt_request.rs | Remove Actix Message derive; used as plain command payload. |
| paddler/src/agent/continue_from_conversation_history_request.rs | Remove Actix Message derive; used as plain command payload. |
| paddler_model_tests/tests/smolvlm2.rs | Add explicit slots configuration to managed model params. |
| paddler_model_tests/tests/qwen35_thinking.rs | Add explicit slots configuration to managed model params. |
| paddler_model_tests/tests/qwen35_thinking_multi_turn.rs | Add explicit slots configuration to managed model params. |
| paddler_model_tests/tests/qwen35_system_message.rs | Add explicit slots configuration to managed model params. |
| paddler_model_tests/tests/qwen35_system_message_thinking.rs | Add explicit slots configuration to managed model params. |
| paddler_model_tests/tests/qwen35_multimodal.rs | Add explicit slots configuration to managed model params. |
| paddler_model_tests/tests/qwen35_multimodal_not_supported.rs | Adapt test expectations to streamed GeneratedTokenResult::* errors; add slots. |
| paddler_model_tests/tests/qwen35_long_prompt.rs | Add explicit slots configuration to managed model params. |
| paddler_model_tests/tests/qwen35_generation.rs | Add explicit slots configuration to managed model params. |
| paddler_model_tests/tests/qwen3_raw_prompt.rs | Add explicit slots configuration to managed model params. |
| paddler_model_tests/tests/qwen3_grammar.rs | Add explicit slots configuration and adapt error expectation to streamed results. |
| paddler_model_tests/tests/qwen3_conversation.rs | Add explicit slots configuration to managed model params. |
| paddler_model_tests/tests/qwen25vl_multimodal.rs | Add explicit slots configuration to managed model params. |
| paddler_model_tests/tests/continuous_batch_stop_signal.rs | New test: stop signal terminates generation early. |
| paddler_model_tests/tests/continuous_batch_stop_sender_dropped.rs | New test: dropping stop sender stops generation and frees capacity. |
| paddler_model_tests/tests/continuous_batch_slot_leak_on_shutdown.rs | New test: shutdown during active request releases all slots. |
| paddler_model_tests/tests/continuous_batch_slot_exhaustion.rs | New test: reject second request when only one slot is available. |
| paddler_model_tests/tests/continuous_batch_shutdown_during_generation.rs | New test: shutdown during generation does not hang. |
| paddler_model_tests/tests/continuous_batch_sequential_reuse.rs | New test: slot is reusable after request completion. |
| paddler_model_tests/tests/continuous_batch_mixed_prompt_lengths.rs | New test: concurrent long/short prompts both complete. |
| paddler_model_tests/tests/continuous_batch_max_tokens.rs | New test: generation stops exactly at max_tokens. |
| paddler_model_tests/tests/continuous_batch_embedding_rejected.rs | New test: embeddings rejected while generation is active. |
| paddler_model_tests/tests/continuous_batch_distinct_output.rs | New test: different prompts produce different outputs concurrently. |
| paddler_model_tests/tests/continuous_batch_conversation_history.rs | New test: concurrent conversation-history requests complete. |
| paddler_model_tests/tests/continuous_batch_concurrent_generation.rs | New test: multiple concurrent raw-prompt requests complete. |
| paddler_model_tests/tests/continuous_batch_client_disconnect.rs | New test: client disconnect releases slot for subsequent requests. |
| paddler_model_tests/src/model_test_harness.rs | Switch harness to send scheduler commands instead of Actix addr; export token collection helper. |
| paddler_model_tests/src/managed_model.rs | Managed model now spawns ContinuousBatchArbiter; add slots param; shutdown now mutably consumes handle. |
| paddler_integration_tests/tests/management_sse_stream.rs | New integration test: multiple SSE listeners receive ongoing updates. |
| paddler_integration_tests/tests/agent_websocket_disconnect.rs | New integration test: websocket disconnect releases slot quickly. |
| paddler_integration_tests/tests/agent_graceful_shutdown.rs | New integration test: agent exits gracefully during active generation. |
| Makefile | Add timeouts to test targets. |
| Cargo.toml | Add [patch.crates-io] overrides for llama bindings (local path). |
| Cargo.lock | Reflect llama bindings patching (removes registry source/checksum entries). |
Cargo.toml (outdated)

```toml
[patch.crates-io]
llama-cpp-bindings = { path = "../llama-cpp-bindings/llama-cpp-bindings" }
llama-cpp-bindings-sys = { path = "../llama-cpp-bindings/llama-cpp-bindings-sys" }
```
The workspace [patch.crates-io] overrides llama-cpp-bindings* to paths outside this repository (../llama-cpp-bindings/...). This will break builds/CI for anyone who doesn’t have that sibling directory. Prefer removing this patch, vendoring the crates into this repo, or using a git dependency with a stable revision instead of a local path.
```rust
if let Some(arbiter_handle) = self.continuous_batch_arbiter_handle.as_mut() {
    arbiter_handle
        .shutdown()
        .context("Unable to stop arbiter controller")?;
}

self.continuous_batch_arbiter_handle = None;
```
apply_state calls shutdown() via as_mut(), and only sets continuous_batch_arbiter_handle = None after shutdown() succeeds. If shutdown() errors after consuming the thread handle (e.g., join error), the service can be left with Some(handle) but no thread handle, causing subsequent state applications to fail repeatedly. Consider take()-ing the handle (or setting it to None first) before attempting shutdown, mirroring the previous pattern.
Suggested change:

```rust
if let Some(mut arbiter_handle) = self.continuous_batch_arbiter_handle.take() {
    arbiter_handle
        .shutdown()
        .context("Unable to stop arbiter controller")?;
}
```
```rust
use crate::decoded_image_error::DecodedImageError;
use crate::dispenses_slots::DispensesSlots;
use crate::embedding_input_tokenized::EmbeddingInputTokenized;
use crate::slot_aggregated_status::SlotAggregatedStatus;
```
Unused import: crate::dispenses_slots::DispensesSlots is imported but not used anywhere in this file. Removing it avoids warnings and keeps imports accurate.
```diff
@@ -30,8 +30,6 @@ pub mod sets_desired_state;
 pub mod slot_aggregated_status;
 pub mod slot_aggregated_status_download_progress;
 pub mod slot_aggregated_status_manager;
```
slot_request_drop_guard.rs is still present/modified in this PR, but lib.rs no longer declares pub mod slot_request_drop_guard;, so the module is no longer compiled or accessible. Either re-add the module (or include it privately where needed) or delete the file to avoid dead/unreachable code.
Suggested change:

```rust
pub mod slot_aggregated_status_manager;
pub mod slot_request_drop_guard;
```
```diff
-use crate::dispenses_slots::DispensesSlots as _;
-use crate::slot_status::SlotStatus;
+use crate::dispenses_slots::DispensesSlots;
```
Unused import: crate::dispenses_slots::DispensesSlots is no longer used in this file after refactoring to call methods directly on SlotAggregatedStatus. Consider removing it to avoid warnings if/when this module is re-enabled.
Pull request overview
Copilot reviewed 63 out of 64 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (3)
paddler/src/agent/continuous_batch_arbiter.rs:92
`n_seq_max` is set to `embedding_n_seq_max` whenever embeddings are enabled. If `desired_slots_total` is greater than `embedding_n_seq_max`, the scheduler can still accept up to `desired_slots_total` concurrent sequences, but the llama.cpp context will only have KV/cache capacity for `n_seq_max` sequences. That mismatch will reliably trigger `DecodeError::NoKvCacheSlot` and evictions under load. Consider setting `n_seq_max` to at least `desired_slots_total` (e.g., `max(desired_slots_total as u32, embedding_n_seq_max)`) and/or enforcing the same limit in the scheduler's sequence pool.

paddler/src/agent/continuous_batch_arbiter.rs:293
`ContinuousBatchScheduler::new(..., max_concurrent_sequences: i32, ...)` is passed `desired_slots_total` here, but the context's `with_n_seq_max(...)` may be configured differently (especially when embeddings are enabled). Please pass the effective `n_seq_max` (or otherwise ensure `SequenceIdPool` cannot hand out more sequence IDs than the context supports) to avoid KV cache slot exhaustion and forced evictions.

paddler/src/agent/continuous_batch_arbiter.rs:61
`std::sync::mpsc::channel()` is unbounded; if clients enqueue requests faster than the scheduler thread can consume them, this can grow without bound and increase memory usage/latency. Consider using a bounded channel (and returning a backpressure/overload error when full) to keep resource usage predictable under load.
```rust
let validated_params = conversation_history_params.validate()?;

rt::spawn(async move {
    if let Err(err) = request_from_agent(
        context.buffered_request_manager.clone(),
        connection_close_tx,
        context.inference_service_configuration.clone(),
        validated_params,
        request_id.clone(),
        websocket_session_controller,
    )
    .await
    {
        error!("Request {request_id:?} failed: {err}");
    }
});
```
actix_web::rt::spawn typically requires the spawned future to be Send + 'static, but WebSocketSessionController wraps actix_ws::Session (see paddler/src/websocket_session_controller.rs:14), which is generally !Send. Spawning request_from_agent(...) like this is likely to fail to compile or run on the wrong executor. Consider using a local spawn (spawn_local) or restructuring so the spawned task only owns Send types (e.g., send outgoing messages through a channel handled on the websocket task).
```diff
     id: request_id,
     request: InferenceJsonRpcRequest::ContinueFromRawPrompt(raw_prompt_params),
 }) => {
-    request_from_agent(
-        context.buffered_request_manager.clone(),
-        connection_close_tx,
-        context.inference_service_configuration.clone(),
-        raw_prompt_params,
-        request_id,
-        websocket_session_controller,
-    )
-    .await?;
+    rt::spawn(async move {
+        if let Err(err) = request_from_agent(
+            context.buffered_request_manager.clone(),
+            connection_close_tx,
+            context.inference_service_configuration.clone(),
+            raw_prompt_params,
+            request_id.clone(),
+            websocket_session_controller,
+        )
+        .await
+        {
+            error!("Request {request_id:?} failed: {err}");
+        }
+    });
```
Same issue as above: this second rt::spawn(async move { ... request_from_agent(...) ... }) also moves a WebSocketSessionController into a background task. If actix_web::rt::spawn requires Send, this will break; even if it compiles, it risks running websocket I/O on an unexpected executor. Prefer spawn_local or a channel-based handoff to a single websocket writer task.
```rust
if self.has_active_requests() {
    let request = self.pending_embedding_requests.remove(0);
    // ...
```
pending_embedding_requests.remove(0) is an O(n) shift on every embedding request. If embedding requests can queue up (or if generation keeps rejecting them), this becomes unnecessarily costly. Using a VecDeque (pop_front/push_back) would make this O(1) and simplify the “take first pending request” logic.
```rust
self.command_tx
    .send(ContinuousBatchSchedulerCommand::Shutdown)
    .map_err(|err| anyhow!("Failed to send shutdown command: {err}"))?;

let thread_handle = self
    .scheduler_thread_handle
    .take()
    .ok_or_else(|| anyhow!("Scheduler thread handle already consumed"))?;

thread_handle
    .join()
    .map_err(|err| anyhow!("Failed to join scheduler thread: {err:?}"))??;

Ok(())
```
If the scheduler thread has already exited (receiver dropped), command_tx.send(Shutdown) will fail and shutdown() returns early without joining the thread. That turns a benign “already stopped” state into an error path and can skip collecting the thread result. Consider attempting to join() even when send() fails (treat send failure as an already-shutdown scheduler) so shutdown is idempotent and cleanup always happens.
Suggested change:

```rust
let shutdown_send_result = self
    .command_tx
    .send(ContinuousBatchSchedulerCommand::Shutdown)
    .map_err(|err| anyhow!("Failed to send shutdown command: {err}"));

let thread_handle = self.scheduler_thread_handle.take();

if let Some(thread_handle) = thread_handle {
    thread_handle
        .join()
        .map_err(|err| anyhow!("Failed to join scheduler thread: {err:?}"))??;

    return Ok(());
}

shutdown_send_result?;

Err(anyhow!("Scheduler thread handle already consumed"))
```