mirror of
https://github.com/pchuan98/codex.git
synced 2026-07-01 00:31:56 +08:00
[codex] Start the guardian child session when parent session is started (#27982)
## Why

The first auto-review currently creates its Guardian child session on demand, adding avoidable latency before the review can begin. Creating the ordinary Guardian child during parent-session initialization lets that child use the existing session startup WebSocket prewarm before the first escalation.

This does not introduce a Guardian-specific prewarm mechanism.

## What changed

- initialize the existing Guardian review-session manager owned by `Session` when a thread starts with auto-review enabled and an approval policy that routes to Guardian
- use the standard Guardian child-session construction and the existing session startup WebSocket prewarm
- preserve the existing reuse-key invalidation and lazy creation fallback when startup initialization fails or the effective review configuration changes
- add an integration test that verifies normal root-session startup emits a Guardian `generate=false` prewarm request

## Benchmark

I compared release builds against main. Each prompt first ran a non-escalated `sleep 3`, then requested an escalated marker command.

| binary | count | avg Guardian duration | median Guardian duration | avg Guardian TTFT |
|---|---:|---:|---:|---:|
| origin-main | 10 | 4008.7 ms | 3949.5 ms | 3746.5 ms |
| session-fix | 10 | 2865.0 ms | 2594.0 ms | 2492.7 ms |

Guardian duration fell by 28.5% and Guardian TTFT fell by 33.5%. These measurements cover Guardian review latency; they do not measure parent thread-start latency.
This commit is contained in:
committed by
GitHub
Unverified
parent
5c0fbf3492
commit
15f448d8b0
@@ -665,40 +665,23 @@ pub(crate) fn spawn_approval_request_review(
|
||||
rx
|
||||
}
|
||||
|
||||
/// Runs the guardian in a locked-down reusable review session.
|
||||
///
|
||||
/// The guardian itself should not mutate state or trigger further approvals, so
|
||||
/// it is pinned to a read-only sandbox with `approval_policy = never` and
|
||||
/// nonessential agent features disabled. When the cached trunk session is idle,
|
||||
/// later approvals append onto that same guardian conversation to preserve a
|
||||
/// stable prompt-cache key. If the trunk is already busy, the review runs in an
|
||||
/// ephemeral fork from the last committed trunk rollout so parallel approvals
|
||||
/// do not block each other or mutate the cached thread. The trunk is recreated
|
||||
/// when the effective review-session config changes, and any future compaction
|
||||
/// must continue to preserve the guardian policy as exact top-level developer
|
||||
/// context. It may still reuse the parent's managed-network allowlist for
|
||||
/// read-only checks, but it intentionally runs without inherited exec-policy
|
||||
/// rules.
|
||||
async fn run_guardian_review_session_before_deadline(
|
||||
session: Arc<Session>,
|
||||
turn: Arc<TurnContext>,
|
||||
request: GuardianApprovalRequest,
|
||||
retry_reason: Option<String>,
|
||||
schema: serde_json::Value,
|
||||
external_cancel: Option<CancellationToken>,
|
||||
deadline: Instant,
|
||||
) -> (GuardianReviewOutcome, GuardianReviewAnalyticsResult) {
|
||||
pub(super) struct GuardianReviewSessionConfig {
|
||||
pub(super) spawn_config: crate::config::Config,
|
||||
model: String,
|
||||
reasoning_effort: Option<codex_protocol::openai_models::ReasoningEffort>,
|
||||
default_review_model_id: String,
|
||||
catalog_contains_auto_review: bool,
|
||||
model_overridden: bool,
|
||||
model_override: Option<String>,
|
||||
}
|
||||
|
||||
pub(super) async fn guardian_review_session_config(
|
||||
session: &Session,
|
||||
turn: &TurnContext,
|
||||
) -> anyhow::Result<GuardianReviewSessionConfig> {
|
||||
let network_proxy = session.services.network_proxy.load_full();
|
||||
let live_network_config = match network_proxy.as_ref() {
|
||||
Some(network_proxy) => match network_proxy.proxy().current_cfg().await {
|
||||
Ok(config) => Some(config),
|
||||
Err(err) => {
|
||||
return (
|
||||
GuardianReviewOutcome::Error(GuardianReviewError::prompt_build(err)),
|
||||
GuardianReviewAnalyticsResult::without_session(),
|
||||
);
|
||||
}
|
||||
},
|
||||
Some(network_proxy) => Some(network_proxy.proxy().current_cfg().await?),
|
||||
None => None,
|
||||
};
|
||||
let available_models = session
|
||||
@@ -750,14 +733,50 @@ async fn run_guardian_review_session_before_deadline(
|
||||
reasoning_effort,
|
||||
)
|
||||
};
|
||||
let guardian_config = build_guardian_review_session_config(
|
||||
|
||||
let spawn_config = build_guardian_review_session_config(
|
||||
turn.config.as_ref(),
|
||||
live_network_config.clone(),
|
||||
live_network_config,
|
||||
guardian_model.as_str(),
|
||||
guardian_reasoning_effort.clone(),
|
||||
);
|
||||
let guardian_config = match guardian_config {
|
||||
Ok(config) => config,
|
||||
)?;
|
||||
Ok(GuardianReviewSessionConfig {
|
||||
spawn_config,
|
||||
model: guardian_model,
|
||||
reasoning_effort: guardian_reasoning_effort,
|
||||
default_review_model_id: default_review_model_id.to_string(),
|
||||
catalog_contains_auto_review: guardian_catalog_contains_auto_review,
|
||||
model_overridden: guardian_review_model_overridden,
|
||||
model_override: guardian_review_model_override,
|
||||
})
|
||||
}
|
||||
|
||||
/// Runs the guardian in a locked-down reusable review session.
|
||||
///
|
||||
/// The guardian itself should not mutate state or trigger further approvals, so
|
||||
/// it is pinned to a read-only sandbox with `approval_policy = never` and
|
||||
/// nonessential agent features disabled. When the cached trunk session is idle,
|
||||
/// later approvals append onto that same guardian conversation to preserve a
|
||||
/// stable prompt-cache key. If the trunk is already busy, the review runs in an
|
||||
/// ephemeral fork from the last committed trunk rollout so parallel approvals
|
||||
/// do not block each other or mutate the cached thread. The trunk is recreated
|
||||
/// when the effective review-session config changes, and any future compaction
|
||||
/// must continue to preserve the guardian policy as exact top-level developer
|
||||
/// context. It may still reuse the parent's managed-network allowlist for
|
||||
/// read-only checks, but it intentionally runs without inherited exec-policy
|
||||
/// rules.
|
||||
async fn run_guardian_review_session_before_deadline(
|
||||
session: Arc<Session>,
|
||||
turn: Arc<TurnContext>,
|
||||
request: GuardianApprovalRequest,
|
||||
retry_reason: Option<String>,
|
||||
schema: serde_json::Value,
|
||||
external_cancel: Option<CancellationToken>,
|
||||
deadline: Instant,
|
||||
) -> (GuardianReviewOutcome, GuardianReviewAnalyticsResult) {
|
||||
let session_config = match guardian_review_session_config(session.as_ref(), turn.as_ref()).await
|
||||
{
|
||||
Ok(session_config) => session_config,
|
||||
Err(err) => {
|
||||
return (
|
||||
GuardianReviewOutcome::Error(GuardianReviewError::prompt_build(err)),
|
||||
@@ -765,23 +784,22 @@ async fn run_guardian_review_session_before_deadline(
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
let (session_outcome, session_analytics_result) = Box::pin(
|
||||
session
|
||||
.guardian_review_session
|
||||
.run_review(GuardianReviewSessionParams {
|
||||
parent_session: Arc::clone(&session),
|
||||
parent_turn: turn.clone(),
|
||||
spawn_config: guardian_config,
|
||||
spawn_config: session_config.spawn_config,
|
||||
request,
|
||||
retry_reason,
|
||||
schema,
|
||||
model: guardian_model,
|
||||
reasoning_effort: guardian_reasoning_effort,
|
||||
guardian_default_review_model_id: default_review_model_id.to_string(),
|
||||
guardian_catalog_contains_auto_review,
|
||||
guardian_review_model_overridden,
|
||||
guardian_review_model_override,
|
||||
model: session_config.model,
|
||||
reasoning_effort: session_config.reasoning_effort,
|
||||
guardian_default_review_model_id: session_config.default_review_model_id,
|
||||
guardian_catalog_contains_auto_review: session_config.catalog_contains_auto_review,
|
||||
guardian_review_model_overridden: session_config.model_overridden,
|
||||
guardian_review_model_override: session_config.model_override,
|
||||
reasoning_summary: turn.reasoning_summary,
|
||||
personality: turn.personality,
|
||||
external_cancel,
|
||||
|
||||
@@ -27,6 +27,7 @@ use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use codex_protocol::protocol::TokenUsage;
|
||||
use futures::future::BoxFuture;
|
||||
use serde_json::Value;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::Semaphore;
|
||||
@@ -56,6 +57,7 @@ use super::prompt::GuardianTranscriptCursor;
|
||||
use super::prompt::build_guardian_prompt_items_with_parent_turn;
|
||||
use super::prompt::guardian_policy_prompt;
|
||||
use super::prompt::guardian_policy_prompt_with_config;
|
||||
use super::review::guardian_review_session_config;
|
||||
|
||||
const GUARDIAN_INTERRUPT_DRAIN_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
#[derive(Debug)]
|
||||
@@ -92,6 +94,7 @@ pub(crate) struct GuardianReviewSessionParams {
|
||||
#[derive(Default)]
|
||||
pub(crate) struct GuardianReviewSessionManager {
|
||||
state: Arc<Mutex<GuardianReviewSessionState>>,
|
||||
cancellation_token: CancellationToken,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
@@ -289,6 +292,42 @@ impl Drop for EphemeralReviewCleanup {
|
||||
}
|
||||
|
||||
impl GuardianReviewSessionManager {
|
||||
pub(crate) fn initialize(
|
||||
&self,
|
||||
parent_session: Arc<Session>,
|
||||
parent_turn: Arc<TurnContext>,
|
||||
) -> BoxFuture<'_, anyhow::Result<()>> {
|
||||
// Boxing breaks the Session::new -> Guardian -> Session::new future recursion.
|
||||
Box::pin(async move {
|
||||
let spawn_config = guardian_review_session_config(&parent_session, &parent_turn)
|
||||
.await?
|
||||
.spawn_config;
|
||||
let reuse_key = GuardianReviewSessionReuseKey::from_spawn_config(
|
||||
&spawn_config,
|
||||
parent_session.user_instructions().await,
|
||||
);
|
||||
let spawn_cancel_token = self.cancellation_token.child_token();
|
||||
let spawn_cancel_guard = spawn_cancel_token.clone().drop_guard();
|
||||
let review_session = spawn_guardian_review_session(
|
||||
&parent_session,
|
||||
&parent_turn,
|
||||
spawn_config,
|
||||
reuse_key,
|
||||
spawn_cancel_token.clone(),
|
||||
/*fork_snapshot*/ None,
|
||||
)
|
||||
.await?;
|
||||
// A first review or shutdown may win while eager initialization is in flight;
|
||||
// install only if neither has happened.
|
||||
let mut state = self.state.lock().await;
|
||||
if !spawn_cancel_token.is_cancelled() && state.trunk.is_none() {
|
||||
state.trunk = Some(Arc::new(review_session));
|
||||
drop(spawn_cancel_guard.disarm());
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn trunk_rollout_path(&self) -> Option<PathBuf> {
|
||||
let trunk = self.state.lock().await.trunk.clone()?;
|
||||
trunk.codex.session.ensure_rollout_materialized().await;
|
||||
@@ -302,6 +341,7 @@ impl GuardianReviewSessionManager {
|
||||
}
|
||||
|
||||
pub(crate) async fn shutdown(&self) {
|
||||
self.cancellation_token.cancel();
|
||||
let (review_session, ephemeral_reviews) = {
|
||||
let mut state = self.state.lock().await;
|
||||
(
|
||||
@@ -348,13 +388,14 @@ impl GuardianReviewSessionManager {
|
||||
}
|
||||
|
||||
if state.trunk.is_none() {
|
||||
let spawn_cancel_token = CancellationToken::new();
|
||||
let spawn_cancel_token = self.cancellation_token.child_token();
|
||||
let review_session = match run_before_review_deadline_with_cancel(
|
||||
deadline,
|
||||
params.external_cancel.as_ref(),
|
||||
&spawn_cancel_token,
|
||||
Box::pin(spawn_guardian_review_session(
|
||||
&params,
|
||||
&params.parent_session,
|
||||
&params.parent_turn,
|
||||
params.spawn_config.clone(),
|
||||
next_reuse_key.clone(),
|
||||
spawn_cancel_token.clone(),
|
||||
@@ -556,7 +597,7 @@ impl GuardianReviewSessionManager {
|
||||
deadline: tokio::time::Instant,
|
||||
fork_snapshot: Option<GuardianReviewForkSnapshot>,
|
||||
) -> (GuardianReviewSessionOutcome, GuardianReviewAnalyticsResult) {
|
||||
let spawn_cancel_token = CancellationToken::new();
|
||||
let spawn_cancel_token = self.cancellation_token.child_token();
|
||||
let mut fork_config = params.spawn_config.clone();
|
||||
fork_config.ephemeral = true;
|
||||
let review_session = match run_before_review_deadline_with_cancel(
|
||||
@@ -564,7 +605,8 @@ impl GuardianReviewSessionManager {
|
||||
params.external_cancel.as_ref(),
|
||||
&spawn_cancel_token,
|
||||
Box::pin(spawn_guardian_review_session(
|
||||
&params,
|
||||
&params.parent_session,
|
||||
&params.parent_turn,
|
||||
fork_config,
|
||||
reuse_key,
|
||||
spawn_cancel_token.clone(),
|
||||
@@ -605,7 +647,8 @@ impl GuardianReviewSessionManager {
|
||||
}
|
||||
|
||||
async fn spawn_guardian_review_session(
|
||||
params: &GuardianReviewSessionParams,
|
||||
parent_session: &Arc<Session>,
|
||||
parent_turn: &Arc<TurnContext>,
|
||||
spawn_config: Config,
|
||||
reuse_key: GuardianReviewSessionReuseKey,
|
||||
cancel_token: CancellationToken,
|
||||
@@ -621,10 +664,10 @@ async fn spawn_guardian_review_session(
|
||||
};
|
||||
let codex = Box::pin(run_codex_thread_interactive(
|
||||
spawn_config,
|
||||
params.parent_session.services.auth_manager.clone(),
|
||||
params.parent_session.services.models_manager.clone(),
|
||||
Arc::clone(&params.parent_session),
|
||||
Arc::clone(&params.parent_turn),
|
||||
parent_session.services.auth_manager.clone(),
|
||||
parent_session.services.models_manager.clone(),
|
||||
Arc::clone(parent_session),
|
||||
Arc::clone(parent_turn),
|
||||
cancel_token.clone(),
|
||||
SubAgentSource::Other(GUARDIAN_REVIEWER_NAME.to_string()),
|
||||
initial_history,
|
||||
@@ -1546,6 +1589,7 @@ mod tests {
|
||||
trunk: Some(Arc::new(review_session)),
|
||||
ephemeral_reviews: Vec::new(),
|
||||
})),
|
||||
..Default::default()
|
||||
};
|
||||
drop(tx_event);
|
||||
|
||||
|
||||
@@ -1217,7 +1217,6 @@ impl Session {
|
||||
let mut state = sess.state.lock().await;
|
||||
state.queue_pending_session_start_source(session_start_source);
|
||||
}
|
||||
|
||||
Ok(sess)
|
||||
}
|
||||
.await;
|
||||
|
||||
@@ -10,6 +10,7 @@ use tracing::instrument;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::client::ModelClientSession;
|
||||
use crate::guardian::routes_approval_to_guardian;
|
||||
use crate::responses_metadata::CodexResponsesRequestKind;
|
||||
use crate::session::INITIAL_SUBMIT_ID;
|
||||
use crate::session::session::Session;
|
||||
@@ -242,6 +243,19 @@ async fn schedule_startup_prewarm_inner(
|
||||
prewarm_started_at.elapsed(),
|
||||
/*status*/ None,
|
||||
);
|
||||
if routes_approval_to_guardian(&startup_turn_context) {
|
||||
let guardian_session = Arc::clone(&session);
|
||||
let guardian_parent_turn = Arc::clone(&startup_turn_context);
|
||||
drop(tokio::spawn(async move {
|
||||
if let Err(err) = guardian_session
|
||||
.guardian_review_session
|
||||
.initialize(Arc::clone(&guardian_session), guardian_parent_turn)
|
||||
.await
|
||||
{
|
||||
warn!("failed to initialize guardian review session: {err:#}");
|
||||
}
|
||||
}));
|
||||
}
|
||||
let startup_cancellation_token = CancellationToken::new();
|
||||
let built_tools_started_at = Instant::now();
|
||||
let startup_router = built_tools(
|
||||
@@ -295,6 +309,5 @@ async fn schedule_startup_prewarm_inner(
|
||||
websocket_warmup_started_at.elapsed(),
|
||||
/*status*/ None,
|
||||
);
|
||||
|
||||
Ok(client_session)
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ use core_test_support::responses::ev_response_created;
|
||||
use core_test_support::responses::mount_sse_sequence;
|
||||
use core_test_support::responses::sse;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::responses::start_websocket_server;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use core_test_support::skip_if_sandbox;
|
||||
use core_test_support::test_codex::local_selections;
|
||||
@@ -30,6 +31,85 @@ use std::os::unix::fs::PermissionsExt;
|
||||
use std::time::Duration;
|
||||
use tempfile::TempDir;
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn guardian_session_prewarms_and_is_reused_for_first_review() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let tool_args = json!({
|
||||
"cmd": "true",
|
||||
"sandbox_permissions": SandboxPermissions::RequireEscalated,
|
||||
"justification": "Exercise Guardian approval routing.",
|
||||
})
|
||||
.to_string();
|
||||
let server = start_websocket_server(vec![
|
||||
vec![vec![ev_response_created("warm-1"), ev_completed("warm-1")]],
|
||||
vec![vec![ev_response_created("warm-2"), ev_completed("warm-2")]],
|
||||
vec![vec![
|
||||
ev_response_created("approval-request"),
|
||||
ev_function_call("approval-call", "exec_command", &tool_args),
|
||||
ev_completed("approval-request"),
|
||||
]],
|
||||
vec![vec![
|
||||
ev_response_created("guardian-review"),
|
||||
ev_completed("guardian-review"),
|
||||
]],
|
||||
])
|
||||
.await;
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.permissions.approval_policy = Constrained::allow_any(AskForApproval::OnRequest);
|
||||
config.approvals_reviewer = ApprovalsReviewer::AutoReview;
|
||||
});
|
||||
|
||||
let test = builder.build_with_websocket_server(&server).await?;
|
||||
let (first, second) = tokio::time::timeout(Duration::from_secs(5), async {
|
||||
tokio::join!(
|
||||
server.wait_for_request(/*connection_index*/ 0, /*request_index*/ 0),
|
||||
server.wait_for_request(/*connection_index*/ 1, /*request_index*/ 0)
|
||||
)
|
||||
})
|
||||
.await?;
|
||||
let prewarm_requests = [first.body_json(), second.body_json()];
|
||||
let guardian_prewarm = prewarm_requests
|
||||
.iter()
|
||||
.find(|request| {
|
||||
request["client_metadata"]["x-openai-subagent"].as_str() == Some("guardian")
|
||||
})
|
||||
.expect("guardian startup prewarm request");
|
||||
assert_eq!(guardian_prewarm["generate"].as_bool(), Some(false));
|
||||
let guardian_thread_id = guardian_prewarm["client_metadata"]["thread_id"]
|
||||
.as_str()
|
||||
.expect("guardian thread id");
|
||||
|
||||
test.codex
|
||||
.submit(
|
||||
vec![UserInput::Text {
|
||||
text: "run a command that requires Guardian review".into(),
|
||||
text_elements: Vec::new(),
|
||||
}]
|
||||
.into(),
|
||||
)
|
||||
.await?;
|
||||
let guardian_review = tokio::time::timeout(
|
||||
Duration::from_secs(5),
|
||||
server.wait_for_request(/*connection_index*/ 3, /*request_index*/ 0),
|
||||
)
|
||||
.await?
|
||||
.body_json();
|
||||
assert_eq!(
|
||||
guardian_review["client_metadata"]["x-openai-subagent"].as_str(),
|
||||
Some("guardian")
|
||||
);
|
||||
assert_eq!(
|
||||
guardian_review["client_metadata"]["thread_id"].as_str(),
|
||||
Some(guardian_thread_id)
|
||||
);
|
||||
assert_eq!(guardian_review.get("generate"), None);
|
||||
|
||||
test.codex.shutdown_and_wait().await?;
|
||||
server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn guardian_review_session_does_not_inherit_legacy_notify() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
Reference in New Issue
Block a user