From c4e53d103c102f8d5201247adbc60bbddd47c88d Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 26 May 2026 15:28:02 +0200 Subject: [PATCH] Wire app-server extension event sink (#24586) ## Why The goal extension already emits `ThreadGoalUpdated` events, but production app-server thread extensions were built with the default no-op extension event sink. That meant extension-driven goal updates could be produced without ever reaching app-server clients. ## What changed - Build app-server thread extensions with a host-provided `ExtensionEventSink`. - Add an app-server sink that converts extension `ThreadGoalUpdated` events into `ServerNotification::ThreadGoalUpdated` broadcasts. - Use the existing bounded outgoing message channel via `try_send` so event forwarding cannot create an unbounded queue. - Pass `NoopExtensionEventSink` in app-server tests that construct a `ThreadManager` without an app-server host. - Refresh `Cargo.lock` for the existing `codex-memories-extension` `codex-otel` dependency. ## Verification - `just test -p codex-app-server extensions::tests::app_server_event_sink_forwards_thread_goal_updates` --- codex-rs/app-server/src/extensions.rs | 125 ++++++++++++++++++- codex-rs/app-server/src/mcp_refresh.rs | 6 +- codex-rs/app-server/src/message_processor.rs | 6 +- codex-rs/app-server/src/outgoing_message.rs | 10 ++ 4 files changed, 143 insertions(+), 4 deletions(-) diff --git a/codex-rs/app-server/src/extensions.rs b/codex-rs/app-server/src/extensions.rs index 20a21a94c..9232cb758 100644 --- a/codex-rs/app-server/src/extensions.rs +++ b/codex-rs/app-server/src/extensions.rs @@ -1,27 +1,67 @@ use std::sync::Arc; use std::sync::Weak; +use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ThreadGoalUpdatedNotification; use codex_core::NewThread; use codex_core::StartThreadOptions; use codex_core::ThreadManager; use codex_core::config::Config; use codex_extension_api::AgentSpawnFuture; use codex_extension_api::AgentSpawner; +use codex_extension_api::ExtensionEventSink; use codex_extension_api::ExtensionRegistry; use codex_extension_api::ExtensionRegistryBuilder; use codex_protocol::ThreadId; use codex_protocol::error::CodexErr; +use codex_protocol::protocol::Event; +use codex_protocol::protocol::EventMsg; -pub(crate) fn thread_extensions(guardian_agent_spawner: S) -> Arc> +use crate::outgoing_message::OutgoingMessageSender; + +pub(crate) fn thread_extensions( + guardian_agent_spawner: S, + event_sink: Arc, +) -> Arc> where S: AgentSpawner + 'static, { - let mut builder = ExtensionRegistryBuilder::::new(); + let mut builder = ExtensionRegistryBuilder::::with_event_sink(event_sink); codex_guardian::install(&mut builder, guardian_agent_spawner); codex_memories_extension::install(&mut builder, codex_otel::global()); Arc::new(builder.build()) } +pub(crate) fn app_server_extension_event_sink( + outgoing: Arc, +) -> Arc { + Arc::new(AppServerExtensionEventSink { outgoing }) +} + +struct AppServerExtensionEventSink { + outgoing: Arc, +} + +impl ExtensionEventSink for AppServerExtensionEventSink { + fn emit(&self, event: Event) { + match event.msg { + EventMsg::ThreadGoalUpdated(thread_goal_event) => { + self.outgoing + .try_send_server_notification(ServerNotification::ThreadGoalUpdated( + ThreadGoalUpdatedNotification { + thread_id: thread_goal_event.thread_id.to_string(), + turn_id: thread_goal_event.turn_id, + goal: thread_goal_event.goal.into(), + }, + )); + } + msg => { + tracing::debug!(event_id = %event.id, ?msg, "dropping unsupported extension event"); + } + } + } +} + pub(crate) fn guardian_agent_spawner( thread_manager: Weak, ) -> impl AgentSpawner { @@ -39,3 +79,84 @@ pub(crate) fn guardian_agent_spawner( }) } } + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use codex_analytics::AnalyticsEventsClient; + use codex_app_server_protocol::ServerNotification; + use codex_app_server_protocol::ThreadGoal as AppServerThreadGoal; + use codex_app_server_protocol::ThreadGoalStatus as AppServerThreadGoalStatus; + use codex_protocol::protocol::ThreadGoal; + use codex_protocol::protocol::ThreadGoalStatus; + use codex_protocol::protocol::ThreadGoalUpdatedEvent; + use pretty_assertions::assert_eq; + use tokio::sync::mpsc; + use tokio::time::timeout; + + use super::*; + use crate::outgoing_message::OutgoingEnvelope; + use crate::outgoing_message::OutgoingMessage; + + #[tokio::test] + async fn app_server_event_sink_forwards_thread_goal_updates() { + let (outgoing_tx, mut outgoing_rx) = mpsc::channel(4); + let outgoing = Arc::new(OutgoingMessageSender::new( + outgoing_tx, + AnalyticsEventsClient::disabled(), + )); + let sink = app_server_extension_event_sink(outgoing); + let thread_id = ThreadId::default(); + + sink.emit(Event { + id: "call-1".to_string(), + msg: EventMsg::ThreadGoalUpdated(ThreadGoalUpdatedEvent { + thread_id, + turn_id: Some("turn-1".to_string()), + goal: ThreadGoal { + thread_id, + objective: "wire extension events".to_string(), + status: ThreadGoalStatus::Active, + token_budget: Some(123), + tokens_used: 45, + time_used_seconds: 6, + created_at: 7, + updated_at: 8, + }, + }), + }); + + let envelope = timeout(Duration::from_secs(1), outgoing_rx.recv()) + .await + .expect("timed out waiting for forwarded extension event") + .expect("outgoing channel closed unexpectedly"); + let OutgoingEnvelope::Broadcast { message } = envelope else { + panic!("expected broadcast notification"); + }; + let OutgoingMessage::AppServerNotification(ServerNotification::ThreadGoalUpdated( + notification, + )) = message + else { + panic!("expected thread goal updated notification"); + }; + + assert_eq!( + ThreadGoalUpdatedNotification { + thread_id: thread_id.to_string(), + turn_id: Some("turn-1".to_string()), + goal: AppServerThreadGoal { + thread_id: thread_id.to_string(), + objective: "wire extension events".to_string(), + status: AppServerThreadGoalStatus::Active, + token_budget: Some(123), + tokens_used: 45, + time_used_seconds: 6, + created_at: 7, + updated_at: 8, + }, + }, + notification + ); + } +} diff --git a/codex-rs/app-server/src/mcp_refresh.rs b/codex-rs/app-server/src/mcp_refresh.rs index f7d32b2ea..d65b1d83e 100644 --- a/codex-rs/app-server/src/mcp_refresh.rs +++ b/codex-rs/app-server/src/mcp_refresh.rs @@ -114,6 +114,7 @@ mod tests { use codex_core::init_state_db; use codex_core::thread_store_from_config; use codex_exec_server::EnvironmentManager; + use codex_extension_api::NoopExtensionEventSink; use codex_login::AuthManager; use codex_login::CodexAuth; use codex_protocol::protocol::SessionSource; @@ -186,7 +187,10 @@ mod tests { auth_manager, SessionSource::Exec, Arc::new(EnvironmentManager::default_for_tests()), - thread_extensions(guardian_agent_spawner(thread_manager.clone())), + thread_extensions( + guardian_agent_spawner(thread_manager.clone()), + Arc::new(NoopExtensionEventSink), + ), /*analytics_events_client*/ None, thread_store, Some(state_db.clone()), diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 0e1f019de..2d0e8d085 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -8,6 +8,7 @@ use crate::attestation::app_server_attestation_provider; use crate::config_manager::ConfigManager; use crate::connection_rpc_gate::ConnectionRpcGate; use crate::error_code::invalid_request; +use crate::extensions::app_server_extension_event_sink; use crate::extensions::guardian_agent_spawner; use crate::extensions::thread_extensions; use crate::fs_watch::FsWatchManager; @@ -310,7 +311,10 @@ impl MessageProcessor { auth_manager.clone(), session_source, environment_manager, - thread_extensions(guardian_agent_spawner(thread_manager.clone())), + thread_extensions( + guardian_agent_spawner(thread_manager.clone()), + app_server_extension_event_sink(outgoing.clone()), + ), Some(analytics_events_client.clone()), Arc::clone(&thread_store), state_db.clone(), diff --git a/codex-rs/app-server/src/outgoing_message.rs b/codex-rs/app-server/src/outgoing_message.rs index 97ef37f74..6cab439bc 100644 --- a/codex-rs/app-server/src/outgoing_message.rs +++ b/codex-rs/app-server/src/outgoing_message.rs @@ -555,6 +555,16 @@ impl OutgoingMessageSender { .await; } + pub(crate) fn try_send_server_notification(&self, notification: ServerNotification) { + tracing::trace!("app-server event: {notification}"); + let outgoing_message = OutgoingMessage::AppServerNotification(notification); + if let Err(err) = self.sender.try_send(OutgoingEnvelope::Broadcast { + message: outgoing_message, + }) { + warn!("failed to send server notification to client without waiting: {err:?}"); + } + } + pub(crate) async fn send_server_notification_to_connections( &self, connection_ids: &[ConnectionId],