Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
bcca13b
epic: Queue-based trigger — internal task queue with push API
geoffjay Mar 28, 2026
b852f42
feat: add queue table migration and storage operations for task queue
geoffjay Mar 28, 2026
733148b
feat(storage): add task_queue table migration and storage operations …
geoffjay Mar 28, 2026
3f85a64
feat: implement QueueStrategy consuming tasks from internal queue
geoffjay Mar 28, 2026
7a46edb
feat(scheduler): implement QueueStrategy consuming tasks from interna…
geoffjay Mar 28, 2026
37c7bb1
feat(api): add queue API endpoints for push/stats/peek/purge
geoffjay Mar 28, 2026
589213a
feat(api): register queue routes and add push/stats/peek/purge handlers
geoffjay Mar 28, 2026
21f8871
feat: add Queue variant to TriggerConfig and wire into API/CLI
geoffjay Mar 28, 2026
82d38b3
feat(scheduler): add Queue trigger type wired into strategy, API, and…
geoffjay Mar 28, 2026
52373de
test+docs: add tests and documentation for queue-based trigger
geoffjay Mar 28, 2026
a965cff
test+docs: add comprehensive tests and documentation for queue trigger
geoffjay Mar 28, 2026
61a5dda
fix(scheduler): wire queue task lifecycle into notify_complete
geoffjay Mar 28, 2026
6c584de
Merge pull request #887 from geoffjay/issue-822
geoffjay Mar 29, 2026
e008327
Merge pull request #886 from geoffjay/issue-821
geoffjay Mar 29, 2026
673e173
Merge pull request #885 from geoffjay/issue-819
geoffjay Mar 29, 2026
de90561
Merge pull request #879 from geoffjay/issue-820
geoffjay Mar 29, 2026
100a0ec
Merge pull request #878 from geoffjay/issue-818
geoffjay Mar 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 276 additions & 0 deletions crates/cli/src/commands/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ pub enum TriggerType {
AgentIdle,
/// Linear issues trigger — polls Linear for matching issues.
LinearIssues,
/// Queue-based trigger — consumes tasks from a named internal queue.
Queue,
}

/// Orchestrator service management subcommands.
Expand Down Expand Up @@ -696,6 +698,18 @@ pub enum OrchestratorCommand {
#[arg(long)]
idle_seconds: Option<u64>,

/// Queue name to consume from (required for queue trigger type)
#[arg(long)]
queue_name: Option<String>,

/// Queue poll interval in seconds (optional, for queue trigger type, default 5)
#[arg(long)]
queue_poll_interval: Option<u64>,

/// Queue visibility timeout in seconds (optional, for queue trigger type, default 300)
#[arg(long)]
queue_visibility_timeout: Option<u64>,

/// Prompt template with {{placeholders}} (e.g. "Fix: {{title}}\n{{body}}")
#[arg(long, conflicts_with = "prompt_template_file")]
prompt_template: Option<String>,
Expand Down Expand Up @@ -807,6 +821,42 @@ pub enum OrchestratorCommand {
/// Directory path to remove
path: String,
},

/// Get statistics for a named queue.
///
/// Returns counts of pending, processing, completed, failed, and dead tasks.
///
/// # Examples
///
/// agent orchestrator queue-stats my-queue
QueueStats {
/// Queue name
name: String,
},

/// Peek at pending tasks in a named queue without consuming them.
///
/// # Examples
///
/// agent orchestrator queue-peek my-queue
/// agent orchestrator queue-peek my-queue --limit 5
QueuePeek {
/// Queue name
name: String,
/// Number of tasks to return (default 10, max 100)
#[arg(long, default_value = "10")]
limit: u64,
},

/// Purge all tasks from a named queue.
///
/// # Examples
///
/// agent orchestrator queue-purge my-queue
QueuePurge {
/// Queue name
name: String,
},
}

impl OrchestratorCommand {
Expand Down Expand Up @@ -935,6 +985,9 @@ impl OrchestratorCommand {
linear_labels,
linear_assignee,
idle_seconds,
queue_name,
queue_poll_interval,
queue_visibility_timeout,
prompt_template,
prompt_template_file,
poll_interval,
Expand All @@ -959,6 +1012,9 @@ impl OrchestratorCommand {
linear_labels,
linear_assignee.as_deref(),
*idle_seconds,
queue_name.as_deref(),
*queue_poll_interval,
*queue_visibility_timeout,
prompt_template.as_deref(),
prompt_template_file.as_deref(),
*poll_interval,
Expand Down Expand Up @@ -995,6 +1051,11 @@ impl OrchestratorCommand {
OrchestratorCommand::RemoveDir { id, path } => {
remove_dir_cmd(client, id, path, json).await
}
OrchestratorCommand::QueueStats { name } => queue_stats_cmd(client, name, json).await,
OrchestratorCommand::QueuePeek { name, limit } => {
queue_peek_cmd(client, name, *limit, json).await
}
OrchestratorCommand::QueuePurge { name } => queue_purge_cmd(client, name, json).await,
}
}
}
Expand Down Expand Up @@ -1297,6 +1358,70 @@ fn display_add_dir_response(response: &AddDirResponse) {
}
}

