diff --git a/codex-rs/app-server/src/fs_watch.rs b/codex-rs/app-server/src/fs_watch.rs index 09c6dc553..405b0560a 100644 --- a/codex-rs/app-server/src/fs_watch.rs +++ b/codex-rs/app-server/src/fs_watch.rs @@ -8,66 +8,24 @@ use codex_app_server_protocol::FsWatchParams; use codex_app_server_protocol::FsWatchResponse; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::ServerNotification; +use codex_file_watcher::DebouncedWatchReceiver; use codex_file_watcher::FileWatcher; -use codex_file_watcher::FileWatcherEvent; use codex_file_watcher::FileWatcherSubscriber; -use codex_file_watcher::Receiver; use codex_file_watcher::WatchPath; use codex_file_watcher::WatchRegistration; use std::collections::HashMap; -use std::collections::HashSet; use std::collections::hash_map::Entry; use std::hash::Hash; -use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use tokio::sync::Mutex as AsyncMutex; #[cfg(test)] use tokio::sync::mpsc; use tokio::sync::oneshot; -use tokio::time::Instant; use tracing::warn; const FS_CHANGED_NOTIFICATION_DEBOUNCE: Duration = Duration::from_millis(200); -struct DebouncedReceiver { - rx: Receiver, - interval: Duration, - changed_paths: HashSet, - next_allowance: Option, -} - -impl DebouncedReceiver { - fn new(rx: Receiver, interval: Duration) -> Self { - Self { - rx, - interval, - changed_paths: HashSet::new(), - next_allowance: None, - } - } - - async fn recv(&mut self) -> Option { - while self.changed_paths.is_empty() { - self.changed_paths.extend(self.rx.recv().await?.paths); - } - let next_allowance = *self - .next_allowance - .get_or_insert_with(|| Instant::now() + self.interval); - - loop { - tokio::select! { - event = self.rx.recv() => self.changed_paths.extend(event?.paths), - _ = tokio::time::sleep_until(next_allowance) => break, - } - } - - Some(FileWatcherEvent { - paths: self.changed_paths.drain().collect(), - }) - } -} - #[derive(Clone)] pub(crate) struct FsWatchManager { outgoing: Arc, @@ -151,7 +109,7 @@ impl FsWatchManager { let task_watch_id = watch_id.clone(); tokio::spawn(async move { - let mut rx = DebouncedReceiver::new(rx, FS_CHANGED_NOTIFICATION_DEBOUNCE); + let mut rx = DebouncedWatchReceiver::new(rx, FS_CHANGED_NOTIFICATION_DEBOUNCE); tokio::pin!(terminate_rx); loop { let event = tokio::select! { @@ -219,6 +177,8 @@ mod tests { use super::*; use codex_utils_absolute_path::AbsolutePathBuf; use pretty_assertions::assert_eq; + use std::collections::HashSet; + use std::path::PathBuf; use tempfile::TempDir; fn absolute_path(path: PathBuf) -> AbsolutePathBuf { diff --git a/codex-rs/file-watcher/src/file_watcher_tests.rs b/codex-rs/file-watcher/src/file_watcher_tests.rs index a989300ff..8bd7858c2 100644 --- a/codex-rs/file-watcher/src/file_watcher_tests.rs +++ b/codex-rs/file-watcher/src/file_watcher_tests.rs @@ -86,6 +86,62 @@ async fn throttled_receiver_flushes_pending_on_shutdown() { assert_eq!(closed, None); } +#[tokio::test] +async fn debounced_receiver_coalesces_each_event_batch() { + let (tx, rx) = watch_channel(); + let mut debounced = DebouncedWatchReceiver::new(rx, TEST_THROTTLE_INTERVAL); + + tx.add_changed_paths(&[path("a")]).await; + let first = timeout(TEST_THROTTLE_INTERVAL * 2, debounced.recv()) + .await + .expect("first emit timeout"); + assert_eq!( + first, + Some(FileWatcherEvent { + paths: vec![path("a")], + }) + ); + + tx.add_changed_paths(&[path("c")]).await; + let blocked = timeout(TEST_THROTTLE_INTERVAL / 2, debounced.recv()).await; + assert_eq!(blocked.is_err(), true); + + tx.add_changed_paths(&[path("d")]).await; + let second = timeout(TEST_THROTTLE_INTERVAL * 2, debounced.recv()) + .await + .expect("second emit timeout"); + assert_eq!( + second, + Some(FileWatcherEvent { + paths: vec![path("c"), path("d")], + }) + ); +} + +#[tokio::test] +async fn debounced_receiver_flushes_pending_on_shutdown() { + let (tx, rx) = watch_channel(); + let mut debounced = DebouncedWatchReceiver::new(rx, TEST_THROTTLE_INTERVAL); + + tx.add_changed_paths(&[path("a")]).await; + drop(tx); + + let flushed = timeout(Duration::from_secs(1), debounced.recv()) + .await + .expect("shutdown flush timeout"); + assert_eq!( + flushed, + Some(FileWatcherEvent { + paths: vec![path("a")], + }) + ); + + let closed = timeout(Duration::from_secs(1), debounced.recv()) + .await + .expect("closed recv timeout"); + assert_eq!(closed, None); +} + #[test] fn is_mutating_event_filters_non_mutating_event_kinds() { assert_eq!( diff --git a/codex-rs/file-watcher/src/lib.rs b/codex-rs/file-watcher/src/lib.rs index aa182bd19..a0b1a4240 100644 --- a/codex-rs/file-watcher/src/lib.rs +++ b/codex-rs/file-watcher/src/lib.rs @@ -253,6 +253,49 @@ impl ThrottledWatchReceiver { } } +/// Coalesces file watcher notifications that arrive within a fixed debounce +/// window after the first event in each batch. +pub struct DebouncedWatchReceiver { + rx: Receiver, + interval: Duration, + changed_paths: BTreeSet, +} + +impl DebouncedWatchReceiver { + /// Creates a debouncing wrapper around a raw watcher [`Receiver`]. + pub fn new(rx: Receiver, interval: Duration) -> Self { + Self { + rx, + interval, + changed_paths: BTreeSet::new(), + } + } + + /// Receives the next debounced event batch. + pub async fn recv(&mut self) -> Option { + while self.changed_paths.is_empty() { + self.changed_paths.extend(self.rx.recv().await?.paths); + } + let deadline = Instant::now() + self.interval; + + loop { + tokio::select! { + event = self.rx.recv() => match event { + Some(event) => self.changed_paths.extend(event.paths), + None => break, + }, + _ = sleep_until(deadline) => break, + } + } + + Some(FileWatcherEvent { + paths: std::mem::take(&mut self.changed_paths) + .into_iter() + .collect(), + }) + } +} + /// Handle used to register watched paths for one logical consumer. pub struct FileWatcherSubscriber { id: SubscriberId,