Wire app-server extension event sink (#24586)

## Why

The goal extension already emits `ThreadGoalUpdated` events, but
production app-server thread extensions were built with the default
no-op extension event sink. That meant extension-driven goal updates
could be produced without ever reaching app-server clients.

## What changed

- Build app-server thread extensions with a host-provided
`ExtensionEventSink`.
- Add an app-server sink that converts extension `ThreadGoalUpdated`
events into `ServerNotification::ThreadGoalUpdated` broadcasts.
- Use the existing bounded outgoing message channel via `try_send` so
event forwarding cannot create an unbounded queue.
- Pass `NoopExtensionEventSink` in app-server tests that construct a
`ThreadManager` without an app-server host.
- Refresh `Cargo.lock` for the existing `codex-memories-extension`
`codex-otel` dependency.

## Verification

- `just test -p codex-app-server
extensions::tests::app_server_event_sink_forwards_thread_goal_updates`
This commit is contained in:
jif-oai
2026-05-26 15:28:02 +02:00
committed by GitHub
Unverified
parent 01a8bf0ae3
commit c4e53d103c
4 changed files with 143 additions and 4 deletions
+123 -2
View File
@@ -1,27 +1,67 @@
use std::sync::Arc;
use std::sync::Weak;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ThreadGoalUpdatedNotification;
use codex_core::NewThread;
use codex_core::StartThreadOptions;
use codex_core::ThreadManager;
use codex_core::config::Config;
use codex_extension_api::AgentSpawnFuture;
use codex_extension_api::AgentSpawner;
use codex_extension_api::ExtensionEventSink;
use codex_extension_api::ExtensionRegistry;
use codex_extension_api::ExtensionRegistryBuilder;
use codex_protocol::ThreadId;
use codex_protocol::error::CodexErr;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
pub(crate) fn thread_extensions<S>(guardian_agent_spawner: S) -> Arc<ExtensionRegistry<Config>>
use crate::outgoing_message::OutgoingMessageSender;
pub(crate) fn thread_extensions<S>(
guardian_agent_spawner: S,
event_sink: Arc<dyn ExtensionEventSink>,
) -> Arc<ExtensionRegistry<Config>>
where
S: AgentSpawner<StartThreadOptions, Spawned = NewThread, Error = CodexErr> + 'static,
{
let mut builder = ExtensionRegistryBuilder::<Config>::new();
let mut builder = ExtensionRegistryBuilder::<Config>::with_event_sink(event_sink);
codex_guardian::install(&mut builder, guardian_agent_spawner);
codex_memories_extension::install(&mut builder, codex_otel::global());
Arc::new(builder.build())
}
pub(crate) fn app_server_extension_event_sink(
outgoing: Arc<OutgoingMessageSender>,
) -> Arc<dyn ExtensionEventSink> {
Arc::new(AppServerExtensionEventSink { outgoing })
}
struct AppServerExtensionEventSink {
outgoing: Arc<OutgoingMessageSender>,
}
impl ExtensionEventSink for AppServerExtensionEventSink {
fn emit(&self, event: Event) {
match event.msg {
EventMsg::ThreadGoalUpdated(thread_goal_event) => {
self.outgoing
.try_send_server_notification(ServerNotification::ThreadGoalUpdated(
ThreadGoalUpdatedNotification {
thread_id: thread_goal_event.thread_id.to_string(),
turn_id: thread_goal_event.turn_id,
goal: thread_goal_event.goal.into(),
},
));
}
msg => {
tracing::debug!(event_id = %event.id, ?msg, "dropping unsupported extension event");
}
}
}
}
pub(crate) fn guardian_agent_spawner(
thread_manager: Weak<ThreadManager>,
) -> impl AgentSpawner<StartThreadOptions, Spawned = NewThread, Error = CodexErr> {
@@ -39,3 +79,84 @@ pub(crate) fn guardian_agent_spawner(
})
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use codex_analytics::AnalyticsEventsClient;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ThreadGoal as AppServerThreadGoal;
use codex_app_server_protocol::ThreadGoalStatus as AppServerThreadGoalStatus;
use codex_protocol::protocol::ThreadGoal;
use codex_protocol::protocol::ThreadGoalStatus;
use codex_protocol::protocol::ThreadGoalUpdatedEvent;
use pretty_assertions::assert_eq;
use tokio::sync::mpsc;
use tokio::time::timeout;
use super::*;
use crate::outgoing_message::OutgoingEnvelope;
use crate::outgoing_message::OutgoingMessage;
#[tokio::test]
async fn app_server_event_sink_forwards_thread_goal_updates() {
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(4);
let outgoing = Arc::new(OutgoingMessageSender::new(
outgoing_tx,
AnalyticsEventsClient::disabled(),
));
let sink = app_server_extension_event_sink(outgoing);
let thread_id = ThreadId::default();
sink.emit(Event {
id: "call-1".to_string(),
msg: EventMsg::ThreadGoalUpdated(ThreadGoalUpdatedEvent {
thread_id,
turn_id: Some("turn-1".to_string()),
goal: ThreadGoal {
thread_id,
objective: "wire extension events".to_string(),
status: ThreadGoalStatus::Active,
token_budget: Some(123),
tokens_used: 45,
time_used_seconds: 6,
created_at: 7,
updated_at: 8,
},
}),
});
let envelope = timeout(Duration::from_secs(1), outgoing_rx.recv())
.await
.expect("timed out waiting for forwarded extension event")
.expect("outgoing channel closed unexpectedly");
let OutgoingEnvelope::Broadcast { message } = envelope else {
panic!("expected broadcast notification");
};
let OutgoingMessage::AppServerNotification(ServerNotification::ThreadGoalUpdated(
notification,
)) = message
else {
panic!("expected thread goal updated notification");
};
assert_eq!(
ThreadGoalUpdatedNotification {
thread_id: thread_id.to_string(),
turn_id: Some("turn-1".to_string()),
goal: AppServerThreadGoal {
thread_id: thread_id.to_string(),
objective: "wire extension events".to_string(),
status: AppServerThreadGoalStatus::Active,
token_budget: Some(123),
tokens_used: 45,
time_used_seconds: 6,
created_at: 7,
updated_at: 8,
},
},
notification
);
}
}
+5 -1
View File
@@ -114,6 +114,7 @@ mod tests {
use codex_core::init_state_db;
use codex_core::thread_store_from_config;
use codex_exec_server::EnvironmentManager;
use codex_extension_api::NoopExtensionEventSink;
use codex_login::AuthManager;
use codex_login::CodexAuth;
use codex_protocol::protocol::SessionSource;
@@ -186,7 +187,10 @@ mod tests {
auth_manager,
SessionSource::Exec,
Arc::new(EnvironmentManager::default_for_tests()),
thread_extensions(guardian_agent_spawner(thread_manager.clone())),
thread_extensions(
guardian_agent_spawner(thread_manager.clone()),
Arc::new(NoopExtensionEventSink),
),
/*analytics_events_client*/ None,
thread_store,
Some(state_db.clone()),
+5 -1
View File
@@ -8,6 +8,7 @@ use crate::attestation::app_server_attestation_provider;
use crate::config_manager::ConfigManager;
use crate::connection_rpc_gate::ConnectionRpcGate;
use crate::error_code::invalid_request;
use crate::extensions::app_server_extension_event_sink;
use crate::extensions::guardian_agent_spawner;
use crate::extensions::thread_extensions;
use crate::fs_watch::FsWatchManager;
@@ -310,7 +311,10 @@ impl MessageProcessor {
auth_manager.clone(),
session_source,
environment_manager,
thread_extensions(guardian_agent_spawner(thread_manager.clone())),
thread_extensions(
guardian_agent_spawner(thread_manager.clone()),
app_server_extension_event_sink(outgoing.clone()),
),
Some(analytics_events_client.clone()),
Arc::clone(&thread_store),
state_db.clone(),
@@ -555,6 +555,16 @@ impl OutgoingMessageSender {
.await;
}
pub(crate) fn try_send_server_notification(&self, notification: ServerNotification) {
tracing::trace!("app-server event: {notification}");
let outgoing_message = OutgoingMessage::AppServerNotification(notification);
if let Err(err) = self.sender.try_send(OutgoingEnvelope::Broadcast {
message: outgoing_message,
}) {
warn!("failed to send server notification to client without waiting: {err:?}");
}
}
pub(crate) async fn send_server_notification_to_connections(
&self,
connection_ids: &[ConnectionId],