// -- Queue operations --

async fn queue_stats_cmd(client: &OrchestratorClient, name: &str, json: bool) -> Result<()> {
let stats = client.queue_stats(name).await.context("Failed to get queue stats")?;
if json {
println!("{}", serde_json::to_string_pretty(&stats)?);
} else {
println!("{}: {}", "Queue".bold(), name);
if let Some(obj) = stats.as_object() {
for (k, v) in obj {
println!(" {}: {}", k.bold(), v);
}
}
}
Ok(())
}

async fn queue_peek_cmd(
client: &OrchestratorClient,
name: &str,
limit: u64,
json: bool,
) -> Result<()> {
let tasks = client.queue_peek(name, Some(limit)).await.context("Failed to peek queue")?;
if json {
println!("{}", serde_json::to_string_pretty(&tasks)?);
} else if tasks.is_empty() {
println!("{}", "Queue is empty.".yellow());
} else {
println!("{}: {}", "Queue".bold(), name);
println!("{}", "=".repeat(80).cyan());
for task in &tasks {
if let Some(obj) = task.as_object() {
if let Some(id) = obj.get("id").and_then(|v| v.as_str()) {
println!("{}: {}", "ID".bold(), id);
}
if let Some(title) = obj.get("title").and_then(|v| v.as_str()) {
println!("{}: {}", "Title".bold(), title);
}
if let Some(priority) = obj.get("priority") {
println!("{}: {}", "Priority".bold(), priority);
}
if let Some(created) = obj.get("created_at").and_then(|v| v.as_str()) {
println!("{}: {}", "Created".bold(), created);
}
}
println!("{}", "-".repeat(80).cyan());
}
println!("Total: {} task(s)", tasks.len());
}
Ok(())
}

async fn queue_purge_cmd(client: &OrchestratorClient, name: &str, json: bool) -> Result<()> {
let result = client.queue_purge(name).await.context("Failed to purge queue")?;
if json {
println!("{}", serde_json::to_string_pretty(&result)?);
} else {
let deleted = result.get("deleted").and_then(|v| v.as_u64()).unwrap_or(0);
println!("Purged {} task(s) from queue '{}'", deleted, name);
}
Ok(())
}

// -- Attach --

async fn attach_agent(
Expand Down Expand Up @@ -2279,6 +2404,9 @@ async fn create_workflow(
linear_labels: &[String],
linear_assignee: Option<&str>,
idle_seconds: Option<u64>,
queue_name: Option<&str>,
queue_poll_interval: Option<u64>,
queue_visibility_timeout: Option<u64>,
prompt_template: Option<&str>,
prompt_template_file: Option<&std::path::Path>,
poll_interval: u64,
Expand Down Expand Up @@ -2372,6 +2500,15 @@ async fn create_workflow(
assignee: linear_assignee.map(|s| s.to_string()),
}
}
TriggerType::Queue => {
let qname = queue_name
.ok_or_else(|| anyhow::anyhow!("--queue-name is required for queue trigger"))?;
TriggerConfig::Queue {
queue_name: qname.to_string(),
poll_interval_secs: queue_poll_interval,
visibility_timeout_secs: queue_visibility_timeout,
}
}
};

