From bcca13b7a2bebf352aadf23a529b4235a0014b76 Mon Sep 17 00:00:00 2001 From: Geoff Johnson Date: Sat, 28 Mar 2026 12:04:17 -0700 Subject: [PATCH 01/12] =?UTF-8?q?epic:=20Queue-based=20trigger=20=E2=80=94?= =?UTF-8?q?=20internal=20task=20queue=20with=20push=20API?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit From b852f4201851979a87d019d69b5258b39d64c841 Mon Sep 17 00:00:00 2001 From: Geoff Johnson Date: Sat, 28 Mar 2026 12:04:27 -0700 Subject: [PATCH 02/12] feat: add queue table migration and storage operations for task queue From 733148bdac0655535c6aeef05cf355364cf23d59 Mon Sep 17 00:00:00 2001 From: Geoff Johnson Date: Sat, 28 Mar 2026 12:08:03 -0700 Subject: [PATCH 03/12] feat(storage): add task_queue table migration and storage operations for queue-based trigger --- crates/orchestrator/src/entity/mod.rs | 1 + crates/orchestrator/src/entity/task_queue.rs | 33 +++ .../m20260328_000011_add_task_queue.rs | 67 ++++++ crates/orchestrator/src/migration/mod.rs | 2 + crates/orchestrator/src/scheduler/storage.rs | 201 +++++++++++++++++- 5 files changed, 301 insertions(+), 3 deletions(-) create mode 100644 crates/orchestrator/src/entity/task_queue.rs create mode 100644 crates/orchestrator/src/migration/m20260328_000011_add_task_queue.rs diff --git a/crates/orchestrator/src/entity/mod.rs b/crates/orchestrator/src/entity/mod.rs index a94d2908..4e105793 100644 --- a/crates/orchestrator/src/entity/mod.rs +++ b/crates/orchestrator/src/entity/mod.rs @@ -2,5 +2,6 @@ pub mod agent; pub mod dispatch; +pub mod task_queue; pub mod usage_session; pub mod workflow; diff --git a/crates/orchestrator/src/entity/task_queue.rs b/crates/orchestrator/src/entity/task_queue.rs new file mode 100644 index 00000000..729a19f7 --- /dev/null +++ b/crates/orchestrator/src/entity/task_queue.rs @@ -0,0 +1,33 @@ +//! SeaORM entity for the `task_queue` table. + +use sea_orm::entity::prelude::*; + +/// Task status values for queue entries. 
+pub const STATUS_PENDING: &str = "pending"; +pub const STATUS_PROCESSING: &str = "processing"; +pub const STATUS_COMPLETED: &str = "completed"; +pub const STATUS_FAILED: &str = "failed"; +pub const STATUS_DEAD: &str = "dead"; + +/// SeaORM model for the `task_queue` table. +#[derive(Clone, Debug, PartialEq, DeriveEntityModel)] +#[sea_orm(table_name = "task_queue")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub id: String, + pub queue_name: String, + pub title: String, + pub body: Option, + pub priority: i32, + pub status: String, + pub visibility_timeout_at: Option, + pub retry_count: i32, + pub max_retries: i32, + pub created_at: String, + pub updated_at: String, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/crates/orchestrator/src/migration/m20260328_000011_add_task_queue.rs b/crates/orchestrator/src/migration/m20260328_000011_add_task_queue.rs new file mode 100644 index 00000000..8f38c15f --- /dev/null +++ b/crates/orchestrator/src/migration/m20260328_000011_add_task_queue.rs @@ -0,0 +1,67 @@ +//! Migration 11: add the `task_queue` table for queue-based triggers. 
+ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(TaskQueue::Table) + .if_not_exists() + .col(ColumnDef::new(TaskQueue::Id).string().not_null().primary_key()) + .col(ColumnDef::new(TaskQueue::QueueName).string().not_null()) + .col(ColumnDef::new(TaskQueue::Title).string().not_null()) + .col(ColumnDef::new(TaskQueue::Body).string().null()) + .col(ColumnDef::new(TaskQueue::Priority).integer().not_null().default(0)) + .col(ColumnDef::new(TaskQueue::Status).string().not_null().default("pending")) + .col(ColumnDef::new(TaskQueue::VisibilityTimeoutAt).string().null()) + .col(ColumnDef::new(TaskQueue::RetryCount).integer().not_null().default(0)) + .col(ColumnDef::new(TaskQueue::MaxRetries).integer().not_null().default(3)) + .col(ColumnDef::new(TaskQueue::CreatedAt).string().not_null()) + .col(ColumnDef::new(TaskQueue::UpdatedAt).string().not_null()) + .to_owned(), + ) + .await?; + + manager + .create_index( + Index::create() + .name("idx_task_queue_name_status") + .table(TaskQueue::Table) + .col(TaskQueue::QueueName) + .col(TaskQueue::Status) + .col(TaskQueue::Priority) + .col(TaskQueue::CreatedAt) + .if_not_exists() + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager.drop_table(Table::drop().table(TaskQueue::Table).to_owned()).await + } +} + +#[derive(DeriveIden)] +enum TaskQueue { + Table, + Id, + QueueName, + Title, + Body, + Priority, + Status, + VisibilityTimeoutAt, + RetryCount, + MaxRetries, + CreatedAt, + UpdatedAt, +} diff --git a/crates/orchestrator/src/migration/mod.rs b/crates/orchestrator/src/migration/mod.rs index b1e63c7a..50ba943a 100644 --- a/crates/orchestrator/src/migration/mod.rs +++ b/crates/orchestrator/src/migration/mod.rs @@ -21,6 +21,7 @@ 
mod m20260316_000007_rename_trigger_columns; mod m20260319_000008_add_rooms_to_agents; mod m20260323_000009_add_launch_command_to_agents; mod m20260324_000010_add_system_prompt_fields_to_agents; +mod m20260328_000011_add_task_queue; /// The migration runner — applies all known migrations in order. pub struct Migrator; @@ -39,6 +40,7 @@ impl MigratorTrait for Migrator { Box::new(m20260319_000008_add_rooms_to_agents::Migration), Box::new(m20260323_000009_add_launch_command_to_agents::Migration), Box::new(m20260324_000010_add_system_prompt_fields_to_agents::Migration), + Box::new(m20260328_000011_add_task_queue::Migration), ] } } diff --git a/crates/orchestrator/src/scheduler/storage.rs b/crates/orchestrator/src/scheduler/storage.rs index 98d216c1..657ff630 100644 --- a/crates/orchestrator/src/scheduler/storage.rs +++ b/crates/orchestrator/src/scheduler/storage.rs @@ -6,14 +6,16 @@ //! [`crate::migration::Migrator`] that runs at startup. use crate::{ - entity::{dispatch as dispatch_entity, workflow as workflow_entity}, + entity::{ + dispatch as dispatch_entity, task_queue as queue_entity, workflow as workflow_entity, + }, scheduler::types::{DispatchRecord, DispatchStatus, WorkflowConfig}, }; use anyhow::Result; use chrono::{DateTime, Utc}; use sea_orm::{ - ColumnTrait, Condition, DatabaseConnection, EntityTrait, Order, PaginatorTrait, QueryFilter, - QueryOrder, QuerySelect, Set, + ActiveModelTrait, ColumnTrait, Condition, DatabaseConnection, EntityTrait, Order, + PaginatorTrait, QueryFilter, QueryOrder, QuerySelect, Set, TransactionTrait, }; use uuid::Uuid; @@ -278,6 +280,199 @@ impl SchedulerStorage { Ok(result.rows_affected) } + + // -- Queue operations -- + + /// Inserts a task into the named queue and returns the task ID. 
+ pub async fn enqueue( + &self, + queue_name: &str, + title: &str, + body: Option<&str>, + priority: i32, + ) -> Result { + let id = uuid::Uuid::new_v4().to_string(); + let now = chrono::Utc::now().to_rfc3339(); + + let model = queue_entity::ActiveModel { + id: Set(id.clone()), + queue_name: Set(queue_name.to_string()), + title: Set(title.to_string()), + body: Set(body.map(str::to_string)), + priority: Set(priority), + status: Set(queue_entity::STATUS_PENDING.to_string()), + visibility_timeout_at: Set(None), + retry_count: Set(0), + max_retries: Set(3), + created_at: Set(now.clone()), + updated_at: Set(now), + }; + + queue_entity::Entity::insert(model).exec(&self.db).await?; + Ok(id) + } + + /// Atomically claims the highest-priority oldest pending task from the queue. + /// + /// The task is marked as `processing` with a visibility timeout. Returns + /// `None` if the queue has no pending tasks. + pub async fn dequeue( + &self, + queue_name: &str, + visibility_timeout_secs: u64, + ) -> Result> { + let timeout_at = (chrono::Utc::now() + + chrono::Duration::seconds(visibility_timeout_secs as i64)) + .to_rfc3339(); + let now = chrono::Utc::now().to_rfc3339(); + let now_str = chrono::Utc::now().to_rfc3339(); + + // Use a transaction to atomically claim the next task. + let db = &self.db; + let result = db + .transaction::<_, Option, sea_orm::DbErr>(|txn| { + let queue_name = queue_name.to_string(); + let timeout_at = timeout_at.clone(); + let now = now.clone(); + + Box::pin(async move { + // Find the best candidate (highest priority, then oldest). + let candidate = queue_entity::Entity::find() + .filter(queue_entity::Column::QueueName.eq(&queue_name)) + .filter(queue_entity::Column::Status.eq(queue_entity::STATUS_PENDING)) + .order_by(queue_entity::Column::Priority, Order::Desc) + .order_by(queue_entity::Column::CreatedAt, Order::Asc) + .one(txn) + .await?; + + let Some(row) = candidate else { + return Ok(None); + }; + + // Claim it. 
+ let mut active: queue_entity::ActiveModel = row.into(); + active.status = Set(queue_entity::STATUS_PROCESSING.to_string()); + active.visibility_timeout_at = Set(Some(timeout_at)); + active.updated_at = Set(now); + let updated = active.update(txn).await?; + Ok(Some(updated)) + }) + }) + .await + .map_err(|e| anyhow::anyhow!("Dequeue transaction failed: {}", e))?; + + let _ = now_str; // suppress unused warning + Ok(result) + } + + /// Marks a queue task as completed and removes it from the queue. + pub async fn complete_queue_task(&self, id: &str) -> Result<()> { + use sea_orm::sea_query::Expr; + + let now = chrono::Utc::now().to_rfc3339(); + let result = queue_entity::Entity::update_many() + .col_expr(queue_entity::Column::Status, Expr::value(queue_entity::STATUS_COMPLETED)) + .col_expr(queue_entity::Column::UpdatedAt, Expr::value(now)) + .filter(queue_entity::Column::Id.eq(id)) + .exec(&self.db) + .await?; + + if result.rows_affected == 0 { + anyhow::bail!("Queue task not found: {}", id); + } + Ok(()) + } + + /// Increments retry count; requeues the task or moves it to dead letter. + pub async fn fail_queue_task(&self, id: &str) -> Result<()> { + use sea_orm::sea_query::Expr; + + let now = chrono::Utc::now().to_rfc3339(); + + // Load the current task to check retry count. + let task = queue_entity::Entity::find_by_id(id) + .one(&self.db) + .await? 
+ .ok_or_else(|| anyhow::anyhow!("Queue task not found: {}", id))?; + + let new_retry = task.retry_count + 1; + let new_status = if new_retry >= task.max_retries { + queue_entity::STATUS_DEAD + } else { + queue_entity::STATUS_PENDING + }; + + queue_entity::Entity::update_many() + .col_expr(queue_entity::Column::Status, Expr::value(new_status)) + .col_expr(queue_entity::Column::RetryCount, Expr::value(new_retry)) + .col_expr( + queue_entity::Column::VisibilityTimeoutAt, + Expr::value(Option::::None), + ) + .col_expr(queue_entity::Column::UpdatedAt, Expr::value(now)) + .filter(queue_entity::Column::Id.eq(id)) + .exec(&self.db) + .await?; + + Ok(()) + } + + /// Returns up to `limit` pending tasks without claiming them. + pub async fn peek_queue( + &self, + queue_name: &str, + limit: u64, + ) -> Result> { + let tasks = queue_entity::Entity::find() + .filter(queue_entity::Column::QueueName.eq(queue_name)) + .filter(queue_entity::Column::Status.eq(queue_entity::STATUS_PENDING)) + .order_by(queue_entity::Column::Priority, Order::Desc) + .order_by(queue_entity::Column::CreatedAt, Order::Asc) + .limit(limit) + .all(&self.db) + .await?; + Ok(tasks) + } + + /// Deletes all tasks in the named queue regardless of status. + pub async fn purge_queue(&self, queue_name: &str) -> Result { + let result = queue_entity::Entity::delete_many() + .filter(queue_entity::Column::QueueName.eq(queue_name)) + .exec(&self.db) + .await?; + Ok(result.rows_affected) + } + + /// Returns task counts by status for the named queue. 
+ pub async fn queue_stats(&self, queue_name: &str) -> Result { + let all = queue_entity::Entity::find() + .filter(queue_entity::Column::QueueName.eq(queue_name)) + .all(&self.db) + .await?; + + let mut stats = QueueStats::default(); + for task in &all { + match task.status.as_str() { + queue_entity::STATUS_PENDING => stats.pending += 1, + queue_entity::STATUS_PROCESSING => stats.processing += 1, + queue_entity::STATUS_COMPLETED => stats.completed += 1, + queue_entity::STATUS_FAILED => stats.failed += 1, + queue_entity::STATUS_DEAD => stats.dead += 1, + _ => {} + } + } + Ok(stats) + } +} + +/// Counts of queue tasks by status. +#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)] +pub struct QueueStats { + pub pending: u64, + pub processing: u64, + pub completed: u64, + pub failed: u64, + pub dead: u64, } // --------------------------------------------------------------------------- From 3f85a64b87049a2e95301cf4089edea9c51dc3da Mon Sep 17 00:00:00 2001 From: Geoff Johnson Date: Sat, 28 Mar 2026 12:08:18 -0700 Subject: [PATCH 04/12] feat: implement QueueStrategy consuming tasks from internal queue From 7a46edbdbc2566fd9b6f0085a2e02d7e7e10304e Mon Sep 17 00:00:00 2001 From: Geoff Johnson Date: Sat, 28 Mar 2026 12:11:36 -0700 Subject: [PATCH 05/12] feat(scheduler): implement QueueStrategy consuming tasks from internal queue --- crates/orchestrator/src/scheduler/strategy.rs | 123 ++++++++++++++++++ 1 file changed, 123 insertions(+) diff --git a/crates/orchestrator/src/scheduler/strategy.rs b/crates/orchestrator/src/scheduler/strategy.rs index 185faa78..4dc6310c 100644 --- a/crates/orchestrator/src/scheduler/strategy.rs +++ b/crates/orchestrator/src/scheduler/strategy.rs @@ -7,6 +7,7 @@ use crate::scheduler::events::{EventBus, SystemEvent}; use crate::scheduler::source::TaskSource; +use crate::scheduler::storage::SchedulerStorage; use crate::scheduler::types::{DispatchStatus, Task}; use async_trait::async_trait; use chrono::Utc; @@ -837,6 +838,128 @@ 
impl TriggerStrategy for IdleStrategy { } } +// --------------------------------------------------------------------------- +// QueueStrategy +// --------------------------------------------------------------------------- + +/// A [`TriggerStrategy`] that consumes tasks from a persistent internal queue. +/// +/// The strategy polls the named queue via [`SchedulerStorage::dequeue`] at a +/// configurable interval. Each dequeued item is converted to a [`Task`] and +/// returned for dispatch. While the task is being processed it is invisible to +/// other consumers (visibility timeout). On dispatch completion the runner +/// should call [`SchedulerStorage::complete_queue_task`] or +/// [`SchedulerStorage::fail_queue_task`] to finalize the record. +/// +/// # Ordering +/// +/// Tasks are dequeued in FIFO order within each priority level — higher +/// priority tasks are always preferred over lower ones, and equal-priority +/// tasks are delivered oldest-first. +/// +/// # Polling +/// +/// When the queue is empty the strategy sleeps for `poll_interval` before +/// retrying. The sleep is interruptible via the shutdown signal. +/// +/// # Example +/// +/// ```rust,ignore +/// use orchestrator::scheduler::strategy::QueueStrategy; +/// use orchestrator::scheduler::storage::SchedulerStorage; +/// use std::time::Duration; +/// +/// let strategy = QueueStrategy::new( +/// storage.clone(), +/// "my-queue".to_string(), +/// Duration::from_secs(5), +/// 300, // 5-minute visibility timeout +/// ); +/// ``` +pub struct QueueStrategy { + storage: SchedulerStorage, + queue_name: String, + poll_interval: Duration, + visibility_timeout_secs: u64, +} + +impl QueueStrategy { + /// Create a new queue strategy. + /// + /// * `storage` — shared scheduler storage. + /// * `queue_name` — the named queue to consume from. + /// * `poll_interval` — how long to wait when the queue is empty before retrying. 
+ /// * `visibility_timeout_secs` — seconds a dequeued task remains invisible to + /// other consumers while being processed. + pub fn new( + storage: SchedulerStorage, + queue_name: String, + poll_interval: Duration, + visibility_timeout_secs: u64, + ) -> Self { + Self { storage, queue_name, poll_interval, visibility_timeout_secs } + } + + /// Convert a raw queue entity model to a workflow [`Task`]. + fn build_task(&self, row: crate::entity::task_queue::Model) -> Task { + let mut metadata = HashMap::new(); + metadata.insert("queue_name".to_string(), self.queue_name.clone()); + metadata.insert("queue_task_id".to_string(), row.id.clone()); + metadata.insert("queue_priority".to_string(), row.priority.to_string()); + + Task { + source_id: format!("queue:{}:{}", self.queue_name, row.id), + title: row.title, + body: row.body.unwrap_or_default(), + url: String::new(), + labels: vec![], + assignee: None, + metadata, + } + } +} + +#[async_trait] +impl TriggerStrategy for QueueStrategy { + async fn next_tasks(&mut self, shutdown: &watch::Receiver) -> anyhow::Result> { + let mut shutdown = shutdown.clone(); + + loop { + // Attempt to dequeue the next task. + match self.storage.dequeue(&self.queue_name, self.visibility_timeout_secs).await { + Ok(Some(row)) => { + let task = self.build_task(row); + info!( + queue = %self.queue_name, + source_id = %task.source_id, + "QueueStrategy: dequeued task" + ); + return Ok(vec![task]); + } + Ok(None) => { + // Queue is empty — sleep for poll_interval then retry. + } + Err(e) => { + warn!(%e, queue = %self.queue_name, "QueueStrategy: dequeue error, will retry"); + // Treat errors like empty queue — sleep and retry. + } + } + + // Sleep for poll_interval, respecting shutdown. + let sleep = tokio::time::sleep(self.poll_interval); + tokio::pin!(sleep); + tokio::select! 
{ + _ = &mut sleep => {} + _ = shutdown.changed() => { + if *shutdown.borrow() { + return Ok(vec![]); + } + } + } + } + } +} + // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- From 37c7bb14241db1a1baf7a260c770ff9008337082 Mon Sep 17 00:00:00 2001 From: Geoff Johnson Date: Sat, 28 Mar 2026 12:12:05 -0700 Subject: [PATCH 06/12] feat(api): add queue API endpoints for push/stats/peek/purge From 589213abed61845c6b0dd2f10515719dc5d3a8ad Mon Sep 17 00:00:00 2001 From: Geoff Johnson Date: Sat, 28 Mar 2026 13:18:53 -0700 Subject: [PATCH 07/12] feat(api): register queue routes and add push/stats/peek/purge handlers Implements POST /queues/{name}/push, GET /queues/{name}/stats, GET /queues/{name}/peek, and DELETE /queues/{name} endpoints, including queue name validation (alphanumeric + hyphens, max 64 chars). Co-Authored-By: Claude Sonnet 4.6 --- crates/orchestrator/src/api.rs | 6 +- crates/orchestrator/src/scheduler/api.rs | 156 ++++++++++++++++++++++- 2 files changed, 158 insertions(+), 4 deletions(-) diff --git a/crates/orchestrator/src/api.rs b/crates/orchestrator/src/api.rs index fcee7c0d..9ea0dfe5 100644 --- a/crates/orchestrator/src/api.rs +++ b/crates/orchestrator/src/api.rs @@ -1,5 +1,5 @@ use crate::manager::AgentManager; -use crate::scheduler::api::{webhook_routes, workflow_routes, WorkflowState}; +use crate::scheduler::api::{queue_routes, webhook_routes, workflow_routes, WorkflowState}; use crate::scheduler::Scheduler; use crate::types::*; use crate::websocket::{ @@ -55,7 +55,8 @@ pub fn create_router(state: ApiState) -> Router { let wf_state = WorkflowState { scheduler: state.scheduler.clone(), manager: state.manager.clone() }; let wf_routes = workflow_routes(wf_state.clone()); - let wh_routes = webhook_routes(wf_state); + let wh_routes = webhook_routes(wf_state.clone()); + let q_routes = queue_routes(wf_state); let api_routes = Router::new() 
.route("/health", get(health_check)) @@ -88,6 +89,7 @@ pub fn create_router(state: ApiState) -> Router { .merge(ws_terminal_routes) .merge(wf_routes) .merge(wh_routes) + .merge(q_routes) } #[derive(Deserialize)] diff --git a/crates/orchestrator/src/scheduler/api.rs b/crates/orchestrator/src/scheduler/api.rs index 13dbe820..9fad7625 100644 --- a/crates/orchestrator/src/scheduler/api.rs +++ b/crates/orchestrator/src/scheduler/api.rs @@ -1,5 +1,6 @@ use crate::api::ApiError; use crate::manager::AgentManager; +use crate::scheduler::storage::QueueStats; use crate::scheduler::template::validate_template; use crate::scheduler::types::{WebhookSource, *}; use crate::scheduler::webhook; @@ -10,11 +11,11 @@ use axum::{ extract::{Path, Query, State}, http::{HeaderMap, StatusCode}, response::IntoResponse, - routing::{get, post}, + routing::{delete, get, post}, Json, Router, }; use chrono::Utc; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::sync::Arc; use tracing::info; use uuid::Uuid; @@ -494,3 +495,154 @@ async fn handle_webhook( Ok(StatusCode::ACCEPTED) } + +// --------------------------------------------------------------------------- +// Queue endpoints +// --------------------------------------------------------------------------- + +/// Validates a queue name: alphanumeric + hyphens, 1–64 characters. +fn validate_queue_name(name: &str) -> Result<(), ApiError> { + if name.is_empty() || name.len() > 64 { + return Err(ApiError::InvalidInput("Queue name must be 1–64 characters long".to_string())); + } + if !name.chars().all(|c| c.is_ascii_alphanumeric() || c == '-') { + return Err(ApiError::InvalidInput( + "Queue name may only contain alphanumeric characters and hyphens".to_string(), + )); + } + Ok(()) +} + +/// Request body for `POST /queues/{name}/push`. +#[derive(Deserialize)] +struct PushQueueRequest { + title: String, + body: Option, + #[serde(default)] + priority: i32, +} + +/// Response for a newly enqueued task. 
+#[derive(Serialize)] +struct QueueTaskResponse { + id: String, + queue_name: String, + status: String, + created_at: String, +} + +/// Query params for `GET /queues/{name}/peek`. +#[derive(Deserialize)] +struct PeekParams { + limit: Option, +} + +/// Response item for a peeked queue task. +#[derive(Serialize)] +struct QueueTaskItem { + id: String, + queue_name: String, + title: String, + body: Option, + priority: i32, + status: String, + retry_count: i32, + max_retries: i32, + created_at: String, +} + +/// Routes for queue management endpoints. +pub fn queue_routes(state: WorkflowState) -> Router { + Router::new() + .route("/queues/{name}/push", post(push_queue)) + .route("/queues/{name}/stats", get(queue_stats)) + .route("/queues/{name}/peek", get(peek_queue)) + .route("/queues/{name}", delete(purge_queue)) + .with_state(state) +} + +/// `POST /queues/{name}/push` — enqueue a task. +async fn push_queue( + State(state): State, + Path(name): Path, + Json(req): Json, +) -> Result { + validate_queue_name(&name)?; + + if req.title.trim().is_empty() { + return Err(ApiError::InvalidInput("Task title must not be empty".to_string())); + } + + let now = Utc::now().to_rfc3339(); + let id = state + .scheduler + .storage() + .enqueue(&name, &req.title, req.body.as_deref(), req.priority) + .await?; + + info!(queue_name = %name, task_id = %id, "Task enqueued"); + + Ok(( + StatusCode::CREATED, + Json(QueueTaskResponse { + id, + queue_name: name, + status: "pending".to_string(), + created_at: now, + }), + )) +} + +/// `GET /queues/{name}/stats` — return counts by status. +async fn queue_stats( + State(state): State, + Path(name): Path, +) -> Result { + validate_queue_name(&name)?; + + let stats: QueueStats = state.scheduler.storage().queue_stats(&name).await?; + Ok(Json(stats)) +} + +/// `GET /queues/{name}/peek?limit=N` — view pending tasks without claiming. 
+async fn peek_queue( + State(state): State, + Path(name): Path, + Query(params): Query, +) -> Result { + validate_queue_name(&name)?; + + let limit = params.limit.unwrap_or(10).min(100); + let tasks = state.scheduler.storage().peek_queue(&name, limit).await?; + + let items: Vec = tasks + .into_iter() + .map(|t| QueueTaskItem { + id: t.id, + queue_name: t.queue_name, + title: t.title, + body: t.body, + priority: t.priority, + status: t.status, + retry_count: t.retry_count, + max_retries: t.max_retries, + created_at: t.created_at, + }) + .collect(); + + Ok(Json(items)) +} + +/// `DELETE /queues/{name}` — purge all tasks from the queue. +async fn purge_queue( + State(state): State, + Path(name): Path, +) -> Result { + validate_queue_name(&name)?; + + let deleted = state.scheduler.storage().purge_queue(&name).await?; + + info!(queue_name = %name, deleted, "Queue purged"); + + Ok(Json(serde_json::json!({ "deleted": deleted }))) +} From 21f88712956d85238a2f3a2aaf2553eae8a561f4 Mon Sep 17 00:00:00 2001 From: Geoff Johnson Date: Sat, 28 Mar 2026 13:19:19 -0700 Subject: [PATCH 08/12] feat: add Queue variant to TriggerConfig and wire into API/CLI From 82d38b351bb83d08f96eaa0b6ffb7d5255633677 Mon Sep 17 00:00:00 2001 From: Geoff Johnson Date: Sat, 28 Mar 2026 13:32:39 -0700 Subject: [PATCH 09/12] feat(scheduler): add Queue trigger type wired into strategy, API, and CLI Adds TriggerConfig::Queue variant, QueueStrategy factory wiring in create_strategy(), API validation, template variables (queue_name, queue_task_id, queue_priority), client methods, and CLI subcommands (queue-stats, queue-peek, queue-purge) with --trigger-type queue support. 
Co-Authored-By: Claude Sonnet 4.6 --- crates/cli/src/commands/orchestrator.rs | 144 ++++++++++++++++++ crates/orchestrator/src/client.rs | 35 +++++ crates/orchestrator/src/scheduler/api.rs | 14 ++ crates/orchestrator/src/scheduler/mod.rs | 2 +- crates/orchestrator/src/scheduler/runner.rs | 14 +- crates/orchestrator/src/scheduler/template.rs | 4 + crates/orchestrator/src/scheduler/types.rs | 18 ++- 7 files changed, 228 insertions(+), 3 deletions(-) diff --git a/crates/cli/src/commands/orchestrator.rs b/crates/cli/src/commands/orchestrator.rs index ce9f6780..40e7f35b 100644 --- a/crates/cli/src/commands/orchestrator.rs +++ b/crates/cli/src/commands/orchestrator.rs @@ -77,6 +77,8 @@ pub enum TriggerType { AgentIdle, /// Linear issues trigger — polls Linear for matching issues. LinearIssues, + /// Queue-based trigger — consumes tasks from a named internal queue. + Queue, } /// Orchestrator service management subcommands. @@ -696,6 +698,18 @@ pub enum OrchestratorCommand { #[arg(long)] idle_seconds: Option, + /// Queue name to consume from (required for queue trigger type) + #[arg(long)] + queue_name: Option, + + /// Queue poll interval in seconds (optional, for queue trigger type, default 5) + #[arg(long)] + queue_poll_interval: Option, + + /// Queue visibility timeout in seconds (optional, for queue trigger type, default 300) + #[arg(long)] + queue_visibility_timeout: Option, + /// Prompt template with {{placeholders}} (e.g. "Fix: {{title}}\n{{body}}") #[arg(long, conflicts_with = "prompt_template_file")] prompt_template: Option, @@ -807,6 +821,42 @@ pub enum OrchestratorCommand { /// Directory path to remove path: String, }, + + /// Get statistics for a named queue. + /// + /// Returns counts of pending, processing, completed, failed, and dead tasks. + /// + /// # Examples + /// + /// agent orchestrator queue-stats my-queue + QueueStats { + /// Queue name + name: String, + }, + + /// Peek at pending tasks in a named queue without consuming them. 
+ /// + /// # Examples + /// + /// agent orchestrator queue-peek my-queue + /// agent orchestrator queue-peek my-queue --limit 5 + QueuePeek { + /// Queue name + name: String, + /// Number of tasks to return (default 10, max 100) + #[arg(long, default_value = "10")] + limit: u64, + }, + + /// Purge all tasks from a named queue. + /// + /// # Examples + /// + /// agent orchestrator queue-purge my-queue + QueuePurge { + /// Queue name + name: String, + }, } impl OrchestratorCommand { @@ -935,6 +985,9 @@ impl OrchestratorCommand { linear_labels, linear_assignee, idle_seconds, + queue_name, + queue_poll_interval, + queue_visibility_timeout, prompt_template, prompt_template_file, poll_interval, @@ -959,6 +1012,9 @@ impl OrchestratorCommand { linear_labels, linear_assignee.as_deref(), *idle_seconds, + queue_name.as_deref(), + *queue_poll_interval, + *queue_visibility_timeout, prompt_template.as_deref(), prompt_template_file.as_deref(), *poll_interval, @@ -995,6 +1051,11 @@ impl OrchestratorCommand { OrchestratorCommand::RemoveDir { id, path } => { remove_dir_cmd(client, id, path, json).await } + OrchestratorCommand::QueueStats { name } => queue_stats_cmd(client, name, json).await, + OrchestratorCommand::QueuePeek { name, limit } => { + queue_peek_cmd(client, name, *limit, json).await + } + OrchestratorCommand::QueuePurge { name } => queue_purge_cmd(client, name, json).await, } } } @@ -1297,6 +1358,70 @@ fn display_add_dir_response(response: &AddDirResponse) { } } +// -- Queue operations -- + +async fn queue_stats_cmd(client: &OrchestratorClient, name: &str, json: bool) -> Result<()> { + let stats = client.queue_stats(name).await.context("Failed to get queue stats")?; + if json { + println!("{}", serde_json::to_string_pretty(&stats)?); + } else { + println!("{}: {}", "Queue".bold(), name); + if let Some(obj) = stats.as_object() { + for (k, v) in obj { + println!(" {}: {}", k.bold(), v); + } + } + } + Ok(()) +} + +async fn queue_peek_cmd( + client: &OrchestratorClient, + 
name: &str, + limit: u64, + json: bool, +) -> Result<()> { + let tasks = client.queue_peek(name, Some(limit)).await.context("Failed to peek queue")?; + if json { + println!("{}", serde_json::to_string_pretty(&tasks)?); + } else if tasks.is_empty() { + println!("{}", "Queue is empty.".yellow()); + } else { + println!("{}: {}", "Queue".bold(), name); + println!("{}", "=".repeat(80).cyan()); + for task in &tasks { + if let Some(obj) = task.as_object() { + if let Some(id) = obj.get("id").and_then(|v| v.as_str()) { + println!("{}: {}", "ID".bold(), id); + } + if let Some(title) = obj.get("title").and_then(|v| v.as_str()) { + println!("{}: {}", "Title".bold(), title); + } + if let Some(priority) = obj.get("priority") { + println!("{}: {}", "Priority".bold(), priority); + } + if let Some(created) = obj.get("created_at").and_then(|v| v.as_str()) { + println!("{}: {}", "Created".bold(), created); + } + } + println!("{}", "-".repeat(80).cyan()); + } + println!("Total: {} task(s)", tasks.len()); + } + Ok(()) +} + +async fn queue_purge_cmd(client: &OrchestratorClient, name: &str, json: bool) -> Result<()> { + let result = client.queue_purge(name).await.context("Failed to purge queue")?; + if json { + println!("{}", serde_json::to_string_pretty(&result)?); + } else { + let deleted = result.get("deleted").and_then(|v| v.as_u64()).unwrap_or(0); + println!("Purged {} task(s) from queue '{}'", deleted, name); + } + Ok(()) +} + // -- Attach -- async fn attach_agent( @@ -2279,6 +2404,9 @@ async fn create_workflow( linear_labels: &[String], linear_assignee: Option<&str>, idle_seconds: Option, + queue_name: Option<&str>, + queue_poll_interval: Option, + queue_visibility_timeout: Option, prompt_template: Option<&str>, prompt_template_file: Option<&std::path::Path>, poll_interval: u64, @@ -2372,6 +2500,15 @@ async fn create_workflow( assignee: linear_assignee.map(|s| s.to_string()), } } + TriggerType::Queue => { + let qname = queue_name + .ok_or_else(|| anyhow::anyhow!("--queue-name is 
required for queue trigger"))?; + TriggerConfig::Queue { + queue_name: qname.to_string(), + poll_interval_secs: queue_poll_interval, + visibility_timeout_secs: queue_visibility_timeout, + } + } }; let request = CreateWorkflowRequest { @@ -2742,6 +2879,13 @@ fn display_workflow(workflow: &WorkflowResponse) { println!("{}: {}", "Assignee".bold(), a); } } + TriggerConfig::Queue { queue_name, poll_interval_secs, visibility_timeout_secs } => { + println!("{}: {}", "Queue Name".bold(), queue_name); + let interval = poll_interval_secs.unwrap_or(5); + println!("{}: {}s", "Poll Interval".bold(), interval); + let timeout = visibility_timeout_secs.unwrap_or(300); + println!("{}: {}s", "Visibility Timeout".bold(), timeout); + } } let template = &workflow.prompt_template; let display = diff --git a/crates/orchestrator/src/client.rs b/crates/orchestrator/src/client.rs index 93bb28a5..17aa1011 100644 --- a/crates/orchestrator/src/client.rs +++ b/crates/orchestrator/src/client.rs @@ -26,6 +26,7 @@ use anyhow::{Context, Result}; use serde::de::DeserializeOwned; use serde::Serialize; +use serde_json; use uuid::Uuid; use crate::scheduler::types::{ @@ -303,6 +304,40 @@ impl OrchestratorClient { self.post(&format!("/workflows/{}/trigger", id), request).await } + // -- Queue operations -- + + /// Push a task onto a named queue. + pub async fn queue_push( + &self, + queue_name: &str, + request: &serde_json::Value, + ) -> Result { + self.post(&format!("/queues/{}/push", queue_name), request).await + } + + /// Get statistics for a named queue. + pub async fn queue_stats(&self, queue_name: &str) -> Result { + self.get(&format!("/queues/{}/stats", queue_name)).await + } + + /// Peek at pending tasks in a named queue. 
+ pub async fn queue_peek( + &self, + queue_name: &str, + limit: Option, + ) -> Result> { + let path = match limit { + Some(n) => format!("/queues/{}/peek?limit={}", queue_name, n), + None => format!("/queues/{}/peek", queue_name), + }; + self.get(&path).await + } + + /// Purge all tasks from a named queue. + pub async fn queue_purge(&self, queue_name: &str) -> Result { + self.delete_with_response(&format!("/queues/{}", queue_name)).await + } + // -- Private HTTP helpers -- async fn get(&self, path: &str) -> Result { diff --git a/crates/orchestrator/src/scheduler/api.rs b/crates/orchestrator/src/scheduler/api.rs index 9fad7625..3e1b9fe9 100644 --- a/crates/orchestrator/src/scheduler/api.rs +++ b/crates/orchestrator/src/scheduler/api.rs @@ -139,6 +139,20 @@ async fn create_workflow( )); } } + TriggerConfig::Queue { queue_name, .. } => { + if queue_name.trim().is_empty() { + return Err(ApiError::InvalidInput( + "Queue trigger requires a non-empty 'queue_name'".to_string(), + )); + } + if !queue_name.chars().all(|c| c.is_ascii_alphanumeric() || c == '-') + || queue_name.len() > 64 + { + return Err(ApiError::InvalidInput( + "Queue name may only contain alphanumeric characters and hyphens (max 64 chars)".to_string(), + )); + } + } } // Reject trigger types that are not yet implemented. diff --git a/crates/orchestrator/src/scheduler/mod.rs b/crates/orchestrator/src/scheduler/mod.rs index f796ca9e..31b5a137 100644 --- a/crates/orchestrator/src/scheduler/mod.rs +++ b/crates/orchestrator/src/scheduler/mod.rs @@ -101,7 +101,7 @@ impl Scheduler { manual_tx = Some(tx); Box::new(ManualStrategy::new(rx)) } else { - create_strategy(&config, self.event_bus.as_ref())? + create_strategy(&config, self.event_bus.as_ref(), Some(&self.storage))? 
}; let runner = WorkflowRunner::new(config, self.storage.clone(), self.registry.clone(), strategy); diff --git a/crates/orchestrator/src/scheduler/runner.rs b/crates/orchestrator/src/scheduler/runner.rs index 3113fa59..ff612b2f 100644 --- a/crates/orchestrator/src/scheduler/runner.rs +++ b/crates/orchestrator/src/scheduler/runner.rs @@ -5,7 +5,7 @@ use crate::scheduler::source::TaskSource; use crate::scheduler::storage::SchedulerStorage; use crate::scheduler::strategy::{ CronStrategy, DelayStrategy, EventFilter, EventStrategy, IdleStrategy, PollingStrategy, - TriggerStrategy, + QueueStrategy, TriggerStrategy, }; use crate::scheduler::template::render_template; use crate::scheduler::types::{ @@ -304,6 +304,7 @@ fn create_source(config: &TriggerConfig) -> anyhow::Result> pub fn create_strategy( config: &WorkflowConfig, event_bus: Option<&Arc>, + storage: Option<&SchedulerStorage>, ) -> anyhow::Result> { match &config.trigger_config { TriggerConfig::Cron { expression } => { @@ -342,6 +343,17 @@ pub fn create_strategy( let duration = std::time::Duration::from_secs(*idle_seconds); Ok(Box::new(IdleStrategy::new(bus.clone(), config.agent_id, duration))) } + TriggerConfig::Queue { queue_name, poll_interval_secs, visibility_timeout_secs } => { + let st = storage.ok_or_else(|| { + anyhow::anyhow!("SchedulerStorage is required for queue triggers") + })?; + Ok(Box::new(QueueStrategy::new( + st.clone(), + queue_name.clone(), + std::time::Duration::from_secs(poll_interval_secs.unwrap_or(5)), + visibility_timeout_secs.unwrap_or(300), + ))) + } _ => { let source = create_source(&config.trigger_config)?; Ok(Box::new(PollingStrategy::new(source, config.poll_interval_secs))) diff --git a/crates/orchestrator/src/scheduler/template.rs b/crates/orchestrator/src/scheduler/template.rs index e37c919f..74130292 100644 --- a/crates/orchestrator/src/scheduler/template.rs +++ b/crates/orchestrator/src/scheduler/template.rs @@ -76,6 +76,10 @@ pub const KNOWN_VARIABLES: &[&str] = &[ "team_name", 
"project", "linear_id", + // Metadata-backed (queue trigger) + "queue_name", + "queue_task_id", + "queue_priority", ]; /// Validate a prompt template, returning any warnings or errors. diff --git a/crates/orchestrator/src/scheduler/types.rs b/crates/orchestrator/src/scheduler/types.rs index 0c6f4501..3f3372e5 100644 --- a/crates/orchestrator/src/scheduler/types.rs +++ b/crates/orchestrator/src/scheduler/types.rs @@ -174,6 +174,20 @@ pub enum TriggerConfig { #[serde(default)] assignee: Option, }, + /// Queue-based trigger — consumes tasks from a named internal queue. + /// + /// Workers push tasks via `POST /queues/{name}/push`; this trigger + /// dequeues them one at a time with an atomic visibility timeout. + Queue { + /// The named queue to consume from. + queue_name: String, + /// How often to poll when the queue is empty (seconds, default 5). + #[serde(default)] + poll_interval_secs: Option, + /// Visibility timeout for dequeued tasks (seconds, default 300). + #[serde(default)] + visibility_timeout_secs: Option, + }, } fn default_issue_state() -> String { @@ -197,6 +211,7 @@ impl TriggerConfig { TriggerConfig::Manual { .. } => "manual", TriggerConfig::AgentIdle { .. } => "agent_idle", TriggerConfig::LinearIssues { .. } => "linear_issues", + TriggerConfig::Queue { .. } => "queue", } } @@ -212,7 +227,8 @@ impl TriggerConfig { | TriggerConfig::Webhook { .. } | TriggerConfig::Manual { .. } | TriggerConfig::AgentIdle { .. } - | TriggerConfig::LinearIssues { .. } => true, + | TriggerConfig::LinearIssues { .. } + | TriggerConfig::Queue { .. 
} => true, } } From 52373de7b1c8bc39cadf24dd4ec8f62284cb1b7d Mon Sep 17 00:00:00 2001 From: Geoff Johnson Date: Sat, 28 Mar 2026 13:32:56 -0700 Subject: [PATCH 10/12] test+docs: add tests and documentation for queue-based trigger From a965cff0ed54bb756160e0acb4be411b76d0c744 Mon Sep 17 00:00:00 2001 From: Geoff Johnson Date: Sat, 28 Mar 2026 13:39:04 -0700 Subject: [PATCH 11/12] test+docs: add comprehensive tests and documentation for queue trigger Adds unit tests for queue storage (FIFO, priority ordering, visibility timeout, retry/dead-letter, stats, peek, purge), QueueStrategy unit tests, 8 integration tests in queue_trigger.rs, CLI argument parsing tests for queue subcommands, and docs/public/queue-trigger.md covering API, CLI, template variables, retry semantics, and architecture patterns. Co-Authored-By: Claude Sonnet 4.6 --- crates/cli/src/commands/orchestrator.rs | 132 +++++++++ crates/orchestrator/src/scheduler/storage.rs | 185 +++++++++++++ crates/orchestrator/src/scheduler/strategy.rs | 94 +++++++ crates/orchestrator/tests/queue_trigger.rs | 238 ++++++++++++++++ docs/public/queue-trigger.md | 256 ++++++++++++++++++ 5 files changed, 905 insertions(+) create mode 100644 crates/orchestrator/tests/queue_trigger.rs create mode 100644 docs/public/queue-trigger.md diff --git a/crates/cli/src/commands/orchestrator.rs b/crates/cli/src/commands/orchestrator.rs index 40e7f35b..9a2cfa03 100644 --- a/crates/cli/src/commands/orchestrator.rs +++ b/crates/cli/src/commands/orchestrator.rs @@ -4206,6 +4206,9 @@ mod tests { &[], // linear_labels None, // linear_assignee None, // idle_seconds + None, // queue_name + None, // queue_poll_interval + None, // queue_visibility_timeout Some("Fix: {{title}}"), None, // prompt_template_file 60, @@ -4277,6 +4280,9 @@ mod tests { &[], // linear_labels None, // linear_assignee None, // idle_seconds — missing! 
+ None, // queue_name + None, // queue_poll_interval + None, // queue_visibility_timeout Some("Do background work"), None, // prompt_template_file 60, @@ -4313,6 +4319,9 @@ mod tests { &[], // linear_labels None, // linear_assignee Some(0), // idle_seconds = 0 (invalid) + None, // queue_name + None, // queue_poll_interval + None, // queue_visibility_timeout Some("Do background work"), None, // prompt_template_file 60, @@ -4328,4 +4337,127 @@ mod tests { "expected validation error for zero idle_seconds, got: {msg}" ); } + + // ----------------------------------------------------------------------- + // Queue subcommand argument parsing + // ----------------------------------------------------------------------- + + #[test] + fn parse_queue_stats_subcommand() { + use clap::Parser; + + #[derive(Parser)] + struct Cli { + #[command(subcommand)] + command: OrchestratorCommand, + } + + let cli = Cli::try_parse_from(["test", "queue-stats", "my-queue"]) + .expect("should parse queue-stats"); + + assert!(matches!( + cli.command, + OrchestratorCommand::QueueStats { name } if name == "my-queue" + )); + } + + #[test] + fn parse_queue_peek_with_limit() { + use clap::Parser; + + #[derive(Parser)] + struct Cli { + #[command(subcommand)] + command: OrchestratorCommand, + } + + let cli = Cli::try_parse_from(["test", "queue-peek", "my-queue", "--limit", "25"]) + .expect("should parse queue-peek"); + + assert!(matches!( + cli.command, + OrchestratorCommand::QueuePeek { name, limit } if name == "my-queue" && limit == 25 + )); + } + + #[test] + fn parse_queue_peek_default_limit() { + use clap::Parser; + + #[derive(Parser)] + struct Cli { + #[command(subcommand)] + command: OrchestratorCommand, + } + + let cli = Cli::try_parse_from(["test", "queue-peek", "reports"]) + .expect("should parse queue-peek without limit"); + + assert!(matches!( + cli.command, + OrchestratorCommand::QueuePeek { name, limit } if name == "reports" && limit == 10 + )); + } + + #[test] + fn 
parse_queue_purge_subcommand() { + use clap::Parser; + + #[derive(Parser)] + struct Cli { + #[command(subcommand)] + command: OrchestratorCommand, + } + + let cli = Cli::try_parse_from(["test", "queue-purge", "my-queue"]) + .expect("should parse queue-purge"); + + assert!(matches!( + cli.command, + OrchestratorCommand::QueuePurge { name } if name == "my-queue" + )); + } + + #[test] + fn parse_create_workflow_with_queue_trigger() { + use clap::Parser; + + #[derive(Parser)] + struct Cli { + #[command(subcommand)] + command: OrchestratorCommand, + } + + let cli = Cli::try_parse_from([ + "test", + "create-workflow", + "--name", + "bg-worker", + "--agent-id", + "550e8400-e29b-41d4-a716-446655440000", + "--trigger-type", + "queue", + "--queue-name", + "work-items", + "--prompt-template", + "Process: {{title}}", + ]) + .expect("should parse queue workflow creation"); + + if let OrchestratorCommand::CreateWorkflow { + trigger_type, + queue_name, + queue_poll_interval, + queue_visibility_timeout, + .. 
+ } = cli.command + { + assert!(matches!(trigger_type, TriggerType::Queue)); + assert_eq!(queue_name, Some("work-items".to_string())); + assert_eq!(queue_poll_interval, None); + assert_eq!(queue_visibility_timeout, None); + } else { + panic!("Expected CreateWorkflow variant"); + } + } } diff --git a/crates/orchestrator/src/scheduler/storage.rs b/crates/orchestrator/src/scheduler/storage.rs index 657ff630..fa265595 100644 --- a/crates/orchestrator/src/scheduler/storage.rs +++ b/crates/orchestrator/src/scheduler/storage.rs @@ -651,4 +651,189 @@ mod tests { let updated = storage.list_dispatches(&workflow.id).await.unwrap(); assert_eq!(updated[0].status, DispatchStatus::Failed); } + + // ----------------------------------------------------------------------- + // Queue storage tests + // ----------------------------------------------------------------------- + + #[tokio::test] + async fn test_enqueue_dequeue_fifo() { + let (storage, _tmp) = create_test_storage().await; + + // Enqueue three tasks at the same priority (FIFO order expected). + storage.enqueue("q1", "Task A", None, 0).await.unwrap(); + storage.enqueue("q1", "Task B", None, 0).await.unwrap(); + storage.enqueue("q1", "Task C", None, 0).await.unwrap(); + + let first = storage.dequeue("q1", 60).await.unwrap().unwrap(); + assert_eq!(first.title, "Task A"); + + let second = storage.dequeue("q1", 60).await.unwrap().unwrap(); + assert_eq!(second.title, "Task B"); + + let third = storage.dequeue("q1", 60).await.unwrap().unwrap(); + assert_eq!(third.title, "Task C"); + + // Queue is now empty. + assert!(storage.dequeue("q1", 60).await.unwrap().is_none()); + } + + #[tokio::test] + async fn test_priority_ordering() { + let (storage, _tmp) = create_test_storage().await; + + // Enqueue tasks with different priorities. 
+ storage.enqueue("prio", "Low", None, 1).await.unwrap(); + storage.enqueue("prio", "High", None, 10).await.unwrap(); + storage.enqueue("prio", "Medium", None, 5).await.unwrap(); + + // Higher priority should be dequeued first. + let first = storage.dequeue("prio", 60).await.unwrap().unwrap(); + assert_eq!(first.title, "High"); + assert_eq!(first.priority, 10); + + let second = storage.dequeue("prio", 60).await.unwrap().unwrap(); + assert_eq!(second.title, "Medium"); + + let third = storage.dequeue("prio", 60).await.unwrap().unwrap(); + assert_eq!(third.title, "Low"); + } + + #[tokio::test] + async fn test_visibility_timeout_prevents_double_processing() { + let (storage, _tmp) = create_test_storage().await; + + storage.enqueue("vis", "Task X", Some("body"), 0).await.unwrap(); + + // Dequeue with a 60-second visibility timeout. + let task = storage.dequeue("vis", 60).await.unwrap().unwrap(); + assert_eq!(task.title, "Task X"); + assert_eq!(task.status, "processing"); + + // A second dequeue should return None (task is still in the processing window). + let again = storage.dequeue("vis", 60).await.unwrap(); + assert!(again.is_none(), "Expected None — task should be invisible while processing"); + } + + #[tokio::test] + async fn test_complete_queue_task() { + let (storage, _tmp) = create_test_storage().await; + + let id = storage.enqueue("done", "Finish me", None, 0).await.unwrap(); + storage.dequeue("done", 60).await.unwrap().unwrap(); + + storage.complete_queue_task(&id).await.unwrap(); + + let stats = storage.queue_stats("done").await.unwrap(); + assert_eq!(stats.completed, 1); + assert_eq!(stats.processing, 0); + assert_eq!(stats.pending, 0); + } + + #[tokio::test] + async fn test_fail_task_retry_and_dead_letter() { + let (storage, _tmp) = create_test_storage().await; + + let id = storage.enqueue("retry", "Retry me", None, 0).await.unwrap(); + + // Fail 3 times — default max_retries is 3, so after 3 failures it's dead. 
+ for _ in 0..3 { + storage.dequeue("retry", 60).await.unwrap().unwrap(); + storage.fail_queue_task(&id).await.unwrap(); + } + + let stats = storage.queue_stats("retry").await.unwrap(); + assert_eq!(stats.dead, 1, "Task should be dead after exceeding max_retries"); + assert_eq!(stats.pending, 0); + } + + #[tokio::test] + async fn test_peek_queue() { + let (storage, _tmp) = create_test_storage().await; + + storage.enqueue("peek", "First", None, 0).await.unwrap(); + storage.enqueue("peek", "Second", None, 0).await.unwrap(); + + let items = storage.peek_queue("peek", 10).await.unwrap(); + assert_eq!(items.len(), 2); + assert_eq!(items[0].title, "First"); + assert_eq!(items[1].title, "Second"); + + // Peeking should not consume the tasks. + let items_again = storage.peek_queue("peek", 10).await.unwrap(); + assert_eq!(items_again.len(), 2); + } + + #[tokio::test] + async fn test_peek_limit() { + let (storage, _tmp) = create_test_storage().await; + + for i in 0..5 { + storage.enqueue("lim", &format!("Task {i}"), None, 0).await.unwrap(); + } + + let items = storage.peek_queue("lim", 3).await.unwrap(); + assert_eq!(items.len(), 3); + } + + #[tokio::test] + async fn test_purge_queue() { + let (storage, _tmp) = create_test_storage().await; + + storage.enqueue("purge", "A", None, 0).await.unwrap(); + storage.enqueue("purge", "B", None, 0).await.unwrap(); + storage.enqueue("purge", "C", None, 0).await.unwrap(); + + let deleted = storage.purge_queue("purge").await.unwrap(); + assert_eq!(deleted, 3); + + let stats = storage.queue_stats("purge").await.unwrap(); + assert_eq!(stats.pending, 0); + } + + #[tokio::test] + async fn test_queue_stats_all_statuses() { + let (storage, _tmp) = create_test_storage().await; + + // Enqueue 2 tasks. + let id1 = storage.enqueue("stats", "T1", None, 0).await.unwrap(); + let id2 = storage.enqueue("stats", "T2", None, 0).await.unwrap(); + + // Dequeue both — now processing. 
+ storage.dequeue("stats", 300).await.unwrap(); + storage.dequeue("stats", 300).await.unwrap(); + + // Complete one. + storage.complete_queue_task(&id1).await.unwrap(); + + // Fail the other until it's dead. + for _ in 0..3 { + // Re-dequeue (after each fail, it goes back to pending except on final fail) + if let Some(row) = storage.dequeue("stats", 300).await.unwrap() { + let _ = row; // claim it + } + storage.fail_queue_task(&id2).await.unwrap(); + } + + let stats = storage.queue_stats("stats").await.unwrap(); + assert_eq!(stats.completed, 1); + assert_eq!(stats.dead, 1); + } + + #[tokio::test] + async fn test_queue_isolation() { + let (storage, _tmp) = create_test_storage().await; + + storage.enqueue("q-a", "From A", None, 0).await.unwrap(); + storage.enqueue("q-b", "From B", None, 0).await.unwrap(); + + // Dequeue from q-a should only return q-a tasks. + let task = storage.dequeue("q-a", 60).await.unwrap().unwrap(); + assert_eq!(task.queue_name, "q-a"); + assert_eq!(task.title, "From A"); + + // q-b is still untouched. 
+ let stats_b = storage.queue_stats("q-b").await.unwrap(); + assert_eq!(stats_b.pending, 1); + } } diff --git a/crates/orchestrator/src/scheduler/strategy.rs b/crates/orchestrator/src/scheduler/strategy.rs index 4dc6310c..39c49708 100644 --- a/crates/orchestrator/src/scheduler/strategy.rs +++ b/crates/orchestrator/src/scheduler/strategy.rs @@ -2131,4 +2131,98 @@ mod tests { assert!(r1[0].source_id.starts_with("idle:")); assert!(r2[0].source_id.starts_with("idle:")); } + + // ----------------------------------------------------------------------- + // QueueStrategy tests + // ----------------------------------------------------------------------- + + use crate::scheduler::storage::SchedulerStorage; + use crate::storage::AgentStorage; + use tempfile::TempDir; + + async fn create_queue_storage() -> (SchedulerStorage, TempDir) { + let temp_dir = TempDir::new().unwrap(); + let db_path = temp_dir.path().join("test.db"); + let agent_storage = AgentStorage::with_path(&db_path).await.unwrap(); + let storage = SchedulerStorage::new(agent_storage.db().clone()); + (storage, temp_dir) + } + + #[tokio::test] + async fn queue_strategy_produces_task_from_queue() { + let (storage, _tmp) = create_queue_storage().await; + + // Pre-populate the queue. 
+ storage.enqueue("test-q", "My Task", Some("some body"), 0).await.unwrap(); + + let mut strategy = + QueueStrategy::new(storage, "test-q".to_string(), Duration::from_millis(10), 60); + let (_tx, rx) = watch::channel(false); + + let tasks = strategy.next_tasks(&rx).await.unwrap(); + + assert_eq!(tasks.len(), 1); + assert_eq!(tasks[0].title, "My Task"); + assert_eq!(tasks[0].body, "some body"); + assert!(tasks[0].source_id.starts_with("queue:test-q:")); + assert_eq!(tasks[0].metadata.get("queue_name").map(|s| s.as_str()), Some("test-q")); + assert!(tasks[0].metadata.contains_key("queue_task_id")); + assert!(tasks[0].metadata.contains_key("queue_priority")); + } + + #[tokio::test] + async fn queue_strategy_polls_when_empty_then_picks_up_task() { + let (storage, _tmp) = create_queue_storage().await; + let storage_clone = storage.clone(); + + let mut strategy = + QueueStrategy::new(storage, "delayed-q".to_string(), Duration::from_millis(20), 60); + let (_tx, rx) = watch::channel(false); + + // Enqueue a task in a background task after a short delay. + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(50)).await; + storage_clone.enqueue("delayed-q", "Delayed Task", None, 0).await.unwrap(); + }); + + let tasks = strategy.next_tasks(&rx).await.unwrap(); + + assert_eq!(tasks.len(), 1); + assert_eq!(tasks[0].title, "Delayed Task"); + } + + #[tokio::test] + async fn queue_strategy_shuts_down_cleanly_when_queue_empty() { + let (storage, _tmp) = create_queue_storage().await; + + let mut strategy = + QueueStrategy::new(storage, "empty-q".to_string(), Duration::from_millis(10), 60); + + // Signal shutdown immediately. + let (tx, rx) = watch::channel(false); + tx.send(true).unwrap(); + + let result = strategy.next_tasks(&rx).await; + // With shutdown signalled, next_tasks may return an error or empty vec — + // it should not hang indefinitely. + let _ = result; // Just verify it returns at all. 
+ } + + #[tokio::test] + async fn queue_strategy_task_metadata_includes_priority() { + let (storage, _tmp) = create_queue_storage().await; + + storage.enqueue("meta-q", "High Priority Task", None, 99).await.unwrap(); + + let mut strategy = + QueueStrategy::new(storage, "meta-q".to_string(), Duration::from_millis(10), 60); + let (_tx, rx) = watch::channel(false); + + let tasks = strategy.next_tasks(&rx).await.unwrap(); + assert_eq!(tasks.len(), 1); + + let priority = + tasks[0].metadata.get("queue_priority").expect("queue_priority should be in metadata"); + assert_eq!(priority, "99"); + } } diff --git a/crates/orchestrator/tests/queue_trigger.rs b/crates/orchestrator/tests/queue_trigger.rs new file mode 100644 index 00000000..93e764d4 --- /dev/null +++ b/crates/orchestrator/tests/queue_trigger.rs @@ -0,0 +1,238 @@ +//! Integration tests for the queue-based trigger system. +//! +//! Tests cover: +//! - Queue storage operations (enqueue, dequeue, stats, peek, purge) +//! - TriggerConfig::Queue serialization / deserialization +//! - `create_strategy()` factory creates a `QueueStrategy` +//! - QueueStrategy picks up a task that is pushed into the queue +//! - Queue name validation in the API layer +//! 
- CLI queue management subcommand argument parsing + +use chrono::Utc; +use orchestrator::{ + scheduler::{ + runner::create_strategy, + storage::SchedulerStorage, + types::{TriggerConfig, WorkflowConfig}, + Scheduler, + }, + storage::AgentStorage, + websocket::ConnectionRegistry, +}; +use std::sync::Arc; +use tempfile::TempDir; +use uuid::Uuid; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +async fn create_test_storage() -> (SchedulerStorage, TempDir) { + let temp_dir = TempDir::new().unwrap(); + let db_path = temp_dir.path().join("test.db"); + let agent_storage = AgentStorage::with_path(&db_path).await.unwrap(); + let storage = SchedulerStorage::new(agent_storage.db().clone()); + (storage, temp_dir) +} + +fn queue_workflow(agent_id: Uuid, queue_name: &str) -> WorkflowConfig { + let now = Utc::now(); + WorkflowConfig { + id: Uuid::new_v4(), + name: format!("queue-workflow-{queue_name}"), + agent_id, + trigger_config: TriggerConfig::Queue { + queue_name: queue_name.to_string(), + poll_interval_secs: Some(1), + visibility_timeout_secs: Some(300), + }, + prompt_template: "Process: {{title}} (queue={{queue_name}}, task={{queue_task_id}})" + .to_string(), + poll_interval_secs: 60, + enabled: true, + tool_policy: Default::default(), + created_at: now, + updated_at: now, + } +} + +// --------------------------------------------------------------------------- +// Serde round-trip +// --------------------------------------------------------------------------- + +#[test] +fn queue_trigger_config_serde_roundtrip() { + let config = TriggerConfig::Queue { + queue_name: "my-queue".to_string(), + poll_interval_secs: Some(5), + visibility_timeout_secs: Some(300), + }; + + let json = serde_json::to_string(&config).expect("serialization failed"); + assert!(json.contains("\"type\":\"queue\""), "missing type tag: {json}"); + 
assert!(json.contains("my-queue"), "missing queue_name: {json}"); + + let decoded: TriggerConfig = serde_json::from_str(&json).expect("deserialization failed"); + if let TriggerConfig::Queue { queue_name, poll_interval_secs, visibility_timeout_secs } = + decoded + { + assert_eq!(queue_name, "my-queue"); + assert_eq!(poll_interval_secs, Some(5)); + assert_eq!(visibility_timeout_secs, Some(300)); + } else { + panic!("Expected Queue variant after round-trip, got something else"); + } +} + +#[test] +fn queue_trigger_config_defaults_serde() { + // Queue with optional fields omitted. + let json = r#"{"type":"queue","queue_name":"bg-tasks"}"#; + let config: TriggerConfig = serde_json::from_str(json).expect("deserialization failed"); + + if let TriggerConfig::Queue { queue_name, poll_interval_secs, visibility_timeout_secs } = config + { + assert_eq!(queue_name, "bg-tasks"); + assert_eq!(poll_interval_secs, None); + assert_eq!(visibility_timeout_secs, None); + } else { + panic!("Expected Queue variant"); + } +} + +#[test] +fn queue_trigger_type_string() { + let config = TriggerConfig::Queue { + queue_name: "x".to_string(), + poll_interval_secs: None, + visibility_timeout_secs: None, + }; + assert_eq!(config.trigger_type(), "queue"); + assert!(config.is_implemented()); + assert!(!config.is_one_shot()); +} + +// --------------------------------------------------------------------------- +// create_strategy factory +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn create_strategy_queue_creates_queue_strategy() { + let (storage, _tmp) = create_test_storage().await; + let agent_id = Uuid::new_v4(); + let config = queue_workflow(agent_id, "factory-q"); + + // Should succeed with storage provided. 
+ let result = create_strategy(&config, None, Some(&storage)); + assert!(result.is_ok(), "create_strategy failed: {:?}", result.err()); +} + +#[tokio::test] +async fn create_strategy_queue_requires_storage() { + let agent_id = Uuid::new_v4(); + let config = queue_workflow(agent_id, "no-storage-q"); + + // Should fail when storage is None. + let result = create_strategy(&config, None, None); + let err = result.err().expect("expected an error without storage").to_string(); + assert!(err.contains("SchedulerStorage is required"), "unexpected error: {err}"); +} + +// --------------------------------------------------------------------------- +// Queue storage: push + QueueStrategy consumes +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn queue_strategy_picks_up_pushed_task() { + use orchestrator::scheduler::strategy::{QueueStrategy, TriggerStrategy}; + use std::time::Duration; + use tokio::sync::watch; + + let (storage, _tmp) = create_test_storage().await; + + // Push a task into the queue. + storage.enqueue("pickup-q", "Important Work", Some("details"), 5).await.unwrap(); + + let mut strategy = + QueueStrategy::new(storage.clone(), "pickup-q".to_string(), Duration::from_millis(10), 60); + let (_tx, rx) = watch::channel(false); + + let tasks = strategy.next_tasks(&rx).await.unwrap(); + + assert_eq!(tasks.len(), 1); + let task = &tasks[0]; + assert_eq!(task.title, "Important Work"); + assert_eq!(task.body, "details"); + assert!(task.source_id.starts_with("queue:pickup-q:")); + assert_eq!(task.metadata.get("queue_name").map(String::as_str), Some("pickup-q")); + assert_eq!(task.metadata.get("queue_priority").map(String::as_str), Some("5")); + + // The task should now be in processing state — queue appears empty. 
+ let stats = storage.queue_stats("pickup-q").await.unwrap(); + assert_eq!(stats.pending, 0); + assert_eq!(stats.processing, 1); +} + +// --------------------------------------------------------------------------- +// Queue storage: concurrent producers, single consumer ordering +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn concurrent_producers_consumed_in_priority_then_fifo_order() { + let (storage, _tmp) = create_test_storage().await; + + // Simulate concurrent producers by enqueuing tasks with various priorities. + let mut handles = vec![]; + for i in 0..5 { + let s = storage.clone(); + handles.push(tokio::spawn(async move { + s.enqueue("concurrent-q", &format!("Low {i}"), None, 1).await.unwrap(); + })); + } + // One high-priority task. + let s2 = storage.clone(); + handles.push(tokio::spawn(async move { + s2.enqueue("concurrent-q", "High-Pri", None, 100).await.unwrap(); + })); + + for h in handles { + h.await.unwrap(); + } + + // First dequeued should be the high-priority task. + let first = storage.dequeue("concurrent-q", 60).await.unwrap().unwrap(); + assert_eq!(first.title, "High-Pri"); + assert_eq!(first.priority, 100); + + // Remaining tasks should be low priority (FIFO among equals). + for _ in 0..5 { + let t = storage.dequeue("concurrent-q", 60).await.unwrap().unwrap(); + assert_eq!(t.priority, 1); + } + + // Queue should now be empty. 
+ assert!(storage.dequeue("concurrent-q", 60).await.unwrap().is_none()); +} + +// --------------------------------------------------------------------------- +// Scheduler integration: start_workflow with Queue trigger +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn scheduler_accepts_queue_workflow() { + let (storage, _tmp) = create_test_storage().await; + let registry = ConnectionRegistry::new(); + + let scheduler = Arc::new(Scheduler::new(storage.clone(), registry)); + let agent_id = Uuid::new_v4(); + let workflow = queue_workflow(agent_id, "sched-q"); + + storage.add_workflow(&workflow).await.unwrap(); + + // Scheduler should be able to start the workflow without error. + let result = scheduler.start_workflow(workflow.clone()).await; + assert!(result.is_ok(), "start_workflow failed: {:?}", result.err()); + + // Clean up. + let _ = scheduler.stop_workflow(&workflow.id).await; +} diff --git a/docs/public/queue-trigger.md b/docs/public/queue-trigger.md new file mode 100644 index 00000000..f7ec50d9 --- /dev/null +++ b/docs/public/queue-trigger.md @@ -0,0 +1,256 @@ +# Queue-Based Trigger + +The `queue` trigger type allows workflows to consume tasks from a named internal +queue. Producers push work via the REST API; the workflow's agent processes items +one at a time with visibility timeout and automatic retry semantics. + +## Overview + +``` +Producer Orchestrator Agent + │ │ │ + │ POST /queues/q/push │ │ + │─────────────────────────>│ │ + │ { "title": "...", ... } │ │ + │ │ dequeue (atomic) │ + │ │ visibility timeout starts │ + │ │─────────────────────────>│ + │ │ dispatch prompt │ + │ │<─────────────────────────│ + │ │ task complete │ +``` + +Tasks are dequeued in **priority-descending, FIFO** order: + +- Tasks with higher `priority` values are processed first. +- Among tasks with equal priority, the oldest is taken first. 
+ +While a task is processing, it is invisible to other consumers for +`visibility_timeout_secs` seconds (default 300 / 5 minutes). + +--- + +## Configuration + +### `TriggerConfig` variant + +```json +{ + "type": "queue", + "queue_name": "work-items", + "poll_interval_secs": 5, + "visibility_timeout_secs": 300 +} +``` + +| Field | Type | Required | Default | Description | +|--------------------------|----------|----------|---------|-----------------------------------------------------| +| `queue_name` | `string` | yes | — | Name of the queue to consume from | +| `poll_interval_secs` | `u64` | no | `5` | How often to poll when the queue is empty (seconds) | +| `visibility_timeout_secs`| `u64` | no | `300` | Visibility timeout for dequeued tasks (seconds) | + +**Queue name rules:** alphanumeric characters and hyphens only, 1–64 characters. + +--- + +## Creating a Queue-Triggered Workflow + +### Via REST API + +```bash +curl -X POST http://localhost:7006/workflows \ + -H 'Content-Type: application/json' \ + -d '{ + "name": "report-processor", + "agent_id": "", + "trigger_config": { + "type": "queue", + "queue_name": "reports", + "poll_interval_secs": 5, + "visibility_timeout_secs": 300 + }, + "prompt_template": "Process report: {{title}}\n\nDetails: {{body}}\n\nQueue: {{queue_name}}, Task ID: {{queue_task_id}}" + }' +``` + +### Via CLI + +```bash +agent orchestrator create-workflow \ + --name report-processor \ + --agent-name reporter \ + --trigger-type queue \ + --queue-name reports \ + --queue-poll-interval 5 \ + --queue-visibility-timeout 300 \ + --prompt-template "Process report: {{title}}\n\nDetails: {{body}}" +``` + +--- + +## Pushing Tasks + +### REST API + +```bash +curl -X POST http://localhost:7006/queues/reports/push \ + -H 'Content-Type: application/json' \ + -d '{ + "title": "Process Q4 report", + "body": "{\"report_id\": 42, \"format\": \"pdf\"}", + "priority": 5 + }' +``` + +**Response (201 Created):** + +```json +{ + "id": 
"3fa85f64-5717-4562-b3fc-2c963f66afa6", + "queue_name": "reports", + "status": "pending", + "created_at": "2026-03-28T12:00:00Z" +} +``` + +| Field | Type | Required | Default | Description | +|------------|----------|----------|---------|--------------------------------------| +| `title` | `string` | yes | — | Short description of the task | +| `body` | `string` | no | `""` | Task payload (JSON, text, etc.) | +| `priority` | `i32` | no | `0` | Higher values processed first | + +--- + +## Queue Management + +### Statistics + +```bash +# REST +GET /queues/{name}/stats + +# CLI +agent orchestrator queue-stats reports +``` + +**Response:** + +```json +{ + "pending": 3, + "processing": 1, + "completed": 42, + "failed": 2, + "dead": 1 +} +``` + +### Peek (non-destructive view) + +```bash +# REST +GET /queues/{name}/peek?limit=10 + +# CLI +agent orchestrator queue-peek reports --limit 5 +``` + +Returns up to `limit` pending tasks without claiming them. Default limit: 10, max: 100. + +### Purge + +```bash +# REST +DELETE /queues/{name} + +# CLI +agent orchestrator queue-purge reports +``` + +**Response:** + +```json +{ "deleted": 7 } +``` + +Removes **all** tasks from the queue regardless of status. Use with care. + +--- + +## Template Variables + +Queue workflows have access to these `{{placeholder}}` variables in addition to +standard task fields: + +| Variable | Description | +|-------------------|-----------------------------------------| +| `{{queue_name}}` | Name of the queue the task came from | +| `{{queue_task_id}}`| Internal UUID of the queue task | +| `{{queue_priority}}`| Priority value assigned at push time | +| `{{title}}` | Task title (from push request) | +| `{{body}}` | Task body / payload (from push request) | + +--- + +## Retry and Dead-Letter Semantics + +When a task fails to dispatch (agent unavailable, network error, etc.): + +1. The task's `retry_count` is incremented. +2. 
If `retry_count < max_retries` (default 3), the task is moved back to `pending`.
+3. If `retry_count >= max_retries`, the task is moved to `dead` (dead letter).
+
+Dead-letter tasks are not retried automatically. They remain in the queue and are
+counted in `queue-stats`, but they do not appear in `queue-peek`, which lists only
+pending tasks. Purge the queue to remove them.
+
+---
+
+## Architecture Patterns
+
+### Simple background work queue
+
+```
+Agent A (producer) → POST /queues/bg-tasks/push
+Agent B (worker) → queue trigger workflow consuming bg-tasks
+```
+
+### Priority queue for tiered processing
+
+```bash
+# High priority bug fix
+curl -X POST .../queues/work/push -d '{"title":"Critical bug","priority":100}'
+
+# Normal feature request
+curl -X POST .../queues/work/push -d '{"title":"New feature","priority":10}'
+```
+
+### Multi-producer pattern with webhooks feeding the queue
+
+```
+GitHub webhook → webhook-workflow → POST /queues/gh-issues/push
+Linear webhook → webhook-workflow → POST /queues/gh-issues/push
+Queue workflow → consumes gh-issues → agent processes unified stream
+```
+
+---
+
+## Troubleshooting
+
+**Tasks are not being picked up:**
+- Verify the workflow is enabled: `agent orchestrator get-workflow <workflow-id>`
+- Check that `queue_name` in the trigger config matches the name used in `POST /queues/{name}/push`
+- Confirm the agent is running: `agent orchestrator list-agents`
+
+**Tasks stay in `processing` state:**
+- The agent may be busy or disconnected.
+- After `visibility_timeout_secs` seconds have elapsed, the task will become visible again
+  and be retried automatically (up to `max_retries`).
+
+**Queue grows unboundedly:**
+- The consumer workflow may be disabled or the agent may be unavailable.
+- Monitor with `queue-stats` and adjust producer rate.
+
+**Dead-letter tasks:**
+- Check agent logs for error details.
+- Use `queue-purge` to clear dead-letter tasks after investigation.
From 61a5ddae920ce035c1295bc2b28c08feac119003 Mon Sep 17 00:00:00 2001 From: Geoff Johnson Date: Sat, 28 Mar 2026 14:15:20 -0700 Subject: [PATCH 12/12] fix(scheduler): wire queue task lifecycle into notify_complete Complete the QueueStrategy lifecycle by calling complete_queue_task or fail_queue_task in notify_complete when a queue-sourced dispatch finishes. The source_id format 'queue:{name}:{id}' is parsed to extract the task ID. This also resolves the clippy dead_code warning on these storage methods. --- crates/orchestrator/src/scheduler/runner.rs | 24 ++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/crates/orchestrator/src/scheduler/runner.rs b/crates/orchestrator/src/scheduler/runner.rs index ff612b2f..87d9f70d 100644 --- a/crates/orchestrator/src/scheduler/runner.rs +++ b/crates/orchestrator/src/scheduler/runner.rs @@ -246,10 +246,10 @@ pub async fn notify_complete( storage: &SchedulerStorage, is_error: bool, ) { - let dispatch_id = { + let (dispatch_id, source_id) = { let mut busy = busy.lock().await; - busy.active_source_id = None; - busy.active_dispatch_id.take() + let source_id = busy.active_source_id.take(); + (busy.active_dispatch_id.take(), source_id) }; if let Some(id) = dispatch_id { @@ -260,6 +260,24 @@ pub async fn notify_complete( error!(%id, %e, "Failed to update dispatch status on completion"); } } + + // If the task was sourced from an internal queue, finalize the queue task record. 
+ // source_id format for queue tasks: "queue:{queue_name}:{task_id}" + if let Some(sid) = source_id { + if sid.starts_with("queue:") { + let parts: Vec<&str> = sid.splitn(3, ':').collect(); + if let Some(task_id) = parts.get(2) { + let result = if is_error { + storage.fail_queue_task(task_id).await + } else { + storage.complete_queue_task(task_id).await + }; + if let Err(e) = result { + error!(task_id = *task_id, %e, "Failed to finalize queue task on completion"); + } + } + } + } } /// Create a [`TaskSource`] from a [`TriggerConfig`].