[codex] Attribute app-server analytics by thread originator (#29935)

## Why

Desktop Work threads and regular Codex threads can share the same
app-server connection. App-server analytics currently copy
`product_client_id` from connection metadata for every thread-scoped
event, so Work thread activity is attributed to the Desktop connection
instead of the thread's resolved originator. This prevents analytics
from distinguishing the two products on a shared connection.

## What changed

- Publish the resolved originator after a thread is materialized,
covering new, resumed, forked, and subagent threads.
- Store that originator in the analytics reducer's existing per-thread
state.
- Override only `app_server_client.product_client_id` for thread, turn,
tool, review, goal, guardian, and compaction events while preserving the
connection's client name, version, and transport metadata.
- Fall back to the connection-wide product client ID when a thread has
no originator override.
- Preserve persisted originators in thread initialization analytics for
resume and fork flows.

## Validation

- `just test -p codex-analytics
thread_originator_overrides_shared_connection_across_thread_events
subagent_events_keep_thread_originator_with_explicit_turn_connection`
- `just test -p codex-app-server
turn_start_tracks_thread_originator_in_analytics
thread_start_tracks_thread_initialized_analytics
thread_fork_tracks_thread_initialized_analytics
thread_resume_tracks_thread_initialized_analytics`
- `just test -p codex-core thread_manager`
This commit is contained in:
alexsong-oai
2026-06-25 18:15:48 -07:00
committed by GitHub
Unverified
parent da78d5fdc5
commit 841f30598c
12 changed files with 431 additions and 57 deletions
@@ -558,6 +558,7 @@ async fn ingest_rejected_turn_steer(
response: Box::new(sample_thread_resume_response(
"thread-2", /*ephemeral*/ false, "gpt-5",
)),
thread_originator: None,
},
out,
)
@@ -631,6 +632,7 @@ async fn ingest_turn_prerequisites(
response: Box::new(sample_thread_start_response(
"thread-2", /*ephemeral*/ false, "gpt-5",
)),
thread_originator: None,
},
out,
)
@@ -654,6 +656,7 @@ async fn ingest_turn_prerequisites(
connection_id: 7,
request_id: RequestId::Integer(3),
response: Box::new(sample_turn_start_response("turn-2")),
thread_originator: None,
},
out,
)
@@ -720,6 +723,7 @@ async fn ingest_review_prerequisites(
response: Box::new(sample_thread_start_response(
"thread-1", /*ephemeral*/ false, "gpt-5",
)),
thread_originator: None,
},
events,
)
@@ -1652,6 +1656,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
/*ephemeral*/ false,
"gpt-5",
)),
thread_originator: None,
},
&mut events,
)
@@ -1697,6 +1702,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
response: Box::new(sample_thread_resume_response(
"thread-1", /*ephemeral*/ true, "gpt-5",
)),
thread_originator: None,
},
&mut events,
)
@@ -1741,6 +1747,157 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
);
}
#[tokio::test]
async fn thread_originator_overrides_shared_connection_across_thread_events() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
reducer
.ingest(sample_initialize_fact(/*connection_id*/ 7), &mut events)
.await;
for (request_id, thread_id, thread_originator) in [
(1, "thread-work", Some(TEST_PRODUCT_CLIENT_ID.to_string())),
(2, "thread-default", None),
] {
reducer
.ingest(
AnalyticsFact::ClientResponse {
connection_id: 7,
request_id: RequestId::Integer(request_id),
response: Box::new(sample_thread_start_response(
thread_id, /*ephemeral*/ false, "gpt-5",
)),
thread_originator,
},
&mut events,
)
.await;
}
let initialized = serde_json::to_value(&events).expect("serialize thread events");
assert_eq!(
initialized
.as_array()
.expect("thread events")
.iter()
.map(|event| {
json!({
"thread_id": event["event_params"]["thread_id"],
"app_server_client": event["event_params"]["app_server_client"],
})
})
.collect::<Vec<_>>(),
vec![
json!({
"thread_id": "thread-work",
"app_server_client": {
"product_client_id": TEST_PRODUCT_CLIENT_ID,
"client_name": "codex-tui",
"client_version": "1.0.0",
"rpc_transport": "websocket",
"experimental_api_enabled": false,
},
}),
json!({
"thread_id": "thread-default",
"app_server_client": {
"product_client_id": DEFAULT_ORIGINATOR,
"client_name": "codex-tui",
"client_version": "1.0.0",
"rpc_transport": "websocket",
"experimental_api_enabled": false,
},
}),
]
);
events.clear();
reducer
.ingest(
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(3),
request: Box::new(sample_turn_start_request(
"thread-work",
/*request_id*/ 3,
)),
},
&mut events,
)
.await;
reducer
.ingest(
AnalyticsFact::ClientResponse {
connection_id: 7,
request_id: RequestId::Integer(3),
response: Box::new(sample_turn_start_response("turn-1")),
thread_originator: None,
},
&mut events,
)
.await;
ingest_completed_command_execution_item(&mut reducer, &mut events, "thread-work", "item-work")
.await;
ingest_complete_child_turn(&mut reducer, &mut events, "thread-work", "turn-1").await;
reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(Box::new(
CodexCompactionEvent {
thread_id: "thread-work".to_string(),
turn_id: "turn-compact".to_string(),
trigger: CompactionTrigger::Manual,
reason: CompactionReason::UserRequested,
implementation: CompactionImplementation::Responses,
phase: CompactionPhase::StandaloneTurn,
strategy: CompactionStrategy::Memento,
status: CompactionStatus::Completed,
codex_error_kind: None,
codex_error_http_status_code: None,
active_context_tokens_before: 131_000,
active_context_tokens_after: 64_000,
retained_image_count: None,
compaction_summary_tokens: None,
cached_input_tokens: None,
started_at: 100,
completed_at: 101,
duration_ms: Some(1200),
},
))),
&mut events,
)
.await;
let lifecycle = serde_json::to_value(&events).expect("serialize lifecycle events");
assert_eq!(
lifecycle
.as_array()
.expect("lifecycle events")
.iter()
.map(|event| {
json!({
"event_type": event["event_type"],
"product_client_id":
event["event_params"]["app_server_client"]["product_client_id"],
})
})
.collect::<Vec<_>>(),
vec![
json!({
"event_type": "codex_command_execution_event",
"product_client_id": TEST_PRODUCT_CLIENT_ID,
}),
json!({
"event_type": "codex_turn_event",
"product_client_id": TEST_PRODUCT_CLIENT_ID,
}),
json!({
"event_type": "codex_compaction_event",
"product_client_id": TEST_PRODUCT_CLIENT_ID,
}),
]
);
}
#[tokio::test]
async fn unrelated_client_requests_are_ignored_by_reducer() {
let mut reducer = AnalyticsReducer::default();
@@ -1767,6 +1924,7 @@ async fn unrelated_client_requests_are_ignored_by_reducer() {
connection_id: 7,
request_id: RequestId::Integer(3),
response: Box::new(sample_turn_start_response("turn-2")),
thread_originator: None,
},
&mut events,
)
@@ -1792,6 +1950,7 @@ async fn unrelated_client_responses_are_ignored_by_reducer() {
response: Box::new(ClientResponsePayload::ThreadArchive(
ThreadArchiveResponse {},
)),
thread_originator: None,
},
&mut events,
)
@@ -1851,6 +2010,7 @@ async fn compaction_event_ingests_custom_fact() {
Some(AppServerThreadSource::Subagent),
Some(parent_thread_id.to_string()),
)),
thread_originator: None,
},
&mut events,
)
@@ -1971,6 +2131,7 @@ async fn guardian_review_event_ingests_custom_fact_with_optional_target_item() {
/*ephemeral*/ false,
"gpt-5",
)),
thread_originator: None,
},
&mut events,
)
@@ -2499,6 +2660,7 @@ async fn item_review_summaries_do_not_cross_threads_with_reused_item_ids() {
response: Box::new(sample_thread_start_response(
"thread-2", /*ephemeral*/ false, "gpt-5",
)),
thread_originator: None,
},
&mut events,
)
@@ -2749,7 +2911,7 @@ async fn subagent_thread_started_publishes_without_initialize() {
}
#[tokio::test]
async fn subagent_events_use_inherited_connection_unless_turn_connection_is_explicit() {
async fn subagent_events_keep_thread_originator_with_explicit_turn_connection() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
let parent_thread_id =
@@ -2786,6 +2948,7 @@ async fn subagent_events_use_inherited_connection_unless_turn_connection_is_expl
/*ephemeral*/ false,
"gpt-5",
)),
thread_originator: None,
},
&mut events,
)
@@ -2908,6 +3071,7 @@ async fn subagent_events_use_inherited_connection_unless_turn_connection_is_expl
connection_id: 8,
request_id: RequestId::Integer(3),
response: Box::new(sample_turn_start_response("turn-explicit")),
thread_originator: None,
},
&mut events,
)
@@ -2918,7 +3082,11 @@ async fn subagent_events_use_inherited_connection_unless_turn_connection_is_expl
};
assert_eq!(
event.event_params.app_server_client.product_client_id,
DEFAULT_ORIGINATOR
"parent-client"
);
assert_eq!(
event.event_params.app_server_client.client_name.as_deref(),
Some("codex-tui")
);
}
@@ -3868,6 +4036,7 @@ async fn accepted_turn_steer_emits_expected_event() {
connection_id: 7,
request_id: RequestId::Integer(4),
response: Box::new(sample_turn_steer_response("turn-2")),
thread_originator: None,
},
&mut out,
)
@@ -4039,6 +4208,7 @@ async fn turn_start_error_response_discards_pending_start_request() {
connection_id: 7,
request_id: RequestId::Integer(3),
response: Box::new(sample_turn_start_response("turn-2")),
thread_originator: None,
},
&mut out,
)
@@ -4367,6 +4537,7 @@ async fn accepted_steers_increment_turn_steer_count() {
connection_id: 7,
request_id: RequestId::Integer(4),
response: Box::new(sample_turn_steer_response("turn-2")),
thread_originator: None,
},
&mut out,
)
@@ -4414,6 +4585,7 @@ async fn accepted_steers_increment_turn_steer_count() {
connection_id: 7,
request_id: RequestId::Integer(6),
response: Box::new(sample_turn_steer_response("turn-2")),
thread_originator: None,
},
&mut out,
)
+26
View File
@@ -435,6 +435,31 @@ impl AnalyticsEventsClient {
connection_id: u64,
request_id: RequestId,
response: ClientResponsePayload,
) {
self.track_response_inner(
connection_id,
request_id,
response,
/*thread_originator*/ None,
);
}
pub fn track_response_with_thread_originator(
&self,
connection_id: u64,
request_id: RequestId,
response: ClientResponsePayload,
thread_originator: String,
) {
self.track_response_inner(connection_id, request_id, response, Some(thread_originator));
}
fn track_response_inner(
&self,
connection_id: u64,
request_id: RequestId,
response: ClientResponsePayload,
thread_originator: Option<String>,
) {
if !matches!(
response,
@@ -450,6 +475,7 @@ impl AnalyticsEventsClient {
connection_id,
request_id,
response: Box::new(response),
thread_originator,
});
}
+1
View File
@@ -464,6 +464,7 @@ pub(crate) enum AnalyticsFact {
connection_id: u64,
request_id: RequestId,
response: Box<ClientResponsePayload>,
thread_originator: Option<String>,
},
ErrorResponse {
connection_id: u64,
+76 -38
View File
@@ -165,6 +165,20 @@ struct ConnectionState {
struct ThreadAnalyticsState {
connection_id: Option<u64>,
metadata: Option<ThreadMetadataState>,
originator: Option<String>,
}
impl ThreadAnalyticsState {
fn app_server_client(
&self,
connection_state: &ConnectionState,
) -> CodexAppServerClientMetadata {
let mut app_server_client = connection_state.app_server_client.clone();
if let Some(originator) = self.originator.as_ref() {
app_server_client.product_client_id.clone_from(originator);
}
app_server_client
}
}
#[derive(Clone, Copy)]
@@ -434,9 +448,11 @@ impl AnalyticsReducer {
connection_id,
request_id,
response,
thread_originator,
} => {
if let Some(response) = response.into_client_response(request_id) {
self.ingest_response(connection_id, response, out).await;
self.ingest_response(connection_id, response, thread_originator, out)
.await;
}
}
AnalyticsFact::ErrorResponse {
@@ -575,6 +591,9 @@ impl AnalyticsReducer {
.and_then(|parent_thread_id| self.threads.get(parent_thread_id))
.and_then(|thread| thread.connection_id);
let thread_state = self.threads.entry(input.thread_id.clone()).or_default();
thread_state
.originator
.get_or_insert_with(|| input.product_client_id.clone());
thread_state
.metadata
.get_or_insert_with(|| ThreadMetadataState {
@@ -597,7 +616,7 @@ impl AnalyticsReducer {
input: GuardianReviewEventParams,
out: &mut Vec<TrackEventRequest>,
) {
let Some((connection_state, thread_metadata)) =
let Some((connection_state, thread_state, thread_metadata)) =
self.thread_context_or_warn(AnalyticsDropSite::guardian(&input))
else {
return;
@@ -607,7 +626,7 @@ impl AnalyticsReducer {
event_type: "codex_guardian_review",
event_params: GuardianReviewEventPayload {
session_id: thread_metadata.session_id.clone(),
app_server_client: connection_state.app_server_client.clone(),
app_server_client: thread_state.app_server_client(connection_state),
runtime: connection_state.runtime.clone(),
guardian_review: input,
},
@@ -874,6 +893,7 @@ impl AnalyticsReducer {
&mut self,
connection_id: u64,
response: ClientResponse,
thread_originator: Option<String>,
out: &mut Vec<TrackEventRequest>,
) {
match response {
@@ -883,6 +903,7 @@ impl AnalyticsReducer {
response.thread,
response.model,
ThreadInitializationMode::New,
thread_originator,
out,
);
}
@@ -892,6 +913,7 @@ impl AnalyticsReducer {
response.thread,
response.model,
ThreadInitializationMode::Resumed,
thread_originator,
out,
);
}
@@ -901,6 +923,7 @@ impl AnalyticsReducer {
response.thread,
response.model,
ThreadInitializationMode::Forked,
thread_originator,
out,
);
}
@@ -1248,7 +1271,7 @@ impl AnalyticsReducer {
else {
return;
};
let Some((connection_state, thread_metadata)) = self
let Some((connection_state, thread_state, thread_metadata)) = self
.thread_context_or_warn(AnalyticsDropSite::tool_item(&notification, item_id))
else {
return;
@@ -1260,6 +1283,7 @@ impl AnalyticsReducer {
started_at_ms,
completed_at_ms,
connection_state,
thread_state,
thread_metadata,
review_summary: self.item_review_summaries.get(&key),
}) {
@@ -1316,6 +1340,7 @@ impl AnalyticsReducer {
thread: codex_app_server_protocol::Thread,
model: String,
initialization_mode: ThreadInitializationMode,
thread_originator: Option<String>,
out: &mut Vec<TrackEventRequest>,
) {
let session_source: SessionSource = thread.source.into();
@@ -1333,20 +1358,20 @@ impl AnalyticsReducer {
parent_thread_id,
initialization_mode,
);
self.threads.insert(
thread_id.clone(),
ThreadAnalyticsState {
connection_id: Some(connection_id),
metadata: Some(thread_metadata.clone()),
},
);
let thread_state = self.threads.entry(thread_id.clone()).or_default();
if let Some(originator) = thread_originator {
thread_state.originator = Some(originator);
}
thread_state.connection_id = Some(connection_id);
thread_state.metadata = Some(thread_metadata.clone());
let app_server_client = thread_state.app_server_client(connection_state);
out.push(TrackEventRequest::ThreadInitialized(
ThreadInitializedEvent {
event_type: "codex_thread_initialized",
event_params: ThreadInitializedEventParams {
thread_id,
session_id,
app_server_client: connection_state.app_server_client.clone(),
app_server_client,
runtime: connection_state.runtime.clone(),
model,
ephemeral: thread.ephemeral,
@@ -1362,7 +1387,7 @@ impl AnalyticsReducer {
}
fn ingest_compaction(&mut self, input: CodexCompactionEvent, out: &mut Vec<TrackEventRequest>) {
let Some((connection_state, thread_metadata)) =
let Some((connection_state, thread_state, thread_metadata)) =
self.thread_context_or_warn(AnalyticsDropSite::compaction(&input))
else {
return;
@@ -1373,7 +1398,7 @@ impl AnalyticsReducer {
event_params: codex_compaction_event_params(
input,
thread_metadata.session_id.clone(),
connection_state.app_server_client.clone(),
thread_state.app_server_client(connection_state),
connection_state.runtime.clone(),
thread_metadata.thread_source.clone(),
thread_metadata.subagent_source.clone(),
@@ -1384,7 +1409,7 @@ impl AnalyticsReducer {
}
fn ingest_goal(&mut self, input: CodexGoalEvent, out: &mut Vec<TrackEventRequest>) {
let Some((connection_state, thread_metadata)) =
let Some((connection_state, thread_state, thread_metadata)) =
self.thread_context_or_warn(AnalyticsDropSite::goal(&input))
else {
return;
@@ -1394,7 +1419,7 @@ impl AnalyticsReducer {
event_params: codex_goal_event_params(
input,
thread_metadata.session_id.clone(),
connection_state.app_server_client.clone(),
thread_state.app_server_client(connection_state),
connection_state.runtime.clone(),
thread_metadata.thread_source.clone(),
thread_metadata.subagent_source.clone(),
@@ -1483,11 +1508,11 @@ impl AnalyticsReducer {
return;
};
let drop_site = AnalyticsDropSite::turn_steer(&pending_request.thread_id);
let Some(thread_metadata) = self
.threads
.get(drop_site.thread_id)
.and_then(|thread| thread.metadata.as_ref())
else {
let Some(thread_state) = self.threads.get(drop_site.thread_id) else {
warn_missing_analytics_context(&drop_site, MissingAnalyticsContext::ThreadMetadata);
return;
};
let Some(thread_metadata) = thread_state.metadata.as_ref() else {
warn_missing_analytics_context(&drop_site, MissingAnalyticsContext::ThreadMetadata);
return;
};
@@ -1498,7 +1523,7 @@ impl AnalyticsReducer {
session_id: thread_metadata.session_id.clone(),
expected_turn_id: Some(pending_request.expected_turn_id),
accepted_turn_id,
app_server_client: connection_state.app_server_client.clone(),
app_server_client: thread_state.app_server_client(connection_state),
runtime: connection_state.runtime.clone(),
thread_source: thread_metadata.thread_source.clone(),
subagent_source: thread_metadata.subagent_source.clone(),
@@ -1529,7 +1554,7 @@ impl AnalyticsReducer {
&pending_review,
);
}
let Some((connection_state, thread_metadata)) =
let Some((connection_state, thread_state, thread_metadata)) =
self.thread_context_or_warn(AnalyticsDropSite::review(&pending_review))
else {
return;
@@ -1541,7 +1566,7 @@ impl AnalyticsReducer {
turn_id: pending_review.turn_id,
item_id: pending_review.item_id,
review_id: pending_review.review_id,
app_server_client: connection_state.app_server_client.clone(),
app_server_client: thread_state.app_server_client(connection_state),
runtime: connection_state.runtime.clone(),
thread_source: thread_metadata.thread_source.clone(),
subagent_source: thread_metadata.subagent_source.clone(),
@@ -1610,18 +1635,18 @@ impl AnalyticsReducer {
);
return;
};
let Some(thread_metadata) = self
.threads
.get(drop_site.thread_id)
.and_then(|thread| thread.metadata.as_ref())
else {
let Some(thread_state) = self.threads.get(drop_site.thread_id) else {
warn_missing_analytics_context(&drop_site, MissingAnalyticsContext::ThreadMetadata);
return;
};
let Some(thread_metadata) = thread_state.metadata.as_ref() else {
warn_missing_analytics_context(&drop_site, MissingAnalyticsContext::ThreadMetadata);
return;
};
let turn_event = TrackEventRequest::TurnEvent(Box::new(CodexTurnEventRequest {
event_type: "codex_turn_event",
event_params: codex_turn_event_params(
connection_state.app_server_client.clone(),
thread_state.app_server_client(connection_state),
connection_state.runtime.clone(),
turn_id.to_string(),
turn_state,
@@ -1663,17 +1688,18 @@ impl AnalyticsReducer {
fn thread_context_or_warn(
&self,
drop_site: AnalyticsDropSite<'_>,
) -> Option<(&ConnectionState, &ThreadMetadataState)> {
) -> Option<(
&ConnectionState,
&ThreadAnalyticsState,
&ThreadMetadataState,
)> {
let connection_state = self.thread_connection_or_warn(drop_site)?;
let Some(thread_metadata) = self
.threads
.get(drop_site.thread_id)
.and_then(|thread| thread.metadata.as_ref())
else {
let thread_state = self.threads.get(drop_site.thread_id)?;
let Some(thread_metadata) = thread_state.metadata.as_ref() else {
warn_missing_analytics_context(&drop_site, MissingAnalyticsContext::ThreadMetadata);
return None;
};
Some((connection_state, thread_metadata))
Some((connection_state, thread_state, thread_metadata))
}
}
@@ -1743,6 +1769,7 @@ struct ToolItemEventInput<'a> {
started_at_ms: u64,
completed_at_ms: u64,
connection_state: &'a ConnectionState,
thread_state: &'a ThreadAnalyticsState,
thread_metadata: &'a ThreadMetadataState,
review_summary: Option<&'a ItemReviewSummary>,
}
@@ -1755,6 +1782,7 @@ fn tool_item_event(input: ToolItemEventInput<'_>) -> Option<TrackEventRequest> {
started_at_ms,
completed_at_ms,
connection_state,
thread_state,
thread_metadata,
review_summary,
} = input;
@@ -1784,6 +1812,7 @@ fn tool_item_event(input: ToolItemEventInput<'_>) -> Option<TrackEventRequest> {
started_at_ms,
completed_at_ms,
connection_state,
thread_state,
thread_metadata,
review_summary,
},
@@ -1825,6 +1854,7 @@ fn tool_item_event(input: ToolItemEventInput<'_>) -> Option<TrackEventRequest> {
started_at_ms,
completed_at_ms,
connection_state,
thread_state,
thread_metadata,
review_summary,
},
@@ -1866,6 +1896,7 @@ fn tool_item_event(input: ToolItemEventInput<'_>) -> Option<TrackEventRequest> {
started_at_ms,
completed_at_ms,
connection_state,
thread_state,
thread_metadata,
review_summary,
},
@@ -1910,6 +1941,7 @@ fn tool_item_event(input: ToolItemEventInput<'_>) -> Option<TrackEventRequest> {
started_at_ms,
completed_at_ms,
connection_state,
thread_state,
thread_metadata,
review_summary,
},
@@ -1954,6 +1986,7 @@ fn tool_item_event(input: ToolItemEventInput<'_>) -> Option<TrackEventRequest> {
started_at_ms,
completed_at_ms,
connection_state,
thread_state,
thread_metadata,
review_summary,
},
@@ -2009,6 +2042,7 @@ fn tool_item_event(input: ToolItemEventInput<'_>) -> Option<TrackEventRequest> {
started_at_ms,
completed_at_ms,
connection_state,
thread_state,
thread_metadata,
review_summary,
},
@@ -2045,6 +2079,7 @@ fn tool_item_event(input: ToolItemEventInput<'_>) -> Option<TrackEventRequest> {
started_at_ms,
completed_at_ms,
connection_state,
thread_state,
thread_metadata,
review_summary,
},
@@ -2100,6 +2135,7 @@ struct ToolItemContext<'a> {
started_at_ms: u64,
completed_at_ms: u64,
connection_state: &'a ConnectionState,
thread_state: &'a ThreadAnalyticsState,
thread_metadata: &'a ThreadMetadataState,
review_summary: Option<&'a ItemReviewSummary>,
}
@@ -2118,7 +2154,9 @@ fn tool_item_base(
thread_id: thread_id.to_string(),
turn_id: turn_id.to_string(),
item_id,
app_server_client: context.connection_state.app_server_client.clone(),
app_server_client: context
.thread_state
.app_server_client(context.connection_state),
runtime: context.connection_state.runtime.clone(),
thread_source: thread_metadata.thread_source.clone(),
subagent_source: thread_metadata.subagent_source.clone(),
+42 -6
View File
@@ -504,13 +504,36 @@ impl OutgoingMessageSender {
where
T: Into<ClientResponsePayload>,
{
self.send_response_as(request_id, response.into()).await;
self.send_response_as_inner(request_id, response.into(), /*thread_originator*/ None)
.await;
}
pub(crate) async fn send_response_with_thread_originator<T>(
&self,
request_id: ConnectionRequestId,
response: T,
thread_originator: String,
) where
T: Into<ClientResponsePayload>,
{
self.send_response_as_inner(request_id, response.into(), Some(thread_originator))
.await;
}
pub(crate) async fn send_response_as(
&self,
request_id: ConnectionRequestId,
response: ClientResponsePayload,
) {
self.send_response_as_inner(request_id, response, /*thread_originator*/ None)
.await;
}
async fn send_response_as_inner(
&self,
request_id: ConnectionRequestId,
response: ClientResponsePayload,
thread_originator: Option<String>,
) {
let connection_id = request_id.connection_id;
let request_id_for_analytics = request_id.request_id.clone();
@@ -518,11 +541,24 @@ impl OutgoingMessageSender {
.into_jsonrpc_parts_and_payload(request_id.request_id.clone())
.map(|(id, result, response)| {
if let Some(response) = response {
self.analytics_events_client.track_response(
connection_id.0,
request_id_for_analytics,
response,
);
match thread_originator {
Some(thread_originator) => {
self.analytics_events_client
.track_response_with_thread_originator(
connection_id.0,
request_id_for_analytics,
response,
thread_originator,
);
}
None => {
self.analytics_events_client.track_response(
connection_id.0,
request_id_for_analytics,
response,
);
}
}
}
(id, result)
});
@@ -640,6 +640,7 @@ pub(super) async fn handle_pending_thread_resume_request(
active_permission_profile,
workspace_roots,
reasoning_effort,
originator,
..
} = config_snapshot;
let instruction_sources = pending.instruction_sources;
@@ -665,7 +666,9 @@ pub(super) async fn handle_pending_thread_resume_request(
multi_agent_mode: MultiAgentMode::ExplicitRequestOnly,
initial_turns_page,
};
outgoing.send_response(request_id, response).await;
outgoing
.send_response_with_thread_originator(request_id, response, originator)
.await;
// Match cold resume: metadata-only resume should attach the listener without
// paying the cost of turn reconstruction for historical usage replay.
if let Some(token_usage_thread) = token_usage_thread {
@@ -1268,6 +1268,7 @@ impl ThreadRequestProcessor {
let cwd = config_snapshot.cwd().clone();
let active_permission_profile =
thread_response_active_permission_profile(config_snapshot.active_permission_profile);
let thread_originator = config_snapshot.originator.clone();
let response = ThreadStartResponse {
thread: thread.clone(),
@@ -1287,7 +1288,7 @@ impl ThreadRequestProcessor {
let notif = thread_started_notification(thread);
listener_task_context
.outgoing
.send_response(request_id, response)
.send_response_with_thread_originator(request_id, response, thread_originator)
.instrument(tracing::info_span!(
"app_server.thread_start.send_response",
otel.name = "app_server.thread_start.send_response",
@@ -2863,6 +2864,7 @@ impl ThreadRequestProcessor {
}
}
let thread_originator = config_snapshot.originator.clone();
let response = ThreadResumeResponse {
thread,
model: session_configured.model,
@@ -2881,7 +2883,9 @@ impl ThreadRequestProcessor {
};
let connection_id = request_id.connection_id;
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response_with_thread_originator(request_id, response, thread_originator)
.await;
// `excludeTurns` is explicitly the cheap resume path, so avoid
// rebuilding history only to attribute a replayed usage update.
if let Some(token_usage_thread) = token_usage_thread {
@@ -3582,6 +3586,7 @@ impl ThreadRequestProcessor {
);
let active_permission_profile =
thread_response_active_permission_profile(config_snapshot.active_permission_profile);
let thread_originator = config_snapshot.originator.clone();
let response = ThreadForkResponse {
thread: thread.clone(),
@@ -3602,7 +3607,9 @@ impl ThreadRequestProcessor {
let notif = thread_started_notification(thread);
let connection_id = request_id.connection_id;
let token_usage_thread = include_turns.then(|| response.thread.clone());
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response_with_thread_originator(request_id, response, thread_originator)
.await;
// `excludeTurns` is the cheap fork path, so skip restored usage replay
// instead of rebuilding history only to attribute a historical update.
if let Some(token_usage_thread) = token_usage_thread {
@@ -145,7 +145,7 @@ pub(crate) async fn wait_for_goal_event(
.await
}
async fn wait_for_matching_analytics_event(
pub(crate) async fn wait_for_matching_analytics_event(
server: &MockServer,
read_timeout: Duration,
matches: impl Fn(&Value) -> bool,
@@ -191,6 +191,7 @@ pub(crate) fn assert_basic_thread_initialized_event(
event: &Value,
thread_id: &str,
session_id: &str,
expected_product_client_id: &str,
expected_model: &str,
initialization_mode: &str,
expected_thread_source: &str,
@@ -199,7 +200,7 @@ pub(crate) fn assert_basic_thread_initialized_event(
assert_eq!(event["event_params"]["session_id"], session_id);
assert_eq!(
event["event_params"]["app_server_client"]["product_client_id"],
DEFAULT_CLIENT_NAME
expected_product_client_id
);
assert_eq!(
event["event_params"]["app_server_client"]["client_name"],
@@ -488,6 +488,7 @@ async fn thread_fork_tracks_thread_initialized_analytics() -> Result<()> {
event,
&thread.id,
&thread.session_id,
"codex",
"mock-model",
"forked",
"user",
@@ -107,6 +107,7 @@ use super::analytics::mount_analytics_capture;
use super::analytics::thread_initialized_event;
use super::analytics::wait_for_analytics_payload;
use super::analytics::wait_for_goal_event;
use super::analytics::wait_for_matching_analytics_event;
#[cfg(windows)]
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25);
@@ -573,11 +574,12 @@ async fn thread_resume_tracks_thread_initialized_analytics() -> Result<()> {
Some("mock_provider"),
/*git_info*/ None,
)?;
set_thread_source_on_fake_rollout(
set_session_meta_on_fake_rollout(
codex_home.path(),
"2025-01-05T12-00-00",
&conversation_id,
"user",
"codex_work_desktop",
)?;
let mut mcp = TestAppServer::new_without_managed_config(codex_home.path()).await?;
@@ -607,6 +609,7 @@ async fn thread_resume_tracks_thread_initialized_analytics() -> Result<()> {
event,
&thread.id,
&thread.session_id,
"codex_work_desktop",
"gpt-5.3-codex",
"resumed",
"user",
@@ -615,11 +618,94 @@ async fn thread_resume_tracks_thread_initialized_analytics() -> Result<()> {
Ok(())
}
fn set_thread_source_on_fake_rollout(
#[tokio::test]
async fn thread_resume_running_thread_tracks_thread_originator_in_analytics() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml_with_chatgpt_base_url(codex_home.path(), &server.uri(), &server.uri())?;
mount_analytics_capture(&server, codex_home.path()).await?;
let mut mcp = TestAppServer::new_without_managed_config(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
thread_source: Some(ThreadSource::User),
service_name: Some("codex_work_desktop".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![UserInput::Text {
text: "materialize rollout".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id: thread.id.clone(),
exclude_turns: true,
..Default::default()
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let ThreadResumeResponse {
thread: resumed, ..
} = to_response::<ThreadResumeResponse>(resume_resp)?;
let event = wait_for_matching_analytics_event(&server, DEFAULT_READ_TIMEOUT, |event| {
event["event_type"] == "codex_thread_initialized"
&& event["event_params"]["thread_id"] == resumed.id
&& event["event_params"]["initialization_mode"] == "resumed"
})
.await?;
assert_basic_thread_initialized_event(
&event,
&resumed.id,
&resumed.session_id,
"codex_work_desktop",
"mock-model",
"resumed",
"user",
);
Ok(())
}
fn set_session_meta_on_fake_rollout(
codex_home: &std::path::Path,
filename_ts: &str,
thread_id: &str,
thread_source: &str,
originator: &str,
) -> Result<()> {
let path = rollout_path(codex_home, filename_ts, thread_id);
let contents = std::fs::read_to_string(&path)?;
@@ -629,6 +715,7 @@ fn set_thread_source_on_fake_rollout(
.ok_or_else(|| anyhow::anyhow!("fake rollout missing session meta"))?;
let mut session_meta: serde_json::Value = serde_json::from_str(session_meta)?;
session_meta["payload"]["thread_source"] = serde_json::json!(thread_source);
session_meta["payload"]["originator"] = serde_json::json!(originator);
let remaining = lines.collect::<Vec<_>>().join("\n");
std::fs::write(&path, format!("{session_meta}\n{remaining}\n"))?;
Ok(())
@@ -655,6 +655,7 @@ async fn thread_start_tracks_thread_initialized_analytics() -> Result<()> {
let req_id = mcp
.send_thread_start_request(ThreadStartParams {
thread_source: Some(ThreadSource::User),
service_name: Some("codex_work_desktop".to_string()),
..Default::default()
})
.await?;
@@ -672,6 +673,7 @@ async fn thread_start_tracks_thread_initialized_analytics() -> Result<()> {
event,
&thread.id,
&thread.session_id,
"codex_work_desktop",
"mock-model",
"new",
"user",
@@ -1,6 +1,5 @@
use anyhow::Context;
use anyhow::Result;
use app_test_support::DEFAULT_CLIENT_NAME;
use app_test_support::TestAppServer;
use app_test_support::create_apply_patch_sse_response;
use app_test_support::create_exec_command_sse_response;
@@ -840,7 +839,7 @@ async fn thread_start_omits_empty_instruction_overrides_from_model_request() ->
}
#[tokio::test]
async fn turn_start_tracks_turn_event_analytics() -> Result<()> {
async fn turn_start_tracks_thread_originator_in_analytics() -> Result<()> {
let server = responses::start_mock_server().await;
let response_mock = responses::mount_response_sequence(
&server,
@@ -875,6 +874,7 @@ async fn turn_start_tracks_turn_event_analytics() -> Result<()> {
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
thread_source: Some(ThreadSource::User),
service_name: Some("codex_work_desktop".to_string()),
..Default::default()
})
.await?;
@@ -919,7 +919,7 @@ async fn turn_start_tracks_turn_event_analytics() -> Result<()> {
assert_eq!(event["event_params"]["turn_id"], turn.id);
assert_eq!(
event["event_params"]["app_server_client"]["product_client_id"],
DEFAULT_CLIENT_NAME
"codex_work_desktop"
);
assert_eq!(event["event_params"]["model"], "mock-model");
assert_eq!(event["event_params"]["model_provider"], "mock_provider");