let request = CreateWorkflowRequest {
Expand Down Expand Up @@ -2742,6 +2879,13 @@ fn display_workflow(workflow: &WorkflowResponse) {
println!("{}: {}", "Assignee".bold(), a);
}
}
TriggerConfig::Queue { queue_name, poll_interval_secs, visibility_timeout_secs } => {
println!("{}: {}", "Queue Name".bold(), queue_name);
let interval = poll_interval_secs.unwrap_or(5);
println!("{}: {}s", "Poll Interval".bold(), interval);
let timeout = visibility_timeout_secs.unwrap_or(300);
println!("{}: {}s", "Visibility Timeout".bold(), timeout);
}
}
let template = &workflow.prompt_template;
let display =
Expand Down Expand Up @@ -4062,6 +4206,9 @@ mod tests {
&[], // linear_labels
None, // linear_assignee
None, // idle_seconds
None, // queue_name
None, // queue_poll_interval
None, // queue_visibility_timeout
Some("Fix: {{title}}"),
None, // prompt_template_file
60,
Expand Down Expand Up @@ -4133,6 +4280,9 @@ mod tests {
&[], // linear_labels
None, // linear_assignee
None, // idle_seconds — missing!
None, // queue_name
None, // queue_poll_interval
None, // queue_visibility_timeout
Some("Do background work"),
None, // prompt_template_file
60,
Expand Down Expand Up @@ -4169,6 +4319,9 @@ mod tests {
&[], // linear_labels
None, // linear_assignee
Some(0), // idle_seconds = 0 (invalid)
None, // queue_name
None, // queue_poll_interval
None, // queue_visibility_timeout
Some("Do background work"),
None, // prompt_template_file
60,
Expand All @@ -4184,4 +4337,127 @@ mod tests {
"expected validation error for zero idle_seconds, got: {msg}"
);
}

// -----------------------------------------------------------------------
// Queue subcommand argument parsing
// -----------------------------------------------------------------------

#[test]
fn parse_queue_stats_subcommand() {
use clap::Parser;

#[derive(Parser)]
struct Cli {
#[command(subcommand)]
command: OrchestratorCommand,
}

let cli = Cli::try_parse_from(["test", "queue-stats", "my-queue"])
.expect("should parse queue-stats");

assert!(matches!(
cli.command,
OrchestratorCommand::QueueStats { name } if name == "my-queue"
));
}

#[test]
fn parse_queue_peek_with_limit() {
use clap::Parser;

#[derive(Parser)]
struct Cli {
#[command(subcommand)]
command: OrchestratorCommand,
}

let cli = Cli::try_parse_from(["test", "queue-peek", "my-queue", "--limit", "25"])
.expect("should parse queue-peek");

assert!(matches!(
cli.command,
OrchestratorCommand::QueuePeek { name, limit } if name == "my-queue" && limit == 25
));
}

#[test]
fn parse_queue_peek_default_limit() {
use clap::Parser;

#[derive(Parser)]
struct Cli {
#[command(subcommand)]
command: OrchestratorCommand,
}

let cli = Cli::try_parse_from(["test", "queue-peek", "reports"])
.expect("should parse queue-peek without limit");

assert!(matches!(
cli.command,
OrchestratorCommand::QueuePeek { name, limit } if name == "reports" && limit == 10
));
}

#[test]
fn parse_queue_purge_subcommand() {
use clap::Parser;

#[derive(Parser)]
struct Cli {
#[command(subcommand)]
command: OrchestratorCommand,
}

let cli = Cli::try_parse_from(["test", "queue-purge", "my-queue"])
.expect("should parse queue-purge");

assert!(matches!(
cli.command,
OrchestratorCommand::QueuePurge { name } if name == "my-queue"
));
}

#[test]
fn parse_create_workflow_with_queue_trigger() {
use clap::Parser;

#[derive(Parser)]
struct Cli {
#[command(subcommand)]
command: OrchestratorCommand,
}

let cli = Cli::try_parse_from([
"test",
"create-workflow",
"--name",
"bg-worker",
"--agent-id",
"550e8400-e29b-41d4-a716-446655440000",
"--trigger-type",
"queue",
"--queue-name",
"work-items",
"--prompt-template",
"Process: {{title}}",
])
.expect("should parse queue workflow creation");

if let OrchestratorCommand::CreateWorkflow {
trigger_type,
queue_name,
queue_poll_interval,
queue_visibility_timeout,
..
} = cli.command
{
assert!(matches!(trigger_type, TriggerType::Queue));
assert_eq!(queue_name, Some("work-items".to_string()));
assert_eq!(queue_poll_interval, None);
assert_eq!(queue_visibility_timeout, None);
} else {
panic!("Expected CreateWorkflow variant");
}
}
}
6 changes: 4 additions & 2 deletions crates/orchestrator/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::manager::AgentManager;
use crate::scheduler::api::{webhook_routes, workflow_routes, WorkflowState};
use crate::scheduler::api::{queue_routes, webhook_routes, workflow_routes, WorkflowState};
use crate::scheduler::Scheduler;
use crate::types::*;
use crate::websocket::{
Expand Down Expand Up @@ -55,7 +55,8 @@ pub fn create_router(state: ApiState) -> Router {
let wf_state =
WorkflowState { scheduler: state.scheduler.clone(), manager: state.manager.clone() };
let wf_routes = workflow_routes(wf_state.clone());
let wh_routes = webhook_routes(wf_state);
let wh_routes = webhook_routes(wf_state.clone());
let q_routes = queue_routes(wf_state);

let api_routes = Router::new()
.route("/health", get(health_check))
Expand Down Expand Up @@ -88,6 +89,7 @@ pub fn create_router(state: ApiState) -> Router {
.merge(ws_terminal_routes)
.merge(wf_routes)
.merge(wh_routes)
.merge(q_routes)
}

#[derive(Deserialize)]
Expand Down
Loading
Loading