diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 6b5b5afdf..5a32a8e18 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -3776,7 +3776,6 @@ dependencies = [ "chrono", "codex-file-search", "codex-git-utils", - "codex-login", "codex-otel", "codex-protocol", "codex-state", diff --git a/codex-rs/analytics/src/analytics_client_tests.rs b/codex-rs/analytics/src/analytics_client_tests.rs index 38d953961..9f8df04b4 100644 --- a/codex-rs/analytics/src/analytics_client_tests.rs +++ b/codex-rs/analytics/src/analytics_client_tests.rs @@ -168,6 +168,17 @@ use std::sync::Arc; use std::sync::Mutex; use tokio::sync::mpsc; +const TEST_PRODUCT_CLIENT_ID: &str = "codex_work_desktop"; + +fn test_tracking_context(thread_id: &str, turn_id: &str) -> TrackEventsContext { + TrackEventsContext { + model_slug: "gpt-5".to_string(), + thread_id: thread_id.to_string(), + turn_id: turn_id.to_string(), + product_client_id: TEST_PRODUCT_CLIENT_ID.to_string(), + } +} + fn sample_thread_with_metadata( thread_id: &str, ephemeral: bool, @@ -1024,11 +1035,7 @@ fn normalize_path_for_skill_id_repo_root_not_in_skill_path_uses_absolute_path() #[test] fn app_mentioned_event_serializes_expected_shape() { - let tracking = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-1".to_string(), - turn_id: "turn-1".to_string(), - }; + let tracking = test_tracking_context("thread-1", "turn-1"); let event = TrackEventRequest::AppMentioned(CodexAppMentionedEventRequest { event_type: "codex_app_mentioned", event_params: codex_app_metadata( @@ -1052,7 +1059,7 @@ fn app_mentioned_event_serializes_expected_shape() { "thread_id": "thread-1", "turn_id": "turn-1", "app_name": "Calendar", - "product_client_id": originator().value, + "product_client_id": TEST_PRODUCT_CLIENT_ID, "invoke_type": "explicit", "model_slug": "gpt-5" } @@ -1062,11 +1069,7 @@ fn app_mentioned_event_serializes_expected_shape() { #[test] fn app_used_event_serializes_expected_shape() { - let tracking = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-2".to_string(), - turn_id: "turn-2".to_string(), - }; + let tracking = test_tracking_context("thread-2", "turn-2"); let event = TrackEventRequest::AppUsed(CodexAppUsedEventRequest { event_type: "codex_app_used", event_params: codex_app_metadata( @@ -1090,7 +1093,7 @@ fn app_used_event_serializes_expected_shape() { "thread_id": "thread-2", "turn_id": "turn-2", "app_name": "Google Drive", - "product_client_id": originator().value, + "product_client_id": TEST_PRODUCT_CLIENT_ID, "invoke_type": "implicit", "model_slug": "gpt-5" } @@ -1381,16 +1384,8 @@ fn app_used_dedupe_is_keyed_by_turn_and_connector() { invocation_type: Some(InvocationType::Implicit), }; - let turn_1 = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-1".to_string(), - turn_id: "turn-1".to_string(), - }; - let turn_2 = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-1".to_string(), - turn_id: "turn-2".to_string(), - }; + let turn_1 = test_tracking_context("thread-1", "turn-1"); + let turn_2 = test_tracking_context("thread-1", "turn-2"); assert_eq!(queue.should_enqueue_app_used(&turn_1, &app), true); assert_eq!(queue.should_enqueue_app_used(&turn_1, &app), false); @@ -3009,11 +3004,7 @@ async fn subagent_tool_items_inherit_parent_connection_metadata() { #[test] fn plugin_used_event_serializes_expected_shape() { - let tracking = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-3".to_string(), - turn_id: "turn-3".to_string(), - }; + let tracking = test_tracking_context("thread-3", "turn-3"); let event = TrackEventRequest::PluginUsed(CodexPluginUsedEventRequest { event_type: "codex_plugin_used", event_params: codex_plugin_used_metadata(&tracking, sample_plugin_metadata()), @@ -3033,7 +3024,7 @@ fn plugin_used_event_serializes_expected_shape() { "has_skills": true, "mcp_server_count": 2, "connector_ids": ["calendar", "drive"], - "product_client_id": originator().value, + "product_client_id": TEST_PRODUCT_CLIENT_ID, "mcp_server_names": ["mcp-1", "mcp-2"], "thread_id": "thread-3", "turn_id": "turn-3", @@ -3132,11 +3123,7 @@ fn plugin_management_event_keeps_plugin_id_local_when_remote_id_exists() { #[test] fn hook_run_event_serializes_expected_shape() { - let tracking = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-3".to_string(), - turn_id: "turn-3".to_string(), - }; + let tracking = test_tracking_context("thread-3", "turn-3"); let event = TrackEventRequest::HookRun(CodexHookRunEventRequest { event_type: "codex_hook_run", event_params: codex_hook_run_metadata( @@ -3158,6 +3145,7 @@ fn hook_run_event_serializes_expected_shape() { "event_params": { "thread_id": "thread-3", "turn_id": "turn-3", + "product_client_id": TEST_PRODUCT_CLIENT_ID, "model_slug": "gpt-5", "hook_name": "PreToolUse", "hook_source": "user", @@ -3169,11 +3157,7 @@ fn hook_run_event_serializes_expected_shape() { #[test] fn hook_run_metadata_maps_sources_and_statuses() { - let tracking = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-1".to_string(), - turn_id: "turn-1".to_string(), - }; + let tracking = test_tracking_context("thread-1", "turn-1"); let system = serde_json::to_value(codex_hook_run_metadata( &tracking, @@ -3224,11 +3208,7 @@ fn hook_run_metadata_maps_sources_and_statuses() { #[test] fn hook_run_metadata_maps_stopped_status() { - let tracking = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-1".to_string(), - turn_id: "turn-1".to_string(), - }; + let tracking = test_tracking_context("thread-1", "turn-1"); let stopped = serde_json::to_value(codex_hook_run_metadata( &tracking, @@ -3254,16 +3234,8 @@ fn plugin_used_dedupe_is_keyed_by_turn_and_plugin() { }; let plugin = sample_plugin_metadata(); - let turn_1 = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-1".to_string(), - turn_id: "turn-1".to_string(), - }; - let turn_2 = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-1".to_string(), - turn_id: "turn-2".to_string(), - }; + let turn_1 = test_tracking_context("thread-1", "turn-1"); + let turn_2 = test_tracking_context("thread-1", "turn-2"); assert_eq!(queue.should_enqueue_plugin_used(&turn_1, &plugin), true); assert_eq!(queue.should_enqueue_plugin_used(&turn_1, &plugin), false); @@ -3274,11 +3246,7 @@ fn plugin_used_dedupe_is_keyed_by_turn_and_plugin() { async fn reducer_ingests_skill_invoked_fact() { let mut reducer = AnalyticsReducer::default(); let mut events = Vec::new(); - let tracking = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-1".to_string(), - turn_id: "turn-1".to_string(), - }; + let tracking = test_tracking_context("thread-1", "turn-1"); let skill_path = PathBuf::from("/Users/abc/.codex/skills/doc/SKILL.md"); let expected_skill_id = skill_id_for_local_skill( /*repo_url*/ None, @@ -3311,7 +3279,7 @@ async fn reducer_ingests_skill_invoked_fact() { "skill_id": expected_skill_id, "skill_name": "doc", "event_params": { - "product_client_id": originator().value, + "product_client_id": TEST_PRODUCT_CLIENT_ID, "skill_scope": "user", "plugin_id": null, "repo_url": null, @@ -3328,11 +3296,7 @@ async fn reducer_ingests_skill_invoked_fact() { async fn reducer_includes_plugin_id_for_plugin_skill_invocations() { let mut reducer = AnalyticsReducer::default(); let mut events = Vec::new(); - let tracking = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-1".to_string(), - turn_id: "turn-1".to_string(), - }; + let tracking = test_tracking_context("thread-1", "turn-1"); let skill_path = PathBuf::from("/Users/abc/.codex/plugins/cache/test/sample/skills/doc/SKILL.md"); @@ -3367,11 +3331,7 @@ async fn reducer_ingests_hook_run_fact() { reducer .ingest( AnalyticsFact::Custom(CustomAnalyticsFact::HookRun(HookRunInput { - tracking: TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-1".to_string(), - turn_id: "turn-1".to_string(), - }, + tracking: test_tracking_context("thread-1", "turn-1"), hook: HookRunFact { event_name: HookEventName::PostToolUse, hook_source: HookSource::Unknown, @@ -3394,11 +3354,7 @@ async fn reducer_ingests_hook_run_fact() { async fn reducer_ingests_app_and_plugin_facts() { let mut reducer = AnalyticsReducer::default(); let mut events = Vec::new(); - let tracking = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-1".to_string(), - turn_id: "turn-1".to_string(), - }; + let tracking = test_tracking_context("thread-1", "turn-1"); reducer .ingest( @@ -3441,6 +3397,18 @@ async fn reducer_ingests_app_and_plugin_facts() { assert_eq!(payload[0]["event_type"], "codex_app_mentioned"); assert_eq!(payload[1]["event_type"], "codex_app_used"); assert_eq!(payload[2]["event_type"], "codex_plugin_used"); + assert_eq!( + payload[0]["event_params"]["product_client_id"], + TEST_PRODUCT_CLIENT_ID + ); + assert_eq!( + payload[1]["event_params"]["product_client_id"], + TEST_PRODUCT_CLIENT_ID + ); + assert_eq!( + payload[2]["event_params"]["product_client_id"], + TEST_PRODUCT_CLIENT_ID + ); } #[tokio::test] diff --git a/codex-rs/analytics/src/events.rs b/codex-rs/analytics/src/events.rs index 2fe465c9b..1365e5406 100644 --- a/codex-rs/analytics/src/events.rs +++ b/codex-rs/analytics/src/events.rs @@ -773,6 +773,7 @@ pub(crate) struct CodexAppUsedEventRequest { pub(crate) struct CodexHookRunMetadata { pub(crate) thread_id: Option, pub(crate) turn_id: Option, + pub(crate) product_client_id: Option, pub(crate) model_slug: Option, pub(crate) hook_name: Option, pub(crate) hook_source: Option<&'static str>, @@ -1030,13 +1031,20 @@ pub(crate) fn codex_app_metadata( thread_id: Some(tracking.thread_id.clone()), turn_id: Some(tracking.turn_id.clone()), app_name: app.app_name, - product_client_id: Some(originator().value), + product_client_id: Some(tracking.product_client_id.clone()), invoke_type: app.invocation_type, model_slug: Some(tracking.model_slug.clone()), } } pub(crate) fn codex_plugin_metadata(plugin: PluginTelemetryMetadata) -> CodexPluginMetadata { + codex_plugin_metadata_with_product_client_id(plugin, originator().value) +} + +fn codex_plugin_metadata_with_product_client_id( + plugin: PluginTelemetryMetadata, + product_client_id: String, +) -> CodexPluginMetadata { let PluginTelemetryMetadata { plugin_id, remote_plugin_id, @@ -1062,7 +1070,7 @@ pub(crate) fn codex_plugin_metadata(plugin: PluginTelemetryMetadata) -> CodexPlu .map(|connector_id| connector_id.0) .collect() }), - product_client_id: Some(originator().value), + product_client_id: Some(product_client_id), } } @@ -1139,7 +1147,10 @@ pub(crate) fn codex_plugin_used_metadata( .as_ref() .map(|summary| summary.mcp_server_names.clone()); CodexPluginUsedMetadata { - plugin: codex_plugin_metadata(plugin), + plugin: codex_plugin_metadata_with_product_client_id( + plugin, + tracking.product_client_id.clone(), + ), mcp_server_names, thread_id: Some(tracking.thread_id.clone()), turn_id: Some(tracking.turn_id.clone()), @@ -1154,6 +1165,7 @@ pub(crate) fn codex_hook_run_metadata( CodexHookRunMetadata { thread_id: Some(tracking.thread_id.clone()), turn_id: Some(tracking.turn_id.clone()), + product_client_id: Some(tracking.product_client_id.clone()), model_slug: Some(tracking.model_slug.clone()), hook_name: Some(analytics_hook_event_name(hook.event_name).to_owned()), hook_source: Some(analytics_hook_source(hook.hook_source)), diff --git a/codex-rs/analytics/src/facts.rs b/codex-rs/analytics/src/facts.rs index 026c2e876..4505c0c69 100644 --- a/codex-rs/analytics/src/facts.rs +++ b/codex-rs/analytics/src/facts.rs @@ -41,17 +41,20 @@ pub struct TrackEventsContext { pub model_slug: String, pub thread_id: String, pub turn_id: String, + pub product_client_id: String, } pub fn build_track_events_context( model_slug: String, thread_id: String, turn_id: String, + product_client_id: String, ) -> TrackEventsContext { TrackEventsContext { model_slug, thread_id, turn_id, + product_client_id, } } diff --git a/codex-rs/analytics/src/reducer.rs b/codex-rs/analytics/src/reducer.rs index b647984df..666e756d3 100644 --- a/codex-rs/analytics/src/reducer.rs +++ b/codex-rs/analytics/src/reducer.rs @@ -729,7 +729,7 @@ impl AnalyticsReducer { turn_id: Some(tracking.turn_id.clone()), invoke_type: Some(invocation.invocation_type), model_slug: Some(tracking.model_slug.clone()), - product_client_id: Some(originator().value), + product_client_id: Some(tracking.product_client_id.clone()), repo_url, skill_scope: Some(skill_scope.to_string()), plugin_id: invocation.plugin_id, diff --git a/codex-rs/app-server/src/request_processors/external_agent_session_import.rs b/codex-rs/app-server/src/request_processors/external_agent_session_import.rs index dfa2ea94f..77b9bfcbf 100644 --- a/codex-rs/app-server/src/request_processors/external_agent_session_import.rs +++ b/codex-rs/app-server/src/request_processors/external_agent_session_import.rs @@ -207,6 +207,7 @@ impl ExternalAgentSessionImporter { parent_thread_id: None, source: source.clone(), thread_source: None, + originator: codex_login::default_client::originator().value, base_instructions: BaseInstructions { text: config .base_instructions diff --git a/codex-rs/app-server/src/request_processors/thread_processor_tests.rs b/codex-rs/app-server/src/request_processors/thread_processor_tests.rs index 1f5ae12f4..3f7cbc9d0 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor_tests.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor_tests.rs @@ -781,6 +781,7 @@ mod thread_processor_behavior_tests { forked_from_thread_id: None, parent_thread_id: None, thread_source: None, + originator: "test_originator".to_string(), }; assert_eq!( diff --git a/codex-rs/app-server/tests/suite/conversation_summary.rs b/codex-rs/app-server/tests/suite/conversation_summary.rs index d4d0cf061..7f964d314 100644 --- a/codex-rs/app-server/tests/suite/conversation_summary.rs +++ b/codex-rs/app-server/tests/suite/conversation_summary.rs @@ -128,6 +128,7 @@ async fn get_conversation_summary_by_thread_id_reads_pathless_store_thread() -> parent_thread_id: None, source: SessionSource::Cli, thread_source: None, + originator: "test_originator".to_string(), base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), multi_agent_version: None, diff --git a/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs b/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs index 1a6d1eb4f..717fe228f 100644 --- a/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs +++ b/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs @@ -154,6 +154,7 @@ async fn thread_delete_with_non_local_thread_store_does_not_create_local_persist parent_thread_id: None, source: SessionSource::Cli, thread_source: None, + originator: "test_originator".to_string(), base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), multi_agent_version: None, diff --git a/codex-rs/app-server/tests/suite/v2/thread_read.rs b/codex-rs/app-server/tests/suite/v2/thread_read.rs index 0b4f101c0..3e39070a7 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_read.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_read.rs @@ -1364,6 +1364,7 @@ async fn seed_pathless_store_thread( parent_thread_id: None, source: ProtocolSessionSource::Cli, thread_source: None, + originator: "test_originator".to_string(), base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), multi_agent_version: None, diff --git a/codex-rs/app-server/tests/suite/v2/thread_unarchive.rs b/codex-rs/app-server/tests/suite/v2/thread_unarchive.rs index db8b60d16..708d4bdfb 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_unarchive.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_unarchive.rs @@ -215,6 +215,7 @@ async fn thread_unarchive_preserves_pathless_store_metadata() -> Result<()> { parent_thread_id: None, source: SessionSource::Cli, thread_source: None, + originator: "test_originator".to_string(), base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), multi_agent_version: None, diff --git a/codex-rs/core/src/agent/control_tests.rs b/codex-rs/core/src/agent/control_tests.rs index e37e9b04d..225fd4ba9 100644 --- a/codex-rs/core/src/agent/control_tests.rs +++ b/codex-rs/core/src/agent/control_tests.rs @@ -9,7 +9,9 @@ use crate::config::ConfigBuilder; use crate::context::ContextualUserFragment; use crate::context::SubagentNotification; use crate::init_state_db; +use crate::thread_manager::StartThreadOptions; use assert_matches::assert_matches; +use codex_extension_api::ExtensionDataInit; use codex_features::Feature; use codex_login::CodexAuth; use codex_protocol::AgentPath; @@ -21,7 +23,9 @@ use codex_protocol::models::ResponseItem; use codex_protocol::protocol::CompactedItem; use codex_protocol::protocol::ErrorEvent; use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::InitialHistory; use codex_protocol::protocol::InterAgentCommunication; +use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; use codex_protocol::protocol::TurnAbortReason; @@ -139,6 +143,33 @@ impl AgentControlHarness { } } +async fn persisted_originator(thread: &CodexThread) -> String { + thread.ensure_rollout_materialized().await; + thread + .flush_rollout() + .await + .expect("thread rollout should flush"); + let stored_thread = thread + .read_thread( + /*include_archived*/ true, /*include_history*/ true, + ) + .await + .expect("thread should be readable"); + let history = stored_thread.history.expect("history should be loaded"); + history + .items + .iter() + .find_map(|item| match item { + RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.originator.clone()), + RolloutItem::ResponseItem(_) + | RolloutItem::InterAgentCommunication(_) + | RolloutItem::EventMsg(_) + | RolloutItem::Compacted(_) + | RolloutItem::TurnContext(_) => None, + }) + .expect("session metadata should be persisted") +} + fn has_subagent_notification(history_items: &[ResponseItem]) -> bool { history_items.iter().any(|item| { let ResponseItem::Message { role, content, .. } = item else { @@ -2199,6 +2230,83 @@ async fn spawn_thread_subagent_gets_random_nickname_in_session_source() { assert_eq!(agent_role, Some("explorer".to_string())); } +#[tokio::test] +async fn spawn_thread_subagents_persist_parent_originator_across_new_and_truncated_fork() { + let harness = AgentControlHarness::new().await; + let parent = harness + .manager + .start_thread_with_options(StartThreadOptions { + config: harness.config.clone(), + initial_history: InitialHistory::New, + session_source: None, + thread_source: None, + dynamic_tools: Vec::new(), + metrics_service_name: Some("codex_work_desktop".to_string()), + multi_agent_mode: None, + parent_trace: None, + environments: Vec::new(), + thread_extension_init: ExtensionDataInit::default(), + supports_openai_form_elicitation: false, + }) + .await + .expect("parent thread should start"); + let parent_originator = persisted_originator(&parent.thread).await; + assert_eq!(parent_originator, "codex_work_desktop"); + + let child_thread_id = harness + .control + .spawn_agent( + harness.config.clone(), + text_input("hello child"), + Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { + parent_thread_id: parent.thread_id, + depth: 1, + agent_path: None, + agent_nickname: None, + agent_role: Some("explorer".to_string()), + })), + ) + .await + .expect("child spawn should succeed"); + + let child_thread = harness + .manager + .get_thread(child_thread_id) + .await + .expect("child thread should be registered"); + let child_originator = persisted_originator(&child_thread).await; + assert_eq!(child_originator, parent_originator); + + let child = harness + .control + .spawn_agent_with_metadata( + harness.config.clone(), + text_input("hello forked child"), + Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { + parent_thread_id: parent.thread_id, + depth: 1, + agent_path: None, + agent_nickname: None, + agent_role: Some("explorer".to_string()), + })), + SpawnAgentOptions { + fork_parent_spawn_call_id: Some("spawn-call-last-n".to_string()), + fork_mode: Some(SpawnAgentForkMode::LastNTurns(1)), + ..Default::default() + }, + ) + .await + .expect("forked child spawn should succeed"); + + let child_thread = harness + .manager + .get_thread(child.thread_id) + .await + .expect("child thread should be registered"); + let child_originator = persisted_originator(&child_thread).await; + assert_eq!(child_originator, parent_originator); +} + #[tokio::test] async fn spawn_thread_subagent_uses_role_specific_nickname_candidates() { let mut harness = AgentControlHarness::new().await; diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 9e240e968..67a940dc9 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -186,6 +186,7 @@ struct ModelClientState { provider: SharedModelProvider, auth_env_telemetry: AuthEnvTelemetry, session_source: SessionSource, + originator: String, model_verbosity: Option, enable_request_compression: bool, include_timing_metrics: bool, @@ -387,6 +388,7 @@ impl ModelClient { thread_id: ThreadId, provider_info: ModelProviderInfo, session_source: SessionSource, + originator: String, model_verbosity: Option, enable_request_compression: bool, include_timing_metrics: bool, @@ -408,6 +410,7 @@ impl ModelClient { provider: model_provider, auth_env_telemetry, session_source, + originator, model_verbosity, enable_request_compression, include_timing_metrics, @@ -565,6 +568,7 @@ impl ModelClient { self.state.beta_features_header.as_deref(), turn_state.as_ref(), )); + add_originator_header(&mut extra_headers, self.state.originator.as_str()); extra_headers.extend(self.build_responses_compatibility_headers(responses_metadata)); extra_headers.extend(build_session_headers( Some(responses_metadata.session_id.to_string()), @@ -676,6 +680,7 @@ impl ModelClient { fn build_subagent_headers(&self) -> ApiHeaderMap { let mut extra_headers = ApiHeaderMap::new(); + add_originator_header(&mut extra_headers, self.state.originator.as_str()); if let Some(subagent) = subagent_header_value(&self.state.session_source) && let Ok(val) = HeaderValue::from_str(&subagent) { @@ -997,6 +1002,7 @@ impl ModelClient { self.state.beta_features_header.as_deref(), /*turn_state*/ None, ); + add_originator_header(&mut headers, self.state.originator.as_str()); if let Ok(header_value) = HeaderValue::from_str(&responses_metadata.thread_id) { headers.insert("x-client-request-id", header_value); } @@ -1064,6 +1070,7 @@ impl ModelClientSession { self.client.state.beta_features_header.as_deref(), Some(&self.turn_state), ); + add_originator_header(&mut headers, self.client.state.originator.as_str()); headers.extend( self.client .build_responses_compatibility_headers(responses_metadata), @@ -1791,6 +1798,22 @@ fn build_responses_headers( headers } +pub(crate) fn add_originator_header(headers: &mut ApiHeaderMap, originator: &str) { + let default_originator = codex_login::default_client::originator(); + if originator == default_originator.value.as_str() { + return; + } + + match HeaderValue::from_str(originator) { + Ok(header_value) => { + headers.insert("originator", header_value); + } + Err(err) => { + warn!("ignoring invalid thread originator header value: {err}"); + } + } +} + fn add_responses_lite_header(headers: &mut ApiHeaderMap, use_responses_lite: bool) { if use_responses_lite { headers.insert( diff --git a/codex-rs/core/src/client_tests.rs b/codex-rs/core/src/client_tests.rs index 9df7d4d01..8d58fec9b 100644 --- a/codex-rs/core/src/client_tests.rs +++ b/codex-rs/core/src/client_tests.rs @@ -75,6 +75,7 @@ fn test_model_client(session_source: SessionSource) -> ModelClient { thread_id, provider, session_source, + "test_originator".to_string(), /*model_verbosity*/ None, /*enable_request_compression*/ false, /*include_timing_metrics*/ false, @@ -298,6 +299,10 @@ fn build_subagent_headers_sets_internal_memory_consolidation_label() { .get(X_OPENAI_SUBAGENT_HEADER) .and_then(|value| value.to_str().ok()); assert_eq!(value, Some("memory_consolidation")); + assert_eq!( + headers.get("originator"), + Some(&http::HeaderValue::from_static("test_originator")) + ); } #[test] @@ -606,6 +611,7 @@ fn model_client_with_counting_attestation( ThreadId::new(), provider, SessionSource::Exec, + "test_originator".to_string(), /*model_verbosity*/ None, /*enable_request_compression*/ false, /*include_timing_metrics*/ false, diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index 9f5b867f7..640e1f8df 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -103,6 +103,7 @@ pub(crate) async fn run_codex_thread_interactive( forked_from_thread_id, parent_thread_id: Some(parent_session.thread_id), thread_source: Some(ThreadSource::Subagent), + originator: parent_ctx.originator.clone(), agent_control: parent_session.services.agent_control.clone(), dynamic_tools: Vec::new(), metrics_service_name: None, diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index a522cd62a..dccfd4a76 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -77,6 +77,7 @@ pub struct ThreadConfigSnapshot { pub forked_from_thread_id: Option, pub parent_thread_id: Option, pub thread_source: Option, + pub originator: String, } /// Explains why `CodexThread::try_start_turn_if_idle` rejected an automatic diff --git a/codex-rs/core/src/hook_runtime.rs b/codex-rs/core/src/hook_runtime.rs index 12627e2d5..342e4d4b8 100644 --- a/codex-rs/core/src/hook_runtime.rs +++ b/codex-rs/core/src/hook_runtime.rs @@ -685,6 +685,7 @@ fn hook_run_analytics_payload( .turn_id .clone() .unwrap_or_else(|| turn_context.sub_id.clone()), + turn_context.originator.clone(), ), HookRunFact { event_name: completed.run.event_name, diff --git a/codex-rs/core/src/mcp_tool_call.rs b/codex-rs/core/src/mcp_tool_call.rs index c7187c18f..085fd496d 100644 --- a/codex-rs/core/src/mcp_tool_call.rs +++ b/codex-rs/core/src/mcp_tool_call.rs @@ -971,6 +971,7 @@ async fn maybe_track_codex_app_used( turn_context.model_info.slug.clone(), sess.thread_id.to_string(), turn_context.sub_id.clone(), + turn_context.originator.clone(), ); sess.services.analytics_events_client.track_app_used( tracking, diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 667939f10..43ecc3e57 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -1,4 +1,5 @@ use crate::client::ModelClient; +use crate::client::add_originator_header; use crate::realtime_context::build_realtime_startup_context; use crate::realtime_context::truncate_realtime_text_to_token_budget; use crate::realtime_prompt::prepare_realtime_backend_prompt; @@ -775,6 +776,7 @@ async fn prepare_realtime_start( let session_config = build_realtime_session_config(sess, ¶ms, version, configured_voice).await?; let requested_realtime_session_id = session_config.session_id.clone(); + let originator = sess.originator().await; let extra_headers = match transport { ConversationStartTransport::Websocket => { let realtime_api_key = realtime_api_key(auth.as_ref(), &provider)?; @@ -782,6 +784,7 @@ async fn prepare_realtime_start( requested_realtime_session_id.as_deref(), Some(realtime_api_key.as_str()), version, + originator.as_str(), )? } ConversationStartTransport::Webrtc { .. } => { @@ -789,6 +792,7 @@ async fn prepare_realtime_start( requested_realtime_session_id.as_deref(), /*api_key*/ None, version, + originator.as_str(), )? } }; @@ -1171,6 +1175,7 @@ fn realtime_request_headers( realtime_session_id: Option<&str>, api_key: Option<&str>, version: RealtimeWsVersion, + originator: &str, ) -> CodexResult> { let mut headers = HeaderMap::new(); @@ -1191,6 +1196,8 @@ fn realtime_request_headers( headers.insert(AUTHORIZATION, auth_value); } + add_originator_header(&mut headers, originator); + Ok(Some(headers)) } diff --git a/codex-rs/core/src/realtime_conversation_tests.rs b/codex-rs/core/src/realtime_conversation_tests.rs index 55525774b..51425e4c3 100644 --- a/codex-rs/core/src/realtime_conversation_tests.rs +++ b/codex-rs/core/src/realtime_conversation_tests.rs @@ -149,10 +149,14 @@ async fn clears_active_handoff_explicitly() { #[test] fn uses_quicksilver_alpha_header_for_realtime_v1() { - let headers = - realtime_request_headers(Some("session_1"), Some("sk-test"), RealtimeWsVersion::V1) - .expect("headers") - .expect("headers"); + let headers = realtime_request_headers( + Some("session_1"), + Some("sk-test"), + RealtimeWsVersion::V1, + "codex_work_desktop", + ) + .expect("headers") + .expect("headers"); assert_eq!( headers @@ -164,10 +168,39 @@ fn uses_quicksilver_alpha_header_for_realtime_v1() { #[test] fn omits_quicksilver_alpha_header_for_realtime_v2() { - let headers = - realtime_request_headers(Some("session_1"), Some("sk-test"), RealtimeWsVersion::V2) - .expect("headers") - .expect("headers"); + let headers = realtime_request_headers( + Some("session_1"), + Some("sk-test"), + RealtimeWsVersion::V2, + "codex_work_desktop", + ) + .expect("headers") + .expect("headers"); assert!(headers.get("openai-alpha").is_none()); } + +#[test] +fn realtime_headers_include_only_non_default_originator() { + let default_originator = codex_login::default_client::originator(); + for (originator, expected_header) in [ + ("codex_work_desktop", Some("codex_work_desktop")), + (default_originator.value.as_str(), None), + ] { + let headers = realtime_request_headers( + Some("session_1"), + Some("sk-test"), + RealtimeWsVersion::V2, + originator, + ) + .expect("headers") + .expect("headers"); + + assert_eq!( + headers + .get("originator") + .and_then(|value| value.to_str().ok()), + expected_header + ); + } +} diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index d94ca7f34..9db1df044 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -73,7 +73,6 @@ use codex_hooks::HooksConfig; use codex_login::AuthManager; use codex_login::CodexAuth; use codex_login::auth_env_telemetry::collect_auth_env_telemetry; -use codex_login::default_client::originator; use codex_mcp::McpConnectionManager; use codex_mcp::McpResourceClient; use codex_mcp::McpRuntimeContext; @@ -433,6 +432,7 @@ pub(crate) struct CodexSpawnArgs { pub(crate) forked_from_thread_id: Option, pub(crate) parent_thread_id: Option, pub(crate) thread_source: Option, + pub(crate) originator: String, pub(crate) agent_control: AgentControl, pub(crate) dynamic_tools: Vec, pub(crate) metrics_service_name: Option, @@ -522,6 +522,7 @@ impl Codex { forked_from_thread_id, parent_thread_id, thread_source, + originator, agent_control, dynamic_tools, metrics_service_name, @@ -654,6 +655,7 @@ impl Codex { forked_from_thread_id, parent_thread_id, thread_source, + originator, dynamic_tools, user_shell_override, }; @@ -3913,7 +3915,7 @@ pub(crate) fn emit_subagent_session_started( forked_from_thread_id: thread_config .forked_from_thread_id .map(|thread_id| thread_id.to_string()), - product_client_id: client_name.clone(), + product_client_id: thread_config.originator.clone(), client_name, client_version, model: thread_config.model, diff --git a/codex-rs/core/src/session/review.rs b/codex-rs/core/src/session/review.rs index 8986ffd05..c55afbf14 100644 --- a/codex-rs/core/src/session/review.rs +++ b/codex-rs/core/src/session/review.rs @@ -118,6 +118,7 @@ pub(super) async fn spawn_review_thread( reasoning_summary, session_source, parent_thread_id: parent_turn_context.parent_thread_id, + originator: parent_turn_context.originator.clone(), environments: parent_turn_context.environments.clone(), available_models, unified_exec_shell_mode, diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index ee62155a8..af64e0bef 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -106,6 +106,8 @@ pub(crate) struct SessionConfiguration { pub(super) parent_thread_id: Option, /// Optional analytics source classification for this thread. pub(super) thread_source: Option, + /// Effective originator used for this thread's Responses requests and analytics events. + pub(super) originator: String, pub(super) dynamic_tools: Vec, pub(super) user_shell_override: Option, } @@ -198,6 +200,7 @@ impl SessionConfiguration { forked_from_thread_id: self.forked_from_thread_id, parent_thread_id: self.parent_thread_id, thread_source: self.thread_source.clone(), + originator: self.originator.clone(), } } @@ -473,6 +476,11 @@ impl Session { self.services.agent_control.session_id() } + pub(crate) async fn originator(&self) -> String { + let state = self.state.lock().await; + state.session_configuration.originator.clone() + } + #[instrument(name = "session_init", level = "info", skip_all)] #[allow(clippy::too_many_arguments)] pub(crate) async fn new( @@ -581,6 +589,7 @@ impl Session { parent_thread_id, source: session_source, thread_source: session_configuration.thread_source.clone(), + originator: session_configuration.originator.clone(), base_instructions: BaseInstructions { text: session_configuration.base_instructions.clone(), }, @@ -756,7 +765,7 @@ impl Session { let auth_mode = auth.map(CodexAuth::auth_mode).map(TelemetryAuthMode::from); let account_id = auth.and_then(CodexAuth::get_account_id); let account_email = auth.and_then(CodexAuth::get_account_email); - let originator = originator().value; + let originator = session_configuration.originator.clone(); let terminal_type = user_agent(); let session_model = session_configuration.collaboration_mode.model().to_string(); let auth_env_telemetry = collect_auth_env_telemetry( @@ -1057,6 +1066,7 @@ impl Session { thread_id, session_configuration.provider.clone(), session_configuration.session_source.clone(), + session_configuration.originator.clone(), config.model_verbosity, config.features.enabled(Feature::EnableRequestCompression), config.features.enabled(Feature::RuntimeMetrics), diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 1ebb069f0..685005b40 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -479,6 +479,7 @@ fn test_model_client_session() -> crate::client::ModelClientSession { thread_id, ModelProviderInfo::create_openai_provider(/* base_url */ /*base_url*/ None), codex_protocol::protocol::SessionSource::Exec, + "test_originator".to_string(), /*model_verbosity*/ None, /*enable_request_compression*/ false, /*include_timing_metrics*/ false, @@ -3485,6 +3486,7 @@ async fn set_rate_limits_retains_previous_credits() { forked_from_thread_id: None, parent_thread_id: None, thread_source: None, + originator: "test_originator".to_string(), dynamic_tools: Vec::new(), user_shell_override: None, }; @@ -3592,6 +3594,7 @@ async fn set_rate_limits_updates_plan_type_when_present() { forked_from_thread_id: None, parent_thread_id: None, thread_source: None, + originator: "test_originator".to_string(), dynamic_tools: Vec::new(), user_shell_override: None, }; @@ -3841,6 +3844,7 @@ async fn attach_thread_persistence(session: &mut Session) -> PathBuf { parent_thread_id: None, source: SessionSource::Exec, thread_source: None, + originator: "test_originator".to_string(), base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), multi_agent_version: None, @@ -4122,13 +4126,14 @@ pub(crate) async fn make_session_configuration_for_tests() -> SessionConfigurati forked_from_thread_id: None, parent_thread_id: None, thread_source: None, + originator: "test_originator".to_string(), dynamic_tools: Vec::new(), user_shell_override: None, } } #[tokio::test] -async fn emit_subagent_session_started_includes_fork_lineage_from_session_configuration() { +async fn emit_subagent_session_started_includes_fork_lineage_and_originator() { use wiremock::Mock; use wiremock::MockServer; use wiremock::ResponseTemplate; @@ -4204,6 +4209,10 @@ async fn emit_subagent_session_started_includes_fork_lineage_from_session_config event["event_params"]["forked_from_thread_id"], forked_from_thread_id.to_string() ); + assert_eq!( + event["event_params"]["app_server_client"]["product_client_id"], + "test_originator" + ); } async fn resolved_environments_for_configuration( @@ -4989,6 +4998,7 @@ async fn session_new_fails_when_zsh_fork_enabled_without_packaged_zsh() { forked_from_thread_id: None, parent_thread_id: None, thread_source: None, + originator: "test_originator".to_string(), dynamic_tools: Vec::new(), user_shell_override: None, }; @@ -5118,6 +5128,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { forked_from_thread_id: None, parent_thread_id: None, thread_source: None, + originator: "test_originator".to_string(), dynamic_tools: Vec::new(), user_shell_override: None, }; @@ -5220,6 +5231,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { thread_id, session_configuration.provider.clone(), session_configuration.session_source.clone(), + session_configuration.originator.clone(), config.model_verbosity, config.features.enabled(Feature::EnableRequestCompression), config.features.enabled(Feature::RuntimeMetrics), @@ -5364,6 +5376,7 @@ async fn make_session_with_config_and_rx( forked_from_thread_id: None, parent_thread_id: None, thread_source: None, + originator: "test_originator".to_string(), dynamic_tools: Vec::new(), user_shell_override: None, }; @@ -5471,6 +5484,7 @@ async fn make_session_with_history_source_and_agent_control_and_rx( forked_from_thread_id: None, parent_thread_id: None, thread_source: None, + originator: "test_originator".to_string(), dynamic_tools: Vec::new(), user_shell_override: None, }; @@ -6713,6 +6727,7 @@ async fn shutdown_complete_does_not_append_to_thread_store_after_shutdown() { parent_thread_id: None, source: SessionSource::Exec, thread_source: None, + originator: "test_originator".to_string(), base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), multi_agent_version: None, @@ -7194,6 +7209,7 @@ where forked_from_thread_id: None, parent_thread_id: None, thread_source: None, + originator: "test_originator".to_string(), dynamic_tools, user_shell_override: None, }; @@ -7295,6 +7311,7 @@ where thread_id, session_configuration.provider.clone(), session_configuration.session_source.clone(), + session_configuration.originator.clone(), config.model_verbosity, config.features.enabled(Feature::EnableRequestCompression), config.features.enabled(Feature::RuntimeMetrics), diff --git a/codex-rs/core/src/session/tests/guardian_tests.rs b/codex-rs/core/src/session/tests/guardian_tests.rs index e61578929..6f23cb096 100644 --- a/codex-rs/core/src/session/tests/guardian_tests.rs +++ b/codex-rs/core/src/session/tests/guardian_tests.rs @@ -733,6 +733,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() { forked_from_thread_id: None, parent_thread_id: None, thread_source: None, + originator: "test_originator".to_string(), agent_control: AgentControl::default(), dynamic_tools: Vec::new(), metrics_service_name: None, diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 397dd17b6..42c072dd0 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -526,6 +526,7 @@ async fn build_skills_and_plugins( turn_context.model_info.slug.clone(), sess.thread_id.to_string(), turn_context.sub_id.clone(), + turn_context.originator.clone(), ); let loaded_plugins = sess .services diff --git a/codex-rs/core/src/session/turn_context.rs b/codex-rs/core/src/session/turn_context.rs index fdd9df5d5..da526d491 100644 --- a/codex-rs/core/src/session/turn_context.rs +++ b/codex-rs/core/src/session/turn_context.rs @@ -113,6 +113,7 @@ pub struct TurnContext { pub(crate) reasoning_summary: ReasoningSummaryConfig, pub(crate) session_source: SessionSource, pub(crate) parent_thread_id: Option, + pub(crate) originator: String, pub(crate) environments: TurnEnvironmentSnapshot, /// The session's absolute working directory. All relative paths provided /// by the model as well as sandbox policies are resolved against this path @@ -265,6 +266,7 @@ impl TurnContext { reasoning_summary: self.reasoning_summary, session_source: self.session_source.clone(), parent_thread_id: self.parent_thread_id, + originator: self.originator.clone(), environments: self.environments.clone(), #[allow(deprecated)] cwd: self.cwd.clone(), @@ -548,6 +550,7 @@ impl Session { reasoning_summary, session_source, parent_thread_id: session_configuration.parent_thread_id, + originator: session_configuration.originator.clone(), environments, #[allow(deprecated)] cwd, diff --git a/codex-rs/core/src/skills.rs b/codex-rs/core/src/skills.rs index 2c1ba8245..7b5d87b26 100644 --- a/codex-rs/core/src/skills.rs +++ b/codex-rs/core/src/skills.rs @@ -102,6 +102,7 @@ pub(crate) async fn maybe_emit_implicit_skill_invocation( turn_context.model_info.slug.clone(), sess.thread_id.to_string(), turn_context.sub_id.clone(), + turn_context.originator.clone(), ), vec![invocation], ); diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 816df89c3..75a02d999 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -29,6 +29,8 @@ use codex_extension_api::empty_extension_registry; use codex_features::Feature; use codex_login::AuthManager; use codex_login::CodexAuth; +use codex_login::default_client::CODEX_INTERNAL_ORIGINATOR_OVERRIDE_ENV_VAR; +use codex_login::default_client::originator; use codex_model_provider::create_model_provider; use codex_model_provider_info::ModelProviderInfo; use codex_model_provider_info::OPENAI_PROVIDER_ID; @@ -192,6 +194,28 @@ pub struct StartThreadOptions { pub supports_openai_form_elicitation: bool, } +fn originator_from_service_name(service_name: Option<&str>) -> Option { + let service_name = service_name?.trim(); + if service_name.eq_ignore_ascii_case("codex_work_desktop") { + return Some("codex_work_desktop".to_string()); + } + None +} + +fn effective_originator_value( + metrics_service_name: Option<&str>, + env_originator: Option, + persisted_originator: Option, + inherited_originator: Option, + default_originator: String, +) -> String { + originator_from_service_name(metrics_service_name) + .or(persisted_originator) + .or(inherited_originator) + .or(env_originator) + .unwrap_or(default_originator) +} + pub(crate) struct ResumeThreadWithHistoryOptions { pub(crate) config: Config, pub(crate) initial_history: InitialHistory, @@ -1220,6 +1244,64 @@ impl ThreadManagerState { } } + async fn inherited_originator_for_parent_thread( + &self, + session_source: &SessionSource, + parent_thread_id: Option, + forked_from_thread_id: Option, + ) -> Option { + let inherited_thread_id = match session_source { + SessionSource::SubAgent(SubAgentSource::ThreadSpawn { + parent_thread_id, .. + }) => Some(*parent_thread_id), + _ => parent_thread_id.or(forked_from_thread_id), + }; + let thread = self.get_thread(inherited_thread_id?).await.ok()?; + let originator = thread.config_snapshot().await.originator; + (!originator.is_empty()).then_some(originator) + } + + async fn effective_originator( + &self, + initial_history: &InitialHistory, + metrics_service_name: Option<&str>, + session_source: &SessionSource, + parent_thread_id: Option, + forked_from_thread_id: Option, + ) -> String { + let persisted_originator = initial_history.get_session_originator(); + let inherited_originator = match initial_history { + InitialHistory::New | InitialHistory::Cleared => { + self.inherited_originator_for_parent_thread( + session_source, + parent_thread_id, + forked_from_thread_id, + ) + .await + } + InitialHistory::Forked(_) if persisted_originator.is_none() => { + self.inherited_originator_for_parent_thread( + session_source, + parent_thread_id, + forked_from_thread_id, + ) + .await + } + InitialHistory::Resumed(_) | InitialHistory::Forked(_) => None, + }; + + let env_originator = std::env::var(CODEX_INTERNAL_ORIGINATOR_OVERRIDE_ENV_VAR) + .is_ok() + .then(|| originator().value); + effective_originator_value( + metrics_service_name, + env_originator, + persisted_originator, + inherited_originator, + originator().value, + ) + } + /// Spawn a new thread with no history using a provided config. pub(crate) async fn spawn_new_thread( &self, @@ -1466,6 +1548,15 @@ impl ThreadManagerState { forked_from_thread_id, ) .await; + let originator = self + .effective_originator( + &initial_history, + metrics_service_name.as_deref(), + &session_source, + parent_thread_id, + forked_from_thread_id, + ) + .await; let CodexSpawnOk { codex, thread_id, .. } = Box::pin(Codex::spawn(CodexSpawnArgs { @@ -1484,6 +1575,7 @@ impl ThreadManagerState { forked_from_thread_id, parent_thread_id, thread_source, + originator, agent_control, dynamic_tools, metrics_service_name, diff --git a/codex-rs/core/src/thread_manager_tests.rs b/codex-rs/core/src/thread_manager_tests.rs index 990c5ca5d..e6b222f6b 100644 --- a/codex-rs/core/src/thread_manager_tests.rs +++ b/codex-rs/core/src/thread_manager_tests.rs @@ -68,6 +68,41 @@ fn developer_interrupted_marker() -> ResponseItem { .expect("developer interrupted marker should be enabled") } +#[test] +fn effective_originator_prefers_thread_scoped_sources_before_env_originator() { + for (metrics_service_name, persisted_originator, inherited_originator, expected_originator) in [ + ( + Some("codex_work_desktop"), + Some("persisted_originator"), + Some("inherited_originator"), + "codex_work_desktop", + ), + ( + None, + Some("persisted_originator"), + Some("inherited_originator"), + "persisted_originator", + ), + ( + None, + None, + Some("inherited_originator"), + "inherited_originator", + ), + ] { + assert_eq!( + effective_originator_value( + metrics_service_name, + Some("Codex Desktop".to_string()), + persisted_originator.map(str::to_string), + inherited_originator.map(str::to_string), + "codex_cli_rs".to_string(), + ), + expected_originator + ); + } +} + #[test] fn truncates_before_requested_user_message() { let items = [ diff --git a/codex-rs/core/tests/responses_headers.rs b/codex-rs/core/tests/responses_headers.rs index 2f323e0dd..7e213991c 100644 --- a/codex-rs/core/tests/responses_headers.rs +++ b/codex-rs/core/tests/responses_headers.rs @@ -123,6 +123,7 @@ async fn responses_stream_includes_subagent_header_on_review() { thread_id, provider.clone(), session_source.clone(), + "test_originator".to_string(), config.model_verbosity, /*enable_request_compression*/ false, /*include_timing_metrics*/ false, @@ -255,6 +256,7 @@ async fn responses_stream_includes_subagent_header_on_other() { thread_id, provider.clone(), session_source.clone(), + "test_originator".to_string(), config.model_verbosity, /*enable_request_compression*/ false, /*include_timing_metrics*/ false, @@ -373,6 +375,7 @@ async fn responses_respects_model_info_overrides_from_config() { thread_id, provider.clone(), session_source.clone(), + "test_originator".to_string(), config.model_verbosity, /*enable_request_compression*/ false, /*include_timing_metrics*/ false, diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index aa2026be1..98303d09b 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -1223,6 +1223,7 @@ async fn send_provider_auth_request(server: &MockServer, auth: ModelProviderAuth thread_id, provider, SessionSource::Exec, + "test_originator".to_string(), config.model_verbosity, /*enable_request_compression*/ false, /*include_timing_metrics*/ false, @@ -2835,6 +2836,7 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() { thread_id, provider.clone(), SessionSource::Exec, + "test_originator".to_string(), config.model_verbosity, /*enable_request_compression*/ false, /*include_timing_metrics*/ false, diff --git a/codex-rs/core/tests/suite/client_websockets.rs b/codex-rs/core/tests/suite/client_websockets.rs index d35714ea4..b1b731a08 100755 --- a/codex-rs/core/tests/suite/client_websockets.rs +++ b/codex-rs/core/tests/suite/client_websockets.rs @@ -2192,6 +2192,7 @@ async fn websocket_harness_with_provider_options( thread_id, provider.clone(), SessionSource::Exec, + "test_originator".to_string(), config.model_verbosity, /*enable_request_compression*/ false, runtime_metrics_enabled, diff --git a/codex-rs/core/tests/suite/rollout_list_find.rs b/codex-rs/core/tests/suite/rollout_list_find.rs index 1461c1af3..c8b10b24e 100644 --- a/codex-rs/core/tests/suite/rollout_list_find.rs +++ b/codex-rs/core/tests/suite/rollout_list_find.rs @@ -188,6 +188,7 @@ async fn find_locates_rollout_file_written_by_recorder() -> std::io::Result<()> /*parent_thread_id*/ None, SessionSource::Exec, /*thread_source*/ None, + "test_originator".to_string(), BaseInstructions::default(), Vec::new(), ), diff --git a/codex-rs/memories/write/src/runtime.rs b/codex-rs/memories/write/src/runtime.rs index 0835e1552..bd86d8313 100644 --- a/codex-rs/memories/write/src/runtime.rs +++ b/codex-rs/memories/write/src/runtime.rs @@ -75,6 +75,38 @@ pub(crate) struct MemoryStartupContext { session_telemetry: SessionTelemetry, } +fn build_session_telemetry( + auth_manager: &AuthManager, + thread_id: ThreadId, + config: &Config, + source: SessionSource, + model: &str, + originator: String, +) -> SessionTelemetry { + let auth = auth_manager.auth_cached(); + let auth = auth.as_ref(); + let auth_mode = auth.map(CodexAuth::auth_mode).map(TelemetryAuthMode::from); + let account_id = auth.and_then(CodexAuth::get_account_id); + let account_email = auth.and_then(CodexAuth::get_account_email); + let auth_env_telemetry = collect_auth_env_telemetry( + &config.model_provider, + auth_manager.codex_api_key_env_enabled(), + ); + SessionTelemetry::new( + thread_id, + model, + model, + account_id, + account_email, + auth_mode, + originator, + config.otel.log_user_prompt, + user_agent(), + source, + ) + .with_auth_env(auth_env_telemetry.to_otel_metadata()) +} + impl MemoryStartupContext { pub(crate) fn new( thread_manager: Arc, @@ -129,29 +161,15 @@ impl MemoryStartupContext { source: SessionSource, provider: SharedModelProvider, ) -> Self { - let auth = auth_manager.auth_cached(); - let auth = auth.as_ref(); - let auth_mode = auth.map(CodexAuth::auth_mode).map(TelemetryAuthMode::from); - let account_id = auth.and_then(CodexAuth::get_account_id); - let account_email = auth.and_then(CodexAuth::get_account_email); let model = config.model.as_deref().unwrap_or("unknown"); - let auth_env_telemetry = collect_auth_env_telemetry( - &config.model_provider, - auth_manager.codex_api_key_env_enabled(), - ); - let session_telemetry = SessionTelemetry::new( + let session_telemetry = build_session_telemetry( + &auth_manager, thread_id, - model, - model, - account_id, - account_email, - auth_mode, - originator().value, - config.otel.log_user_prompt, - user_agent(), + config, source, - ) - .with_auth_env(auth_env_telemetry.to_otel_metadata()); + model, + originator().value, + ); Self { thread_id, @@ -205,10 +223,14 @@ impl MemoryStartupContext { StageOneRequestContext { model_info, - session_telemetry: self - .session_telemetry - .clone() - .with_model(model_name, model_name), + session_telemetry: build_session_telemetry( + &self.auth_manager, + self.thread_id, + config, + config_snapshot.session_source, + model_name, + config_snapshot.originator, + ), reasoning_effort: Some(reasoning_effort), reasoning_summary, service_tier: config_snapshot.service_tier, @@ -231,6 +253,7 @@ impl MemoryStartupContext { self.thread_id, config.model_provider.clone(), session_source.clone(), + config_snapshot.originator, config.model_verbosity, config.features.enabled(Feature::EnableRequestCompression), config.features.enabled(Feature::RuntimeMetrics), diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index b625392e5..a5ff55d47 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -2609,11 +2609,33 @@ impl InitialHistory { .and_then(|meta| meta.thread_source.clone()) } + pub fn get_session_originator(&self) -> Option { + self.get_session_meta() + .map(|meta| meta.originator.clone()) + .filter(|originator| !originator.is_empty()) + } + pub fn get_resumed_parent_thread_id(&self) -> Option { self.get_resumed_session_meta() .and_then(|meta| meta.parent_thread_id) } + fn get_session_meta(&self) -> Option<&SessionMeta> { + match self { + InitialHistory::New | InitialHistory::Cleared => None, + InitialHistory::Resumed(resumed) => { + resumed.history.iter().find_map(|item| match item { + RolloutItem::SessionMeta(meta_line) => Some(&meta_line.meta), + _ => None, + }) + } + InitialHistory::Forked(items) => items.iter().find_map(|item| match item { + RolloutItem::SessionMeta(meta_line) => Some(&meta_line.meta), + _ => None, + }), + } + } + fn get_resumed_session_meta(&self) -> Option<&SessionMeta> { match self { InitialHistory::New | InitialHistory::Cleared | InitialHistory::Forked(_) => None, diff --git a/codex-rs/rollout/Cargo.toml b/codex-rs/rollout/Cargo.toml index edc905968..838081bb5 100644 --- a/codex-rs/rollout/Cargo.toml +++ b/codex-rs/rollout/Cargo.toml @@ -17,7 +17,6 @@ anyhow = { workspace = true } chrono = { workspace = true, features = ["serde"] } codex-file-search = { workspace = true } codex-git-utils = { workspace = true } -codex-login = { workspace = true } codex-otel = { workspace = true } codex-protocol = { workspace = true } codex-state = { workspace = true } diff --git a/codex-rs/rollout/src/lib.rs b/codex-rs/rollout/src/lib.rs index 804b8ac10..3c103750c 100644 --- a/codex-rs/rollout/src/lib.rs +++ b/codex-rs/rollout/src/lib.rs @@ -16,10 +16,6 @@ pub(crate) mod session_index; mod sqlite_metrics; pub mod state_db; -pub(crate) mod default_client { - pub use codex_login::default_client::*; -} - pub(crate) use codex_protocol::protocol; pub const SESSIONS_SUBDIR: &str = "sessions"; diff --git a/codex-rs/rollout/src/recorder.rs b/codex-rs/rollout/src/recorder.rs index 53bf115d8..dad7f8bc3 100644 --- a/codex-rs/rollout/src/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -46,7 +46,6 @@ use super::list::parse_timestamp_uuid_from_filename; use super::metadata; use super::session_index::find_thread_names_by_ids; use crate::config::RolloutConfigView; -use crate::default_client::originator; use crate::state_db; use crate::state_db::StateDbHandle; use codex_git_utils::collect_git_info; @@ -89,6 +88,7 @@ pub enum RolloutRecorderParams { parent_thread_id: Option, source: Box, thread_source: Option, + originator: String, base_instructions: BaseInstructions, dynamic_tools: Vec, multi_agent_version: Option, @@ -161,12 +161,14 @@ fn clone_io_error(err: &IoError) -> IoError { } impl RolloutRecorderParams { + #[allow(clippy::too_many_arguments)] pub fn new( conversation_id: ThreadId, forked_from_id: Option, parent_thread_id: Option, source: SessionSource, thread_source: Option, + originator: String, base_instructions: BaseInstructions, dynamic_tools: Vec, ) -> Self { @@ -177,6 +179,7 @@ impl RolloutRecorderParams { parent_thread_id, source: Box::new(source), thread_source, + originator, base_instructions, dynamic_tools, multi_agent_version: None, @@ -726,6 +729,7 @@ impl RolloutRecorder { parent_thread_id, source, thread_source, + originator, base_instructions, dynamic_tools, multi_agent_version, @@ -751,7 +755,7 @@ impl RolloutRecorder { parent_thread_id, timestamp, cwd: config.cwd().to_path_buf(), - originator: originator().value, + originator, cli_version: env!("CARGO_PKG_VERSION").to_string(), agent_nickname: source.get_nickname(), agent_role: source.get_agent_role(), diff --git a/codex-rs/rollout/src/recorder_tests.rs b/codex-rs/rollout/src/recorder_tests.rs index 23618cb81..4b72ff767 100644 --- a/codex-rs/rollout/src/recorder_tests.rs +++ b/codex-rs/rollout/src/recorder_tests.rs @@ -385,6 +385,7 @@ async fn recorder_materializes_on_flush_with_pending_items() -> std::io::Result< /*parent_thread_id*/ None, SessionSource::Exec, /*thread_source*/ None, + "test_originator".to_string(), BaseInstructions::default(), Vec::new(), ) @@ -477,6 +478,7 @@ async fn persist_reports_filesystem_error_and_retries_buffered_items() -> std::i /*parent_thread_id*/ None, SessionSource::Exec, /*thread_source*/ None, + "test_originator".to_string(), BaseInstructions::default(), Vec::new(), ), diff --git a/codex-rs/thread-store/src/in_memory.rs b/codex-rs/thread-store/src/in_memory.rs index d7d85c3cf..8cd801c9c 100644 --- a/codex-rs/thread-store/src/in_memory.rs +++ b/codex-rs/thread-store/src/in_memory.rs @@ -118,6 +118,7 @@ mod tests { parent_thread_id, source: SessionSource::Exec, thread_source: None, + originator: "test_originator".to_string(), base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), multi_agent_version: None, @@ -242,6 +243,7 @@ impl InMemoryThreadStore { agent_nickname: params.source.get_nickname(), agent_role: params.source.get_agent_role(), agent_path: params.source.get_agent_path().map(Into::into), + originator: params.originator.clone(), source: params.source.clone(), thread_source: params.thread_source.clone(), model_provider: Some(params.metadata.model_provider.clone()), diff --git a/codex-rs/thread-store/src/local/create_thread.rs b/codex-rs/thread-store/src/local/create_thread.rs index 36c3cdc13..eba6a5a6a 100644 --- a/codex-rs/thread-store/src/local/create_thread.rs +++ b/codex-rs/thread-store/src/local/create_thread.rs @@ -33,6 +33,7 @@ pub(super) async fn create_thread( params.parent_thread_id, params.source, params.thread_source, + params.originator, params.base_instructions, params.dynamic_tools, ) diff --git a/codex-rs/thread-store/src/local/mod.rs b/codex-rs/thread-store/src/local/mod.rs index d0a4d6518..51e622fc5 100644 --- a/codex-rs/thread-store/src/local/mod.rs +++ b/codex-rs/thread-store/src/local/mod.rs @@ -1130,6 +1130,7 @@ mod tests { parent_thread_id: None, source: SessionSource::Exec, thread_source: None, + originator: "test_originator".to_string(), base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), multi_agent_version: None, diff --git a/codex-rs/thread-store/src/types.rs b/codex-rs/thread-store/src/types.rs index c18f39882..feea814f9 100644 --- a/codex-rs/thread-store/src/types.rs +++ b/codex-rs/thread-store/src/types.rs @@ -79,6 +79,8 @@ pub struct CreateThreadParams { pub source: SessionSource, /// Optional analytics source classification for this thread. pub thread_source: Option, + /// Effective originator used for this thread's Responses requests and analytics events. + pub originator: String, /// Base instructions persisted in session metadata. pub base_instructions: BaseInstructions, /// Dynamic tools available to the thread at startup.