mirror of
https://github.com/pchuan98/codex.git
synced 2026-07-01 00:31:56 +08:00
Add spans to run_turn (#27107)
## Why

Codex app-server latency traces do not cover turn orchestration, sampling-request preparation, and tool-loading work at a granular level. These spans help separate local coordination/setup costs from model streaming and tool execution.

## What changed

- Add `run_turn.*` spans around sampling-request input preparation and post-sampling state collection.
- Add function-level trace spans around turn setup, hook execution, compaction, prompt construction, and MCP tool exposure.
- Add `built_tools.*` spans around plugin loading and discoverable-tool loading.

## Verification

Trigger a Codex rollout and observe that the new spans are included.
This commit is contained in:
committed by
GitHub
Unverified
parent
e0cb4ede4e
commit
00a25e1e0c
@@ -36,6 +36,7 @@ use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use codex_thread_store::ReadThreadParams;
|
||||
use serde_json::Value;
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::context::ContextualUserFragment;
|
||||
use crate::context::HookAdditionalContext;
|
||||
@@ -97,6 +98,7 @@ impl From<UserPromptSubmitOutcome> for ContextInjectingHookOutcome {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip_all)]
|
||||
pub(crate) async fn run_pending_session_start_hooks(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
@@ -291,6 +293,7 @@ pub(crate) async fn run_post_tool_use_hooks(
|
||||
outcome
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip_all)]
|
||||
pub(crate) async fn run_turn_stop_hooks(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
@@ -427,6 +430,7 @@ pub(crate) async fn run_post_compact_hooks(
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip_all)]
|
||||
pub(crate) async fn run_legacy_after_agent_hook(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
|
||||
@@ -4,6 +4,7 @@ use codex_features::Feature;
|
||||
use codex_mcp::CODEX_APPS_MCP_SERVER_NAME;
|
||||
use codex_mcp::ToolInfo as McpToolInfo;
|
||||
use codex_mcp::tool_is_model_visible;
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::connectors;
|
||||
@@ -15,6 +16,7 @@ pub(crate) struct McpToolExposure {
|
||||
pub(crate) deferred_tools: Option<Vec<McpToolInfo>>,
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip_all)]
|
||||
pub(crate) fn build_mcp_tool_exposure(
|
||||
all_mcp_tools: &[McpToolInfo],
|
||||
connectors: Option<&[connectors::AppInfo]>,
|
||||
|
||||
@@ -3011,6 +3011,7 @@ impl Session {
|
||||
/// Mid-turn compaction is the other path that can re-establish that baseline when it
|
||||
/// reinjects full initial context into replacement history. Other non-regular tasks
|
||||
/// intentionally do not update the baseline.
|
||||
#[instrument(level = "trace", skip_all)]
|
||||
pub(crate) async fn record_context_updates_and_set_reference_context_item(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
|
||||
@@ -186,10 +186,9 @@ pub(crate) async fn run_turn(
|
||||
let mut stop_hook_active = false;
|
||||
// Although from the perspective of codex.rs, TurnDiffTracker has the lifecycle of a Task which contains
|
||||
// many turns, from the perspective of the user, it is a single turn.
|
||||
let display_roots = turn_diff_display_roots(turn_context.as_ref()).await;
|
||||
let turn_diff_tracker = Arc::new(tokio::sync::Mutex::new(
|
||||
TurnDiffTracker::with_environment_display_roots(
|
||||
turn_diff_display_roots(turn_context.as_ref()).await,
|
||||
),
|
||||
TurnDiffTracker::with_environment_display_roots(display_roots),
|
||||
));
|
||||
|
||||
// `ModelClientSession` is turn-scoped and caches WebSocket + sticky routing state, so we reuse
|
||||
@@ -214,11 +213,13 @@ pub(crate) async fn run_turn(
|
||||
}
|
||||
|
||||
// Construct the input that we will send to the model.
|
||||
let sampling_request_input: Vec<ResponseItem> = {
|
||||
let sampling_request_input: Vec<ResponseItem> = async {
|
||||
sess.clone_history()
|
||||
.await
|
||||
.for_prompt(&turn_context.model_info.input_modalities)
|
||||
};
|
||||
}
|
||||
.instrument(trace_span!("run_turn.prepare_sampling_request_input"))
|
||||
.await;
|
||||
|
||||
let window_id = sess.services.model_client.current_window_id();
|
||||
let turn_metadata_header = turn_context
|
||||
@@ -242,15 +243,20 @@ pub(crate) async fn run_turn(
|
||||
last_agent_message: sampling_request_last_agent_message,
|
||||
} = sampling_request_output;
|
||||
can_drain_pending_input = true;
|
||||
let has_pending_input = sess.input_queue.has_pending_input(&sess.active_turn).await;
|
||||
let (has_pending_input, token_status, estimated_token_count) = async {
|
||||
let has_pending_input =
|
||||
sess.input_queue.has_pending_input(&sess.active_turn).await;
|
||||
let token_status =
|
||||
auto_compact_token_status(sess.as_ref(), turn_context.as_ref()).await;
|
||||
let estimated_token_count =
|
||||
sess.get_estimated_token_count(turn_context.as_ref()).await;
|
||||
(has_pending_input, token_status, estimated_token_count)
|
||||
}
|
||||
.instrument(trace_span!("run_turn.collect_post_sampling_state"))
|
||||
.await;
|
||||
let needs_follow_up = model_needs_follow_up || has_pending_input;
|
||||
let token_status =
|
||||
auto_compact_token_status(sess.as_ref(), turn_context.as_ref()).await;
|
||||
let token_limit_reached = token_status.token_limit_reached;
|
||||
|
||||
let estimated_token_count =
|
||||
sess.get_estimated_token_count(turn_context.as_ref()).await;
|
||||
|
||||
trace!(
|
||||
turn_id = %turn_context.sub_id,
|
||||
total_usage_tokens = token_status.active_context_tokens,
|
||||
@@ -381,6 +387,7 @@ pub(crate) async fn run_turn(
|
||||
last_agent_message
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip_all)]
|
||||
async fn turn_diff_display_roots(turn_context: &TurnContext) -> Vec<(String, PathBuf)> {
|
||||
let mut display_roots = Vec::new();
|
||||
for turn_environment in &turn_context.environments.turn_environments {
|
||||
@@ -396,6 +403,7 @@ async fn turn_diff_display_roots(turn_context: &TurnContext) -> Vec<(String, Pat
|
||||
display_roots
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip_all)]
|
||||
async fn run_hooks_and_record_inputs(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
@@ -428,6 +436,7 @@ async fn run_hooks_and_record_inputs(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "MCP tool listing borrows the read guard across cancellation-aware await"
|
||||
)]
|
||||
#[instrument(level = "trace", skip_all)]
|
||||
async fn build_skills_and_plugins(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &TurnContext,
|
||||
@@ -756,6 +765,7 @@ async fn auto_compact_token_status(
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip_all)]
|
||||
async fn run_pre_sampling_compact(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
@@ -834,6 +844,11 @@ async fn maybe_run_previous_model_inline_compact(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
level = "trace",
|
||||
skip_all,
|
||||
fields(reason = ?reason, phase = ?phase)
|
||||
)]
|
||||
async fn run_auto_compact(
|
||||
sess: &Arc<Session>,
|
||||
turn_context: &Arc<TurnContext>,
|
||||
@@ -942,6 +957,7 @@ pub(super) fn collect_explicit_app_ids_from_skill_items(
|
||||
connector_ids
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip_all)]
|
||||
pub(crate) fn build_prompt(
|
||||
input: Vec<ResponseItem>,
|
||||
router: &ToolRouter,
|
||||
@@ -1095,6 +1111,7 @@ pub(crate) async fn built_tools(
|
||||
.services
|
||||
.plugins_manager
|
||||
.plugins_for_config(&turn_context.config.plugins_config_input())
|
||||
.instrument(trace_span!("built_tools.load_plugins"))
|
||||
.await;
|
||||
|
||||
let apps_enabled = turn_context.apps_enabled();
|
||||
@@ -1125,35 +1142,39 @@ pub(crate) async fn built_tools(
|
||||
.into_iter()
|
||||
.map(|connector_id| connector_id.0)
|
||||
.collect::<Vec<_>>();
|
||||
let discoverable_tools = if apps_enabled && tool_suggest_enabled(turn_context) {
|
||||
if let Some(accessible_connectors) = accessible_connectors_with_enabled_state.as_ref() {
|
||||
match connectors::list_tool_suggest_discoverable_tools_with_auth(
|
||||
&turn_context.config,
|
||||
sess.services.plugins_manager.as_ref(),
|
||||
auth.as_ref(),
|
||||
accessible_connectors.as_slice(),
|
||||
&loaded_plugin_app_connector_ids,
|
||||
)
|
||||
.await
|
||||
.map(|discoverable_tools| {
|
||||
filter_request_plugin_install_discoverable_tools_for_client(
|
||||
discoverable_tools,
|
||||
turn_context.app_server_client_name.as_deref(),
|
||||
let discoverable_tools = async {
|
||||
if apps_enabled && tool_suggest_enabled(turn_context) {
|
||||
if let Some(accessible_connectors) = accessible_connectors_with_enabled_state.as_ref() {
|
||||
match connectors::list_tool_suggest_discoverable_tools_with_auth(
|
||||
&turn_context.config,
|
||||
sess.services.plugins_manager.as_ref(),
|
||||
auth.as_ref(),
|
||||
accessible_connectors.as_slice(),
|
||||
&loaded_plugin_app_connector_ids,
|
||||
)
|
||||
}) {
|
||||
Ok(discoverable_tools) if discoverable_tools.is_empty() => None,
|
||||
Ok(discoverable_tools) => Some(discoverable_tools),
|
||||
Err(err) => {
|
||||
warn!("failed to load discoverable tool suggestions: {err:#}");
|
||||
None
|
||||
.await
|
||||
.map(|discoverable_tools| {
|
||||
filter_request_plugin_install_discoverable_tools_for_client(
|
||||
discoverable_tools,
|
||||
turn_context.app_server_client_name.as_deref(),
|
||||
)
|
||||
}) {
|
||||
Ok(discoverable_tools) if discoverable_tools.is_empty() => None,
|
||||
Ok(discoverable_tools) => Some(discoverable_tools),
|
||||
Err(err) => {
|
||||
warn!("failed to load discoverable tool suggestions: {err:#}");
|
||||
None
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
}
|
||||
.instrument(trace_span!("built_tools.load_discoverable_tools"))
|
||||
.await;
|
||||
|
||||
let mcp_tool_exposure = build_mcp_tool_exposure(
|
||||
&all_mcp_tools,
|
||||
|
||||
Reference in New Issue
Block a user