From a5811f711b97e5dd7eb514180a3a76fb7e3d38d5 Mon Sep 17 00:00:00 2001 From: Geoff Johnson Date: Sat, 28 Mar 2026 11:52:22 -0700 Subject: [PATCH 1/3] feat: add FileWatch tests and documentation From 0c7f596b97ae0dcd4c885facc4e0bd8aefb7fc63 Mon Sep 17 00:00:00 2001 From: Geoff Johnson Date: Sat, 28 Mar 2026 11:57:52 -0700 Subject: [PATCH 2/3] test(scheduler): add FileWatchStrategy and TriggerConfig::FileWatch tests --- crates/orchestrator/src/scheduler/strategy.rs | 367 +++++++++++++++++- crates/orchestrator/src/scheduler/types.rs | 111 ++++++ 2 files changed, 473 insertions(+), 5 deletions(-) diff --git a/crates/orchestrator/src/scheduler/strategy.rs b/crates/orchestrator/src/scheduler/strategy.rs index 071c71da..c53d38f1 100644 --- a/crates/orchestrator/src/scheduler/strategy.rs +++ b/crates/orchestrator/src/scheduler/strategy.rs @@ -895,6 +895,15 @@ enum FileWatchHandle { Polling(tokio::task::JoinHandle<()>), } +impl std::fmt::Debug for FileWatchHandle { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Native(_) => f.write_str("FileWatchHandle::Native(..)"), + Self::Polling(_) => f.write_str("FileWatchHandle::Polling(..)"), + } + } +} + /// Recursively walk `root` and collect the mtime for every regular file. fn snapshot_mtimes(root: &Path) -> HashMap { let mut map = HashMap::new(); @@ -1062,6 +1071,16 @@ pub struct FileWatchStrategy { watch_paths: Vec, } +impl std::fmt::Debug for FileWatchStrategy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FileWatchStrategy") + .field("handle", &self._handle) + .field("event_kinds", &self.event_kinds) + .field("debounce_ms", &self.debounce_ms) + .finish() + } +} + impl FileWatchStrategy { /// Create a new file-watch strategy. /// @@ -1078,11 +1097,7 @@ impl FileWatchStrategy { debounce_ms: u64, mode: WatchMode, ) -> anyhow::Result { - let (tx, rx) = mpsc::unbounded_channel(); - - let handle = Self::create_handle(paths.clone(), mode, tx)?; - - // Build the optional glob set. + // Validate and build the optional glob set first (fail fast before spawning). let include_patterns = match patterns { None => None, Some(pats) if pats.is_empty() => None, @@ -1101,6 +1116,10 @@ impl FileWatchStrategy { } }; + // Spawn the watcher after glob validation succeeds. + let (tx, rx) = mpsc::unbounded_channel(); + let handle = Self::create_handle(paths.clone(), mode, tx)?; + Ok(Self { _handle: handle, rx, @@ -2480,4 +2499,342 @@ mod tests { assert!(r1[0].source_id.starts_with("idle:")); assert!(r2[0].source_id.starts_with("idle:")); } + + // ── FileWatchStrategy tests ─────────────────────────────────────── + + #[test] + fn file_watch_strategy_constructs_with_native_mode() { + let dir = tempfile::tempdir().unwrap(); + // Native watcher may or may not work in CI; just verify construction succeeds + // (or gracefully returns an error — we don't mandate native support). + let _ = FileWatchStrategy::new( + vec![dir.path().to_path_buf()], + None, + vec![], + 200, + WatchMode::Native, + ); + } + + #[tokio::test] + async fn file_watch_strategy_constructs_with_polling_mode() { + let dir = tempfile::tempdir().unwrap(); + let strategy = FileWatchStrategy::new( + vec![dir.path().to_path_buf()], + None, + vec![], + 200, + WatchMode::Polling { interval_secs: 1 }, + ); + assert!(strategy.is_ok()); + } + + #[tokio::test] + async fn file_watch_strategy_constructs_with_auto_mode() { + let dir = tempfile::tempdir().unwrap(); + let strategy = FileWatchStrategy::new( + vec![dir.path().to_path_buf()], + None, + vec![], + 200, + WatchMode::Auto { poll_interval_secs: 5 }, + ); + assert!(strategy.is_ok()); + } + + #[tokio::test] + async fn file_watch_strategy_rejects_invalid_glob_pattern() { + let dir = tempfile::tempdir().unwrap(); + // Build with Native mode so no tokio::spawn is needed before validation. + let result = FileWatchStrategy::new( + vec![dir.path().to_path_buf()], + Some(vec!["[invalid".to_string()]), + vec![], + 200, + WatchMode::Auto { poll_interval_secs: 1 }, + ); + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("Invalid glob pattern")); + } + + #[tokio::test] + async fn file_watch_strategy_accepts_valid_glob_patterns() { + let dir = tempfile::tempdir().unwrap(); + let result = FileWatchStrategy::new( + vec![dir.path().to_path_buf()], + Some(vec!["**/*.toml".to_string(), "*.rs".to_string()]), + vec![], + 200, + WatchMode::Polling { interval_secs: 1 }, + ); + assert!(result.is_ok()); + } + + #[tokio::test] + async fn file_watch_strategy_polling_detects_file_create() { + let dir = tempfile::tempdir().unwrap(); + let dir_path = dir.path().to_path_buf(); + + // Create strategy with 1-second polling interval, no debounce. + let mut strategy = FileWatchStrategy::new( + vec![dir_path.clone()], + None, + vec!["create".to_string()], + 0, // no debounce + WatchMode::Polling { interval_secs: 1 }, + ) + .unwrap(); + let (_tx, rx) = watch::channel(false); + + // Allow the poller to start and take its initial snapshot. + tokio::time::sleep(Duration::from_millis(50)).await; + + // Create a file inside the watched directory. + let file_path = dir_path.join("test.txt"); + std::fs::write(&file_path, b"hello").unwrap(); + + // Wait up to 3 seconds for the polling task to detect the new file. + let result = tokio::time::timeout(Duration::from_secs(3), strategy.next_tasks(&rx)).await; + + let tasks = result.expect("timed out waiting for file create event").unwrap(); + assert_eq!(tasks.len(), 1); + assert!(tasks[0].source_id.starts_with("file:")); + assert!(tasks[0].source_id.ends_with(":create")); + assert_eq!(tasks[0].metadata.get("event_type"), Some(&"create".to_string())); + assert!(tasks[0].metadata.contains_key("file_path")); + assert!(tasks[0].metadata.contains_key("file_name")); + assert!(tasks[0].metadata.contains_key("file_dir")); + assert!(tasks[0].metadata.contains_key("timestamp")); + } + + #[tokio::test] + async fn file_watch_strategy_polling_detects_file_modify() { + let dir = tempfile::tempdir().unwrap(); + let dir_path = dir.path().to_path_buf(); + + // Pre-create the file so the initial snapshot includes it. + let file_path = dir_path.join("watched.txt"); + std::fs::write(&file_path, b"initial").unwrap(); + + let mut strategy = FileWatchStrategy::new( + vec![dir_path.clone()], + None, + vec!["modify".to_string()], + 0, + WatchMode::Polling { interval_secs: 1 }, + ) + .unwrap(); + let (_tx, rx) = watch::channel(false); + + // Wait for poller to capture initial snapshot. + tokio::time::sleep(Duration::from_millis(50)).await; + + // Modify the file — use a small sleep to ensure mtime changes. + tokio::time::sleep(Duration::from_millis(10)).await; + std::fs::write(&file_path, b"modified").unwrap(); + + let result = tokio::time::timeout(Duration::from_secs(3), strategy.next_tasks(&rx)).await; + + let tasks = result.expect("timed out waiting for file modify event").unwrap(); + assert_eq!(tasks.len(), 1); + assert!(tasks[0].source_id.ends_with(":modify")); + assert_eq!(tasks[0].metadata.get("event_type"), Some(&"modify".to_string())); + } + + #[tokio::test] + async fn file_watch_strategy_polling_detects_file_delete() { + let dir = tempfile::tempdir().unwrap(); + let dir_path = dir.path().to_path_buf(); + + // Pre-create the file. + let file_path = dir_path.join("to_delete.txt"); + std::fs::write(&file_path, b"bye").unwrap(); + + let mut strategy = FileWatchStrategy::new( + vec![dir_path.clone()], + None, + vec!["delete".to_string()], + 0, + WatchMode::Polling { interval_secs: 1 }, + ) + .unwrap(); + let (_tx, rx) = watch::channel(false); + + tokio::time::sleep(Duration::from_millis(50)).await; + + // Remove the file. + std::fs::remove_file(&file_path).unwrap(); + + let result = tokio::time::timeout(Duration::from_secs(3), strategy.next_tasks(&rx)).await; + + let tasks = result.expect("timed out waiting for file delete event").unwrap(); + assert_eq!(tasks.len(), 1); + assert!(tasks[0].source_id.ends_with(":delete")); + assert_eq!(tasks[0].metadata.get("event_type"), Some(&"delete".to_string())); + } + + #[tokio::test] + async fn file_watch_strategy_filters_events_by_kind() { + let dir = tempfile::tempdir().unwrap(); + let dir_path = dir.path().to_path_buf(); + + // Strategy only accepts "delete" events. + let mut strategy = FileWatchStrategy::new( + vec![dir_path.clone()], + None, + vec!["delete".to_string()], + 0, + WatchMode::Polling { interval_secs: 1 }, + ) + .unwrap(); + let (shutdown_tx, rx) = watch::channel(false); + + tokio::time::sleep(Duration::from_millis(50)).await; + + // Create a new file — should NOT produce a task (create filtered out). + std::fs::write(dir_path.join("new.txt"), b"hi").unwrap(); + + // After 1.5 seconds the poll would have seen the create — fire shutdown + // to confirm no task was emitted. + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(1500)).await; + let _ = shutdown_tx.send(true); + }); + + let result = tokio::time::timeout(Duration::from_secs(3), strategy.next_tasks(&rx)).await; + + let tasks = result.expect("timed out").unwrap(); + // Shutdown should return empty — no "delete" event was triggered. + assert!(tasks.is_empty()); + } + + #[tokio::test] + async fn file_watch_strategy_filters_by_glob_pattern() { + let dir = tempfile::tempdir().unwrap(); + let dir_path = dir.path().to_path_buf(); + + // Only TOML files should match. + let mut strategy = FileWatchStrategy::new( + vec![dir_path.clone()], + Some(vec!["**/*.toml".to_string()]), + vec!["create".to_string()], + 0, + WatchMode::Polling { interval_secs: 1 }, + ) + .unwrap(); + let (shutdown_tx, rx) = watch::channel(false); + + tokio::time::sleep(Duration::from_millis(50)).await; + + // Create a .txt file — should NOT match **/*.toml. + std::fs::write(dir_path.join("ignored.txt"), b"ignored").unwrap(); + + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(1500)).await; + let _ = shutdown_tx.send(true); + }); + + let result = tokio::time::timeout(Duration::from_secs(3), strategy.next_tasks(&rx)).await; + + let tasks = result.expect("timed out").unwrap(); + assert!(tasks.is_empty(), "Expected txt file to be filtered out by **/*.toml pattern"); + } + + #[tokio::test] + async fn file_watch_strategy_glob_pattern_matches_toml() { + let dir = tempfile::tempdir().unwrap(); + let dir_path = dir.path().to_path_buf(); + + let mut strategy = FileWatchStrategy::new( + vec![dir_path.clone()], + Some(vec!["**/*.toml".to_string()]), + vec!["create".to_string()], + 0, + WatchMode::Polling { interval_secs: 1 }, + ) + .unwrap(); + let (_tx, rx) = watch::channel(false); + + tokio::time::sleep(Duration::from_millis(50)).await; + + // Create a matching TOML file. + std::fs::write(dir_path.join("config.toml"), b"[section]").unwrap(); + + let result = tokio::time::timeout(Duration::from_secs(3), strategy.next_tasks(&rx)).await; + + let tasks = result.expect("timed out waiting for toml create event").unwrap(); + assert_eq!(tasks.len(), 1); + let file_name = tasks[0].metadata.get("file_name").unwrap(); + assert_eq!(file_name, "config.toml"); + } + + #[tokio::test] + async fn file_watch_strategy_respects_shutdown() { + let dir = tempfile::tempdir().unwrap(); + let mut strategy = FileWatchStrategy::new( + vec![dir.path().to_path_buf()], + None, + vec![], + 0, + WatchMode::Polling { interval_secs: 60 }, // long interval — would block + ) + .unwrap(); + let (tx, rx) = watch::channel(false); + + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(50)).await; + let _ = tx.send(true); + }); + + let start = tokio::time::Instant::now(); + let result = strategy.next_tasks(&rx).await.unwrap(); + let elapsed = start.elapsed(); + + assert!(result.is_empty()); + assert!(elapsed < Duration::from_secs(2)); + } + + #[tokio::test] + async fn file_watch_strategy_is_object_safe() { + let dir = tempfile::tempdir().unwrap(); + let strategy: Box = Box::new( + FileWatchStrategy::new( + vec![dir.path().to_path_buf()], + None, + vec![], + 0, + WatchMode::Polling { interval_secs: 60 }, + ) + .unwrap(), + ); + let (tx, rx) = watch::channel(false); + + let mut strategy = strategy; + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(50)).await; + let _ = tx.send(true); + }); + + let result = strategy.next_tasks(&rx).await.unwrap(); + assert!(result.is_empty()); + } + + #[test] + fn event_kind_to_str_returns_correct_strings() { + use notify::event::{CreateKind, DataChange, ModifyKind, RemoveKind}; + assert_eq!(event_kind_to_str(¬ify::EventKind::Create(CreateKind::File)), "create"); + assert_eq!( + event_kind_to_str(¬ify::EventKind::Modify(ModifyKind::Data(DataChange::Content))), + "modify" + ); + assert_eq!(event_kind_to_str(¬ify::EventKind::Remove(RemoveKind::File)), "delete"); + assert_eq!(event_kind_to_str(¬ify::EventKind::Other), "other"); + assert_eq!(event_kind_to_str(¬ify::EventKind::Any), "any"); + } + + #[test] + fn watch_mode_default_is_auto() { + let mode = WatchMode::default(); + assert!(matches!(mode, WatchMode::Auto { .. })); + } } diff --git a/crates/orchestrator/src/scheduler/types.rs b/crates/orchestrator/src/scheduler/types.rs index 4249c7ff..a21d33c0 100644 --- a/crates/orchestrator/src/scheduler/types.rs +++ b/crates/orchestrator/src/scheduler/types.rs @@ -545,4 +545,115 @@ mod tests { // Serialized tag must be snake_case. assert!(json.contains(r#""type":"linear_issues""#)); } + + // ── FileWatch TriggerConfig tests ───────────────────────────────── + + fn sample_file_watch() -> TriggerConfig { + TriggerConfig::FileWatch { + paths: vec!["/tmp/watched".to_string()], + patterns: vec!["**/*.toml".to_string()], + events: vec!["create".to_string(), "modify".to_string()], + debounce_ms: 200, + mode: "auto".to_string(), + poll_interval_secs: 5, + } + } + + #[test] + fn test_trigger_config_file_watch_trigger_type() { + assert_eq!(sample_file_watch().trigger_type(), "file_watch"); + } + + #[test] + fn test_trigger_config_file_watch_is_implemented() { + assert!(sample_file_watch().is_implemented()); + } + + #[test] + fn test_trigger_config_file_watch_is_not_one_shot() { + assert!(!sample_file_watch().is_one_shot(), "FileWatch should not be one-shot"); + } + + #[test] + fn test_trigger_config_file_watch_serde_roundtrip() { + let original = sample_file_watch(); + let json = serde_json::to_string(&original).unwrap(); + let decoded: TriggerConfig = serde_json::from_str(&json).unwrap(); + assert_eq!(decoded.trigger_type(), "file_watch"); + assert!(json.contains(r#""type":"file_watch""#)); + if let TriggerConfig::FileWatch { + paths, + patterns, + events, + debounce_ms, + mode, + poll_interval_secs, + } = decoded + { + assert_eq!(paths, vec!["/tmp/watched"]); + assert_eq!(patterns, vec!["**/*.toml"]); + assert_eq!(events, vec!["create", "modify"]); + assert_eq!(debounce_ms, 200); + assert_eq!(mode, "auto"); + assert_eq!(poll_interval_secs, 5); + } else { + panic!("Expected FileWatch variant"); + } + } + + #[test] + fn test_trigger_config_file_watch_serde_from_json_minimal() { + // Only `type` and `paths` — all other fields should use defaults. + let json = r#"{"type": "file_watch", "paths": ["/var/log"]}"#; + let cfg: TriggerConfig = serde_json::from_str(json).unwrap(); + assert_eq!(cfg.trigger_type(), "file_watch"); + if let TriggerConfig::FileWatch { + paths, + patterns, + events, + debounce_ms, + mode, + poll_interval_secs, + } = cfg + { + assert_eq!(paths, vec!["/var/log"]); + assert!(patterns.is_empty()); + assert!(events.is_empty()); + assert_eq!(debounce_ms, 200); // default + assert_eq!(mode, "auto"); // default + assert_eq!(poll_interval_secs, 5); // default + } else { + panic!("Expected FileWatch variant"); + } + } + + #[test] + fn test_trigger_config_file_watch_serde_polling_mode() { + let json = r#"{ + "type": "file_watch", + "paths": ["/tmp/a", "/tmp/b"], + "events": ["delete"], + "mode": "polling", + "poll_interval_secs": 10, + "debounce_ms": 500 + }"#; + let cfg: TriggerConfig = serde_json::from_str(json).unwrap(); + if let TriggerConfig::FileWatch { + paths, + events, + mode, + poll_interval_secs, + debounce_ms, + .. + } = cfg + { + assert_eq!(paths.len(), 2); + assert_eq!(events, vec!["delete"]); + assert_eq!(mode, "polling"); + assert_eq!(poll_interval_secs, 10); + assert_eq!(debounce_ms, 500); + } else { + panic!("Expected FileWatch variant"); + } + } } From 4ce8d01b34188b8b49432e14ee3371b64d818572 Mon Sep 17 00:00:00 2001 From: Geoff Johnson Date: Sat, 28 Mar 2026 14:22:30 -0700 Subject: [PATCH 3/3] fix(scheduler): suppress dead_code warning on FileWatchHandle drop-guard fields The Native and Polling fields are intentional drop-guards: they keep the watcher alive (or cancel the polling task) purely via their Drop impl. Add #[allow(dead_code)] with an explanatory doc comment. --- crates/orchestrator/src/scheduler/strategy.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/crates/orchestrator/src/scheduler/strategy.rs b/crates/orchestrator/src/scheduler/strategy.rs index c53d38f1..446f854e 100644 --- a/crates/orchestrator/src/scheduler/strategy.rs +++ b/crates/orchestrator/src/scheduler/strategy.rs @@ -888,6 +888,12 @@ impl Default for WatchMode { /// Internal handle keeping the active watcher alive for the duration of a /// [`FileWatchStrategy`]. +/// +/// The inner values are intentional drop-guards: the watcher stops watching +/// when dropped (Native), and the polling task is cancelled when dropped +/// (Polling). The fields are never read — they exist only for their `Drop` +/// side effects. +#[allow(dead_code)] enum FileWatchHandle { /// Native OS watcher — kept alive by holding the struct. Native(notify::RecommendedWatcher),