From b43caedcd07087dc0ad305e7fa8be7f793037045 Mon Sep 17 00:00:00 2001 From: Yureka Date: Mon, 23 Mar 2026 18:21:49 +0100 Subject: [PATCH 1/5] rename Scheduler -> ConstraintsHelper --- vicky/src/bin/vicky/tasks.rs | 7 +- .../{scheduler.rs => constraints_helper.rs} | 96 +++++++++---------- vicky/src/lib/vicky/mod.rs | 2 +- 3 files changed, 53 insertions(+), 52 deletions(-) rename vicky/src/lib/vicky/{scheduler.rs => constraints_helper.rs} (83%) diff --git a/vicky/src/bin/vicky/tasks.rs b/vicky/src/bin/vicky/tasks.rs index 556416e..2d02c7a 100644 --- a/vicky/src/bin/vicky/tasks.rs +++ b/vicky/src/bin/vicky/tasks.rs @@ -12,7 +12,8 @@ use vickylib::database::entities::task::{FlakeRef, TaskResult, TaskStatus}; use vickylib::database::entities::{Database, Lock, Task}; use vickylib::query::FilterParams; use vickylib::{ - errors::VickyError, logs::LogDrain, s3::client::S3Client, vicky::scheduler::Scheduler, + errors::VickyError, logs::LogDrain, s3::client::S3Client, + vicky::constraints_helper::ConstraintsHelper, }; macro_rules! task_or { @@ -255,9 +256,9 @@ pub async fn tasks_claim( ) -> Result>, AppError> { let tasks = db.get_all_tasks().await?; let poisoned_locks = db.get_poisoned_locks().await?; - let scheduler = Scheduler::new(&tasks, &poisoned_locks, &features.features) + let constraints_helper = ConstraintsHelper::new(&tasks, &poisoned_locks, &features.features) .map_err(|x| VickyError::Scheduler { source: x })?; - let next_task = scheduler.get_next_task(); + let next_task = constraints_helper.get_next_task(); match next_task { Some(next_task) => { diff --git a/vicky/src/lib/vicky/scheduler.rs b/vicky/src/lib/vicky/constraints_helper.rs similarity index 83% rename from vicky/src/lib/vicky/scheduler.rs rename to vicky/src/lib/vicky/constraints_helper.rs index dba48f3..88bc01f 100644 --- a/vicky/src/lib/vicky/scheduler.rs +++ b/vicky/src/lib/vicky/constraints_helper.rs @@ -5,13 +5,13 @@ use crate::{ errors::SchedulerError, }; -pub struct Scheduler<'a> { +pub struct ConstraintsHelper<'a> { constraints: Constraints<'a>, tasks: &'a Vec, machine_features: &'a [String], } -impl<'a> Scheduler<'a> { +impl<'a> ConstraintsHelper<'a> { pub fn new( tasks: &'a Vec, poisoned_locks: &'a [Lock], @@ -19,7 +19,7 @@ impl<'a> Scheduler<'a> { ) -> Result { let constraints: Constraints = Constraints::from_tasks(tasks, poisoned_locks)?; - let s = Scheduler { + let s = ConstraintsHelper { constraints, tasks, machine_features, @@ -80,12 +80,12 @@ impl<'a> Scheduler<'a> { mod tests { use uuid::Uuid; - use super::Scheduler; + use super::ConstraintsHelper; use crate::database::entities::task::{TaskResult, TaskStatus}; use crate::database::entities::{Lock, Task}; #[test] - fn scheduler_creation_no_constraints() { + fn constraints_helper_creation_no_constraints() { let tasks = vec![ Task::builder() .display_name("Test 1") @@ -97,11 +97,11 @@ mod tests { .build_expect(), ]; - Scheduler::new(&tasks, &[], &[]).unwrap(); + ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); } #[test] - fn scheduler_creation_multiple_read_constraints() { + fn constraints_helper_creation_multiple_read_constraints() { let tasks = vec![ Task::builder() .display_name("Test 1") @@ -115,11 +115,11 @@ mod tests { .build_expect(), ]; - Scheduler::new(&tasks, &[], &[]).unwrap(); + ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); } #[test] - fn scheduler_creation_single_write_constraints() { + fn constraints_helper_creation_single_write_constraints() { let tasks = vec![ Task::builder() .display_name("Test 1") @@ -133,11 +133,11 @@ mod tests { .build_expect(), ]; - Scheduler::new(&tasks, &[], &[]).unwrap(); + ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); } #[test] - fn scheduler_creation_read_and_cleanup_constraints_is_order_independent() { + fn constraints_helper_creation_read_and_cleanup_constraints_is_order_independent() { let mut tasks_read_then_clean = vec![ Task::builder() .display_name("Read lock") @@ -151,18 +151,18 @@ mod tests { .build_expect(), ]; - Scheduler::new(&tasks_read_then_clean, &[], &[]) - .expect("read->cleanup lock order must not fail scheduler creation"); + ConstraintsHelper::new(&tasks_read_then_clean, &[], &[]) + .expect("read->cleanup lock order must not fail constraints_helper creation"); tasks_read_then_clean.reverse(); let tasks_clean_then_read = tasks_read_then_clean; - Scheduler::new(&tasks_clean_then_read, &[], &[]) - .expect("cleanup->read lock order must not fail scheduler creation"); + ConstraintsHelper::new(&tasks_clean_then_read, &[], &[]) + .expect("cleanup->read lock order must not fail constraints_helper creation"); } #[test] - fn scheduler_creation_multiple_write_constraints() { + fn constraints_helper_creation_multiple_write_constraints() { let tasks = vec![ Task::builder() .display_name("Test 1") @@ -176,12 +176,12 @@ mod tests { .build_expect(), ]; - let res = Scheduler::new(&tasks, &[], &[]); + let res = ConstraintsHelper::new(&tasks, &[], &[]); assert!(res.is_err()); } #[test] - fn scheduler_no_new_task() { + fn constraints_helper_no_new_task() { let tasks = vec![ Task::builder() .display_name("Test 1") @@ -195,13 +195,13 @@ mod tests { .build_expect(), ]; - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); // Test 1 is currently running and has the write lock assert_eq!(res.get_next_task(), None) } #[test] - fn scheduler_no_new_task_with_feature() { + fn constraints_helper_no_new_task_with_feature() { let tasks = vec![ Task::builder() .display_name("Test 1") @@ -215,13 +215,13 @@ mod tests { .build_expect(), ]; - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); // Test 1 and Test 2 have required features, which our runner does not have. assert_eq!(res.get_next_task(), None) } #[test] - fn scheduler_new_task_with_specific_feature() { + fn constraints_helper_new_task_with_specific_feature() { let tasks = vec![ Task::builder() .display_name("Test 1") @@ -236,13 +236,13 @@ mod tests { ]; let features = &["huge_cpu".to_string()]; - let res = Scheduler::new(&tasks, &[], features).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], features).unwrap(); // Test 1 and Test 2 have required features, which our runner matches. assert_eq!(res.get_next_task().unwrap().display_name, "Test 1") } #[test] - fn scheduler_new_task() { + fn constraints_helper_new_task() { let tasks = vec![ Task::builder() .display_name("Test 1") @@ -256,13 +256,13 @@ mod tests { .build_expect(), ]; - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); // Test 1 is currently running and has the write lock assert_eq!(res.get_next_task().unwrap().display_name, "Test 2") } #[test] - fn scheduler_new_task_ro() { + fn constraints_helper_new_task_ro() { let tasks = vec![ Task::builder() .display_name("Test 1") @@ -276,13 +276,13 @@ mod tests { .build_expect(), ]; - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); // Test 1 is currently running and has the write lock assert_eq!(res.get_next_task().unwrap().display_name, "Test 2") } #[test] - fn scheduler_new_task_rw_ro() { + fn constraints_helper_new_task_rw_ro() { let tasks = vec![ Task::builder() .display_name("Test 1") @@ -296,13 +296,13 @@ mod tests { .build_expect(), ]; - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); // Test 1 is currently running and has the write lock assert_eq!(res.get_next_task(), None) } #[test] - fn scheduler_new_task_cleanup_single() { + fn constraints_helper_new_task_cleanup_single() { let tasks = vec![ Task::builder() .display_name("Test 1") @@ -311,13 +311,13 @@ mod tests { .build_expect(), ]; - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); // Test 1 is currently running and has the write lock assert_eq!(res.get_next_task().unwrap().display_name, "Test 1") } #[test] - fn scheduler_new_task_cleanup_with_finished() { + fn constraints_helper_new_task_cleanup_with_finished() { let tasks = vec![ Task::builder() .display_name("Test 5") @@ -331,13 +331,13 @@ mod tests { .build_expect(), ]; - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); // Test 1 is currently running and has the write lock assert_eq!(res.get_next_task().unwrap().display_name, "Test 1") } #[test] - fn scheduler_cleanup_waits_for_running_task_using_same_lock() { + fn constraints_helper_cleanup_waits_for_running_task_using_same_lock() { let tasks = vec![ Task::builder() .display_name("Im doing something") @@ -351,7 +351,7 @@ mod tests { .build_expect(), ]; - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); let eval = res.evaluate_task_readiness(&res.tasks[1]); assert!( @@ -362,7 +362,7 @@ mod tests { } #[test] - fn scheduler_new_task_cleanup() { + fn constraints_helper_new_task_cleanup() { let tasks = vec![ Task::builder() .display_name("Test 1") @@ -376,7 +376,7 @@ mod tests { .build_expect(), ]; - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); let eval = res.evaluate_task_readiness(&res.tasks[0]); assert!( eval.is_passive_collision(), @@ -393,7 +393,7 @@ mod tests { } #[test] - fn scheduler_new_task_cleanup_unrelated_pending_lock() { + fn constraints_helper_new_task_cleanup_unrelated_pending_lock() { let tasks = vec![ Task::builder() .display_name("Cleanup lock A") @@ -407,13 +407,13 @@ mod tests { .build_expect(), ]; - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); assert_eq!(res.get_next_task().unwrap().display_name, "Cleanup lock A") } #[test] - fn scheduler_needs_validation_locks_block_conflicts_only() { + fn constraints_helper_needs_validation_locks_block_conflicts_only() { let tasks = vec![ Task::builder() .display_name("Task A") @@ -433,13 +433,13 @@ mod tests { .build_expect(), ]; - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); assert_eq!(res.get_next_task().unwrap().display_name, "Task B"); } #[test] - fn scheduler_needs_validation_keeps_strongest_lock_when_names_collide() { + fn constraints_helper_needs_validation_keeps_strongest_lock_when_names_collide() { let tasks = vec![ Task::builder() .display_name("Validation writer") @@ -458,7 +458,7 @@ mod tests { .build_expect(), ]; - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); let eval = res.evaluate_task_readiness(&res.tasks[2]); assert!( eval.is_passive_collision(), @@ -468,7 +468,7 @@ mod tests { } #[test] - fn scheduler_cleanup_waits_for_non_cleanup_even_with_later_cleanup() { + fn constraints_helper_cleanup_waits_for_non_cleanup_even_with_later_cleanup() { let tasks = vec![ Task::builder() .display_name("Cleanup 1") @@ -487,7 +487,7 @@ mod tests { .build_expect(), ]; - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); let eval = res.evaluate_task_readiness(&res.tasks[0]); assert!( eval.is_passive_collision(), @@ -508,7 +508,7 @@ mod tests { poisoned_lock.poison(&Uuid::new_v4()); let poisoned_locks = vec![poisoned_lock]; - let res = Scheduler::new(&tasks, &poisoned_locks, &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &poisoned_locks, &[]).unwrap(); assert_eq!(res.get_next_task(), None); } @@ -529,7 +529,7 @@ mod tests { poisoned_lock.poison(&Uuid::new_v4()); let poisoned_locks = vec![poisoned_lock]; - let res = Scheduler::new(&tasks, &poisoned_locks, &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &poisoned_locks, &[]).unwrap(); assert_eq!( res.get_next_task().unwrap().display_name, @@ -549,7 +549,7 @@ mod tests { poisoned_lock.poison(&Uuid::new_v4()); let poisoned_locks = vec![poisoned_lock]; - let res = Scheduler::new(&tasks, &poisoned_locks, &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &poisoned_locks, &[]).unwrap(); assert_eq!(res.get_next_task(), None); } diff --git a/vicky/src/lib/vicky/mod.rs b/vicky/src/lib/vicky/mod.rs index 801968f..ba0340d 100644 --- a/vicky/src/lib/vicky/mod.rs +++ b/vicky/src/lib/vicky/mod.rs @@ -1,2 +1,2 @@ mod constraints; -pub mod scheduler; +pub mod constraints_helper; From 630fc167e2ffc9592af4edcc4263bfaf6176280f Mon Sep 17 00:00:00 2001 From: Yureka Date: Tue, 24 Mar 2026 01:09:49 +0100 Subject: [PATCH 2/5] move GlobalEvent to vickylib --- vicky/src/bin/vicky/errors.rs | 3 +-- vicky/src/bin/vicky/events.rs | 9 +-------- vicky/src/bin/vicky/main.rs | 3 ++- vicky/src/bin/vicky/tasks.rs | 3 +-- vicky/src/lib/vicky/events.rs | 8 ++++++++ vicky/src/lib/vicky/mod.rs | 1 + 6 files changed, 14 insertions(+), 13 deletions(-) create mode 100644 vicky/src/lib/vicky/events.rs diff --git a/vicky/src/bin/vicky/errors.rs b/vicky/src/bin/vicky/errors.rs index 9319601..42eb472 100644 --- a/vicky/src/bin/vicky/errors.rs +++ b/vicky/src/bin/vicky/errors.rs @@ -3,8 +3,7 @@ use rocket::{Request, http::Status, response::Responder}; use thiserror::Error; use tokio::sync::broadcast::error::SendError; use vickylib::errors::VickyError; - -use crate::events::GlobalEvent; +use vickylib::vicky::events::GlobalEvent; #[derive(Error, Debug)] pub enum AppError { diff --git a/vicky/src/bin/vicky/events.rs b/vicky/src/bin/vicky/events.rs index 769b1b6..5c89e45 100644 --- a/vicky/src/bin/vicky/events.rs +++ b/vicky/src/bin/vicky/events.rs @@ -1,15 +1,8 @@ use rocket::response::stream::{Event, EventStream}; use rocket::{State, get}; -use serde::{Deserialize, Serialize}; use std::time; use tokio::sync::broadcast::{self, error::TryRecvError}; - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(tag = "type")] -pub enum GlobalEvent { - TaskAdd, - TaskUpdate { uuid: uuid::Uuid }, -} +use vickylib::vicky::events::GlobalEvent; #[get("/")] pub fn get_global_events( diff --git a/vicky/src/bin/vicky/main.rs b/vicky/src/bin/vicky/main.rs index 964874a..340eff4 100644 --- a/vicky/src/bin/vicky/main.rs +++ b/vicky/src/bin/vicky/main.rs @@ -1,5 +1,5 @@ use crate::config::{Config, OIDCConfigResolved, build_rocket_config}; -use crate::events::{GlobalEvent, get_global_events}; +use crate::events::get_global_events; use crate::locks::{ locks_get_active, locks_get_detailed_poisoned, locks_get_poisoned, locks_unlock, }; @@ -25,6 +25,7 @@ use vickylib::database::entities::Database; use vickylib::database::entities::task::HEARTBEAT_TIMEOUT_SEC; use vickylib::logs::LogDrain; use vickylib::s3::client::S3Client; +use vickylib::vicky::events::GlobalEvent; mod auth; mod config; diff --git a/vicky/src/bin/vicky/tasks.rs b/vicky/src/bin/vicky/tasks.rs index 2d02c7a..123c2ef 100644 --- a/vicky/src/bin/vicky/tasks.rs +++ b/vicky/src/bin/vicky/tasks.rs @@ -13,7 +13,7 @@ use vickylib::database::entities::{Database, Lock, Task}; use vickylib::query::FilterParams; use vickylib::{ errors::VickyError, logs::LogDrain, s3::client::S3Client, - vicky::constraints_helper::ConstraintsHelper, + vicky::constraints_helper::ConstraintsHelper, vicky::events::GlobalEvent, }; macro_rules! task_or { @@ -35,7 +35,6 @@ use crate::auth::AnyAuthGuard; use crate::{ auth::{MachineGuard, UserGuard}, errors::AppError, - events::GlobalEvent, }; #[derive(Debug, PartialEq, Serialize, Deserialize)] diff --git a/vicky/src/lib/vicky/events.rs b/vicky/src/lib/vicky/events.rs new file mode 100644 index 0000000..510ebf0 --- /dev/null +++ b/vicky/src/lib/vicky/events.rs @@ -0,0 +1,8 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum GlobalEvent { + TaskAdd, + TaskUpdate { uuid: uuid::Uuid }, +} diff --git a/vicky/src/lib/vicky/mod.rs b/vicky/src/lib/vicky/mod.rs index ba0340d..adccbe4 100644 --- a/vicky/src/lib/vicky/mod.rs +++ b/vicky/src/lib/vicky/mod.rs @@ -1,2 +1,3 @@ mod constraints; pub mod constraints_helper; +pub mod events; From a4a1d2925276bbbe81918496a0e2b73e74ca7691 Mon Sep 17 00:00:00 2001 From: Yureka Date: Wed, 25 Mar 2026 11:42:28 +0100 Subject: [PATCH 3/5] tasks: Make features a HashSet --- vicky/src/bin/vicky/tasks.rs | 3 +- vicky/src/lib/database/entities/task.rs | 11 +++-- vicky/src/lib/vicky/constraints_helper.rs | 59 ++++++++++++----------- 3 files changed, 40 insertions(+), 33 deletions(-) diff --git a/vicky/src/bin/vicky/tasks.rs b/vicky/src/bin/vicky/tasks.rs index 123c2ef..8c04dd4 100644 --- a/vicky/src/bin/vicky/tasks.rs +++ b/vicky/src/bin/vicky/tasks.rs @@ -4,6 +4,7 @@ use rocket::http::Status; use rocket::response::stream::{Event, EventStream}; use rocket::{State, get, post, serde::json::Json}; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; use std::time; use tokio::sync::broadcast::{self, error::TryRecvError}; use uuid::Uuid; @@ -44,7 +45,7 @@ pub struct RoTaskNew { display_name: String, flake_ref: FlakeRef, locks: Vec, - features: Vec, + features: HashSet, group: Option, } diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index 30fdc98..5918def 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -8,6 +8,7 @@ use chrono::{DateTime, Utc}; use diesel::{AsExpression, FromSqlRow}; use itertools::Itertools; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; use uuid::Uuid; pub const HEARTBEAT_TIMEOUT_SEC: i64 = 60; @@ -62,7 +63,7 @@ pub struct Task { pub flake_ref: FlakeRef, #[builder(field)] - pub features: Vec, + pub features: HashSet, #[builder(default = Uuid::new_v4())] pub id: Uuid, @@ -163,11 +164,11 @@ impl TaskBuilder { } pub fn requires_feature>(mut self, feature: S) -> Self { - self.features.push(feature.into()); + self.features.insert(feature.into()); self } - pub fn requires_features(mut self, features: Vec) -> Self { + pub fn requires_features(mut self, features: HashSet) -> Self { self.features = features; self } @@ -216,7 +217,7 @@ impl From<(DbTask, Vec)> for Task { flake: task.flake_ref_uri, args: task.flake_ref_args, }, - features: task.features, + features: task.features.into_iter().collect(), created_at: task.created_at, claimed_at: task.claimed_at, finished_at: task.finished_at, @@ -366,7 +367,7 @@ pub mod db_impl { id: task.id, display_name: task.display_name, status: task.status, - features: task.features, + features: task.features.into_iter().collect(), flake_ref_uri: task.flake_ref.flake, flake_ref_args: task.flake_ref.args, created_at: task.created_at, diff --git a/vicky/src/lib/vicky/constraints_helper.rs b/vicky/src/lib/vicky/constraints_helper.rs index 88bc01f..8ffff04 100644 --- a/vicky/src/lib/vicky/constraints_helper.rs +++ b/vicky/src/lib/vicky/constraints_helper.rs @@ -4,18 +4,19 @@ use crate::{ database::entities::{Lock, Task}, errors::SchedulerError, }; +use std::collections::HashSet; pub struct ConstraintsHelper<'a> { constraints: Constraints<'a>, tasks: &'a Vec, - machine_features: &'a [String], + machine_features: &'a HashSet, } impl<'a> ConstraintsHelper<'a> { pub fn new( tasks: &'a Vec, poisoned_locks: &'a [Lock], - machine_features: &'a [String], + machine_features: &'a HashSet, ) -> Result { let constraints: Constraints = Constraints::from_tasks(tasks, poisoned_locks)?; @@ -40,7 +41,7 @@ impl<'a> ConstraintsHelper<'a> { fn find_unsupported_features(&self, task: &Task) -> Option { task.features .iter() - .find(|feat| !self.machine_features.contains(feat)) + .find(|feat| !self.machine_features.contains(feat.as_str())) .cloned() } @@ -84,6 +85,10 @@ mod tests { use crate::database::entities::task::{TaskResult, TaskStatus}; use crate::database::entities::{Lock, Task}; + use std::collections::HashSet; + use std::sync::LazyLock; + static EMPTY_SET: LazyLock> = LazyLock::new(HashSet::new); + #[test] fn constraints_helper_creation_no_constraints() { let tasks = vec![ @@ -97,7 +102,7 @@ mod tests { .build_expect(), ]; - ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); + ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); } #[test] @@ -115,7 +120,7 @@ mod tests { .build_expect(), ]; - ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); + ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); } #[test] @@ -133,7 +138,7 @@ mod tests { .build_expect(), ]; - ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); + ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); } #[test] @@ -151,13 +156,13 @@ mod tests { .build_expect(), ]; - ConstraintsHelper::new(&tasks_read_then_clean, &[], &[]) + ConstraintsHelper::new(&tasks_read_then_clean, &[], &*EMPTY_SET) .expect("read->cleanup lock order must not fail constraints_helper creation"); tasks_read_then_clean.reverse(); let tasks_clean_then_read = tasks_read_then_clean; - ConstraintsHelper::new(&tasks_clean_then_read, &[], &[]) + ConstraintsHelper::new(&tasks_clean_then_read, &[], &*EMPTY_SET) .expect("cleanup->read lock order must not fail constraints_helper creation"); } @@ -176,7 +181,7 @@ mod tests { .build_expect(), ]; - let res = ConstraintsHelper::new(&tasks, &[], &[]); + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET); assert!(res.is_err()); } @@ -195,7 +200,7 @@ mod tests { .build_expect(), ]; - let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); // Test 1 is currently running and has the write lock assert_eq!(res.get_next_task(), None) } @@ -215,7 +220,7 @@ mod tests { .build_expect(), ]; - let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); // Test 1 and Test 2 have required features, which our runner does not have. assert_eq!(res.get_next_task(), None) } @@ -235,8 +240,8 @@ mod tests { .build_expect(), ]; - let features = &["huge_cpu".to_string()]; - let res = ConstraintsHelper::new(&tasks, &[], features).unwrap(); + let features = ["huge_cpu".to_string()].into(); + let res = ConstraintsHelper::new(&tasks, &[], &features).unwrap(); // Test 1 and Test 2 have required features, which our runner matches. assert_eq!(res.get_next_task().unwrap().display_name, "Test 1") } @@ -256,7 +261,7 @@ mod tests { .build_expect(), ]; - let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); // Test 1 is currently running and has the write lock assert_eq!(res.get_next_task().unwrap().display_name, "Test 2") } @@ -276,7 +281,7 @@ mod tests { .build_expect(), ]; - let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); // Test 1 is currently running and has the write lock assert_eq!(res.get_next_task().unwrap().display_name, "Test 2") } @@ -296,7 +301,7 @@ mod tests { .build_expect(), ]; - let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); // Test 1 is currently running and has the write lock assert_eq!(res.get_next_task(), None) } @@ -311,7 +316,7 @@ mod tests { .build_expect(), ]; - let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); // Test 1 is currently running and has the write lock assert_eq!(res.get_next_task().unwrap().display_name, "Test 1") } @@ -331,7 +336,7 @@ mod tests { .build_expect(), ]; - let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); // Test 1 is currently running and has the write lock assert_eq!(res.get_next_task().unwrap().display_name, "Test 1") } @@ -351,7 +356,7 @@ mod tests { .build_expect(), ]; - let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); let eval = res.evaluate_task_readiness(&res.tasks[1]); assert!( @@ -376,7 +381,7 @@ mod tests { .build_expect(), ]; - let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); let eval = res.evaluate_task_readiness(&res.tasks[0]); assert!( eval.is_passive_collision(), @@ -407,7 +412,7 @@ mod tests { .build_expect(), ]; - let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); assert_eq!(res.get_next_task().unwrap().display_name, "Cleanup lock A") } @@ -433,7 +438,7 @@ mod tests { .build_expect(), ]; - let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); assert_eq!(res.get_next_task().unwrap().display_name, "Task B"); } @@ -458,7 +463,7 @@ mod tests { .build_expect(), ]; - let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); let eval = res.evaluate_task_readiness(&res.tasks[2]); assert!( eval.is_passive_collision(), @@ -487,7 +492,7 @@ mod tests { .build_expect(), ]; - let res = ConstraintsHelper::new(&tasks, &[], &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); let eval = res.evaluate_task_readiness(&res.tasks[0]); assert!( eval.is_passive_collision(), @@ -508,7 +513,7 @@ mod tests { poisoned_lock.poison(&Uuid::new_v4()); let poisoned_locks = vec![poisoned_lock]; - let res = ConstraintsHelper::new(&tasks, &poisoned_locks, &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &poisoned_locks, &*EMPTY_SET).unwrap(); assert_eq!(res.get_next_task(), None); } @@ -529,7 +534,7 @@ mod tests { poisoned_lock.poison(&Uuid::new_v4()); let poisoned_locks = vec![poisoned_lock]; - let res = ConstraintsHelper::new(&tasks, &poisoned_locks, &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &poisoned_locks, &*EMPTY_SET).unwrap(); assert_eq!( res.get_next_task().unwrap().display_name, @@ -549,7 +554,7 @@ mod tests { poisoned_lock.poison(&Uuid::new_v4()); let poisoned_locks = vec![poisoned_lock]; - let res = ConstraintsHelper::new(&tasks, &poisoned_locks, &[]).unwrap(); + let res = ConstraintsHelper::new(&tasks, &poisoned_locks, &*EMPTY_SET).unwrap(); assert_eq!(res.get_next_task(), None); } From 54d547ce6640f0d638124406e3d328281d768c30 Mon Sep 17 00:00:00 2001 From: Yureka Date: Wed, 25 Mar 2026 11:41:50 +0100 Subject: [PATCH 4/5] Add scheduler and make /claim long-polling --- vicky/src/bin/vicky/main.rs | 14 +++- vicky/src/bin/vicky/tasks.rs | 16 +++-- vicky/src/lib/errors.rs | 2 + vicky/src/lib/vicky/mod.rs | 1 + vicky/src/lib/vicky/scheduler.rs | 114 +++++++++++++++++++++++++++++++ 5 files changed, 139 insertions(+), 8 deletions(-) create mode 100644 vicky/src/lib/vicky/scheduler.rs diff --git a/vicky/src/bin/vicky/main.rs b/vicky/src/bin/vicky/main.rs index 340eff4..9f70e81 100644 --- a/vicky/src/bin/vicky/main.rs +++ b/vicky/src/bin/vicky/main.rs @@ -17,6 +17,7 @@ use log::{LevelFilter, error, info, trace, warn}; use rocket::fairing::AdHoc; use rocket::{Build, Ignite, Rocket, routes}; use snafu::ResultExt; +use std::sync::Arc; use std::time::Duration; use tokio::select; use tokio::sync::broadcast; @@ -26,6 +27,7 @@ use vickylib::database::entities::task::HEARTBEAT_TIMEOUT_SEC; use vickylib::logs::LogDrain; use vickylib::s3::client::S3Client; use vickylib::vicky::events::GlobalEvent; +use vickylib::vicky::scheduler::Scheduler; mod auth; mod config; @@ -109,6 +111,8 @@ async fn inner_main() -> Result<()> { let (tx_global_events, _rx_task_events) = broadcast::channel::(5); + let scheduler = Scheduler::new(); + let web_server = build_web_api( app_config, build_rocket, @@ -116,7 +120,8 @@ async fn inner_main() -> Result<()> { jwks_verifier, s3_log_bucket_client, log_drain, - tx_global_events, + tx_global_events.clone(), + scheduler.clone(), ) .await?; @@ -127,6 +132,9 @@ async fn inner_main() -> Result<()> { let web_task = tokio::task::spawn(async move { web_server.launch().await.context(startup::LaunchErr) }); + let scheduler_join_handle = + tokio::task::spawn(scheduler.run(tx_global_events, db_pool.clone())); + let task_timeout_sweeper = tokio::task::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(10)); @@ -154,11 +162,13 @@ async fn inner_main() -> Result<()> { select! { e = web_task => e.map(|_| ()).context(startup::JoinErr)?, _ = task_timeout_sweeper => panic!("Task timeout sweeper shouldn't exit"), + _ = scheduler_join_handle => panic!("Scheduler shouldn't exit"), } Ok(()) } +#[allow(clippy::too_many_arguments)] async fn build_web_api( app_config: Config, build_rocket: Rocket, @@ -167,6 +177,7 @@ async fn build_web_api( s3_log_bucket_client: S3Client, log_drain: LogDrain, tx_global_events: Sender, + scheduler: Arc, ) -> Result> { info!("starting web api"); @@ -177,6 +188,7 @@ async fn build_web_api( .manage(tx_global_events) .manage(app_config.web_config) .manage(oidc_config_resolved) + .manage(scheduler) .attach(Database::fairing()) .attach(AdHoc::config::()) .attach(AdHoc::try_on_ignite( diff --git a/vicky/src/bin/vicky/tasks.rs b/vicky/src/bin/vicky/tasks.rs index 8c04dd4..5876dcf 100644 --- a/vicky/src/bin/vicky/tasks.rs +++ b/vicky/src/bin/vicky/tasks.rs @@ -5,6 +5,7 @@ use rocket::response::stream::{Event, EventStream}; use rocket::{State, get, post, serde::json::Json}; use serde::{Deserialize, Serialize}; use std::collections::HashSet; +use std::sync::Arc; use std::time; use tokio::sync::broadcast::{self, error::TryRecvError}; use uuid::Uuid; @@ -13,8 +14,7 @@ use vickylib::database::entities::task::{FlakeRef, TaskResult, TaskStatus}; use vickylib::database::entities::{Database, Lock, Task}; use vickylib::query::FilterParams; use vickylib::{ - errors::VickyError, logs::LogDrain, s3::client::S3Client, - vicky::constraints_helper::ConstraintsHelper, vicky::events::GlobalEvent, + logs::LogDrain, s3::client::S3Client, vicky::events::GlobalEvent, vicky::scheduler::Scheduler, }; macro_rules! task_or { @@ -250,15 +250,17 @@ pub async fn tasks_put_logs( #[post("/claim", format = "json", data = "")] pub async fn tasks_claim( db: Database, + scheduler: &State>, features: Json, global_events: &State>, _machine: MachineGuard, ) -> Result>, AppError> { - let tasks = db.get_all_tasks().await?; - let poisoned_locks = db.get_poisoned_locks().await?; - let constraints_helper = ConstraintsHelper::new(&tasks, &poisoned_locks, &features.features) - .map_err(|x| VickyError::Scheduler { source: x })?; - let next_task = constraints_helper.get_next_task(); + let next_task: Option = tokio::time::timeout( + time::Duration::from_secs(10), + scheduler.get_next_task(&features.features), + ) + .await + .ok(); match next_task { Some(next_task) => { diff --git a/vicky/src/lib/errors.rs b/vicky/src/lib/errors.rs index 0f2c66d..64758a8 100644 --- a/vicky/src/lib/errors.rs +++ b/vicky/src/lib/errors.rs @@ -47,6 +47,8 @@ pub enum SchedulerError { GeneralSchedulingError, #[error("lock already owned")] LockAlreadyOwnedError, + #[error("channel closed")] + ChannelClosed, } #[derive(Error, Debug)] diff --git a/vicky/src/lib/vicky/mod.rs b/vicky/src/lib/vicky/mod.rs index adccbe4..f46483c 100644 --- a/vicky/src/lib/vicky/mod.rs +++ b/vicky/src/lib/vicky/mod.rs @@ -1,3 +1,4 @@ mod constraints; pub mod constraints_helper; pub mod events; +pub mod scheduler; diff --git a/vicky/src/lib/vicky/scheduler.rs b/vicky/src/lib/vicky/scheduler.rs new file mode 100644 index 0000000..4a6b4b9 --- /dev/null +++ b/vicky/src/lib/vicky/scheduler.rs @@ -0,0 +1,114 @@ +use crate::database::entities::Database; +use crate::errors::SchedulerError; +use crate::errors::VickyError; +use crate::vicky::events::GlobalEvent; +use crate::{database::entities::Task, vicky::constraints_helper::ConstraintsHelper}; +use diesel::PgConnection; +use log::warn; +use std::collections::HashSet; +use std::collections::VecDeque; +use std::sync::Arc; + +pub struct Scheduler { + fairy_handles_tx: tokio::sync::mpsc::Sender, + fairy_handles_rx: std::sync::Mutex>>, +} + +pub struct FairyHandle { + features: HashSet, + task_tx: tokio::sync::oneshot::Sender, +} + +impl Scheduler { + pub fn new() -> Arc { + let (fairy_handles_tx, fairy_handles_rx) = tokio::sync::mpsc::channel(1); + Arc::new(Self { + fairy_handles_tx, + fairy_handles_rx: Some(fairy_handles_rx).into(), + }) + } + + pub async fn get_next_task(&self, machine_features: &[String]) -> Task { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.fairy_handles_tx + .send(FairyHandle { + features: machine_features.iter().cloned().collect(), + task_tx: tx, + }) + .await + .unwrap(); + rx.await.unwrap() + } + + pub async fn run( + self: Arc, + global_events: tokio::sync::broadcast::Sender, + db_pool: rocket_sync_db_pools::ConnectionPool, + ) -> Result<(), VickyError> { + let mut fairy_handles_rx = self + .fairy_handles_rx + .lock() + .unwrap() + .take() + .expect("Scheduler can only be running once"); + let mut global_events_rx = global_events.subscribe(); + let mut waiting_fairies = VecDeque::::new(); + loop { + let Some(db) = Database::get_one_from_pool(&db_pool).await else { + warn!("Scheduler timed out waiting for database connection"); + continue; + }; + + while { + // do: try to schedule a task + + let mut task_scheduled = false; + + // empty the queue, all changes up to here will have been accounted for + global_events_rx.resubscribe(); + + let all_features = waiting_fairies + .iter() + .flat_map(|fairy| fairy.features.iter()) + .cloned() + .collect::>(); + + let tasks = db.get_all_tasks().await?; + let poisoned_locks = db.get_poisoned_locks().await?; + let constraints_helper = + ConstraintsHelper::new(&tasks, &poisoned_locks, &all_features)?; + + if let Some(next_task) = constraints_helper.get_next_task() { + while let Some((idx, _)) = waiting_fairies + .iter() + .enumerate() + .find(|(_, fairy)| fairy.features.is_superset(&next_task.features)) + { + let fairy = waiting_fairies.remove(idx).unwrap(); + if fairy.task_tx.send(next_task.clone()).is_ok() { + task_scheduled = true; + break; + } + } + } + + // while: task was scheduled + task_scheduled + } {} + + tokio::select! { + res = global_events_rx.recv() => { + // wait for changed or new tasks... + if res.is_err() { + return Err(SchedulerError::ChannelClosed.into()); + } + } + res = fairy_handles_rx.recv() => { + // ... or a new fairy asking for a task + let fairy_handle = res.ok_or(SchedulerError::ChannelClosed)?; + waiting_fairies.push_back(fairy_handle); + } + } + } + } +} From 51541d535316dd3a265bee5b50c13be4c0808ff6 Mon Sep 17 00:00:00 2001 From: Yureka Date: Wed, 25 Mar 2026 11:41:55 +0100 Subject: [PATCH 5/5] fairy: remove sleep around claim which is now long-polling --- fairy/src/main.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/fairy/src/main.rs b/fairy/src/main.rs index f7feefd..40b43ba 100644 --- a/fairy/src/main.rs +++ b/fairy/src/main.rs @@ -303,6 +303,5 @@ async fn run(cfg: AppConfig) -> Result<()> { error!("{e}"); tokio::time::sleep(Duration::from_secs(5)).await; } - tokio::time::sleep(Duration::from_secs(1)).await; } }