From 1acb722e8afeba0cd54c9019f140830d05fdf5e7 Mon Sep 17 00:00:00 2001 From: alexsong-oai Date: Tue, 23 Jun 2026 17:23:38 -0700 Subject: [PATCH] Support thread-level originator overrides (#29477) ## Why Work(TPP) threads can be launched from the Desktop app, but if they all keep the Desktop app's default originator then downstream attribution cannot distinguish local Work launches from cloud-backed Work launches. `thread/start.serviceName` already carries that launch signal, while `SessionMeta.originator` is the durable thread-level value that survives resume and fork. This change converts the Desktop Work service names into an effective originator at thread creation time, persists that originator with the thread, and keeps using it for later model requests and memory writes. ## What changed - Map `CODEX_WORK_LOCAL` and `CODEX_WORK_CLOUD` service names to per-thread originators, while preserving `CODEX_INTERNAL_ORIGINATOR_OVERRIDE` as the highest-precedence override. - Persist the effective originator in `SessionMeta.originator`, read it back on resume/fork, and inherit the parent originator for subagent spawns when there is no persisted session metadata. - Handle truncated `SpawnAgentForkMode::LastNTurns` forks by falling back to the live parent originator when the forked history no longer includes `SessionMeta`. - Thread the per-thread originator through Responses headers, websocket/compaction request paths, thread-store creation, rollout metadata, and memory stage-one telemetry. ## Verification - `just test -p codex-core agent::control::tests::spawn_thread_subagent_inherits_parent_originator_without_fork agent::control::tests::spawn_thread_subagent_fork_last_n_turns_inherits_parent_originator_without_session_meta thread_manager::tests::originator_override_precedes_service_name_remapping` - `just test -p codex-core agent::control::tests::resume_thread_subagent_restores_stored_metadata_and_effective_multi_agent_mode` - `just test -p codex-memories-write` - `just fix -p codex-core -p codex-memories-write` - `git diff --check` --- codex-rs/Cargo.lock | 1 - .../analytics/src/analytics_client_tests.rs | 116 +++++++----------- codex-rs/analytics/src/events.rs | 18 ++- codex-rs/analytics/src/facts.rs | 3 + codex-rs/analytics/src/reducer.rs | 2 +- .../external_agent_session_import.rs | 1 + .../thread_processor_tests.rs | 1 + .../tests/suite/conversation_summary.rs | 1 + .../tests/suite/v2/remote_thread_store.rs | 1 + .../app-server/tests/suite/v2/thread_read.rs | 1 + .../tests/suite/v2/thread_unarchive.rs | 1 + codex-rs/core/src/agent/control_tests.rs | 108 ++++++++++++++++ codex-rs/core/src/client.rs | 23 ++++ codex-rs/core/src/client_tests.rs | 6 + codex-rs/core/src/codex_delegate.rs | 1 + codex-rs/core/src/codex_thread.rs | 1 + codex-rs/core/src/hook_runtime.rs | 1 + codex-rs/core/src/mcp_tool_call.rs | 1 + codex-rs/core/src/realtime_conversation.rs | 7 ++ .../core/src/realtime_conversation_tests.rs | 49 ++++++-- codex-rs/core/src/session/mod.rs | 6 +- codex-rs/core/src/session/review.rs | 1 + codex-rs/core/src/session/session.rs | 12 +- codex-rs/core/src/session/tests.rs | 19 ++- .../core/src/session/tests/guardian_tests.rs | 1 + codex-rs/core/src/session/turn.rs | 1 + codex-rs/core/src/session/turn_context.rs | 3 + codex-rs/core/src/skills.rs | 1 + codex-rs/core/src/thread_manager.rs | 92 ++++++++++++++ codex-rs/core/src/thread_manager_tests.rs | 35 ++++++ codex-rs/core/tests/responses_headers.rs | 3 + codex-rs/core/tests/suite/client.rs | 2 + .../core/tests/suite/client_websockets.rs | 1 + .../core/tests/suite/rollout_list_find.rs | 1 + codex-rs/memories/write/src/runtime.rs | 71 +++++++---- codex-rs/protocol/src/protocol.rs | 22 ++++ codex-rs/rollout/Cargo.toml | 1 - codex-rs/rollout/src/lib.rs | 4 - codex-rs/rollout/src/recorder.rs | 8 +- codex-rs/rollout/src/recorder_tests.rs | 2 + codex-rs/thread-store/src/in_memory.rs | 2 + .../thread-store/src/local/create_thread.rs | 1 + codex-rs/thread-store/src/local/mod.rs | 1 + codex-rs/thread-store/src/types.rs | 2 + 44 files changed, 513 insertions(+), 122 deletions(-) diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 6b5b5afdf..5a32a8e18 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -3776,7 +3776,6 @@ dependencies = [ "chrono", "codex-file-search", "codex-git-utils", - "codex-login", "codex-otel", "codex-protocol", "codex-state", diff --git a/codex-rs/analytics/src/analytics_client_tests.rs b/codex-rs/analytics/src/analytics_client_tests.rs index 38d953961..9f8df04b4 100644 --- a/codex-rs/analytics/src/analytics_client_tests.rs +++ b/codex-rs/analytics/src/analytics_client_tests.rs @@ -168,6 +168,17 @@ use std::sync::Arc; use std::sync::Mutex; use tokio::sync::mpsc; +const TEST_PRODUCT_CLIENT_ID: &str = "codex_work_desktop"; + +fn test_tracking_context(thread_id: &str, turn_id: &str) -> TrackEventsContext { + TrackEventsContext { + model_slug: "gpt-5".to_string(), + thread_id: thread_id.to_string(), + turn_id: turn_id.to_string(), + product_client_id: TEST_PRODUCT_CLIENT_ID.to_string(), + } +} + fn sample_thread_with_metadata( thread_id: &str, ephemeral: bool, @@ -1024,11 +1035,7 @@ fn normalize_path_for_skill_id_repo_root_not_in_skill_path_uses_absolute_path() #[test] fn app_mentioned_event_serializes_expected_shape() { - let tracking = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-1".to_string(), - turn_id: "turn-1".to_string(), - }; + let tracking = test_tracking_context("thread-1", "turn-1"); let event = TrackEventRequest::AppMentioned(CodexAppMentionedEventRequest { event_type: "codex_app_mentioned", event_params: codex_app_metadata( @@ -1052,7 +1059,7 @@ fn app_mentioned_event_serializes_expected_shape() { "thread_id": "thread-1", "turn_id": "turn-1", "app_name": "Calendar", - "product_client_id": originator().value, + "product_client_id": TEST_PRODUCT_CLIENT_ID, "invoke_type": "explicit", "model_slug": "gpt-5" } @@ -1062,11 +1069,7 @@ fn app_mentioned_event_serializes_expected_shape() { #[test] fn app_used_event_serializes_expected_shape() { - let tracking = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-2".to_string(), - turn_id: "turn-2".to_string(), - }; + let tracking = test_tracking_context("thread-2", "turn-2"); let event = TrackEventRequest::AppUsed(CodexAppUsedEventRequest { event_type: "codex_app_used", event_params: codex_app_metadata( @@ -1090,7 +1093,7 @@ fn app_used_event_serializes_expected_shape() { "thread_id": "thread-2", "turn_id": "turn-2", "app_name": "Google Drive", - "product_client_id": originator().value, + "product_client_id": TEST_PRODUCT_CLIENT_ID, "invoke_type": "implicit", "model_slug": "gpt-5" } @@ -1381,16 +1384,8 @@ fn app_used_dedupe_is_keyed_by_turn_and_connector() { invocation_type: Some(InvocationType::Implicit), }; - let turn_1 = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-1".to_string(), - turn_id: "turn-1".to_string(), - }; - let turn_2 = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-1".to_string(), - turn_id: "turn-2".to_string(), - }; + let turn_1 = test_tracking_context("thread-1", "turn-1"); + let turn_2 = test_tracking_context("thread-1", "turn-2"); assert_eq!(queue.should_enqueue_app_used(&turn_1, &app), true); assert_eq!(queue.should_enqueue_app_used(&turn_1, &app), false); @@ -3009,11 +3004,7 @@ async fn subagent_tool_items_inherit_parent_connection_metadata() { #[test] fn plugin_used_event_serializes_expected_shape() { - let tracking = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-3".to_string(), - turn_id: "turn-3".to_string(), - }; + let tracking = test_tracking_context("thread-3", "turn-3"); let event = TrackEventRequest::PluginUsed(CodexPluginUsedEventRequest { event_type: "codex_plugin_used", event_params: codex_plugin_used_metadata(&tracking, sample_plugin_metadata()), @@ -3033,7 +3024,7 @@ fn plugin_used_event_serializes_expected_shape() { "has_skills": true, "mcp_server_count": 2, "connector_ids": ["calendar", "drive"], - "product_client_id": originator().value, + "product_client_id": TEST_PRODUCT_CLIENT_ID, "mcp_server_names": ["mcp-1", "mcp-2"], "thread_id": "thread-3", "turn_id": "turn-3", @@ -3132,11 +3123,7 @@ fn plugin_management_event_keeps_plugin_id_local_when_remote_id_exists() { #[test] fn hook_run_event_serializes_expected_shape() { - let tracking = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-3".to_string(), - turn_id: "turn-3".to_string(), - }; + let tracking = test_tracking_context("thread-3", "turn-3"); let event = TrackEventRequest::HookRun(CodexHookRunEventRequest { event_type: "codex_hook_run", event_params: codex_hook_run_metadata( @@ -3158,6 +3145,7 @@ fn hook_run_event_serializes_expected_shape() { "event_params": { "thread_id": "thread-3", "turn_id": "turn-3", + "product_client_id": TEST_PRODUCT_CLIENT_ID, "model_slug": "gpt-5", "hook_name": "PreToolUse", "hook_source": "user", @@ -3169,11 +3157,7 @@ fn hook_run_event_serializes_expected_shape() { #[test] fn hook_run_metadata_maps_sources_and_statuses() { - let tracking = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-1".to_string(), - turn_id: "turn-1".to_string(), - }; + let tracking = test_tracking_context("thread-1", "turn-1"); let system = serde_json::to_value(codex_hook_run_metadata( &tracking, @@ -3224,11 +3208,7 @@ fn hook_run_metadata_maps_sources_and_statuses() { #[test] fn hook_run_metadata_maps_stopped_status() { - let tracking = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-1".to_string(), - turn_id: "turn-1".to_string(), - }; + let tracking = test_tracking_context("thread-1", "turn-1"); let stopped = serde_json::to_value(codex_hook_run_metadata( &tracking, @@ -3254,16 +3234,8 @@ fn plugin_used_dedupe_is_keyed_by_turn_and_plugin() { }; let plugin = sample_plugin_metadata(); - let turn_1 = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-1".to_string(), - turn_id: "turn-1".to_string(), - }; - let turn_2 = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-1".to_string(), - turn_id: "turn-2".to_string(), - }; + let turn_1 = test_tracking_context("thread-1", "turn-1"); + let turn_2 = test_tracking_context("thread-1", "turn-2"); assert_eq!(queue.should_enqueue_plugin_used(&turn_1, &plugin), true); assert_eq!(queue.should_enqueue_plugin_used(&turn_1, &plugin), false); @@ -3274,11 +3246,7 @@ fn plugin_used_dedupe_is_keyed_by_turn_and_plugin() { async fn reducer_ingests_skill_invoked_fact() { let mut reducer = AnalyticsReducer::default(); let mut events = Vec::new(); - let tracking = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-1".to_string(), - turn_id: "turn-1".to_string(), - }; + let tracking = test_tracking_context("thread-1", "turn-1"); let skill_path = PathBuf::from("/Users/abc/.codex/skills/doc/SKILL.md"); let expected_skill_id = skill_id_for_local_skill( /*repo_url*/ None, @@ -3311,7 +3279,7 @@ async fn reducer_ingests_skill_invoked_fact() { "skill_id": expected_skill_id, "skill_name": "doc", "event_params": { - "product_client_id": originator().value, + "product_client_id": TEST_PRODUCT_CLIENT_ID, "skill_scope": "user", "plugin_id": null, "repo_url": null, @@ -3328,11 +3296,7 @@ async fn reducer_ingests_skill_invoked_fact() { async fn reducer_includes_plugin_id_for_plugin_skill_invocations() { let mut reducer = AnalyticsReducer::default(); let mut events = Vec::new(); - let tracking = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-1".to_string(), - turn_id: "turn-1".to_string(), - }; + let tracking = test_tracking_context("thread-1", "turn-1"); let skill_path = PathBuf::from("/Users/abc/.codex/plugins/cache/test/sample/skills/doc/SKILL.md"); @@ -3367,11 +3331,7 @@ async fn reducer_ingests_hook_run_fact() { reducer .ingest( AnalyticsFact::Custom(CustomAnalyticsFact::HookRun(HookRunInput { - tracking: TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-1".to_string(), - turn_id: "turn-1".to_string(), - }, + tracking: test_tracking_context("thread-1", "turn-1"), hook: HookRunFact { event_name: HookEventName::PostToolUse, hook_source: HookSource::Unknown, @@ -3394,11 +3354,7 @@ async fn reducer_ingests_hook_run_fact() { async fn reducer_ingests_app_and_plugin_facts() { let mut reducer = AnalyticsReducer::default(); let mut events = Vec::new(); - let tracking = TrackEventsContext { - model_slug: "gpt-5".to_string(), - thread_id: "thread-1".to_string(), - turn_id: "turn-1".to_string(), - }; + let tracking = test_tracking_context("thread-1", "turn-1"); reducer .ingest( @@ -3441,6 +3397,18 @@ async fn reducer_ingests_app_and_plugin_facts() { assert_eq!(payload[0]["event_type"], "codex_app_mentioned"); assert_eq!(payload[1]["event_type"], "codex_app_used"); assert_eq!(payload[2]["event_type"], "codex_plugin_used"); + assert_eq!( + payload[0]["event_params"]["product_client_id"], + TEST_PRODUCT_CLIENT_ID + ); + assert_eq!( + payload[1]["event_params"]["product_client_id"], + TEST_PRODUCT_CLIENT_ID + ); + assert_eq!( + payload[2]["event_params"]["product_client_id"], + TEST_PRODUCT_CLIENT_ID + ); } #[tokio::test] diff --git a/codex-rs/analytics/src/events.rs b/codex-rs/analytics/src/events.rs index 2fe465c9b..1365e5406 100644 --- a/codex-rs/analytics/src/events.rs +++ b/codex-rs/analytics/src/events.rs @@ -773,6 +773,7 @@ pub(crate) struct CodexAppUsedEventRequest { pub(crate) struct CodexHookRunMetadata { pub(crate) thread_id: Option, pub(crate) turn_id: Option, + pub(crate) product_client_id: Option, pub(crate) model_slug: Option, pub(crate) hook_name: Option, pub(crate) hook_source: Option<&'static str>, @@ -1030,13 +1031,20 @@ pub(crate) fn codex_app_metadata( thread_id: Some(tracking.thread_id.clone()), turn_id: Some(tracking.turn_id.clone()), app_name: app.app_name, - product_client_id: Some(originator().value), + product_client_id: Some(tracking.product_client_id.clone()), invoke_type: app.invocation_type, model_slug: Some(tracking.model_slug.clone()), } } pub(crate) fn codex_plugin_metadata(plugin: PluginTelemetryMetadata) -> CodexPluginMetadata { + codex_plugin_metadata_with_product_client_id(plugin, originator().value) +} + +fn codex_plugin_metadata_with_product_client_id( + plugin: PluginTelemetryMetadata, + product_client_id: String, +) -> CodexPluginMetadata { let PluginTelemetryMetadata { plugin_id, remote_plugin_id, @@ -1062,7 +1070,7 @@ pub(crate) fn codex_plugin_metadata(plugin: PluginTelemetryMetadata) -> CodexPlu .map(|connector_id| connector_id.0) .collect() }), - product_client_id: Some(originator().value), + product_client_id: Some(product_client_id), } } @@ -1139,7 +1147,10 @@ pub(crate) fn codex_plugin_used_metadata( .as_ref() .map(|summary| summary.mcp_server_names.clone()); CodexPluginUsedMetadata { - plugin: codex_plugin_metadata(plugin), + plugin: codex_plugin_metadata_with_product_client_id( + plugin, + tracking.product_client_id.clone(), + ), mcp_server_names, thread_id: Some(tracking.thread_id.clone()), turn_id: Some(tracking.turn_id.clone()), @@ -1154,6 +1165,7 @@ pub(crate) fn codex_hook_run_metadata( CodexHookRunMetadata { thread_id: Some(tracking.thread_id.clone()), turn_id: Some(tracking.turn_id.clone()), + product_client_id: Some(tracking.product_client_id.clone()), model_slug: Some(tracking.model_slug.clone()), hook_name: Some(analytics_hook_event_name(hook.event_name).to_owned()), hook_source: Some(analytics_hook_source(hook.hook_source)), diff --git a/codex-rs/analytics/src/facts.rs b/codex-rs/analytics/src/facts.rs index 026c2e876..4505c0c69 100644 --- a/codex-rs/analytics/src/facts.rs +++ b/codex-rs/analytics/src/facts.rs @@ -41,17 +41,20 @@ pub struct TrackEventsContext { pub model_slug: String, pub thread_id: String, pub turn_id: String, + pub product_client_id: String, } pub fn build_track_events_context( model_slug: String, thread_id: String, turn_id: String, + product_client_id: String, ) -> TrackEventsContext { TrackEventsContext { model_slug, thread_id, turn_id, + product_client_id, } } diff --git a/codex-rs/analytics/src/reducer.rs b/codex-rs/analytics/src/reducer.rs index b647984df..666e756d3 100644 --- a/codex-rs/analytics/src/reducer.rs +++ b/codex-rs/analytics/src/reducer.rs @@ -729,7 +729,7 @@ impl AnalyticsReducer { turn_id: Some(tracking.turn_id.clone()), invoke_type: Some(invocation.invocation_type), model_slug: Some(tracking.model_slug.clone()), - product_client_id: Some(originator().value), + product_client_id: Some(tracking.product_client_id.clone()), repo_url, skill_scope: Some(skill_scope.to_string()), plugin_id: invocation.plugin_id, diff --git a/codex-rs/app-server/src/request_processors/external_agent_session_import.rs b/codex-rs/app-server/src/request_processors/external_agent_session_import.rs index dfa2ea94f..77b9bfcbf 100644 --- a/codex-rs/app-server/src/request_processors/external_agent_session_import.rs +++ b/codex-rs/app-server/src/request_processors/external_agent_session_import.rs @@ -207,6 +207,7 @@ impl ExternalAgentSessionImporter { parent_thread_id: None, source: source.clone(), thread_source: None, + originator: codex_login::default_client::originator().value, base_instructions: BaseInstructions { text: config .base_instructions diff --git a/codex-rs/app-server/src/request_processors/thread_processor_tests.rs b/codex-rs/app-server/src/request_processors/thread_processor_tests.rs index 1f5ae12f4..3f7cbc9d0 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor_tests.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor_tests.rs @@ -781,6 +781,7 @@ mod thread_processor_behavior_tests { forked_from_thread_id: None, parent_thread_id: None, thread_source: None, + originator: "test_originator".to_string(), }; assert_eq!( diff --git a/codex-rs/app-server/tests/suite/conversation_summary.rs b/codex-rs/app-server/tests/suite/conversation_summary.rs index d4d0cf061..7f964d314 100644 --- a/codex-rs/app-server/tests/suite/conversation_summary.rs +++ b/codex-rs/app-server/tests/suite/conversation_summary.rs @@ -128,6 +128,7 @@ async fn get_conversation_summary_by_thread_id_reads_pathless_store_thread() -> parent_thread_id: None, source: SessionSource::Cli, thread_source: None, + originator: "test_originator".to_string(), base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), multi_agent_version: None, diff --git a/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs b/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs index 1a6d1eb4f..717fe228f 100644 --- a/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs +++ b/codex-rs/app-server/tests/suite/v2/remote_thread_store.rs @@ -154,6 +154,7 @@ async fn thread_delete_with_non_local_thread_store_does_not_create_local_persist parent_thread_id: None, source: SessionSource::Cli, thread_source: None, + originator: "test_originator".to_string(), base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), multi_agent_version: None, diff --git a/codex-rs/app-server/tests/suite/v2/thread_read.rs b/codex-rs/app-server/tests/suite/v2/thread_read.rs index 0b4f101c0..3e39070a7 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_read.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_read.rs @@ -1364,6 +1364,7 @@ async fn seed_pathless_store_thread( parent_thread_id: None, source: ProtocolSessionSource::Cli, thread_source: None, + originator: "test_originator".to_string(), base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), multi_agent_version: None, diff --git a/codex-rs/app-server/tests/suite/v2/thread_unarchive.rs b/codex-rs/app-server/tests/suite/v2/thread_unarchive.rs index db8b60d16..708d4bdfb 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_unarchive.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_unarchive.rs @@ -215,6 +215,7 @@ async fn thread_unarchive_preserves_pathless_store_metadata() -> Result<()> { parent_thread_id: None, source: SessionSource::Cli, thread_source: None, + originator: "test_originator".to_string(), base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), multi_agent_version: None, diff --git a/codex-rs/core/src/agent/control_tests.rs b/codex-rs/core/src/agent/control_tests.rs index e37e9b04d..225fd4ba9 100644 --- a/codex-rs/core/src/agent/control_tests.rs +++ b/codex-rs/core/src/agent/control_tests.rs @@ -9,7 +9,9 @@ use crate::config::ConfigBuilder; use crate::context::ContextualUserFragment; use crate::context::SubagentNotification; use crate::init_state_db; +use crate::thread_manager::StartThreadOptions; use assert_matches::assert_matches; +use codex_extension_api::ExtensionDataInit; use codex_features::Feature; use codex_login::CodexAuth; use codex_protocol::AgentPath; @@ -21,7 +23,9 @@ use codex_protocol::models::ResponseItem; use codex_protocol::protocol::CompactedItem; use codex_protocol::protocol::ErrorEvent; use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::InitialHistory; use codex_protocol::protocol::InterAgentCommunication; +use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; use codex_protocol::protocol::TurnAbortReason; @@ -139,6 +143,33 @@ impl AgentControlHarness { } } +async fn persisted_originator(thread: &CodexThread) -> String { + thread.ensure_rollout_materialized().await; + thread + .flush_rollout() + .await + .expect("thread rollout should flush"); + let stored_thread = thread + .read_thread( + /*include_archived*/ true, /*include_history*/ true, + ) + .await + .expect("thread should be readable"); + let history = stored_thread.history.expect("history should be loaded"); + history + .items + .iter() + .find_map(|item| match item { + RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.originator.clone()), + RolloutItem::ResponseItem(_) + | RolloutItem::InterAgentCommunication(_) + | RolloutItem::EventMsg(_) + | RolloutItem::Compacted(_) + | RolloutItem::TurnContext(_) => None, + }) + .expect("session metadata should be persisted") +} + fn has_subagent_notification(history_items: &[ResponseItem]) -> bool { history_items.iter().any(|item| { let ResponseItem::Message { role, content, .. } = item else { @@ -2199,6 +2230,83 @@ async fn spawn_thread_subagent_gets_random_nickname_in_session_source() { assert_eq!(agent_role, Some("explorer".to_string())); } +#[tokio::test] +async fn spawn_thread_subagents_persist_parent_originator_across_new_and_truncated_fork() { + let harness = AgentControlHarness::new().await; + let parent = harness + .manager + .start_thread_with_options(StartThreadOptions { + config: harness.config.clone(), + initial_history: InitialHistory::New, + session_source: None, + thread_source: None, + dynamic_tools: Vec::new(), + metrics_service_name: Some("codex_work_desktop".to_string()), + multi_agent_mode: None, + parent_trace: None, + environments: Vec::new(), + thread_extension_init: ExtensionDataInit::default(), + supports_openai_form_elicitation: false, + }) + .await + .expect("parent thread should start"); + let parent_originator = persisted_originator(&parent.thread).await; + assert_eq!(parent_originator, "codex_work_desktop"); + + let child_thread_id = harness + .control + .spawn_agent( + harness.config.clone(), + text_input("hello child"), + Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { + parent_thread_id: parent.thread_id, + depth: 1, + agent_path: None, + agent_nickname: None, + agent_role: Some("explorer".to_string()), + })), + ) + .await + .expect("child spawn should succeed"); + + let child_thread = harness + .manager + .get_thread(child_thread_id) + .await + .expect("child thread should be registered"); + let child_originator = persisted_originator(&child_thread).await; + assert_eq!(child_originator, parent_originator); + + let child = harness + .control + .spawn_agent_with_metadata( + harness.config.clone(), + text_input("hello forked child"), + Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { + parent_thread_id: parent.thread_id, + depth: 1, + agent_path: None, + agent_nickname: None, + agent_role: Some("explorer".to_string()), + })), + SpawnAgentOptions { + fork_parent_spawn_call_id: Some("spawn-call-last-n".to_string()), + fork_mode: Some(SpawnAgentForkMode::LastNTurns(1)), + ..Default::default() + }, + ) + .await + .expect("forked child spawn should succeed"); + + let child_thread = harness + .manager + .get_thread(child.thread_id) + .await + .expect("child thread should be registered"); + let child_originator = persisted_originator(&child_thread).await; + assert_eq!(child_originator, parent_originator); +} + #[tokio::test] async fn spawn_thread_subagent_uses_role_specific_nickname_candidates() { let mut harness = AgentControlHarness::new().await; diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 9e240e968..67a940dc9 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -186,6 +186,7 @@ struct ModelClientState { provider: SharedModelProvider, auth_env_telemetry: AuthEnvTelemetry, session_source: SessionSource, + originator: String, model_verbosity: Option, enable_request_compression: bool, include_timing_metrics: bool, @@ -387,6 +388,7 @@ impl ModelClient { thread_id: ThreadId, provider_info: ModelProviderInfo, session_source: SessionSource, + originator: String, model_verbosity: Option, enable_request_compression: bool, include_timing_metrics: bool, @@ -408,6 +410,7 @@ impl ModelClient { provider: model_provider, auth_env_telemetry, session_source, + originator, model_verbosity, enable_request_compression, include_timing_metrics, @@ -565,6 +568,7 @@ impl ModelClient { self.state.beta_features_header.as_deref(), turn_state.as_ref(), )); + add_originator_header(&mut extra_headers, self.state.originator.as_str()); extra_headers.extend(self.build_responses_compatibility_headers(responses_metadata)); extra_headers.extend(build_session_headers( Some(responses_metadata.session_id.to_string()), @@ -676,6 +680,7 @@ impl ModelClient { fn build_subagent_headers(&self) -> ApiHeaderMap { let mut extra_headers = ApiHeaderMap::new(); + add_originator_header(&mut extra_headers, self.state.originator.as_str()); if let Some(subagent) = subagent_header_value(&self.state.session_source) && let Ok(val) = HeaderValue::from_str(&subagent) { @@ -997,6 +1002,7 @@ impl ModelClient { self.state.beta_features_header.as_deref(), /*turn_state*/ None, ); + add_originator_header(&mut headers, self.state.originator.as_str()); if let Ok(header_value) = HeaderValue::from_str(&responses_metadata.thread_id) { headers.insert("x-client-request-id", header_value); } @@ -1064,6 +1070,7 @@ impl ModelClientSession { self.client.state.beta_features_header.as_deref(), Some(&self.turn_state), ); + add_originator_header(&mut headers, self.client.state.originator.as_str()); headers.extend( self.client .build_responses_compatibility_headers(responses_metadata), @@ -1791,6 +1798,22 @@ fn build_responses_headers( headers } +pub(crate) fn add_originator_header(headers: &mut ApiHeaderMap, originator: &str) { + let default_originator = codex_login::default_client::originator(); + if originator == default_originator.value.as_str() { + return; + } + + match HeaderValue::from_str(originator) { + Ok(header_value) => { + headers.insert("originator", header_value); + } + Err(err) => { + warn!("ignoring invalid thread originator header value: {err}"); + } + } +} + fn add_responses_lite_header(headers: &mut ApiHeaderMap, use_responses_lite: bool) { if use_responses_lite { headers.insert( diff --git a/codex-rs/core/src/client_tests.rs b/codex-rs/core/src/client_tests.rs index 9df7d4d01..8d58fec9b 100644 --- a/codex-rs/core/src/client_tests.rs +++ b/codex-rs/core/src/client_tests.rs @@ -75,6 +75,7 @@ fn test_model_client(session_source: SessionSource) -> ModelClient { thread_id, provider, session_source, + "test_originator".to_string(), /*model_verbosity*/ None, /*enable_request_compression*/ false, /*include_timing_metrics*/ false, @@ -298,6 +299,10 @@ fn build_subagent_headers_sets_internal_memory_consolidation_label() { .get(X_OPENAI_SUBAGENT_HEADER) .and_then(|value| value.to_str().ok()); assert_eq!(value, Some("memory_consolidation")); + assert_eq!( + headers.get("originator"), + Some(&http::HeaderValue::from_static("test_originator")) + ); } #[test] @@ -606,6 +611,7 @@ fn model_client_with_counting_attestation( ThreadId::new(), provider, SessionSource::Exec, + "test_originator".to_string(), /*model_verbosity*/ None, /*enable_request_compression*/ false, /*include_timing_metrics*/ false, diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index 9f5b867f7..640e1f8df 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -103,6 +103,7 @@ pub(crate) async fn run_codex_thread_interactive( forked_from_thread_id, parent_thread_id: Some(parent_session.thread_id), thread_source: Some(ThreadSource::Subagent), + originator: parent_ctx.originator.clone(), agent_control: parent_session.services.agent_control.clone(), dynamic_tools: Vec::new(), metrics_service_name: None, diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index a522cd62a..dccfd4a76 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -77,6 +77,7 @@ pub struct ThreadConfigSnapshot { pub forked_from_thread_id: Option, pub parent_thread_id: Option, pub thread_source: Option, + pub originator: String, } /// Explains why `CodexThread::try_start_turn_if_idle` rejected an automatic diff --git a/codex-rs/core/src/hook_runtime.rs b/codex-rs/core/src/hook_runtime.rs index 12627e2d5..342e4d4b8 100644 --- a/codex-rs/core/src/hook_runtime.rs +++ b/codex-rs/core/src/hook_runtime.rs @@ -685,6 +685,7 @@ fn hook_run_analytics_payload( .turn_id .clone() .unwrap_or_else(|| turn_context.sub_id.clone()), + turn_context.originator.clone(), ), HookRunFact { event_name: completed.run.event_name, diff --git a/codex-rs/core/src/mcp_tool_call.rs b/codex-rs/core/src/mcp_tool_call.rs index c7187c18f..085fd496d 100644 --- a/codex-rs/core/src/mcp_tool_call.rs +++ b/codex-rs/core/src/mcp_tool_call.rs @@ -971,6 +971,7 @@ async fn maybe_track_codex_app_used( turn_context.model_info.slug.clone(), sess.thread_id.to_string(), turn_context.sub_id.clone(), + turn_context.originator.clone(), ); sess.services.analytics_events_client.track_app_used( tracking, diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 667939f10..43ecc3e57 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -1,4 +1,5 @@ use crate::client::ModelClient; +use crate::client::add_originator_header; use crate::realtime_context::build_realtime_startup_context; use crate::realtime_context::truncate_realtime_text_to_token_budget; use crate::realtime_prompt::prepare_realtime_backend_prompt; @@ -775,6 +776,7 @@ async fn prepare_realtime_start( let session_config = build_realtime_session_config(sess, ¶ms, version, configured_voice).await?; let requested_realtime_session_id = session_config.session_id.clone(); + let originator = sess.originator().await; let extra_headers = match transport { ConversationStartTransport::Websocket => { let realtime_api_key = realtime_api_key(auth.as_ref(), &provider)?; @@ -782,6 +784,7 @@ async fn prepare_realtime_start( requested_realtime_session_id.as_deref(), Some(realtime_api_key.as_str()), version, + originator.as_str(), )? } ConversationStartTransport::Webrtc { .. } => { @@ -789,6 +792,7 @@ async fn prepare_realtime_start( requested_realtime_session_id.as_deref(), /*api_key*/ None, version, + originator.as_str(), )? } }; @@ -1171,6 +1175,7 @@ fn realtime_request_headers( realtime_session_id: Option<&str>, api_key: Option<&str>, version: RealtimeWsVersion, + originator: &str, ) -> CodexResult> { let mut headers = HeaderMap::new(); @@ -1191,6 +1196,8 @@ fn realtime_request_headers( headers.insert(AUTHORIZATION, auth_value); } + add_originator_header(&mut headers, originator); + Ok(Some(headers)) } diff --git a/codex-rs/core/src/realtime_conversation_tests.rs b/codex-rs/core/src/realtime_conversation_tests.rs index 55525774b..51425e4c3 100644 --- a/codex-rs/core/src/realtime_conversation_tests.rs +++ b/codex-rs/core/src/realtime_conversation_tests.rs @@ -149,10 +149,14 @@ async fn clears_active_handoff_explicitly() { #[test] fn uses_quicksilver_alpha_header_for_realtime_v1() { - let headers = - realtime_request_headers(Some("session_1"), Some("sk-test"), RealtimeWsVersion::V1) - .expect("headers") - .expect("headers"); + let headers = realtime_request_headers( + Some("session_1"), + Some("sk-test"), + RealtimeWsVersion::V1, + "codex_work_desktop", + ) + .expect("headers") + .expect("headers"); assert_eq!( headers @@ -164,10 +168,39 @@ fn uses_quicksilver_alpha_header_for_realtime_v1() { #[test] fn omits_quicksilver_alpha_header_for_realtime_v2() { - let headers = - realtime_request_headers(Some("session_1"), Some("sk-test"), RealtimeWsVersion::V2) - .expect("headers") - .expect("headers"); + let headers = realtime_request_headers( + Some("session_1"), + Some("sk-test"), + RealtimeWsVersion::V2, + "codex_work_desktop", + ) + .expect("headers") + .expect("headers"); assert!(headers.get("openai-alpha").is_none()); } + +#[test] +fn realtime_headers_include_only_non_default_originator() { + let default_originator = codex_login::default_client::originator(); + for (originator, expected_header) in [ + ("codex_work_desktop", Some("codex_work_desktop")), + (default_originator.value.as_str(), None), + ] { + let headers = realtime_request_headers( + Some("session_1"), + Some("sk-test"), + RealtimeWsVersion::V2, + originator, + ) + .expect("headers") + .expect("headers"); + + assert_eq!( + headers + .get("originator") + .and_then(|value| value.to_str().ok()), + expected_header + ); + } +} diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index d94ca7f34..9db1df044 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -73,7 +73,6 @@ use codex_hooks::HooksConfig; use codex_login::AuthManager; use codex_login::CodexAuth; use codex_login::auth_env_telemetry::collect_auth_env_telemetry; -use codex_login::default_client::originator; use codex_mcp::McpConnectionManager; use codex_mcp::McpResourceClient; use codex_mcp::McpRuntimeContext; @@ -433,6 +432,7 @@ pub(crate) struct CodexSpawnArgs { pub(crate) forked_from_thread_id: Option, pub(crate) parent_thread_id: Option, pub(crate) thread_source: Option, + pub(crate) originator: String, pub(crate) agent_control: AgentControl, pub(crate) dynamic_tools: Vec, pub(crate) metrics_service_name: Option, @@ -522,6 +522,7 @@ impl Codex { forked_from_thread_id, parent_thread_id, thread_source, + originator, agent_control, dynamic_tools, metrics_service_name, @@ -654,6 +655,7 @@ impl Codex { forked_from_thread_id, parent_thread_id, thread_source, + originator, dynamic_tools, user_shell_override, }; @@ -3913,7 +3915,7 @@ pub(crate) fn emit_subagent_session_started( forked_from_thread_id: thread_config .forked_from_thread_id .map(|thread_id| thread_id.to_string()), - product_client_id: client_name.clone(), + product_client_id: thread_config.originator.clone(), client_name, client_version, model: thread_config.model, diff --git a/codex-rs/core/src/session/review.rs b/codex-rs/core/src/session/review.rs index 8986ffd05..c55afbf14 100644 --- a/codex-rs/core/src/session/review.rs +++ b/codex-rs/core/src/session/review.rs @@ -118,6 +118,7 @@ pub(super) async fn spawn_review_thread( reasoning_summary, session_source, parent_thread_id: parent_turn_context.parent_thread_id, + originator: parent_turn_context.originator.clone(), environments: parent_turn_context.environments.clone(), available_models, unified_exec_shell_mode, diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index ee62155a8..af64e0bef 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -106,6 +106,8 @@ pub(crate) struct SessionConfiguration { pub(super) parent_thread_id: Option, /// Optional analytics source classification for this thread. pub(super) thread_source: Option, + /// Effective originator used for this thread's Responses requests and analytics events. + pub(super) originator: String, pub(super) dynamic_tools: Vec, pub(super) user_shell_override: Option, } @@ -198,6 +200,7 @@ impl SessionConfiguration { forked_from_thread_id: self.forked_from_thread_id, parent_thread_id: self.parent_thread_id, thread_source: self.thread_source.clone(), + originator: self.originator.clone(), } } @@ -473,6 +476,11 @@ impl Session { self.services.agent_control.session_id() } + pub(crate) async fn originator(&self) -> String { + let state = self.state.lock().await; + state.session_configuration.originator.clone() + } + #[instrument(name = "session_init", level = "info", skip_all)] #[allow(clippy::too_many_arguments)] pub(crate) async fn new( @@ -581,6 +589,7 @@ impl Session { parent_thread_id, source: session_source, thread_source: session_configuration.thread_source.clone(), + originator: session_configuration.originator.clone(), base_instructions: BaseInstructions { text: session_configuration.base_instructions.clone(), }, @@ -756,7 +765,7 @@ impl Session { let auth_mode = auth.map(CodexAuth::auth_mode).map(TelemetryAuthMode::from); let account_id = auth.and_then(CodexAuth::get_account_id); let account_email = auth.and_then(CodexAuth::get_account_email); - let originator = originator().value; + let originator = session_configuration.originator.clone(); let terminal_type = user_agent(); let session_model = session_configuration.collaboration_mode.model().to_string(); let auth_env_telemetry = collect_auth_env_telemetry( @@ -1057,6 +1066,7 @@ impl Session { thread_id, session_configuration.provider.clone(), session_configuration.session_source.clone(), + session_configuration.originator.clone(), config.model_verbosity, config.features.enabled(Feature::EnableRequestCompression), config.features.enabled(Feature::RuntimeMetrics), diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 1ebb069f0..685005b40 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -479,6 +479,7 @@ fn test_model_client_session() -> crate::client::ModelClientSession { thread_id, ModelProviderInfo::create_openai_provider(/* base_url */ /*base_url*/ None), codex_protocol::protocol::SessionSource::Exec, + "test_originator".to_string(), /*model_verbosity*/ None, /*enable_request_compression*/ false, /*include_timing_metrics*/ false, @@ -3485,6 +3486,7 @@ async fn set_rate_limits_retains_previous_credits() { forked_from_thread_id: None, parent_thread_id: None, thread_source: None, + originator: "test_originator".to_string(), dynamic_tools: Vec::new(), user_shell_override: None, }; @@ -3592,6 +3594,7 @@ async fn set_rate_limits_updates_plan_type_when_present() { forked_from_thread_id: None, parent_thread_id: None, thread_source: None, + originator: "test_originator".to_string(), dynamic_tools: Vec::new(), user_shell_override: None, }; @@ -3841,6 +3844,7 @@ async fn attach_thread_persistence(session: &mut Session) -> PathBuf { parent_thread_id: None, source: SessionSource::Exec, thread_source: None, + originator: "test_originator".to_string(), base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), multi_agent_version: None, @@ -4122,13 +4126,14 @@ pub(crate) async fn make_session_configuration_for_tests() -> SessionConfigurati forked_from_thread_id: None, parent_thread_id: None, thread_source: None, + originator: "test_originator".to_string(), dynamic_tools: Vec::new(), user_shell_override: None, } } #[tokio::test] -async fn emit_subagent_session_started_includes_fork_lineage_from_session_configuration() { +async fn emit_subagent_session_started_includes_fork_lineage_and_originator() { use wiremock::Mock; use wiremock::MockServer; use wiremock::ResponseTemplate; @@ -4204,6 +4209,10 @@ async fn emit_subagent_session_started_includes_fork_lineage_from_session_config event["event_params"]["forked_from_thread_id"], forked_from_thread_id.to_string() ); + assert_eq!( + event["event_params"]["app_server_client"]["product_client_id"], + "test_originator" + ); } async fn resolved_environments_for_configuration( @@ -4989,6 +4998,7 @@ async fn session_new_fails_when_zsh_fork_enabled_without_packaged_zsh() { forked_from_thread_id: None, parent_thread_id: None, thread_source: None, + originator: "test_originator".to_string(), dynamic_tools: Vec::new(), user_shell_override: None, }; @@ -5118,6 +5128,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { forked_from_thread_id: None, parent_thread_id: None, thread_source: None, + originator: "test_originator".to_string(), dynamic_tools: Vec::new(), user_shell_override: None, }; @@ -5220,6 +5231,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { thread_id, session_configuration.provider.clone(), session_configuration.session_source.clone(), + session_configuration.originator.clone(), config.model_verbosity, config.features.enabled(Feature::EnableRequestCompression), config.features.enabled(Feature::RuntimeMetrics), @@ -5364,6 +5376,7 @@ async fn make_session_with_config_and_rx( forked_from_thread_id: None, parent_thread_id: None, thread_source: None, + originator: "test_originator".to_string(), dynamic_tools: Vec::new(), user_shell_override: None, }; @@ -5471,6 +5484,7 @@ async fn make_session_with_history_source_and_agent_control_and_rx( forked_from_thread_id: None, parent_thread_id: None, thread_source: None, + originator: "test_originator".to_string(), dynamic_tools: Vec::new(), user_shell_override: None, }; @@ -6713,6 +6727,7 @@ async fn shutdown_complete_does_not_append_to_thread_store_after_shutdown() { parent_thread_id: None, source: SessionSource::Exec, thread_source: None, + originator: "test_originator".to_string(), base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), multi_agent_version: None, @@ -7194,6 +7209,7 @@ where forked_from_thread_id: None, parent_thread_id: None, thread_source: None, + originator: "test_originator".to_string(), dynamic_tools, user_shell_override: None, }; @@ -7295,6 +7311,7 @@ where thread_id, session_configuration.provider.clone(), session_configuration.session_source.clone(), + session_configuration.originator.clone(), config.model_verbosity, config.features.enabled(Feature::EnableRequestCompression), config.features.enabled(Feature::RuntimeMetrics), diff --git a/codex-rs/core/src/session/tests/guardian_tests.rs b/codex-rs/core/src/session/tests/guardian_tests.rs index e61578929..6f23cb096 100644 --- a/codex-rs/core/src/session/tests/guardian_tests.rs +++ b/codex-rs/core/src/session/tests/guardian_tests.rs @@ -733,6 +733,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() { forked_from_thread_id: None, parent_thread_id: None, thread_source: None, + originator: "test_originator".to_string(), agent_control: AgentControl::default(), dynamic_tools: Vec::new(), metrics_service_name: None, diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 397dd17b6..42c072dd0 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -526,6 +526,7 @@ async fn build_skills_and_plugins( turn_context.model_info.slug.clone(), sess.thread_id.to_string(), turn_context.sub_id.clone(), + turn_context.originator.clone(), ); let loaded_plugins = sess .services diff --git a/codex-rs/core/src/session/turn_context.rs b/codex-rs/core/src/session/turn_context.rs index fdd9df5d5..da526d491 100644 --- a/codex-rs/core/src/session/turn_context.rs +++ b/codex-rs/core/src/session/turn_context.rs @@ -113,6 +113,7 @@ pub struct TurnContext { pub(crate) reasoning_summary: ReasoningSummaryConfig, pub(crate) session_source: SessionSource, pub(crate) parent_thread_id: Option, + pub(crate) originator: String, pub(crate) environments: TurnEnvironmentSnapshot, /// The session's absolute working directory. All relative paths provided /// by the model as well as sandbox policies are resolved against this path @@ -265,6 +266,7 @@ impl TurnContext { reasoning_summary: self.reasoning_summary, session_source: self.session_source.clone(), parent_thread_id: self.parent_thread_id, + originator: self.originator.clone(), environments: self.environments.clone(), #[allow(deprecated)] cwd: self.cwd.clone(), @@ -548,6 +550,7 @@ impl Session { reasoning_summary, session_source, parent_thread_id: session_configuration.parent_thread_id, + originator: session_configuration.originator.clone(), environments, #[allow(deprecated)] cwd, diff --git a/codex-rs/core/src/skills.rs b/codex-rs/core/src/skills.rs index 2c1ba8245..7b5d87b26 100644 --- a/codex-rs/core/src/skills.rs +++ b/codex-rs/core/src/skills.rs @@ -102,6 +102,7 @@ pub(crate) async fn maybe_emit_implicit_skill_invocation( turn_context.model_info.slug.clone(), sess.thread_id.to_string(), turn_context.sub_id.clone(), + turn_context.originator.clone(), ), vec![invocation], ); diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 816df89c3..75a02d999 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -29,6 +29,8 @@ use codex_extension_api::empty_extension_registry; use codex_features::Feature; use codex_login::AuthManager; use codex_login::CodexAuth; +use codex_login::default_client::CODEX_INTERNAL_ORIGINATOR_OVERRIDE_ENV_VAR; +use codex_login::default_client::originator; use codex_model_provider::create_model_provider; use codex_model_provider_info::ModelProviderInfo; use codex_model_provider_info::OPENAI_PROVIDER_ID; @@ -192,6 +194,28 @@ pub struct StartThreadOptions { pub supports_openai_form_elicitation: bool, } +fn originator_from_service_name(service_name: Option<&str>) -> Option { + let service_name = service_name?.trim(); + if service_name.eq_ignore_ascii_case("codex_work_desktop") { + return Some("codex_work_desktop".to_string()); + } + None +} + +fn effective_originator_value( + metrics_service_name: Option<&str>, + env_originator: Option, + persisted_originator: Option, + inherited_originator: Option, + default_originator: String, +) -> String { + originator_from_service_name(metrics_service_name) + .or(persisted_originator) + .or(inherited_originator) + .or(env_originator) + .unwrap_or(default_originator) +} + pub(crate) struct ResumeThreadWithHistoryOptions { pub(crate) config: Config, pub(crate) initial_history: InitialHistory, @@ -1220,6 +1244,64 @@ impl ThreadManagerState { } } + async fn inherited_originator_for_parent_thread( + &self, + session_source: &SessionSource, + parent_thread_id: Option, + forked_from_thread_id: Option, + ) -> Option { + let inherited_thread_id = match session_source { + SessionSource::SubAgent(SubAgentSource::ThreadSpawn { + parent_thread_id, .. + }) => Some(*parent_thread_id), + _ => parent_thread_id.or(forked_from_thread_id), + }; + let thread = self.get_thread(inherited_thread_id?).await.ok()?; + let originator = thread.config_snapshot().await.originator; + (!originator.is_empty()).then_some(originator) + } + + async fn effective_originator( + &self, + initial_history: &InitialHistory, + metrics_service_name: Option<&str>, + session_source: &SessionSource, + parent_thread_id: Option, + forked_from_thread_id: Option, + ) -> String { + let persisted_originator = initial_history.get_session_originator(); + let inherited_originator = match initial_history { + InitialHistory::New | InitialHistory::Cleared => { + self.inherited_originator_for_parent_thread( + session_source, + parent_thread_id, + forked_from_thread_id, + ) + .await + } + InitialHistory::Forked(_) if persisted_originator.is_none() => { + self.inherited_originator_for_parent_thread( + session_source, + parent_thread_id, + forked_from_thread_id, + ) + .await + } + InitialHistory::Resumed(_) | InitialHistory::Forked(_) => None, + }; + + let env_originator = std::env::var(CODEX_INTERNAL_ORIGINATOR_OVERRIDE_ENV_VAR) + .is_ok() + .then(|| originator().value); + effective_originator_value( + metrics_service_name, + env_originator, + persisted_originator, + inherited_originator, + originator().value, + ) + } + /// Spawn a new thread with no history using a provided config. pub(crate) async fn spawn_new_thread( &self, @@ -1466,6 +1548,15 @@ impl ThreadManagerState { forked_from_thread_id, ) .await; + let originator = self + .effective_originator( + &initial_history, + metrics_service_name.as_deref(), + &session_source, + parent_thread_id, + forked_from_thread_id, + ) + .await; let CodexSpawnOk { codex, thread_id, .. } = Box::pin(Codex::spawn(CodexSpawnArgs { @@ -1484,6 +1575,7 @@ impl ThreadManagerState { forked_from_thread_id, parent_thread_id, thread_source, + originator, agent_control, dynamic_tools, metrics_service_name, diff --git a/codex-rs/core/src/thread_manager_tests.rs b/codex-rs/core/src/thread_manager_tests.rs index 990c5ca5d..e6b222f6b 100644 --- a/codex-rs/core/src/thread_manager_tests.rs +++ b/codex-rs/core/src/thread_manager_tests.rs @@ -68,6 +68,41 @@ fn developer_interrupted_marker() -> ResponseItem { .expect("developer interrupted marker should be enabled") } +#[test] +fn effective_originator_prefers_thread_scoped_sources_before_env_originator() { + for (metrics_service_name, persisted_originator, inherited_originator, expected_originator) in [ + ( + Some("codex_work_desktop"), + Some("persisted_originator"), + Some("inherited_originator"), + "codex_work_desktop", + ), + ( + None, + Some("persisted_originator"), + Some("inherited_originator"), + "persisted_originator", + ), + ( + None, + None, + Some("inherited_originator"), + "inherited_originator", + ), + ] { + assert_eq!( + effective_originator_value( + metrics_service_name, + Some("Codex Desktop".to_string()), + persisted_originator.map(str::to_string), + inherited_originator.map(str::to_string), + "codex_cli_rs".to_string(), + ), + expected_originator + ); + } +} + #[test] fn truncates_before_requested_user_message() { let items = [ diff --git a/codex-rs/core/tests/responses_headers.rs b/codex-rs/core/tests/responses_headers.rs index 2f323e0dd..7e213991c 100644 --- a/codex-rs/core/tests/responses_headers.rs +++ b/codex-rs/core/tests/responses_headers.rs @@ -123,6 +123,7 @@ async fn responses_stream_includes_subagent_header_on_review() { thread_id, provider.clone(), session_source.clone(), + "test_originator".to_string(), config.model_verbosity, /*enable_request_compression*/ false, /*include_timing_metrics*/ false, @@ -255,6 +256,7 @@ async fn responses_stream_includes_subagent_header_on_other() { thread_id, provider.clone(), session_source.clone(), + "test_originator".to_string(), config.model_verbosity, /*enable_request_compression*/ false, /*include_timing_metrics*/ false, @@ -373,6 +375,7 @@ async fn responses_respects_model_info_overrides_from_config() { thread_id, provider.clone(), session_source.clone(), + "test_originator".to_string(), config.model_verbosity, /*enable_request_compression*/ false, /*include_timing_metrics*/ false, diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index aa2026be1..98303d09b 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -1223,6 +1223,7 @@ async fn send_provider_auth_request(server: &MockServer, auth: ModelProviderAuth thread_id, provider, SessionSource::Exec, + "test_originator".to_string(), config.model_verbosity, /*enable_request_compression*/ false, /*include_timing_metrics*/ false, @@ -2835,6 +2836,7 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() { thread_id, provider.clone(), SessionSource::Exec, + "test_originator".to_string(), config.model_verbosity, /*enable_request_compression*/ false, /*include_timing_metrics*/ false, diff --git a/codex-rs/core/tests/suite/client_websockets.rs b/codex-rs/core/tests/suite/client_websockets.rs index d35714ea4..b1b731a08 100755 --- a/codex-rs/core/tests/suite/client_websockets.rs +++ b/codex-rs/core/tests/suite/client_websockets.rs @@ -2192,6 +2192,7 @@ async fn websocket_harness_with_provider_options( thread_id, provider.clone(), SessionSource::Exec, + "test_originator".to_string(), config.model_verbosity, /*enable_request_compression*/ false, runtime_metrics_enabled, diff --git a/codex-rs/core/tests/suite/rollout_list_find.rs b/codex-rs/core/tests/suite/rollout_list_find.rs index 1461c1af3..c8b10b24e 100644 --- a/codex-rs/core/tests/suite/rollout_list_find.rs +++ b/codex-rs/core/tests/suite/rollout_list_find.rs @@ -188,6 +188,7 @@ async fn find_locates_rollout_file_written_by_recorder() -> std::io::Result<()> /*parent_thread_id*/ None, SessionSource::Exec, /*thread_source*/ None, + "test_originator".to_string(), BaseInstructions::default(), Vec::new(), ), diff --git a/codex-rs/memories/write/src/runtime.rs b/codex-rs/memories/write/src/runtime.rs index 0835e1552..bd86d8313 100644 --- a/codex-rs/memories/write/src/runtime.rs +++ b/codex-rs/memories/write/src/runtime.rs @@ -75,6 +75,38 @@ pub(crate) struct MemoryStartupContext { session_telemetry: SessionTelemetry, } +fn build_session_telemetry( + auth_manager: &AuthManager, + thread_id: ThreadId, + config: &Config, + source: SessionSource, + model: &str, + originator: String, +) -> SessionTelemetry { + let auth = auth_manager.auth_cached(); + let auth = auth.as_ref(); + let auth_mode = auth.map(CodexAuth::auth_mode).map(TelemetryAuthMode::from); + let account_id = auth.and_then(CodexAuth::get_account_id); + let account_email = auth.and_then(CodexAuth::get_account_email); + let auth_env_telemetry = collect_auth_env_telemetry( + &config.model_provider, + auth_manager.codex_api_key_env_enabled(), + ); + SessionTelemetry::new( + thread_id, + model, + model, + account_id, + account_email, + auth_mode, + originator, + config.otel.log_user_prompt, + user_agent(), + source, + ) + .with_auth_env(auth_env_telemetry.to_otel_metadata()) +} + impl MemoryStartupContext { pub(crate) fn new( thread_manager: Arc, @@ -129,29 +161,15 @@ impl MemoryStartupContext { source: SessionSource, provider: SharedModelProvider, ) -> Self { - let auth = auth_manager.auth_cached(); - let auth = auth.as_ref(); - let auth_mode = auth.map(CodexAuth::auth_mode).map(TelemetryAuthMode::from); - let account_id = auth.and_then(CodexAuth::get_account_id); - let account_email = auth.and_then(CodexAuth::get_account_email); let model = config.model.as_deref().unwrap_or("unknown"); - let auth_env_telemetry = collect_auth_env_telemetry( - &config.model_provider, - auth_manager.codex_api_key_env_enabled(), - ); - let session_telemetry = SessionTelemetry::new( + let session_telemetry = build_session_telemetry( + &auth_manager, thread_id, - model, - model, - account_id, - account_email, - auth_mode, - originator().value, - config.otel.log_user_prompt, - user_agent(), + config, source, - ) - .with_auth_env(auth_env_telemetry.to_otel_metadata()); + model, + originator().value, + ); Self { thread_id, @@ -205,10 +223,14 @@ impl MemoryStartupContext { StageOneRequestContext { model_info, - session_telemetry: self - .session_telemetry - .clone() - .with_model(model_name, model_name), + session_telemetry: build_session_telemetry( + &self.auth_manager, + self.thread_id, + config, + config_snapshot.session_source, + model_name, + config_snapshot.originator, + ), reasoning_effort: Some(reasoning_effort), reasoning_summary, service_tier: config_snapshot.service_tier, @@ -231,6 +253,7 @@ impl MemoryStartupContext { self.thread_id, config.model_provider.clone(), session_source.clone(), + config_snapshot.originator, config.model_verbosity, config.features.enabled(Feature::EnableRequestCompression), config.features.enabled(Feature::RuntimeMetrics), diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index b625392e5..a5ff55d47 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -2609,11 +2609,33 @@ impl InitialHistory { .and_then(|meta| meta.thread_source.clone()) } + pub fn get_session_originator(&self) -> Option { + self.get_session_meta() + .map(|meta| meta.originator.clone()) + .filter(|originator| !originator.is_empty()) + } + pub fn get_resumed_parent_thread_id(&self) -> Option { self.get_resumed_session_meta() .and_then(|meta| meta.parent_thread_id) } + fn get_session_meta(&self) -> Option<&SessionMeta> { + match self { + InitialHistory::New | InitialHistory::Cleared => None, + InitialHistory::Resumed(resumed) => { + resumed.history.iter().find_map(|item| match item { + RolloutItem::SessionMeta(meta_line) => Some(&meta_line.meta), + _ => None, + }) + } + InitialHistory::Forked(items) => items.iter().find_map(|item| match item { + RolloutItem::SessionMeta(meta_line) => Some(&meta_line.meta), + _ => None, + }), + } + } + fn get_resumed_session_meta(&self) -> Option<&SessionMeta> { match self { InitialHistory::New | InitialHistory::Cleared | InitialHistory::Forked(_) => None, diff --git a/codex-rs/rollout/Cargo.toml b/codex-rs/rollout/Cargo.toml index edc905968..838081bb5 100644 --- a/codex-rs/rollout/Cargo.toml +++ b/codex-rs/rollout/Cargo.toml @@ -17,7 +17,6 @@ anyhow = { workspace = true } chrono = { workspace = true, features = ["serde"] } codex-file-search = { workspace = true } codex-git-utils = { workspace = true } -codex-login = { workspace = true } codex-otel = { workspace = true } codex-protocol = { workspace = true } codex-state = { workspace = true } diff --git a/codex-rs/rollout/src/lib.rs b/codex-rs/rollout/src/lib.rs index 804b8ac10..3c103750c 100644 --- a/codex-rs/rollout/src/lib.rs +++ b/codex-rs/rollout/src/lib.rs @@ -16,10 +16,6 @@ pub(crate) mod session_index; mod sqlite_metrics; pub mod state_db; -pub(crate) mod default_client { - pub use codex_login::default_client::*; -} - pub(crate) use codex_protocol::protocol; pub const SESSIONS_SUBDIR: &str = "sessions"; diff --git a/codex-rs/rollout/src/recorder.rs b/codex-rs/rollout/src/recorder.rs index 53bf115d8..dad7f8bc3 100644 --- a/codex-rs/rollout/src/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -46,7 +46,6 @@ use super::list::parse_timestamp_uuid_from_filename; use super::metadata; use super::session_index::find_thread_names_by_ids; use crate::config::RolloutConfigView; -use crate::default_client::originator; use crate::state_db; use crate::state_db::StateDbHandle; use codex_git_utils::collect_git_info; @@ -89,6 +88,7 @@ pub enum RolloutRecorderParams { parent_thread_id: Option, source: Box, thread_source: Option, + originator: String, base_instructions: BaseInstructions, dynamic_tools: Vec, multi_agent_version: Option, @@ -161,12 +161,14 @@ fn clone_io_error(err: &IoError) -> IoError { } impl RolloutRecorderParams { + #[allow(clippy::too_many_arguments)] pub fn new( conversation_id: ThreadId, forked_from_id: Option, parent_thread_id: Option, source: SessionSource, thread_source: Option, + originator: String, base_instructions: BaseInstructions, dynamic_tools: Vec, ) -> Self { @@ -177,6 +179,7 @@ impl RolloutRecorderParams { parent_thread_id, source: Box::new(source), thread_source, + originator, base_instructions, dynamic_tools, multi_agent_version: None, @@ -726,6 +729,7 @@ impl RolloutRecorder { parent_thread_id, source, thread_source, + originator, base_instructions, dynamic_tools, multi_agent_version, @@ -751,7 +755,7 @@ impl RolloutRecorder { parent_thread_id, timestamp, cwd: config.cwd().to_path_buf(), - originator: originator().value, + originator, cli_version: env!("CARGO_PKG_VERSION").to_string(), agent_nickname: source.get_nickname(), agent_role: source.get_agent_role(), diff --git a/codex-rs/rollout/src/recorder_tests.rs b/codex-rs/rollout/src/recorder_tests.rs index 23618cb81..4b72ff767 100644 --- a/codex-rs/rollout/src/recorder_tests.rs +++ b/codex-rs/rollout/src/recorder_tests.rs @@ -385,6 +385,7 @@ async fn recorder_materializes_on_flush_with_pending_items() -> std::io::Result< /*parent_thread_id*/ None, SessionSource::Exec, /*thread_source*/ None, + "test_originator".to_string(), BaseInstructions::default(), Vec::new(), ) @@ -477,6 +478,7 @@ async fn persist_reports_filesystem_error_and_retries_buffered_items() -> std::i /*parent_thread_id*/ None, SessionSource::Exec, /*thread_source*/ None, + "test_originator".to_string(), BaseInstructions::default(), Vec::new(), ), diff --git a/codex-rs/thread-store/src/in_memory.rs b/codex-rs/thread-store/src/in_memory.rs index d7d85c3cf..8cd801c9c 100644 --- a/codex-rs/thread-store/src/in_memory.rs +++ b/codex-rs/thread-store/src/in_memory.rs @@ -118,6 +118,7 @@ mod tests { parent_thread_id, source: SessionSource::Exec, thread_source: None, + originator: "test_originator".to_string(), base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), multi_agent_version: None, @@ -242,6 +243,7 @@ impl InMemoryThreadStore { agent_nickname: params.source.get_nickname(), agent_role: params.source.get_agent_role(), agent_path: params.source.get_agent_path().map(Into::into), + originator: params.originator.clone(), source: params.source.clone(), thread_source: params.thread_source.clone(), model_provider: Some(params.metadata.model_provider.clone()), diff --git a/codex-rs/thread-store/src/local/create_thread.rs b/codex-rs/thread-store/src/local/create_thread.rs index 36c3cdc13..eba6a5a6a 100644 --- a/codex-rs/thread-store/src/local/create_thread.rs +++ b/codex-rs/thread-store/src/local/create_thread.rs @@ -33,6 +33,7 @@ pub(super) async fn create_thread( params.parent_thread_id, params.source, params.thread_source, + params.originator, params.base_instructions, params.dynamic_tools, ) diff --git a/codex-rs/thread-store/src/local/mod.rs b/codex-rs/thread-store/src/local/mod.rs index d0a4d6518..51e622fc5 100644 --- a/codex-rs/thread-store/src/local/mod.rs +++ b/codex-rs/thread-store/src/local/mod.rs @@ -1130,6 +1130,7 @@ mod tests { parent_thread_id: None, source: SessionSource::Exec, thread_source: None, + originator: "test_originator".to_string(), base_instructions: BaseInstructions::default(), dynamic_tools: Vec::new(), multi_agent_version: None, diff --git a/codex-rs/thread-store/src/types.rs b/codex-rs/thread-store/src/types.rs index c18f39882..feea814f9 100644 --- a/codex-rs/thread-store/src/types.rs +++ b/codex-rs/thread-store/src/types.rs @@ -79,6 +79,8 @@ pub struct CreateThreadParams { pub source: SessionSource, /// Optional analytics source classification for this thread. pub thread_source: Option, + /// Effective originator used for this thread's Responses requests and analytics events. + pub originator: String, /// Base instructions persisted in session metadata. pub base_instructions: BaseInstructions, /// Dynamic tools available to the thread at startup.