diff --git a/fairy/src/main.rs b/fairy/src/main.rs index f7feefd..40b43ba 100644 --- a/fairy/src/main.rs +++ b/fairy/src/main.rs @@ -303,6 +303,5 @@ async fn run(cfg: AppConfig) -> Result<()> { error!("{e}"); tokio::time::sleep(Duration::from_secs(5)).await; } - tokio::time::sleep(Duration::from_secs(1)).await; } } diff --git a/vicky/src/bin/vicky/errors.rs b/vicky/src/bin/vicky/errors.rs index 9319601..42eb472 100644 --- a/vicky/src/bin/vicky/errors.rs +++ b/vicky/src/bin/vicky/errors.rs @@ -3,8 +3,7 @@ use rocket::{Request, http::Status, response::Responder}; use thiserror::Error; use tokio::sync::broadcast::error::SendError; use vickylib::errors::VickyError; - -use crate::events::GlobalEvent; +use vickylib::vicky::events::GlobalEvent; #[derive(Error, Debug)] pub enum AppError { diff --git a/vicky/src/bin/vicky/events.rs b/vicky/src/bin/vicky/events.rs index 769b1b6..5c89e45 100644 --- a/vicky/src/bin/vicky/events.rs +++ b/vicky/src/bin/vicky/events.rs @@ -1,15 +1,8 @@ use rocket::response::stream::{Event, EventStream}; use rocket::{State, get}; -use serde::{Deserialize, Serialize}; use std::time; use tokio::sync::broadcast::{self, error::TryRecvError}; - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(tag = "type")] -pub enum GlobalEvent { - TaskAdd, - TaskUpdate { uuid: uuid::Uuid }, -} +use vickylib::vicky::events::GlobalEvent; #[get("/")] pub fn get_global_events( diff --git a/vicky/src/bin/vicky/main.rs b/vicky/src/bin/vicky/main.rs index 964874a..9f70e81 100644 --- a/vicky/src/bin/vicky/main.rs +++ b/vicky/src/bin/vicky/main.rs @@ -1,5 +1,5 @@ use crate::config::{Config, OIDCConfigResolved, build_rocket_config}; -use crate::events::{GlobalEvent, get_global_events}; +use crate::events::get_global_events; use crate::locks::{ locks_get_active, locks_get_detailed_poisoned, locks_get_poisoned, locks_unlock, }; @@ -17,6 +17,7 @@ use log::{LevelFilter, error, info, trace, warn}; use rocket::fairing::AdHoc; use rocket::{Build, Ignite, Rocket, 
routes}; use snafu::ResultExt; +use std::sync::Arc; use std::time::Duration; use tokio::select; use tokio::sync::broadcast; @@ -25,6 +26,8 @@ use vickylib::database::entities::Database; use vickylib::database::entities::task::HEARTBEAT_TIMEOUT_SEC; use vickylib::logs::LogDrain; use vickylib::s3::client::S3Client; +use vickylib::vicky::events::GlobalEvent; +use vickylib::vicky::scheduler::Scheduler; mod auth; mod config; @@ -108,6 +111,8 @@ async fn inner_main() -> Result<()> { let (tx_global_events, _rx_task_events) = broadcast::channel::<GlobalEvent>(5); + let scheduler = Scheduler::new(); + let web_server = build_web_api( app_config, build_rocket, @@ -115,7 +120,8 @@ async fn inner_main() -> Result<()> { jwks_verifier, s3_log_bucket_client, log_drain, - tx_global_events, + tx_global_events.clone(), + scheduler.clone(), ) .await?; @@ -126,6 +132,9 @@ async fn inner_main() -> Result<()> { let web_task = tokio::task::spawn(async move { web_server.launch().await.context(startup::LaunchErr) }); + let scheduler_join_handle = + tokio::task::spawn(scheduler.run(tx_global_events, db_pool.clone())); + let task_timeout_sweeper = tokio::task::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(10)); @@ -153,11 +162,13 @@ async fn inner_main() -> Result<()> { select! 
{ e = web_task => e.map(|_| ()).context(startup::JoinErr)?, _ = task_timeout_sweeper => panic!("Task timeout sweeper shouldn't exit"), + _ = scheduler_join_handle => panic!("Scheduler shouldn't exit"), } Ok(()) } +#[allow(clippy::too_many_arguments)] async fn build_web_api( app_config: Config, build_rocket: Rocket<Build>, @@ -166,6 +177,7 @@ async fn build_web_api( s3_log_bucket_client: S3Client, log_drain: LogDrain, tx_global_events: Sender<GlobalEvent>, + scheduler: Arc<Scheduler>, ) -> Result<Rocket<Ignite>> { info!("starting web api"); @@ -176,6 +188,7 @@ async fn build_web_api( .manage(tx_global_events) .manage(app_config.web_config) .manage(oidc_config_resolved) + .manage(scheduler) .attach(Database::fairing()) .attach(AdHoc::config::()) .attach(AdHoc::try_on_ignite( diff --git a/vicky/src/bin/vicky/tasks.rs b/vicky/src/bin/vicky/tasks.rs index 556416e..5876dcf 100644 --- a/vicky/src/bin/vicky/tasks.rs +++ b/vicky/src/bin/vicky/tasks.rs @@ -4,6 +4,8 @@ use rocket::http::Status; use rocket::response::stream::{Event, EventStream}; use rocket::{State, get, post, serde::json::Json}; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; +use std::sync::Arc; use std::time; use tokio::sync::broadcast::{self, error::TryRecvError}; use uuid::Uuid; @@ -12,7 +14,7 @@ use vickylib::database::entities::task::{FlakeRef, TaskResult, TaskStatus}; use vickylib::database::entities::{Database, Lock, Task}; use vickylib::query::FilterParams; use vickylib::{ - errors::VickyError, logs::LogDrain, s3::client::S3Client, vicky::scheduler::Scheduler, + logs::LogDrain, s3::client::S3Client, vicky::events::GlobalEvent, vicky::scheduler::Scheduler, }; macro_rules! 
task_or { @@ -34,7 +36,6 @@ use crate::auth::AnyAuthGuard; use crate::{ auth::{MachineGuard, UserGuard}, errors::AppError, - events::GlobalEvent, }; #[derive(Debug, PartialEq, Serialize, Deserialize)] @@ -44,7 +45,7 @@ pub struct RoTaskNew { display_name: String, flake_ref: FlakeRef, locks: Vec, - features: Vec<String>, + features: HashSet<String>, group: Option, } @@ -249,15 +250,17 @@ pub async fn tasks_put_logs( #[post("/claim", format = "json", data = "<features>")] pub async fn tasks_claim( db: Database, + scheduler: &State<Arc<Scheduler>>, features: Json, global_events: &State<broadcast::Sender<GlobalEvent>>, _machine: MachineGuard, ) -> Result<Json<Option<Task>>, AppError> { - let tasks = db.get_all_tasks().await?; - let poisoned_locks = db.get_poisoned_locks().await?; - let scheduler = Scheduler::new(&tasks, &poisoned_locks, &features.features) - .map_err(|x| VickyError::Scheduler { source: x })?; - let next_task = scheduler.get_next_task(); + let next_task: Option<Task> = tokio::time::timeout( + time::Duration::from_secs(10), + scheduler.get_next_task(&features.features), + ) + .await + .ok(); match next_task { Some(next_task) => { diff --git a/vicky/src/lib/database/entities/task.rs b/vicky/src/lib/database/entities/task.rs index 30fdc98..5918def 100644 --- a/vicky/src/lib/database/entities/task.rs +++ b/vicky/src/lib/database/entities/task.rs @@ -8,6 +8,7 @@ use chrono::{DateTime, Utc}; use diesel::{AsExpression, FromSqlRow}; use itertools::Itertools; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; use uuid::Uuid; pub const HEARTBEAT_TIMEOUT_SEC: i64 = 60; @@ -62,7 +63,7 @@ pub struct Task { pub flake_ref: FlakeRef, #[builder(field)] - pub features: Vec<String>, + pub features: HashSet<String>, #[builder(default = Uuid::new_v4())] pub id: Uuid, @@ -163,11 +164,11 @@ impl TaskBuilder { } pub fn requires_feature<S: Into<String>>(mut self, feature: S) -> Self { - self.features.push(feature.into()); + self.features.insert(feature.into()); self } - pub fn requires_features(mut self, features: Vec<String>) + pub fn requires_features(mut self, features: HashSet<String>) 
-> Self { self.features = features; self } @@ -216,7 +217,7 @@ impl From<(DbTask, Vec)> for Task { flake: task.flake_ref_uri, args: task.flake_ref_args, }, - features: task.features, + features: task.features.into_iter().collect(), created_at: task.created_at, claimed_at: task.claimed_at, finished_at: task.finished_at, @@ -366,7 +367,7 @@ pub mod db_impl { id: task.id, display_name: task.display_name, status: task.status, - features: task.features, + features: task.features.into_iter().collect(), flake_ref_uri: task.flake_ref.flake, flake_ref_args: task.flake_ref.args, created_at: task.created_at, diff --git a/vicky/src/lib/errors.rs b/vicky/src/lib/errors.rs index 0f2c66d..64758a8 100644 --- a/vicky/src/lib/errors.rs +++ b/vicky/src/lib/errors.rs @@ -47,6 +47,8 @@ pub enum SchedulerError { GeneralSchedulingError, #[error("lock already owned")] LockAlreadyOwnedError, + #[error("channel closed")] + ChannelClosed, } #[derive(Error, Debug)] diff --git a/vicky/src/lib/vicky/constraints_helper.rs b/vicky/src/lib/vicky/constraints_helper.rs new file mode 100644 index 0000000..8ffff04 --- /dev/null +++ b/vicky/src/lib/vicky/constraints_helper.rs @@ -0,0 +1,561 @@ +use crate::database::entities::task::TaskStatus; +use crate::vicky::constraints::{ConstraintEvaluation, ConstraintFail, Constraints}; +use crate::{ + database::entities::{Lock, Task}, + errors::SchedulerError, +}; +use std::collections::HashSet; + +pub struct ConstraintsHelper<'a> { + constraints: Constraints<'a>, + tasks: &'a Vec<Task>, + machine_features: &'a HashSet<String>, +} + +impl<'a> ConstraintsHelper<'a> { + pub fn new( + tasks: &'a Vec<Task>, + poisoned_locks: &'a [Lock], + machine_features: &'a HashSet<String>, + ) -> Result<Self, SchedulerError> { + let constraints: Constraints = Constraints::from_tasks(tasks, poisoned_locks)?; + + let s = ConstraintsHelper { + constraints, + tasks, + machine_features, + }; + + #[cfg(test)] + s.print_debug_evaluation(); + + Ok(s) + } + + fn find_constraint(&'a self, task: &Task) -> Option<ConstraintFail<'a>> { + task.locks + .iter() 
+ .find_map(|lock| self.constraints.try_acquire(lock)) + } + + fn find_unsupported_features(&self, task: &Task) -> Option { + task.features + .iter() + .find(|feat| !self.machine_features.contains(feat.as_str())) + .cloned() + } + + fn evaluate_task_readiness(&'a self, task: &Task) -> ConstraintEvaluation<'a> { + if task.status != TaskStatus::New { + return ConstraintEvaluation::NotReady; + } + + if let Some(feature) = self.find_unsupported_features(task) { + return ConstraintEvaluation::missing_feature(feature); + } + + if let Some(constraint) = self.find_constraint(task) { + return ConstraintEvaluation::Constrained(constraint); + } + + ConstraintEvaluation::Ready + } + + pub fn get_next_task(self) -> Option { + self.tasks + .iter() + .find(|task| self.evaluate_task_readiness(task).is_ready()) + .cloned() + } + + #[allow(unused)] + pub fn print_debug_evaluation(&self) { + for task in self.tasks { + let eval = self.evaluate_task_readiness(task); + println!("Readiness of {} ({}): {eval:?}", task.id, task.display_name); + } + } +} + +#[cfg(test)] +mod tests { + use uuid::Uuid; + + use super::ConstraintsHelper; + use crate::database::entities::task::{TaskResult, TaskStatus}; + use crate::database::entities::{Lock, Task}; + + use std::collections::HashSet; + use std::sync::LazyLock; + static EMPTY_SET: LazyLock> = LazyLock::new(HashSet::new); + + #[test] + fn constraints_helper_creation_no_constraints() { + let tasks = vec![ + Task::builder() + .display_name("Test 1") + .status(TaskStatus::Running) + .build_expect(), + Task::builder() + .display_name("Test 2") + .status(TaskStatus::Running) + .build_expect(), + ]; + + ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); + } + + #[test] + fn constraints_helper_creation_multiple_read_constraints() { + let tasks = vec![ + Task::builder() + .display_name("Test 1") + .status(TaskStatus::Running) + .read_lock("foo 1") + .build_expect(), + Task::builder() + .display_name("Test 2") + .status(TaskStatus::Running) + 
.read_lock("foo 1") + .build_expect(), + ]; + + ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); + } + + #[test] + fn constraints_helper_creation_single_write_constraints() { + let tasks = vec![ + Task::builder() + .display_name("Test 1") + .status(TaskStatus::Running) + .write_lock("foo1") + .build_expect(), + Task::builder() + .display_name("Test 2") + .status(TaskStatus::Running) + .write_lock("foo2") + .build_expect(), + ]; + + ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); + } + + #[test] + fn constraints_helper_creation_read_and_cleanup_constraints_is_order_independent() { + let mut tasks_read_then_clean = vec![ + Task::builder() + .display_name("Read lock") + .status(TaskStatus::Running) + .read_lock("shared") + .build_expect(), + Task::builder() + .display_name("Cleanup lock") + .status(TaskStatus::New) + .clean_lock("shared") + .build_expect(), + ]; + + ConstraintsHelper::new(&tasks_read_then_clean, &[], &*EMPTY_SET) + .expect("read->cleanup lock order must not fail constraints_helper creation"); + + tasks_read_then_clean.reverse(); + let tasks_clean_then_read = tasks_read_then_clean; + + ConstraintsHelper::new(&tasks_clean_then_read, &[], &*EMPTY_SET) + .expect("cleanup->read lock order must not fail constraints_helper creation"); + } + + #[test] + fn constraints_helper_creation_multiple_write_constraints() { + let tasks = vec![ + Task::builder() + .display_name("Test 1") + .status(TaskStatus::Running) + .write_lock("foo1") + .build_expect(), + Task::builder() + .display_name("Test 2") + .status(TaskStatus::Running) + .write_lock("foo1") + .build_expect(), + ]; + + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET); + assert!(res.is_err()); + } + + #[test] + fn constraints_helper_no_new_task() { + let tasks = vec![ + Task::builder() + .display_name("Test 1") + .status(TaskStatus::Running) + .write_lock("foo1") + .build_expect(), + Task::builder() + .display_name("Test 2") + .status(TaskStatus::New) + .write_lock("foo1") + 
.build_expect(), + ]; + + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); + // Test 1 is currently running and has the write lock + assert_eq!(res.get_next_task(), None) + } + + #[test] + fn constraints_helper_no_new_task_with_feature() { + let tasks = vec![ + Task::builder() + .display_name("Test 1") + .status(TaskStatus::New) + .requires_feature("huge_cpu") + .build_expect(), + Task::builder() + .display_name("Test 2") + .status(TaskStatus::New) + .requires_feature("huge_cpu") + .build_expect(), + ]; + + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); + // Test 1 and Test 2 have required features, which our runner does not have. + assert_eq!(res.get_next_task(), None) + } + + #[test] + fn constraints_helper_new_task_with_specific_feature() { + let tasks = vec![ + Task::builder() + .display_name("Test 1") + .status(TaskStatus::New) + .requires_feature("huge_cpu") + .build_expect(), + Task::builder() + .display_name("Test 2") + .status(TaskStatus::New) + .requires_feature("huge_cpu") + .build_expect(), + ]; + + let features = ["huge_cpu".to_string()].into(); + let res = ConstraintsHelper::new(&tasks, &[], &features).unwrap(); + // Test 1 and Test 2 have required features, which our runner matches. 
+ assert_eq!(res.get_next_task().unwrap().display_name, "Test 1") + } + + #[test] + fn constraints_helper_new_task() { + let tasks = vec![ + Task::builder() + .display_name("Test 1") + .status(TaskStatus::Running) + .write_lock("foo1") + .build_expect(), + Task::builder() + .display_name("Test 2") + .status(TaskStatus::New) + .write_lock("foo2") + .build_expect(), + ]; + + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); + // Test 1 is currently running and has the write lock + assert_eq!(res.get_next_task().unwrap().display_name, "Test 2") + } + + #[test] + fn constraints_helper_new_task_ro() { + let tasks = vec![ + Task::builder() + .display_name("Test 1") + .status(TaskStatus::Running) + .read_lock("foo1") + .build_expect(), + Task::builder() + .display_name("Test 2") + .status(TaskStatus::New) + .read_lock("foo1") + .build_expect(), + ]; + + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); + // Test 1 is currently running and has the write lock + assert_eq!(res.get_next_task().unwrap().display_name, "Test 2") + } + + #[test] + fn constraints_helper_new_task_rw_ro() { + let tasks = vec![ + Task::builder() + .display_name("Test 1") + .status(TaskStatus::Running) + .write_lock("foo1") + .build_expect(), + Task::builder() + .display_name("Test 2") + .status(TaskStatus::New) + .read_lock("foo1") + .build_expect(), + ]; + + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); + // Test 1 is currently running and has the write lock + assert_eq!(res.get_next_task(), None) + } + + #[test] + fn constraints_helper_new_task_cleanup_single() { + let tasks = vec![ + Task::builder() + .display_name("Test 1") + .status(TaskStatus::New) + .clean_lock("foo1") + .build_expect(), + ]; + + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); + // Test 1 is currently running and has the write lock + assert_eq!(res.get_next_task().unwrap().display_name, "Test 1") + } + + #[test] + fn 
constraints_helper_new_task_cleanup_with_finished() { + let tasks = vec![ + Task::builder() + .display_name("Test 5") + .status(TaskStatus::Finished(TaskResult::Success)) + .write_lock("foo1") + .build_expect(), + Task::builder() + .display_name("Test 1") + .status(TaskStatus::New) + .clean_lock("foo1") + .build_expect(), + ]; + + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); + // Test 1 is currently running and has the write lock + assert_eq!(res.get_next_task().unwrap().display_name, "Test 1") + } + + #[test] + fn constraints_helper_cleanup_waits_for_running_task_using_same_lock() { + let tasks = vec![ + Task::builder() + .display_name("Im doing something") + .status(TaskStatus::Running) + .read_lock("foo1") + .build_expect(), + Task::builder() + .display_name("Cleanup after") + .status(TaskStatus::New) + .clean_lock("foo1") + .build_expect(), + ]; + + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); + let eval = res.evaluate_task_readiness(&res.tasks[1]); + + assert!( + !eval.is_ready(), + "Expected cleanup to wait while same lock is currently in use. Got {eval:?}" + ); + assert_eq!(res.get_next_task(), None); + } + + #[test] + fn constraints_helper_new_task_cleanup() { + let tasks = vec![ + Task::builder() + .display_name("Test 1") + .status(TaskStatus::New) + .clean_lock("foo1") + .build_expect(), + Task::builder() + .display_name("Test 2") + .status(TaskStatus::New) + .read_lock("foo1") + .build_expect(), + ]; + + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); + let eval = res.evaluate_task_readiness(&res.tasks[0]); + assert!( + eval.is_passive_collision(), + "Expected evaluation to actively collide. But received {eval:?}" + ); + let eval = res.evaluate_task_readiness(&res.tasks[1]); + assert!( + eval.is_ready(), + "Expected evaluation to succeed. 
But received {eval:?}" + ); + + // Run tasks before cleanup + assert_eq!(res.get_next_task().unwrap().display_name, "Test 2") + } + + #[test] + fn constraints_helper_new_task_cleanup_unrelated_pending_lock() { + let tasks = vec![ + Task::builder() + .display_name("Cleanup lock A") + .status(TaskStatus::New) + .clean_lock("lock_a") + .build_expect(), + Task::builder() + .display_name("Pending lock B") + .status(TaskStatus::New) + .read_lock("lock_b") + .build_expect(), + ]; + + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); + + assert_eq!(res.get_next_task().unwrap().display_name, "Cleanup lock A") + } + + #[test] + fn constraints_helper_needs_validation_locks_block_conflicts_only() { + let tasks = vec![ + Task::builder() + .display_name("Task A") + .status(TaskStatus::NeedsUserValidation) + .write_lock("lock_a") + .read_lock("lock_b") + .build_expect(), + Task::builder() + .display_name("Task A2") + .status(TaskStatus::New) + .read_lock("lock_a") + .build_expect(), + Task::builder() + .display_name("Task B") + .status(TaskStatus::New) + .read_lock("lock_b") + .build_expect(), + ]; + + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); + + assert_eq!(res.get_next_task().unwrap().display_name, "Task B"); + } + + #[test] + fn constraints_helper_needs_validation_keeps_strongest_lock_when_names_collide() { + let tasks = vec![ + Task::builder() + .display_name("Validation writer") + .status(TaskStatus::NeedsUserValidation) + .write_lock("shared_lock") + .build_expect(), + Task::builder() + .display_name("Validation reader") + .status(TaskStatus::NeedsUserValidation) + .read_lock("shared_lock") + .build_expect(), + Task::builder() + .display_name("New reader") + .status(TaskStatus::New) + .read_lock("shared_lock") + .build_expect(), + ]; + + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); + let eval = res.evaluate_task_readiness(&res.tasks[2]); + assert!( + eval.is_passive_collision(), + "Expected passive 
collision from validation writer, got {eval:?}" + ); + assert_eq!(res.get_next_task(), None); + } + + #[test] + fn constraints_helper_cleanup_waits_for_non_cleanup_even_with_later_cleanup() { + let tasks = vec![ + Task::builder() + .display_name("Cleanup 1") + .status(TaskStatus::New) + .clean_lock("shared_lock") + .build_expect(), + Task::builder() + .display_name("Reader") + .status(TaskStatus::New) + .read_lock("shared_lock") + .build_expect(), + Task::builder() + .display_name("Cleanup 2") + .status(TaskStatus::New) + .clean_lock("shared_lock") + .build_expect(), + ]; + + let res = ConstraintsHelper::new(&tasks, &[], &*EMPTY_SET).unwrap(); + let eval = res.evaluate_task_readiness(&res.tasks[0]); + assert!( + eval.is_passive_collision(), + "Expected cleanup to wait for pending non-clean lock, got {eval:?}" + ); + assert_eq!(res.get_next_task().unwrap().display_name, "Reader"); + } + + #[test] + fn schedule_with_poisoned_lock() { + let tasks = vec![ + Task::builder() + .display_name("I need to do something") + .write_lock("Entire Prod Cluster") + .build_expect(), + ]; + let mut poisoned_lock = Lock::write("Entire Prod Cluster"); + poisoned_lock.poison(&Uuid::new_v4()); + let poisoned_locks = vec![poisoned_lock]; + + let res = ConstraintsHelper::new(&tasks, &poisoned_locks, &*EMPTY_SET).unwrap(); + + assert_eq!(res.get_next_task(), None); + } + + #[test] + fn schedule_different_tasks_with_poisoned_lock() { + let tasks = vec![ + Task::builder() + .display_name("I need to do something") + .write_lock("Entire Prod Cluster") + .build_expect(), + Task::builder() + .display_name("I need to test something") + .write_lock("Entire Staging Cluster") + .build_expect(), + ]; + let mut poisoned_lock = Lock::write("Entire Prod Cluster"); + poisoned_lock.poison(&Uuid::new_v4()); + let poisoned_locks = vec![poisoned_lock]; + + let res = ConstraintsHelper::new(&tasks, &poisoned_locks, &*EMPTY_SET).unwrap(); + + assert_eq!( + res.get_next_task().unwrap().display_name, + "I need to 
test something" + ); + } + + #[test] + fn schedule_different_tasks_with_poisoned_lock_ro() { + let tasks = vec![ + Task::builder() + .display_name("I need to do something") + .read_lock("Entire Prod Cluster") + .build_expect(), + ]; + let mut poisoned_lock = Lock::read("Entire Prod Cluster"); + poisoned_lock.poison(&Uuid::new_v4()); + let poisoned_locks = vec![poisoned_lock]; + + let res = ConstraintsHelper::new(&tasks, &poisoned_locks, &*EMPTY_SET).unwrap(); + + assert_eq!(res.get_next_task(), None); + } +} diff --git a/vicky/src/lib/vicky/events.rs b/vicky/src/lib/vicky/events.rs new file mode 100644 index 0000000..510ebf0 --- /dev/null +++ b/vicky/src/lib/vicky/events.rs @@ -0,0 +1,8 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum GlobalEvent { + TaskAdd, + TaskUpdate { uuid: uuid::Uuid }, +} diff --git a/vicky/src/lib/vicky/mod.rs b/vicky/src/lib/vicky/mod.rs index 801968f..f46483c 100644 --- a/vicky/src/lib/vicky/mod.rs +++ b/vicky/src/lib/vicky/mod.rs @@ -1,2 +1,4 @@ mod constraints; +pub mod constraints_helper; +pub mod events; pub mod scheduler; diff --git a/vicky/src/lib/vicky/scheduler.rs b/vicky/src/lib/vicky/scheduler.rs index dba48f3..4a6b4b9 100644 --- a/vicky/src/lib/vicky/scheduler.rs +++ b/vicky/src/lib/vicky/scheduler.rs @@ -1,556 +1,114 @@ -use crate::database::entities::task::TaskStatus; -use crate::vicky::constraints::{ConstraintEvaluation, ConstraintFail, Constraints}; -use crate::{ - database::entities::{Lock, Task}, - errors::SchedulerError, -}; - -pub struct Scheduler<'a> { - constraints: Constraints<'a>, - tasks: &'a Vec, - machine_features: &'a [String], +use crate::database::entities::Database; +use crate::errors::SchedulerError; +use crate::errors::VickyError; +use crate::vicky::events::GlobalEvent; +use crate::{database::entities::Task, vicky::constraints_helper::ConstraintsHelper}; +use diesel::PgConnection; +use log::warn; +use 
std::collections::HashSet; +use std::collections::VecDeque; +use std::sync::Arc; + +pub struct Scheduler { + fairy_handles_tx: tokio::sync::mpsc::Sender, + fairy_handles_rx: std::sync::Mutex>>, } -impl<'a> Scheduler<'a> { - pub fn new( - tasks: &'a Vec, - poisoned_locks: &'a [Lock], - machine_features: &'a [String], - ) -> Result { - let constraints: Constraints = Constraints::from_tasks(tasks, poisoned_locks)?; - - let s = Scheduler { - constraints, - tasks, - machine_features, - }; - - #[cfg(test)] - s.print_debug_evaluation(); - - Ok(s) - } - - fn find_constraint(&'a self, task: &Task) -> Option> { - task.locks - .iter() - .find_map(|lock| self.constraints.try_acquire(lock)) - } - - fn find_unsupported_features(&self, task: &Task) -> Option { - task.features - .iter() - .find(|feat| !self.machine_features.contains(feat)) - .cloned() - } - - fn evaluate_task_readiness(&'a self, task: &Task) -> ConstraintEvaluation<'a> { - if task.status != TaskStatus::New { - return ConstraintEvaluation::NotReady; - } - - if let Some(feature) = self.find_unsupported_features(task) { - return ConstraintEvaluation::missing_feature(feature); - } - - if let Some(constraint) = self.find_constraint(task) { - return ConstraintEvaluation::Constrained(constraint); - } - - ConstraintEvaluation::Ready - } - - pub fn get_next_task(self) -> Option { - self.tasks - .iter() - .find(|task| self.evaluate_task_readiness(task).is_ready()) - .cloned() - } - - #[allow(unused)] - pub fn print_debug_evaluation(&self) { - for task in self.tasks { - let eval = self.evaluate_task_readiness(task); - println!("Readiness of {} ({}): {eval:?}", task.id, task.display_name); - } - } +pub struct FairyHandle { + features: HashSet, + task_tx: tokio::sync::oneshot::Sender, } -#[cfg(test)] -mod tests { - use uuid::Uuid; - - use super::Scheduler; - use crate::database::entities::task::{TaskResult, TaskStatus}; - use crate::database::entities::{Lock, Task}; - - #[test] - fn scheduler_creation_no_constraints() { - let 
tasks = vec![ - Task::builder() - .display_name("Test 1") - .status(TaskStatus::Running) - .build_expect(), - Task::builder() - .display_name("Test 2") - .status(TaskStatus::Running) - .build_expect(), - ]; - - Scheduler::new(&tasks, &[], &[]).unwrap(); - } - - #[test] - fn scheduler_creation_multiple_read_constraints() { - let tasks = vec![ - Task::builder() - .display_name("Test 1") - .status(TaskStatus::Running) - .read_lock("foo 1") - .build_expect(), - Task::builder() - .display_name("Test 2") - .status(TaskStatus::Running) - .read_lock("foo 1") - .build_expect(), - ]; - - Scheduler::new(&tasks, &[], &[]).unwrap(); - } - - #[test] - fn scheduler_creation_single_write_constraints() { - let tasks = vec![ - Task::builder() - .display_name("Test 1") - .status(TaskStatus::Running) - .write_lock("foo1") - .build_expect(), - Task::builder() - .display_name("Test 2") - .status(TaskStatus::Running) - .write_lock("foo2") - .build_expect(), - ]; - - Scheduler::new(&tasks, &[], &[]).unwrap(); - } - - #[test] - fn scheduler_creation_read_and_cleanup_constraints_is_order_independent() { - let mut tasks_read_then_clean = vec![ - Task::builder() - .display_name("Read lock") - .status(TaskStatus::Running) - .read_lock("shared") - .build_expect(), - Task::builder() - .display_name("Cleanup lock") - .status(TaskStatus::New) - .clean_lock("shared") - .build_expect(), - ]; - - Scheduler::new(&tasks_read_then_clean, &[], &[]) - .expect("read->cleanup lock order must not fail scheduler creation"); - - tasks_read_then_clean.reverse(); - let tasks_clean_then_read = tasks_read_then_clean; - - Scheduler::new(&tasks_clean_then_read, &[], &[]) - .expect("cleanup->read lock order must not fail scheduler creation"); - } - - #[test] - fn scheduler_creation_multiple_write_constraints() { - let tasks = vec![ - Task::builder() - .display_name("Test 1") - .status(TaskStatus::Running) - .write_lock("foo1") - .build_expect(), - Task::builder() - .display_name("Test 2") - 
.status(TaskStatus::Running) - .write_lock("foo1") - .build_expect(), - ]; - - let res = Scheduler::new(&tasks, &[], &[]); - assert!(res.is_err()); - } - - #[test] - fn scheduler_no_new_task() { - let tasks = vec![ - Task::builder() - .display_name("Test 1") - .status(TaskStatus::Running) - .write_lock("foo1") - .build_expect(), - Task::builder() - .display_name("Test 2") - .status(TaskStatus::New) - .write_lock("foo1") - .build_expect(), - ]; - - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); - // Test 1 is currently running and has the write lock - assert_eq!(res.get_next_task(), None) - } - - #[test] - fn scheduler_no_new_task_with_feature() { - let tasks = vec![ - Task::builder() - .display_name("Test 1") - .status(TaskStatus::New) - .requires_feature("huge_cpu") - .build_expect(), - Task::builder() - .display_name("Test 2") - .status(TaskStatus::New) - .requires_feature("huge_cpu") - .build_expect(), - ]; - - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); - // Test 1 and Test 2 have required features, which our runner does not have. - assert_eq!(res.get_next_task(), None) - } - - #[test] - fn scheduler_new_task_with_specific_feature() { - let tasks = vec![ - Task::builder() - .display_name("Test 1") - .status(TaskStatus::New) - .requires_feature("huge_cpu") - .build_expect(), - Task::builder() - .display_name("Test 2") - .status(TaskStatus::New) - .requires_feature("huge_cpu") - .build_expect(), - ]; - - let features = &["huge_cpu".to_string()]; - let res = Scheduler::new(&tasks, &[], features).unwrap(); - // Test 1 and Test 2 have required features, which our runner matches. 
- assert_eq!(res.get_next_task().unwrap().display_name, "Test 1") - } - - #[test] - fn scheduler_new_task() { - let tasks = vec![ - Task::builder() - .display_name("Test 1") - .status(TaskStatus::Running) - .write_lock("foo1") - .build_expect(), - Task::builder() - .display_name("Test 2") - .status(TaskStatus::New) - .write_lock("foo2") - .build_expect(), - ]; - - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); - // Test 1 is currently running and has the write lock - assert_eq!(res.get_next_task().unwrap().display_name, "Test 2") - } - - #[test] - fn scheduler_new_task_ro() { - let tasks = vec![ - Task::builder() - .display_name("Test 1") - .status(TaskStatus::Running) - .read_lock("foo1") - .build_expect(), - Task::builder() - .display_name("Test 2") - .status(TaskStatus::New) - .read_lock("foo1") - .build_expect(), - ]; - - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); - // Test 1 is currently running and has the write lock - assert_eq!(res.get_next_task().unwrap().display_name, "Test 2") - } - - #[test] - fn scheduler_new_task_rw_ro() { - let tasks = vec![ - Task::builder() - .display_name("Test 1") - .status(TaskStatus::Running) - .write_lock("foo1") - .build_expect(), - Task::builder() - .display_name("Test 2") - .status(TaskStatus::New) - .read_lock("foo1") - .build_expect(), - ]; - - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); - // Test 1 is currently running and has the write lock - assert_eq!(res.get_next_task(), None) - } - - #[test] - fn scheduler_new_task_cleanup_single() { - let tasks = vec![ - Task::builder() - .display_name("Test 1") - .status(TaskStatus::New) - .clean_lock("foo1") - .build_expect(), - ]; - - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); - // Test 1 is currently running and has the write lock - assert_eq!(res.get_next_task().unwrap().display_name, "Test 1") - } - - #[test] - fn scheduler_new_task_cleanup_with_finished() { - let tasks = vec![ - Task::builder() - .display_name("Test 5") - 
.status(TaskStatus::Finished(TaskResult::Success)) - .write_lock("foo1") - .build_expect(), - Task::builder() - .display_name("Test 1") - .status(TaskStatus::New) - .clean_lock("foo1") - .build_expect(), - ]; - - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); - // Test 1 is currently running and has the write lock - assert_eq!(res.get_next_task().unwrap().display_name, "Test 1") - } - - #[test] - fn scheduler_cleanup_waits_for_running_task_using_same_lock() { - let tasks = vec![ - Task::builder() - .display_name("Im doing something") - .status(TaskStatus::Running) - .read_lock("foo1") - .build_expect(), - Task::builder() - .display_name("Cleanup after") - .status(TaskStatus::New) - .clean_lock("foo1") - .build_expect(), - ]; - - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); - let eval = res.evaluate_task_readiness(&res.tasks[1]); - - assert!( - !eval.is_ready(), - "Expected cleanup to wait while same lock is currently in use. Got {eval:?}" - ); - assert_eq!(res.get_next_task(), None); - } - - #[test] - fn scheduler_new_task_cleanup() { - let tasks = vec![ - Task::builder() - .display_name("Test 1") - .status(TaskStatus::New) - .clean_lock("foo1") - .build_expect(), - Task::builder() - .display_name("Test 2") - .status(TaskStatus::New) - .read_lock("foo1") - .build_expect(), - ]; - - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); - let eval = res.evaluate_task_readiness(&res.tasks[0]); - assert!( - eval.is_passive_collision(), - "Expected evaluation to actively collide. But received {eval:?}" - ); - let eval = res.evaluate_task_readiness(&res.tasks[1]); - assert!( - eval.is_ready(), - "Expected evaluation to succeed. 
But received {eval:?}" - ); - - // Run tasks before cleanup - assert_eq!(res.get_next_task().unwrap().display_name, "Test 2") - } - - #[test] - fn scheduler_new_task_cleanup_unrelated_pending_lock() { - let tasks = vec![ - Task::builder() - .display_name("Cleanup lock A") - .status(TaskStatus::New) - .clean_lock("lock_a") - .build_expect(), - Task::builder() - .display_name("Pending lock B") - .status(TaskStatus::New) - .read_lock("lock_b") - .build_expect(), - ]; - - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); - - assert_eq!(res.get_next_task().unwrap().display_name, "Cleanup lock A") - } - - #[test] - fn scheduler_needs_validation_locks_block_conflicts_only() { - let tasks = vec![ - Task::builder() - .display_name("Task A") - .status(TaskStatus::NeedsUserValidation) - .write_lock("lock_a") - .read_lock("lock_b") - .build_expect(), - Task::builder() - .display_name("Task A2") - .status(TaskStatus::New) - .read_lock("lock_a") - .build_expect(), - Task::builder() - .display_name("Task B") - .status(TaskStatus::New) - .read_lock("lock_b") - .build_expect(), - ]; - - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); - - assert_eq!(res.get_next_task().unwrap().display_name, "Task B"); - } - - #[test] - fn scheduler_needs_validation_keeps_strongest_lock_when_names_collide() { - let tasks = vec![ - Task::builder() - .display_name("Validation writer") - .status(TaskStatus::NeedsUserValidation) - .write_lock("shared_lock") - .build_expect(), - Task::builder() - .display_name("Validation reader") - .status(TaskStatus::NeedsUserValidation) - .read_lock("shared_lock") - .build_expect(), - Task::builder() - .display_name("New reader") - .status(TaskStatus::New) - .read_lock("shared_lock") - .build_expect(), - ]; - - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); - let eval = res.evaluate_task_readiness(&res.tasks[2]); - assert!( - eval.is_passive_collision(), - "Expected passive collision from validation writer, got {eval:?}" - ); - 
assert_eq!(res.get_next_task(), None); - } - - #[test] - fn scheduler_cleanup_waits_for_non_cleanup_even_with_later_cleanup() { - let tasks = vec![ - Task::builder() - .display_name("Cleanup 1") - .status(TaskStatus::New) - .clean_lock("shared_lock") - .build_expect(), - Task::builder() - .display_name("Reader") - .status(TaskStatus::New) - .read_lock("shared_lock") - .build_expect(), - Task::builder() - .display_name("Cleanup 2") - .status(TaskStatus::New) - .clean_lock("shared_lock") - .build_expect(), - ]; - - let res = Scheduler::new(&tasks, &[], &[]).unwrap(); - let eval = res.evaluate_task_readiness(&res.tasks[0]); - assert!( - eval.is_passive_collision(), - "Expected cleanup to wait for pending non-clean lock, got {eval:?}" - ); - assert_eq!(res.get_next_task().unwrap().display_name, "Reader"); - } - - #[test] - fn schedule_with_poisoned_lock() { - let tasks = vec![ - Task::builder() - .display_name("I need to do something") - .write_lock("Entire Prod Cluster") - .build_expect(), - ]; - let mut poisoned_lock = Lock::write("Entire Prod Cluster"); - poisoned_lock.poison(&Uuid::new_v4()); - let poisoned_locks = vec![poisoned_lock]; - - let res = Scheduler::new(&tasks, &poisoned_locks, &[]).unwrap(); - - assert_eq!(res.get_next_task(), None); - } - - #[test] - fn schedule_different_tasks_with_poisoned_lock() { - let tasks = vec![ - Task::builder() - .display_name("I need to do something") - .write_lock("Entire Prod Cluster") - .build_expect(), - Task::builder() - .display_name("I need to test something") - .write_lock("Entire Staging Cluster") - .build_expect(), - ]; - let mut poisoned_lock = Lock::write("Entire Prod Cluster"); - poisoned_lock.poison(&Uuid::new_v4()); - let poisoned_locks = vec![poisoned_lock]; - - let res = Scheduler::new(&tasks, &poisoned_locks, &[]).unwrap(); - - assert_eq!( - res.get_next_task().unwrap().display_name, - "I need to test something" - ); - } - - #[test] - fn schedule_different_tasks_with_poisoned_lock_ro() { - let tasks = vec![ 
-            Task::builder()
-                .display_name("I need to do something")
-                .read_lock("Entire Prod Cluster")
-                .build_expect(),
-        ];
-        let mut poisoned_lock = Lock::read("Entire Prod Cluster");
-        poisoned_lock.poison(&Uuid::new_v4());
-        let poisoned_locks = vec![poisoned_lock];
-
-        let res = Scheduler::new(&tasks, &poisoned_locks, &[]).unwrap();
-
-        assert_eq!(res.get_next_task(), None);
+impl Scheduler {
+    pub fn new() -> Arc<Self> {
+        let (fairy_handles_tx, fairy_handles_rx) = tokio::sync::mpsc::channel(1);
+        Arc::new(Self {
+            fairy_handles_tx,
+            fairy_handles_rx: Some(fairy_handles_rx).into(),
+        })
+    }
+
+    pub async fn get_next_task(&self, machine_features: &[String]) -> Task {
+        let (tx, rx) = tokio::sync::oneshot::channel();
+        self.fairy_handles_tx
+            .send(FairyHandle {
+                features: machine_features.iter().cloned().collect(),
+                task_tx: tx,
+            })
+            .await
+            .unwrap();
+        rx.await.unwrap()
+    }
+
+    pub async fn run(
+        self: Arc<Self>,
+        global_events: tokio::sync::broadcast::Sender<GlobalEvent>,
+        db_pool: rocket_sync_db_pools::ConnectionPool,
+    ) -> Result<(), VickyError> {
+        let mut fairy_handles_rx = self
+            .fairy_handles_rx
+            .lock()
+            .unwrap()
+            .take()
+            .expect("Scheduler can only be running once");
+        let mut global_events_rx = global_events.subscribe();
+        let mut waiting_fairies = VecDeque::<FairyHandle>::new();
+        loop {
+            let Some(db) = Database::get_one_from_pool(&db_pool).await else {
+                warn!("Scheduler timed out waiting for database connection");
+                continue;
+            };
+
+            while {
+                // do: try to schedule a task
+
+                let mut task_scheduled = false;
+
+                // empty the queue, all changes up to here will have been accounted for
+                global_events_rx.resubscribe();
+
+                let all_features = waiting_fairies
+                    .iter()
+                    .flat_map(|fairy| fairy.features.iter())
+                    .cloned()
+                    .collect::<Vec<_>>();
+
+                let tasks = db.get_all_tasks().await?;
+                let poisoned_locks = db.get_poisoned_locks().await?;
+                let constraints_helper =
+                    ConstraintsHelper::new(&tasks, &poisoned_locks, &all_features)?;
+
+                if let Some(next_task) = constraints_helper.get_next_task() {
+                    while let Some((idx, _)) = waiting_fairies
+                        .iter()
+                        .enumerate()
+                        .find(|(_, fairy)| fairy.features.is_superset(&next_task.features))
+                    {
+                        let fairy = waiting_fairies.remove(idx).unwrap();
+                        if fairy.task_tx.send(next_task.clone()).is_ok() {
+                            task_scheduled = true;
+                            break;
+                        }
+                    }
+                }
+
+                // while: task was scheduled
+                task_scheduled
+            } {}
+
+            tokio::select! {
+                res = global_events_rx.recv() => {
+                    // wait for changed or new tasks...
+                    if res.is_err() {
+                        return Err(SchedulerError::ChannelClosed.into());
+                    }
+                }
+                res = fairy_handles_rx.recv() => {
+                    // ... or a new fairy asking for a task
+                    let fairy_handle = res.ok_or(SchedulerError::ChannelClosed)?;
+                    waiting_fairies.push_back(fairy_handle);
+                }
+            }
+        }
+    }
+}