From bd30bad96f92de4d6b7f2429ddfc322937bc670a Mon Sep 17 00:00:00 2001 From: Owen Lin Date: Mon, 6 Apr 2026 12:14:27 -0700 Subject: [PATCH] fix(guardian): fix ordering of guardian events (#16462) Guardian events were emitted a bit out of order for CommandExecution items. This would make it hard for the frontend to render a guardian auto-review, which has this payload: ``` pub struct ItemGuardianApprovalReviewStartedNotification { pub thread_id: String, pub turn_id: String, pub target_item_id: String, pub review: GuardianApprovalReview, // FYI this is no longer a json blob pub action: Option, } ``` There is a `target_item_id` the auto-approval review is referring to, but the actual item had not been emitted yet. Before this PR: - `item/autoApprovalReview/started` - `item/autoApprovalReview/completed`, and if approved... - `item/started` - `item/completed` After this PR: - `item/started` - `item/autoApprovalReview/started` - `item/autoApprovalReview/completed` - `item/completed` This lines up much better with existing patterns (i.e. human review in `Default mode`, where app-server would send a server request to prompt for user approval after `item/started`), and makes it easier for clients to render what guardian is actually reviewing. We do this following a similar pattern as `FileChange` (aka apply patch) items, where we create a FileChange item and emit `item/started` if we see the apply patch approval request, before the actual apply patch call runs. --- codex-rs/Cargo.lock | 2 +- codex-rs/app-server-protocol/Cargo.toml | 2 +- codex-rs/app-server-protocol/src/lib.rs | 1 + .../src/protocol/item_builders.rs | 299 +++++++ .../app-server-protocol/src/protocol/mod.rs | 1 + .../src/protocol/thread_history.rs | 279 ++++--- .../app-server/src/bespoke_event_handling.rs | 788 +++++++++++++----- codex-rs/app-server/src/thread_state.rs | 33 + 8 files changed, 1084 insertions(+), 321 deletions(-) create mode 100644 codex-rs/app-server-protocol/src/protocol/item_builders.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index bc2d73fdc..18e9aa434 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1492,6 +1492,7 @@ dependencies = [ "codex-experimental-api-macros", "codex-git-utils", "codex-protocol", + "codex-shell-command", "codex-utils-absolute-path", "codex-utils-cargo-bin", "inventory", @@ -1501,7 +1502,6 @@ dependencies = [ "serde", "serde_json", "serde_with", - "shlex", "similar", "strum_macros 0.28.0", "tempfile", diff --git a/codex-rs/app-server-protocol/Cargo.toml b/codex-rs/app-server-protocol/Cargo.toml index a9d90831e..d9ed5e873 100644 --- a/codex-rs/app-server-protocol/Cargo.toml +++ b/codex-rs/app-server-protocol/Cargo.toml @@ -17,12 +17,12 @@ clap = { workspace = true, features = ["derive"] } codex-experimental-api-macros = { workspace = true } codex-git-utils = { workspace = true } codex-protocol = { workspace = true } +codex-shell-command = { workspace = true } codex-utils-absolute-path = { workspace = true } schemars = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } serde_with = { workspace = true } -shlex = { workspace = true } strum_macros = { workspace = true } thiserror = { workspace = true } rmcp = { workspace = true, default-features = false, features = [ diff --git a/codex-rs/app-server-protocol/src/lib.rs b/codex-rs/app-server-protocol/src/lib.rs index b1a8f474f..d5c2f4b24 100644 --- a/codex-rs/app-server-protocol/src/lib.rs +++ b/codex-rs/app-server-protocol/src/lib.rs @@ -15,6 +15,7 @@ pub use export::generate_ts_with_options; pub use export::generate_types; pub use jsonrpc_lite::*; pub use protocol::common::*; +pub use protocol::item_builders::*; pub use protocol::thread_history::*; pub use protocol::v1::ApplyPatchApprovalParams; pub use protocol::v1::ApplyPatchApprovalResponse; diff --git a/codex-rs/app-server-protocol/src/protocol/item_builders.rs b/codex-rs/app-server-protocol/src/protocol/item_builders.rs new file mode 100644 index 000000000..804169db6 --- /dev/null +++ b/codex-rs/app-server-protocol/src/protocol/item_builders.rs @@ -0,0 +1,299 @@ +//! Shared builders for synthetic [`ThreadItem`] values emitted by the app-server layer. +//! +//! These items do not come from first-class core `ItemStarted` / `ItemCompleted` events. +//! Instead, the app-server synthesizes them so clients can render a coherent lifecycle for +//! approvals and other pre-execution flows before the underlying tool has started or when the +//! tool never starts at all. +//! +//! Keeping these builders in one place is useful for two reasons: +//! - Live notifications and rebuilt `thread/read` history both need to construct the same +//! synthetic items, so sharing the logic avoids drift between those paths. +//! - The projection is presentation-specific. Core protocol events stay generic, while the +//! app-server protocol decides how to surface those events as `ThreadItem`s for clients. +use crate::protocol::common::ServerNotification; +use crate::protocol::v2::CommandAction; +use crate::protocol::v2::CommandExecutionSource; +use crate::protocol::v2::CommandExecutionStatus; +use crate::protocol::v2::FileUpdateChange; +use crate::protocol::v2::GuardianApprovalReview; +use crate::protocol::v2::GuardianApprovalReviewStatus; +use crate::protocol::v2::ItemGuardianApprovalReviewCompletedNotification; +use crate::protocol::v2::ItemGuardianApprovalReviewStartedNotification; +use crate::protocol::v2::PatchApplyStatus; +use crate::protocol::v2::PatchChangeKind; +use crate::protocol::v2::ThreadItem; +use codex_protocol::ThreadId; +use codex_protocol::protocol::ApplyPatchApprovalRequestEvent; +use codex_protocol::protocol::ExecApprovalRequestEvent; +use codex_protocol::protocol::ExecCommandBeginEvent; +use codex_protocol::protocol::ExecCommandEndEvent; +use codex_protocol::protocol::FileChange; +use codex_protocol::protocol::GuardianAssessmentAction; +use codex_protocol::protocol::GuardianAssessmentEvent; +use codex_protocol::protocol::PatchApplyBeginEvent; +use codex_protocol::protocol::PatchApplyEndEvent; +use codex_shell_command::parse_command::parse_command; +use codex_shell_command::parse_command::shlex_join; +use std::collections::HashMap; +use std::path::PathBuf; + +pub fn build_file_change_approval_request_item( + payload: &ApplyPatchApprovalRequestEvent, +) -> ThreadItem { + ThreadItem::FileChange { + id: payload.call_id.clone(), + changes: convert_patch_changes(&payload.changes), + status: PatchApplyStatus::InProgress, + } +} + +pub fn build_file_change_begin_item(payload: &PatchApplyBeginEvent) -> ThreadItem { + ThreadItem::FileChange { + id: payload.call_id.clone(), + changes: convert_patch_changes(&payload.changes), + status: PatchApplyStatus::InProgress, + } +} + +pub fn build_file_change_end_item(payload: &PatchApplyEndEvent) -> ThreadItem { + ThreadItem::FileChange { + id: payload.call_id.clone(), + changes: convert_patch_changes(&payload.changes), + status: (&payload.status).into(), + } +} + +pub fn build_command_execution_approval_request_item( + payload: &ExecApprovalRequestEvent, +) -> ThreadItem { + ThreadItem::CommandExecution { + id: payload.call_id.clone(), + command: shlex_join(&payload.command), + cwd: payload.cwd.clone(), + process_id: None, + source: CommandExecutionSource::Agent, + status: CommandExecutionStatus::InProgress, + command_actions: payload + .parsed_cmd + .iter() + .cloned() + .map(CommandAction::from) + .collect(), + aggregated_output: None, + exit_code: None, + duration_ms: None, + } +} + +pub fn build_command_execution_begin_item(payload: &ExecCommandBeginEvent) -> ThreadItem { + ThreadItem::CommandExecution { + id: payload.call_id.clone(), + command: shlex_join(&payload.command), + cwd: payload.cwd.clone(), + process_id: payload.process_id.clone(), + source: payload.source.into(), + status: CommandExecutionStatus::InProgress, + command_actions: payload + .parsed_cmd + .iter() + .cloned() + .map(CommandAction::from) + .collect(), + aggregated_output: None, + exit_code: None, + duration_ms: None, + } +} + +pub fn build_command_execution_end_item(payload: &ExecCommandEndEvent) -> ThreadItem { + let aggregated_output = if payload.aggregated_output.is_empty() { + None + } else { + Some(payload.aggregated_output.clone()) + }; + let duration_ms = i64::try_from(payload.duration.as_millis()).unwrap_or(i64::MAX); + + ThreadItem::CommandExecution { + id: payload.call_id.clone(), + command: shlex_join(&payload.command), + cwd: payload.cwd.clone(), + process_id: payload.process_id.clone(), + source: payload.source.into(), + status: (&payload.status).into(), + command_actions: payload + .parsed_cmd + .iter() + .cloned() + .map(CommandAction::from) + .collect(), + aggregated_output, + exit_code: Some(payload.exit_code), + duration_ms: Some(duration_ms), + } +} + +/// Build a guardian-derived [`ThreadItem`]. +/// +/// Currently this only synthesizes [`ThreadItem::CommandExecution`] for +/// [`GuardianAssessmentAction::Command`] and [`GuardianAssessmentAction::Execve`]. +pub fn build_item_from_guardian_event( + assessment: &GuardianAssessmentEvent, + status: CommandExecutionStatus, +) -> Option { + match &assessment.action { + GuardianAssessmentAction::Command { command, cwd, .. } => { + let command = command.clone(); + let command_actions = vec![CommandAction::Unknown { + command: command.clone(), + }]; + Some(ThreadItem::CommandExecution { + id: assessment.id.clone(), + command, + cwd: cwd.clone(), + process_id: None, + source: CommandExecutionSource::Agent, + status, + command_actions, + aggregated_output: None, + exit_code: None, + duration_ms: None, + }) + } + GuardianAssessmentAction::Execve { + program, argv, cwd, .. + } => { + let argv = if argv.is_empty() { + vec![program.clone()] + } else { + std::iter::once(program.clone()) + .chain(argv.iter().skip(1).cloned()) + .collect::>() + }; + let command = shlex_join(&argv); + let parsed_cmd = parse_command(&argv); + let command_actions = if parsed_cmd.is_empty() { + vec![CommandAction::Unknown { + command: command.clone(), + }] + } else { + parsed_cmd.into_iter().map(CommandAction::from).collect() + }; + Some(ThreadItem::CommandExecution { + id: assessment.id.clone(), + command, + cwd: cwd.clone(), + process_id: None, + source: CommandExecutionSource::Agent, + status, + command_actions, + aggregated_output: None, + exit_code: None, + duration_ms: None, + }) + } + GuardianAssessmentAction::ApplyPatch { .. } + | GuardianAssessmentAction::NetworkAccess { .. } + | GuardianAssessmentAction::McpToolCall { .. } => None, + } +} + +pub fn guardian_auto_approval_review_notification( + conversation_id: &ThreadId, + event_turn_id: &str, + assessment: &GuardianAssessmentEvent, +) -> ServerNotification { + // TODO(ccunningham): Attach guardian review state to the reviewed tool + // item's lifecycle instead of sending standalone review notifications so + // the app-server API can persist and replay review state via `thread/read`. + let turn_id = if assessment.turn_id.is_empty() { + event_turn_id.to_string() + } else { + assessment.turn_id.clone() + }; + let review = GuardianApprovalReview { + status: match assessment.status { + codex_protocol::protocol::GuardianAssessmentStatus::InProgress => { + GuardianApprovalReviewStatus::InProgress + } + codex_protocol::protocol::GuardianAssessmentStatus::Approved => { + GuardianApprovalReviewStatus::Approved + } + codex_protocol::protocol::GuardianAssessmentStatus::Denied => { + GuardianApprovalReviewStatus::Denied + } + codex_protocol::protocol::GuardianAssessmentStatus::Aborted => { + GuardianApprovalReviewStatus::Aborted + } + }, + risk_score: assessment.risk_score, + risk_level: assessment.risk_level.map(Into::into), + rationale: assessment.rationale.clone(), + }; + let action = assessment.action.clone().into(); + match assessment.status { + codex_protocol::protocol::GuardianAssessmentStatus::InProgress => { + ServerNotification::ItemGuardianApprovalReviewStarted( + ItemGuardianApprovalReviewStartedNotification { + thread_id: conversation_id.to_string(), + turn_id, + target_item_id: assessment.id.clone(), + review, + action, + }, + ) + } + codex_protocol::protocol::GuardianAssessmentStatus::Approved + | codex_protocol::protocol::GuardianAssessmentStatus::Denied + | codex_protocol::protocol::GuardianAssessmentStatus::Aborted => { + ServerNotification::ItemGuardianApprovalReviewCompleted( + ItemGuardianApprovalReviewCompletedNotification { + thread_id: conversation_id.to_string(), + turn_id, + target_item_id: assessment.id.clone(), + review, + action, + }, + ) + } + } +} + +pub fn convert_patch_changes(changes: &HashMap) -> Vec { + let mut converted: Vec = changes + .iter() + .map(|(path, change)| FileUpdateChange { + path: path.to_string_lossy().into_owned(), + kind: map_patch_change_kind(change), + diff: format_file_change_diff(change), + }) + .collect(); + converted.sort_by(|a, b| a.path.cmp(&b.path)); + converted +} + +fn map_patch_change_kind(change: &FileChange) -> PatchChangeKind { + match change { + FileChange::Add { .. } => PatchChangeKind::Add, + FileChange::Delete { .. } => PatchChangeKind::Delete, + FileChange::Update { move_path, .. } => PatchChangeKind::Update { + move_path: move_path.clone(), + }, + } +} + +fn format_file_change_diff(change: &FileChange) -> String { + match change { + FileChange::Add { content } => content.clone(), + FileChange::Delete { content } => content.clone(), + FileChange::Update { + unified_diff, + move_path, + } => { + if let Some(path) = move_path { + format!("{unified_diff}\n\nMoved to: {}", path.display()) + } else { + unified_diff.clone() + } + } + } +} diff --git a/codex-rs/app-server-protocol/src/protocol/mod.rs b/codex-rs/app-server-protocol/src/protocol/mod.rs index 1e0410f4b..4179d361c 100644 --- a/codex-rs/app-server-protocol/src/protocol/mod.rs +++ b/codex-rs/app-server-protocol/src/protocol/mod.rs @@ -2,6 +2,7 @@ // Exposes protocol pieces used by `lib.rs` via `pub use protocol::common::*;`. pub mod common; +pub mod item_builders; mod mappers; mod serde_helpers; pub mod thread_history; diff --git a/codex-rs/app-server-protocol/src/protocol/thread_history.rs b/codex-rs/app-server-protocol/src/protocol/thread_history.rs index 48fa56d68..99a8f6e62 100644 --- a/codex-rs/app-server-protocol/src/protocol/thread_history.rs +++ b/codex-rs/app-server-protocol/src/protocol/thread_history.rs @@ -1,16 +1,18 @@ +use crate::protocol::item_builders::build_command_execution_begin_item; +use crate::protocol::item_builders::build_command_execution_end_item; +use crate::protocol::item_builders::build_file_change_approval_request_item; +use crate::protocol::item_builders::build_file_change_begin_item; +use crate::protocol::item_builders::build_file_change_end_item; +use crate::protocol::item_builders::build_item_from_guardian_event; use crate::protocol::v2::CollabAgentState; use crate::protocol::v2::CollabAgentTool; use crate::protocol::v2::CollabAgentToolCallStatus; -use crate::protocol::v2::CommandAction; use crate::protocol::v2::CommandExecutionStatus; use crate::protocol::v2::DynamicToolCallOutputContentItem; use crate::protocol::v2::DynamicToolCallStatus; -use crate::protocol::v2::FileUpdateChange; use crate::protocol::v2::McpToolCallError; use crate::protocol::v2::McpToolCallResult; use crate::protocol::v2::McpToolCallStatus; -use crate::protocol::v2::PatchApplyStatus; -use crate::protocol::v2::PatchChangeKind; use crate::protocol::v2::ThreadItem; use crate::protocol::v2::Turn; use crate::protocol::v2::TurnError as V2TurnError; @@ -31,6 +33,8 @@ use codex_protocol::protocol::ErrorEvent; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::ExecCommandBeginEvent; use codex_protocol::protocol::ExecCommandEndEvent; +use codex_protocol::protocol::GuardianAssessmentEvent; +use codex_protocol::protocol::GuardianAssessmentStatus; use codex_protocol::protocol::ImageGenerationBeginEvent; use codex_protocol::protocol::ImageGenerationEndEvent; use codex_protocol::protocol::ItemCompletedEvent; @@ -53,6 +57,14 @@ use std::collections::HashMap; use tracing::warn; use uuid::Uuid; +#[cfg(test)] +use crate::protocol::v2::CommandAction; +#[cfg(test)] +use crate::protocol::v2::FileUpdateChange; +#[cfg(test)] +use crate::protocol::v2::PatchApplyStatus; +#[cfg(test)] +use crate::protocol::v2::PatchChangeKind; #[cfg(test)] use codex_protocol::protocol::ExecCommandStatus as CoreExecCommandStatus; #[cfg(test)] @@ -149,6 +161,7 @@ impl ThreadHistoryBuilder { EventMsg::WebSearchEnd(payload) => self.handle_web_search_end(payload), EventMsg::ExecCommandBegin(payload) => self.handle_exec_command_begin(payload), EventMsg::ExecCommandEnd(payload) => self.handle_exec_command_end(payload), + EventMsg::GuardianAssessment(payload) => self.handle_guardian_assessment(payload), EventMsg::ApplyPatchApprovalRequest(payload) => { self.handle_apply_patch_approval_request(payload) } @@ -375,57 +388,12 @@ impl ThreadHistoryBuilder { } fn handle_exec_command_begin(&mut self, payload: &ExecCommandBeginEvent) { - let command = shlex::try_join(payload.command.iter().map(String::as_str)) - .unwrap_or_else(|_| payload.command.join(" ")); - let command_actions = payload - .parsed_cmd - .iter() - .cloned() - .map(CommandAction::from) - .collect(); - let item = ThreadItem::CommandExecution { - id: payload.call_id.clone(), - command, - cwd: payload.cwd.clone(), - process_id: payload.process_id.clone(), - source: payload.source.into(), - status: CommandExecutionStatus::InProgress, - command_actions, - aggregated_output: None, - exit_code: None, - duration_ms: None, - }; + let item = build_command_execution_begin_item(payload); self.upsert_item_in_turn_id(&payload.turn_id, item); } fn handle_exec_command_end(&mut self, payload: &ExecCommandEndEvent) { - let status: CommandExecutionStatus = (&payload.status).into(); - let duration_ms = i64::try_from(payload.duration.as_millis()).unwrap_or(i64::MAX); - let aggregated_output = if payload.aggregated_output.is_empty() { - None - } else { - Some(payload.aggregated_output.clone()) - }; - let command = shlex::try_join(payload.command.iter().map(String::as_str)) - .unwrap_or_else(|_| payload.command.join(" ")); - let command_actions = payload - .parsed_cmd - .iter() - .cloned() - .map(CommandAction::from) - .collect(); - let item = ThreadItem::CommandExecution { - id: payload.call_id.clone(), - command, - cwd: payload.cwd.clone(), - process_id: payload.process_id.clone(), - source: payload.source.into(), - status, - command_actions, - aggregated_output, - exit_code: Some(payload.exit_code), - duration_ms: Some(duration_ms), - }; + let item = build_command_execution_end_item(payload); // Command completions can arrive out of order. Unified exec may return // while a PTY is still running, then emit ExecCommandEnd later from a // background exit watcher when that process finally exits. By then, a @@ -434,12 +402,26 @@ impl ThreadHistoryBuilder { self.upsert_item_in_turn_id(&payload.turn_id, item); } - fn handle_apply_patch_approval_request(&mut self, payload: &ApplyPatchApprovalRequestEvent) { - let item = ThreadItem::FileChange { - id: payload.call_id.clone(), - changes: convert_patch_changes(&payload.changes), - status: PatchApplyStatus::InProgress, + fn handle_guardian_assessment(&mut self, payload: &GuardianAssessmentEvent) { + let status = match payload.status { + GuardianAssessmentStatus::InProgress => CommandExecutionStatus::InProgress, + GuardianAssessmentStatus::Denied | GuardianAssessmentStatus::Aborted => { + CommandExecutionStatus::Declined + } + GuardianAssessmentStatus::Approved => return, }; + let Some(item) = build_item_from_guardian_event(payload, status) else { + return; + }; + if payload.turn_id.is_empty() { + self.upsert_item_in_current_turn(item); + } else { + self.upsert_item_in_turn_id(&payload.turn_id, item); + } + } + + fn handle_apply_patch_approval_request(&mut self, payload: &ApplyPatchApprovalRequestEvent) { + let item = build_file_change_approval_request_item(payload); if payload.turn_id.is_empty() { self.upsert_item_in_current_turn(item); } else { @@ -448,11 +430,7 @@ impl ThreadHistoryBuilder { } fn handle_patch_apply_begin(&mut self, payload: &PatchApplyBeginEvent) { - let item = ThreadItem::FileChange { - id: payload.call_id.clone(), - changes: convert_patch_changes(&payload.changes), - status: PatchApplyStatus::InProgress, - }; + let item = build_file_change_begin_item(payload); if payload.turn_id.is_empty() { self.upsert_item_in_current_turn(item); } else { @@ -461,12 +439,7 @@ impl ThreadHistoryBuilder { } fn handle_patch_apply_end(&mut self, payload: &PatchApplyEndEvent) { - let status: PatchApplyStatus = (&payload.status).into(); - let item = ThreadItem::FileChange { - id: payload.call_id.clone(), - changes: convert_patch_changes(&payload.changes), - status, - }; + let item = build_file_change_end_item(payload); if payload.turn_id.is_empty() { self.upsert_item_in_current_turn(item); } else { @@ -1076,21 +1049,6 @@ fn render_review_output_text(output: &ReviewOutputEvent) -> String { } } -pub fn convert_patch_changes( - changes: &HashMap, -) -> Vec { - let mut converted: Vec = changes - .iter() - .map(|(path, change)| FileUpdateChange { - path: path.to_string_lossy().into_owned(), - kind: map_patch_change_kind(change), - diff: format_file_change_diff(change), - }) - .collect(); - converted.sort_by(|a, b| a.path.cmp(&b.path)); - converted -} - fn convert_dynamic_tool_content_items( items: &[codex_protocol::dynamic_tools::DynamicToolCallOutputContentItem], ) -> Vec { @@ -1108,33 +1066,6 @@ fn convert_dynamic_tool_content_items( .collect() } -fn map_patch_change_kind(change: &codex_protocol::protocol::FileChange) -> PatchChangeKind { - match change { - codex_protocol::protocol::FileChange::Add { .. } => PatchChangeKind::Add, - codex_protocol::protocol::FileChange::Delete { .. } => PatchChangeKind::Delete, - codex_protocol::protocol::FileChange::Update { move_path, .. } => PatchChangeKind::Update { - move_path: move_path.clone(), - }, - } -} - -fn format_file_change_diff(change: &codex_protocol::protocol::FileChange) -> String { - match change { - codex_protocol::protocol::FileChange::Add { content } => content.clone(), - codex_protocol::protocol::FileChange::Delete { content } => content.clone(), - codex_protocol::protocol::FileChange::Update { - unified_diff, - move_path, - } => { - if let Some(path) = move_path { - format!("{unified_diff}\n\nMoved to: {}", path.display()) - } else { - unified_diff.clone() - } - } - } -} - fn upsert_turn_item(items: &mut Vec, item: ThreadItem) { if let Some(existing_item) = items .iter_mut() @@ -2030,6 +1961,136 @@ mod tests { ); } + #[test] + fn reconstructs_declined_guardian_command_item() { + let events = vec![ + EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".into(), + model_context_window: None, + collaboration_mode_kind: Default::default(), + }), + EventMsg::UserMessage(UserMessageEvent { + message: "review this command".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::GuardianAssessment(GuardianAssessmentEvent { + id: "guardian-exec".into(), + turn_id: "turn-1".into(), + status: GuardianAssessmentStatus::InProgress, + risk_score: None, + risk_level: None, + rationale: None, + action: serde_json::from_value(serde_json::json!({ + "type": "command", + "source": "shell", + "command": "rm -rf /tmp/guardian", + "cwd": "/tmp", + })) + .expect("guardian action"), + }), + EventMsg::GuardianAssessment(GuardianAssessmentEvent { + id: "guardian-exec".into(), + turn_id: "turn-1".into(), + status: GuardianAssessmentStatus::Denied, + risk_score: Some(97), + risk_level: Some(codex_protocol::protocol::GuardianRiskLevel::High), + rationale: Some("Would delete user data.".into()), + action: serde_json::from_value(serde_json::json!({ + "type": "command", + "source": "shell", + "command": "rm -rf /tmp/guardian", + "cwd": "/tmp", + })) + .expect("guardian action"), + }), + ]; + + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let turns = build_turns_from_rollout_items(&items); + assert_eq!(turns.len(), 1); + assert_eq!(turns[0].items.len(), 2); + assert_eq!( + turns[0].items[1], + ThreadItem::CommandExecution { + id: "guardian-exec".into(), + command: "rm -rf /tmp/guardian".into(), + cwd: PathBuf::from("/tmp"), + process_id: None, + source: CommandExecutionSource::Agent, + status: CommandExecutionStatus::Declined, + command_actions: vec![CommandAction::Unknown { + command: "rm -rf /tmp/guardian".into(), + }], + aggregated_output: None, + exit_code: None, + duration_ms: None, + } + ); + } + + #[test] + fn reconstructs_in_progress_guardian_execve_item() { + let events = vec![ + EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".into(), + model_context_window: None, + collaboration_mode_kind: Default::default(), + }), + EventMsg::UserMessage(UserMessageEvent { + message: "run a subcommand".into(), + images: None, + text_elements: Vec::new(), + local_images: Vec::new(), + }), + EventMsg::GuardianAssessment(GuardianAssessmentEvent { + id: "guardian-execve".into(), + turn_id: "turn-1".into(), + status: GuardianAssessmentStatus::InProgress, + risk_score: None, + risk_level: None, + rationale: None, + action: serde_json::from_value(serde_json::json!({ + "type": "execve", + "source": "shell", + "program": "/bin/rm", + "argv": ["/usr/bin/rm", "-f", "/tmp/file.sqlite"], + "cwd": "/tmp", + })) + .expect("guardian action"), + }), + ]; + + let items = events + .into_iter() + .map(RolloutItem::EventMsg) + .collect::>(); + let turns = build_turns_from_rollout_items(&items); + assert_eq!(turns.len(), 1); + assert_eq!(turns[0].items.len(), 2); + assert_eq!( + turns[0].items[1], + ThreadItem::CommandExecution { + id: "guardian-execve".into(), + command: "/bin/rm -f /tmp/file.sqlite".into(), + cwd: PathBuf::from("/tmp"), + process_id: None, + source: CommandExecutionSource::Agent, + status: CommandExecutionStatus::InProgress, + command_actions: vec![CommandAction::Unknown { + command: "/bin/rm -f /tmp/file.sqlite".into(), + }], + aggregated_output: None, + exit_code: None, + duration_ms: None, + } + ); + } + #[test] fn assigns_late_exec_completion_to_original_turn() { let events = vec![ diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 78c60b0ae..01eb4bd57 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -7,9 +7,9 @@ use crate::error_code::INVALID_REQUEST_ERROR_CODE; use crate::outgoing_message::ClientRequestResult; use crate::outgoing_message::ThreadScopedOutgoingMessageSender; use crate::server_request_error::is_turn_transition_server_request_error; -use crate::thread_state::ThreadListenerCommand; use crate::thread_state::ThreadState; use crate::thread_state::TurnSummary; +use crate::thread_state::resolve_server_request_on_thread_listener; use crate::thread_status::ThreadWatchActiveGuard; use crate::thread_status::ThreadWatchManager; use codex_app_server_protocol::AccountRateLimitsUpdatedNotification; @@ -43,14 +43,10 @@ use codex_app_server_protocol::FileChangeRequestApprovalParams; use codex_app_server_protocol::FileChangeRequestApprovalResponse; use codex_app_server_protocol::FileUpdateChange; use codex_app_server_protocol::GrantedPermissionProfile as V2GrantedPermissionProfile; -use codex_app_server_protocol::GuardianApprovalReview; -use codex_app_server_protocol::GuardianApprovalReviewStatus; use codex_app_server_protocol::HookCompletedNotification; use codex_app_server_protocol::HookStartedNotification; use codex_app_server_protocol::InterruptConversationResponse; use codex_app_server_protocol::ItemCompletedNotification; -use codex_app_server_protocol::ItemGuardianApprovalReviewCompletedNotification; -use codex_app_server_protocol::ItemGuardianApprovalReviewStartedNotification; use codex_app_server_protocol::ItemStartedNotification; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::McpServerElicitationAction; @@ -102,8 +98,14 @@ use codex_app_server_protocol::TurnPlanStep; use codex_app_server_protocol::TurnPlanUpdatedNotification; use codex_app_server_protocol::TurnStartedNotification; use codex_app_server_protocol::TurnStatus; +use codex_app_server_protocol::build_command_execution_end_item; +use codex_app_server_protocol::build_file_change_approval_request_item; +use codex_app_server_protocol::build_file_change_begin_item; +use codex_app_server_protocol::build_file_change_end_item; +use codex_app_server_protocol::build_item_from_guardian_event; use codex_app_server_protocol::build_turns_from_rollout_items; use codex_app_server_protocol::convert_patch_changes; +use codex_app_server_protocol::guardian_auto_approval_review_notification; use codex_core::CodexThread; use codex_core::ThreadManager; use codex_core::find_thread_name_by_id; @@ -114,13 +116,10 @@ use codex_protocol::dynamic_tools::DynamicToolCallOutputContentItem as CoreDynam use codex_protocol::dynamic_tools::DynamicToolResponse as CoreDynamicToolResponse; use codex_protocol::items::parse_hook_prompt_message; use codex_protocol::plan_tool::UpdatePlanArgs; -use codex_protocol::protocol::ApplyPatchApprovalRequestEvent; use codex_protocol::protocol::CodexErrorInfo as CoreCodexErrorInfo; use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::ExecApprovalRequestEvent; -use codex_protocol::protocol::ExecCommandEndEvent; -use codex_protocol::protocol::GuardianAssessmentEvent; use codex_protocol::protocol::McpToolCallBeginEvent; use codex_protocol::protocol::McpToolCallEndEvent; use codex_protocol::protocol::Op; @@ -137,7 +136,6 @@ use codex_protocol::request_user_input::RequestUserInputResponse as CoreRequestU use codex_sandboxing::policy_transforms::intersect_permission_profiles; use codex_shell_command::parse_command::shlex_join; use std::collections::HashMap; -use std::convert::TryFrom; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; @@ -153,105 +151,13 @@ enum CommandExecutionApprovalPresentation { Command(CommandExecutionCompletionItem), } +#[derive(Debug, PartialEq)] struct CommandExecutionCompletionItem { command: String, cwd: PathBuf, command_actions: Vec, } -async fn resolve_server_request_on_thread_listener( - thread_state: &Arc>, - request_id: RequestId, -) { - let (completion_tx, completion_rx) = oneshot::channel(); - let listener_command_tx = { - let state = thread_state.lock().await; - state.listener_command_tx() - }; - let Some(listener_command_tx) = listener_command_tx else { - error!("failed to remove pending client request: thread listener is not running"); - return; - }; - - if listener_command_tx - .send(ThreadListenerCommand::ResolveServerRequest { - request_id, - completion_tx, - }) - .is_err() - { - error!( - "failed to remove pending client request: thread listener command channel is closed" - ); - return; - } - - if let Err(err) = completion_rx.await { - error!("failed to remove pending client request: {err}"); - } -} - -fn guardian_auto_approval_review_notification( - conversation_id: &ThreadId, - event_turn_id: &str, - assessment: &GuardianAssessmentEvent, -) -> ServerNotification { - // TODO(ccunningham): Attach guardian review state to the reviewed tool - // item's lifecycle instead of sending standalone review notifications so - // the app-server API can persist and replay review state via `thread/read`. - let turn_id = if assessment.turn_id.is_empty() { - event_turn_id.to_string() - } else { - assessment.turn_id.clone() - }; - let review = GuardianApprovalReview { - status: match assessment.status { - codex_protocol::protocol::GuardianAssessmentStatus::InProgress => { - GuardianApprovalReviewStatus::InProgress - } - codex_protocol::protocol::GuardianAssessmentStatus::Approved => { - GuardianApprovalReviewStatus::Approved - } - codex_protocol::protocol::GuardianAssessmentStatus::Denied => { - GuardianApprovalReviewStatus::Denied - } - codex_protocol::protocol::GuardianAssessmentStatus::Aborted => { - GuardianApprovalReviewStatus::Aborted - } - }, - risk_score: assessment.risk_score, - risk_level: assessment.risk_level.map(Into::into), - rationale: assessment.rationale.clone(), - }; - let action = assessment.action.clone().into(); - match assessment.status { - codex_protocol::protocol::GuardianAssessmentStatus::InProgress => { - ServerNotification::ItemGuardianApprovalReviewStarted( - ItemGuardianApprovalReviewStartedNotification { - thread_id: conversation_id.to_string(), - turn_id, - target_item_id: assessment.id.clone(), - review, - action, - }, - ) - } - codex_protocol::protocol::GuardianAssessmentStatus::Approved - | codex_protocol::protocol::GuardianAssessmentStatus::Denied - | codex_protocol::protocol::GuardianAssessmentStatus::Aborted => { - ServerNotification::ItemGuardianApprovalReviewCompleted( - ItemGuardianApprovalReviewCompletedNotification { - thread_id: conversation_id.to_string(), - turn_id, - target_item_id: assessment.id.clone(), - review, - action, - }, - ) - } - } -} - #[allow(clippy::too_many_arguments)] pub(crate) async fn apply_bespoke_event_handling( event: Event, @@ -344,12 +250,71 @@ pub(crate) async fn apply_bespoke_event_handling( EventMsg::Warning(_warning_event) => {} EventMsg::GuardianAssessment(assessment) => { if let ApiVersion::V2 = api_version { + let pending_command_execution = match build_item_from_guardian_event( + &assessment, + CommandExecutionStatus::InProgress, + ) { + Some(ThreadItem::CommandExecution { + command, + cwd, + command_actions, + .. + }) => Some(CommandExecutionCompletionItem { + command, + cwd, + command_actions, + }), + Some(_) | None => None, + }; + let assessment_turn_id = if assessment.turn_id.is_empty() { + event_turn_id.clone() + } else { + assessment.turn_id.clone() + }; + if assessment.status + == codex_protocol::protocol::GuardianAssessmentStatus::InProgress + && let Some(completion_item) = pending_command_execution.as_ref() + { + start_command_execution_item( + &conversation_id, + assessment_turn_id.clone(), + assessment.id.clone(), + completion_item.command.clone(), + completion_item.cwd.clone(), + completion_item.command_actions.clone(), + CommandExecutionSource::Agent, + &outgoing, + &thread_state, + ) + .await; + } let notification = guardian_auto_approval_review_notification( &conversation_id, &event_turn_id, &assessment, ); outgoing.send_server_notification(notification).await; + if matches!( + assessment.status, + codex_protocol::protocol::GuardianAssessmentStatus::Denied + | codex_protocol::protocol::GuardianAssessmentStatus::Aborted + ) && let Some(completion_item) = pending_command_execution + { + complete_command_execution_item( + &conversation_id, + assessment_turn_id, + assessment.id.clone(), + completion_item.command, + completion_item.cwd, + /*process_id*/ None, + CommandExecutionSource::Agent, + completion_item.command_actions, + CommandExecutionStatus::Declined, + &outgoing, + &thread_state, + ) + .await; + } } } EventMsg::ModelReroute(event) => { @@ -503,13 +468,7 @@ pub(crate) async fn apply_bespoke_event_handling( .await; } } - EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent { - call_id, - turn_id, - changes, - reason, - grant_root, - }) => { + EventMsg::ApplyPatchApprovalRequest(event) => { let permission_guard = thread_watch_manager .note_permission_requested(&conversation_id.to_string()) .await; @@ -517,14 +476,15 @@ pub(crate) async fn apply_bespoke_event_handling( ApiVersion::V1 => { let params = ApplyPatchApprovalParams { conversation_id, - call_id: call_id.clone(), - file_changes: changes.clone(), - reason, - grant_root, + call_id: event.call_id.clone(), + file_changes: event.changes.clone(), + reason: event.reason.clone(), + grant_root: event.grant_root.clone(), }; let (_pending_request_id, rx) = outgoing .send_request(ServerRequestPayload::ApplyPatchApproval(params)) .await; + let call_id = event.call_id.clone(); tokio::spawn(async move { let _permission_guard = permission_guard; on_patch_approval_response(call_id, rx, conversation).await; @@ -533,9 +493,8 @@ pub(crate) async fn apply_bespoke_event_handling( ApiVersion::V2 => { // Until we migrate the core to be aware of a first class FileChangeItem // and emit the corresponding EventMsg, we repurpose the call_id as the item_id. - let item_id = call_id.clone(); - let patch_changes = convert_patch_changes(&changes); - + let item_id = event.call_id.clone(); + let patch_changes = convert_patch_changes(&event.changes); let first_start = { let mut state = thread_state.lock().await; state @@ -544,11 +503,7 @@ pub(crate) async fn apply_bespoke_event_handling( .insert(item_id.clone()) }; if first_start { - let item = ThreadItem::FileChange { - id: item_id.clone(), - changes: patch_changes.clone(), - status: PatchApplyStatus::InProgress, - }; + let item = build_file_change_approval_request_item(&event); let notification = ItemStartedNotification { thread_id: conversation_id.to_string(), turn_id: event_turn_id.clone(), @@ -561,10 +516,10 @@ pub(crate) async fn apply_bespoke_event_handling( let params = FileChangeRequestApprovalParams { thread_id: conversation_id.to_string(), - turn_id: turn_id.clone(), + turn_id: event.turn_id.clone(), item_id: item_id.clone(), - reason, - grant_root, + reason: event.reason.clone(), + grant_root: event.grant_root.clone(), }; let (pending_request_id, rx) = outgoing .send_request(ServerRequestPayload::FileChangeRequestApproval(params)) @@ -668,6 +623,22 @@ pub(crate) async fn apply_bespoke_event_handling( Some(completion_item), ), }; + if approval_id.is_none() + && let Some(completion_item) = completion_item.as_ref() + { + start_command_execution_item( + &conversation_id, + event_turn_id.clone(), + call_id.clone(), + completion_item.command.clone(), + completion_item.cwd.clone(), + completion_item.command_actions.clone(), + CommandExecutionSource::Agent, + &outgoing, + &thread_state, + ) + .await; + } let proposed_execpolicy_amendment_v2 = proposed_execpolicy_amendment.map(V2ExecPolicyAmendment::from); let proposed_network_policy_amendments_v2 = proposed_network_policy_amendments @@ -1555,7 +1526,6 @@ pub(crate) async fn apply_bespoke_event_handling( // Until we migrate the core to be aware of a first class FileChangeItem // and emit the corresponding EventMsg, we repurpose the call_id as the item_id. let item_id = patch_begin_event.call_id.clone(); - let changes = convert_patch_changes(&patch_begin_event.changes); let first_start = { let mut state = thread_state.lock().await; @@ -1565,11 +1535,7 @@ pub(crate) async fn apply_bespoke_event_handling( .insert(item_id.clone()) }; if first_start { - let item = ThreadItem::FileChange { - id: item_id.clone(), - changes, - status: PatchApplyStatus::InProgress, - }; + let item = build_file_change_begin_item(&patch_begin_event); let notification = ItemStartedNotification { thread_id: conversation_id.to_string(), turn_id: event_turn_id.clone(), @@ -1584,14 +1550,10 @@ pub(crate) async fn apply_bespoke_event_handling( // Until we migrate the core to be aware of a first class FileChangeItem // and emit the corresponding EventMsg, we repurpose the call_id as the item_id. let item_id = patch_end_event.call_id.clone(); - - let status: PatchApplyStatus = (&patch_end_event.status).into(); - let changes = convert_patch_changes(&patch_end_event.changes); complete_file_change_item( conversation_id, item_id, - changes, - status, + build_file_change_end_item(&patch_end_event), event_turn_id.clone(), &outgoing, &thread_state, @@ -1608,35 +1570,35 @@ pub(crate) async fn apply_bespoke_event_handling( let command = shlex_join(&exec_command_begin_event.command); let cwd = exec_command_begin_event.cwd; let process_id = exec_command_begin_event.process_id; - - { + let first_start = { let mut state = thread_state.lock().await; state .turn_summary .command_execution_started - .insert(item_id.clone()); + .insert(item_id.clone()) + }; + if first_start { + let item = ThreadItem::CommandExecution { + id: item_id, + command, + cwd, + process_id, + source: exec_command_begin_event.source.into(), + status: CommandExecutionStatus::InProgress, + command_actions, + aggregated_output: None, + exit_code: None, + duration_ms: None, + }; + let notification = ItemStartedNotification { + thread_id: conversation_id.to_string(), + turn_id: event_turn_id.clone(), + item, + }; + outgoing + .send_server_notification(ServerNotification::ItemStarted(notification)) + .await; } - - let item = ThreadItem::CommandExecution { - id: item_id, - command, - cwd, - process_id, - source: exec_command_begin_event.source.into(), - status: CommandExecutionStatus::InProgress, - command_actions, - aggregated_output: None, - exit_code: None, - duration_ms: None, - }; - let notification = ItemStartedNotification { - thread_id: conversation_id.to_string(), - turn_id: event_turn_id.clone(), - item, - }; - outgoing - .send_server_notification(ServerNotification::ItemStarted(notification)) - .await; } EventMsg::ExecCommandOutputDelta(exec_command_output_delta_event) => { let item_id = exec_command_output_delta_event.call_id.clone(); @@ -1694,20 +1656,7 @@ pub(crate) async fn apply_bespoke_event_handling( .await; } EventMsg::ExecCommandEnd(exec_command_end_event) => { - let ExecCommandEndEvent { - call_id, - command, - cwd, - parsed_cmd, - process_id, - aggregated_output, - exit_code, - duration, - source, - status, - .. - } = exec_command_end_event; - + let call_id = exec_command_end_event.call_id.clone(); { let mut state = thread_state.lock().await; state @@ -1716,32 +1665,7 @@ pub(crate) async fn apply_bespoke_event_handling( .remove(&call_id); } - let status: CommandExecutionStatus = (&status).into(); - let command_actions = parsed_cmd - .into_iter() - .map(V2ParsedCommand::from) - .collect::>(); - - let aggregated_output = if aggregated_output.is_empty() { - None - } else { - Some(aggregated_output) - }; - - let duration_ms = i64::try_from(duration.as_millis()).unwrap_or(i64::MAX); - - let item = ThreadItem::CommandExecution { - id: call_id, - command: shlex_join(&command), - cwd, - process_id, - source: source.into(), - status, - command_actions, - aggregated_output, - exit_code: Some(exit_code), - duration_ms: Some(duration_ms), - }; + let item = build_command_execution_end_item(&exec_command_end_event); let notification = ItemCompletedNotification { thread_id: conversation_id.to_string(), @@ -1966,8 +1890,7 @@ async fn emit_turn_completed_with_status( async fn complete_file_change_item( conversation_id: ThreadId, item_id: String, - changes: Vec, - status: PatchApplyStatus, + item: ThreadItem, turn_id: String, outgoing: &ThreadScopedOutgoingMessageSender, thread_state: &Arc>, @@ -1976,11 +1899,6 @@ async fn complete_file_change_item( state.turn_summary.file_change_started.remove(&item_id); drop(state); - let item = ThreadItem::FileChange { - id: item_id, - changes, - status, - }; let notification = ItemCompletedNotification { thread_id: conversation_id.to_string(), turn_id, @@ -1991,9 +1909,52 @@ async fn complete_file_change_item( .await; } +#[allow(clippy::too_many_arguments)] +async fn start_command_execution_item( + conversation_id: &ThreadId, + turn_id: String, + item_id: String, + command: String, + cwd: PathBuf, + command_actions: Vec, + source: CommandExecutionSource, + outgoing: &ThreadScopedOutgoingMessageSender, + thread_state: &Arc>, +) -> bool { + let first_start = { + let mut state = thread_state.lock().await; + state + .turn_summary + .command_execution_started + .insert(item_id.clone()) + }; + if first_start { + let notification = ItemStartedNotification { + thread_id: conversation_id.to_string(), + turn_id, + item: ThreadItem::CommandExecution { + id: item_id, + command, + cwd, + process_id: None, + source, + status: CommandExecutionStatus::InProgress, + command_actions, + aggregated_output: None, + exit_code: None, + duration_ms: None, + }, + }; + outgoing + .send_server_notification(ServerNotification::ItemStarted(notification)) + .await; + } + first_start +} + #[allow(clippy::too_many_arguments)] async fn complete_command_execution_item( - conversation_id: ThreadId, + conversation_id: &ThreadId, turn_id: String, item_id: String, command: String, @@ -2003,7 +1964,18 @@ async fn complete_command_execution_item( command_actions: Vec, status: CommandExecutionStatus, outgoing: &ThreadScopedOutgoingMessageSender, + thread_state: &Arc>, ) { + let mut state = thread_state.lock().await; + let should_emit = state + .turn_summary + .command_execution_started + .remove(&item_id); + drop(state); + if !should_emit { + return; + } + let item = ThreadItem::CommandExecution { id: item_id, command, @@ -2592,8 +2564,11 @@ async fn on_file_change_request_approval_response( complete_file_change_item( conversation_id, item_id.clone(), - changes, - status, + ThreadItem::FileChange { + id: item_id.clone(), + changes, + status, + }, event_turn_id.clone(), &outgoing, &thread_state, @@ -2710,7 +2685,7 @@ async fn on_command_execution_request_approval_response( && let Some(completion_item) = completion_item { complete_command_execution_item( - conversation_id, + &conversation_id, event_turn_id.clone(), item_id.clone(), completion_item.command, @@ -2720,6 +2695,7 @@ async fn on_command_execution_request_approval_response( completion_item.command_actions, status, &outgoing, + &thread_state, ) .await; } @@ -2861,6 +2837,7 @@ mod tests { use codex_app_server_protocol::GuardianApprovalReviewStatus; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::TurnPlanStepStatus; + use codex_login::CodexAuth; use codex_protocol::items::HookPromptFragment; use codex_protocol::items::build_hook_prompt_message; use codex_protocol::mcp::CallToolResult; @@ -2871,16 +2848,22 @@ mod tests { use codex_protocol::protocol::CollabResumeBeginEvent; use codex_protocol::protocol::CollabResumeEndEvent; use codex_protocol::protocol::CreditsSnapshot; + use codex_protocol::protocol::GuardianAssessmentEvent; + use codex_protocol::protocol::GuardianAssessmentStatus; use codex_protocol::protocol::McpInvocation; use codex_protocol::protocol::RateLimitSnapshot; use codex_protocol::protocol::RateLimitWindow; use codex_protocol::protocol::TokenUsage; use codex_protocol::protocol::TokenUsageInfo; use codex_utils_absolute_path::AbsolutePathBuf; + use core_test_support::load_default_config_for_test; use pretty_assertions::assert_eq; use rmcp::model::Content; use serde_json::Value as JsonValue; + use serde_json::json; + use std::path::PathBuf; use std::time::Duration; + use tempfile::TempDir; use tokio::sync::Mutex; use tokio::sync::mpsc; @@ -2901,6 +2884,84 @@ mod tests { } } + fn command_execution_completion_item(command: &str) -> CommandExecutionCompletionItem { + CommandExecutionCompletionItem { + command: command.to_string(), + cwd: PathBuf::from("/tmp"), + command_actions: vec![V2ParsedCommand::Unknown { + command: command.to_string(), + }], + } + } + + fn guardian_command_assessment( + id: &str, + turn_id: &str, + status: GuardianAssessmentStatus, + ) -> GuardianAssessmentEvent { + let (risk_score, risk_level, rationale) = match status { + GuardianAssessmentStatus::InProgress => (None, None, None), + GuardianAssessmentStatus::Approved => ( + Some(12), + Some(codex_protocol::protocol::GuardianRiskLevel::Low), + Some("looks safe".to_string()), + ), + GuardianAssessmentStatus::Denied => ( + Some(88), + Some(codex_protocol::protocol::GuardianRiskLevel::High), + Some("too risky".to_string()), + ), + GuardianAssessmentStatus::Aborted => (None, None, None), + }; + GuardianAssessmentEvent { + id: id.to_string(), + turn_id: turn_id.to_string(), + status, + risk_score, + risk_level, + rationale, + action: serde_json::from_value(json!({ + "type": "command", + "source": "shell", + "command": format!("rm -f /tmp/{id}.sqlite"), + "cwd": "/tmp", + })) + .expect("guardian action"), + } + } + + struct GuardianAssessmentTestContext { + conversation_id: ThreadId, + conversation: Arc, + thread_manager: Arc, + outgoing: ThreadScopedOutgoingMessageSender, + thread_state: Arc>, + thread_watch_manager: ThreadWatchManager, + codex_home: PathBuf, + } + + impl GuardianAssessmentTestContext { + async fn apply_guardian_assessment_event(&self, assessment: GuardianAssessmentEvent) { + let event_turn_id = assessment.turn_id.clone(); + apply_bespoke_event_handling( + Event { + id: event_turn_id, + msg: EventMsg::GuardianAssessment(assessment), + }, + self.conversation_id.clone(), + self.conversation.clone(), + self.thread_manager.clone(), + self.outgoing.clone(), + self.thread_state.clone(), + self.thread_watch_manager.clone(), + ApiVersion::V2, + "test-provider".to_string(), + &self.codex_home, + ) + .await; + } + } + #[test] fn guardian_assessment_started_uses_event_turn_id_fallback() { let conversation_id = ThreadId::new(); @@ -3019,6 +3080,313 @@ mod tests { } } + #[tokio::test] + async fn command_execution_started_helper_emits_once() -> Result<()> { + let conversation_id = ThreadId::new(); + let thread_state = new_thread_state(); + let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); + let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = ThreadScopedOutgoingMessageSender::new( + outgoing, + vec![ConnectionId(1)], + ThreadId::new(), + ); + let completion_item = command_execution_completion_item("printf hi"); + + let first_start = start_command_execution_item( + &conversation_id, + "turn-1".to_string(), + "cmd-1".to_string(), + completion_item.command.clone(), + completion_item.cwd.clone(), + completion_item.command_actions.clone(), + CommandExecutionSource::Agent, + &outgoing, + &thread_state, + ) + .await; + assert!(first_start); + + let msg = recv_broadcast_message(&mut rx).await?; + match msg { + OutgoingMessage::AppServerNotification(ServerNotification::ItemStarted(payload)) => { + assert_eq!(payload.thread_id, conversation_id.to_string()); + assert_eq!(payload.turn_id, "turn-1"); + assert_eq!( + payload.item, + ThreadItem::CommandExecution { + id: "cmd-1".to_string(), + command: completion_item.command.clone(), + cwd: completion_item.cwd.clone(), + process_id: None, + source: CommandExecutionSource::Agent, + status: CommandExecutionStatus::InProgress, + command_actions: completion_item.command_actions.clone(), + aggregated_output: None, + exit_code: None, + duration_ms: None, + } + ); + } + other => bail!("unexpected message: {other:?}"), + } + + let second_start = start_command_execution_item( + &conversation_id, + "turn-1".to_string(), + "cmd-1".to_string(), + completion_item.command.clone(), + completion_item.cwd.clone(), + completion_item.command_actions.clone(), + CommandExecutionSource::Agent, + &outgoing, + &thread_state, + ) + .await; + assert!(!second_start); + assert!(rx.try_recv().is_err(), "duplicate start should not emit"); + Ok(()) + } + + #[tokio::test] + async fn complete_command_execution_item_emits_declined_once_for_pending_command() -> Result<()> + { + let conversation_id = ThreadId::new(); + let thread_state = new_thread_state(); + let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); + let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = ThreadScopedOutgoingMessageSender::new( + outgoing, + vec![ConnectionId(1)], + ThreadId::new(), + ); + let completion_item = command_execution_completion_item("printf hi"); + + start_command_execution_item( + &conversation_id, + "turn-1".to_string(), + "cmd-1".to_string(), + completion_item.command.clone(), + completion_item.cwd.clone(), + completion_item.command_actions.clone(), + CommandExecutionSource::Agent, + &outgoing, + &thread_state, + ) + .await; + let _started = recv_broadcast_message(&mut rx).await?; + + complete_command_execution_item( + &conversation_id, + "turn-1".to_string(), + "cmd-1".to_string(), + completion_item.command.clone(), + completion_item.cwd.clone(), + /*process_id*/ None, + CommandExecutionSource::Agent, + completion_item.command_actions.clone(), + CommandExecutionStatus::Declined, + &outgoing, + &thread_state, + ) + .await; + + let completed = recv_broadcast_message(&mut rx).await?; + match completed { + OutgoingMessage::AppServerNotification(ServerNotification::ItemCompleted(payload)) => { + let ThreadItem::CommandExecution { id, status, .. } = payload.item else { + bail!("expected command execution completion"); + }; + assert_eq!(id, "cmd-1"); + assert_eq!(status, CommandExecutionStatus::Declined); + } + other => bail!("unexpected message: {other:?}"), + } + + complete_command_execution_item( + &conversation_id, + "turn-1".to_string(), + "cmd-1".to_string(), + completion_item.command, + completion_item.cwd, + /*process_id*/ None, + CommandExecutionSource::Agent, + completion_item.command_actions, + CommandExecutionStatus::Declined, + &outgoing, + &thread_state, + ) + .await; + assert!( + rx.try_recv().is_err(), + "completion should not emit after the pending item is cleared" + ); + Ok(()) + } + + #[tokio::test] + async fn guardian_command_execution_notifications_wrap_review_lifecycle() -> Result<()> { + let codex_home = TempDir::new()?; + let config = load_default_config_for_test(&codex_home).await; + let thread_manager = Arc::new( + codex_core::test_support::thread_manager_with_models_provider_and_home( + CodexAuth::create_dummy_chatgpt_auth_for_testing(), + config.model_provider.clone(), + config.codex_home.clone(), + Arc::new(codex_exec_server::EnvironmentManager::new( + /*exec_server_url*/ None, + )), + ), + ); + let codex_core::NewThread { + thread_id: conversation_id, + thread: conversation, + .. + } = thread_manager.start_thread(config).await?; + let thread_state = new_thread_state(); + let thread_watch_manager = ThreadWatchManager::new(); + let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); + let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = ThreadScopedOutgoingMessageSender::new( + outgoing, + vec![ConnectionId(1)], + conversation_id, + ); + let guardian_context = GuardianAssessmentTestContext { + conversation_id, + conversation: conversation.clone(), + thread_manager: thread_manager.clone(), + outgoing: outgoing.clone(), + thread_state: thread_state.clone(), + thread_watch_manager: thread_watch_manager.clone(), + codex_home: codex_home.path().to_path_buf(), + }; + + guardian_context + .apply_guardian_assessment_event(guardian_command_assessment( + "cmd-guardian-approved", + "turn-guardian-approved", + GuardianAssessmentStatus::InProgress, + )) + .await; + let first = recv_broadcast_message(&mut rx).await?; + match first { + OutgoingMessage::AppServerNotification(ServerNotification::ItemStarted(payload)) => { + assert_eq!(payload.turn_id, "turn-guardian-approved"); + let ThreadItem::CommandExecution { id, status, .. } = payload.item else { + bail!("expected command execution item"); + }; + assert_eq!(id, "cmd-guardian-approved"); + assert_eq!(status, CommandExecutionStatus::InProgress); + } + other => bail!("unexpected message: {other:?}"), + } + let second = recv_broadcast_message(&mut rx).await?; + match second { + OutgoingMessage::AppServerNotification( + ServerNotification::ItemGuardianApprovalReviewStarted(payload), + ) => { + assert_eq!(payload.target_item_id, "cmd-guardian-approved"); + assert_eq!( + payload.review.status, + GuardianApprovalReviewStatus::InProgress + ); + } + other => bail!("unexpected message: {other:?}"), + } + + guardian_context + .apply_guardian_assessment_event(guardian_command_assessment( + "cmd-guardian-approved", + "turn-guardian-approved", + GuardianAssessmentStatus::Approved, + )) + .await; + let third = recv_broadcast_message(&mut rx).await?; + match third { + OutgoingMessage::AppServerNotification( + ServerNotification::ItemGuardianApprovalReviewCompleted(payload), + ) => { + assert_eq!(payload.target_item_id, "cmd-guardian-approved"); + assert_eq!( + payload.review.status, + GuardianApprovalReviewStatus::Approved + ); + } + other => bail!("unexpected message: {other:?}"), + } + assert!( + rx.try_recv().is_err(), + "approved review should not complete the command item" + ); + + guardian_context + .apply_guardian_assessment_event(guardian_command_assessment( + "cmd-guardian-denied", + "turn-guardian-denied", + GuardianAssessmentStatus::InProgress, + )) + .await; + let fourth = recv_broadcast_message(&mut rx).await?; + match fourth { + OutgoingMessage::AppServerNotification(ServerNotification::ItemStarted(payload)) => { + assert_eq!(payload.turn_id, "turn-guardian-denied"); + let ThreadItem::CommandExecution { id, status, .. } = payload.item else { + bail!("expected command execution item"); + }; + assert_eq!(id, "cmd-guardian-denied"); + assert_eq!(status, CommandExecutionStatus::InProgress); + } + other => bail!("unexpected message: {other:?}"), + } + let fifth = recv_broadcast_message(&mut rx).await?; + match fifth { + OutgoingMessage::AppServerNotification( + ServerNotification::ItemGuardianApprovalReviewStarted(payload), + ) => { + assert_eq!(payload.target_item_id, "cmd-guardian-denied"); + assert_eq!( + payload.review.status, + GuardianApprovalReviewStatus::InProgress + ); + } + other => bail!("unexpected message: {other:?}"), + } + + guardian_context + .apply_guardian_assessment_event(guardian_command_assessment( + "cmd-guardian-denied", + "turn-guardian-denied", + GuardianAssessmentStatus::Denied, + )) + .await; + let sixth = recv_broadcast_message(&mut rx).await?; + match sixth { + OutgoingMessage::AppServerNotification( + ServerNotification::ItemGuardianApprovalReviewCompleted(payload), + ) => { + assert_eq!(payload.target_item_id, "cmd-guardian-denied"); + assert_eq!(payload.review.status, GuardianApprovalReviewStatus::Denied); + } + other => bail!("unexpected message: {other:?}"), + } + let seventh = recv_broadcast_message(&mut rx).await?; + match seventh { + OutgoingMessage::AppServerNotification(ServerNotification::ItemCompleted(payload)) => { + let ThreadItem::CommandExecution { id, status, .. } = payload.item else { + bail!("expected command execution completion"); + }; + assert_eq!(id, "cmd-guardian-denied"); + assert_eq!(status, CommandExecutionStatus::Declined); + } + other => bail!("unexpected message: {other:?}"), + } + + assert!(rx.try_recv().is_err(), "no extra messages expected"); + conversation.shutdown_and_wait().await?; + Ok(()) + } + #[test] fn file_change_accept_for_session_maps_to_approved_for_session() { let (decision, completion_status) = diff --git a/codex-rs/app-server/src/thread_state.rs b/codex-rs/app-server/src/thread_state.rs index be5478dd5..17823fefd 100644 --- a/codex-rs/app-server/src/thread_state.rs +++ b/codex-rs/app-server/src/thread_state.rs @@ -16,6 +16,7 @@ use std::sync::Weak; use tokio::sync::Mutex; use tokio::sync::mpsc; use tokio::sync::oneshot; +use tracing::error; type PendingInterruptQueue = Vec<( ConnectionRequestId, @@ -116,6 +117,38 @@ impl ThreadState { } } +pub(crate) async fn resolve_server_request_on_thread_listener( + thread_state: &Arc>, + request_id: RequestId, +) { + let (completion_tx, completion_rx) = oneshot::channel(); + let listener_command_tx = { + let state = thread_state.lock().await; + state.listener_command_tx() + }; + let Some(listener_command_tx) = listener_command_tx else { + error!("failed to remove pending client request: thread listener is not running"); + return; + }; + + if listener_command_tx + .send(ThreadListenerCommand::ResolveServerRequest { + request_id, + completion_tx, + }) + .is_err() + { + error!( + "failed to remove pending client request: thread listener command channel is closed" + ); + return; + } + + if let Err(err) = completion_rx.await { + error!("failed to remove pending client request: {err}"); + } +} + struct ThreadEntry { state: Arc>, connection_ids: HashSet,