diff --git a/codex-rs/analytics/src/analytics_client_tests.rs b/codex-rs/analytics/src/analytics_client_tests.rs index e94d69ed5..7172de689 100644 --- a/codex-rs/analytics/src/analytics_client_tests.rs +++ b/codex-rs/analytics/src/analytics_client_tests.rs @@ -108,6 +108,7 @@ use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::ServerResponse; use codex_app_server_protocol::SessionSource as AppServerSessionSource; +use codex_app_server_protocol::SubAgentActivityKind; use codex_app_server_protocol::Thread; use codex_app_server_protocol::ThreadArchiveParams; use codex_app_server_protocol::ThreadArchiveResponse; @@ -3794,6 +3795,12 @@ async fn turn_event_counts_completed_tool_items() { reasoning_effort: None, agents_states: Default::default(), }, + ThreadItem::SubAgentActivity { + id: "sub-agent-activity-1".to_string(), + kind: SubAgentActivityKind::Interacted, + agent_thread_id: "thread-child".to_string(), + agent_path: "/root/child".to_string(), + }, ThreadItem::WebSearch { id: "web-1".to_string(), query: "codex".to_string(), @@ -3841,14 +3848,14 @@ async fn turn_event_counts_completed_tool_items() { .find(|event| matches!(event, TrackEventRequest::TurnEvent(_))) .expect("turn event should be emitted"); let payload = serde_json::to_value(turn_event).expect("serialize turn event"); - assert_eq!(payload["event_params"]["total_tool_call_count"], json!(7)); + assert_eq!(payload["event_params"]["total_tool_call_count"], json!(8)); assert_eq!(payload["event_params"]["shell_command_count"], json!(1)); assert_eq!(payload["event_params"]["file_change_count"], json!(1)); assert_eq!(payload["event_params"]["mcp_tool_call_count"], json!(1)); assert_eq!(payload["event_params"]["dynamic_tool_call_count"], json!(1)); assert_eq!( payload["event_params"]["subagent_tool_call_count"], - json!(1) + json!(2) ); assert_eq!(payload["event_params"]["web_search_count"], json!(1)); assert_eq!(payload["event_params"]["image_generation_count"], json!(1)); diff --git a/codex-rs/analytics/src/reducer.rs b/codex-rs/analytics/src/reducer.rs index e159c0f2a..0cceb2b46 100644 --- a/codex-rs/analytics/src/reducer.rs +++ b/codex-rs/analytics/src/reducer.rs @@ -360,7 +360,9 @@ impl TurnToolCounts { ThreadItem::FileChange { .. } => self.file_change += 1, ThreadItem::McpToolCall { .. } => self.mcp_tool_call += 1, ThreadItem::DynamicToolCall { .. } => self.dynamic_tool_call += 1, - ThreadItem::CollabAgentToolCall { .. } => self.subagent_tool_call += 1, + ThreadItem::CollabAgentToolCall { .. } | ThreadItem::SubAgentActivity { .. } => { + self.subagent_tool_call += 1; + } ThreadItem::WebSearch { .. } => self.web_search += 1, ThreadItem::ImageGeneration { .. } => self.image_generation += 1, ThreadItem::UserMessage { .. } @@ -1091,6 +1093,18 @@ impl AnalyticsReducer { ); } ServerNotification::ItemCompleted(notification) => { + if matches!(notification.item, ThreadItem::SubAgentActivity { .. }) { + let Some(turn_state) = self.turns.get_mut(¬ification.turn_id) else { + tracing::warn!( + thread_id = %notification.thread_id, + turn_id = %notification.turn_id, + "dropping sub-agent activity tool count update: missing turn state" + ); + return; + }; + turn_state.tool_counts.record(¬ification.item); + return; + } let Some(item_id) = tracked_tool_item_id(¬ification.item) else { return; }; @@ -1562,6 +1576,7 @@ fn tracked_tool_item_id(item: &ThreadItem) -> Option<&str> { | ThreadItem::AgentMessage { .. } | ThreadItem::Plan { .. } | ThreadItem::Reasoning { .. } + | ThreadItem::SubAgentActivity { .. } | ThreadItem::ImageView { .. } | ThreadItem::EnteredReviewMode { .. } | ThreadItem::ExitedReviewMode { .. } diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index dac3a4fdf..01f6f58b0 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -3177,6 +3177,14 @@ ], "type": "object" }, + "SubAgentActivityKind": { + "enum": [ + "started", + "interacted", + "interrupted" + ], + "type": "string" + }, "SubAgentSource": { "oneOf": [ { @@ -4107,6 +4115,38 @@ "title": "CollabAgentToolCallThreadItem", "type": "object" }, + { + "properties": { + "agentPath": { + "type": "string" + }, + "agentThreadId": { + "type": "string" + }, + "id": { + "type": "string" + }, + "kind": { + "$ref": "#/definitions/SubAgentActivityKind" + }, + "type": { + "enum": [ + "subAgentActivity" + ], + "title": "SubAgentActivityThreadItemType", + "type": "string" + } + }, + "required": [ + "agentPath", + "agentThreadId", + "id", + "kind", + "type" + ], + "title": "SubAgentActivityThreadItem", + "type": "object" + }, { "properties": { "action": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 1873ed317..7abc5af7d 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -15580,6 +15580,14 @@ ], "type": "object" }, + "SubAgentActivityKind": { + "enum": [ + "started", + "interacted", + "interrupted" + ], + "type": "string" + }, "SubAgentSource": { "oneOf": [ { @@ -16892,6 +16900,38 @@ "title": "CollabAgentToolCallThreadItem", "type": "object" }, + { + "properties": { + "agentPath": { + "type": "string" + }, + "agentThreadId": { + "type": "string" + }, + "id": { + "type": "string" + }, + "kind": { + "$ref": "#/definitions/v2/SubAgentActivityKind" + }, + "type": { + "enum": [ + "subAgentActivity" + ], + "title": "SubAgentActivityThreadItemType", + "type": "string" + } + }, + "required": [ + "agentPath", + "agentThreadId", + "id", + "kind", + "type" + ], + "title": "SubAgentActivityThreadItem", + "type": "object" + }, { "properties": { "action": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index 38c74b185..e5ac77c08 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -13397,6 +13397,14 @@ ], "type": "object" }, + "SubAgentActivityKind": { + "enum": [ + "started", + "interacted", + "interrupted" + ], + "type": "string" + }, "SubAgentSource": { "oneOf": [ { @@ -14709,6 +14717,38 @@ "title": "CollabAgentToolCallThreadItem", "type": "object" }, + { + "properties": { + "agentPath": { + "type": "string" + }, + "agentThreadId": { + "type": "string" + }, + "id": { + "type": "string" + }, + "kind": { + "$ref": "#/definitions/SubAgentActivityKind" + }, + "type": { + "enum": [ + "subAgentActivity" + ], + "title": "SubAgentActivityThreadItemType", + "type": "string" + } + }, + "required": [ + "agentPath", + "agentThreadId", + "id", + "kind", + "type" + ], + "title": "SubAgentActivityThreadItem", + "type": "object" + }, { "properties": { "action": { diff --git a/codex-rs/app-server-protocol/schema/json/v2/ItemCompletedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/ItemCompletedNotification.json index 4b1d1dddb..162f3aa3d 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ItemCompletedNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ItemCompletedNotification.json @@ -466,6 +466,14 @@ "minLength": 1, "type": "string" }, + "SubAgentActivityKind": { + "enum": [ + "started", + "interacted", + "interrupted" + ], + "type": "string" + }, "TextElement": { "properties": { "byteRange": { @@ -989,6 +997,38 @@ "title": "CollabAgentToolCallThreadItem", "type": "object" }, + { + "properties": { + "agentPath": { + "type": "string" + }, + "agentThreadId": { + "type": "string" + }, + "id": { + "type": "string" + }, + "kind": { + "$ref": "#/definitions/SubAgentActivityKind" + }, + "type": { + "enum": [ + "subAgentActivity" + ], + "title": "SubAgentActivityThreadItemType", + "type": "string" + } + }, + "required": [ + "agentPath", + "agentThreadId", + "id", + "kind", + "type" + ], + "title": "SubAgentActivityThreadItem", + "type": "object" + }, { "properties": { "action": { diff --git a/codex-rs/app-server-protocol/schema/json/v2/ItemStartedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/ItemStartedNotification.json index 67e493a4d..af3b1ddd1 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ItemStartedNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ItemStartedNotification.json @@ -466,6 +466,14 @@ "minLength": 1, "type": "string" }, + "SubAgentActivityKind": { + "enum": [ + "started", + "interacted", + "interrupted" + ], + "type": "string" + }, "TextElement": { "properties": { "byteRange": { @@ -989,6 +997,38 @@ "title": "CollabAgentToolCallThreadItem", "type": "object" }, + { + "properties": { + "agentPath": { + "type": "string" + }, + "agentThreadId": { + "type": "string" + }, + "id": { + "type": "string" + }, + "kind": { + "$ref": "#/definitions/SubAgentActivityKind" + }, + "type": { + "enum": [ + "subAgentActivity" + ], + "title": "SubAgentActivityThreadItemType", + "type": "string" + } + }, + "required": [ + "agentPath", + "agentThreadId", + "id", + "kind", + "type" + ], + "title": "SubAgentActivityThreadItem", + "type": "object" + }, { "properties": { "action": { diff --git a/codex-rs/app-server-protocol/schema/json/v2/ReviewStartResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ReviewStartResponse.json index 0e3e704bf..8918f6cae 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ReviewStartResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ReviewStartResponse.json @@ -610,6 +610,14 @@ "minLength": 1, "type": "string" }, + "SubAgentActivityKind": { + "enum": [ + "started", + "interacted", + "interrupted" + ], + "type": "string" + }, "TextElement": { "properties": { "byteRange": { @@ -1133,6 +1141,38 @@ "title": "CollabAgentToolCallThreadItem", "type": "object" }, + { + "properties": { + "agentPath": { + "type": "string" + }, + "agentThreadId": { + "type": "string" + }, + "id": { + "type": "string" + }, + "kind": { + "$ref": "#/definitions/SubAgentActivityKind" + }, + "type": { + "enum": [ + "subAgentActivity" + ], + "title": "SubAgentActivityThreadItemType", + "type": "string" + } + }, + "required": [ + "agentPath", + "agentThreadId", + "id", + "kind", + "type" + ], + "title": "SubAgentActivityThreadItem", + "type": "object" + }, { "properties": { "action": { diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json index 0dbb7ef00..0cea90749 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json @@ -861,6 +861,14 @@ } ] }, + "SubAgentActivityKind": { + "enum": [ + "started", + "interacted", + "interrupted" + ], + "type": "string" + }, "SubAgentSource": { "oneOf": [ { @@ -1617,6 +1625,38 @@ "title": "CollabAgentToolCallThreadItem", "type": "object" }, + { + "properties": { + "agentPath": { + "type": "string" + }, + "agentThreadId": { + "type": "string" + }, + "id": { + "type": "string" + }, + "kind": { + "$ref": "#/definitions/SubAgentActivityKind" + }, + "type": { + "enum": [ + "subAgentActivity" + ], + "title": "SubAgentActivityThreadItemType", + "type": "string" + } + }, + "required": [ + "agentPath", + "agentThreadId", + "id", + "kind", + "type" + ], + "title": "SubAgentActivityThreadItem", + "type": "object" + }, { "properties": { "action": { diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json index 903553afa..d0b3351a4 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json @@ -676,6 +676,14 @@ } ] }, + "SubAgentActivityKind": { + "enum": [ + "started", + "interacted", + "interrupted" + ], + "type": "string" + }, "SubAgentSource": { "oneOf": [ { @@ -1432,6 +1440,38 @@ "title": "CollabAgentToolCallThreadItem", "type": "object" }, + { + "properties": { + "agentPath": { + "type": "string" + }, + "agentThreadId": { + "type": "string" + }, + "id": { + "type": "string" + }, + "kind": { + "$ref": "#/definitions/SubAgentActivityKind" + }, + "type": { + "enum": [ + "subAgentActivity" + ], + "title": "SubAgentActivityThreadItemType", + "type": "string" + } + }, + "required": [ + "agentPath", + "agentThreadId", + "id", + "kind", + "type" + ], + "title": "SubAgentActivityThreadItem", + "type": "object" + }, { "properties": { "action": { diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadMetadataUpdateResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadMetadataUpdateResponse.json index 93f44759b..e2850b3dd 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadMetadataUpdateResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadMetadataUpdateResponse.json @@ -676,6 +676,14 @@ } ] }, + "SubAgentActivityKind": { + "enum": [ + "started", + "interacted", + "interrupted" + ], + "type": "string" + }, "SubAgentSource": { "oneOf": [ { @@ -1432,6 +1440,38 @@ "title": "CollabAgentToolCallThreadItem", "type": "object" }, + { + "properties": { + "agentPath": { + "type": "string" + }, + "agentThreadId": { + "type": "string" + }, + "id": { + "type": "string" + }, + "kind": { + "$ref": "#/definitions/SubAgentActivityKind" + }, + "type": { + "enum": [ + "subAgentActivity" + ], + "title": "SubAgentActivityThreadItemType", + "type": "string" + } + }, + "required": [ + "agentPath", + "agentThreadId", + "id", + "kind", + "type" + ], + "title": "SubAgentActivityThreadItem", + "type": "object" + }, { "properties": { "action": { diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json index a78d1837c..01da3165b 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json @@ -676,6 +676,14 @@ } ] }, + "SubAgentActivityKind": { + "enum": [ + "started", + "interacted", + "interrupted" + ], + "type": "string" + }, "SubAgentSource": { "oneOf": [ { @@ -1432,6 +1440,38 @@ "title": "CollabAgentToolCallThreadItem", "type": "object" }, + { + "properties": { + "agentPath": { + "type": "string" + }, + "agentThreadId": { + "type": "string" + }, + "id": { + "type": "string" + }, + "kind": { + "$ref": "#/definitions/SubAgentActivityKind" + }, + "type": { + "enum": [ + "subAgentActivity" + ], + "title": "SubAgentActivityThreadItemType", + "type": "string" + } + }, + "required": [ + "agentPath", + "agentThreadId", + "id", + "kind", + "type" + ], + "title": "SubAgentActivityThreadItem", + "type": "object" + }, { "properties": { "action": { diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json index 288c3e6c8..d766d7a02 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json @@ -861,6 +861,14 @@ } ] }, + "SubAgentActivityKind": { + "enum": [ + "started", + "interacted", + "interrupted" + ], + "type": "string" + }, "SubAgentSource": { "oneOf": [ { @@ -1617,6 +1625,38 @@ "title": "CollabAgentToolCallThreadItem", "type": "object" }, + { + "properties": { + "agentPath": { + "type": "string" + }, + "agentThreadId": { + "type": "string" + }, + "id": { + "type": "string" + }, + "kind": { + "$ref": "#/definitions/SubAgentActivityKind" + }, + "type": { + "enum": [ + "subAgentActivity" + ], + "title": "SubAgentActivityThreadItemType", + "type": "string" + } + }, + "required": [ + "agentPath", + "agentThreadId", + "id", + "kind", + "type" + ], + "title": "SubAgentActivityThreadItem", + "type": "object" + }, { "properties": { "action": { diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json index 2ef7959a8..6448ca651 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json @@ -676,6 +676,14 @@ } ] }, + "SubAgentActivityKind": { + "enum": [ + "started", + "interacted", + "interrupted" + ], + "type": "string" + }, "SubAgentSource": { "oneOf": [ { @@ -1432,6 +1440,38 @@ "title": "CollabAgentToolCallThreadItem", "type": "object" }, + { + "properties": { + "agentPath": { + "type": "string" + }, + "agentThreadId": { + "type": "string" + }, + "id": { + "type": "string" + }, + "kind": { + "$ref": "#/definitions/SubAgentActivityKind" + }, + "type": { + "enum": [ + "subAgentActivity" + ], + "title": "SubAgentActivityThreadItemType", + "type": "string" + } + }, + "required": [ + "agentPath", + "agentThreadId", + "id", + "kind", + "type" + ], + "title": "SubAgentActivityThreadItem", + "type": "object" + }, { "properties": { "action": { diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json index 1a182e5d6..a19a40f52 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json @@ -861,6 +861,14 @@ } ] }, + "SubAgentActivityKind": { + "enum": [ + "started", + "interacted", + "interrupted" + ], + "type": "string" + }, "SubAgentSource": { "oneOf": [ { @@ -1617,6 +1625,38 @@ "title": "CollabAgentToolCallThreadItem", "type": "object" }, + { + "properties": { + "agentPath": { + "type": "string" + }, + "agentThreadId": { + "type": "string" + }, + "id": { + "type": "string" + }, + "kind": { + "$ref": "#/definitions/SubAgentActivityKind" + }, + "type": { + "enum": [ + "subAgentActivity" + ], + "title": "SubAgentActivityThreadItemType", + "type": "string" + } + }, + "required": [ + "agentPath", + "agentThreadId", + "id", + "kind", + "type" + ], + "title": "SubAgentActivityThreadItem", + "type": "object" + }, { "properties": { "action": { diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json index 66ed134c8..0eb79075f 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json @@ -676,6 +676,14 @@ } ] }, + "SubAgentActivityKind": { + "enum": [ + "started", + "interacted", + "interrupted" + ], + "type": "string" + }, "SubAgentSource": { "oneOf": [ { @@ -1432,6 +1440,38 @@ "title": "CollabAgentToolCallThreadItem", "type": "object" }, + { + "properties": { + "agentPath": { + "type": "string" + }, + "agentThreadId": { + "type": "string" + }, + "id": { + "type": "string" + }, + "kind": { + "$ref": "#/definitions/SubAgentActivityKind" + }, + "type": { + "enum": [ + "subAgentActivity" + ], + "title": "SubAgentActivityThreadItemType", + "type": "string" + } + }, + "required": [ + "agentPath", + "agentThreadId", + "id", + "kind", + "type" + ], + "title": "SubAgentActivityThreadItem", + "type": "object" + }, { "properties": { "action": { diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json index d331fa9bf..71064d5ea 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json @@ -676,6 +676,14 @@ } ] }, + "SubAgentActivityKind": { + "enum": [ + "started", + "interacted", + "interrupted" + ], + "type": "string" + }, "SubAgentSource": { "oneOf": [ { @@ -1432,6 +1440,38 @@ "title": "CollabAgentToolCallThreadItem", "type": "object" }, + { + "properties": { + "agentPath": { + "type": "string" + }, + "agentThreadId": { + "type": "string" + }, + "id": { + "type": "string" + }, + "kind": { + "$ref": "#/definitions/SubAgentActivityKind" + }, + "type": { + "enum": [ + "subAgentActivity" + ], + "title": "SubAgentActivityThreadItemType", + "type": "string" + } + }, + "required": [ + "agentPath", + "agentThreadId", + "id", + "kind", + "type" + ], + "title": "SubAgentActivityThreadItem", + "type": "object" + }, { "properties": { "action": { diff --git a/codex-rs/app-server-protocol/schema/json/v2/TurnCompletedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/TurnCompletedNotification.json index dd5ebbf6e..3cc329c9e 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/TurnCompletedNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v2/TurnCompletedNotification.json @@ -610,6 +610,14 @@ "minLength": 1, "type": "string" }, + "SubAgentActivityKind": { + "enum": [ + "started", + "interacted", + "interrupted" + ], + "type": "string" + }, "TextElement": { "properties": { "byteRange": { @@ -1133,6 +1141,38 @@ "title": "CollabAgentToolCallThreadItem", "type": "object" }, + { + "properties": { + "agentPath": { + "type": "string" + }, + "agentThreadId": { + "type": "string" + }, + "id": { + "type": "string" + }, + "kind": { + "$ref": "#/definitions/SubAgentActivityKind" + }, + "type": { + "enum": [ + "subAgentActivity" + ], + "title": "SubAgentActivityThreadItemType", + "type": "string" + } + }, + "required": [ + "agentPath", + "agentThreadId", + "id", + "kind", + "type" + ], + "title": "SubAgentActivityThreadItem", + "type": "object" + }, { "properties": { "action": { diff --git a/codex-rs/app-server-protocol/schema/json/v2/TurnStartResponse.json b/codex-rs/app-server-protocol/schema/json/v2/TurnStartResponse.json index cc4bce10f..7dbea8af5 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/TurnStartResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/TurnStartResponse.json @@ -610,6 +610,14 @@ "minLength": 1, "type": "string" }, + "SubAgentActivityKind": { + "enum": [ + "started", + "interacted", + "interrupted" + ], + "type": "string" + }, "TextElement": { "properties": { "byteRange": { @@ -1133,6 +1141,38 @@ "title": "CollabAgentToolCallThreadItem", "type": "object" }, + { + "properties": { + "agentPath": { + "type": "string" + }, + "agentThreadId": { + "type": "string" + }, + "id": { + "type": "string" + }, + "kind": { + "$ref": "#/definitions/SubAgentActivityKind" + }, + "type": { + "enum": [ + "subAgentActivity" + ], + "title": "SubAgentActivityThreadItemType", + "type": "string" + } + }, + "required": [ + "agentPath", + "agentThreadId", + "id", + "kind", + "type" + ], + "title": "SubAgentActivityThreadItem", + "type": "object" + }, { "properties": { "action": { diff --git a/codex-rs/app-server-protocol/schema/json/v2/TurnStartedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/TurnStartedNotification.json index 10b43d435..362e789ea 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/TurnStartedNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v2/TurnStartedNotification.json @@ -610,6 +610,14 @@ "minLength": 1, "type": "string" }, + "SubAgentActivityKind": { + "enum": [ + "started", + "interacted", + "interrupted" + ], + "type": "string" + }, "TextElement": { "properties": { "byteRange": { @@ -1133,6 +1141,38 @@ "title": "CollabAgentToolCallThreadItem", "type": "object" }, + { + "properties": { + "agentPath": { + "type": "string" + }, + "agentThreadId": { + "type": "string" + }, + "id": { + "type": "string" + }, + "kind": { + "$ref": "#/definitions/SubAgentActivityKind" + }, + "type": { + "enum": [ + "subAgentActivity" + ], + "title": "SubAgentActivityThreadItemType", + "type": "string" + } + }, + "required": [ + "agentPath", + "agentThreadId", + "id", + "kind", + "type" + ], + "title": "SubAgentActivityThreadItem", + "type": "object" + }, { "properties": { "action": { diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/SubAgentActivityKind.ts b/codex-rs/app-server-protocol/schema/typescript/v2/SubAgentActivityKind.ts new file mode 100644 index 000000000..5e3ce81e0 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/SubAgentActivityKind.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type SubAgentActivityKind = "started" | "interacted" | "interrupted"; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadItem.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadItem.ts index 78dff0d76..8d74ae8de 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadItem.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadItem.ts @@ -20,6 +20,7 @@ import type { McpToolCallResult } from "./McpToolCallResult"; import type { McpToolCallStatus } from "./McpToolCallStatus"; import type { MemoryCitation } from "./MemoryCitation"; import type { PatchApplyStatus } from "./PatchApplyStatus"; +import type { SubAgentActivityKind } from "./SubAgentActivityKind"; import type { UserInput } from "./UserInput"; import type { WebSearchAction } from "./WebSearchAction"; @@ -98,4 +99,4 @@ reasoningEffort: ReasoningEffort | null, /** * Last known status of the target agents, when available. */ -agentsStates: { [key in string]?: CollabAgentState }, } | { "type": "webSearch", id: string, query: string, action: WebSearchAction | null, } | { "type": "imageView", id: string, path: AbsolutePathBuf, } | { "type": "imageGeneration", id: string, status: string, revisedPrompt: string | null, result: string, savedPath?: AbsolutePathBuf, } | { "type": "enteredReviewMode", id: string, review: string, } | { "type": "exitedReviewMode", id: string, review: string, } | { "type": "contextCompaction", id: string, }; +agentsStates: { [key in string]?: CollabAgentState }, } | { "type": "subAgentActivity", id: string, kind: SubAgentActivityKind, agentThreadId: string, agentPath: string, } | { "type": "webSearch", id: string, query: string, action: WebSearchAction | null, } | { "type": "imageView", id: string, path: AbsolutePathBuf, } | { "type": "imageGeneration", id: string, status: string, revisedPrompt: string | null, result: string, savedPath?: AbsolutePathBuf, } | { "type": "enteredReviewMode", id: string, review: string, } | { "type": "exitedReviewMode", id: string, review: string, } | { "type": "contextCompaction", id: string, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts index 727c049d1..eab3494e1 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts @@ -356,6 +356,7 @@ export type { SkillsListParams } from "./SkillsListParams"; export type { SkillsListResponse } from "./SkillsListResponse"; export type { SortDirection } from "./SortDirection"; export type { SpendControlLimitSnapshot } from "./SpendControlLimitSnapshot"; +export type { SubAgentActivityKind } from "./SubAgentActivityKind"; export type { SubagentMigration } from "./SubagentMigration"; export type { TerminalInteractionNotification } from "./TerminalInteractionNotification"; export type { TextElement } from "./TextElement"; diff --git a/codex-rs/app-server-protocol/src/protocol/event_mapping.rs b/codex-rs/app-server-protocol/src/protocol/event_mapping.rs index 609ca83a5..6d6012e19 100644 --- a/codex-rs/app-server-protocol/src/protocol/event_mapping.rs +++ b/codex-rs/app-server-protocol/src/protocol/event_mapping.rs @@ -178,6 +178,20 @@ pub fn item_event_to_server_notification( completed_at_ms: end_event.completed_at_ms, }) } + EventMsg::SubAgentActivity(activity) => { + let item = ThreadItem::SubAgentActivity { + id: activity.event_id, + kind: activity.kind.into(), + agent_thread_id: activity.agent_thread_id.to_string(), + agent_path: String::from(activity.agent_path), + }; + ServerNotification::ItemCompleted(ItemCompletedNotification { + thread_id, + turn_id, + item, + completed_at_ms: activity.occurred_at_ms, + }) + } EventMsg::CollabWaitingBegin(begin_event) => { let receiver_thread_ids = begin_event .receiver_thread_ids diff --git a/codex-rs/app-server-protocol/src/protocol/thread_history.rs b/codex-rs/app-server-protocol/src/protocol/thread_history.rs index fdc2af15d..075b3e043 100644 --- a/codex-rs/app-server-protocol/src/protocol/thread_history.rs +++ b/codex-rs/app-server-protocol/src/protocol/thread_history.rs @@ -203,6 +203,7 @@ impl ThreadHistoryBuilder { EventMsg::CollabAgentInteractionEnd(payload) => { self.handle_collab_agent_interaction_end(payload) } + EventMsg::SubAgentActivity(payload) => self.handle_sub_agent_activity(payload), EventMsg::CollabWaitingBegin(payload) => self.handle_collab_waiting_begin(payload), EventMsg::CollabWaitingEnd(payload) => self.handle_collab_waiting_end(payload), EventMsg::CollabCloseBegin(payload) => self.handle_collab_close_begin(payload), @@ -702,6 +703,18 @@ impl ThreadHistoryBuilder { }); } + fn handle_sub_agent_activity( + &mut self, + payload: &codex_protocol::protocol::SubAgentActivityEvent, + ) { + self.upsert_item_in_current_turn(ThreadItem::SubAgentActivity { + id: payload.event_id.clone(), + kind: payload.kind.into(), + agent_thread_id: payload.agent_thread_id.to_string(), + agent_path: String::from(payload.agent_path.clone()), + }); + } + fn handle_collab_waiting_begin( &mut self, payload: &codex_protocol::protocol::CollabWaitingBeginEvent, diff --git a/codex-rs/app-server-protocol/src/protocol/v2/item.rs b/codex-rs/app-server-protocol/src/protocol/v2/item.rs index d68485565..29d212ab1 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2/item.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2/item.rs @@ -29,6 +29,7 @@ use codex_protocol::protocol::GuardianRiskLevel as CoreGuardianRiskLevel; use codex_protocol::protocol::GuardianUserAuthorization as CoreGuardianUserAuthorization; use codex_protocol::protocol::PatchApplyStatus as CorePatchApplyStatus; use codex_protocol::protocol::ReviewDecision as CoreReviewDecision; +use codex_protocol::protocol::SubAgentActivityKind as CoreSubAgentActivityKind; use codex_utils_absolute_path::AbsolutePathBuf; use schemars::JsonSchema; use serde::Deserialize; @@ -336,6 +337,14 @@ pub enum ThreadItem { }, #[serde(rename_all = "camelCase")] #[ts(rename_all = "camelCase")] + SubAgentActivity { + id: String, + kind: SubAgentActivityKind, + agent_thread_id: String, + agent_path: String, + }, + #[serde(rename_all = "camelCase")] + #[ts(rename_all = "camelCase")] WebSearch { id: String, query: String, @@ -387,6 +396,7 @@ impl ThreadItem { | ThreadItem::McpToolCall { id, .. } | ThreadItem::DynamicToolCall { id, .. } | ThreadItem::CollabAgentToolCall { id, .. } + | ThreadItem::SubAgentActivity { id, .. } | ThreadItem::WebSearch { id, .. } | ThreadItem::ImageView { id, .. } | ThreadItem::ImageGeneration { id, .. } @@ -1004,6 +1014,25 @@ pub enum CollabAgentToolCallStatus { Failed, } +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub enum SubAgentActivityKind { + Started, + Interacted, + Interrupted, +} + +impl From for SubAgentActivityKind { + fn from(value: CoreSubAgentActivityKind) -> Self { + match value { + CoreSubAgentActivityKind::Started => SubAgentActivityKind::Started, + CoreSubAgentActivityKind::Interacted => SubAgentActivityKind::Interacted, + CoreSubAgentActivityKind::Interrupted => SubAgentActivityKind::Interrupted, + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index d4d6f5934..62df11f44 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -844,6 +844,7 @@ pub(crate) async fn apply_bespoke_event_handling( | EventMsg::CollabAgentSpawnEnd(_) | EventMsg::CollabAgentInteractionBegin(_) | EventMsg::CollabAgentInteractionEnd(_) + | EventMsg::SubAgentActivity(_) | EventMsg::CollabWaitingBegin(_) | EventMsg::CollabWaitingEnd(_) | EventMsg::CollabCloseBegin(_) diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 9d4e7d1e8..959e28834 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -1431,7 +1431,8 @@ pub(super) fn realtime_text_for_event(msg: &EventMsg) -> Option { | EventMsg::CollabCloseBegin(_) | EventMsg::CollabCloseEnd(_) | EventMsg::CollabResumeBegin(_) - | EventMsg::CollabResumeEnd(_) => None, + | EventMsg::CollabResumeEnd(_) + | EventMsg::SubAgentActivity(_) => None, } } diff --git a/codex-rs/core/src/tools/handlers/multi_agents_v2.rs b/codex-rs/core/src/tools/handlers/multi_agents_v2.rs index 6b5bd9f24..cae3497a1 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_v2.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_v2.rs @@ -14,15 +14,11 @@ use crate::tools::registry::ToolExecutor; use codex_protocol::AgentPath; use codex_protocol::models::ResponseInputItem; use codex_protocol::openai_models::ReasoningEffort; -use codex_protocol::protocol::CollabAgentInteractionBeginEvent; -use codex_protocol::protocol::CollabAgentInteractionEndEvent; -use codex_protocol::protocol::CollabAgentSpawnBeginEvent; -use codex_protocol::protocol::CollabAgentSpawnEndEvent; -use codex_protocol::protocol::CollabCloseBeginEvent; -use codex_protocol::protocol::CollabCloseEndEvent; use codex_protocol::protocol::CollabWaitingBeginEvent; use codex_protocol::protocol::CollabWaitingEndEvent; use codex_protocol::protocol::InterAgentCommunication; +use codex_protocol::protocol::SubAgentActivityEvent; +use codex_protocol::protocol::SubAgentActivityKind; use codex_protocol::user_input::UserInput; use codex_tools::ToolName; use serde::Deserialize; diff --git a/codex-rs/core/src/tools/handlers/multi_agents_v2/interrupt_agent.rs b/codex-rs/core/src/tools/handlers/multi_agents_v2/interrupt_agent.rs index 907367348..159c49127 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_v2/interrupt_agent.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_v2/interrupt_agent.rs @@ -59,18 +59,9 @@ async fn handle_interrupt_agent( .to_string(), )); } - session - .send_event( - &turn, - CollabCloseBeginEvent { - call_id: call_id.clone(), - started_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.thread_id, - receiver_thread_id: agent_id, - } - .into(), - ) - .await; + let receiver_agent_path = receiver_agent.agent_path.clone().ok_or_else(|| { + FunctionCallError::RespondToModel("target agent is missing an agent_path".to_string()) + })?; let status = session.services.agent_control.get_status(agent_id).await; let result = match session .services @@ -81,22 +72,20 @@ async fn handle_interrupt_agent( Ok(_) | Err(CodexErr::ThreadNotFound(_)) | Err(CodexErr::InternalAgentDied) => Ok(()), Err(err) => Err(collab_agent_error(agent_id, err)), }; + result?; session .send_event( &turn, - CollabCloseEndEvent { - call_id, - completed_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.thread_id, - receiver_thread_id: agent_id, - receiver_agent_nickname: receiver_agent.agent_nickname, - receiver_agent_role: receiver_agent.agent_role, - status: status.clone(), + SubAgentActivityEvent { + event_id: call_id, + occurred_at_ms: now_unix_timestamp_ms(), + agent_thread_id: agent_id, + agent_path: receiver_agent_path, + kind: SubAgentActivityKind::Interrupted, } .into(), ) .await; - result?; Ok(InterruptAgentResult { previous_status: status, diff --git a/codex-rs/core/src/tools/handlers/multi_agents_v2/message_tool.rs b/codex-rs/core/src/tools/handlers/multi_agents_v2/message_tool.rs index 07fd9d70e..7fdb80e8b 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_v2/message_tool.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_v2/message_tool.rs @@ -69,7 +69,6 @@ pub(crate) async fn handle_message_string_tool( call_id, .. } = invocation; - let prompt = String::new(); let receiver_thread_id = resolve_agent_target(&session, &turn, &target).await?; let receiver_agent = session .services @@ -96,52 +95,32 @@ pub(crate) async fn handle_message_string_tool( .ensure_v2_agent_loaded(resume_config, receiver_thread_id) .await .map_err(|err| collab_agent_error(receiver_thread_id, err))?; - session - .send_event( - &turn, - CollabAgentInteractionBeginEvent { - call_id: call_id.clone(), - started_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.thread_id, - receiver_thread_id, - prompt: prompt.clone(), - } - .into(), - ) - .await; let author = turn .session_source .get_agent_path() .unwrap_or_else(AgentPath::root); - let communication = communication_from_tool_message(author, receiver_agent_path, message); + let communication = + communication_from_tool_message(author, receiver_agent_path.clone(), message); let result = session .services .agent_control .send_inter_agent_communication(receiver_thread_id, mode.apply(communication)) .await .map_err(|err| collab_agent_error(receiver_thread_id, err)); - let status = session - .services - .agent_control - .get_status(receiver_thread_id) - .await; + result?; session .send_event( &turn, - CollabAgentInteractionEndEvent { - call_id, - completed_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.thread_id, - receiver_thread_id, - receiver_agent_nickname: receiver_agent.agent_nickname, - receiver_agent_role: receiver_agent.agent_role, - prompt, - status, + SubAgentActivityEvent { + event_id: call_id, + occurred_at_ms: now_unix_timestamp_ms(), + agent_thread_id: receiver_thread_id, + agent_path: receiver_agent_path, + kind: SubAgentActivityKind::Interacted, } .into(), ) .await; - result?; Ok(FunctionToolOutput::from_text(String::new(), Some(true))) } diff --git a/codex-rs/core/src/tools/handlers/multi_agents_v2/spawn.rs b/codex-rs/core/src/tools/handlers/multi_agents_v2/spawn.rs index b5b7d210f..fc97df05d 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_v2/spawn.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_v2/spawn.rs @@ -61,24 +61,8 @@ async fn handle_spawn_agent( let message = args.message.clone(); let initial_operation = parse_collab_input(Some(args.message), /*items*/ None)?; - let prompt = String::new(); - let session_source = turn.session_source.clone(); let child_depth = next_thread_spawn_depth(&session_source); - session - .send_event( - &turn, - CollabAgentSpawnBeginEvent { - call_id: call_id.clone(), - started_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.thread_id, - prompt: prompt.clone(), - model: args.model.clone().unwrap_or_default(), - reasoning_effort: args.reasoning_effort.clone().unwrap_or_default(), - } - .into(), - ) - .await; let mut config = build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?; if let Some(service_tier) = args.service_tier.as_ref() { @@ -119,11 +103,16 @@ async fn handle_spawn_agent( role_name, Some(args.task_name.clone()), )?; - let result = Box::pin( + let new_agent_path = spawn_source.get_agent_path().ok_or_else(|| { + FunctionCallError::RespondToModel( + "spawned agent is missing a canonical task name".to_string(), + ) + })?; + let spawned_agent = Box::pin( session.services.agent_control.spawn_agent_with_metadata( config, - match (spawn_source.get_agent_path(), initial_operation) { - (Some(recipient), Op::UserInput { items, .. }) + match initial_operation { + Op::UserInput { items, .. } if items .iter() .all(|item| matches!(item, UserInput::Text { .. })) => @@ -132,10 +121,11 @@ async fn handle_spawn_agent( .session_source .get_agent_path() .unwrap_or_else(AgentPath::root); - let communication = communication_from_tool_message(author, recipient, message); + let communication = + communication_from_tool_message(author, new_agent_path.clone(), message); Op::InterAgentCommunication { communication } } - (_, initial_operation) => initial_operation, + initial_operation => initial_operation, }, Some(spawn_source), SpawnAgentOptions { @@ -147,78 +137,37 @@ async fn handle_spawn_agent( ), ) .await - .map_err(collab_spawn_error); - let (new_thread_id, new_agent_metadata, status) = match &result { - Ok(spawned_agent) => ( - Some(spawned_agent.thread_id), - Some(spawned_agent.metadata.clone()), - spawned_agent.status.clone(), - ), - Err(_) => (None, None, AgentStatus::NotFound), - }; - let agent_snapshot = match new_thread_id { - Some(thread_id) => { - session - .services - .agent_control - .get_agent_config_snapshot(thread_id) - .await - } - None => None, - }; - let (new_agent_path, new_agent_nickname, new_agent_role) = - match (&agent_snapshot, new_agent_metadata) { - (Some(snapshot), _) => ( - snapshot.session_source.get_agent_path().map(String::from), - snapshot.session_source.get_nickname(), - snapshot.session_source.get_agent_role(), - ), - (None, Some(metadata)) => ( - metadata.agent_path.map(String::from), - metadata.agent_nickname, - metadata.agent_role, - ), - (None, None) => (None, None, None), - }; - let effective_model = agent_snapshot + .map_err(collab_spawn_error)?; + let new_thread_id = spawned_agent.thread_id; + let agent_snapshot = session + .services + .agent_control + .get_agent_config_snapshot(new_thread_id) + .await; + let nickname = agent_snapshot .as_ref() - .map(|snapshot| snapshot.model.clone()) - .unwrap_or_else(|| args.model.clone().unwrap_or_default()); - let effective_reasoning_effort = agent_snapshot - .as_ref() - .and_then(|snapshot| snapshot.reasoning_effort.clone()) - .unwrap_or(args.reasoning_effort.unwrap_or_default()); - let nickname = new_agent_nickname.clone(); + .and_then(|snapshot| snapshot.session_source.get_nickname()) + .or(spawned_agent.metadata.agent_nickname); session .send_event( &turn, - CollabAgentSpawnEndEvent { - call_id, - completed_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.thread_id, - new_thread_id, - new_agent_nickname, - new_agent_role, - prompt, - model: effective_model, - reasoning_effort: effective_reasoning_effort, - status, + SubAgentActivityEvent { + event_id: call_id, + occurred_at_ms: now_unix_timestamp_ms(), + agent_thread_id: new_thread_id, + agent_path: new_agent_path.clone(), + kind: SubAgentActivityKind::Started, } .into(), ) .await; - let _ = result?; let role_tag = role_name.unwrap_or(DEFAULT_ROLE_NAME); turn.session_telemetry.counter( "codex.multi_agent.spawn", /*inc*/ 1, &[("role", role_tag)], ); - let task_name = new_agent_path.ok_or_else(|| { - FunctionCallError::RespondToModel( - "spawned agent is missing a canonical task name".to_string(), - ) - })?; + let task_name = String::from(new_agent_path); let hide_agent_metadata = turn.config.multi_agent_v2.hide_spawn_agent_metadata; if hide_agent_metadata { diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index 21793cc69..c4ec218dc 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -382,6 +382,7 @@ async fn run_codex_tool_session_inner( | EventMsg::CollabCloseEnd(_) | EventMsg::CollabResumeBegin(_) | EventMsg::CollabResumeEnd(_) + | EventMsg::SubAgentActivity(_) | EventMsg::RealtimeConversationStarted(_) | EventMsg::RealtimeConversationSdp(_) | EventMsg::RealtimeConversationRealtime(_) diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 642f2e5de..b14129828 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -1349,6 +1349,9 @@ pub enum EventMsg { CollabResumeBegin(CollabResumeBeginEvent), /// Collab interaction: resume end. CollabResumeEnd(CollabResumeEndEvent), + + /// Path-based v2 sub-agent activity. + SubAgentActivity(SubAgentActivityEvent), } #[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS, EnumIter)] @@ -1568,6 +1571,12 @@ impl From for EventMsg { } } +impl From for EventMsg { + fn from(event: SubAgentActivityEvent) -> Self { + EventMsg::SubAgentActivity(event) + } +} + /// Agent lifecycle status, derived from emitted events. #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS, Default)] #[serde(rename_all = "snake_case")] @@ -3890,6 +3899,27 @@ pub struct CollabAgentInteractionEndEvent { pub status: AgentStatus, } +#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "snake_case")] +#[ts(rename_all = "snake_case")] +pub enum SubAgentActivityKind { + Started, + Interacted, + Interrupted, +} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] +pub struct SubAgentActivityEvent { + pub event_id: String, + #[serde(default)] + pub occurred_at_ms: i64, + /// Thread ID of the affected sub-agent. + pub agent_thread_id: ThreadId, + /// Canonical v2 path of the affected sub-agent. + pub agent_path: AgentPath, + pub kind: SubAgentActivityKind, +} + #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)] pub struct CollabWaitingBeginEvent { #[serde(default)] diff --git a/codex-rs/rollout-trace/src/protocol_event.rs b/codex-rs/rollout-trace/src/protocol_event.rs index a3e1f184e..bbba24530 100644 --- a/codex-rs/rollout-trace/src/protocol_event.rs +++ b/codex-rs/rollout-trace/src/protocol_event.rs @@ -21,6 +21,7 @@ use codex_protocol::protocol::McpToolCallEndEvent; use codex_protocol::protocol::PatchApplyBeginEvent; use codex_protocol::protocol::PatchApplyEndEvent; use codex_protocol::protocol::PatchApplyStatus; +use codex_protocol::protocol::SubAgentActivityEvent; use codex_protocol::protocol::TurnAbortReason; use serde::Serialize; @@ -110,6 +111,7 @@ pub(crate) enum ToolRuntimePayload<'a> { CollabWaitingEnd(&'a codex_protocol::protocol::CollabWaitingEndEvent), CollabCloseBegin(&'a codex_protocol::protocol::CollabCloseBeginEvent), CollabCloseEnd(&'a codex_protocol::protocol::CollabCloseEndEvent), + SubAgentActivity(&'a SubAgentActivityEvent), } impl Serialize for ToolRuntimePayload<'_> { @@ -132,6 +134,7 @@ impl Serialize for ToolRuntimePayload<'_> { ToolRuntimePayload::CollabWaitingEnd(event) => event.serialize(serializer), ToolRuntimePayload::CollabCloseBegin(event) => event.serialize(serializer), ToolRuntimePayload::CollabCloseEnd(event) => event.serialize(serializer), + ToolRuntimePayload::SubAgentActivity(event) => event.serialize(serializer), } } } @@ -215,6 +218,11 @@ pub(crate) fn tool_runtime_trace_event(event: &EventMsg) -> Option Some(ToolRuntimeTraceEvent::Ended { + tool_call_id: &event.event_id, + status: ExecutionStatus::Completed, + payload: ToolRuntimePayload::SubAgentActivity(event), + }), EventMsg::Error(_) | EventMsg::Warning(_) | EventMsg::GuardianWarning(_) @@ -357,7 +365,8 @@ pub(crate) fn wrapped_protocol_event_type(event: &EventMsg) -> Option<&'static s | EventMsg::CollabCloseBegin(_) | EventMsg::CollabCloseEnd(_) | EventMsg::CollabResumeBegin(_) - | EventMsg::CollabResumeEnd(_) => None, + | EventMsg::CollabResumeEnd(_) + | EventMsg::SubAgentActivity(_) => None, } } @@ -393,3 +402,7 @@ fn execution_status_for_abort_reason(reason: &TurnAbortReason) -> ExecutionStatu | TurnAbortReason::BudgetLimited => ExecutionStatus::Cancelled, } } + +#[cfg(test)] +#[path = "protocol_event_tests.rs"] +mod tests; diff --git a/codex-rs/rollout-trace/src/protocol_event_tests.rs b/codex-rs/rollout-trace/src/protocol_event_tests.rs new file mode 100644 index 000000000..b18c7200a --- /dev/null +++ b/codex-rs/rollout-trace/src/protocol_event_tests.rs @@ -0,0 +1,46 @@ +use codex_protocol::AgentPath; +use codex_protocol::ThreadId; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::SubAgentActivityEvent; +use codex_protocol::protocol::SubAgentActivityKind; +use pretty_assertions::assert_eq; +use serde_json::json; + +use super::ToolRuntimeTraceEvent; +use super::tool_runtime_trace_event; +use crate::ExecutionStatus; + +#[test] +fn sub_agent_activity_is_a_terminal_tool_runtime_event() -> anyhow::Result<()> { + let agent_thread_id = ThreadId::new(); + let event = EventMsg::SubAgentActivity(SubAgentActivityEvent { + event_id: "call-spawn".to_string(), + occurred_at_ms: 1234, + agent_thread_id, + agent_path: AgentPath::try_from("/root/reviewer").map_err(anyhow::Error::msg)?, + kind: SubAgentActivityKind::Started, + }); + + let Some(ToolRuntimeTraceEvent::Ended { + tool_call_id, + status, + payload, + }) = tool_runtime_trace_event(&event) + else { + panic!("expected terminal tool runtime event"); + }; + + assert_eq!(tool_call_id, "call-spawn"); + assert_eq!(status, ExecutionStatus::Completed); + assert_eq!( + serde_json::to_value(payload)?, + json!({ + "event_id": "call-spawn", + "occurred_at_ms": 1234, + "agent_thread_id": agent_thread_id, + "agent_path": "/root/reviewer", + "kind": "started" + }) + ); + Ok(()) +} diff --git a/codex-rs/rollout-trace/src/reducer/tool/agents.rs b/codex-rs/rollout-trace/src/reducer/tool/agents.rs index 37f3220c2..656fccaf7 100644 --- a/codex-rs/rollout-trace/src/reducer/tool/agents.rs +++ b/codex-rs/rollout-trace/src/reducer/tool/agents.rs @@ -7,6 +7,10 @@ use codex_protocol::protocol::CollabAgentSpawnEndEvent; use codex_protocol::protocol::CollabCloseBeginEvent; use codex_protocol::protocol::CollabCloseEndEvent; use codex_protocol::protocol::InterAgentCommunication; +use codex_protocol::protocol::SubAgentActivityEvent; +use codex_protocol::protocol::SubAgentActivityKind; +use serde::Deserialize; +use serde_json::Value; use super::super::TraceReducer; use crate::model::ConversationItem; @@ -52,6 +56,11 @@ pub(in crate::reducer) struct ObservedAgentResultEdge { pub(in crate::reducer) carried_payload: Option, } +#[derive(Deserialize)] +struct AgentMessageInvocationArgs { + message: String, +} + /// Builds the stable edge id for the spawn relationship between two threads. pub(in crate::reducer) fn spawn_edge_id(parent_thread_id: &str, child_thread_id: &str) -> String { format!("edge:spawn:{parent_thread_id}:{child_thread_id}") @@ -123,15 +132,20 @@ impl TraceReducer { runtime_payload: &RawPayloadRef, ) -> Result<()> { let kind = self.rollout.tool_calls[tool_call_id].kind.clone(); + let runtime_payload_json = self.read_payload_json(runtime_payload)?; + if runtime_payload_json.get("agent_thread_id").is_some() { + let payload: SubAgentActivityEvent = serde_json::from_value(runtime_payload_json)?; + return self.end_sub_agent_activity(wall_time_unix_ms, tool_call_id, &kind, &payload); + } match kind { ToolCallKind::SpawnAgent => { let payload: CollabAgentSpawnEndEvent = - serde_json::from_value(self.read_payload_json(runtime_payload)?)?; + serde_json::from_value(runtime_payload_json)?; self.end_spawn_agent_interaction(wall_time_unix_ms, tool_call_id, &payload) } ToolCallKind::AssignAgentTask => { let payload: CollabAgentInteractionEndEvent = - serde_json::from_value(self.read_payload_json(runtime_payload)?)?; + serde_json::from_value(runtime_payload_json)?; self.end_message_agent_interaction( wall_time_unix_ms, tool_call_id, @@ -141,7 +155,7 @@ impl TraceReducer { } ToolCallKind::SendMessage => { let payload: CollabAgentInteractionEndEvent = - serde_json::from_value(self.read_payload_json(runtime_payload)?)?; + serde_json::from_value(runtime_payload_json)?; self.end_message_agent_interaction( wall_time_unix_ms, tool_call_id, @@ -150,8 +164,7 @@ impl TraceReducer { ) } ToolCallKind::CloseAgent => { - let payload: CollabCloseEndEvent = - serde_json::from_value(self.read_payload_json(runtime_payload)?)?; + let payload: CollabCloseEndEvent = serde_json::from_value(runtime_payload_json)?; self.upsert_close_agent_interaction( tool_call_id, payload.receiver_thread_id.to_string(), @@ -169,6 +182,127 @@ impl TraceReducer { } } + fn end_sub_agent_activity( + &mut self, + wall_time_unix_ms: i64, + tool_call_id: &str, + tool_kind: &ToolCallKind, + payload: &SubAgentActivityEvent, + ) -> Result<()> { + let target_thread_id = payload.agent_thread_id.to_string(); + match (tool_kind, &payload.kind) { + (ToolCallKind::SpawnAgent, SubAgentActivityKind::Started) => { + let parent_thread_id = self + .rollout + .tool_calls + .get(tool_call_id) + .with_context(|| { + format!("agent activity referenced unknown tool call {tool_call_id}") + })? + .thread_id + .clone(); + self.queue_sub_agent_activity_message_edge( + wall_time_unix_ms, + tool_call_id, + spawn_edge_id(&parent_thread_id, &target_thread_id), + InteractionEdgeKind::SpawnAgent, + target_thread_id.clone(), + Some(target_thread_id), + ) + } + (ToolCallKind::AssignAgentTask, SubAgentActivityKind::Interacted) => self + .queue_sub_agent_activity_message_edge( + wall_time_unix_ms, + tool_call_id, + tool_edge_id(tool_call_id), + InteractionEdgeKind::AssignAgentTask, + target_thread_id, + /*unresolved_spawn_thread_id*/ None, + ), + (ToolCallKind::SendMessage, SubAgentActivityKind::Interacted) => self + .queue_sub_agent_activity_message_edge( + wall_time_unix_ms, + tool_call_id, + tool_edge_id(tool_call_id), + InteractionEdgeKind::SendMessage, + target_thread_id, + /*unresolved_spawn_thread_id*/ None, + ), + (ToolCallKind::CloseAgent, SubAgentActivityKind::Interrupted) => self + .upsert_close_agent_interaction( + tool_call_id, + target_thread_id, + Some(wall_time_unix_ms), + ), + _ => bail!( + "sub-agent activity {:?} does not match tool call kind {tool_kind:?}", + payload.kind + ), + } + } + + fn queue_sub_agent_activity_message_edge( + &mut self, + wall_time_unix_ms: i64, + tool_call_id: &str, + edge_id: String, + edge_kind: InteractionEdgeKind, + target_thread_id: String, + unresolved_spawn_thread_id: Option, + ) -> Result<()> { + let tool_call = self.rollout.tool_calls.get(tool_call_id).with_context(|| { + format!("agent activity referenced unknown tool call {tool_call_id}") + })?; + let started_at_unix_ms = tool_call.execution.started_at_unix_ms; + let message_content = self.agent_message_content_from_invocation(tool_call_id)?; + let carried_raw_payload_ids = self.agent_tool_payload_ids(tool_call_id)?; + self.queue_or_resolve_agent_interaction_edge(PendingAgentInteractionEdge { + edge_id, + kind: edge_kind, + source: TraceAnchor::ToolCall { + tool_call_id: tool_call_id.to_string(), + }, + target_thread_id, + message_content, + unresolved_spawn_thread_id, + started_at_unix_ms, + ended_at_unix_ms: Some(wall_time_unix_ms), + carried_raw_payload_ids, + }) + } + + fn agent_message_content_from_invocation(&self, tool_call_id: &str) -> Result { + let tool_call = self.rollout.tool_calls.get(tool_call_id).with_context(|| { + format!("agent activity referenced unknown tool call {tool_call_id}") + })?; + let invocation_payload_id = tool_call + .raw_invocation_payload_id + .as_deref() + .with_context(|| { + format!("agent activity tool call {tool_call_id} missing invocation payload") + })?; + let invocation_payload = self + .rollout + .raw_payloads + .get(invocation_payload_id) + .with_context(|| { + format!( + "agent activity tool call {tool_call_id} referenced missing invocation payload {invocation_payload_id}" + ) + })?; + let invocation = self.read_payload_json(invocation_payload)?; + let arguments = invocation + .get("payload") + .and_then(|payload| payload.get("arguments")) + .and_then(Value::as_str) + .with_context(|| { + format!("agent activity tool call {tool_call_id} missing function arguments") + })?; + let args: AgentMessageInvocationArgs = serde_json::from_str(arguments) + .with_context(|| format!("parse agent activity tool call {tool_call_id} arguments"))?; + Ok(args.message) + } + /// Adds the canonical tool result payload to an already reduced multi-agent edge. pub(super) fn attach_agent_interaction_tool_result( &mut self, diff --git a/codex-rs/rollout-trace/src/reducer/tool/agents_tests.rs b/codex-rs/rollout-trace/src/reducer/tool/agents_tests.rs index deaf06dac..b009a1543 100644 --- a/codex-rs/rollout-trace/src/reducer/tool/agents_tests.rs +++ b/codex-rs/rollout-trace/src/reducer/tool/agents_tests.rs @@ -194,6 +194,92 @@ fn spawn_runtime_payload_falls_back_to_child_thread_without_delivery_item() -> a Ok(()) } +#[test] +fn sub_agent_started_activity_creates_spawn_edge() -> anyhow::Result<()> { + let temp = TempDir::new()?; + let writer = create_started_agent_writer(&temp)?; + start_agent_turn(&writer, "turn-1")?; + let child_thread_id = "019d0000-0000-7000-8000-000000000002"; + let invocation_payload = writer.write_json_payload( + RawPayloadKind::ToolInvocation, + &json!({ + "tool_name": "spawn_agent", + "payload": { + "type": "function", + "arguments": "{\"message\":\"review this\",\"task_name\":\"reviewer\"}" + } + }), + )?; + writer.append_with_context( + trace_context_for_agent("turn-1"), + RawTraceEventPayload::ToolCallStarted { + tool_call_id: "call-spawn-v2".to_string(), + model_visible_call_id: Some("call-spawn-v2".to_string()), + code_mode_runtime_tool_id: None, + requester: RawToolCallRequester::Model, + kind: ToolCallKind::SpawnAgent, + summary: ToolCallSummary::Generic { + label: "spawn_agent".to_string(), + input_preview: None, + output_preview: None, + }, + invocation_payload: Some(invocation_payload.clone()), + }, + )?; + let activity_payload = writer.write_json_payload( + RawPayloadKind::ToolRuntimeEvent, + &json!({ + "event_id": "call-spawn-v2", + "occurred_at_ms": 1234, + "agent_thread_id": child_thread_id, + "agent_path": "/root/reviewer", + "kind": "started" + }), + )?; + writer.append_with_context( + trace_context_for_agent("turn-1"), + RawTraceEventPayload::ToolCallRuntimeEnded { + tool_call_id: "call-spawn-v2".to_string(), + status: ExecutionStatus::Completed, + runtime_payload: activity_payload.clone(), + }, + )?; + start_thread(&writer, child_thread_id, "/root/reviewer")?; + start_turn_for_thread(&writer, child_thread_id, "turn-child-1")?; + let delivered = inter_agent_message( + "/root", + "/root/reviewer", + "review this", + /*trigger_turn*/ true, + ); + append_inference_request( + &writer, + child_thread_id, + "turn-child-1", + "inference-child-1", + vec![message("assistant", &delivered)], + )?; + + let replayed = replay_bundle(temp.path())?; + let edge_id = format!("edge:spawn:019d0000-0000-7000-8000-000000000001:{child_thread_id}"); + let edge = &replayed.interaction_edges[&edge_id]; + assert_eq!(edge.kind, InteractionEdgeKind::SpawnAgent); + let target_item_id = target_conversation_item_id(&edge.target); + assert_eq!(edge.carried_item_ids, vec![target_item_id.clone()]); + assert_eq!( + replayed.conversation_items[target_item_id].thread_id, + child_thread_id + ); + assert_eq!( + edge.carried_raw_payload_ids, + vec![ + invocation_payload.raw_payload_id, + activity_payload.raw_payload_id, + ] + ); + Ok(()) +} + #[test] fn send_message_runtime_payload_targets_delivered_child_message() -> anyhow::Result<()> { let temp = TempDir::new()?; @@ -300,6 +386,178 @@ fn send_message_runtime_payload_targets_delivered_child_message() -> anyhow::Res Ok(()) } +#[test] +fn send_message_activity_targets_delivered_child_message() -> anyhow::Result<()> { + let temp = TempDir::new()?; + let writer = create_started_agent_writer(&temp)?; + start_agent_turn(&writer, "turn-1")?; + let child_thread_id = "019d0000-0000-7000-8000-000000000002"; + let invocation_payload = writer.write_json_payload( + RawPayloadKind::ToolInvocation, + &json!({ + "tool_name": "send_message", + "payload": { + "type": "function", + "arguments": "{\"target\":\"/root/child\",\"message\":\"hello again\"}" + } + }), + )?; + writer.append_with_context( + trace_context_for_agent("turn-1"), + RawTraceEventPayload::ToolCallStarted { + tool_call_id: "call-send-v2".to_string(), + model_visible_call_id: Some("call-send-v2".to_string()), + code_mode_runtime_tool_id: None, + requester: RawToolCallRequester::Model, + kind: ToolCallKind::SendMessage, + summary: ToolCallSummary::Generic { + label: "send_message".to_string(), + input_preview: None, + output_preview: None, + }, + invocation_payload: Some(invocation_payload.clone()), + }, + )?; + let activity_payload = writer.write_json_payload( + RawPayloadKind::ToolRuntimeEvent, + &json!({ + "event_id": "call-send-v2", + "occurred_at_ms": 1234, + "agent_thread_id": child_thread_id, + "agent_path": "/root/child", + "kind": "interacted" + }), + )?; + writer.append_with_context( + trace_context_for_agent("turn-1"), + RawTraceEventPayload::ToolCallRuntimeEnded { + tool_call_id: "call-send-v2".to_string(), + status: ExecutionStatus::Completed, + runtime_payload: activity_payload.clone(), + }, + )?; + start_thread(&writer, child_thread_id, "/root/child")?; + start_turn_for_thread(&writer, child_thread_id, "turn-child-1")?; + let delivered = inter_agent_message( + "/root", + "/root/child", + "hello again", + /*trigger_turn*/ false, + ); + append_inference_request( + &writer, + child_thread_id, + "turn-child-1", + "inference-child-1", + vec![message("assistant", &delivered)], + )?; + + let replayed = replay_bundle(temp.path())?; + let edge = &replayed.interaction_edges["edge:tool:call-send-v2"]; + assert_eq!(edge.kind, InteractionEdgeKind::SendMessage); + let target_item_id = target_conversation_item_id(&edge.target); + assert_eq!(edge.carried_item_ids, vec![target_item_id.clone()]); + assert_eq!( + replayed.conversation_items[target_item_id].thread_id, + child_thread_id + ); + assert_eq!( + edge.carried_raw_payload_ids, + vec![ + invocation_payload.raw_payload_id, + activity_payload.raw_payload_id, + ] + ); + + Ok(()) +} + +#[test] +fn followup_activity_targets_delivered_child_message() -> anyhow::Result<()> { + let temp = TempDir::new()?; + let writer = create_started_agent_writer(&temp)?; + start_agent_turn(&writer, "turn-1")?; + let child_thread_id = "019d0000-0000-7000-8000-000000000002"; + let invocation_payload = writer.write_json_payload( + RawPayloadKind::ToolInvocation, + &json!({ + "tool_name": "followup_task", + "payload": { + "type": "function", + "arguments": "{\"target\":\"/root/child\",\"message\":\"continue\"}" + } + }), + )?; + writer.append_with_context( + trace_context_for_agent("turn-1"), + RawTraceEventPayload::ToolCallStarted { + tool_call_id: "call-followup-v2".to_string(), + model_visible_call_id: Some("call-followup-v2".to_string()), + code_mode_runtime_tool_id: None, + requester: RawToolCallRequester::Model, + kind: ToolCallKind::AssignAgentTask, + summary: ToolCallSummary::Generic { + label: "followup_task".to_string(), + input_preview: None, + output_preview: None, + }, + invocation_payload: Some(invocation_payload.clone()), + }, + )?; + let activity_payload = writer.write_json_payload( + RawPayloadKind::ToolRuntimeEvent, + &json!({ + "event_id": "call-followup-v2", + "occurred_at_ms": 1234, + "agent_thread_id": child_thread_id, + "agent_path": "/root/child", + "kind": "interacted" + }), + )?; + writer.append_with_context( + trace_context_for_agent("turn-1"), + RawTraceEventPayload::ToolCallRuntimeEnded { + tool_call_id: "call-followup-v2".to_string(), + status: ExecutionStatus::Completed, + runtime_payload: activity_payload.clone(), + }, + )?; + start_thread(&writer, child_thread_id, "/root/child")?; + start_turn_for_thread(&writer, child_thread_id, "turn-child-1")?; + let delivered = inter_agent_message( + "/root", + "/root/child", + "continue", + /*trigger_turn*/ true, + ); + append_inference_request( + &writer, + child_thread_id, + "turn-child-1", + "inference-child-1", + vec![message("assistant", &delivered)], + )?; + + let replayed = replay_bundle(temp.path())?; + let edge = &replayed.interaction_edges["edge:tool:call-followup-v2"]; + assert_eq!(edge.kind, InteractionEdgeKind::AssignAgentTask); + let target_item_id = target_conversation_item_id(&edge.target); + assert_eq!(edge.carried_item_ids, vec![target_item_id.clone()]); + assert_eq!( + replayed.conversation_items[target_item_id].thread_id, + child_thread_id + ); + assert_eq!( + edge.carried_raw_payload_ids, + vec![ + invocation_payload.raw_payload_id, + activity_payload.raw_payload_id, + ] + ); + + Ok(()) +} + #[test] fn close_agent_runtime_payload_targets_thread() -> anyhow::Result<()> { let temp = TempDir::new()?; diff --git a/codex-rs/rollout-trace/src/tool_dispatch.rs b/codex-rs/rollout-trace/src/tool_dispatch.rs index 7082283b4..36c6651b1 100644 --- a/codex-rs/rollout-trace/src/tool_dispatch.rs +++ b/codex-rs/rollout-trace/src/tool_dispatch.rs @@ -269,7 +269,7 @@ fn dispatched_tool_kind(tool_name: &str, _payload: &ToolDispatchPayload) -> Tool "send_message" => ToolCallKind::SendMessage, "followup_task" | "assign_task" => ToolCallKind::AssignAgentTask, "wait_agent" => ToolCallKind::WaitAgent, - "close_agent" => ToolCallKind::CloseAgent, + "close_agent" | "interrupt_agent" => ToolCallKind::CloseAgent, other => ToolCallKind::Other { name: other.to_string(), }, @@ -425,6 +425,19 @@ mod tests { ))); } + #[test] + fn classifies_interrupt_agent_as_close_agent() { + assert_eq!( + dispatched_tool_kind( + "interrupt_agent", + &ToolDispatchPayload::Function { + arguments: r#"{"target":"/root/child"}"#.to_string(), + }, + ), + ToolCallKind::CloseAgent + ); + } + fn invocation( tool_name: &str, tool_namespace: Option, diff --git a/codex-rs/rollout/src/policy.rs b/codex-rs/rollout/src/policy.rs index 17da9e620..9760191bd 100644 --- a/codex-rs/rollout/src/policy.rs +++ b/codex-rs/rollout/src/policy.rs @@ -91,7 +91,8 @@ pub fn should_persist_event_msg(ev: &EventMsg) -> bool { | EventMsg::TurnStarted(_) | EventMsg::TurnComplete(_) | EventMsg::WebSearchEnd(_) - | EventMsg::ImageGenerationEnd(_) => true, + | EventMsg::ImageGenerationEnd(_) + | EventMsg::SubAgentActivity(_) => true, EventMsg::ItemCompleted(event) => { // Plan items are derived from streaming tags and are not part of the // raw ResponseItem history, so we persist their completion to replay diff --git a/codex-rs/thread-manager-sample/src/main.rs b/codex-rs/thread-manager-sample/src/main.rs index 5ffb8c4c0..97bfea828 100644 --- a/codex-rs/thread-manager-sample/src/main.rs +++ b/codex-rs/thread-manager-sample/src/main.rs @@ -327,6 +327,7 @@ async fn run_turn(thread: &CodexThread, thread_id: &str, prompt: String) -> anyh | EventMsg::CollabCloseEnd(_) | EventMsg::CollabResumeBegin(_) | EventMsg::CollabResumeEnd(_) + | EventMsg::SubAgentActivity(_) | EventMsg::AgentMessageContentDelta(_) | EventMsg::PlanDelta(_) | EventMsg::ReasoningContentDelta(_) diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index de43b3275..63a5d4a1c 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -62,6 +62,7 @@ use crate::multi_agents::agent_picker_status_dot_spans; use crate::multi_agents::format_agent_picker_item_name; use crate::multi_agents::next_agent_shortcut_matches; use crate::multi_agents::previous_agent_shortcut_matches; +use crate::multi_agents::sub_agent_activity_display; use crate::pager_overlay::Overlay; use crate::render::highlight::highlight_bash_to_lines; use crate::render::renderable::Renderable; @@ -198,6 +199,7 @@ use toml::Value as TomlValue; use uuid::Uuid; mod agent_message_consolidation; mod agent_navigation; +mod agent_status_feed; mod app_server_event_targets; mod app_server_events; pub(crate) mod app_server_requests; @@ -267,6 +269,20 @@ fn collab_receiver_thread_ids(notification: &ServerNotification) -> Option<&[Str } } +fn sub_agent_activity_item(notification: &ServerNotification) -> Option<&ThreadItem> { + match notification { + ServerNotification::ItemStarted(notification) => match ¬ification.item { + ThreadItem::SubAgentActivity { .. } => Some(¬ification.item), + _ => None, + }, + ServerNotification::ItemCompleted(notification) => match ¬ification.item { + ThreadItem::SubAgentActivity { .. } => Some(¬ification.item), + _ => None, + }, + _ => None, + } +} + fn collab_receiver_is_not_found( notification: &ServerNotification, receiver_thread_id: &str, diff --git a/codex-rs/tui/src/app/agent_navigation.rs b/codex-rs/tui/src/app/agent_navigation.rs index db9e47c5b..e329de90a 100644 --- a/codex-rs/tui/src/app/agent_navigation.rs +++ b/codex-rs/tui/src/app/agent_navigation.rs @@ -19,6 +19,7 @@ //! updated or marked closed. use crate::multi_agents::AgentPickerThreadEntry; +use crate::multi_agents::SubAgentActivityDisplay; use crate::multi_agents::format_agent_picker_item_name; use crate::multi_agents::next_agent_shortcut; use crate::multi_agents::previous_agent_shortcut; @@ -86,16 +87,56 @@ impl AgentNavigationState { if !self.threads.contains_key(&thread_id) { self.order.push(thread_id); } + let (previous_agent_path, previous_is_running) = self + .threads + .get(&thread_id) + .map(|entry| (entry.agent_path.clone(), entry.is_running)) + .unwrap_or((None, false)); self.threads.insert( thread_id, AgentPickerThreadEntry { agent_nickname, agent_role, + agent_path: previous_agent_path, + is_running: previous_is_running && !is_closed, is_closed, }, ); } + pub(crate) fn record_sub_agent_activity(&mut self, activity: SubAgentActivityDisplay) { + if !self.threads.contains_key(&activity.thread_id) { + self.order.push(activity.thread_id); + } + let entry = + self.threads + .entry(activity.thread_id) + .or_insert_with(|| AgentPickerThreadEntry { + agent_nickname: None, + agent_role: None, + agent_path: None, + is_running: false, + is_closed: false, + }); + entry.agent_path = Some(activity.agent_path); + entry.is_running = activity.is_running_hint; + entry.is_closed = false; + } + + pub(crate) fn set_running(&mut self, thread_id: ThreadId, is_running: bool) { + if let Some(entry) = self.threads.get_mut(&thread_id) { + entry.is_running = is_running; + } + } + + pub(crate) fn set_agent_path(&mut self, thread_id: ThreadId, agent_path: Option) { + if let Some(agent_path) = agent_path + && let Some(entry) = self.threads.get_mut(&thread_id) + { + entry.agent_path = Some(agent_path); + } + } + /// Marks a thread as closed without removing it from the traversal cache. /// /// Closed threads stay in the picker and in spawn order so users can still review them and so @@ -105,6 +146,7 @@ impl AgentNavigationState { pub(crate) fn mark_closed(&mut self, thread_id: ThreadId) { if let Some(entry) = self.threads.get_mut(&thread_id) { entry.is_closed = true; + entry.is_running = false; } else { self.upsert( thread_id, /*agent_nickname*/ None, /*agent_role*/ None, @@ -155,6 +197,22 @@ impl AgentNavigationState { .collect() } + pub(crate) fn ordered_path_backed_subagent_threads( + &self, + primary_thread_id: Option, + ) -> Vec<(ThreadId, &AgentPickerThreadEntry)> { + self.ordered_threads() + .into_iter() + .filter(|(thread_id, entry)| { + Some(*thread_id) != primary_thread_id + && entry + .agent_path + .as_deref() + .is_some_and(|agent_path| !agent_path.trim().is_empty()) + }) + .collect() + } + /// Returns tracked thread ids in the same stable order used by the picker. pub(crate) fn tracked_thread_ids(&self) -> Vec { self.ordered_threads() @@ -217,6 +275,14 @@ impl AgentNavigationState { self.threads .get(&thread_id) .map(|entry| { + if !is_primary + && let Some(agent_path) = entry + .agent_path + .as_deref() + .filter(|agent_path| !agent_path.trim().is_empty()) + { + return format!("`{agent_path}`"); + } format_agent_picker_item_name( entry.agent_nickname.as_deref(), entry.agent_role.as_deref(), diff --git a/codex-rs/tui/src/app/agent_status_feed.rs b/codex-rs/tui/src/app/agent_status_feed.rs new file mode 100644 index 000000000..baeb2d080 --- /dev/null +++ b/codex-rs/tui/src/app/agent_status_feed.rs @@ -0,0 +1,209 @@ +//! Bounded, best-effort previews for the v2 `/agent` status output. + +use super::ThreadBufferedEvent; +use super::ThreadEventStore; +use crate::history_cell::HistoryCell; +use crate::history_cell::plain_lines; +use crate::text_formatting::truncate_text; +use codex_app_server_protocol::CollabAgentTool; +use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::SubAgentActivityKind; +use codex_app_server_protocol::ThreadItem; +use ratatui::style::Stylize; +use ratatui::text::Line; +use std::collections::HashSet; + +const AGENT_STATUS_PREVIEW_LINES: usize = 3; +const AGENT_STATUS_PREVIEW_ITEMS: usize = 6; +const AGENT_STATUS_PREVIEW_GRAPHEMES: usize = 240; +const AGENT_STATUS_PREVIEW_INDENT: u16 = 4; + +#[derive(Debug)] +pub(super) struct AgentStatusHistoryCell { + entries: Vec, +} + +impl AgentStatusHistoryCell { + pub(super) fn new(entries: Vec) -> Self { + Self { entries } + } +} + +impl HistoryCell for AgentStatusHistoryCell { + fn display_lines(&self, width: u16) -> Vec> { + let mut lines: Vec> = vec![ + "/agent".magenta().into(), + "Sub-agents running".bold().into(), + "".into(), + ]; + + if self.entries.is_empty() { + lines.push(" • No sub-agents running.".italic().into()); + return lines; + } + + for entry in &self.entries { + lines.push(entry.title_line()); + let preview_width = width.saturating_sub(AGENT_STATUS_PREVIEW_INDENT).max(1); + let preview_lines = entry.preview_lines(preview_width); + if preview_lines.is_empty() { + lines.push(vec![" ".into(), "No recent activity yet.".dim().italic()].into()); + } else { + lines.extend(preview_lines.into_iter().map(indent_preview_line)); + } + lines.push("".into()); + } + let _ = lines.pop(); + lines + } + + fn raw_lines(&self) -> Vec> { + plain_lines(self.display_lines(u16::MAX)) + } +} + +#[derive(Debug)] +pub(super) struct AgentStatusThreadPreview { + agent_path: String, + activity: Vec, +} + +impl AgentStatusThreadPreview { + pub(super) fn from_store(agent_path: String, store: &ThreadEventStore) -> Self { + Self::from_events(agent_path, store.buffer.iter().rev()) + } + + pub(super) fn empty(agent_path: String) -> Self { + Self::from_events(agent_path, std::iter::empty()) + } + + fn from_events<'a>( + agent_path: String, + events: impl Iterator, + ) -> Self { + let mut seen_item_ids = HashSet::new(); + let mut activity = Vec::new(); + for event in events { + let item = match event { + ThreadBufferedEvent::Notification(ServerNotification::ItemCompleted(event)) => { + &event.item + } + ThreadBufferedEvent::Notification(ServerNotification::ItemStarted(event)) => { + &event.item + } + ThreadBufferedEvent::Notification(_) + | ThreadBufferedEvent::Request(_) + | ThreadBufferedEvent::HistoryEntryResponse(_) + | ThreadBufferedEvent::FeedbackSubmission(_) => continue, + }; + if !seen_item_ids.insert(item.id().to_string()) { + continue; + } + if let Some(summary) = activity_summary(item) { + activity.push(summary); + if activity.len() == AGENT_STATUS_PREVIEW_ITEMS { + break; + } + } + } + activity.reverse(); + Self { + agent_path, + activity, + } + } + + fn title_line(&self) -> Line<'static> { + vec![" • ".dim(), format!("`{}`", self.agent_path).cyan()].into() + } + + fn preview_lines(&self, width: u16) -> Vec> { + let mut lines = self + .activity + .iter() + .flat_map(|activity| textwrap::wrap(activity, width as usize)) + .filter(|line| !line.trim().is_empty()) + .map(|line| line.into_owned().dim().into()) + .collect::>(); + if lines.len() > AGENT_STATUS_PREVIEW_LINES { + lines.drain(..lines.len() - AGENT_STATUS_PREVIEW_LINES); + } + lines + } +} + +fn activity_summary(item: &ThreadItem) -> Option { + let summary = match item { + ThreadItem::AgentMessage { text, .. } | ThreadItem::Plan { text, .. } => text, + ThreadItem::Reasoning { summary, .. } => summary.last()?, + ThreadItem::CommandExecution { command, .. } => { + let command = truncate_text( + command, + AGENT_STATUS_PREVIEW_GRAPHEMES.saturating_sub("$ ".len()), + ); + return bounded_summary(&format!("$ {command}")); + } + ThreadItem::FileChange { changes, .. } => { + return bounded_summary(&format!("Updated {} file(s)", changes.len())); + } + ThreadItem::McpToolCall { server, tool, .. } => { + return bounded_summary(&format!("MCP {server}/{tool}")); + } + ThreadItem::DynamicToolCall { + namespace, tool, .. + } => { + let tool = namespace + .as_ref() + .map(|namespace| format!("{namespace}/{tool}")) + .unwrap_or_else(|| tool.clone()); + return bounded_summary(&format!("Tool {tool}")); + } + ThreadItem::CollabAgentToolCall { tool, .. } => { + let action = match tool { + CollabAgentTool::SpawnAgent => "Spawned an agent", + CollabAgentTool::SendInput => "Sent input to an agent", + CollabAgentTool::ResumeAgent => "Resumed an agent", + CollabAgentTool::Wait => "Waited for an agent", + CollabAgentTool::CloseAgent => "Closed an agent", + }; + return Some(action.to_string()); + } + ThreadItem::SubAgentActivity { + kind, agent_path, .. + } => { + let action = match kind { + SubAgentActivityKind::Started => "Started", + SubAgentActivityKind::Interacted => "Contacted", + SubAgentActivityKind::Interrupted => "Interrupted", + }; + return bounded_summary(&format!("{action} {agent_path}")); + } + ThreadItem::WebSearch { query, .. } => { + return bounded_summary(&format!("Web search: {query}")); + } + ThreadItem::ImageView { path, .. } => { + return bounded_summary(&format!("Viewed {}", path.display())); + } + ThreadItem::ImageGeneration { .. } => return Some("Generated an image".to_string()), + ThreadItem::EnteredReviewMode { .. } => return Some("Entered review mode".to_string()), + ThreadItem::ExitedReviewMode { .. } => return Some("Exited review mode".to_string()), + ThreadItem::ContextCompaction { .. } => return Some("Compacted context".to_string()), + ThreadItem::UserMessage { .. } | ThreadItem::HookPrompt { .. } => return None, + }; + bounded_summary(summary) +} + +fn bounded_summary(summary: &str) -> Option { + let summary = truncate_text(summary, AGENT_STATUS_PREVIEW_GRAPHEMES); + let summary = summary.split_whitespace().collect::>().join(" "); + (!summary.is_empty()).then_some(summary) +} + +fn indent_preview_line(mut line: Line<'static>) -> Line<'static> { + line.spans.insert(0, " ".into()); + line +} + +#[cfg(test)] +#[path = "agent_status_feed_tests.rs"] +mod tests; diff --git a/codex-rs/tui/src/app/agent_status_feed_tests.rs b/codex-rs/tui/src/app/agent_status_feed_tests.rs new file mode 100644 index 000000000..5b8bc4b59 --- /dev/null +++ b/codex-rs/tui/src/app/agent_status_feed_tests.rs @@ -0,0 +1,109 @@ +use super::*; +use codex_app_server_protocol::CommandExecutionSource; +use codex_app_server_protocol::CommandExecutionStatus; +use codex_app_server_protocol::ItemCompletedNotification; +use codex_utils_absolute_path::AbsolutePathBuf; + +#[test] +fn agent_status_uses_bounded_buffered_activity() { + let mut store = ThreadEventStore::new(/*capacity*/ 8); + store.push_notification(ServerNotification::ItemCompleted( + ItemCompletedNotification { + item: ThreadItem::CommandExecution { + id: "command-1".to_string(), + command: "cargo test -p codex-tui".to_string(), + cwd: AbsolutePathBuf::try_from("/workspace").expect("absolute path"), + process_id: None, + source: CommandExecutionSource::Agent, + status: CommandExecutionStatus::Completed, + command_actions: Vec::new(), + aggregated_output: Some("unbounded output\n".repeat(10_000)), + exit_code: Some(0), + duration_ms: Some(42), + }, + thread_id: "thread-child".to_string(), + turn_id: "turn-1".to_string(), + completed_at_ms: 1, + }, + )); + store.push_notification(ServerNotification::ItemCompleted( + ItemCompletedNotification { + item: ThreadItem::AgentMessage { + id: "message-1".to_string(), + text: "Finished checking the focused TUI tests.".to_string(), + phase: None, + memory_citation: None, + }, + thread_id: "thread-child".to_string(), + turn_id: "turn-1".to_string(), + completed_at_ms: 2, + }, + )); + + let preview = AgentStatusThreadPreview::from_store("/root/reviewer".to_string(), &store); + let cell = AgentStatusHistoryCell::new(vec![preview]); + let rendered = cell + .display_lines(/*width*/ 80) + .iter() + .map(ToString::to_string) + .collect::>() + .join("\n"); + + insta::assert_snapshot!(rendered, @r###" + /agent + Sub-agents running + + • `/root/reviewer` + $ cargo test -p codex-tui + Finished checking the focused TUI tests. + "###); + assert!(!rendered.contains("unbounded output")); +} + +#[test] +fn agent_status_uses_reasoning_summaries_only() { + let mut store = ThreadEventStore::new(/*capacity*/ 8); + store.push_notification(ServerNotification::ItemCompleted( + ItemCompletedNotification { + item: ThreadItem::Reasoning { + id: "reasoning-with-summary".to_string(), + summary: vec!["safe summary".to_string()], + content: vec!["hidden raw reasoning".to_string()], + }, + thread_id: "thread-child".to_string(), + turn_id: "turn-1".to_string(), + completed_at_ms: 1, + }, + )); + store.push_notification(ServerNotification::ItemCompleted( + ItemCompletedNotification { + item: ThreadItem::Reasoning { + id: "reasoning-without-summary".to_string(), + summary: Vec::new(), + content: vec!["raw-only reasoning".to_string()], + }, + thread_id: "thread-child".to_string(), + turn_id: "turn-1".to_string(), + completed_at_ms: 2, + }, + )); + + let preview = AgentStatusThreadPreview::from_store("/root/reviewer".to_string(), &store); + let cell = AgentStatusHistoryCell::new(vec![preview]); + let rendered = cell + .display_lines(/*width*/ 80) + .iter() + .map(ToString::to_string) + .collect::>() + .join("\n"); + + insta::assert_snapshot!(rendered, @r###" + /agent + Sub-agents running + + • `/root/reviewer` + safe summary + "###); + assert!(!rendered.contains("hidden raw reasoning")); + assert!(!rendered.contains("raw-only reasoning")); +} diff --git a/codex-rs/tui/src/app/loaded_threads.rs b/codex-rs/tui/src/app/loaded_threads.rs index d988c8051..aba08f08d 100644 --- a/codex-rs/tui/src/app/loaded_threads.rs +++ b/codex-rs/tui/src/app/loaded_threads.rs @@ -14,8 +14,10 @@ //! `SessionSource::SubAgent(ThreadSpawn { parent_thread_id, .. })` edges until no new children are //! found. The primary thread itself is never included in the output. +use codex_app_server_protocol::SessionSource; use codex_app_server_protocol::Thread; use codex_protocol::ThreadId; +use codex_protocol::protocol::SubAgentSource; use std::collections::HashMap; use std::collections::HashSet; @@ -26,6 +28,7 @@ pub(crate) struct LoadedSubagentThread { pub(crate) thread_id: ThreadId, pub(crate) agent_nickname: Option, pub(crate) agent_role: Option, + pub(crate) agent_path: Option, } /// Walks the spawn tree rooted at `primary_thread_id` and returns every descendant subagent. @@ -84,6 +87,7 @@ pub(crate) fn find_loaded_subagent_threads_for_primary( thread_id, agent_nickname: thread.agent_nickname, agent_role: thread.agent_role, + agent_path: thread_spawn_agent_path(&thread.source), }) }) .collect(); @@ -91,16 +95,22 @@ pub(crate) fn find_loaded_subagent_threads_for_primary( loaded_threads } -fn thread_spawn_parent_thread_id( - source: &codex_app_server_protocol::SessionSource, -) -> Option { - let value = serde_json::to_value(source).ok()?; - let parent_thread_id = value - .get("subAgent")? - .get("thread_spawn")? - .get("parent_thread_id")? - .as_str()?; - ThreadId::from_string(parent_thread_id).ok() +fn thread_spawn_agent_path(source: &SessionSource) -> Option { + match source { + SessionSource::SubAgent(SubAgentSource::ThreadSpawn { agent_path, .. }) => { + agent_path.clone().map(String::from) + } + _ => None, + } +} + +fn thread_spawn_parent_thread_id(source: &SessionSource) -> Option { + match source { + SessionSource::SubAgent(SubAgentSource::ThreadSpawn { + parent_thread_id, .. + }) => Some(*parent_thread_id), + _ => None, + } } #[cfg(test)] @@ -208,11 +218,13 @@ mod tests { thread_id: child_thread_id, agent_nickname: Some("Scout".to_string()), agent_role: Some("explorer".to_string()), + agent_path: None, }, LoadedSubagentThread { thread_id: grandchild_thread_id, agent_nickname: Some("Atlas".to_string()), agent_role: Some("worker".to_string()), + agent_path: None, }, ] ); diff --git a/codex-rs/tui/src/app/session_lifecycle.rs b/codex-rs/tui/src/app/session_lifecycle.rs index 5e7985f10..a961658c8 100644 --- a/codex-rs/tui/src/app/session_lifecycle.rs +++ b/codex-rs/tui/src/app/session_lifecycle.rs @@ -8,6 +8,60 @@ use super::*; impl App { pub(super) async fn open_agent_picker(&mut self, app_server: &mut AppServerSession) { + self.backfill_loaded_subagent_threads(app_server).await; + // V2 subagents are identified by canonical paths observed from activity events or loaded + // thread metadata. Prefer local buffered turn state for liveness, and fall back to + // thread/read only when no local event channel exists. + let path_backed_thread_ids: Vec<_> = self + .agent_navigation + .ordered_path_backed_subagent_threads(self.primary_thread_id) + .into_iter() + .map(|(thread_id, _)| thread_id) + .collect(); + for thread_id in path_backed_thread_ids { + if let Some(channel) = self.thread_event_channels.get(&thread_id) + && channel.attachment() == ThreadEventAttachment::Live + { + let is_running = channel.store.lock().await.active_turn_id().is_some(); + self.agent_navigation.set_running(thread_id, is_running); + } else { + self.refresh_agent_picker_thread_liveness(app_server, thread_id) + .await; + } + } + let path_backed_threads = self + .agent_navigation + .ordered_path_backed_subagent_threads(self.primary_thread_id); + if !path_backed_threads.is_empty() { + let running_threads: Vec<_> = path_backed_threads + .into_iter() + .filter_map(|(thread_id, entry)| { + if !entry.is_running || entry.is_closed { + return None; + } + Some((thread_id, entry.agent_path.as_deref()?.trim().to_string())) + }) + .collect(); + let mut entries = Vec::new(); + for (thread_id, agent_path) in running_threads { + let preview = if let Some(channel) = self.thread_event_channels.get(&thread_id) { + let store = channel.store.lock().await; + super::agent_status_feed::AgentStatusThreadPreview::from_store( + agent_path, &store, + ) + } else { + super::agent_status_feed::AgentStatusThreadPreview::empty(agent_path) + }; + entries.push(preview); + } + + self.chat_widget + .add_to_history(super::agent_status_feed::AgentStatusHistoryCell::new( + entries, + )); + return; + } + let mut thread_ids = self.agent_navigation.tracked_thread_ids(); for thread_id in self.thread_event_channels.keys().copied() { if !thread_ids.contains(&thread_id) { @@ -44,14 +98,14 @@ impl App { let items: Vec = self .agent_navigation .ordered_threads() - .iter() + .into_iter() .enumerate() .map(|(idx, (thread_id, entry))| { - if self.active_thread_id == Some(*thread_id) { + if self.active_thread_id == Some(thread_id) { initial_selected_idx = Some(idx); } - let id = *thread_id; - let is_primary = self.primary_thread_id == Some(*thread_id); + let id = thread_id; + let is_primary = self.primary_thread_id == Some(thread_id); let name = format_agent_picker_item_name( entry.agent_nickname.as_deref(), entry.agent_role.as_deref(), @@ -62,7 +116,7 @@ impl App { name: name.clone(), name_prefix_spans: agent_picker_status_dot_spans(entry.is_closed), description: Some(uuid.clone()), - is_current: self.active_thread_id == Some(*thread_id), + is_current: self.active_thread_id == Some(thread_id), actions: vec![Box::new(move |tx| { tx.send(AppEvent::SelectAgentThread(id)); })], @@ -145,6 +199,14 @@ impl App { .await { Ok(thread) => { + let is_running = matches!( + thread.status, + codex_app_server_protocol::ThreadStatus::Active { .. } + ); + let is_closed = matches!( + thread.status, + codex_app_server_protocol::ThreadStatus::NotLoaded + ); self.upsert_agent_picker_thread( thread_id, thread.agent_nickname.or_else(|| { @@ -157,11 +219,9 @@ impl App { .as_ref() .and_then(|entry| entry.agent_role.clone()) }), - matches!( - thread.status, - codex_app_server_protocol::ThreadStatus::NotLoaded - ), + is_closed, ); + self.agent_navigation.set_running(thread_id, is_running); true } Err(err) => { @@ -186,6 +246,8 @@ impl App { is_closed, ); } + self.agent_navigation + .set_running(thread_id, /*is_running*/ false); true } } @@ -248,6 +310,9 @@ impl App { } }; let channel = self.ensure_thread_channel(thread_id); + if !live_attached { + channel.mark_replay_only(); + } let mut store = channel.store.lock().await; store.set_session(session, turns); Ok(live_attached) @@ -610,13 +675,17 @@ impl App { } for thread in find_loaded_subagent_threads_for_primary(threads, primary_thread_id) { + let agent_path = thread.agent_path; self.upsert_agent_picker_thread( thread.thread_id, thread.agent_nickname, thread.agent_role, /*is_closed*/ false, ); + self.agent_navigation + .set_agent_path(thread.thread_id, agent_path); } + self.sync_active_agent_label(); !had_read_error } diff --git a/codex-rs/tui/src/app/tests.rs b/codex-rs/tui/src/app/tests.rs index 451bfadc1..99d4c3533 100644 --- a/codex-rs/tui/src/app/tests.rs +++ b/codex-rs/tui/src/app/tests.rs @@ -22,6 +22,7 @@ use crate::history_cell::PlainHistoryCell; use crate::history_cell::UserHistoryCell; use crate::history_cell::new_session_info; use crate::multi_agents::AgentPickerThreadEntry; +use crate::multi_agents::SubAgentActivityDisplay; use assert_matches::assert_matches; use crate::app_command::AppCommand as Op; @@ -1164,6 +1165,8 @@ async fn collab_receiver_notification_caches_thread_without_app_server_read() { Some(&AgentPickerThreadEntry { agent_nickname: None, agent_role: None, + agent_path: None, + is_running: false, is_closed: false, }) ); @@ -1223,6 +1226,8 @@ async fn open_agent_picker_keeps_missing_threads_for_replay() -> Result<()> { Some(&AgentPickerThreadEntry { agent_nickname: None, agent_role: None, + agent_path: None, + is_running: false, is_closed: true, }) ); @@ -1256,6 +1261,88 @@ async fn open_agent_picker_preserves_cached_metadata_for_replay_threads() -> Res Some(&AgentPickerThreadEntry { agent_nickname: Some("Robie".to_string()), agent_role: Some("explorer".to_string()), + agent_path: None, + is_running: false, + is_closed: true, + }) + ); + Ok(()) +} + +#[tokio::test] +async fn open_agent_picker_clears_completed_path_backed_agent_running_state() -> Result<()> { + let mut app = Box::pin(make_test_app()).await; + let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker( + app.chat_widget.config_ref(), + )) + .await + .expect("embedded app server"); + let thread_id = ThreadId::new(); + let channel = ThreadEventChannel::new(/*capacity*/ 4); + { + let mut store = channel.store.lock().await; + store.push_notification(turn_started_notification(thread_id, "turn-1")); + store.push_notification(turn_completed_notification( + thread_id, + "turn-1", + TurnStatus::Completed, + )); + } + app.thread_event_channels.insert(thread_id, channel); + app.agent_navigation + .record_sub_agent_activity(SubAgentActivityDisplay { + thread_id, + agent_path: "/root/child".to_string(), + is_running_hint: true, + }); + + Box::pin(app.open_agent_picker(&mut app_server)).await; + + assert_eq!( + app.agent_navigation.get(&thread_id), + Some(&AgentPickerThreadEntry { + agent_nickname: None, + agent_role: None, + agent_path: Some("/root/child".to_string()), + is_running: false, + is_closed: false, + }) + ); + Ok(()) +} + +#[tokio::test] +async fn open_agent_picker_refreshes_replay_only_path_backed_liveness() -> Result<()> { + let mut app = Box::pin(make_test_app()).await; + let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker( + app.chat_widget.config_ref(), + )) + .await + .expect("embedded app server"); + let thread_id = ThreadId::new(); + let mut channel = ThreadEventChannel::new(/*capacity*/ 4); + channel.mark_replay_only(); + { + let mut store = channel.store.lock().await; + store.push_notification(turn_started_notification(thread_id, "turn-1")); + } + app.thread_event_channels.insert(thread_id, channel); + app.agent_navigation + .record_sub_agent_activity(SubAgentActivityDisplay { + thread_id, + agent_path: "/root/child".to_string(), + is_running_hint: true, + }); + + Box::pin(app.open_agent_picker(&mut app_server)).await; + + assert_eq!( + app.agent_navigation.get(&thread_id), + Some(&AgentPickerThreadEntry { + agent_nickname: None, + agent_role: None, + agent_path: Some("/root/child".to_string()), + is_running: false, is_closed: true, }) ); @@ -1310,6 +1397,8 @@ async fn open_agent_picker_marks_terminal_read_errors_closed() -> Result<()> { Some(&AgentPickerThreadEntry { agent_nickname: Some("Robie".to_string()), agent_role: Some("explorer".to_string()), + agent_path: None, + is_running: false, is_closed: true, }) ); @@ -1348,6 +1437,8 @@ fn open_agent_picker_marks_loaded_threads_open() -> Result<()> { Some(&AgentPickerThreadEntry { agent_nickname: None, agent_role: None, + agent_path: None, + is_running: false, is_closed: false, }) ); @@ -2819,6 +2910,8 @@ async fn inactive_thread_started_notification_initializes_replay_session() -> Re Some(&AgentPickerThreadEntry { agent_nickname: Some("Robie".to_string()), agent_role: Some("explorer".to_string()), + agent_path: None, + is_running: false, is_closed: false, }) ); diff --git a/codex-rs/tui/src/app/thread_events.rs b/codex-rs/tui/src/app/thread_events.rs index 5f7a58da1..48dc1d084 100644 --- a/codex-rs/tui/src/app/thread_events.rs +++ b/codex-rs/tui/src/app/thread_events.rs @@ -31,6 +31,12 @@ pub(super) struct FeedbackThreadEvent { pub(super) result: Result, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(super) enum ThreadEventAttachment { + Live, + ReplayOnly, +} + #[derive(Debug)] pub(super) struct ThreadEventStore { pub(super) session: Option, @@ -285,6 +291,7 @@ pub(super) struct ThreadEventChannel { pub(super) sender: mpsc::Sender, pub(super) receiver: Option>, pub(super) store: Arc>, + attachment: ThreadEventAttachment, } impl ThreadEventChannel { @@ -294,9 +301,18 @@ impl ThreadEventChannel { sender, receiver: Some(receiver), store: Arc::new(Mutex::new(ThreadEventStore::new(capacity))), + attachment: ThreadEventAttachment::Live, } } + pub(super) fn mark_replay_only(&mut self) { + self.attachment = ThreadEventAttachment::ReplayOnly; + } + + pub(super) fn attachment(&self) -> ThreadEventAttachment { + self.attachment + } + #[cfg_attr(not(test), allow(dead_code))] pub(super) fn new_with_session( capacity: usize, @@ -310,6 +326,7 @@ impl ThreadEventChannel { store: Arc::new(Mutex::new(ThreadEventStore::new_with_session( capacity, session, turns, ))), + attachment: ThreadEventAttachment::Live, } } } diff --git a/codex-rs/tui/src/app/thread_routing.rs b/codex-rs/tui/src/app/thread_routing.rs index 099c81fb7..80b67b19d 100644 --- a/codex-rs/tui/src/app/thread_routing.rs +++ b/codex-rs/tui/src/app/thread_routing.rs @@ -896,6 +896,14 @@ impl App { &mut self, notification: &ServerNotification, ) { + if let Some(activity) = + sub_agent_activity_item(notification).and_then(sub_agent_activity_display) + { + self.agent_navigation.record_sub_agent_activity(activity); + self.sync_active_agent_label(); + return; + } + let Some(receiver_thread_ids) = collab_receiver_thread_ids(notification) else { return; }; diff --git a/codex-rs/tui/src/chatwidget/protocol.rs b/codex-rs/tui/src/chatwidget/protocol.rs index 9b77da037..9d2920807 100644 --- a/codex-rs/tui/src/chatwidget/protocol.rs +++ b/codex-rs/tui/src/chatwidget/protocol.rs @@ -327,6 +327,7 @@ impl ChatWidget { reasoning_effort, agents_states, }), + item @ ThreadItem::SubAgentActivity { .. } => self.on_sub_agent_activity(item), ThreadItem::EnteredReviewMode { review, .. } if !from_replay => { self.enter_review_mode_with_hint(review, /*from_replay*/ false); } diff --git a/codex-rs/tui/src/chatwidget/replay.rs b/codex-rs/tui/src/chatwidget/replay.rs index f0211aaca..bdb8b8d3a 100644 --- a/codex-rs/tui/src/chatwidget/replay.rs +++ b/codex-rs/tui/src/chatwidget/replay.rs @@ -190,6 +190,7 @@ impl ChatWidget { reasoning_effort, agents_states, }), + item @ ThreadItem::SubAgentActivity { .. } => self.on_sub_agent_activity(item), ThreadItem::DynamicToolCall { .. } => {} } diff --git a/codex-rs/tui/src/chatwidget/tool_lifecycle.rs b/codex-rs/tui/src/chatwidget/tool_lifecycle.rs index 5df5e7bc1..43d4c2e3c 100644 --- a/codex-rs/tui/src/chatwidget/tool_lifecycle.rs +++ b/codex-rs/tui/src/chatwidget/tool_lifecycle.rs @@ -144,6 +144,13 @@ impl ChatWidget { } } + pub(super) fn on_sub_agent_activity(&mut self, item: ThreadItem) { + self.record_visible_turn_activity(); + if let Some(cell) = multi_agents::sub_agent_activity_history_cell(&item) { + self.on_collab_event(cell); + } + } + pub(crate) fn handle_file_change_completed_now(&mut self, item: ThreadItem) { let ThreadItem::FileChange { status, .. } = item else { return; diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index f46543640..281e4a52a 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -192,6 +192,7 @@ mod terminal_title; mod terminal_visualization_instructions; mod text_formatting; mod theme_picker; +mod thread_transcript; mod token_usage; mod tooltips; mod transcript_reflow; diff --git a/codex-rs/tui/src/multi_agents.rs b/codex-rs/tui/src/multi_agents.rs index 0532ea9fb..aa47d95c6 100644 --- a/codex-rs/tui/src/multi_agents.rs +++ b/codex-rs/tui/src/multi_agents.rs @@ -11,6 +11,7 @@ use codex_app_server_protocol::CollabAgentState; use codex_app_server_protocol::CollabAgentStatus; use codex_app_server_protocol::CollabAgentTool; use codex_app_server_protocol::CollabAgentToolCallStatus; +use codex_app_server_protocol::SubAgentActivityKind; use codex_app_server_protocol::ThreadItem; use codex_protocol::ThreadId; use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig; @@ -35,10 +36,21 @@ pub(crate) struct AgentPickerThreadEntry { pub(crate) agent_nickname: Option, /// Agent type shown in brackets when present, for example `worker`. pub(crate) agent_role: Option, + /// Canonical v2 agent path, when the thread was observed through v2 activity. + pub(crate) agent_path: Option, + /// Whether the latest liveness refresh says the agent thread is actively working. + pub(crate) is_running: bool, /// Whether the thread has emitted a close event and should render dimmed. pub(crate) is_closed: bool, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct SubAgentActivityDisplay { + pub(crate) thread_id: ThreadId, + pub(crate) agent_path: String, + pub(crate) is_running_hint: bool, +} + #[derive(Clone, Debug, Default, PartialEq, Eq)] pub(crate) struct AgentMetadata { /// Human-friendly nickname shown in rendered tool-call rows. @@ -266,6 +278,56 @@ pub(crate) fn tool_call_history_cell( } } +pub(crate) fn sub_agent_activity_display(item: &ThreadItem) -> Option { + let ThreadItem::SubAgentActivity { + kind, + agent_thread_id, + agent_path, + .. + } = item + else { + return None; + }; + Some(SubAgentActivityDisplay { + thread_id: parse_thread_id(agent_thread_id)?, + agent_path: agent_path.clone(), + is_running_hint: !matches!(kind, SubAgentActivityKind::Interrupted), + }) +} + +pub(crate) fn sub_agent_activity_history_cell(item: &ThreadItem) -> Option { + let ThreadItem::SubAgentActivity { + kind, agent_path, .. + } = item + else { + return None; + }; + Some(collab_event( + sub_agent_activity_title(*kind, agent_path), + Vec::new(), + )) +} + +pub(crate) fn sub_agent_activity_summary(kind: SubAgentActivityKind, agent_path: &str) -> String { + match kind { + SubAgentActivityKind::Started => format!("Started `{agent_path}`"), + SubAgentActivityKind::Interacted => format!("Interacted with `{agent_path}`"), + SubAgentActivityKind::Interrupted => format!("Interrupted `{agent_path}`"), + } +} + +fn sub_agent_activity_title(kind: SubAgentActivityKind, agent_path: &str) -> Line<'static> { + let (prefix, path) = match kind { + SubAgentActivityKind::Started => ("Started ", agent_path), + SubAgentActivityKind::Interacted => ("Interacted with ", agent_path), + SubAgentActivityKind::Interrupted => ("Interrupted ", agent_path), + }; + title_spans_line(vec![ + Span::from(prefix).bold(), + Span::from(format!("`{path}`")).cyan(), + ]) +} + fn spawn_end( new_thread_id: Option, prompt: &str, diff --git a/codex-rs/tui/src/resume_picker.rs b/codex-rs/tui/src/resume_picker.rs index de9a46e8f..c736fe9b6 100644 --- a/codex-rs/tui/src/resume_picker.rs +++ b/codex-rs/tui/src/resume_picker.rs @@ -4,8 +4,6 @@ use std::path::Path; use std::path::PathBuf; use std::sync::Arc; -mod transcript; - use crate::app_server_session::AppServerSession; use crate::clipboard_paste::normalize_pasted_search_query; use crate::color::blend; @@ -25,6 +23,9 @@ use crate::status::format_directory_display; use crate::terminal_palette::best_color; use crate::terminal_palette::default_bg; use crate::text_formatting::truncate_text; +use crate::thread_transcript::RawReasoningVisibility; +use crate::thread_transcript::TranscriptCells; +use crate::thread_transcript::load_session_transcript; use crate::tui::FrameRequester; use crate::tui::Tui; use crate::tui::TuiEvent; @@ -60,9 +61,6 @@ use tokio::sync::mpsc; use tokio_stream::StreamExt; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::warn; -use transcript::RawReasoningVisibility; -use transcript::TranscriptCells; -use transcript::load_session_transcript; use unicode_width::UnicodeWidthStr; const PAGE_SIZE: usize = 25; @@ -5751,7 +5749,7 @@ session_picker_view = "dense" #[test] fn thread_to_transcript_cells_renders_core_message_types() { - use transcript::thread_to_transcript_cells; + use crate::thread_transcript::thread_to_transcript_cells; let thread_id = ThreadId::new(); let thread = Thread { @@ -5820,7 +5818,7 @@ session_picker_view = "dense" #[test] fn thread_to_transcript_cells_hides_raw_reasoning_when_not_enabled() { - use transcript::thread_to_transcript_cells; + use crate::thread_transcript::thread_to_transcript_cells; let thread_id = ThreadId::new(); let thread = Thread { @@ -5878,7 +5876,7 @@ session_picker_view = "dense" #[test] fn thread_to_transcript_cells_shows_raw_reasoning_over_summary_when_enabled() { - use transcript::thread_to_transcript_cells; + use crate::thread_transcript::thread_to_transcript_cells; let thread_id = ThreadId::new(); let thread = Thread { diff --git a/codex-rs/tui/src/resume_picker/transcript.rs b/codex-rs/tui/src/thread_transcript.rs similarity index 96% rename from codex-rs/tui/src/resume_picker/transcript.rs rename to codex-rs/tui/src/thread_transcript.rs index e211a7126..b530dfacf 100644 --- a/codex-rs/tui/src/resume_picker/transcript.rs +++ b/codex-rs/tui/src/thread_transcript.rs @@ -1,3 +1,5 @@ +//! Render persisted thread turns into history-cell building blocks. + use std::sync::Arc; use crate::app_server_session::AppServerSession; @@ -7,6 +9,7 @@ use crate::history_cell::HistoryCell; use crate::history_cell::PlainHistoryCell; use crate::history_cell::ReasoningSummaryCell; use crate::history_cell::UserHistoryCell; +use crate::multi_agents::sub_agent_activity_summary; use codex_app_server_protocol::Thread; use codex_app_server_protocol::ThreadItem; use codex_protocol::ThreadId; @@ -191,6 +194,11 @@ fn fallback_transcript_cell(item: &ThreadItem) -> Option { ThreadItem::CollabAgentToolCall { tool, status, .. } => { vec![format!("agent tool: {tool:?} · {status:?}").dim().into()] } + ThreadItem::SubAgentActivity { + kind, agent_path, .. + } => { + vec![sub_agent_activity_summary(*kind, agent_path).dim().into()] + } ThreadItem::WebSearch { query, .. } => { vec![vec!["web search: ".dim(), query.clone().into()].into()] }