From 522f549922f6e7b8edbd5e9be3fc720ddb9b256a Mon Sep 17 00:00:00 2001 From: Eric Traut Date: Thu, 28 May 2026 23:09:55 -0700 Subject: [PATCH] Fix fs/watch debounce batching (#24716) ## Summary `fs/watch` was using a local debounce wrapper whose deadline was initialized once and then reused after the first batch. Once that stale deadline was in the past, later file changes could bypass the intended 200ms debounce and send noisier `fs/changed` notifications. This moves the debounce wrapper into `codex-file-watcher` as `DebouncedWatchReceiver`, resets the debounce deadline for each event batch, preserves pending paths across cancelled receives, and updates app-server `fs/watch` to use the shared wrapper. Fixes #24692. --- codex-rs/app-server/src/fs_watch.rs | 48 ++-------------- .../file-watcher/src/file_watcher_tests.rs | 56 +++++++++++++++++++ codex-rs/file-watcher/src/lib.rs | 43 ++++++++++++++ 3 files changed, 103 insertions(+), 44 deletions(-) diff --git a/codex-rs/app-server/src/fs_watch.rs b/codex-rs/app-server/src/fs_watch.rs index 09c6dc553..405b0560a 100644 --- a/codex-rs/app-server/src/fs_watch.rs +++ b/codex-rs/app-server/src/fs_watch.rs @@ -8,66 +8,24 @@ use codex_app_server_protocol::FsWatchParams; use codex_app_server_protocol::FsWatchResponse; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::ServerNotification; +use codex_file_watcher::DebouncedWatchReceiver; use codex_file_watcher::FileWatcher; -use codex_file_watcher::FileWatcherEvent; use codex_file_watcher::FileWatcherSubscriber; -use codex_file_watcher::Receiver; use codex_file_watcher::WatchPath; use codex_file_watcher::WatchRegistration; use std::collections::HashMap; -use std::collections::HashSet; use std::collections::hash_map::Entry; use std::hash::Hash; -use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use tokio::sync::Mutex as AsyncMutex; #[cfg(test)] use tokio::sync::mpsc; use tokio::sync::oneshot; -use tokio::time::Instant; use tracing::warn; const FS_CHANGED_NOTIFICATION_DEBOUNCE: Duration = Duration::from_millis(200); -struct DebouncedReceiver { - rx: Receiver, - interval: Duration, - changed_paths: HashSet, - next_allowance: Option, -} - -impl DebouncedReceiver { - fn new(rx: Receiver, interval: Duration) -> Self { - Self { - rx, - interval, - changed_paths: HashSet::new(), - next_allowance: None, - } - } - - async fn recv(&mut self) -> Option { - while self.changed_paths.is_empty() { - self.changed_paths.extend(self.rx.recv().await?.paths); - } - let next_allowance = *self - .next_allowance - .get_or_insert_with(|| Instant::now() + self.interval); - - loop { - tokio::select! { - event = self.rx.recv() => self.changed_paths.extend(event?.paths), - _ = tokio::time::sleep_until(next_allowance) => break, - } - } - - Some(FileWatcherEvent { - paths: self.changed_paths.drain().collect(), - }) - } -} - #[derive(Clone)] pub(crate) struct FsWatchManager { outgoing: Arc, @@ -151,7 +109,7 @@ impl FsWatchManager { let task_watch_id = watch_id.clone(); tokio::spawn(async move { - let mut rx = DebouncedReceiver::new(rx, FS_CHANGED_NOTIFICATION_DEBOUNCE); + let mut rx = DebouncedWatchReceiver::new(rx, FS_CHANGED_NOTIFICATION_DEBOUNCE); tokio::pin!(terminate_rx); loop { let event = tokio::select! { @@ -219,6 +177,8 @@ mod tests { use super::*; use codex_utils_absolute_path::AbsolutePathBuf; use pretty_assertions::assert_eq; + use std::collections::HashSet; + use std::path::PathBuf; use tempfile::TempDir; fn absolute_path(path: PathBuf) -> AbsolutePathBuf { diff --git a/codex-rs/file-watcher/src/file_watcher_tests.rs b/codex-rs/file-watcher/src/file_watcher_tests.rs index a989300ff..8bd7858c2 100644 --- a/codex-rs/file-watcher/src/file_watcher_tests.rs +++ b/codex-rs/file-watcher/src/file_watcher_tests.rs @@ -86,6 +86,62 @@ async fn throttled_receiver_flushes_pending_on_shutdown() { assert_eq!(closed, None); } +#[tokio::test] +async fn debounced_receiver_coalesces_each_event_batch() { + let (tx, rx) = watch_channel(); + let mut debounced = DebouncedWatchReceiver::new(rx, TEST_THROTTLE_INTERVAL); + + tx.add_changed_paths(&[path("a")]).await; + let first = timeout(TEST_THROTTLE_INTERVAL * 2, debounced.recv()) + .await + .expect("first emit timeout"); + assert_eq!( + first, + Some(FileWatcherEvent { + paths: vec![path("a")], + }) + ); + + tx.add_changed_paths(&[path("c")]).await; + let blocked = timeout(TEST_THROTTLE_INTERVAL / 2, debounced.recv()).await; + assert_eq!(blocked.is_err(), true); + + tx.add_changed_paths(&[path("d")]).await; + let second = timeout(TEST_THROTTLE_INTERVAL * 2, debounced.recv()) + .await + .expect("second emit timeout"); + assert_eq!( + second, + Some(FileWatcherEvent { + paths: vec![path("c"), path("d")], + }) + ); +} + +#[tokio::test] +async fn debounced_receiver_flushes_pending_on_shutdown() { + let (tx, rx) = watch_channel(); + let mut debounced = DebouncedWatchReceiver::new(rx, TEST_THROTTLE_INTERVAL); + + tx.add_changed_paths(&[path("a")]).await; + drop(tx); + + let flushed = timeout(Duration::from_secs(1), debounced.recv()) + .await + .expect("shutdown flush timeout"); + assert_eq!( + flushed, + Some(FileWatcherEvent { + paths: vec![path("a")], + }) + ); + + let closed = timeout(Duration::from_secs(1), debounced.recv()) + .await + .expect("closed recv timeout"); + assert_eq!(closed, None); +} + #[test] fn is_mutating_event_filters_non_mutating_event_kinds() { assert_eq!( diff --git a/codex-rs/file-watcher/src/lib.rs b/codex-rs/file-watcher/src/lib.rs index aa182bd19..a0b1a4240 100644 --- a/codex-rs/file-watcher/src/lib.rs +++ b/codex-rs/file-watcher/src/lib.rs @@ -253,6 +253,49 @@ impl ThrottledWatchReceiver { } } +/// Coalesces file watcher notifications that arrive within a fixed debounce +/// window after the first event in each batch. +pub struct DebouncedWatchReceiver { + rx: Receiver, + interval: Duration, + changed_paths: BTreeSet, +} + +impl DebouncedWatchReceiver { + /// Creates a debouncing wrapper around a raw watcher [`Receiver`]. + pub fn new(rx: Receiver, interval: Duration) -> Self { + Self { + rx, + interval, + changed_paths: BTreeSet::new(), + } + } + + /// Receives the next debounced event batch. + pub async fn recv(&mut self) -> Option { + while self.changed_paths.is_empty() { + self.changed_paths.extend(self.rx.recv().await?.paths); + } + let deadline = Instant::now() + self.interval; + + loop { + tokio::select! { + event = self.rx.recv() => match event { + Some(event) => self.changed_paths.extend(event.paths), + None => break, + }, + _ = sleep_until(deadline) => break, + } + } + + Some(FileWatcherEvent { + paths: std::mem::take(&mut self.changed_paths) + .into_iter() + .collect(), + }) + } +} + /// Handle used to register watched paths for one logical consumer. pub struct FileWatcherSubscriber { id: SubscriberId,