[rollout_trace] Record core session rollout traces (#18877)

## Summary

Wires rollout trace recording into `codex-core` session and turn
execution. This records the core model request/response, compaction, and
session lifecycle boundaries needed for replay without yet tracing every
nested runtime/tool boundary.

## Stack

This is PR 2/5 in the rollout trace stack.

- [#18876](https://github.com/openai/codex/pull/18876): Add rollout
trace crate
- [#18877](https://github.com/openai/codex/pull/18877): Record core
session rollout traces
- [#18878](https://github.com/openai/codex/pull/18878): Trace tool and
code-mode boundaries
- [#18879](https://github.com/openai/codex/pull/18879): Trace sessions
and multi-agent edges
- [#18880](https://github.com/openai/codex/pull/18880): Add debug trace
reduction command

## Review Notes

This layer is the first live integration point. The important review
question is whether trace recording is isolated from normal session
behavior: trace failures should not become user-visible execution
failures, and recording should preserve the existing turn/session
lifecycle semantics.

The PR depends on the reducer/data model from the first stack entry and
only introduces the core recorder surface that later PRs use for richer
runtime and relationship events.
This commit is contained in:
cassirer-openai
2026-04-22 10:00:48 -07:00
committed by GitHub
Unverified
parent 79ea577156
commit f67383bcba
23 changed files with 627 additions and 17 deletions
+3
View File
@@ -2362,6 +2362,7 @@ dependencies = [
"codex-response-debug-context",
"codex-rmcp-client",
"codex-rollout",
"codex-rollout-trace",
"codex-sandboxing",
"codex-secrets",
"codex-shell-command",
@@ -3202,6 +3203,8 @@ dependencies = [
"serde",
"serde_json",
"tempfile",
"tracing",
"uuid",
]
[[package]]
+1
View File
@@ -169,6 +169,7 @@ codex-responses-api-proxy = { path = "responses-api-proxy" }
codex-response-debug-context = { path = "response-debug-context" }
codex-rmcp-client = { path = "rmcp-client" }
codex-rollout = { path = "rollout" }
codex-rollout-trace = { path = "rollout-trace" }
codex-sandboxing = { path = "sandboxing" }
codex-secrets = { path = "secrets" }
codex-shell-command = { path = "shell-command" }
+1
View File
@@ -53,6 +53,7 @@ codex-model-provider = { workspace = true }
codex-protocol = { workspace = true }
codex-response-debug-context = { workspace = true }
codex-rollout = { workspace = true }
codex-rollout-trace = { workspace = true }
codex-rmcp-client = { workspace = true }
codex-sandboxing = { workspace = true }
codex-state = { workspace = true }
+69 -15
View File
@@ -80,6 +80,9 @@ use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::W3cTraceContext;
use codex_rollout_trace::CompactionTraceContext;
use codex_rollout_trace::InferenceTraceAttempt;
use codex_rollout_trace::InferenceTraceContext;
use codex_tools::create_tools_json_for_responses_api;
use eventsource_stream::Event;
use eventsource_stream::EventStreamError;
@@ -411,6 +414,7 @@ impl ModelClient {
effort: Option<ReasoningEffortConfig>,
summary: ReasoningSummaryConfig,
session_telemetry: &SessionTelemetry,
compaction_trace: &CompactionTraceContext,
) -> Result<Vec<ResponseItem>> {
if prompt.input.is_empty() {
return Ok(Vec::new());
@@ -469,10 +473,13 @@ impl ModelClient {
extra_headers.extend(build_conversation_headers(Some(
self.state.conversation_id.to_string(),
)));
client
let trace_attempt = compaction_trace.start_attempt(&payload);
let result = client
.compact_input(&payload, extra_headers)
.await
.map_err(map_api_error)
.map_err(map_api_error);
trace_attempt.record_result(result.as_deref());
result
}
pub(crate) async fn create_realtime_call_with_headers(
@@ -1156,6 +1163,7 @@ impl ModelClientSession {
summary: ReasoningSummaryConfig,
service_tier: Option<ServiceTier>,
turn_metadata_header: Option<&str>,
inference_trace: &InferenceTraceContext,
) -> Result<ResponseStream> {
if let Some(path) = &*CODEX_RS_SSE_FIXTURE {
warn!(path, "Streaming from fixture");
@@ -1164,7 +1172,11 @@ impl ModelClientSession {
self.client.state.provider.info().stream_idle_timeout(),
)
.map_err(map_api_error)?;
let (stream, _last_request_rx) = map_response_stream(stream, session_telemetry.clone());
let (stream, _last_request_rx) = map_response_stream(
stream,
session_telemetry.clone(),
InferenceTraceAttempt::disabled(),
);
return Ok(stream);
}
@@ -1198,6 +1210,8 @@ impl ModelClientSession {
summary,
service_tier,
)?;
let inference_trace_attempt = inference_trace.start_attempt();
inference_trace_attempt.record_started(&request);
let client = ApiResponsesClient::new(
transport,
client_setup.api_provider,
@@ -1208,12 +1222,17 @@ impl ModelClientSession {
match stream_result {
Ok(stream) => {
let (stream, _) = map_response_stream(stream, session_telemetry.clone());
let (stream, _) = map_response_stream(
stream,
session_telemetry.clone(),
inference_trace_attempt,
);
return Ok(stream);
}
Err(ApiError::Transport(
unauthorized_transport @ TransportError::Http { status, .. },
)) if status == StatusCode::UNAUTHORIZED => {
inference_trace_attempt.record_failed(&unauthorized_transport);
pending_retry = PendingUnauthorizedRetry::from_recovery(
handle_unauthorized(
unauthorized_transport,
@@ -1224,7 +1243,11 @@ impl ModelClientSession {
);
continue;
}
Err(err) => return Err(map_api_error(err)),
Err(err) => {
let err = map_api_error(err);
inference_trace_attempt.record_failed(&err);
return Err(err);
}
}
}
}
@@ -1255,6 +1278,7 @@ impl ModelClientSession {
turn_metadata_header: Option<&str>,
warmup: bool,
request_trace: Option<W3cTraceContext>,
inference_trace: &InferenceTraceContext,
) -> Result<WebsocketStreamOutcome> {
let auth_manager = self.client.state.provider.auth_manager();
@@ -1329,17 +1353,33 @@ impl ModelClientSession {
let ws_request = self.prepare_websocket_request(ws_payload, &request);
self.websocket_session.last_request = Some(request);
let stream_result = self.websocket_session.connection.as_ref().ok_or_else(|| {
map_api_error(ApiError::Stream(
"websocket connection is unavailable".to_string(),
))
})?;
let stream_result = stream_result
let inference_trace_attempt = if warmup {
// Prewarm sends `generate=false`; it is connection setup, not a
// model inference attempt that should appear in rollout traces.
InferenceTraceAttempt::disabled()
} else {
inference_trace.start_attempt()
};
inference_trace_attempt.record_started(&ws_request);
let websocket_connection =
self.websocket_session.connection.as_ref().ok_or_else(|| {
map_api_error(ApiError::Stream(
"websocket connection is unavailable".to_string(),
))
})?;
let stream_result = websocket_connection
.stream_request(ws_request, self.websocket_session.connection_reused())
.await
.map_err(map_api_error)?;
let (stream, last_request_rx) =
map_response_stream(stream_result, session_telemetry.clone());
.map_err(|err| {
let err = map_api_error(err);
inference_trace_attempt.record_failed(&err);
err
})?;
let (stream, last_request_rx) = map_response_stream(
stream_result,
session_telemetry.clone(),
inference_trace_attempt,
);
self.websocket_session.last_response_rx = Some(last_request_rx);
return Ok(WebsocketStreamOutcome::Stream(stream));
}
@@ -1398,6 +1438,7 @@ impl ModelClientSession {
return Ok(());
}
let disabled_trace = InferenceTraceContext::disabled();
match self
.stream_responses_websocket(
prompt,
@@ -1409,6 +1450,7 @@ impl ModelClientSession {
turn_metadata_header,
/*warmup*/ true,
current_span_w3c_trace_context(),
&disabled_trace,
)
.await
{
@@ -1437,7 +1479,9 @@ impl ModelClientSession {
/// The caller is responsible for passing per-turn settings explicitly (model selection,
/// reasoning settings, telemetry context, and turn metadata). This method will prefer the
/// Responses WebSocket transport when the provider supports it and it remains healthy, and will
/// fall back to the HTTP Responses API transport otherwise.
/// fall back to the HTTP Responses API transport otherwise. The trace context may be enabled or
/// disabled, but is always explicit so transport paths do not need separate trace/no-trace
/// branches.
pub async fn stream(
&mut self,
prompt: &Prompt,
@@ -1447,6 +1491,7 @@ impl ModelClientSession {
summary: ReasoningSummaryConfig,
service_tier: Option<ServiceTier>,
turn_metadata_header: Option<&str>,
inference_trace: &InferenceTraceContext,
) -> Result<ResponseStream> {
let wire_api = self.client.state.provider.info().wire_api;
match wire_api {
@@ -1464,6 +1509,7 @@ impl ModelClientSession {
turn_metadata_header,
/*warmup*/ false,
request_trace,
inference_trace,
)
.await?
{
@@ -1482,6 +1528,7 @@ impl ModelClientSession {
summary,
service_tier,
turn_metadata_header,
inference_trace,
)
.await
}
@@ -1577,6 +1624,7 @@ fn parent_thread_id_header_value(session_source: &SessionSource) -> Option<Strin
fn map_response_stream<S>(
api_stream: S,
session_telemetry: SessionTelemetry,
inference_trace_attempt: InferenceTraceAttempt,
) -> (ResponseStream, oneshot::Receiver<LastResponse>)
where
S: futures::Stream<Item = std::result::Result<ResponseEvent, ApiError>>
@@ -1617,6 +1665,11 @@ where
usage.total_tokens,
);
}
inference_trace_attempt.record_completed(
&response_id,
&token_usage,
&items_added,
);
if let Some(sender) = tx_last_response.take() {
let _ = sender.send(LastResponse {
response_id: response_id.clone(),
@@ -1641,6 +1694,7 @@ where
}
Err(err) => {
let mapped = map_api_error(err);
inference_trace_attempt.record_failed(&mapped);
if !logged_error {
session_telemetry.see_event_completed_failed(&mapped);
logged_error = true;
+1
View File
@@ -92,6 +92,7 @@ pub(crate) async fn run_codex_thread_interactive(
inherited_shell_snapshot: None,
user_shell_override: None,
inherited_exec_policy: Some(Arc::clone(&parent_session.services.exec_policy)),
inherited_rollout_trace: codex_rollout_trace::RolloutTraceRecorder::disabled(),
parent_trace: None,
analytics_events_client: Some(parent_session.services.analytics_events_client.clone()),
}))
+4
View File
@@ -31,6 +31,7 @@ use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::TurnStartedEvent;
use codex_protocol::protocol::WarningEvent;
use codex_protocol::user_input::UserInput;
use codex_rollout_trace::InferenceTraceContext;
use codex_utils_output_truncation::TruncationPolicy;
use codex_utils_output_truncation::approx_token_count;
use codex_utils_output_truncation::truncate_text;
@@ -546,6 +547,9 @@ async fn drain_to_completed(
turn_context.reasoning_summary,
turn_context.config.service_tier,
turn_metadata_header,
// Rollout tracing currently models remote compaction only; local compaction streams
// are left untraced until the reducer has a first-class local compaction lifecycle.
&InferenceTraceContext::disabled(),
)
.await?;
loop {
+24 -2
View File
@@ -26,6 +26,7 @@ use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::CompactedItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::TurnStartedEvent;
use codex_rollout_trace::CompactionCheckpointTracePayload;
use futures::TryFutureExt;
use tokio_util::sync::CancellationToken;
use tracing::error;
@@ -114,7 +115,17 @@ async fn run_remote_compact_task_inner_impl(
turn_context: &Arc<TurnContext>,
initial_context_injection: InitialContextInjection,
) -> CodexResult<()> {
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
let context_compaction_item = ContextCompactionItem::new();
// Use the UI compaction item ID as the trace compaction ID so protocol lifecycle events,
// endpoint attempts, and the installed history checkpoint all have one join key.
let compaction_trace = sess.services.rollout_trace.compaction_trace_context(
sess.conversation_id,
turn_context.sub_id.as_str(),
context_compaction_item.id.as_str(),
turn_context.model_info.slug.as_str(),
turn_context.provider.info().name.as_str(),
);
let compaction_item = TurnItem::ContextCompaction(context_compaction_item);
sess.emit_turn_item_started(turn_context, &compaction_item)
.await;
let mut history = sess.clone_history().await;
@@ -131,6 +142,10 @@ async fn run_remote_compact_task_inner_impl(
"trimmed history items before remote compaction"
);
}
// This is the history selected for remote compaction, after any trimming required to fit the
// compact endpoint. The checkpoint below records it separately from the next sampling request,
// whose prompt will repeat current developer/context prefix items.
let trace_input_history = history.raw_items().to_vec();
// Required to keep `/undo` available after compaction
let ghost_snapshots: Vec<ResponseItem> = history
.raw_items()
@@ -158,7 +173,6 @@ async fn run_remote_compact_task_inner_impl(
output_schema: None,
output_schema_strict: true,
};
let mut new_history = sess
.services
.model_client
@@ -168,6 +182,7 @@ async fn run_remote_compact_task_inner_impl(
turn_context.reasoning_effort,
turn_context.reasoning_summary,
&turn_context.session_telemetry,
&compaction_trace,
)
.or_else(|err| async {
let total_usage_breakdown = sess.get_total_token_usage_breakdown().await;
@@ -201,6 +216,13 @@ async fn run_remote_compact_task_inner_impl(
message: String::new(),
replacement_history: Some(new_history.clone()),
};
// Install is the semantic boundary where the compact endpoint's output becomes live
// thread history. Keep it distinct from the later inference request so the reducer can
// still represent repeated developer/context prefix items exactly as the model saw them.
compaction_trace.record_installed(&CompactionCheckpointTracePayload {
input_history: &trace_input_history,
replacement_history: &new_history,
});
sess.replace_compacted_history(new_history, reference_context_item, compacted_item)
.await;
sess.recompute_token_usage(turn_context).await;
+2
View File
@@ -23,6 +23,7 @@ use codex_protocol::openai_models::ModelInfo;
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::TokenUsage;
use codex_rollout_trace::InferenceTraceContext;
use codex_secrets::redact_secrets;
use futures::StreamExt;
use serde::Deserialize;
@@ -354,6 +355,7 @@ mod job {
stage_one_context.reasoning_summary,
stage_one_context.service_tier,
stage_one_context.turn_metadata_header.as_deref(),
&InferenceTraceContext::disabled(),
)
.await?;
+6
View File
@@ -120,6 +120,8 @@ use codex_protocol::request_user_input::RequestUserInputResponse;
use codex_rmcp_client::ElicitationResponse;
use codex_rollout::RolloutConfig;
use codex_rollout::state_db;
use codex_rollout_trace::RolloutTraceRecorder;
use codex_rollout_trace::ThreadStartedTraceMetadata;
use codex_sandboxing::policy_transforms::intersect_permission_profiles;
use codex_shell_command::parse_command::parse_command;
use codex_terminal_detection::user_agent;
@@ -397,6 +399,8 @@ pub(crate) struct CodexSpawnArgs {
pub(crate) metrics_service_name: Option<String>,
pub(crate) inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
pub(crate) inherited_exec_policy: Option<Arc<ExecPolicyManager>>,
/// Parent rollout-tree recorder, or a disabled recorder when this spawn has no parent trace.
pub(crate) inherited_rollout_trace: RolloutTraceRecorder,
pub(crate) user_shell_override: Option<shell::Shell>,
pub(crate) parent_trace: Option<W3cTraceContext>,
pub(crate) analytics_events_client: Option<AnalyticsEventsClient>,
@@ -452,6 +456,7 @@ impl Codex {
inherited_shell_snapshot,
user_shell_override,
inherited_exec_policy,
inherited_rollout_trace,
parent_trace: _,
analytics_events_client,
} = args;
@@ -647,6 +652,7 @@ impl Codex {
agent_control,
environment_manager,
analytics_events_client,
inherited_rollout_trace,
)
.await
.map_err(|e| {
+34
View File
@@ -237,6 +237,7 @@ impl Session {
agent_control: AgentControl,
environment_manager: Arc<EnvironmentManager>,
analytics_events_client: Option<AnalyticsEventsClient>,
inherited_rollout_trace: RolloutTraceRecorder,
) -> anyhow::Result<Arc<Self>> {
debug!(
"Configuring session: model={}; provider={:?}",
@@ -373,6 +374,38 @@ impl Session {
let rollout_path = rollout_recorder
.as_ref()
.map(|rec| rec.rollout_path().to_path_buf());
let trace_agent_path = session_configuration
.session_source
.get_agent_path()
.unwrap_or_else(codex_protocol::AgentPath::root);
let trace_task_name =
(!trace_agent_path.is_root()).then(|| trace_agent_path.name().to_string());
let trace_metadata = ThreadStartedTraceMetadata {
thread_id: conversation_id.to_string(),
agent_path: trace_agent_path.to_string(),
task_name: trace_task_name,
nickname: session_configuration.session_source.get_nickname(),
agent_role: session_configuration.session_source.get_agent_role(),
session_source: session_configuration.session_source.clone(),
cwd: session_configuration.cwd.to_path_buf(),
rollout_path: rollout_path.clone(),
model: session_configuration.collaboration_mode.model().to_string(),
provider_name: config.model_provider_id.clone(),
approval_policy: session_configuration.approval_policy.value().to_string(),
sandbox_policy: format!("{:?}", session_configuration.sandbox_policy.get()),
};
let rollout_trace = if matches!(
session_configuration.session_source,
SessionSource::SubAgent(SubAgentSource::ThreadSpawn { .. })
) {
// Spawned child threads are part of their root rollout tree. If
// the parent had no trace recorder, do not create an orphan child
// bundle that looks like an independent rollout.
inherited_rollout_trace
} else {
RolloutTraceRecorder::create_root_or_disabled(conversation_id)
};
rollout_trace.record_thread_started(trace_metadata);
let mut post_session_configured_events = Vec::<Event>::new();
@@ -652,6 +685,7 @@ impl Session {
analytics_events_client,
hooks,
rollout: Mutex::new(rollout_recorder),
rollout_trace,
user_shell: Arc::new(default_shell),
shell_snapshot_tx,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
+4
View File
@@ -2959,6 +2959,7 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() {
AgentControl::default(),
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
RolloutTraceRecorder::disabled(),
)
.await;
@@ -3081,6 +3082,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
..HooksConfig::default()
}),
rollout: Mutex::new(None),
rollout_trace: RolloutTraceRecorder::disabled(),
user_shell: Arc::new(default_user_shell()),
shell_snapshot_tx: watch::channel(None).0,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
@@ -3274,6 +3276,7 @@ async fn make_session_with_config_and_rx(
AgentControl::default(),
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
RolloutTraceRecorder::disabled(),
)
.await?;
@@ -4299,6 +4302,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
..HooksConfig::default()
}),
rollout: Mutex::new(None),
rollout_trace: RolloutTraceRecorder::disabled(),
user_shell: Arc::new(default_user_shell()),
shell_snapshot_tx: watch::channel(None).0,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
@@ -650,6 +650,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
metrics_service_name: None,
inherited_shell_snapshot: None,
inherited_exec_policy: Some(Arc::new(parent_exec_policy)),
inherited_rollout_trace: RolloutTraceRecorder::disabled(),
user_shell_override: None,
parent_trace: None,
analytics_events_client: None,
+7
View File
@@ -1877,6 +1877,12 @@ async fn try_run_sampling_request(
auth_mode = sess.services.auth_manager.auth_mode(),
features = sess.features.enabled_features(),
);
let inference_trace = sess.services.rollout_trace.inference_trace_context(
sess.conversation_id,
turn_context.sub_id.as_str(),
turn_context.model_info.slug.as_str(),
turn_context.provider.info().name.as_str(),
);
let mut stream = client_session
.stream(
prompt,
@@ -1886,6 +1892,7 @@ async fn try_run_sampling_request(
turn_context.reasoning_summary,
turn_context.config.service_tier,
turn_metadata_header,
&inference_trace,
)
.instrument(trace_span!("stream_request"))
.or_cancel(&cancellation_token)
+2
View File
@@ -23,6 +23,7 @@ use codex_mcp::McpConnectionManager;
use codex_models_manager::manager::ModelsManager;
use codex_otel::SessionTelemetry;
use codex_rollout::state_db::StateDbHandle;
use codex_rollout_trace::RolloutTraceRecorder;
use codex_thread_store::LocalThreadStore;
use std::path::PathBuf;
use tokio::sync::Mutex;
@@ -41,6 +42,7 @@ pub(crate) struct SessionServices {
pub(crate) analytics_events_client: AnalyticsEventsClient,
pub(crate) hooks: Hooks,
pub(crate) rollout: Mutex<Option<RolloutRecorder>>,
pub(crate) rollout_trace: RolloutTraceRecorder,
pub(crate) user_shell: Arc<crate::shell::Shell>,
pub(crate) shell_snapshot_tx: watch::Sender<Option<Arc<crate::shell_snapshot::ShellSnapshot>>>,
pub(crate) show_raw_agent_reasoning: bool,
+1
View File
@@ -953,6 +953,7 @@ impl ThreadManagerState {
metrics_service_name,
inherited_shell_snapshot,
inherited_exec_policy,
inherited_rollout_trace: codex_rollout_trace::RolloutTraceRecorder::disabled(),
user_shell_override,
parent_trace,
analytics_events_client: self.analytics_events_client.clone(),
+3
View File
@@ -131,6 +131,7 @@ async fn responses_stream_includes_subagent_header_on_review() {
summary.unwrap_or(model_info.default_reasoning_summary),
/*service_tier*/ None,
/*turn_metadata_header*/ None,
&codex_rollout_trace::InferenceTraceContext::disabled(),
)
.await
.expect("stream failed");
@@ -257,6 +258,7 @@ async fn responses_stream_includes_subagent_header_on_other() {
summary.unwrap_or(model_info.default_reasoning_summary),
/*service_tier*/ None,
/*turn_metadata_header*/ None,
&codex_rollout_trace::InferenceTraceContext::disabled(),
)
.await
.expect("stream failed");
@@ -372,6 +374,7 @@ async fn responses_respects_model_info_overrides_from_config() {
summary.unwrap_or(model_info.default_reasoning_summary),
/*service_tier*/ None,
/*turn_metadata_header*/ None,
&codex_rollout_trace::InferenceTraceContext::disabled(),
)
.await
.expect("stream failed");
+2
View File
@@ -914,6 +914,7 @@ async fn send_provider_auth_request(server: &MockServer, auth: ModelProviderAuth
summary.unwrap_or(ReasoningSummary::Auto),
/*service_tier*/ None,
/*turn_metadata_header*/ None,
&codex_rollout_trace::InferenceTraceContext::disabled(),
)
.await
.expect("responses stream to start");
@@ -2280,6 +2281,7 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
summary.unwrap_or(ReasoningSummary::Auto),
/*service_tier*/ None,
/*turn_metadata_header*/ None,
&codex_rollout_trace::InferenceTraceContext::disabled(),
)
.await
.expect("responses stream to start");
@@ -390,6 +390,7 @@ async fn responses_websocket_preconnect_is_reused_even_with_header_changes() {
harness.summary,
/*service_tier*/ None,
/*turn_metadata_header*/ None,
&codex_rollout_trace::InferenceTraceContext::disabled(),
)
.await
.expect("websocket stream failed");
@@ -440,6 +441,7 @@ async fn responses_websocket_request_prewarm_is_reused_even_with_header_changes(
harness.summary,
/*service_tier*/ None,
/*turn_metadata_header*/ None,
&codex_rollout_trace::InferenceTraceContext::disabled(),
)
.await
.expect("websocket stream failed");
@@ -842,6 +844,7 @@ async fn responses_websocket_emits_reasoning_included_event() {
harness.summary,
/*service_tier*/ None,
/*turn_metadata_header*/ None,
&codex_rollout_trace::InferenceTraceContext::disabled(),
)
.await
.expect("websocket stream failed");
@@ -915,6 +918,7 @@ async fn responses_websocket_emits_rate_limit_events() {
harness.summary,
/*service_tier*/ None,
/*turn_metadata_header*/ None,
&codex_rollout_trace::InferenceTraceContext::disabled(),
)
.await
.expect("websocket stream failed");
@@ -1553,6 +1557,7 @@ async fn responses_websocket_v2_after_error_uses_full_create_without_previous_re
harness.summary,
/*service_tier*/ None,
/*turn_metadata_header*/ None,
&codex_rollout_trace::InferenceTraceContext::disabled(),
)
.await
.expect("websocket stream failed");
@@ -1640,6 +1645,7 @@ async fn responses_websocket_v2_surfaces_terminal_error_without_close_handshake(
harness.summary,
/*service_tier*/ None,
/*turn_metadata_header*/ None,
&codex_rollout_trace::InferenceTraceContext::disabled(),
)
.await
.expect("websocket stream failed");
@@ -1903,6 +1909,7 @@ async fn stream_until_complete_with_request_metadata(
harness.summary,
service_tier,
turn_metadata_header,
&codex_rollout_trace::InferenceTraceContext::disabled(),
)
.await
.expect("websocket stream failed");
+2
View File
@@ -17,6 +17,8 @@ anyhow = { workspace = true }
codex-protocol = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tracing = { workspace = true }
uuid = { workspace = true }
[dev-dependencies]
pretty_assertions = { workspace = true }
+54
View File
@@ -13,6 +13,7 @@ use std::sync::atomic::Ordering;
use codex_protocol::models::ResponseItem;
use serde::Serialize;
use serde_json::Value as JsonValue;
use tracing::warn;
use crate::inference::trace_response_item_json;
use crate::model::AgentThreadId;
@@ -74,6 +75,17 @@ struct TracedCompactionCompleted {
output_items: Vec<JsonValue>,
}
/// History replacement checkpoint persisted when compaction installs new live history.
///
/// The checkpoint keeps compaction separate from ordinary sampling snapshots:
/// `input_history` is the live thread history selected for compaction, while
/// `replacement_history` is what future prompts may carry after the checkpoint.
#[derive(Serialize)]
pub struct CompactionCheckpointTracePayload<'a> {
pub input_history: &'a [ResponseItem],
pub replacement_history: &'a [ResponseItem],
}
impl CompactionTraceContext {
/// Builds a context that accepts trace calls and records nothing.
pub fn disabled() -> Self {
@@ -118,6 +130,40 @@ impl CompactionTraceContext {
attempt.record_started(request);
attempt
}
/// Records the point where compacted history becomes the live thread history.
///
/// The checkpoint belongs to the same semantic compaction lifecycle as the
/// compact endpoint attempts, so the context reuses its stable compaction ID.
pub fn record_installed(&self, checkpoint: &CompactionCheckpointTracePayload<'_>) {
let CompactionTraceContextState::Enabled(context) = &self.state else {
return;
};
let checkpoint_payload = match context
.writer
.write_json_payload(RawPayloadKind::CompactionCheckpoint, checkpoint)
{
Ok(payload_ref) => payload_ref,
Err(err) => {
warn!("failed to write rollout trace payload: {err:#}");
return;
}
};
let event_context = RawTraceEventContext {
thread_id: Some(context.thread_id.clone()),
codex_turn_id: Some(context.codex_turn_id.clone()),
};
if let Err(err) = context.writer.append_with_context(
event_context,
RawTraceEventPayload::CompactionInstalled {
compaction_id: context.compaction_id.clone(),
checkpoint_payload,
},
) {
warn!("failed to append rollout trace event: {err:#}");
}
}
}
impl CompactionTraceAttempt {
@@ -184,6 +230,14 @@ impl CompactionTraceAttempt {
);
}
/// Records the compact endpoint result without forcing callers to branch on trace events.
pub fn record_result<E: Display>(&self, result: Result<&[ResponseItem], E>) {
match result {
Ok(output_items) => self.record_completed(output_items),
Err(err) => self.record_failed(err),
}
}
/// Records pre-response failures from the compact endpoint.
pub fn record_failed(&self, error: impl Display) {
let CompactionTraceAttemptState::Enabled(attempt) = &self.state else {
+9
View File
@@ -12,11 +12,14 @@ mod inference;
mod model;
mod payload;
mod raw_event;
mod recorder;
mod reducer;
mod writer;
/// Conventional reduced-state cache name written next to a raw trace bundle.
pub use bundle::REDUCED_STATE_FILE_NAME;
/// Raw checkpoint payload for a remote compaction install event.
pub use compaction::CompactionCheckpointTracePayload;
/// No-op-capable handle for recording remote-compaction requests.
pub use compaction::CompactionTraceAttempt;
/// Shared recorder context for a compaction checkpoint.
@@ -43,6 +46,12 @@ pub use raw_event::RawTraceEvent;
pub use raw_event::RawTraceEventContext;
/// Typed payload for one raw trace event.
pub use raw_event::RawTraceEventPayload;
/// Environment variable that enables local trace-bundle recording.
pub use recorder::CODEX_ROLLOUT_TRACE_ROOT_ENV;
/// Best-effort hot-path recorder for one rollout trace bundle.
pub use recorder::RolloutTraceRecorder;
/// Raw metadata captured when a thread starts.
pub use recorder::ThreadStartedTraceMetadata;
/// Replay a raw trace bundle and write/read its reduced `RolloutTrace`.
pub use reducer::replay_bundle;
/// Append-only writer used by hot-path Codex instrumentation.
+228
View File
@@ -0,0 +1,228 @@
//! Opt-in hot-path producer for rollout trace bundles.
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use codex_protocol::ThreadId;
use codex_protocol::protocol::SessionSource;
use serde::Serialize;
use tracing::debug;
use tracing::warn;
use uuid::Uuid;
use crate::AgentThreadId;
use crate::CodexTurnId;
use crate::CompactionId;
use crate::CompactionTraceContext;
use crate::InferenceTraceContext;
use crate::RawPayloadKind;
use crate::RawPayloadRef;
use crate::RawTraceEventPayload;
use crate::TraceWriter;
/// Environment variable that enables local trace-bundle recording.
///
/// The value is a root directory. Each independent root session gets one child
/// bundle directory. Spawned child threads share their root session's bundle so
/// one reduced `state.json` describes the whole multi-agent rollout tree.
pub const CODEX_ROLLOUT_TRACE_ROOT_ENV: &str = "CODEX_ROLLOUT_TRACE_ROOT";
/// Lightweight handle stored in `SessionServices`.
///
/// Cloning the handle is cheap; all sequencing and file ownership remains
/// inside `TraceWriter`. Disabled handles intentionally accept the same calls
/// as enabled handles so hot-path session code can describe traceable events
/// without repeatedly branching on whether diagnostic recording is enabled.
#[derive(Clone, Debug)]
pub struct RolloutTraceRecorder {
state: RolloutTraceRecorderState,
}
#[derive(Clone, Debug)]
enum RolloutTraceRecorderState {
Disabled,
Enabled(EnabledRolloutTraceRecorder),
}
#[derive(Clone, Debug)]
struct EnabledRolloutTraceRecorder {
writer: Arc<TraceWriter>,
}
/// Metadata captured once at thread/session start.
///
/// This payload is intentionally operational rather than reduced: it is a raw
/// payload that later reducers can mine as the reduced thread model evolves.
#[derive(Serialize)]
pub struct ThreadStartedTraceMetadata {
pub thread_id: String,
pub agent_path: String,
pub task_name: Option<String>,
pub nickname: Option<String>,
pub agent_role: Option<String>,
pub session_source: SessionSource,
pub cwd: PathBuf,
pub rollout_path: Option<PathBuf>,
pub model: String,
pub provider_name: String,
pub approval_policy: String,
pub sandbox_policy: String,
}
impl RolloutTraceRecorder {
/// Builds a recorder handle that accepts trace calls and records nothing.
pub fn disabled() -> Self {
Self {
state: RolloutTraceRecorderState::Disabled,
}
}
/// Creates and starts a root trace bundle, or returns a disabled recorder.
///
/// Trace startup is best-effort. A tracing failure must not make the Codex
/// session unusable, because traces are diagnostic and can be enabled while
/// debugging unrelated production failures. The returned recorder has not
/// emitted `ThreadStarted`; session setup records that event uniformly for
/// root and inherited child recorders.
pub fn create_root_or_disabled(thread_id: ThreadId) -> Self {
let Some(root) = std::env::var_os(CODEX_ROLLOUT_TRACE_ROOT_ENV) else {
return Self::disabled();
};
let root = PathBuf::from(root);
match Self::create_in_root(root.as_path(), thread_id) {
Ok(recorder) => recorder,
Err(err) => {
warn!("failed to initialize rollout trace recorder: {err:#}");
Self::disabled()
}
}
}
fn create_in_root(root: &Path, thread_id: ThreadId) -> anyhow::Result<Self> {
let trace_id = Uuid::new_v4().to_string();
let thread_id = thread_id.to_string();
let bundle_dir = root.join(format!("trace-{trace_id}-{thread_id}"));
let writer = TraceWriter::create(
&bundle_dir,
trace_id.clone(),
thread_id.clone(),
thread_id.clone(),
)?;
let recorder = EnabledRolloutTraceRecorder {
writer: Arc::new(writer),
};
recorder.append_best_effort(RawTraceEventPayload::RolloutStarted {
trace_id,
root_thread_id: thread_id,
});
debug!("recording rollout trace at {}", bundle_dir.display());
Ok(Self::enabled(recorder))
}
fn enabled(inner: EnabledRolloutTraceRecorder) -> Self {
Self {
state: RolloutTraceRecorderState::Enabled(inner),
}
}
/// Emits the lifecycle event and metadata for one thread in this rollout tree.
///
/// Root sessions call this immediately after `RolloutStarted`; spawned
/// child sessions call it on the inherited recorder. Keeping children in
/// the root bundle preserves one raw payload namespace and one reduced
/// `RolloutTrace` for the whole multi-agent task.
pub fn record_thread_started(&self, metadata: ThreadStartedTraceMetadata) {
let RolloutTraceRecorderState::Enabled(recorder) = &self.state else {
return;
};
let metadata_payload =
recorder.write_json_payload_best_effort(RawPayloadKind::SessionMetadata, &metadata);
recorder.append_best_effort(RawTraceEventPayload::ThreadStarted {
thread_id: metadata.thread_id,
agent_path: metadata.agent_path,
metadata_payload,
});
}
/// Builds reusable inference trace context for one Codex turn.
///
/// The returned context is intentionally not "an inference call" yet.
/// Transport code owns retry/fallback attempts and calls `start_attempt`
/// only after it has built the concrete request payload for that attempt.
pub fn inference_trace_context(
&self,
thread_id: impl Into<AgentThreadId>,
codex_turn_id: impl Into<CodexTurnId>,
model: impl Into<String>,
provider_name: impl Into<String>,
) -> InferenceTraceContext {
let RolloutTraceRecorderState::Enabled(recorder) = &self.state else {
return InferenceTraceContext::disabled();
};
InferenceTraceContext::enabled(
Arc::clone(&recorder.writer),
thread_id.into(),
codex_turn_id.into(),
model.into(),
provider_name.into(),
)
}
/// Builds remote-compaction trace context for one checkpoint.
///
/// Rollout tracing currently has a first-class checkpoint model only for remote compaction.
/// The compact endpoint is a model-facing request whose output replaces live history, so it
/// needs both request/response attempt events and a later checkpoint event when processed
/// replacement history is installed.
pub fn compaction_trace_context(
&self,
thread_id: impl Into<AgentThreadId>,
codex_turn_id: impl Into<CodexTurnId>,
compaction_id: impl Into<CompactionId>,
model: impl Into<String>,
provider_name: impl Into<String>,
) -> CompactionTraceContext {
let RolloutTraceRecorderState::Enabled(recorder) = &self.state else {
return CompactionTraceContext::disabled();
};
CompactionTraceContext::enabled(
Arc::clone(&recorder.writer),
thread_id.into(),
codex_turn_id.into(),
compaction_id.into(),
model.into(),
provider_name.into(),
)
}
}
impl EnabledRolloutTraceRecorder {
fn write_json_payload_best_effort(
&self,
kind: RawPayloadKind,
payload: &impl Serialize,
) -> Option<RawPayloadRef> {
match self.writer.write_json_payload(kind, payload) {
Ok(payload_ref) => Some(payload_ref),
Err(err) => {
warn!("failed to write rollout trace payload: {err:#}");
None
}
}
}
fn append_best_effort(&self, payload: RawTraceEventPayload) {
if let Err(err) = self.writer.append(payload) {
warn!("failed to append rollout trace event: {err:#}");
}
}
}
#[cfg(test)]
#[path = "recorder_tests.rs"]
mod tests;
@@ -0,0 +1,162 @@
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use codex_protocol::AgentPath;
use codex_protocol::ThreadId;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use tempfile::TempDir;
use super::*;
use crate::CompactionCheckpointTracePayload;
use crate::RolloutStatus;
use crate::replay_bundle;
#[test]
fn create_in_root_writes_replayable_lifecycle_events() -> anyhow::Result<()> {
let temp = TempDir::new()?;
let thread_id = ThreadId::new();
let recorder =
RolloutTraceRecorder::create_in_root(temp.path(), thread_id).expect("trace recorder");
recorder.record_thread_started(ThreadStartedTraceMetadata {
thread_id: thread_id.to_string(),
agent_path: "/root".to_string(),
task_name: None,
nickname: None,
agent_role: None,
session_source: SessionSource::Exec,
cwd: PathBuf::from("/workspace"),
rollout_path: Some(PathBuf::from("/tmp/rollout.jsonl")),
model: "gpt-test".to_string(),
provider_name: "test-provider".to_string(),
approval_policy: "never".to_string(),
sandbox_policy: format!("{:?}", SandboxPolicy::DangerFullAccess),
});
let bundle_dir = single_bundle_dir(temp.path())?;
let replayed = replay_bundle(&bundle_dir)?;
assert_eq!(replayed.status, RolloutStatus::Running);
assert_eq!(replayed.root_thread_id, thread_id.to_string());
assert_eq!(replayed.threads[&thread_id.to_string()].agent_path, "/root");
assert_eq!(replayed.raw_payloads.len(), 1);
Ok(())
}
#[test]
fn spawned_thread_start_appends_to_root_bundle() -> anyhow::Result<()> {
let temp = TempDir::new()?;
let root_thread_id = ThreadId::new();
let child_thread_id = ThreadId::new();
let recorder =
RolloutTraceRecorder::create_in_root(temp.path(), root_thread_id).expect("trace recorder");
recorder.record_thread_started(minimal_metadata(root_thread_id));
recorder.record_thread_started(ThreadStartedTraceMetadata {
thread_id: child_thread_id.to_string(),
agent_path: "/root/repo_file_counter".to_string(),
task_name: Some("repo_file_counter".to_string()),
nickname: Some("Kepler".to_string()),
agent_role: Some("worker".to_string()),
session_source: SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: root_thread_id,
depth: 1,
agent_path: Some(
AgentPath::try_from("/root/repo_file_counter").map_err(anyhow::Error::msg)?,
),
agent_nickname: Some("Kepler".to_string()),
agent_role: Some("worker".to_string()),
}),
cwd: PathBuf::from("/workspace"),
rollout_path: Some(PathBuf::from("/tmp/child-rollout.jsonl")),
model: "gpt-test".to_string(),
provider_name: "test-provider".to_string(),
approval_policy: "never".to_string(),
sandbox_policy: format!("{:?}", SandboxPolicy::DangerFullAccess),
});
let bundle_dir = single_bundle_dir(temp.path())?;
let replayed = replay_bundle(&bundle_dir)?;
assert_eq!(fs::read_dir(temp.path())?.count(), 1);
assert_eq!(replayed.threads.len(), 2);
assert_eq!(
replayed.threads[&child_thread_id.to_string()].agent_path,
"/root/repo_file_counter"
);
assert_eq!(replayed.status, RolloutStatus::Running);
assert_eq!(
replayed.threads[&child_thread_id.to_string()]
.execution
.status,
crate::ExecutionStatus::Running
);
assert_eq!(replayed.raw_payloads.len(), 2);
Ok(())
}
#[test]
fn disabled_recorder_accepts_trace_calls_without_writing() -> anyhow::Result<()> {
let temp = TempDir::new()?;
let thread_id = ThreadId::new();
let recorder = RolloutTraceRecorder::disabled();
recorder.record_thread_started(minimal_metadata(thread_id));
let inference_trace =
recorder.inference_trace_context(thread_id, "turn-1", "gpt-test", "test-provider");
let inference_attempt = inference_trace.start_attempt();
inference_attempt.record_started(&serde_json::json!({ "kind": "inference" }));
let token_usage: Option<codex_protocol::protocol::TokenUsage> = None;
inference_attempt.record_completed("response-1", &token_usage, &[]);
inference_attempt.record_failed("inference failed");
let compaction_trace = recorder.compaction_trace_context(
thread_id,
"turn-1",
"compaction-1",
"gpt-test",
"test-provider",
);
let compaction_attempt =
compaction_trace.start_attempt(&serde_json::json!({ "kind": "compaction" }));
compaction_attempt.record_completed(&[]);
compaction_attempt.record_failed("compaction failed");
compaction_trace.record_installed(&CompactionCheckpointTracePayload {
input_history: &[],
replacement_history: &[],
});
assert_eq!(fs::read_dir(temp.path())?.count(), 0);
Ok(())
}
fn minimal_metadata(thread_id: ThreadId) -> ThreadStartedTraceMetadata {
ThreadStartedTraceMetadata {
thread_id: thread_id.to_string(),
agent_path: "/root".to_string(),
task_name: None,
nickname: None,
agent_role: None,
session_source: SessionSource::Exec,
cwd: PathBuf::from("/workspace"),
rollout_path: None,
model: "gpt-test".to_string(),
provider_name: "test-provider".to_string(),
approval_policy: "never".to_string(),
sandbox_policy: "danger-full-access".to_string(),
}
}
fn single_bundle_dir(root: &Path) -> anyhow::Result<PathBuf> {
let mut entries = fs::read_dir(root)?
.map(|entry| entry.map(|entry| entry.path()))
.collect::<Result<Vec<_>, _>>()?;
entries.sort();
assert_eq!(entries.len(), 1);
Ok(entries.remove(0))
}