From 8d72fb6de92bf332b2ef4a4ccaf992021fbdf4ea Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Fri, 5 Jun 2026 11:27:10 -0700 Subject: [PATCH] [codex] Add turn profiling analytics (#26484) ## Summary Add flat profiling fields to `codex_turn_event` so analytics can explain where turn wall-clock time is spent without changing tool execution behavior. The profile reports: - time before the first sampling request - sampling time across all attempts and follow-ups - overhead between sampling requests - time blocked in the post-sampling tool drain - time after the final sampling request - sampling request and retry counts ## Implementation - Extend the existing turn timing state with constant-memory phase accounting and one RAII phase guard. - Observe sampling and the existing post-sampling drain only at turn orchestration boundaries. - Keep tool runtime, tool futures, response item handling, and turn lifecycle values unchanged. - Add the profiling fields directly to the existing analytics turn event without changing app-server protocol or rollout persistence. - Use the existing turn `status` to distinguish completed, failed, and interrupted profiles. Exact sampling/tool overlap is intentionally omitted because measuring tool completion accurately would require hooks in the tool execution path. ## Validation - Add app-server end-to-end coverage for a single-sampling turn with no blocking tool work. - Add app-server end-to-end coverage for `request_user_input` blocking followed by a second sampling request. - CI is running on the PR; tests were not executed locally per repository guidance. --- .../analytics/src/analytics_client_tests.rs | 40 ++++ codex-rs/analytics/src/client.rs | 7 + codex-rs/analytics/src/events.rs | 7 + codex-rs/analytics/src/facts.rs | 18 ++ codex-rs/analytics/src/lib.rs | 2 + codex-rs/analytics/src/reducer.rs | 151 +++++--------- .../app-server/tests/suite/v2/turn_start.rs | 152 +++++++++++++- codex-rs/core/src/session/turn.rs | 9 + codex-rs/core/src/tasks/mod.rs | 13 ++ codex-rs/core/src/turn_timing.rs | 191 ++++++++++++++++++ codex-rs/core/src/turn_timing_tests.rs | 41 ++++ 11 files changed, 530 insertions(+), 101 deletions(-) diff --git a/codex-rs/analytics/src/analytics_client_tests.rs b/codex-rs/analytics/src/analytics_client_tests.rs index 7177952c3..f4cd18da1 100644 --- a/codex-rs/analytics/src/analytics_client_tests.rs +++ b/codex-rs/analytics/src/analytics_client_tests.rs @@ -63,6 +63,8 @@ use crate::facts::SubAgentThreadStartedInput; use crate::facts::ThreadInitializationMode; use crate::facts::TrackEventsContext; use crate::facts::TurnCodexErrorFact; +use crate::facts::TurnProfile; +use crate::facts::TurnProfileFact; use crate::facts::TurnResolvedConfigFact; use crate::facts::TurnStatus; use crate::facts::TurnSteerRequestError; @@ -396,6 +398,18 @@ fn sample_turn_resolved_config(thread_id: &str, turn_id: &str) -> TurnResolvedCo } } +fn sample_turn_profile() -> TurnProfile { + TurnProfile { + before_first_sampling_ms: 100, + sampling_ms: 700, + between_sampling_overhead_ms: 50, + tool_blocking_ms: 250, + after_last_sampling_ms: 134, + sampling_request_count: 2, + sampling_retry_count: 1, + } +} + fn sample_turn_steer_request( thread_id: &str, expected_turn_id: &str, @@ -649,6 +663,18 @@ async fn ingest_turn_prerequisites( ) .await; } + + reducer + .ingest( + AnalyticsFact::Custom(CustomAnalyticsFact::TurnProfile(Box::new( + TurnProfileFact { + turn_id: "turn-2".to_string(), + profile: sample_turn_profile(), + }, + ))), + out, + ) + .await; } async fn ingest_review_prerequisites( @@ -3300,6 +3326,13 @@ fn turn_event_serializes_expected_shape() { output_tokens: None, reasoning_output_tokens: None, total_tokens: None, + before_first_sampling_ms: 100, + sampling_ms: 700, + between_sampling_overhead_ms: 50, + tool_blocking_ms: 250, + after_last_sampling_ms: 134, + sampling_request_count: 2, + sampling_retry_count: 1, duration_ms: Some(1234), started_at: Some(455), completed_at: Some(456), @@ -3366,6 +3399,13 @@ fn turn_event_serializes_expected_shape() { "output_tokens": null, "reasoning_output_tokens": null, "total_tokens": null, + "before_first_sampling_ms": 100, + "sampling_ms": 700, + "between_sampling_overhead_ms": 50, + "tool_blocking_ms": 250, + "after_last_sampling_ms": 134, + "sampling_request_count": 2, + "sampling_retry_count": 1, "duration_ms": 1234, "started_at": 455, "completed_at": 456 diff --git a/codex-rs/analytics/src/client.rs b/codex-rs/analytics/src/client.rs index bd0726b28..b99a2ec86 100644 --- a/codex-rs/analytics/src/client.rs +++ b/codex-rs/analytics/src/client.rs @@ -19,6 +19,7 @@ use crate::facts::SkillInvokedInput; use crate::facts::SubAgentThreadStartedInput; use crate::facts::TrackEventsContext; use crate::facts::TurnCodexErrorFact; +use crate::facts::TurnProfileFact; use crate::facts::TurnResolvedConfigFact; use crate::facts::TurnTokenUsageFact; use crate::reducer::AnalyticsReducer; @@ -257,6 +258,12 @@ impl AnalyticsEventsClient { ))); } + pub fn track_turn_profile(&self, fact: TurnProfileFact) { + self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::TurnProfile( + Box::new(fact), + ))); + } + pub fn track_turn_codex_error(&self, fact: TurnCodexErrorFact) { self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::TurnCodexError( Box::new(fact), diff --git a/codex-rs/analytics/src/events.rs b/codex-rs/analytics/src/events.rs index bdc2c996d..f0fcddd27 100644 --- a/codex-rs/analytics/src/events.rs +++ b/codex-rs/analytics/src/events.rs @@ -817,6 +817,13 @@ pub(crate) struct CodexTurnEventParams { pub(crate) output_tokens: Option, pub(crate) reasoning_output_tokens: Option, pub(crate) total_tokens: Option, + pub(crate) before_first_sampling_ms: u64, + pub(crate) sampling_ms: u64, + pub(crate) between_sampling_overhead_ms: u64, + pub(crate) tool_blocking_ms: u64, + pub(crate) after_last_sampling_ms: u64, + pub(crate) sampling_request_count: u32, + pub(crate) sampling_retry_count: u32, pub(crate) duration_ms: Option, pub(crate) started_at: Option, pub(crate) completed_at: Option, diff --git a/codex-rs/analytics/src/facts.rs b/codex-rs/analytics/src/facts.rs index 1dff5de89..38af5ed8b 100644 --- a/codex-rs/analytics/src/facts.rs +++ b/codex-rs/analytics/src/facts.rs @@ -104,6 +104,23 @@ pub struct TurnTokenUsageFact { pub token_usage: TokenUsage, } +#[derive(Clone, Debug, PartialEq, Eq, Serialize)] +pub struct TurnProfile { + pub before_first_sampling_ms: u64, + pub sampling_ms: u64, + pub between_sampling_overhead_ms: u64, + pub tool_blocking_ms: u64, + pub after_last_sampling_ms: u64, + pub sampling_request_count: u32, + pub sampling_retry_count: u32, +} + +#[derive(Clone)] +pub struct TurnProfileFact { + pub turn_id: String, + pub profile: TurnProfile, +} + #[derive(Clone)] pub struct TurnCodexErrorFact { pub(crate) turn_id: String, @@ -476,6 +493,7 @@ pub(crate) enum CustomAnalyticsFact { GuardianReview(Box), TurnResolvedConfig(Box), TurnTokenUsage(Box), + TurnProfile(Box), TurnCodexError(Box), SkillInvoked(SkillInvokedInput), AppMentioned(AppMentionedInput), diff --git a/codex-rs/analytics/src/lib.rs b/codex-rs/analytics/src/lib.rs index c227f4daf..e2e16dfac 100644 --- a/codex-rs/analytics/src/lib.rs +++ b/codex-rs/analytics/src/lib.rs @@ -39,6 +39,8 @@ pub use facts::SubAgentThreadStartedInput; pub use facts::ThreadInitializationMode; pub use facts::TrackEventsContext; pub use facts::TurnCodexErrorFact; +pub use facts::TurnProfile; +pub use facts::TurnProfileFact; pub use facts::TurnResolvedConfigFact; pub use facts::TurnStatus; pub use facts::TurnSteerRejectionReason; diff --git a/codex-rs/analytics/src/reducer.rs b/codex-rs/analytics/src/reducer.rs index 877000928..66caaafe0 100644 --- a/codex-rs/analytics/src/reducer.rs +++ b/codex-rs/analytics/src/reducer.rs @@ -72,6 +72,8 @@ use crate::facts::SubAgentThreadStartedInput; use crate::facts::ThreadInitializationMode; use crate::facts::TurnCodexError; use crate::facts::TurnCodexErrorFact; +use crate::facts::TurnProfile; +use crate::facts::TurnProfileFact; use crate::facts::TurnResolvedConfigFact; use crate::facts::TurnStatus; use crate::facts::TurnSteerRejectionReason; @@ -316,6 +318,7 @@ struct CompletedTurnState { duration_ms: Option, } +#[derive(Default)] struct TurnState { connection_id: Option, thread_id: Option, @@ -323,6 +326,7 @@ struct TurnState { resolved_config: Option, started_at: Option, token_usage: Option, + profile: Option, completed: Option, codex_error: Option, latest_diff: Option, @@ -464,6 +468,9 @@ impl AnalyticsReducer { CustomAnalyticsFact::TurnTokenUsage(input) => { self.ingest_turn_token_usage(*input, out).await; } + CustomAnalyticsFact::TurnProfile(input) => { + self.ingest_turn_profile(*input, out).await; + } CustomAnalyticsFact::TurnCodexError(input) => { self.ingest_turn_codex_error(*input); } @@ -604,19 +611,7 @@ impl AnalyticsReducer { let turn_id = input.turn_id.clone(); let thread_id = input.thread_id.clone(); let num_input_images = input.num_input_images; - let turn_state = self.turns.entry(turn_id.clone()).or_insert(TurnState { - connection_id: None, - thread_id: None, - num_input_images: None, - resolved_config: None, - started_at: None, - token_usage: None, - completed: None, - codex_error: None, - latest_diff: None, - steer_count: 0, - tool_counts: TurnToolCounts::default(), - }); + let turn_state = self.turns.entry(turn_id.clone()).or_default(); turn_state.thread_id = Some(thread_id); turn_state.num_input_images = Some(num_input_images); turn_state.resolved_config = Some(input); @@ -629,43 +624,30 @@ impl AnalyticsReducer { out: &mut Vec, ) { let turn_id = input.turn_id.clone(); - let turn_state = self.turns.entry(turn_id.clone()).or_insert(TurnState { - connection_id: None, - thread_id: None, - num_input_images: None, - resolved_config: None, - started_at: None, - token_usage: None, - completed: None, - codex_error: None, - latest_diff: None, - steer_count: 0, - tool_counts: TurnToolCounts::default(), - }); + let turn_state = self.turns.entry(turn_id.clone()).or_default(); turn_state.thread_id = Some(input.thread_id); turn_state.token_usage = Some(input.token_usage); self.maybe_emit_turn_event(&turn_id, out).await; } + async fn ingest_turn_profile( + &mut self, + input: TurnProfileFact, + out: &mut Vec, + ) { + let TurnProfileFact { turn_id, profile } = input; + let turn_state = self.turns.entry(turn_id.clone()).or_default(); + turn_state.profile = Some(profile); + self.maybe_emit_turn_event(&turn_id, out).await; + } + fn ingest_turn_codex_error(&mut self, input: TurnCodexErrorFact) { let TurnCodexErrorFact { turn_id, thread_id, error, } = input; - let turn_state = self.turns.entry(turn_id).or_insert(TurnState { - connection_id: None, - thread_id: None, - num_input_images: None, - resolved_config: None, - started_at: None, - token_usage: None, - completed: None, - codex_error: None, - latest_diff: None, - steer_count: 0, - tool_counts: TurnToolCounts::default(), - }); + let turn_state = self.turns.entry(turn_id).or_default(); turn_state.thread_id.get_or_insert(thread_id); turn_state.codex_error = Some(error); } @@ -818,19 +800,7 @@ impl AnalyticsReducer { else { return; }; - let turn_state = self.turns.entry(turn_id.clone()).or_insert(TurnState { - connection_id: None, - thread_id: None, - num_input_images: None, - resolved_config: None, - started_at: None, - token_usage: None, - completed: None, - codex_error: None, - latest_diff: None, - steer_count: 0, - tool_counts: TurnToolCounts::default(), - }); + let turn_state = self.turns.entry(turn_id.clone()).or_default(); turn_state.connection_id = Some(connection_id); turn_state.thread_id = Some(pending_request.thread_id); turn_state.num_input_images = Some(pending_request.num_input_images); @@ -1178,61 +1148,19 @@ impl AnalyticsReducer { self.ingest_guardian_review_completed(notification, out); } ServerNotification::TurnStarted(notification) => { - let turn_state = self.turns.entry(notification.turn.id).or_insert(TurnState { - connection_id: None, - thread_id: None, - num_input_images: None, - resolved_config: None, - started_at: None, - token_usage: None, - completed: None, - codex_error: None, - latest_diff: None, - steer_count: 0, - tool_counts: TurnToolCounts::default(), - }); + let turn_state = self.turns.entry(notification.turn.id).or_default(); turn_state.started_at = notification .turn .started_at .and_then(|started_at| u64::try_from(started_at).ok()); } ServerNotification::TurnDiffUpdated(notification) => { - let turn_state = - self.turns - .entry(notification.turn_id.clone()) - .or_insert(TurnState { - connection_id: None, - thread_id: None, - num_input_images: None, - resolved_config: None, - started_at: None, - token_usage: None, - completed: None, - codex_error: None, - latest_diff: None, - steer_count: 0, - tool_counts: TurnToolCounts::default(), - }); + let turn_state = self.turns.entry(notification.turn_id.clone()).or_default(); turn_state.thread_id = Some(notification.thread_id); turn_state.latest_diff = Some(notification.diff); } ServerNotification::TurnCompleted(notification) => { - let turn_state = - self.turns - .entry(notification.turn.id.clone()) - .or_insert(TurnState { - connection_id: None, - thread_id: None, - num_input_images: None, - resolved_config: None, - started_at: None, - token_usage: None, - completed: None, - codex_error: None, - latest_diff: None, - steer_count: 0, - tool_counts: TurnToolCounts::default(), - }); + let turn_state = self.turns.entry(notification.turn.id.clone()).or_default(); turn_state.completed = Some(CompletedTurnState { status: analytics_turn_status(notification.turn.status), turn_error: notification @@ -1511,6 +1439,7 @@ impl AnalyticsReducer { if turn_state.thread_id.is_none() || turn_state.num_input_images.is_none() || turn_state.resolved_config.is_none() + || turn_state.profile.is_none() || turn_state.completed.is_none() { return; @@ -2457,12 +2386,20 @@ fn codex_turn_event_params( turn_state: &TurnState, thread_metadata: &ThreadMetadataState, ) -> CodexTurnEventParams { - let (Some(thread_id), Some(num_input_images), Some(resolved_config), Some(completed)) = ( + let ( + Some(thread_id), + Some(num_input_images), + Some(resolved_config), + Some(profile), + Some(completed), + ) = ( turn_state.thread_id.clone(), turn_state.num_input_images, turn_state.resolved_config.clone(), + turn_state.profile.clone(), turn_state.completed.clone(), - ) else { + ) + else { unreachable!("turn event params require a fully populated turn state"); }; let started_at = turn_state.started_at; @@ -2488,6 +2425,15 @@ fn codex_turn_event_params( workspace_kind, is_first_turn, } = resolved_config; + let TurnProfile { + before_first_sampling_ms, + sampling_ms, + between_sampling_overhead_ms, + tool_blocking_ms, + after_last_sampling_ms, + sampling_request_count, + sampling_retry_count, + } = profile; let token_usage = turn_state.token_usage.clone(); let codex_error = turn_state.codex_error.as_ref(); CodexTurnEventParams { @@ -2550,6 +2496,13 @@ fn codex_turn_event_params( total_tokens: token_usage .as_ref() .map(|token_usage| token_usage.total_tokens), + before_first_sampling_ms, + sampling_ms, + between_sampling_overhead_ms, + tool_blocking_ms, + after_last_sampling_ms, + sampling_request_count, + sampling_retry_count, duration_ms: completed.duration_ms, started_at, completed_at: Some(completed.completed_at), diff --git a/codex-rs/app-server/tests/suite/v2/turn_start.rs b/codex-rs/app-server/tests/suite/v2/turn_start.rs index 9f6a027a7..b4da5cdde 100644 --- a/codex-rs/app-server/tests/suite/v2/turn_start.rs +++ b/codex-rs/app-server/tests/suite/v2/turn_start.rs @@ -9,6 +9,7 @@ use app_test_support::create_final_assistant_message_sse_response; use app_test_support::create_mock_responses_server_repeating_assistant; use app_test_support::create_mock_responses_server_sequence; use app_test_support::create_mock_responses_server_sequence_unchecked; +use app_test_support::create_request_user_input_sse_response; use app_test_support::create_shell_command_sse_response; use app_test_support::format_with_current_shell_display; use app_test_support::to_response; @@ -78,6 +79,7 @@ use std::collections::HashMap; use std::path::Path; use tempfile::TempDir; use tokio::time::timeout; +use wiremock::ResponseTemplate; use super::analytics::mount_analytics_capture; use super::analytics::wait_for_analytics_event; @@ -819,8 +821,20 @@ async fn thread_start_omits_empty_instruction_overrides_from_model_request() -> #[tokio::test] async fn turn_start_tracks_turn_event_analytics() -> Result<()> { - let responses = vec![create_final_assistant_message_sse_response("Done")?]; - let server = create_mock_responses_server_sequence_unchecked(responses).await; + let server = responses::start_mock_server().await; + let response_mock = responses::mount_response_sequence( + &server, + vec![ + ResponseTemplate::new(500).set_body_json(json!({ + "error": { + "type": "server_error", + "message": "synthetic retryable error" + } + })), + responses::sse_response(create_final_assistant_message_sse_response("Done")?), + ], + ) + .await; let codex_home = TempDir::new()?; write_mock_responses_config_toml_with_chatgpt_base_url( @@ -828,6 +842,10 @@ async fn turn_start_tracks_turn_event_analytics() -> Result<()> { &server.uri(), &server.uri(), )?; + let config_path = codex_home.path().join("config.toml"); + let config = std::fs::read_to_string(&config_path)? + .replace("stream_max_retries = 0", "stream_max_retries = 1"); + std::fs::write(config_path, config)?; mount_analytics_capture(&server, codex_home.path()).await?; let mut mcp = TestAppServer::new_without_managed_config(codex_home.path()).await?; @@ -908,6 +926,136 @@ async fn turn_start_tracks_turn_event_analytics() -> Result<()> { assert_eq!(event["event_params"]["output_tokens"], 0); assert_eq!(event["event_params"]["reasoning_output_tokens"], 0); assert_eq!(event["event_params"]["total_tokens"], 0); + let params = &event["event_params"]; + let timings_are_numbers = [ + "before_first_sampling_ms", + "sampling_ms", + "between_sampling_overhead_ms", + "tool_blocking_ms", + "after_last_sampling_ms", + ] + .into_iter() + .all(|field| params[field].as_u64().is_some()); + assert_eq!( + json!({ + "timingsAreNumbers": timings_are_numbers, + "toolBlockingMs": params["tool_blocking_ms"], + "samplingRequestCount": params["sampling_request_count"], + "samplingRetryCount": params["sampling_retry_count"], + "responseRequestCount": response_mock.requests().len(), + }), + json!({ + "timingsAreNumbers": true, + "toolBlockingMs": 0, + "samplingRequestCount": 2, + "samplingRetryCount": 1, + "responseRequestCount": 2, + }) + ); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn turn_profile_tracks_blocking_tool_and_follow_up_sampling() -> Result<()> { + let responses = vec![ + create_request_user_input_sse_response("call1")?, + create_final_assistant_message_sse_response("Done")?, + ]; + let server = create_mock_responses_server_sequence(responses).await; + + let codex_home = TempDir::new()?; + write_mock_responses_config_toml_with_chatgpt_base_url( + codex_home.path(), + &server.uri(), + &server.uri(), + )?; + mount_analytics_capture(&server, codex_home.path()).await?; + + let mut mcp = TestAppServer::new_without_managed_config(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let thread_req = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + ..Default::default() + }) + .await?; + let thread_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(thread_req)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(thread_resp)?; + + let turn_req = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + client_user_message_id: None, + input: vec![V2UserInput::Text { + text: "ask something".to_string(), + text_elements: Vec::new(), + }], + collaboration_mode: Some(CollaborationMode { + mode: ModeKind::Plan, + settings: Settings { + model: "mock-model".to_string(), + reasoning_effort: Some(ReasoningEffort::Medium), + developer_instructions: None, + }, + }), + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_req)), + ) + .await??; + + let server_req = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_request_message(), + ) + .await??; + let ServerRequest::ToolRequestUserInput { request_id, .. } = server_req else { + panic!("expected ToolRequestUserInput request, got: {server_req:?}"); + }; + tokio::time::sleep(std::time::Duration::from_millis(25)).await; + mcp.send_response( + request_id, + json!({ + "answers": { + "confirm_path": { "answers": ["yes"] } + } + }), + ) + .await?; + + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + let event = wait_for_analytics_event(&server, DEFAULT_READ_TIMEOUT, "codex_turn_event").await?; + let params = &event["event_params"]; + assert_eq!( + json!({ + "toolBlockingIsPositive": params["tool_blocking_ms"] + .as_u64() + .is_some_and(|duration| duration > 0), + "samplingRequestCount": params["sampling_request_count"], + "samplingRetryCount": params["sampling_retry_count"], + "status": params["status"], + }), + json!({ + "toolBlockingIsPositive": true, + "samplingRequestCount": 2, + "samplingRetryCount": 0, + "status": "completed", + }) + ); Ok(()) } diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 5853b78c7..76303345a 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -1088,6 +1088,7 @@ async fn run_sampling_request( ResponsesStreamRequest::Sampling, ) .await?; + turn_context.turn_timing_state.record_sampling_retry(); } } @@ -1800,6 +1801,7 @@ async fn try_run_sampling_request( turn_context.model_info.slug.as_str(), turn_context.provider.info().name.as_str(), ); + let sampling_timing_guard = turn_context.turn_timing_state.begin_sampling(); let mut stream = client_session .stream( prompt, @@ -2213,6 +2215,7 @@ async fn try_run_sampling_request( } } }; + drop(sampling_timing_guard); flush_assistant_text_segments_all( &sess, @@ -2222,7 +2225,13 @@ async fn try_run_sampling_request( ) .await; + let tool_blocking_timing_guard = if in_flight.is_empty() { + None + } else { + Some(turn_context.turn_timing_state.begin_tool_blocking()) + }; drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await?; + drop(tool_blocking_timing_guard); if should_emit_token_count { // A tool call such as request_user_input can intentionally pause the turn. Emit token diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index ef2ba0f16..70466c9c2 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -33,6 +33,7 @@ use crate::session::turn_context::TurnContext; use crate::state::ActiveTurn; use crate::state::RunningTask; use crate::state::TaskKind; +use codex_analytics::TurnProfileFact; use codex_analytics::TurnTokenUsageFact; use codex_login::AuthManager; use codex_models_manager::manager::SharedModelsManager; @@ -751,6 +752,12 @@ impl Session { .turn_timing_state .time_to_first_token_ms() .await; + self.services + .analytics_events_client + .track_turn_profile(TurnProfileFact { + turn_id: turn_context.sub_id.clone(), + profile: turn_context.turn_timing_state.complete_profile(), + }); self.emit_turn_stop_lifecycle(turn_context.extension_data.as_ref()) .await; if let Err(err) = self @@ -868,6 +875,12 @@ impl Session { .turn_timing_state .completed_at_and_duration_ms() .await; + self.services + .analytics_events_client + .track_turn_profile(TurnProfileFact { + turn_id: task.turn_context.sub_id.clone(), + profile: task.turn_context.turn_timing_state.complete_profile(), + }); let event = EventMsg::TurnAborted(TurnAbortedEvent { turn_id: Some(task.turn_context.sub_id.clone()), reason, diff --git a/codex-rs/core/src/turn_timing.rs b/codex-rs/core/src/turn_timing.rs index 445fe1621..380e246fa 100644 --- a/codex-rs/core/src/turn_timing.rs +++ b/codex-rs/core/src/turn_timing.rs @@ -1,8 +1,11 @@ +use std::sync::Arc; +use std::sync::Mutex as StdMutex; use std::time::Duration; use std::time::Instant; use std::time::SystemTime; use std::time::UNIX_EPOCH; +use codex_analytics::TurnProfile; use codex_otel::TURN_TTFM_DURATION_METRIC; use codex_protocol::items::TurnItem; use codex_protocol::models::ResponseItem; @@ -39,6 +42,7 @@ pub(crate) async fn record_turn_ttfm_metric(turn_context: &TurnContext, item: &T #[derive(Debug, Default)] pub(crate) struct TurnTimingState { state: Mutex, + profile: StdMutex, } #[derive(Debug, Default)] @@ -49,6 +53,35 @@ struct TurnTimingStateInner { first_message_at: Option, } +#[derive(Debug, Default)] +struct TurnProfileState { + started_at: Option, + last_transition_at: Option, + active_phase: Option, + seen_sampling: bool, + before_first_sampling: Duration, + sampling: Duration, + between_sampling_overhead: Duration, + tool_blocking: Duration, + pending_idle_after_sampling: Duration, + sampling_request_count: u32, + sampling_retry_count: u32, + completed_profile: Option, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum TurnProfilePhase { + Sampling, + ToolBlocking, +} + +#[must_use] +pub(crate) struct TurnProfileTimingGuard { + timing: Arc, + phase: TurnProfilePhase, + active: bool, +} + impl TurnTimingState { pub(crate) async fn mark_turn_started(&self, started_at: Instant) -> i64 { let started_at_unix_ms = now_unix_timestamp_ms(); @@ -57,6 +90,7 @@ impl TurnTimingState { state.started_at_unix_secs = Some(started_at_unix_ms / 1000); state.first_token_at = None; state.first_message_at = None; + self.profile_state().start(started_at); started_at_unix_ms } @@ -80,6 +114,32 @@ impl TurnTimingState { .map(|duration| i64::try_from(duration.as_millis()).unwrap_or(i64::MAX)) } + pub(crate) fn complete_profile(&self) -> TurnProfile { + self.profile_state().complete(Instant::now()) + } + + pub(crate) fn begin_sampling(self: &Arc) -> TurnProfileTimingGuard { + let active = self.profile_state().begin_sampling(Instant::now()); + TurnProfileTimingGuard { + timing: Arc::clone(self), + phase: TurnProfilePhase::Sampling, + active, + } + } + + pub(crate) fn record_sampling_retry(&self) { + self.profile_state().record_sampling_retry(); + } + + pub(crate) fn begin_tool_blocking(self: &Arc) -> TurnProfileTimingGuard { + let active = self.profile_state().begin_tool_blocking(Instant::now()); + TurnProfileTimingGuard { + timing: Arc::clone(self), + phase: TurnProfilePhase::ToolBlocking, + active, + } + } + pub(crate) async fn record_ttft_for_response_event( &self, event: &ResponseEvent, @@ -98,6 +158,22 @@ impl TurnTimingState { let mut state = self.state.lock().await; state.record_turn_ttfm() } + + fn profile_state(&self) -> std::sync::MutexGuard<'_, TurnProfileState> { + self.profile + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + } +} + +impl Drop for TurnProfileTimingGuard { + fn drop(&mut self) { + if self.active { + self.timing + .profile_state() + .end_phase(Instant::now(), self.phase); + } + } } fn now_unix_timestamp_secs() -> i64 { @@ -111,6 +187,121 @@ pub(crate) fn now_unix_timestamp_ms() -> i64 { i64::try_from(duration.as_millis()).unwrap_or(i64::MAX) } +fn duration_to_u64_ms(duration: Duration) -> u64 { + u64::try_from(duration.as_millis()).unwrap_or(u64::MAX) +} + +impl TurnProfileState { + fn start(&mut self, started_at: Instant) { + *self = Self { + started_at: Some(started_at), + last_transition_at: Some(started_at), + ..Self::default() + }; + } + + fn begin_sampling(&mut self, now: Instant) -> bool { + if self.completed_profile.is_some() + || self.started_at.is_none() + || self.active_phase.is_some() + { + return false; + } + self.advance(now); + if self.seen_sampling { + self.between_sampling_overhead += std::mem::take(&mut self.pending_idle_after_sampling); + } + self.seen_sampling = true; + self.active_phase = Some(TurnProfilePhase::Sampling); + self.sampling_request_count = self.sampling_request_count.saturating_add(1); + true + } + + fn record_sampling_retry(&mut self) { + if self.completed_profile.is_none() && self.started_at.is_some() { + self.sampling_retry_count = self.sampling_retry_count.saturating_add(1); + } + } + + fn begin_tool_blocking(&mut self, now: Instant) -> bool { + if self.completed_profile.is_some() + || self.started_at.is_none() + || self.active_phase.is_some() + { + return false; + } + self.advance(now); + self.active_phase = Some(TurnProfilePhase::ToolBlocking); + true + } + + fn end_phase(&mut self, now: Instant, phase: TurnProfilePhase) { + if self.completed_profile.is_some() || self.active_phase != Some(phase) { + return; + } + self.advance(now); + self.active_phase = None; + } + + fn advance(&mut self, now: Instant) { + let Some(previous) = self.last_transition_at.replace(now) else { + return; + }; + let elapsed = now.saturating_duration_since(previous); + match self.active_phase { + Some(TurnProfilePhase::Sampling) => self.sampling += elapsed, + Some(TurnProfilePhase::ToolBlocking) => self.tool_blocking += elapsed, + None if self.seen_sampling => self.pending_idle_after_sampling += elapsed, + None => self.before_first_sampling += elapsed, + } + } + + fn complete(&mut self, now: Instant) -> TurnProfile { + if let Some(profile) = self.completed_profile.as_ref() { + return profile.clone(); + } + + let final_phase = self.active_phase; + self.advance(now); + let after_last_sampling = if self.seen_sampling { + std::mem::take(&mut self.pending_idle_after_sampling) + } else { + Duration::ZERO + }; + + let mut profile = TurnProfile { + before_first_sampling_ms: duration_to_u64_ms(self.before_first_sampling), + sampling_ms: duration_to_u64_ms(self.sampling), + between_sampling_overhead_ms: duration_to_u64_ms(self.between_sampling_overhead), + tool_blocking_ms: duration_to_u64_ms(self.tool_blocking), + after_last_sampling_ms: duration_to_u64_ms(after_last_sampling), + sampling_request_count: self.sampling_request_count, + sampling_retry_count: self.sampling_retry_count, + }; + let total_ms = self + .started_at + .map(|started_at| duration_to_u64_ms(now.saturating_duration_since(started_at))) + .unwrap_or_default(); + let classified_ms = profile + .before_first_sampling_ms + .saturating_add(profile.sampling_ms) + .saturating_add(profile.between_sampling_overhead_ms) + .saturating_add(profile.tool_blocking_ms) + .saturating_add(profile.after_last_sampling_ms); + let rounding_ms = total_ms.saturating_sub(classified_ms); + match final_phase { + Some(TurnProfilePhase::Sampling) => profile.sampling_ms += rounding_ms, + Some(TurnProfilePhase::ToolBlocking) => profile.tool_blocking_ms += rounding_ms, + None if self.seen_sampling => profile.after_last_sampling_ms += rounding_ms, + None => profile.before_first_sampling_ms += rounding_ms, + } + + self.active_phase = None; + self.completed_profile = Some(profile.clone()); + profile + } +} + impl TurnTimingStateInner { fn time_to_first_token(&self) -> Option { Some(self.first_token_at?.duration_since(self.started_at?)) diff --git a/codex-rs/core/src/turn_timing_tests.rs b/codex-rs/core/src/turn_timing_tests.rs index a6675aea8..7146f537b 100644 --- a/codex-rs/core/src/turn_timing_tests.rs +++ b/codex-rs/core/src/turn_timing_tests.rs @@ -1,13 +1,17 @@ +use codex_analytics::TurnProfile; use codex_protocol::items::AgentMessageItem; use codex_protocol::items::TurnItem; use codex_protocol::models::ContentItem; use codex_protocol::models::FunctionCallOutputPayload; use codex_protocol::models::ResponseItem; use pretty_assertions::assert_eq; +use std::time::Duration; use std::time::Instant; use std::time::SystemTime; use std::time::UNIX_EPOCH; +use super::TurnProfilePhase; +use super::TurnProfileState; use super::TurnTimingState; use super::response_item_records_turn_ttft; use crate::ResponseEvent; @@ -146,3 +150,40 @@ fn response_item_records_turn_ttft_ignores_empty_non_output_items() { } )); } + +#[test] +fn turn_profile_breaks_down_sampling_blocking_and_retry_overhead() { + let started_at = Instant::now(); + let mut state = TurnProfileState::default(); + state.start(started_at); + + let _ = state.begin_sampling(started_at + Duration::from_millis(100)); + state.end_phase( + started_at + Duration::from_millis(600), + TurnProfilePhase::Sampling, + ); + let _ = state.begin_tool_blocking(started_at + Duration::from_millis(600)); + state.end_phase( + started_at + Duration::from_millis(900), + TurnProfilePhase::ToolBlocking, + ); + state.record_sampling_retry(); + let _ = state.begin_sampling(started_at + Duration::from_millis(1_000)); + state.end_phase( + started_at + Duration::from_millis(1_200), + TurnProfilePhase::Sampling, + ); + + assert_eq!( + state.complete(started_at + Duration::from_millis(1_300)), + TurnProfile { + before_first_sampling_ms: 100, + sampling_ms: 700, + between_sampling_overhead_ms: 100, + tool_blocking_ms: 300, + after_last_sampling_ms: 100, + sampling_request_count: 2, + sampling_retry_count: 1, + } + ); +}