[codex] Add turn profiling analytics (#26484)

## Summary

Add flat profiling fields to `codex_turn_event` so analytics can explain
where turn wall-clock time is spent without changing tool execution
behavior.

The profile reports:
- time before the first sampling request
- sampling time across all attempts and follow-ups
- overhead between sampling requests
- time blocked in the post-sampling tool drain
- time after the final sampling request
- sampling request and retry counts

## Implementation

- Extend the existing turn timing state with constant-memory phase
accounting and one RAII phase guard.
- Observe sampling and the existing post-sampling drain only at turn
orchestration boundaries.
- Keep tool runtime, tool futures, response item handling, and turn
lifecycle values unchanged.
- Add the profiling fields directly to the existing analytics turn event
without changing app-server protocol or rollout persistence.
- Use the existing turn `status` to distinguish completed, failed, and
interrupted profiles.

Exact sampling/tool overlap is intentionally omitted because measuring
tool completion accurately would require hooks in the tool execution
path.

## Validation

- Add app-server end-to-end coverage for a single-sampling turn with no
blocking tool work.
- Add app-server end-to-end coverage for `request_user_input` blocking
followed by a second sampling request.
- CI is running on the PR; tests were not executed locally per
repository guidance.
This commit is contained in:
Ahmed Ibrahim
2026-06-05 11:27:10 -07:00
committed by GitHub
Unverified
parent 82b15b65e2
commit 8d72fb6de9
11 changed files with 530 additions and 101 deletions
@@ -63,6 +63,8 @@ use crate::facts::SubAgentThreadStartedInput;
use crate::facts::ThreadInitializationMode;
use crate::facts::TrackEventsContext;
use crate::facts::TurnCodexErrorFact;
use crate::facts::TurnProfile;
use crate::facts::TurnProfileFact;
use crate::facts::TurnResolvedConfigFact;
use crate::facts::TurnStatus;
use crate::facts::TurnSteerRequestError;
@@ -396,6 +398,18 @@ fn sample_turn_resolved_config(thread_id: &str, turn_id: &str) -> TurnResolvedCo
}
}
fn sample_turn_profile() -> TurnProfile {
TurnProfile {
before_first_sampling_ms: 100,
sampling_ms: 700,
between_sampling_overhead_ms: 50,
tool_blocking_ms: 250,
after_last_sampling_ms: 134,
sampling_request_count: 2,
sampling_retry_count: 1,
}
}
fn sample_turn_steer_request(
thread_id: &str,
expected_turn_id: &str,
@@ -649,6 +663,18 @@ async fn ingest_turn_prerequisites(
)
.await;
}
reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::TurnProfile(Box::new(
TurnProfileFact {
turn_id: "turn-2".to_string(),
profile: sample_turn_profile(),
},
))),
out,
)
.await;
}
async fn ingest_review_prerequisites(
@@ -3300,6 +3326,13 @@ fn turn_event_serializes_expected_shape() {
output_tokens: None,
reasoning_output_tokens: None,
total_tokens: None,
before_first_sampling_ms: 100,
sampling_ms: 700,
between_sampling_overhead_ms: 50,
tool_blocking_ms: 250,
after_last_sampling_ms: 134,
sampling_request_count: 2,
sampling_retry_count: 1,
duration_ms: Some(1234),
started_at: Some(455),
completed_at: Some(456),
@@ -3366,6 +3399,13 @@ fn turn_event_serializes_expected_shape() {
"output_tokens": null,
"reasoning_output_tokens": null,
"total_tokens": null,
"before_first_sampling_ms": 100,
"sampling_ms": 700,
"between_sampling_overhead_ms": 50,
"tool_blocking_ms": 250,
"after_last_sampling_ms": 134,
"sampling_request_count": 2,
"sampling_retry_count": 1,
"duration_ms": 1234,
"started_at": 455,
"completed_at": 456
+7
View File
@@ -19,6 +19,7 @@ use crate::facts::SkillInvokedInput;
use crate::facts::SubAgentThreadStartedInput;
use crate::facts::TrackEventsContext;
use crate::facts::TurnCodexErrorFact;
use crate::facts::TurnProfileFact;
use crate::facts::TurnResolvedConfigFact;
use crate::facts::TurnTokenUsageFact;
use crate::reducer::AnalyticsReducer;
@@ -257,6 +258,12 @@ impl AnalyticsEventsClient {
)));
}
pub fn track_turn_profile(&self, fact: TurnProfileFact) {
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::TurnProfile(
Box::new(fact),
)));
}
pub fn track_turn_codex_error(&self, fact: TurnCodexErrorFact) {
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::TurnCodexError(
Box::new(fact),
+7
View File
@@ -817,6 +817,13 @@ pub(crate) struct CodexTurnEventParams {
pub(crate) output_tokens: Option<i64>,
pub(crate) reasoning_output_tokens: Option<i64>,
pub(crate) total_tokens: Option<i64>,
pub(crate) before_first_sampling_ms: u64,
pub(crate) sampling_ms: u64,
pub(crate) between_sampling_overhead_ms: u64,
pub(crate) tool_blocking_ms: u64,
pub(crate) after_last_sampling_ms: u64,
pub(crate) sampling_request_count: u32,
pub(crate) sampling_retry_count: u32,
pub(crate) duration_ms: Option<u64>,
pub(crate) started_at: Option<u64>,
pub(crate) completed_at: Option<u64>,
+18
View File
@@ -104,6 +104,23 @@ pub struct TurnTokenUsageFact {
pub token_usage: TokenUsage,
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct TurnProfile {
pub before_first_sampling_ms: u64,
pub sampling_ms: u64,
pub between_sampling_overhead_ms: u64,
pub tool_blocking_ms: u64,
pub after_last_sampling_ms: u64,
pub sampling_request_count: u32,
pub sampling_retry_count: u32,
}
#[derive(Clone)]
pub struct TurnProfileFact {
pub turn_id: String,
pub profile: TurnProfile,
}
#[derive(Clone)]
pub struct TurnCodexErrorFact {
pub(crate) turn_id: String,
@@ -476,6 +493,7 @@ pub(crate) enum CustomAnalyticsFact {
GuardianReview(Box<GuardianReviewEventParams>),
TurnResolvedConfig(Box<TurnResolvedConfigFact>),
TurnTokenUsage(Box<TurnTokenUsageFact>),
TurnProfile(Box<TurnProfileFact>),
TurnCodexError(Box<TurnCodexErrorFact>),
SkillInvoked(SkillInvokedInput),
AppMentioned(AppMentionedInput),
+2
View File
@@ -39,6 +39,8 @@ pub use facts::SubAgentThreadStartedInput;
pub use facts::ThreadInitializationMode;
pub use facts::TrackEventsContext;
pub use facts::TurnCodexErrorFact;
pub use facts::TurnProfile;
pub use facts::TurnProfileFact;
pub use facts::TurnResolvedConfigFact;
pub use facts::TurnStatus;
pub use facts::TurnSteerRejectionReason;
+52 -99
View File
@@ -72,6 +72,8 @@ use crate::facts::SubAgentThreadStartedInput;
use crate::facts::ThreadInitializationMode;
use crate::facts::TurnCodexError;
use crate::facts::TurnCodexErrorFact;
use crate::facts::TurnProfile;
use crate::facts::TurnProfileFact;
use crate::facts::TurnResolvedConfigFact;
use crate::facts::TurnStatus;
use crate::facts::TurnSteerRejectionReason;
@@ -316,6 +318,7 @@ struct CompletedTurnState {
duration_ms: Option<u64>,
}
#[derive(Default)]
struct TurnState {
connection_id: Option<u64>,
thread_id: Option<String>,
@@ -323,6 +326,7 @@ struct TurnState {
resolved_config: Option<TurnResolvedConfigFact>,
started_at: Option<u64>,
token_usage: Option<TokenUsage>,
profile: Option<TurnProfile>,
completed: Option<CompletedTurnState>,
codex_error: Option<TurnCodexError>,
latest_diff: Option<String>,
@@ -464,6 +468,9 @@ impl AnalyticsReducer {
CustomAnalyticsFact::TurnTokenUsage(input) => {
self.ingest_turn_token_usage(*input, out).await;
}
CustomAnalyticsFact::TurnProfile(input) => {
self.ingest_turn_profile(*input, out).await;
}
CustomAnalyticsFact::TurnCodexError(input) => {
self.ingest_turn_codex_error(*input);
}
@@ -604,19 +611,7 @@ impl AnalyticsReducer {
let turn_id = input.turn_id.clone();
let thread_id = input.thread_id.clone();
let num_input_images = input.num_input_images;
let turn_state = self.turns.entry(turn_id.clone()).or_insert(TurnState {
connection_id: None,
thread_id: None,
num_input_images: None,
resolved_config: None,
started_at: None,
token_usage: None,
completed: None,
codex_error: None,
latest_diff: None,
steer_count: 0,
tool_counts: TurnToolCounts::default(),
});
let turn_state = self.turns.entry(turn_id.clone()).or_default();
turn_state.thread_id = Some(thread_id);
turn_state.num_input_images = Some(num_input_images);
turn_state.resolved_config = Some(input);
@@ -629,43 +624,30 @@ impl AnalyticsReducer {
out: &mut Vec<TrackEventRequest>,
) {
let turn_id = input.turn_id.clone();
let turn_state = self.turns.entry(turn_id.clone()).or_insert(TurnState {
connection_id: None,
thread_id: None,
num_input_images: None,
resolved_config: None,
started_at: None,
token_usage: None,
completed: None,
codex_error: None,
latest_diff: None,
steer_count: 0,
tool_counts: TurnToolCounts::default(),
});
let turn_state = self.turns.entry(turn_id.clone()).or_default();
turn_state.thread_id = Some(input.thread_id);
turn_state.token_usage = Some(input.token_usage);
self.maybe_emit_turn_event(&turn_id, out).await;
}
async fn ingest_turn_profile(
&mut self,
input: TurnProfileFact,
out: &mut Vec<TrackEventRequest>,
) {
let TurnProfileFact { turn_id, profile } = input;
let turn_state = self.turns.entry(turn_id.clone()).or_default();
turn_state.profile = Some(profile);
self.maybe_emit_turn_event(&turn_id, out).await;
}
fn ingest_turn_codex_error(&mut self, input: TurnCodexErrorFact) {
let TurnCodexErrorFact {
turn_id,
thread_id,
error,
} = input;
let turn_state = self.turns.entry(turn_id).or_insert(TurnState {
connection_id: None,
thread_id: None,
num_input_images: None,
resolved_config: None,
started_at: None,
token_usage: None,
completed: None,
codex_error: None,
latest_diff: None,
steer_count: 0,
tool_counts: TurnToolCounts::default(),
});
let turn_state = self.turns.entry(turn_id).or_default();
turn_state.thread_id.get_or_insert(thread_id);
turn_state.codex_error = Some(error);
}
@@ -818,19 +800,7 @@ impl AnalyticsReducer {
else {
return;
};
let turn_state = self.turns.entry(turn_id.clone()).or_insert(TurnState {
connection_id: None,
thread_id: None,
num_input_images: None,
resolved_config: None,
started_at: None,
token_usage: None,
completed: None,
codex_error: None,
latest_diff: None,
steer_count: 0,
tool_counts: TurnToolCounts::default(),
});
let turn_state = self.turns.entry(turn_id.clone()).or_default();
turn_state.connection_id = Some(connection_id);
turn_state.thread_id = Some(pending_request.thread_id);
turn_state.num_input_images = Some(pending_request.num_input_images);
@@ -1178,61 +1148,19 @@ impl AnalyticsReducer {
self.ingest_guardian_review_completed(notification, out);
}
ServerNotification::TurnStarted(notification) => {
let turn_state = self.turns.entry(notification.turn.id).or_insert(TurnState {
connection_id: None,
thread_id: None,
num_input_images: None,
resolved_config: None,
started_at: None,
token_usage: None,
completed: None,
codex_error: None,
latest_diff: None,
steer_count: 0,
tool_counts: TurnToolCounts::default(),
});
let turn_state = self.turns.entry(notification.turn.id).or_default();
turn_state.started_at = notification
.turn
.started_at
.and_then(|started_at| u64::try_from(started_at).ok());
}
ServerNotification::TurnDiffUpdated(notification) => {
let turn_state =
self.turns
.entry(notification.turn_id.clone())
.or_insert(TurnState {
connection_id: None,
thread_id: None,
num_input_images: None,
resolved_config: None,
started_at: None,
token_usage: None,
completed: None,
codex_error: None,
latest_diff: None,
steer_count: 0,
tool_counts: TurnToolCounts::default(),
});
let turn_state = self.turns.entry(notification.turn_id.clone()).or_default();
turn_state.thread_id = Some(notification.thread_id);
turn_state.latest_diff = Some(notification.diff);
}
ServerNotification::TurnCompleted(notification) => {
let turn_state =
self.turns
.entry(notification.turn.id.clone())
.or_insert(TurnState {
connection_id: None,
thread_id: None,
num_input_images: None,
resolved_config: None,
started_at: None,
token_usage: None,
completed: None,
codex_error: None,
latest_diff: None,
steer_count: 0,
tool_counts: TurnToolCounts::default(),
});
let turn_state = self.turns.entry(notification.turn.id.clone()).or_default();
turn_state.completed = Some(CompletedTurnState {
status: analytics_turn_status(notification.turn.status),
turn_error: notification
@@ -1511,6 +1439,7 @@ impl AnalyticsReducer {
if turn_state.thread_id.is_none()
|| turn_state.num_input_images.is_none()
|| turn_state.resolved_config.is_none()
|| turn_state.profile.is_none()
|| turn_state.completed.is_none()
{
return;
@@ -2457,12 +2386,20 @@ fn codex_turn_event_params(
turn_state: &TurnState,
thread_metadata: &ThreadMetadataState,
) -> CodexTurnEventParams {
let (Some(thread_id), Some(num_input_images), Some(resolved_config), Some(completed)) = (
let (
Some(thread_id),
Some(num_input_images),
Some(resolved_config),
Some(profile),
Some(completed),
) = (
turn_state.thread_id.clone(),
turn_state.num_input_images,
turn_state.resolved_config.clone(),
turn_state.profile.clone(),
turn_state.completed.clone(),
) else {
)
else {
unreachable!("turn event params require a fully populated turn state");
};
let started_at = turn_state.started_at;
@@ -2488,6 +2425,15 @@ fn codex_turn_event_params(
workspace_kind,
is_first_turn,
} = resolved_config;
let TurnProfile {
before_first_sampling_ms,
sampling_ms,
between_sampling_overhead_ms,
tool_blocking_ms,
after_last_sampling_ms,
sampling_request_count,
sampling_retry_count,
} = profile;
let token_usage = turn_state.token_usage.clone();
let codex_error = turn_state.codex_error.as_ref();
CodexTurnEventParams {
@@ -2550,6 +2496,13 @@ fn codex_turn_event_params(
total_tokens: token_usage
.as_ref()
.map(|token_usage| token_usage.total_tokens),
before_first_sampling_ms,
sampling_ms,
between_sampling_overhead_ms,
tool_blocking_ms,
after_last_sampling_ms,
sampling_request_count,
sampling_retry_count,
duration_ms: completed.duration_ms,
started_at,
completed_at: Some(completed.completed_at),
@@ -9,6 +9,7 @@ use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::create_mock_responses_server_sequence;
use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::create_request_user_input_sse_response;
use app_test_support::create_shell_command_sse_response;
use app_test_support::format_with_current_shell_display;
use app_test_support::to_response;
@@ -78,6 +79,7 @@ use std::collections::HashMap;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
use wiremock::ResponseTemplate;
use super::analytics::mount_analytics_capture;
use super::analytics::wait_for_analytics_event;
@@ -819,8 +821,20 @@ async fn thread_start_omits_empty_instruction_overrides_from_model_request() ->
#[tokio::test]
async fn turn_start_tracks_turn_event_analytics() -> Result<()> {
let responses = vec![create_final_assistant_message_sse_response("Done")?];
let server = create_mock_responses_server_sequence_unchecked(responses).await;
let server = responses::start_mock_server().await;
let response_mock = responses::mount_response_sequence(
&server,
vec![
ResponseTemplate::new(500).set_body_json(json!({
"error": {
"type": "server_error",
"message": "synthetic retryable error"
}
})),
responses::sse_response(create_final_assistant_message_sse_response("Done")?),
],
)
.await;
let codex_home = TempDir::new()?;
write_mock_responses_config_toml_with_chatgpt_base_url(
@@ -828,6 +842,10 @@ async fn turn_start_tracks_turn_event_analytics() -> Result<()> {
&server.uri(),
&server.uri(),
)?;
let config_path = codex_home.path().join("config.toml");
let config = std::fs::read_to_string(&config_path)?
.replace("stream_max_retries = 0", "stream_max_retries = 1");
std::fs::write(config_path, config)?;
mount_analytics_capture(&server, codex_home.path()).await?;
let mut mcp = TestAppServer::new_without_managed_config(codex_home.path()).await?;
@@ -908,6 +926,136 @@ async fn turn_start_tracks_turn_event_analytics() -> Result<()> {
assert_eq!(event["event_params"]["output_tokens"], 0);
assert_eq!(event["event_params"]["reasoning_output_tokens"], 0);
assert_eq!(event["event_params"]["total_tokens"], 0);
let params = &event["event_params"];
let timings_are_numbers = [
"before_first_sampling_ms",
"sampling_ms",
"between_sampling_overhead_ms",
"tool_blocking_ms",
"after_last_sampling_ms",
]
.into_iter()
.all(|field| params[field].as_u64().is_some());
assert_eq!(
json!({
"timingsAreNumbers": timings_are_numbers,
"toolBlockingMs": params["tool_blocking_ms"],
"samplingRequestCount": params["sampling_request_count"],
"samplingRetryCount": params["sampling_retry_count"],
"responseRequestCount": response_mock.requests().len(),
}),
json!({
"timingsAreNumbers": true,
"toolBlockingMs": 0,
"samplingRequestCount": 2,
"samplingRetryCount": 1,
"responseRequestCount": 2,
})
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn turn_profile_tracks_blocking_tool_and_follow_up_sampling() -> Result<()> {
let responses = vec![
create_request_user_input_sse_response("call1")?,
create_final_assistant_message_sse_response("Done")?,
];
let server = create_mock_responses_server_sequence(responses).await;
let codex_home = TempDir::new()?;
write_mock_responses_config_toml_with_chatgpt_base_url(
codex_home.path(),
&server.uri(),
&server.uri(),
)?;
mount_analytics_capture(&server, codex_home.path()).await?;
let mut mcp = TestAppServer::new_without_managed_config(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
client_user_message_id: None,
input: vec![V2UserInput::Text {
text: "ask something".to_string(),
text_elements: Vec::new(),
}],
collaboration_mode: Some(CollaborationMode {
mode: ModeKind::Plan,
settings: Settings {
model: "mock-model".to_string(),
reasoning_effort: Some(ReasoningEffort::Medium),
developer_instructions: None,
},
}),
..Default::default()
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let server_req = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_request_message(),
)
.await??;
let ServerRequest::ToolRequestUserInput { request_id, .. } = server_req else {
panic!("expected ToolRequestUserInput request, got: {server_req:?}");
};
tokio::time::sleep(std::time::Duration::from_millis(25)).await;
mcp.send_response(
request_id,
json!({
"answers": {
"confirm_path": { "answers": ["yes"] }
}
}),
)
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let event = wait_for_analytics_event(&server, DEFAULT_READ_TIMEOUT, "codex_turn_event").await?;
let params = &event["event_params"];
assert_eq!(
json!({
"toolBlockingIsPositive": params["tool_blocking_ms"]
.as_u64()
.is_some_and(|duration| duration > 0),
"samplingRequestCount": params["sampling_request_count"],
"samplingRetryCount": params["sampling_retry_count"],
"status": params["status"],
}),
json!({
"toolBlockingIsPositive": true,
"samplingRequestCount": 2,
"samplingRetryCount": 0,
"status": "completed",
})
);
Ok(())
}
+9
View File
@@ -1088,6 +1088,7 @@ async fn run_sampling_request(
ResponsesStreamRequest::Sampling,
)
.await?;
turn_context.turn_timing_state.record_sampling_retry();
}
}
@@ -1800,6 +1801,7 @@ async fn try_run_sampling_request(
turn_context.model_info.slug.as_str(),
turn_context.provider.info().name.as_str(),
);
let sampling_timing_guard = turn_context.turn_timing_state.begin_sampling();
let mut stream = client_session
.stream(
prompt,
@@ -2213,6 +2215,7 @@ async fn try_run_sampling_request(
}
}
};
drop(sampling_timing_guard);
flush_assistant_text_segments_all(
&sess,
@@ -2222,7 +2225,13 @@ async fn try_run_sampling_request(
)
.await;
let tool_blocking_timing_guard = if in_flight.is_empty() {
None
} else {
Some(turn_context.turn_timing_state.begin_tool_blocking())
};
drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await?;
drop(tool_blocking_timing_guard);
if should_emit_token_count {
// A tool call such as request_user_input can intentionally pause the turn. Emit token
+13
View File
@@ -33,6 +33,7 @@ use crate::session::turn_context::TurnContext;
use crate::state::ActiveTurn;
use crate::state::RunningTask;
use crate::state::TaskKind;
use codex_analytics::TurnProfileFact;
use codex_analytics::TurnTokenUsageFact;
use codex_login::AuthManager;
use codex_models_manager::manager::SharedModelsManager;
@@ -751,6 +752,12 @@ impl Session {
.turn_timing_state
.time_to_first_token_ms()
.await;
self.services
.analytics_events_client
.track_turn_profile(TurnProfileFact {
turn_id: turn_context.sub_id.clone(),
profile: turn_context.turn_timing_state.complete_profile(),
});
self.emit_turn_stop_lifecycle(turn_context.extension_data.as_ref())
.await;
if let Err(err) = self
@@ -868,6 +875,12 @@ impl Session {
.turn_timing_state
.completed_at_and_duration_ms()
.await;
self.services
.analytics_events_client
.track_turn_profile(TurnProfileFact {
turn_id: task.turn_context.sub_id.clone(),
profile: task.turn_context.turn_timing_state.complete_profile(),
});
let event = EventMsg::TurnAborted(TurnAbortedEvent {
turn_id: Some(task.turn_context.sub_id.clone()),
reason,
+191
View File
@@ -1,8 +1,11 @@
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use codex_analytics::TurnProfile;
use codex_otel::TURN_TTFM_DURATION_METRIC;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ResponseItem;
@@ -39,6 +42,7 @@ pub(crate) async fn record_turn_ttfm_metric(turn_context: &TurnContext, item: &T
#[derive(Debug, Default)]
pub(crate) struct TurnTimingState {
state: Mutex<TurnTimingStateInner>,
profile: StdMutex<TurnProfileState>,
}
#[derive(Debug, Default)]
@@ -49,6 +53,35 @@ struct TurnTimingStateInner {
first_message_at: Option<Instant>,
}
#[derive(Debug, Default)]
struct TurnProfileState {
started_at: Option<Instant>,
last_transition_at: Option<Instant>,
active_phase: Option<TurnProfilePhase>,
seen_sampling: bool,
before_first_sampling: Duration,
sampling: Duration,
between_sampling_overhead: Duration,
tool_blocking: Duration,
pending_idle_after_sampling: Duration,
sampling_request_count: u32,
sampling_retry_count: u32,
completed_profile: Option<TurnProfile>,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum TurnProfilePhase {
Sampling,
ToolBlocking,
}
#[must_use]
pub(crate) struct TurnProfileTimingGuard {
timing: Arc<TurnTimingState>,
phase: TurnProfilePhase,
active: bool,
}
impl TurnTimingState {
pub(crate) async fn mark_turn_started(&self, started_at: Instant) -> i64 {
let started_at_unix_ms = now_unix_timestamp_ms();
@@ -57,6 +90,7 @@ impl TurnTimingState {
state.started_at_unix_secs = Some(started_at_unix_ms / 1000);
state.first_token_at = None;
state.first_message_at = None;
self.profile_state().start(started_at);
started_at_unix_ms
}
@@ -80,6 +114,32 @@ impl TurnTimingState {
.map(|duration| i64::try_from(duration.as_millis()).unwrap_or(i64::MAX))
}
pub(crate) fn complete_profile(&self) -> TurnProfile {
self.profile_state().complete(Instant::now())
}
pub(crate) fn begin_sampling(self: &Arc<Self>) -> TurnProfileTimingGuard {
let active = self.profile_state().begin_sampling(Instant::now());
TurnProfileTimingGuard {
timing: Arc::clone(self),
phase: TurnProfilePhase::Sampling,
active,
}
}
pub(crate) fn record_sampling_retry(&self) {
self.profile_state().record_sampling_retry();
}
pub(crate) fn begin_tool_blocking(self: &Arc<Self>) -> TurnProfileTimingGuard {
let active = self.profile_state().begin_tool_blocking(Instant::now());
TurnProfileTimingGuard {
timing: Arc::clone(self),
phase: TurnProfilePhase::ToolBlocking,
active,
}
}
pub(crate) async fn record_ttft_for_response_event(
&self,
event: &ResponseEvent,
@@ -98,6 +158,22 @@ impl TurnTimingState {
let mut state = self.state.lock().await;
state.record_turn_ttfm()
}
fn profile_state(&self) -> std::sync::MutexGuard<'_, TurnProfileState> {
self.profile
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
}
}
impl Drop for TurnProfileTimingGuard {
fn drop(&mut self) {
if self.active {
self.timing
.profile_state()
.end_phase(Instant::now(), self.phase);
}
}
}
fn now_unix_timestamp_secs() -> i64 {
@@ -111,6 +187,121 @@ pub(crate) fn now_unix_timestamp_ms() -> i64 {
i64::try_from(duration.as_millis()).unwrap_or(i64::MAX)
}
fn duration_to_u64_ms(duration: Duration) -> u64 {
u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
}
impl TurnProfileState {
fn start(&mut self, started_at: Instant) {
*self = Self {
started_at: Some(started_at),
last_transition_at: Some(started_at),
..Self::default()
};
}
fn begin_sampling(&mut self, now: Instant) -> bool {
if self.completed_profile.is_some()
|| self.started_at.is_none()
|| self.active_phase.is_some()
{
return false;
}
self.advance(now);
if self.seen_sampling {
self.between_sampling_overhead += std::mem::take(&mut self.pending_idle_after_sampling);
}
self.seen_sampling = true;
self.active_phase = Some(TurnProfilePhase::Sampling);
self.sampling_request_count = self.sampling_request_count.saturating_add(1);
true
}
fn record_sampling_retry(&mut self) {
if self.completed_profile.is_none() && self.started_at.is_some() {
self.sampling_retry_count = self.sampling_retry_count.saturating_add(1);
}
}
fn begin_tool_blocking(&mut self, now: Instant) -> bool {
if self.completed_profile.is_some()
|| self.started_at.is_none()
|| self.active_phase.is_some()
{
return false;
}
self.advance(now);
self.active_phase = Some(TurnProfilePhase::ToolBlocking);
true
}
fn end_phase(&mut self, now: Instant, phase: TurnProfilePhase) {
if self.completed_profile.is_some() || self.active_phase != Some(phase) {
return;
}
self.advance(now);
self.active_phase = None;
}
fn advance(&mut self, now: Instant) {
let Some(previous) = self.last_transition_at.replace(now) else {
return;
};
let elapsed = now.saturating_duration_since(previous);
match self.active_phase {
Some(TurnProfilePhase::Sampling) => self.sampling += elapsed,
Some(TurnProfilePhase::ToolBlocking) => self.tool_blocking += elapsed,
None if self.seen_sampling => self.pending_idle_after_sampling += elapsed,
None => self.before_first_sampling += elapsed,
}
}
fn complete(&mut self, now: Instant) -> TurnProfile {
if let Some(profile) = self.completed_profile.as_ref() {
return profile.clone();
}
let final_phase = self.active_phase;
self.advance(now);
let after_last_sampling = if self.seen_sampling {
std::mem::take(&mut self.pending_idle_after_sampling)
} else {
Duration::ZERO
};
let mut profile = TurnProfile {
before_first_sampling_ms: duration_to_u64_ms(self.before_first_sampling),
sampling_ms: duration_to_u64_ms(self.sampling),
between_sampling_overhead_ms: duration_to_u64_ms(self.between_sampling_overhead),
tool_blocking_ms: duration_to_u64_ms(self.tool_blocking),
after_last_sampling_ms: duration_to_u64_ms(after_last_sampling),
sampling_request_count: self.sampling_request_count,
sampling_retry_count: self.sampling_retry_count,
};
let total_ms = self
.started_at
.map(|started_at| duration_to_u64_ms(now.saturating_duration_since(started_at)))
.unwrap_or_default();
let classified_ms = profile
.before_first_sampling_ms
.saturating_add(profile.sampling_ms)
.saturating_add(profile.between_sampling_overhead_ms)
.saturating_add(profile.tool_blocking_ms)
.saturating_add(profile.after_last_sampling_ms);
let rounding_ms = total_ms.saturating_sub(classified_ms);
match final_phase {
Some(TurnProfilePhase::Sampling) => profile.sampling_ms += rounding_ms,
Some(TurnProfilePhase::ToolBlocking) => profile.tool_blocking_ms += rounding_ms,
None if self.seen_sampling => profile.after_last_sampling_ms += rounding_ms,
None => profile.before_first_sampling_ms += rounding_ms,
}
self.active_phase = None;
self.completed_profile = Some(profile.clone());
profile
}
}
impl TurnTimingStateInner {
fn time_to_first_token(&self) -> Option<Duration> {
Some(self.first_token_at?.duration_since(self.started_at?))
+41
View File
@@ -1,13 +1,17 @@
use codex_analytics::TurnProfile;
use codex_protocol::items::AgentMessageItem;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseItem;
use pretty_assertions::assert_eq;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use super::TurnProfilePhase;
use super::TurnProfileState;
use super::TurnTimingState;
use super::response_item_records_turn_ttft;
use crate::ResponseEvent;
@@ -146,3 +150,40 @@ fn response_item_records_turn_ttft_ignores_empty_non_output_items() {
}
));
}
#[test]
fn turn_profile_breaks_down_sampling_blocking_and_retry_overhead() {
let started_at = Instant::now();
let mut state = TurnProfileState::default();
state.start(started_at);
let _ = state.begin_sampling(started_at + Duration::from_millis(100));
state.end_phase(
started_at + Duration::from_millis(600),
TurnProfilePhase::Sampling,
);
let _ = state.begin_tool_blocking(started_at + Duration::from_millis(600));
state.end_phase(
started_at + Duration::from_millis(900),
TurnProfilePhase::ToolBlocking,
);
state.record_sampling_retry();
let _ = state.begin_sampling(started_at + Duration::from_millis(1_000));
state.end_phase(
started_at + Duration::from_millis(1_200),
TurnProfilePhase::Sampling,
);
assert_eq!(
state.complete(started_at + Duration::from_millis(1_300)),
TurnProfile {
before_first_sampling_ms: 100,
sampling_ms: 700,
between_sampling_overhead_ms: 100,
tool_blocking_ms: 300,
after_last_sampling_ms: 100,
sampling_request_count: 2,
sampling_retry_count: 1,
}
);
}