From 841f30598c05eebb62139bdd75abef1b7b61d5f7 Mon Sep 17 00:00:00 2001 From: alexsong-oai Date: Thu, 25 Jun 2026 18:15:48 -0700 Subject: [PATCH] [codex] Attribute app-server analytics by thread originator (#29935) ## Why Desktop Work threads and regular Codex threads can share the same app-server connection. App-server analytics currently copy `product_client_id` from connection metadata for every thread-scoped event, so Work thread activity is attributed to the Desktop connection instead of the thread's resolved originator. This prevents analytics from distinguishing the two products on a shared connection. ## What changed - Publish the resolved originator after a thread is materialized, covering new, resumed, forked, and subagent threads. - Store that originator in the analytics reducer's existing per-thread state. - Override only `app_server_client.product_client_id` for thread, turn, tool, review, goal, guardian, and compaction events while preserving the connection's client name, version, and transport metadata. - Fall back to the connection-wide product client ID when a thread has no originator override. - Preserve persisted originators in thread initialization analytics for resume and fork flows. ## Validation - `just test -p codex-analytics thread_originator_overrides_shared_connection_across_thread_events subagent_events_keep_thread_originator_with_explicit_turn_connection` - `just test -p codex-app-server turn_start_tracks_thread_originator_in_analytics thread_start_tracks_thread_initialized_analytics thread_fork_tracks_thread_initialized_analytics thread_resume_tracks_thread_initialized_analytics` - `just test -p codex-core thread_manager` --- .../analytics/src/analytics_client_tests.rs | 176 +++++++++++++++++- codex-rs/analytics/src/client.rs | 26 +++ codex-rs/analytics/src/facts.rs | 1 + codex-rs/analytics/src/reducer.rs | 114 ++++++++---- codex-rs/app-server/src/outgoing_message.rs | 48 ++++- .../request_processors/thread_lifecycle.rs | 5 +- .../request_processors/thread_processor.rs | 13 +- .../app-server/tests/suite/v2/analytics.rs | 5 +- .../app-server/tests/suite/v2/thread_fork.rs | 1 + .../tests/suite/v2/thread_resume.rs | 91 ++++++++- .../app-server/tests/suite/v2/thread_start.rs | 2 + .../app-server/tests/suite/v2/turn_start.rs | 6 +- 12 files changed, 431 insertions(+), 57 deletions(-) diff --git a/codex-rs/analytics/src/analytics_client_tests.rs b/codex-rs/analytics/src/analytics_client_tests.rs index c183ad925..4451d39ed 100644 --- a/codex-rs/analytics/src/analytics_client_tests.rs +++ b/codex-rs/analytics/src/analytics_client_tests.rs @@ -558,6 +558,7 @@ async fn ingest_rejected_turn_steer( response: Box::new(sample_thread_resume_response( "thread-2", /*ephemeral*/ false, "gpt-5", )), + thread_originator: None, }, out, ) @@ -631,6 +632,7 @@ async fn ingest_turn_prerequisites( response: Box::new(sample_thread_start_response( "thread-2", /*ephemeral*/ false, "gpt-5", )), + thread_originator: None, }, out, ) @@ -654,6 +656,7 @@ async fn ingest_turn_prerequisites( connection_id: 7, request_id: RequestId::Integer(3), response: Box::new(sample_turn_start_response("turn-2")), + thread_originator: None, }, out, ) @@ -720,6 +723,7 @@ async fn ingest_review_prerequisites( response: Box::new(sample_thread_start_response( "thread-1", /*ephemeral*/ false, "gpt-5", )), + thread_originator: None, }, events, ) @@ -1652,6 +1656,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize /*ephemeral*/ false, "gpt-5", )), + thread_originator: None, }, &mut events, ) @@ -1697,6 +1702,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize response: Box::new(sample_thread_resume_response( "thread-1", /*ephemeral*/ true, "gpt-5", )), + thread_originator: None, }, &mut events, ) @@ -1741,6 +1747,157 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize ); } +#[tokio::test] +async fn thread_originator_overrides_shared_connection_across_thread_events() { + let mut reducer = AnalyticsReducer::default(); + let mut events = Vec::new(); + + reducer + .ingest(sample_initialize_fact(/*connection_id*/ 7), &mut events) + .await; + for (request_id, thread_id, thread_originator) in [ + (1, "thread-work", Some(TEST_PRODUCT_CLIENT_ID.to_string())), + (2, "thread-default", None), + ] { + reducer + .ingest( + AnalyticsFact::ClientResponse { + connection_id: 7, + request_id: RequestId::Integer(request_id), + response: Box::new(sample_thread_start_response( + thread_id, /*ephemeral*/ false, "gpt-5", + )), + thread_originator, + }, + &mut events, + ) + .await; + } + + let initialized = serde_json::to_value(&events).expect("serialize thread events"); + assert_eq!( + initialized + .as_array() + .expect("thread events") + .iter() + .map(|event| { + json!({ + "thread_id": event["event_params"]["thread_id"], + "app_server_client": event["event_params"]["app_server_client"], + }) + }) + .collect::>(), + vec![ + json!({ + "thread_id": "thread-work", + "app_server_client": { + "product_client_id": TEST_PRODUCT_CLIENT_ID, + "client_name": "codex-tui", + "client_version": "1.0.0", + "rpc_transport": "websocket", + "experimental_api_enabled": false, + }, + }), + json!({ + "thread_id": "thread-default", + "app_server_client": { + "product_client_id": DEFAULT_ORIGINATOR, + "client_name": "codex-tui", + "client_version": "1.0.0", + "rpc_transport": "websocket", + "experimental_api_enabled": false, + }, + }), + ] + ); + + events.clear(); + reducer + .ingest( + AnalyticsFact::ClientRequest { + connection_id: 7, + request_id: RequestId::Integer(3), + request: Box::new(sample_turn_start_request( + "thread-work", + /*request_id*/ 3, + )), + }, + &mut events, + ) + .await; + reducer + .ingest( + AnalyticsFact::ClientResponse { + connection_id: 7, + request_id: RequestId::Integer(3), + response: Box::new(sample_turn_start_response("turn-1")), + thread_originator: None, + }, + &mut events, + ) + .await; + ingest_completed_command_execution_item(&mut reducer, &mut events, "thread-work", "item-work") + .await; + ingest_complete_child_turn(&mut reducer, &mut events, "thread-work", "turn-1").await; + reducer + .ingest( + AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(Box::new( + CodexCompactionEvent { + thread_id: "thread-work".to_string(), + turn_id: "turn-compact".to_string(), + trigger: CompactionTrigger::Manual, + reason: CompactionReason::UserRequested, + implementation: CompactionImplementation::Responses, + phase: CompactionPhase::StandaloneTurn, + strategy: CompactionStrategy::Memento, + status: CompactionStatus::Completed, + codex_error_kind: None, + codex_error_http_status_code: None, + active_context_tokens_before: 131_000, + active_context_tokens_after: 64_000, + retained_image_count: None, + compaction_summary_tokens: None, + cached_input_tokens: None, + started_at: 100, + completed_at: 101, + duration_ms: Some(1200), + }, + ))), + &mut events, + ) + .await; + + let lifecycle = serde_json::to_value(&events).expect("serialize lifecycle events"); + assert_eq!( + lifecycle + .as_array() + .expect("lifecycle events") + .iter() + .map(|event| { + json!({ + "event_type": event["event_type"], + "product_client_id": + event["event_params"]["app_server_client"]["product_client_id"], + }) + }) + .collect::>(), + vec![ + json!({ + "event_type": "codex_command_execution_event", + "product_client_id": TEST_PRODUCT_CLIENT_ID, + }), + json!({ + "event_type": "codex_turn_event", + "product_client_id": TEST_PRODUCT_CLIENT_ID, + }), + json!({ + "event_type": "codex_compaction_event", + "product_client_id": TEST_PRODUCT_CLIENT_ID, + }), + ] + ); +} + #[tokio::test] async fn unrelated_client_requests_are_ignored_by_reducer() { let mut reducer = AnalyticsReducer::default(); @@ -1767,6 +1924,7 @@ async fn unrelated_client_requests_are_ignored_by_reducer() { connection_id: 7, request_id: RequestId::Integer(3), response: Box::new(sample_turn_start_response("turn-2")), + thread_originator: None, }, &mut events, ) @@ -1792,6 +1950,7 @@ async fn unrelated_client_responses_are_ignored_by_reducer() { response: Box::new(ClientResponsePayload::ThreadArchive( ThreadArchiveResponse {}, )), + thread_originator: None, }, &mut events, ) @@ -1851,6 +2010,7 @@ async fn compaction_event_ingests_custom_fact() { Some(AppServerThreadSource::Subagent), Some(parent_thread_id.to_string()), )), + thread_originator: None, }, &mut events, ) @@ -1971,6 +2131,7 @@ async fn guardian_review_event_ingests_custom_fact_with_optional_target_item() { /*ephemeral*/ false, "gpt-5", )), + thread_originator: None, }, &mut events, ) @@ -2499,6 +2660,7 @@ async fn item_review_summaries_do_not_cross_threads_with_reused_item_ids() { response: Box::new(sample_thread_start_response( "thread-2", /*ephemeral*/ false, "gpt-5", )), + thread_originator: None, }, &mut events, ) @@ -2749,7 +2911,7 @@ async fn subagent_thread_started_publishes_without_initialize() { } #[tokio::test] -async fn subagent_events_use_inherited_connection_unless_turn_connection_is_explicit() { +async fn subagent_events_keep_thread_originator_with_explicit_turn_connection() { let mut reducer = AnalyticsReducer::default(); let mut events = Vec::new(); let parent_thread_id = @@ -2786,6 +2948,7 @@ async fn subagent_events_use_inherited_connection_unless_turn_connection_is_expl /*ephemeral*/ false, "gpt-5", )), + thread_originator: None, }, &mut events, ) @@ -2908,6 +3071,7 @@ async fn subagent_events_use_inherited_connection_unless_turn_connection_is_expl connection_id: 8, request_id: RequestId::Integer(3), response: Box::new(sample_turn_start_response("turn-explicit")), + thread_originator: None, }, &mut events, ) @@ -2918,7 +3082,11 @@ async fn subagent_events_use_inherited_connection_unless_turn_connection_is_expl }; assert_eq!( event.event_params.app_server_client.product_client_id, - DEFAULT_ORIGINATOR + "parent-client" + ); + assert_eq!( + event.event_params.app_server_client.client_name.as_deref(), + Some("codex-tui") ); } @@ -3868,6 +4036,7 @@ async fn accepted_turn_steer_emits_expected_event() { connection_id: 7, request_id: RequestId::Integer(4), response: Box::new(sample_turn_steer_response("turn-2")), + thread_originator: None, }, &mut out, ) @@ -4039,6 +4208,7 @@ async fn turn_start_error_response_discards_pending_start_request() { connection_id: 7, request_id: RequestId::Integer(3), response: Box::new(sample_turn_start_response("turn-2")), + thread_originator: None, }, &mut out, ) @@ -4367,6 +4537,7 @@ async fn accepted_steers_increment_turn_steer_count() { connection_id: 7, request_id: RequestId::Integer(4), response: Box::new(sample_turn_steer_response("turn-2")), + thread_originator: None, }, &mut out, ) @@ -4414,6 +4585,7 @@ async fn accepted_steers_increment_turn_steer_count() { connection_id: 7, request_id: RequestId::Integer(6), response: Box::new(sample_turn_steer_response("turn-2")), + thread_originator: None, }, &mut out, ) diff --git a/codex-rs/analytics/src/client.rs b/codex-rs/analytics/src/client.rs index 5996734f8..f957a665a 100644 --- a/codex-rs/analytics/src/client.rs +++ b/codex-rs/analytics/src/client.rs @@ -435,6 +435,31 @@ impl AnalyticsEventsClient { connection_id: u64, request_id: RequestId, response: ClientResponsePayload, + ) { + self.track_response_inner( + connection_id, + request_id, + response, + /*thread_originator*/ None, + ); + } + + pub fn track_response_with_thread_originator( + &self, + connection_id: u64, + request_id: RequestId, + response: ClientResponsePayload, + thread_originator: String, + ) { + self.track_response_inner(connection_id, request_id, response, Some(thread_originator)); + } + + fn track_response_inner( + &self, + connection_id: u64, + request_id: RequestId, + response: ClientResponsePayload, + thread_originator: Option, ) { if !matches!( response, @@ -450,6 +475,7 @@ impl AnalyticsEventsClient { connection_id, request_id, response: Box::new(response), + thread_originator, }); } diff --git a/codex-rs/analytics/src/facts.rs b/codex-rs/analytics/src/facts.rs index 89b870864..3346a6e19 100644 --- a/codex-rs/analytics/src/facts.rs +++ b/codex-rs/analytics/src/facts.rs @@ -464,6 +464,7 @@ pub(crate) enum AnalyticsFact { connection_id: u64, request_id: RequestId, response: Box, + thread_originator: Option, }, ErrorResponse { connection_id: u64, diff --git a/codex-rs/analytics/src/reducer.rs b/codex-rs/analytics/src/reducer.rs index 522a30768..29f21bac5 100644 --- a/codex-rs/analytics/src/reducer.rs +++ b/codex-rs/analytics/src/reducer.rs @@ -165,6 +165,20 @@ struct ConnectionState { struct ThreadAnalyticsState { connection_id: Option, metadata: Option, + originator: Option, +} + +impl ThreadAnalyticsState { + fn app_server_client( + &self, + connection_state: &ConnectionState, + ) -> CodexAppServerClientMetadata { + let mut app_server_client = connection_state.app_server_client.clone(); + if let Some(originator) = self.originator.as_ref() { + app_server_client.product_client_id.clone_from(originator); + } + app_server_client + } } #[derive(Clone, Copy)] @@ -434,9 +448,11 @@ impl AnalyticsReducer { connection_id, request_id, response, + thread_originator, } => { if let Some(response) = response.into_client_response(request_id) { - self.ingest_response(connection_id, response, out).await; + self.ingest_response(connection_id, response, thread_originator, out) + .await; } } AnalyticsFact::ErrorResponse { @@ -575,6 +591,9 @@ impl AnalyticsReducer { .and_then(|parent_thread_id| self.threads.get(parent_thread_id)) .and_then(|thread| thread.connection_id); let thread_state = self.threads.entry(input.thread_id.clone()).or_default(); + thread_state + .originator + .get_or_insert_with(|| input.product_client_id.clone()); thread_state .metadata .get_or_insert_with(|| ThreadMetadataState { @@ -597,7 +616,7 @@ impl AnalyticsReducer { input: GuardianReviewEventParams, out: &mut Vec, ) { - let Some((connection_state, thread_metadata)) = + let Some((connection_state, thread_state, thread_metadata)) = self.thread_context_or_warn(AnalyticsDropSite::guardian(&input)) else { return; @@ -607,7 +626,7 @@ impl AnalyticsReducer { event_type: "codex_guardian_review", event_params: GuardianReviewEventPayload { session_id: thread_metadata.session_id.clone(), - app_server_client: connection_state.app_server_client.clone(), + app_server_client: thread_state.app_server_client(connection_state), runtime: connection_state.runtime.clone(), guardian_review: input, }, @@ -874,6 +893,7 @@ impl AnalyticsReducer { &mut self, connection_id: u64, response: ClientResponse, + thread_originator: Option, out: &mut Vec, ) { match response { @@ -883,6 +903,7 @@ impl AnalyticsReducer { response.thread, response.model, ThreadInitializationMode::New, + thread_originator, out, ); } @@ -892,6 +913,7 @@ impl AnalyticsReducer { response.thread, response.model, ThreadInitializationMode::Resumed, + thread_originator, out, ); } @@ -901,6 +923,7 @@ impl AnalyticsReducer { response.thread, response.model, ThreadInitializationMode::Forked, + thread_originator, out, ); } @@ -1248,7 +1271,7 @@ impl AnalyticsReducer { else { return; }; - let Some((connection_state, thread_metadata)) = self + let Some((connection_state, thread_state, thread_metadata)) = self .thread_context_or_warn(AnalyticsDropSite::tool_item(¬ification, item_id)) else { return; @@ -1260,6 +1283,7 @@ impl AnalyticsReducer { started_at_ms, completed_at_ms, connection_state, + thread_state, thread_metadata, review_summary: self.item_review_summaries.get(&key), }) { @@ -1316,6 +1340,7 @@ impl AnalyticsReducer { thread: codex_app_server_protocol::Thread, model: String, initialization_mode: ThreadInitializationMode, + thread_originator: Option, out: &mut Vec, ) { let session_source: SessionSource = thread.source.into(); @@ -1333,20 +1358,20 @@ impl AnalyticsReducer { parent_thread_id, initialization_mode, ); - self.threads.insert( - thread_id.clone(), - ThreadAnalyticsState { - connection_id: Some(connection_id), - metadata: Some(thread_metadata.clone()), - }, - ); + let thread_state = self.threads.entry(thread_id.clone()).or_default(); + if let Some(originator) = thread_originator { + thread_state.originator = Some(originator); + } + thread_state.connection_id = Some(connection_id); + thread_state.metadata = Some(thread_metadata.clone()); + let app_server_client = thread_state.app_server_client(connection_state); out.push(TrackEventRequest::ThreadInitialized( ThreadInitializedEvent { event_type: "codex_thread_initialized", event_params: ThreadInitializedEventParams { thread_id, session_id, - app_server_client: connection_state.app_server_client.clone(), + app_server_client, runtime: connection_state.runtime.clone(), model, ephemeral: thread.ephemeral, @@ -1362,7 +1387,7 @@ impl AnalyticsReducer { } fn ingest_compaction(&mut self, input: CodexCompactionEvent, out: &mut Vec) { - let Some((connection_state, thread_metadata)) = + let Some((connection_state, thread_state, thread_metadata)) = self.thread_context_or_warn(AnalyticsDropSite::compaction(&input)) else { return; @@ -1373,7 +1398,7 @@ impl AnalyticsReducer { event_params: codex_compaction_event_params( input, thread_metadata.session_id.clone(), - connection_state.app_server_client.clone(), + thread_state.app_server_client(connection_state), connection_state.runtime.clone(), thread_metadata.thread_source.clone(), thread_metadata.subagent_source.clone(), @@ -1384,7 +1409,7 @@ impl AnalyticsReducer { } fn ingest_goal(&mut self, input: CodexGoalEvent, out: &mut Vec) { - let Some((connection_state, thread_metadata)) = + let Some((connection_state, thread_state, thread_metadata)) = self.thread_context_or_warn(AnalyticsDropSite::goal(&input)) else { return; @@ -1394,7 +1419,7 @@ impl AnalyticsReducer { event_params: codex_goal_event_params( input, thread_metadata.session_id.clone(), - connection_state.app_server_client.clone(), + thread_state.app_server_client(connection_state), connection_state.runtime.clone(), thread_metadata.thread_source.clone(), thread_metadata.subagent_source.clone(), @@ -1483,11 +1508,11 @@ impl AnalyticsReducer { return; }; let drop_site = AnalyticsDropSite::turn_steer(&pending_request.thread_id); - let Some(thread_metadata) = self - .threads - .get(drop_site.thread_id) - .and_then(|thread| thread.metadata.as_ref()) - else { + let Some(thread_state) = self.threads.get(drop_site.thread_id) else { + warn_missing_analytics_context(&drop_site, MissingAnalyticsContext::ThreadMetadata); + return; + }; + let Some(thread_metadata) = thread_state.metadata.as_ref() else { warn_missing_analytics_context(&drop_site, MissingAnalyticsContext::ThreadMetadata); return; }; @@ -1498,7 +1523,7 @@ impl AnalyticsReducer { session_id: thread_metadata.session_id.clone(), expected_turn_id: Some(pending_request.expected_turn_id), accepted_turn_id, - app_server_client: connection_state.app_server_client.clone(), + app_server_client: thread_state.app_server_client(connection_state), runtime: connection_state.runtime.clone(), thread_source: thread_metadata.thread_source.clone(), subagent_source: thread_metadata.subagent_source.clone(), @@ -1529,7 +1554,7 @@ impl AnalyticsReducer { &pending_review, ); } - let Some((connection_state, thread_metadata)) = + let Some((connection_state, thread_state, thread_metadata)) = self.thread_context_or_warn(AnalyticsDropSite::review(&pending_review)) else { return; @@ -1541,7 +1566,7 @@ impl AnalyticsReducer { turn_id: pending_review.turn_id, item_id: pending_review.item_id, review_id: pending_review.review_id, - app_server_client: connection_state.app_server_client.clone(), + app_server_client: thread_state.app_server_client(connection_state), runtime: connection_state.runtime.clone(), thread_source: thread_metadata.thread_source.clone(), subagent_source: thread_metadata.subagent_source.clone(), @@ -1610,18 +1635,18 @@ impl AnalyticsReducer { ); return; }; - let Some(thread_metadata) = self - .threads - .get(drop_site.thread_id) - .and_then(|thread| thread.metadata.as_ref()) - else { + let Some(thread_state) = self.threads.get(drop_site.thread_id) else { + warn_missing_analytics_context(&drop_site, MissingAnalyticsContext::ThreadMetadata); + return; + }; + let Some(thread_metadata) = thread_state.metadata.as_ref() else { warn_missing_analytics_context(&drop_site, MissingAnalyticsContext::ThreadMetadata); return; }; let turn_event = TrackEventRequest::TurnEvent(Box::new(CodexTurnEventRequest { event_type: "codex_turn_event", event_params: codex_turn_event_params( - connection_state.app_server_client.clone(), + thread_state.app_server_client(connection_state), connection_state.runtime.clone(), turn_id.to_string(), turn_state, @@ -1663,17 +1688,18 @@ impl AnalyticsReducer { fn thread_context_or_warn( &self, drop_site: AnalyticsDropSite<'_>, - ) -> Option<(&ConnectionState, &ThreadMetadataState)> { + ) -> Option<( + &ConnectionState, + &ThreadAnalyticsState, + &ThreadMetadataState, + )> { let connection_state = self.thread_connection_or_warn(drop_site)?; - let Some(thread_metadata) = self - .threads - .get(drop_site.thread_id) - .and_then(|thread| thread.metadata.as_ref()) - else { + let thread_state = self.threads.get(drop_site.thread_id)?; + let Some(thread_metadata) = thread_state.metadata.as_ref() else { warn_missing_analytics_context(&drop_site, MissingAnalyticsContext::ThreadMetadata); return None; }; - Some((connection_state, thread_metadata)) + Some((connection_state, thread_state, thread_metadata)) } } @@ -1743,6 +1769,7 @@ struct ToolItemEventInput<'a> { started_at_ms: u64, completed_at_ms: u64, connection_state: &'a ConnectionState, + thread_state: &'a ThreadAnalyticsState, thread_metadata: &'a ThreadMetadataState, review_summary: Option<&'a ItemReviewSummary>, } @@ -1755,6 +1782,7 @@ fn tool_item_event(input: ToolItemEventInput<'_>) -> Option { started_at_ms, completed_at_ms, connection_state, + thread_state, thread_metadata, review_summary, } = input; @@ -1784,6 +1812,7 @@ fn tool_item_event(input: ToolItemEventInput<'_>) -> Option { started_at_ms, completed_at_ms, connection_state, + thread_state, thread_metadata, review_summary, }, @@ -1825,6 +1854,7 @@ fn tool_item_event(input: ToolItemEventInput<'_>) -> Option { started_at_ms, completed_at_ms, connection_state, + thread_state, thread_metadata, review_summary, }, @@ -1866,6 +1896,7 @@ fn tool_item_event(input: ToolItemEventInput<'_>) -> Option { started_at_ms, completed_at_ms, connection_state, + thread_state, thread_metadata, review_summary, }, @@ -1910,6 +1941,7 @@ fn tool_item_event(input: ToolItemEventInput<'_>) -> Option { started_at_ms, completed_at_ms, connection_state, + thread_state, thread_metadata, review_summary, }, @@ -1954,6 +1986,7 @@ fn tool_item_event(input: ToolItemEventInput<'_>) -> Option { started_at_ms, completed_at_ms, connection_state, + thread_state, thread_metadata, review_summary, }, @@ -2009,6 +2042,7 @@ fn tool_item_event(input: ToolItemEventInput<'_>) -> Option { started_at_ms, completed_at_ms, connection_state, + thread_state, thread_metadata, review_summary, }, @@ -2045,6 +2079,7 @@ fn tool_item_event(input: ToolItemEventInput<'_>) -> Option { started_at_ms, completed_at_ms, connection_state, + thread_state, thread_metadata, review_summary, }, @@ -2100,6 +2135,7 @@ struct ToolItemContext<'a> { started_at_ms: u64, completed_at_ms: u64, connection_state: &'a ConnectionState, + thread_state: &'a ThreadAnalyticsState, thread_metadata: &'a ThreadMetadataState, review_summary: Option<&'a ItemReviewSummary>, } @@ -2118,7 +2154,9 @@ fn tool_item_base( thread_id: thread_id.to_string(), turn_id: turn_id.to_string(), item_id, - app_server_client: context.connection_state.app_server_client.clone(), + app_server_client: context + .thread_state + .app_server_client(context.connection_state), runtime: context.connection_state.runtime.clone(), thread_source: thread_metadata.thread_source.clone(), subagent_source: thread_metadata.subagent_source.clone(), diff --git a/codex-rs/app-server/src/outgoing_message.rs b/codex-rs/app-server/src/outgoing_message.rs index 9ce815baf..16d5c2ff9 100644 --- a/codex-rs/app-server/src/outgoing_message.rs +++ b/codex-rs/app-server/src/outgoing_message.rs @@ -504,13 +504,36 @@ impl OutgoingMessageSender { where T: Into, { - self.send_response_as(request_id, response.into()).await; + self.send_response_as_inner(request_id, response.into(), /*thread_originator*/ None) + .await; + } + + pub(crate) async fn send_response_with_thread_originator( + &self, + request_id: ConnectionRequestId, + response: T, + thread_originator: String, + ) where + T: Into, + { + self.send_response_as_inner(request_id, response.into(), Some(thread_originator)) + .await; } pub(crate) async fn send_response_as( &self, request_id: ConnectionRequestId, response: ClientResponsePayload, + ) { + self.send_response_as_inner(request_id, response, /*thread_originator*/ None) + .await; + } + + async fn send_response_as_inner( + &self, + request_id: ConnectionRequestId, + response: ClientResponsePayload, + thread_originator: Option, ) { let connection_id = request_id.connection_id; let request_id_for_analytics = request_id.request_id.clone(); @@ -518,11 +541,24 @@ impl OutgoingMessageSender { .into_jsonrpc_parts_and_payload(request_id.request_id.clone()) .map(|(id, result, response)| { if let Some(response) = response { - self.analytics_events_client.track_response( - connection_id.0, - request_id_for_analytics, - response, - ); + match thread_originator { + Some(thread_originator) => { + self.analytics_events_client + .track_response_with_thread_originator( + connection_id.0, + request_id_for_analytics, + response, + thread_originator, + ); + } + None => { + self.analytics_events_client.track_response( + connection_id.0, + request_id_for_analytics, + response, + ); + } + } } (id, result) }); diff --git a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs index 03031c87f..82a49bae0 100644 --- a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs +++ b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs @@ -640,6 +640,7 @@ pub(super) async fn handle_pending_thread_resume_request( active_permission_profile, workspace_roots, reasoning_effort, + originator, .. } = config_snapshot; let instruction_sources = pending.instruction_sources; @@ -665,7 +666,9 @@ pub(super) async fn handle_pending_thread_resume_request( multi_agent_mode: MultiAgentMode::ExplicitRequestOnly, initial_turns_page, }; - outgoing.send_response(request_id, response).await; + outgoing + .send_response_with_thread_originator(request_id, response, originator) + .await; // Match cold resume: metadata-only resume should attach the listener without // paying the cost of turn reconstruction for historical usage replay. if let Some(token_usage_thread) = token_usage_thread { diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index 39283bb35..8232d68fe 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -1268,6 +1268,7 @@ impl ThreadRequestProcessor { let cwd = config_snapshot.cwd().clone(); let active_permission_profile = thread_response_active_permission_profile(config_snapshot.active_permission_profile); + let thread_originator = config_snapshot.originator.clone(); let response = ThreadStartResponse { thread: thread.clone(), @@ -1287,7 +1288,7 @@ impl ThreadRequestProcessor { let notif = thread_started_notification(thread); listener_task_context .outgoing - .send_response(request_id, response) + .send_response_with_thread_originator(request_id, response, thread_originator) .instrument(tracing::info_span!( "app_server.thread_start.send_response", otel.name = "app_server.thread_start.send_response", @@ -2863,6 +2864,7 @@ impl ThreadRequestProcessor { } } + let thread_originator = config_snapshot.originator.clone(); let response = ThreadResumeResponse { thread, model: session_configured.model, @@ -2881,7 +2883,9 @@ impl ThreadRequestProcessor { }; let connection_id = request_id.connection_id; - self.outgoing.send_response(request_id, response).await; + self.outgoing + .send_response_with_thread_originator(request_id, response, thread_originator) + .await; // `excludeTurns` is explicitly the cheap resume path, so avoid // rebuilding history only to attribute a replayed usage update. if let Some(token_usage_thread) = token_usage_thread { @@ -3582,6 +3586,7 @@ impl ThreadRequestProcessor { ); let active_permission_profile = thread_response_active_permission_profile(config_snapshot.active_permission_profile); + let thread_originator = config_snapshot.originator.clone(); let response = ThreadForkResponse { thread: thread.clone(), @@ -3602,7 +3607,9 @@ impl ThreadRequestProcessor { let notif = thread_started_notification(thread); let connection_id = request_id.connection_id; let token_usage_thread = include_turns.then(|| response.thread.clone()); - self.outgoing.send_response(request_id, response).await; + self.outgoing + .send_response_with_thread_originator(request_id, response, thread_originator) + .await; // `excludeTurns` is the cheap fork path, so skip restored usage replay // instead of rebuilding history only to attribute a historical update. if let Some(token_usage_thread) = token_usage_thread { diff --git a/codex-rs/app-server/tests/suite/v2/analytics.rs b/codex-rs/app-server/tests/suite/v2/analytics.rs index ea876940a..2acc86cae 100644 --- a/codex-rs/app-server/tests/suite/v2/analytics.rs +++ b/codex-rs/app-server/tests/suite/v2/analytics.rs @@ -145,7 +145,7 @@ pub(crate) async fn wait_for_goal_event( .await } -async fn wait_for_matching_analytics_event( +pub(crate) async fn wait_for_matching_analytics_event( server: &MockServer, read_timeout: Duration, matches: impl Fn(&Value) -> bool, @@ -191,6 +191,7 @@ pub(crate) fn assert_basic_thread_initialized_event( event: &Value, thread_id: &str, session_id: &str, + expected_product_client_id: &str, expected_model: &str, initialization_mode: &str, expected_thread_source: &str, @@ -199,7 +200,7 @@ pub(crate) fn assert_basic_thread_initialized_event( assert_eq!(event["event_params"]["session_id"], session_id); assert_eq!( event["event_params"]["app_server_client"]["product_client_id"], - DEFAULT_CLIENT_NAME + expected_product_client_id ); assert_eq!( event["event_params"]["app_server_client"]["client_name"], diff --git a/codex-rs/app-server/tests/suite/v2/thread_fork.rs b/codex-rs/app-server/tests/suite/v2/thread_fork.rs index 214a67867..4102b45d1 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_fork.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_fork.rs @@ -488,6 +488,7 @@ async fn thread_fork_tracks_thread_initialized_analytics() -> Result<()> { event, &thread.id, &thread.session_id, + "codex", "mock-model", "forked", "user", diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index 533087a11..42db56f52 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -107,6 +107,7 @@ use super::analytics::mount_analytics_capture; use super::analytics::thread_initialized_event; use super::analytics::wait_for_analytics_payload; use super::analytics::wait_for_goal_event; +use super::analytics::wait_for_matching_analytics_event; #[cfg(windows)] const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25); @@ -573,11 +574,12 @@ async fn thread_resume_tracks_thread_initialized_analytics() -> Result<()> { Some("mock_provider"), /*git_info*/ None, )?; - set_thread_source_on_fake_rollout( + set_session_meta_on_fake_rollout( codex_home.path(), "2025-01-05T12-00-00", &conversation_id, "user", + "codex_work_desktop", )?; let mut mcp = TestAppServer::new_without_managed_config(codex_home.path()).await?; @@ -607,6 +609,7 @@ async fn thread_resume_tracks_thread_initialized_analytics() -> Result<()> { event, &thread.id, &thread.session_id, + "codex_work_desktop", "gpt-5.3-codex", "resumed", "user", @@ -615,11 +618,94 @@ async fn thread_resume_tracks_thread_initialized_analytics() -> Result<()> { Ok(()) } -fn set_thread_source_on_fake_rollout( +#[tokio::test] +async fn thread_resume_running_thread_tracks_thread_originator_in_analytics() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + + let codex_home = TempDir::new()?; + create_config_toml_with_chatgpt_base_url(codex_home.path(), &server.uri(), &server.uri())?; + mount_analytics_capture(&server, codex_home.path()).await?; + + let mut mcp = TestAppServer::new_without_managed_config(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let start_id = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + thread_source: Some(ThreadSource::User), + service_name: Some("codex_work_desktop".to_string()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let turn_id = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + client_user_message_id: None, + input: vec![UserInput::Text { + text: "materialize rollout".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_id)), + ) + .await??; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + let resume_id = mcp + .send_thread_resume_request(ThreadResumeParams { + thread_id: thread.id.clone(), + exclude_turns: true, + ..Default::default() + }) + .await?; + let resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(resume_id)), + ) + .await??; + let ThreadResumeResponse { + thread: resumed, .. + } = to_response::(resume_resp)?; + + let event = wait_for_matching_analytics_event(&server, DEFAULT_READ_TIMEOUT, |event| { + event["event_type"] == "codex_thread_initialized" + && event["event_params"]["thread_id"] == resumed.id + && event["event_params"]["initialization_mode"] == "resumed" + }) + .await?; + assert_basic_thread_initialized_event( + &event, + &resumed.id, + &resumed.session_id, + "codex_work_desktop", + "mock-model", + "resumed", + "user", + ); + Ok(()) +} + +fn set_session_meta_on_fake_rollout( codex_home: &std::path::Path, filename_ts: &str, thread_id: &str, thread_source: &str, + originator: &str, ) -> Result<()> { let path = rollout_path(codex_home, filename_ts, thread_id); let contents = std::fs::read_to_string(&path)?; @@ -629,6 +715,7 @@ fn set_thread_source_on_fake_rollout( .ok_or_else(|| anyhow::anyhow!("fake rollout missing session meta"))?; let mut session_meta: serde_json::Value = serde_json::from_str(session_meta)?; session_meta["payload"]["thread_source"] = serde_json::json!(thread_source); + session_meta["payload"]["originator"] = serde_json::json!(originator); let remaining = lines.collect::>().join("\n"); std::fs::write(&path, format!("{session_meta}\n{remaining}\n"))?; Ok(()) diff --git a/codex-rs/app-server/tests/suite/v2/thread_start.rs b/codex-rs/app-server/tests/suite/v2/thread_start.rs index 683a2bed1..ac4b255de 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_start.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_start.rs @@ -655,6 +655,7 @@ async fn thread_start_tracks_thread_initialized_analytics() -> Result<()> { let req_id = mcp .send_thread_start_request(ThreadStartParams { thread_source: Some(ThreadSource::User), + service_name: Some("codex_work_desktop".to_string()), ..Default::default() }) .await?; @@ -672,6 +673,7 @@ async fn thread_start_tracks_thread_initialized_analytics() -> Result<()> { event, &thread.id, &thread.session_id, + "codex_work_desktop", "mock-model", "new", "user", diff --git a/codex-rs/app-server/tests/suite/v2/turn_start.rs b/codex-rs/app-server/tests/suite/v2/turn_start.rs index a52829858..528292ae3 100644 --- a/codex-rs/app-server/tests/suite/v2/turn_start.rs +++ b/codex-rs/app-server/tests/suite/v2/turn_start.rs @@ -1,6 +1,5 @@ use anyhow::Context; use anyhow::Result; -use app_test_support::DEFAULT_CLIENT_NAME; use app_test_support::TestAppServer; use app_test_support::create_apply_patch_sse_response; use app_test_support::create_exec_command_sse_response; @@ -840,7 +839,7 @@ async fn thread_start_omits_empty_instruction_overrides_from_model_request() -> } #[tokio::test] -async fn turn_start_tracks_turn_event_analytics() -> Result<()> { +async fn turn_start_tracks_thread_originator_in_analytics() -> Result<()> { let server = responses::start_mock_server().await; let response_mock = responses::mount_response_sequence( &server, @@ -875,6 +874,7 @@ async fn turn_start_tracks_turn_event_analytics() -> Result<()> { .send_thread_start_request(ThreadStartParams { model: Some("mock-model".to_string()), thread_source: Some(ThreadSource::User), + service_name: Some("codex_work_desktop".to_string()), ..Default::default() }) .await?; @@ -919,7 +919,7 @@ async fn turn_start_tracks_turn_event_analytics() -> Result<()> { assert_eq!(event["event_params"]["turn_id"], turn.id); assert_eq!( event["event_params"]["app_server_client"]["product_client_id"], - DEFAULT_CLIENT_NAME + "codex_work_desktop" ); assert_eq!(event["event_params"]["model"], "mock-model"); assert_eq!(event["event_params"]["model_provider"], "mock_provider");