mirror of
https://github.com/pchuan98/codex.git
synced 2026-07-01 00:31:56 +08:00
multi-agent: add path-based v2 activity tracking (#27007)
## Why Multi-agent v2 identifies agents by canonical paths, but its tool handlers still emitted the larger legacy collaboration begin/end events built around nickname and role metadata. App-server, rollout-trace, analytics, and TUI consumers therefore lacked one compact path-based completion signal that behaved consistently across live events and replay. The TUI also needs a bounded `/agent` status surface for v2 agents. It should use recent local activity for previews, refresh liveness without loading full histories, and keep the legacy picker available when no path-backed v2 agent is known. ## What changed - Replace the v2 `spawn_agent`, `send_message`, `followup_task`, and `interrupt_agent` legacy lifecycle emissions with a success-only `SubAgentActivity` event. The event records the tool call ID, occurrence time, affected thread, canonical agent path, and `started`, `interacted`, or `interrupted` kind. - Expose the activity as a completion-only app-server v2 `subAgentActivity` thread item in live notifications and reconstructed history, regenerate the protocol schemas, and count it in sub-agent tool analytics. - Track canonical paths from live activity and loaded-thread metadata in the TUI, and render the activity in live and replayed transcripts. - Make `/agent` list running path-backed agents with summaries from bounded local event buffers. Each summary is capped at 240 graphemes, the scan is capped at six recent items, only the last three wrapped lines are shown, and command output is omitted. Liveness falls back to metadata-only `thread/read` when local turn state is unavailable. - Persist the activity as a terminal rollout-trace runtime payload and reduce it to the corresponding spawn, send, follow-up, or close interaction edge. `interrupt_agent` is classified as a close-edge operation. - Preserve the legacy picker when no path-backed v2 agent is known. ## Compatibility App-server v2 clients that consumed `collabAgentToolCall` begin/end pairs for these tools must handle the new completion-only `subAgentActivity` item. Legacy v1 collaboration behavior is unchanged. ## Screenshot <img width="684" height="288" alt="Screenshot 2026-06-08 at 15 40 47" src="https://github.com/user-attachments/assets/194b3cd0-619d-45fb-b587-cf3e2b1b8a1d" /> ## Testing - `just test -p codex-app-server-protocol` - `just test -p codex-rollout-trace` - Added focused coverage for activity analytics, terminal trace serialization, spawn-edge reduction, `interrupt_agent` classification, TUI status rendering without aggregated command output, and clearing stale running state after a completed turn.
This commit is contained in:
@@ -108,6 +108,7 @@ use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::ServerResponse;
|
||||
use codex_app_server_protocol::SessionSource as AppServerSessionSource;
|
||||
use codex_app_server_protocol::SubAgentActivityKind;
|
||||
use codex_app_server_protocol::Thread;
|
||||
use codex_app_server_protocol::ThreadArchiveParams;
|
||||
use codex_app_server_protocol::ThreadArchiveResponse;
|
||||
@@ -3794,6 +3795,12 @@ async fn turn_event_counts_completed_tool_items() {
|
||||
reasoning_effort: None,
|
||||
agents_states: Default::default(),
|
||||
},
|
||||
ThreadItem::SubAgentActivity {
|
||||
id: "sub-agent-activity-1".to_string(),
|
||||
kind: SubAgentActivityKind::Interacted,
|
||||
agent_thread_id: "thread-child".to_string(),
|
||||
agent_path: "/root/child".to_string(),
|
||||
},
|
||||
ThreadItem::WebSearch {
|
||||
id: "web-1".to_string(),
|
||||
query: "codex".to_string(),
|
||||
@@ -3841,14 +3848,14 @@ async fn turn_event_counts_completed_tool_items() {
|
||||
.find(|event| matches!(event, TrackEventRequest::TurnEvent(_)))
|
||||
.expect("turn event should be emitted");
|
||||
let payload = serde_json::to_value(turn_event).expect("serialize turn event");
|
||||
assert_eq!(payload["event_params"]["total_tool_call_count"], json!(7));
|
||||
assert_eq!(payload["event_params"]["total_tool_call_count"], json!(8));
|
||||
assert_eq!(payload["event_params"]["shell_command_count"], json!(1));
|
||||
assert_eq!(payload["event_params"]["file_change_count"], json!(1));
|
||||
assert_eq!(payload["event_params"]["mcp_tool_call_count"], json!(1));
|
||||
assert_eq!(payload["event_params"]["dynamic_tool_call_count"], json!(1));
|
||||
assert_eq!(
|
||||
payload["event_params"]["subagent_tool_call_count"],
|
||||
json!(1)
|
||||
json!(2)
|
||||
);
|
||||
assert_eq!(payload["event_params"]["web_search_count"], json!(1));
|
||||
assert_eq!(payload["event_params"]["image_generation_count"], json!(1));
|
||||
|
||||
@@ -360,7 +360,9 @@ impl TurnToolCounts {
|
||||
ThreadItem::FileChange { .. } => self.file_change += 1,
|
||||
ThreadItem::McpToolCall { .. } => self.mcp_tool_call += 1,
|
||||
ThreadItem::DynamicToolCall { .. } => self.dynamic_tool_call += 1,
|
||||
ThreadItem::CollabAgentToolCall { .. } => self.subagent_tool_call += 1,
|
||||
ThreadItem::CollabAgentToolCall { .. } | ThreadItem::SubAgentActivity { .. } => {
|
||||
self.subagent_tool_call += 1;
|
||||
}
|
||||
ThreadItem::WebSearch { .. } => self.web_search += 1,
|
||||
ThreadItem::ImageGeneration { .. } => self.image_generation += 1,
|
||||
ThreadItem::UserMessage { .. }
|
||||
@@ -1091,6 +1093,18 @@ impl AnalyticsReducer {
|
||||
);
|
||||
}
|
||||
ServerNotification::ItemCompleted(notification) => {
|
||||
if matches!(notification.item, ThreadItem::SubAgentActivity { .. }) {
|
||||
let Some(turn_state) = self.turns.get_mut(¬ification.turn_id) else {
|
||||
tracing::warn!(
|
||||
thread_id = %notification.thread_id,
|
||||
turn_id = %notification.turn_id,
|
||||
"dropping sub-agent activity tool count update: missing turn state"
|
||||
);
|
||||
return;
|
||||
};
|
||||
turn_state.tool_counts.record(¬ification.item);
|
||||
return;
|
||||
}
|
||||
let Some(item_id) = tracked_tool_item_id(¬ification.item) else {
|
||||
return;
|
||||
};
|
||||
@@ -1562,6 +1576,7 @@ fn tracked_tool_item_id(item: &ThreadItem) -> Option<&str> {
|
||||
| ThreadItem::AgentMessage { .. }
|
||||
| ThreadItem::Plan { .. }
|
||||
| ThreadItem::Reasoning { .. }
|
||||
| ThreadItem::SubAgentActivity { .. }
|
||||
| ThreadItem::ImageView { .. }
|
||||
| ThreadItem::EnteredReviewMode { .. }
|
||||
| ThreadItem::ExitedReviewMode { .. }
|
||||
|
||||
@@ -3177,6 +3177,14 @@
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"SubAgentActivityKind": {
|
||||
"enum": [
|
||||
"started",
|
||||
"interacted",
|
||||
"interrupted"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"SubAgentSource": {
|
||||
"oneOf": [
|
||||
{
|
||||
@@ -4107,6 +4115,38 @@
|
||||
"title": "CollabAgentToolCallThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"agentPath": {
|
||||
"type": "string"
|
||||
},
|
||||
"agentThreadId": {
|
||||
"type": "string"
|
||||
},
|
||||
"id": {
|
||||
"type": "string"
|
||||
},
|
||||
"kind": {
|
||||
"$ref": "#/definitions/SubAgentActivityKind"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"subAgentActivity"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItemType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"agentPath",
|
||||
"agentThreadId",
|
||||
"id",
|
||||
"kind",
|
||||
"type"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"action": {
|
||||
|
||||
+40
@@ -15580,6 +15580,14 @@
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"SubAgentActivityKind": {
|
||||
"enum": [
|
||||
"started",
|
||||
"interacted",
|
||||
"interrupted"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"SubAgentSource": {
|
||||
"oneOf": [
|
||||
{
|
||||
@@ -16892,6 +16900,38 @@
|
||||
"title": "CollabAgentToolCallThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"agentPath": {
|
||||
"type": "string"
|
||||
},
|
||||
"agentThreadId": {
|
||||
"type": "string"
|
||||
},
|
||||
"id": {
|
||||
"type": "string"
|
||||
},
|
||||
"kind": {
|
||||
"$ref": "#/definitions/v2/SubAgentActivityKind"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"subAgentActivity"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItemType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"agentPath",
|
||||
"agentThreadId",
|
||||
"id",
|
||||
"kind",
|
||||
"type"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"action": {
|
||||
|
||||
+40
@@ -13397,6 +13397,14 @@
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"SubAgentActivityKind": {
|
||||
"enum": [
|
||||
"started",
|
||||
"interacted",
|
||||
"interrupted"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"SubAgentSource": {
|
||||
"oneOf": [
|
||||
{
|
||||
@@ -14709,6 +14717,38 @@
|
||||
"title": "CollabAgentToolCallThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"agentPath": {
|
||||
"type": "string"
|
||||
},
|
||||
"agentThreadId": {
|
||||
"type": "string"
|
||||
},
|
||||
"id": {
|
||||
"type": "string"
|
||||
},
|
||||
"kind": {
|
||||
"$ref": "#/definitions/SubAgentActivityKind"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"subAgentActivity"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItemType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"agentPath",
|
||||
"agentThreadId",
|
||||
"id",
|
||||
"kind",
|
||||
"type"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"action": {
|
||||
|
||||
@@ -466,6 +466,14 @@
|
||||
"minLength": 1,
|
||||
"type": "string"
|
||||
},
|
||||
"SubAgentActivityKind": {
|
||||
"enum": [
|
||||
"started",
|
||||
"interacted",
|
||||
"interrupted"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"TextElement": {
|
||||
"properties": {
|
||||
"byteRange": {
|
||||
@@ -989,6 +997,38 @@
|
||||
"title": "CollabAgentToolCallThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"agentPath": {
|
||||
"type": "string"
|
||||
},
|
||||
"agentThreadId": {
|
||||
"type": "string"
|
||||
},
|
||||
"id": {
|
||||
"type": "string"
|
||||
},
|
||||
"kind": {
|
||||
"$ref": "#/definitions/SubAgentActivityKind"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"subAgentActivity"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItemType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"agentPath",
|
||||
"agentThreadId",
|
||||
"id",
|
||||
"kind",
|
||||
"type"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"action": {
|
||||
|
||||
@@ -466,6 +466,14 @@
|
||||
"minLength": 1,
|
||||
"type": "string"
|
||||
},
|
||||
"SubAgentActivityKind": {
|
||||
"enum": [
|
||||
"started",
|
||||
"interacted",
|
||||
"interrupted"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"TextElement": {
|
||||
"properties": {
|
||||
"byteRange": {
|
||||
@@ -989,6 +997,38 @@
|
||||
"title": "CollabAgentToolCallThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"agentPath": {
|
||||
"type": "string"
|
||||
},
|
||||
"agentThreadId": {
|
||||
"type": "string"
|
||||
},
|
||||
"id": {
|
||||
"type": "string"
|
||||
},
|
||||
"kind": {
|
||||
"$ref": "#/definitions/SubAgentActivityKind"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"subAgentActivity"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItemType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"agentPath",
|
||||
"agentThreadId",
|
||||
"id",
|
||||
"kind",
|
||||
"type"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"action": {
|
||||
|
||||
@@ -610,6 +610,14 @@
|
||||
"minLength": 1,
|
||||
"type": "string"
|
||||
},
|
||||
"SubAgentActivityKind": {
|
||||
"enum": [
|
||||
"started",
|
||||
"interacted",
|
||||
"interrupted"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"TextElement": {
|
||||
"properties": {
|
||||
"byteRange": {
|
||||
@@ -1133,6 +1141,38 @@
|
||||
"title": "CollabAgentToolCallThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"agentPath": {
|
||||
"type": "string"
|
||||
},
|
||||
"agentThreadId": {
|
||||
"type": "string"
|
||||
},
|
||||
"id": {
|
||||
"type": "string"
|
||||
},
|
||||
"kind": {
|
||||
"$ref": "#/definitions/SubAgentActivityKind"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"subAgentActivity"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItemType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"agentPath",
|
||||
"agentThreadId",
|
||||
"id",
|
||||
"kind",
|
||||
"type"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"action": {
|
||||
|
||||
@@ -861,6 +861,14 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"SubAgentActivityKind": {
|
||||
"enum": [
|
||||
"started",
|
||||
"interacted",
|
||||
"interrupted"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"SubAgentSource": {
|
||||
"oneOf": [
|
||||
{
|
||||
@@ -1617,6 +1625,38 @@
|
||||
"title": "CollabAgentToolCallThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"agentPath": {
|
||||
"type": "string"
|
||||
},
|
||||
"agentThreadId": {
|
||||
"type": "string"
|
||||
},
|
||||
"id": {
|
||||
"type": "string"
|
||||
},
|
||||
"kind": {
|
||||
"$ref": "#/definitions/SubAgentActivityKind"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"subAgentActivity"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItemType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"agentPath",
|
||||
"agentThreadId",
|
||||
"id",
|
||||
"kind",
|
||||
"type"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"action": {
|
||||
|
||||
@@ -676,6 +676,14 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"SubAgentActivityKind": {
|
||||
"enum": [
|
||||
"started",
|
||||
"interacted",
|
||||
"interrupted"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"SubAgentSource": {
|
||||
"oneOf": [
|
||||
{
|
||||
@@ -1432,6 +1440,38 @@
|
||||
"title": "CollabAgentToolCallThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"agentPath": {
|
||||
"type": "string"
|
||||
},
|
||||
"agentThreadId": {
|
||||
"type": "string"
|
||||
},
|
||||
"id": {
|
||||
"type": "string"
|
||||
},
|
||||
"kind": {
|
||||
"$ref": "#/definitions/SubAgentActivityKind"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"subAgentActivity"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItemType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"agentPath",
|
||||
"agentThreadId",
|
||||
"id",
|
||||
"kind",
|
||||
"type"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"action": {
|
||||
|
||||
+40
@@ -676,6 +676,14 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"SubAgentActivityKind": {
|
||||
"enum": [
|
||||
"started",
|
||||
"interacted",
|
||||
"interrupted"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"SubAgentSource": {
|
||||
"oneOf": [
|
||||
{
|
||||
@@ -1432,6 +1440,38 @@
|
||||
"title": "CollabAgentToolCallThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"agentPath": {
|
||||
"type": "string"
|
||||
},
|
||||
"agentThreadId": {
|
||||
"type": "string"
|
||||
},
|
||||
"id": {
|
||||
"type": "string"
|
||||
},
|
||||
"kind": {
|
||||
"$ref": "#/definitions/SubAgentActivityKind"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"subAgentActivity"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItemType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"agentPath",
|
||||
"agentThreadId",
|
||||
"id",
|
||||
"kind",
|
||||
"type"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"action": {
|
||||
|
||||
@@ -676,6 +676,14 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"SubAgentActivityKind": {
|
||||
"enum": [
|
||||
"started",
|
||||
"interacted",
|
||||
"interrupted"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"SubAgentSource": {
|
||||
"oneOf": [
|
||||
{
|
||||
@@ -1432,6 +1440,38 @@
|
||||
"title": "CollabAgentToolCallThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"agentPath": {
|
||||
"type": "string"
|
||||
},
|
||||
"agentThreadId": {
|
||||
"type": "string"
|
||||
},
|
||||
"id": {
|
||||
"type": "string"
|
||||
},
|
||||
"kind": {
|
||||
"$ref": "#/definitions/SubAgentActivityKind"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"subAgentActivity"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItemType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"agentPath",
|
||||
"agentThreadId",
|
||||
"id",
|
||||
"kind",
|
||||
"type"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"action": {
|
||||
|
||||
@@ -861,6 +861,14 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"SubAgentActivityKind": {
|
||||
"enum": [
|
||||
"started",
|
||||
"interacted",
|
||||
"interrupted"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"SubAgentSource": {
|
||||
"oneOf": [
|
||||
{
|
||||
@@ -1617,6 +1625,38 @@
|
||||
"title": "CollabAgentToolCallThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"agentPath": {
|
||||
"type": "string"
|
||||
},
|
||||
"agentThreadId": {
|
||||
"type": "string"
|
||||
},
|
||||
"id": {
|
||||
"type": "string"
|
||||
},
|
||||
"kind": {
|
||||
"$ref": "#/definitions/SubAgentActivityKind"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"subAgentActivity"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItemType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"agentPath",
|
||||
"agentThreadId",
|
||||
"id",
|
||||
"kind",
|
||||
"type"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"action": {
|
||||
|
||||
@@ -676,6 +676,14 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"SubAgentActivityKind": {
|
||||
"enum": [
|
||||
"started",
|
||||
"interacted",
|
||||
"interrupted"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"SubAgentSource": {
|
||||
"oneOf": [
|
||||
{
|
||||
@@ -1432,6 +1440,38 @@
|
||||
"title": "CollabAgentToolCallThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"agentPath": {
|
||||
"type": "string"
|
||||
},
|
||||
"agentThreadId": {
|
||||
"type": "string"
|
||||
},
|
||||
"id": {
|
||||
"type": "string"
|
||||
},
|
||||
"kind": {
|
||||
"$ref": "#/definitions/SubAgentActivityKind"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"subAgentActivity"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItemType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"agentPath",
|
||||
"agentThreadId",
|
||||
"id",
|
||||
"kind",
|
||||
"type"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"action": {
|
||||
|
||||
@@ -861,6 +861,14 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"SubAgentActivityKind": {
|
||||
"enum": [
|
||||
"started",
|
||||
"interacted",
|
||||
"interrupted"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"SubAgentSource": {
|
||||
"oneOf": [
|
||||
{
|
||||
@@ -1617,6 +1625,38 @@
|
||||
"title": "CollabAgentToolCallThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"agentPath": {
|
||||
"type": "string"
|
||||
},
|
||||
"agentThreadId": {
|
||||
"type": "string"
|
||||
},
|
||||
"id": {
|
||||
"type": "string"
|
||||
},
|
||||
"kind": {
|
||||
"$ref": "#/definitions/SubAgentActivityKind"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"subAgentActivity"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItemType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"agentPath",
|
||||
"agentThreadId",
|
||||
"id",
|
||||
"kind",
|
||||
"type"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"action": {
|
||||
|
||||
@@ -676,6 +676,14 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"SubAgentActivityKind": {
|
||||
"enum": [
|
||||
"started",
|
||||
"interacted",
|
||||
"interrupted"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"SubAgentSource": {
|
||||
"oneOf": [
|
||||
{
|
||||
@@ -1432,6 +1440,38 @@
|
||||
"title": "CollabAgentToolCallThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"agentPath": {
|
||||
"type": "string"
|
||||
},
|
||||
"agentThreadId": {
|
||||
"type": "string"
|
||||
},
|
||||
"id": {
|
||||
"type": "string"
|
||||
},
|
||||
"kind": {
|
||||
"$ref": "#/definitions/SubAgentActivityKind"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"subAgentActivity"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItemType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"agentPath",
|
||||
"agentThreadId",
|
||||
"id",
|
||||
"kind",
|
||||
"type"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"action": {
|
||||
|
||||
@@ -676,6 +676,14 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"SubAgentActivityKind": {
|
||||
"enum": [
|
||||
"started",
|
||||
"interacted",
|
||||
"interrupted"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"SubAgentSource": {
|
||||
"oneOf": [
|
||||
{
|
||||
@@ -1432,6 +1440,38 @@
|
||||
"title": "CollabAgentToolCallThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"agentPath": {
|
||||
"type": "string"
|
||||
},
|
||||
"agentThreadId": {
|
||||
"type": "string"
|
||||
},
|
||||
"id": {
|
||||
"type": "string"
|
||||
},
|
||||
"kind": {
|
||||
"$ref": "#/definitions/SubAgentActivityKind"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"subAgentActivity"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItemType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"agentPath",
|
||||
"agentThreadId",
|
||||
"id",
|
||||
"kind",
|
||||
"type"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"action": {
|
||||
|
||||
@@ -610,6 +610,14 @@
|
||||
"minLength": 1,
|
||||
"type": "string"
|
||||
},
|
||||
"SubAgentActivityKind": {
|
||||
"enum": [
|
||||
"started",
|
||||
"interacted",
|
||||
"interrupted"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"TextElement": {
|
||||
"properties": {
|
||||
"byteRange": {
|
||||
@@ -1133,6 +1141,38 @@
|
||||
"title": "CollabAgentToolCallThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"agentPath": {
|
||||
"type": "string"
|
||||
},
|
||||
"agentThreadId": {
|
||||
"type": "string"
|
||||
},
|
||||
"id": {
|
||||
"type": "string"
|
||||
},
|
||||
"kind": {
|
||||
"$ref": "#/definitions/SubAgentActivityKind"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"subAgentActivity"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItemType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"agentPath",
|
||||
"agentThreadId",
|
||||
"id",
|
||||
"kind",
|
||||
"type"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"action": {
|
||||
|
||||
@@ -610,6 +610,14 @@
|
||||
"minLength": 1,
|
||||
"type": "string"
|
||||
},
|
||||
"SubAgentActivityKind": {
|
||||
"enum": [
|
||||
"started",
|
||||
"interacted",
|
||||
"interrupted"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"TextElement": {
|
||||
"properties": {
|
||||
"byteRange": {
|
||||
@@ -1133,6 +1141,38 @@
|
||||
"title": "CollabAgentToolCallThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"agentPath": {
|
||||
"type": "string"
|
||||
},
|
||||
"agentThreadId": {
|
||||
"type": "string"
|
||||
},
|
||||
"id": {
|
||||
"type": "string"
|
||||
},
|
||||
"kind": {
|
||||
"$ref": "#/definitions/SubAgentActivityKind"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"subAgentActivity"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItemType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"agentPath",
|
||||
"agentThreadId",
|
||||
"id",
|
||||
"kind",
|
||||
"type"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"action": {
|
||||
|
||||
@@ -610,6 +610,14 @@
|
||||
"minLength": 1,
|
||||
"type": "string"
|
||||
},
|
||||
"SubAgentActivityKind": {
|
||||
"enum": [
|
||||
"started",
|
||||
"interacted",
|
||||
"interrupted"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"TextElement": {
|
||||
"properties": {
|
||||
"byteRange": {
|
||||
@@ -1133,6 +1141,38 @@
|
||||
"title": "CollabAgentToolCallThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"agentPath": {
|
||||
"type": "string"
|
||||
},
|
||||
"agentThreadId": {
|
||||
"type": "string"
|
||||
},
|
||||
"id": {
|
||||
"type": "string"
|
||||
},
|
||||
"kind": {
|
||||
"$ref": "#/definitions/SubAgentActivityKind"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"subAgentActivity"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItemType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"agentPath",
|
||||
"agentThreadId",
|
||||
"id",
|
||||
"kind",
|
||||
"type"
|
||||
],
|
||||
"title": "SubAgentActivityThreadItem",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"action": {
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
// GENERATED CODE! DO NOT MODIFY BY HAND!
|
||||
|
||||
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
||||
|
||||
export type SubAgentActivityKind = "started" | "interacted" | "interrupted";
|
||||
@@ -20,6 +20,7 @@ import type { McpToolCallResult } from "./McpToolCallResult";
|
||||
import type { McpToolCallStatus } from "./McpToolCallStatus";
|
||||
import type { MemoryCitation } from "./MemoryCitation";
|
||||
import type { PatchApplyStatus } from "./PatchApplyStatus";
|
||||
import type { SubAgentActivityKind } from "./SubAgentActivityKind";
|
||||
import type { UserInput } from "./UserInput";
|
||||
import type { WebSearchAction } from "./WebSearchAction";
|
||||
|
||||
@@ -98,4 +99,4 @@ reasoningEffort: ReasoningEffort | null,
|
||||
/**
|
||||
* Last known status of the target agents, when available.
|
||||
*/
|
||||
agentsStates: { [key in string]?: CollabAgentState }, } | { "type": "webSearch", id: string, query: string, action: WebSearchAction | null, } | { "type": "imageView", id: string, path: AbsolutePathBuf, } | { "type": "imageGeneration", id: string, status: string, revisedPrompt: string | null, result: string, savedPath?: AbsolutePathBuf, } | { "type": "enteredReviewMode", id: string, review: string, } | { "type": "exitedReviewMode", id: string, review: string, } | { "type": "contextCompaction", id: string, };
|
||||
agentsStates: { [key in string]?: CollabAgentState }, } | { "type": "subAgentActivity", id: string, kind: SubAgentActivityKind, agentThreadId: string, agentPath: string, } | { "type": "webSearch", id: string, query: string, action: WebSearchAction | null, } | { "type": "imageView", id: string, path: AbsolutePathBuf, } | { "type": "imageGeneration", id: string, status: string, revisedPrompt: string | null, result: string, savedPath?: AbsolutePathBuf, } | { "type": "enteredReviewMode", id: string, review: string, } | { "type": "exitedReviewMode", id: string, review: string, } | { "type": "contextCompaction", id: string, };
|
||||
|
||||
@@ -356,6 +356,7 @@ export type { SkillsListParams } from "./SkillsListParams";
|
||||
export type { SkillsListResponse } from "./SkillsListResponse";
|
||||
export type { SortDirection } from "./SortDirection";
|
||||
export type { SpendControlLimitSnapshot } from "./SpendControlLimitSnapshot";
|
||||
export type { SubAgentActivityKind } from "./SubAgentActivityKind";
|
||||
export type { SubagentMigration } from "./SubagentMigration";
|
||||
export type { TerminalInteractionNotification } from "./TerminalInteractionNotification";
|
||||
export type { TextElement } from "./TextElement";
|
||||
|
||||
@@ -178,6 +178,20 @@ pub fn item_event_to_server_notification(
|
||||
completed_at_ms: end_event.completed_at_ms,
|
||||
})
|
||||
}
|
||||
EventMsg::SubAgentActivity(activity) => {
|
||||
let item = ThreadItem::SubAgentActivity {
|
||||
id: activity.event_id,
|
||||
kind: activity.kind.into(),
|
||||
agent_thread_id: activity.agent_thread_id.to_string(),
|
||||
agent_path: String::from(activity.agent_path),
|
||||
};
|
||||
ServerNotification::ItemCompleted(ItemCompletedNotification {
|
||||
thread_id,
|
||||
turn_id,
|
||||
item,
|
||||
completed_at_ms: activity.occurred_at_ms,
|
||||
})
|
||||
}
|
||||
EventMsg::CollabWaitingBegin(begin_event) => {
|
||||
let receiver_thread_ids = begin_event
|
||||
.receiver_thread_ids
|
||||
|
||||
@@ -203,6 +203,7 @@ impl ThreadHistoryBuilder {
|
||||
EventMsg::CollabAgentInteractionEnd(payload) => {
|
||||
self.handle_collab_agent_interaction_end(payload)
|
||||
}
|
||||
EventMsg::SubAgentActivity(payload) => self.handle_sub_agent_activity(payload),
|
||||
EventMsg::CollabWaitingBegin(payload) => self.handle_collab_waiting_begin(payload),
|
||||
EventMsg::CollabWaitingEnd(payload) => self.handle_collab_waiting_end(payload),
|
||||
EventMsg::CollabCloseBegin(payload) => self.handle_collab_close_begin(payload),
|
||||
@@ -702,6 +703,18 @@ impl ThreadHistoryBuilder {
|
||||
});
|
||||
}
|
||||
|
||||
fn handle_sub_agent_activity(
|
||||
&mut self,
|
||||
payload: &codex_protocol::protocol::SubAgentActivityEvent,
|
||||
) {
|
||||
self.upsert_item_in_current_turn(ThreadItem::SubAgentActivity {
|
||||
id: payload.event_id.clone(),
|
||||
kind: payload.kind.into(),
|
||||
agent_thread_id: payload.agent_thread_id.to_string(),
|
||||
agent_path: String::from(payload.agent_path.clone()),
|
||||
});
|
||||
}
|
||||
|
||||
fn handle_collab_waiting_begin(
|
||||
&mut self,
|
||||
payload: &codex_protocol::protocol::CollabWaitingBeginEvent,
|
||||
|
||||
@@ -29,6 +29,7 @@ use codex_protocol::protocol::GuardianRiskLevel as CoreGuardianRiskLevel;
|
||||
use codex_protocol::protocol::GuardianUserAuthorization as CoreGuardianUserAuthorization;
|
||||
use codex_protocol::protocol::PatchApplyStatus as CorePatchApplyStatus;
|
||||
use codex_protocol::protocol::ReviewDecision as CoreReviewDecision;
|
||||
use codex_protocol::protocol::SubAgentActivityKind as CoreSubAgentActivityKind;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use schemars::JsonSchema;
|
||||
use serde::Deserialize;
|
||||
@@ -336,6 +337,14 @@ pub enum ThreadItem {
|
||||
},
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(rename_all = "camelCase")]
|
||||
SubAgentActivity {
|
||||
id: String,
|
||||
kind: SubAgentActivityKind,
|
||||
agent_thread_id: String,
|
||||
agent_path: String,
|
||||
},
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(rename_all = "camelCase")]
|
||||
WebSearch {
|
||||
id: String,
|
||||
query: String,
|
||||
@@ -387,6 +396,7 @@ impl ThreadItem {
|
||||
| ThreadItem::McpToolCall { id, .. }
|
||||
| ThreadItem::DynamicToolCall { id, .. }
|
||||
| ThreadItem::CollabAgentToolCall { id, .. }
|
||||
| ThreadItem::SubAgentActivity { id, .. }
|
||||
| ThreadItem::WebSearch { id, .. }
|
||||
| ThreadItem::ImageView { id, .. }
|
||||
| ThreadItem::ImageGeneration { id, .. }
|
||||
@@ -1004,6 +1014,25 @@ pub enum CollabAgentToolCallStatus {
|
||||
Failed,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub enum SubAgentActivityKind {
|
||||
Started,
|
||||
Interacted,
|
||||
Interrupted,
|
||||
}
|
||||
|
||||
impl From<CoreSubAgentActivityKind> for SubAgentActivityKind {
|
||||
fn from(value: CoreSubAgentActivityKind) -> Self {
|
||||
match value {
|
||||
CoreSubAgentActivityKind::Started => SubAgentActivityKind::Started,
|
||||
CoreSubAgentActivityKind::Interacted => SubAgentActivityKind::Interacted,
|
||||
CoreSubAgentActivityKind::Interrupted => SubAgentActivityKind::Interrupted,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
|
||||
@@ -844,6 +844,7 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
| EventMsg::CollabAgentSpawnEnd(_)
|
||||
| EventMsg::CollabAgentInteractionBegin(_)
|
||||
| EventMsg::CollabAgentInteractionEnd(_)
|
||||
| EventMsg::SubAgentActivity(_)
|
||||
| EventMsg::CollabWaitingBegin(_)
|
||||
| EventMsg::CollabWaitingEnd(_)
|
||||
| EventMsg::CollabCloseBegin(_)
|
||||
|
||||
@@ -1431,7 +1431,8 @@ pub(super) fn realtime_text_for_event(msg: &EventMsg) -> Option<String> {
|
||||
| EventMsg::CollabCloseBegin(_)
|
||||
| EventMsg::CollabCloseEnd(_)
|
||||
| EventMsg::CollabResumeBegin(_)
|
||||
| EventMsg::CollabResumeEnd(_) => None,
|
||||
| EventMsg::CollabResumeEnd(_)
|
||||
| EventMsg::SubAgentActivity(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -14,15 +14,11 @@ use crate::tools::registry::ToolExecutor;
|
||||
use codex_protocol::AgentPath;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::openai_models::ReasoningEffort;
|
||||
use codex_protocol::protocol::CollabAgentInteractionBeginEvent;
|
||||
use codex_protocol::protocol::CollabAgentInteractionEndEvent;
|
||||
use codex_protocol::protocol::CollabAgentSpawnBeginEvent;
|
||||
use codex_protocol::protocol::CollabAgentSpawnEndEvent;
|
||||
use codex_protocol::protocol::CollabCloseBeginEvent;
|
||||
use codex_protocol::protocol::CollabCloseEndEvent;
|
||||
use codex_protocol::protocol::CollabWaitingBeginEvent;
|
||||
use codex_protocol::protocol::CollabWaitingEndEvent;
|
||||
use codex_protocol::protocol::InterAgentCommunication;
|
||||
use codex_protocol::protocol::SubAgentActivityEvent;
|
||||
use codex_protocol::protocol::SubAgentActivityKind;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use codex_tools::ToolName;
|
||||
use serde::Deserialize;
|
||||
|
||||
@@ -59,18 +59,9 @@ async fn handle_interrupt_agent(
|
||||
.to_string(),
|
||||
));
|
||||
}
|
||||
session
|
||||
.send_event(
|
||||
&turn,
|
||||
CollabCloseBeginEvent {
|
||||
call_id: call_id.clone(),
|
||||
started_at_ms: now_unix_timestamp_ms(),
|
||||
sender_thread_id: session.thread_id,
|
||||
receiver_thread_id: agent_id,
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await;
|
||||
let receiver_agent_path = receiver_agent.agent_path.clone().ok_or_else(|| {
|
||||
FunctionCallError::RespondToModel("target agent is missing an agent_path".to_string())
|
||||
})?;
|
||||
let status = session.services.agent_control.get_status(agent_id).await;
|
||||
let result = match session
|
||||
.services
|
||||
@@ -81,22 +72,20 @@ async fn handle_interrupt_agent(
|
||||
Ok(_) | Err(CodexErr::ThreadNotFound(_)) | Err(CodexErr::InternalAgentDied) => Ok(()),
|
||||
Err(err) => Err(collab_agent_error(agent_id, err)),
|
||||
};
|
||||
result?;
|
||||
session
|
||||
.send_event(
|
||||
&turn,
|
||||
CollabCloseEndEvent {
|
||||
call_id,
|
||||
completed_at_ms: now_unix_timestamp_ms(),
|
||||
sender_thread_id: session.thread_id,
|
||||
receiver_thread_id: agent_id,
|
||||
receiver_agent_nickname: receiver_agent.agent_nickname,
|
||||
receiver_agent_role: receiver_agent.agent_role,
|
||||
status: status.clone(),
|
||||
SubAgentActivityEvent {
|
||||
event_id: call_id,
|
||||
occurred_at_ms: now_unix_timestamp_ms(),
|
||||
agent_thread_id: agent_id,
|
||||
agent_path: receiver_agent_path,
|
||||
kind: SubAgentActivityKind::Interrupted,
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await;
|
||||
result?;
|
||||
|
||||
Ok(InterruptAgentResult {
|
||||
previous_status: status,
|
||||
|
||||
@@ -69,7 +69,6 @@ pub(crate) async fn handle_message_string_tool(
|
||||
call_id,
|
||||
..
|
||||
} = invocation;
|
||||
let prompt = String::new();
|
||||
let receiver_thread_id = resolve_agent_target(&session, &turn, &target).await?;
|
||||
let receiver_agent = session
|
||||
.services
|
||||
@@ -96,52 +95,32 @@ pub(crate) async fn handle_message_string_tool(
|
||||
.ensure_v2_agent_loaded(resume_config, receiver_thread_id)
|
||||
.await
|
||||
.map_err(|err| collab_agent_error(receiver_thread_id, err))?;
|
||||
session
|
||||
.send_event(
|
||||
&turn,
|
||||
CollabAgentInteractionBeginEvent {
|
||||
call_id: call_id.clone(),
|
||||
started_at_ms: now_unix_timestamp_ms(),
|
||||
sender_thread_id: session.thread_id,
|
||||
receiver_thread_id,
|
||||
prompt: prompt.clone(),
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await;
|
||||
let author = turn
|
||||
.session_source
|
||||
.get_agent_path()
|
||||
.unwrap_or_else(AgentPath::root);
|
||||
let communication = communication_from_tool_message(author, receiver_agent_path, message);
|
||||
let communication =
|
||||
communication_from_tool_message(author, receiver_agent_path.clone(), message);
|
||||
let result = session
|
||||
.services
|
||||
.agent_control
|
||||
.send_inter_agent_communication(receiver_thread_id, mode.apply(communication))
|
||||
.await
|
||||
.map_err(|err| collab_agent_error(receiver_thread_id, err));
|
||||
let status = session
|
||||
.services
|
||||
.agent_control
|
||||
.get_status(receiver_thread_id)
|
||||
.await;
|
||||
result?;
|
||||
session
|
||||
.send_event(
|
||||
&turn,
|
||||
CollabAgentInteractionEndEvent {
|
||||
call_id,
|
||||
completed_at_ms: now_unix_timestamp_ms(),
|
||||
sender_thread_id: session.thread_id,
|
||||
receiver_thread_id,
|
||||
receiver_agent_nickname: receiver_agent.agent_nickname,
|
||||
receiver_agent_role: receiver_agent.agent_role,
|
||||
prompt,
|
||||
status,
|
||||
SubAgentActivityEvent {
|
||||
event_id: call_id,
|
||||
occurred_at_ms: now_unix_timestamp_ms(),
|
||||
agent_thread_id: receiver_thread_id,
|
||||
agent_path: receiver_agent_path,
|
||||
kind: SubAgentActivityKind::Interacted,
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await;
|
||||
result?;
|
||||
|
||||
Ok(FunctionToolOutput::from_text(String::new(), Some(true)))
|
||||
}
|
||||
|
||||
@@ -61,24 +61,8 @@ async fn handle_spawn_agent(
|
||||
|
||||
let message = args.message.clone();
|
||||
let initial_operation = parse_collab_input(Some(args.message), /*items*/ None)?;
|
||||
let prompt = String::new();
|
||||
|
||||
let session_source = turn.session_source.clone();
|
||||
let child_depth = next_thread_spawn_depth(&session_source);
|
||||
session
|
||||
.send_event(
|
||||
&turn,
|
||||
CollabAgentSpawnBeginEvent {
|
||||
call_id: call_id.clone(),
|
||||
started_at_ms: now_unix_timestamp_ms(),
|
||||
sender_thread_id: session.thread_id,
|
||||
prompt: prompt.clone(),
|
||||
model: args.model.clone().unwrap_or_default(),
|
||||
reasoning_effort: args.reasoning_effort.clone().unwrap_or_default(),
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await;
|
||||
let mut config =
|
||||
build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?;
|
||||
if let Some(service_tier) = args.service_tier.as_ref() {
|
||||
@@ -119,11 +103,16 @@ async fn handle_spawn_agent(
|
||||
role_name,
|
||||
Some(args.task_name.clone()),
|
||||
)?;
|
||||
let result = Box::pin(
|
||||
let new_agent_path = spawn_source.get_agent_path().ok_or_else(|| {
|
||||
FunctionCallError::RespondToModel(
|
||||
"spawned agent is missing a canonical task name".to_string(),
|
||||
)
|
||||
})?;
|
||||
let spawned_agent = Box::pin(
|
||||
session.services.agent_control.spawn_agent_with_metadata(
|
||||
config,
|
||||
match (spawn_source.get_agent_path(), initial_operation) {
|
||||
(Some(recipient), Op::UserInput { items, .. })
|
||||
match initial_operation {
|
||||
Op::UserInput { items, .. }
|
||||
if items
|
||||
.iter()
|
||||
.all(|item| matches!(item, UserInput::Text { .. })) =>
|
||||
@@ -132,10 +121,11 @@ async fn handle_spawn_agent(
|
||||
.session_source
|
||||
.get_agent_path()
|
||||
.unwrap_or_else(AgentPath::root);
|
||||
let communication = communication_from_tool_message(author, recipient, message);
|
||||
let communication =
|
||||
communication_from_tool_message(author, new_agent_path.clone(), message);
|
||||
Op::InterAgentCommunication { communication }
|
||||
}
|
||||
(_, initial_operation) => initial_operation,
|
||||
initial_operation => initial_operation,
|
||||
},
|
||||
Some(spawn_source),
|
||||
SpawnAgentOptions {
|
||||
@@ -147,78 +137,37 @@ async fn handle_spawn_agent(
|
||||
),
|
||||
)
|
||||
.await
|
||||
.map_err(collab_spawn_error);
|
||||
let (new_thread_id, new_agent_metadata, status) = match &result {
|
||||
Ok(spawned_agent) => (
|
||||
Some(spawned_agent.thread_id),
|
||||
Some(spawned_agent.metadata.clone()),
|
||||
spawned_agent.status.clone(),
|
||||
),
|
||||
Err(_) => (None, None, AgentStatus::NotFound),
|
||||
};
|
||||
let agent_snapshot = match new_thread_id {
|
||||
Some(thread_id) => {
|
||||
session
|
||||
.services
|
||||
.agent_control
|
||||
.get_agent_config_snapshot(thread_id)
|
||||
.await
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
let (new_agent_path, new_agent_nickname, new_agent_role) =
|
||||
match (&agent_snapshot, new_agent_metadata) {
|
||||
(Some(snapshot), _) => (
|
||||
snapshot.session_source.get_agent_path().map(String::from),
|
||||
snapshot.session_source.get_nickname(),
|
||||
snapshot.session_source.get_agent_role(),
|
||||
),
|
||||
(None, Some(metadata)) => (
|
||||
metadata.agent_path.map(String::from),
|
||||
metadata.agent_nickname,
|
||||
metadata.agent_role,
|
||||
),
|
||||
(None, None) => (None, None, None),
|
||||
};
|
||||
let effective_model = agent_snapshot
|
||||
.map_err(collab_spawn_error)?;
|
||||
let new_thread_id = spawned_agent.thread_id;
|
||||
let agent_snapshot = session
|
||||
.services
|
||||
.agent_control
|
||||
.get_agent_config_snapshot(new_thread_id)
|
||||
.await;
|
||||
let nickname = agent_snapshot
|
||||
.as_ref()
|
||||
.map(|snapshot| snapshot.model.clone())
|
||||
.unwrap_or_else(|| args.model.clone().unwrap_or_default());
|
||||
let effective_reasoning_effort = agent_snapshot
|
||||
.as_ref()
|
||||
.and_then(|snapshot| snapshot.reasoning_effort.clone())
|
||||
.unwrap_or(args.reasoning_effort.unwrap_or_default());
|
||||
let nickname = new_agent_nickname.clone();
|
||||
.and_then(|snapshot| snapshot.session_source.get_nickname())
|
||||
.or(spawned_agent.metadata.agent_nickname);
|
||||
session
|
||||
.send_event(
|
||||
&turn,
|
||||
CollabAgentSpawnEndEvent {
|
||||
call_id,
|
||||
completed_at_ms: now_unix_timestamp_ms(),
|
||||
sender_thread_id: session.thread_id,
|
||||
new_thread_id,
|
||||
new_agent_nickname,
|
||||
new_agent_role,
|
||||
prompt,
|
||||
model: effective_model,
|
||||
reasoning_effort: effective_reasoning_effort,
|
||||
status,
|
||||
SubAgentActivityEvent {
|
||||
event_id: call_id,
|
||||
occurred_at_ms: now_unix_timestamp_ms(),
|
||||
agent_thread_id: new_thread_id,
|
||||
agent_path: new_agent_path.clone(),
|
||||
kind: SubAgentActivityKind::Started,
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await;
|
||||
let _ = result?;
|
||||
let role_tag = role_name.unwrap_or(DEFAULT_ROLE_NAME);
|
||||
turn.session_telemetry.counter(
|
||||
"codex.multi_agent.spawn",
|
||||
/*inc*/ 1,
|
||||
&[("role", role_tag)],
|
||||
);
|
||||
let task_name = new_agent_path.ok_or_else(|| {
|
||||
FunctionCallError::RespondToModel(
|
||||
"spawned agent is missing a canonical task name".to_string(),
|
||||
)
|
||||
})?;
|
||||
let task_name = String::from(new_agent_path);
|
||||
|
||||
let hide_agent_metadata = turn.config.multi_agent_v2.hide_spawn_agent_metadata;
|
||||
if hide_agent_metadata {
|
||||
|
||||
@@ -382,6 +382,7 @@ async fn run_codex_tool_session_inner(
|
||||
| EventMsg::CollabCloseEnd(_)
|
||||
| EventMsg::CollabResumeBegin(_)
|
||||
| EventMsg::CollabResumeEnd(_)
|
||||
| EventMsg::SubAgentActivity(_)
|
||||
| EventMsg::RealtimeConversationStarted(_)
|
||||
| EventMsg::RealtimeConversationSdp(_)
|
||||
| EventMsg::RealtimeConversationRealtime(_)
|
||||
|
||||
@@ -1349,6 +1349,9 @@ pub enum EventMsg {
|
||||
CollabResumeBegin(CollabResumeBeginEvent),
|
||||
/// Collab interaction: resume end.
|
||||
CollabResumeEnd(CollabResumeEndEvent),
|
||||
|
||||
/// Path-based v2 sub-agent activity.
|
||||
SubAgentActivity(SubAgentActivityEvent),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS, EnumIter)]
|
||||
@@ -1568,6 +1571,12 @@ impl From<CollabResumeEndEvent> for EventMsg {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SubAgentActivityEvent> for EventMsg {
|
||||
fn from(event: SubAgentActivityEvent) -> Self {
|
||||
EventMsg::SubAgentActivity(event)
|
||||
}
|
||||
}
|
||||
|
||||
/// Agent lifecycle status, derived from emitted events.
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS, Default)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
@@ -3890,6 +3899,27 @@ pub struct CollabAgentInteractionEndEvent {
|
||||
pub status: AgentStatus,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
#[ts(rename_all = "snake_case")]
|
||||
pub enum SubAgentActivityKind {
|
||||
Started,
|
||||
Interacted,
|
||||
Interrupted,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]
|
||||
pub struct SubAgentActivityEvent {
|
||||
pub event_id: String,
|
||||
#[serde(default)]
|
||||
pub occurred_at_ms: i64,
|
||||
/// Thread ID of the affected sub-agent.
|
||||
pub agent_thread_id: ThreadId,
|
||||
/// Canonical v2 path of the affected sub-agent.
|
||||
pub agent_path: AgentPath,
|
||||
pub kind: SubAgentActivityKind,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)]
|
||||
pub struct CollabWaitingBeginEvent {
|
||||
#[serde(default)]
|
||||
|
||||
@@ -21,6 +21,7 @@ use codex_protocol::protocol::McpToolCallEndEvent;
|
||||
use codex_protocol::protocol::PatchApplyBeginEvent;
|
||||
use codex_protocol::protocol::PatchApplyEndEvent;
|
||||
use codex_protocol::protocol::PatchApplyStatus;
|
||||
use codex_protocol::protocol::SubAgentActivityEvent;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
use serde::Serialize;
|
||||
|
||||
@@ -110,6 +111,7 @@ pub(crate) enum ToolRuntimePayload<'a> {
|
||||
CollabWaitingEnd(&'a codex_protocol::protocol::CollabWaitingEndEvent),
|
||||
CollabCloseBegin(&'a codex_protocol::protocol::CollabCloseBeginEvent),
|
||||
CollabCloseEnd(&'a codex_protocol::protocol::CollabCloseEndEvent),
|
||||
SubAgentActivity(&'a SubAgentActivityEvent),
|
||||
}
|
||||
|
||||
impl Serialize for ToolRuntimePayload<'_> {
|
||||
@@ -132,6 +134,7 @@ impl Serialize for ToolRuntimePayload<'_> {
|
||||
ToolRuntimePayload::CollabWaitingEnd(event) => event.serialize(serializer),
|
||||
ToolRuntimePayload::CollabCloseBegin(event) => event.serialize(serializer),
|
||||
ToolRuntimePayload::CollabCloseEnd(event) => event.serialize(serializer),
|
||||
ToolRuntimePayload::SubAgentActivity(event) => event.serialize(serializer),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -215,6 +218,11 @@ pub(crate) fn tool_runtime_trace_event(event: &EventMsg) -> Option<ToolRuntimeTr
|
||||
status: ExecutionStatus::Completed,
|
||||
payload: ToolRuntimePayload::CollabCloseEnd(event),
|
||||
}),
|
||||
EventMsg::SubAgentActivity(event) => Some(ToolRuntimeTraceEvent::Ended {
|
||||
tool_call_id: &event.event_id,
|
||||
status: ExecutionStatus::Completed,
|
||||
payload: ToolRuntimePayload::SubAgentActivity(event),
|
||||
}),
|
||||
EventMsg::Error(_)
|
||||
| EventMsg::Warning(_)
|
||||
| EventMsg::GuardianWarning(_)
|
||||
@@ -357,7 +365,8 @@ pub(crate) fn wrapped_protocol_event_type(event: &EventMsg) -> Option<&'static s
|
||||
| EventMsg::CollabCloseBegin(_)
|
||||
| EventMsg::CollabCloseEnd(_)
|
||||
| EventMsg::CollabResumeBegin(_)
|
||||
| EventMsg::CollabResumeEnd(_) => None,
|
||||
| EventMsg::CollabResumeEnd(_)
|
||||
| EventMsg::SubAgentActivity(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -393,3 +402,7 @@ fn execution_status_for_abort_reason(reason: &TurnAbortReason) -> ExecutionStatu
|
||||
| TurnAbortReason::BudgetLimited => ExecutionStatus::Cancelled,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "protocol_event_tests.rs"]
|
||||
mod tests;
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
use codex_protocol::AgentPath;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::SubAgentActivityEvent;
|
||||
use codex_protocol::protocol::SubAgentActivityKind;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
|
||||
use super::ToolRuntimeTraceEvent;
|
||||
use super::tool_runtime_trace_event;
|
||||
use crate::ExecutionStatus;
|
||||
|
||||
#[test]
|
||||
fn sub_agent_activity_is_a_terminal_tool_runtime_event() -> anyhow::Result<()> {
|
||||
let agent_thread_id = ThreadId::new();
|
||||
let event = EventMsg::SubAgentActivity(SubAgentActivityEvent {
|
||||
event_id: "call-spawn".to_string(),
|
||||
occurred_at_ms: 1234,
|
||||
agent_thread_id,
|
||||
agent_path: AgentPath::try_from("/root/reviewer").map_err(anyhow::Error::msg)?,
|
||||
kind: SubAgentActivityKind::Started,
|
||||
});
|
||||
|
||||
let Some(ToolRuntimeTraceEvent::Ended {
|
||||
tool_call_id,
|
||||
status,
|
||||
payload,
|
||||
}) = tool_runtime_trace_event(&event)
|
||||
else {
|
||||
panic!("expected terminal tool runtime event");
|
||||
};
|
||||
|
||||
assert_eq!(tool_call_id, "call-spawn");
|
||||
assert_eq!(status, ExecutionStatus::Completed);
|
||||
assert_eq!(
|
||||
serde_json::to_value(payload)?,
|
||||
json!({
|
||||
"event_id": "call-spawn",
|
||||
"occurred_at_ms": 1234,
|
||||
"agent_thread_id": agent_thread_id,
|
||||
"agent_path": "/root/reviewer",
|
||||
"kind": "started"
|
||||
})
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
@@ -7,6 +7,10 @@ use codex_protocol::protocol::CollabAgentSpawnEndEvent;
|
||||
use codex_protocol::protocol::CollabCloseBeginEvent;
|
||||
use codex_protocol::protocol::CollabCloseEndEvent;
|
||||
use codex_protocol::protocol::InterAgentCommunication;
|
||||
use codex_protocol::protocol::SubAgentActivityEvent;
|
||||
use codex_protocol::protocol::SubAgentActivityKind;
|
||||
use serde::Deserialize;
|
||||
use serde_json::Value;
|
||||
|
||||
use super::super::TraceReducer;
|
||||
use crate::model::ConversationItem;
|
||||
@@ -52,6 +56,11 @@ pub(in crate::reducer) struct ObservedAgentResultEdge {
|
||||
pub(in crate::reducer) carried_payload: Option<RawPayloadRef>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct AgentMessageInvocationArgs {
|
||||
message: String,
|
||||
}
|
||||
|
||||
/// Builds the stable edge id for the spawn relationship between two threads.
|
||||
pub(in crate::reducer) fn spawn_edge_id(parent_thread_id: &str, child_thread_id: &str) -> String {
|
||||
format!("edge:spawn:{parent_thread_id}:{child_thread_id}")
|
||||
@@ -123,15 +132,20 @@ impl TraceReducer {
|
||||
runtime_payload: &RawPayloadRef,
|
||||
) -> Result<()> {
|
||||
let kind = self.rollout.tool_calls[tool_call_id].kind.clone();
|
||||
let runtime_payload_json = self.read_payload_json(runtime_payload)?;
|
||||
if runtime_payload_json.get("agent_thread_id").is_some() {
|
||||
let payload: SubAgentActivityEvent = serde_json::from_value(runtime_payload_json)?;
|
||||
return self.end_sub_agent_activity(wall_time_unix_ms, tool_call_id, &kind, &payload);
|
||||
}
|
||||
match kind {
|
||||
ToolCallKind::SpawnAgent => {
|
||||
let payload: CollabAgentSpawnEndEvent =
|
||||
serde_json::from_value(self.read_payload_json(runtime_payload)?)?;
|
||||
serde_json::from_value(runtime_payload_json)?;
|
||||
self.end_spawn_agent_interaction(wall_time_unix_ms, tool_call_id, &payload)
|
||||
}
|
||||
ToolCallKind::AssignAgentTask => {
|
||||
let payload: CollabAgentInteractionEndEvent =
|
||||
serde_json::from_value(self.read_payload_json(runtime_payload)?)?;
|
||||
serde_json::from_value(runtime_payload_json)?;
|
||||
self.end_message_agent_interaction(
|
||||
wall_time_unix_ms,
|
||||
tool_call_id,
|
||||
@@ -141,7 +155,7 @@ impl TraceReducer {
|
||||
}
|
||||
ToolCallKind::SendMessage => {
|
||||
let payload: CollabAgentInteractionEndEvent =
|
||||
serde_json::from_value(self.read_payload_json(runtime_payload)?)?;
|
||||
serde_json::from_value(runtime_payload_json)?;
|
||||
self.end_message_agent_interaction(
|
||||
wall_time_unix_ms,
|
||||
tool_call_id,
|
||||
@@ -150,8 +164,7 @@ impl TraceReducer {
|
||||
)
|
||||
}
|
||||
ToolCallKind::CloseAgent => {
|
||||
let payload: CollabCloseEndEvent =
|
||||
serde_json::from_value(self.read_payload_json(runtime_payload)?)?;
|
||||
let payload: CollabCloseEndEvent = serde_json::from_value(runtime_payload_json)?;
|
||||
self.upsert_close_agent_interaction(
|
||||
tool_call_id,
|
||||
payload.receiver_thread_id.to_string(),
|
||||
@@ -169,6 +182,127 @@ impl TraceReducer {
|
||||
}
|
||||
}
|
||||
|
||||
fn end_sub_agent_activity(
|
||||
&mut self,
|
||||
wall_time_unix_ms: i64,
|
||||
tool_call_id: &str,
|
||||
tool_kind: &ToolCallKind,
|
||||
payload: &SubAgentActivityEvent,
|
||||
) -> Result<()> {
|
||||
let target_thread_id = payload.agent_thread_id.to_string();
|
||||
match (tool_kind, &payload.kind) {
|
||||
(ToolCallKind::SpawnAgent, SubAgentActivityKind::Started) => {
|
||||
let parent_thread_id = self
|
||||
.rollout
|
||||
.tool_calls
|
||||
.get(tool_call_id)
|
||||
.with_context(|| {
|
||||
format!("agent activity referenced unknown tool call {tool_call_id}")
|
||||
})?
|
||||
.thread_id
|
||||
.clone();
|
||||
self.queue_sub_agent_activity_message_edge(
|
||||
wall_time_unix_ms,
|
||||
tool_call_id,
|
||||
spawn_edge_id(&parent_thread_id, &target_thread_id),
|
||||
InteractionEdgeKind::SpawnAgent,
|
||||
target_thread_id.clone(),
|
||||
Some(target_thread_id),
|
||||
)
|
||||
}
|
||||
(ToolCallKind::AssignAgentTask, SubAgentActivityKind::Interacted) => self
|
||||
.queue_sub_agent_activity_message_edge(
|
||||
wall_time_unix_ms,
|
||||
tool_call_id,
|
||||
tool_edge_id(tool_call_id),
|
||||
InteractionEdgeKind::AssignAgentTask,
|
||||
target_thread_id,
|
||||
/*unresolved_spawn_thread_id*/ None,
|
||||
),
|
||||
(ToolCallKind::SendMessage, SubAgentActivityKind::Interacted) => self
|
||||
.queue_sub_agent_activity_message_edge(
|
||||
wall_time_unix_ms,
|
||||
tool_call_id,
|
||||
tool_edge_id(tool_call_id),
|
||||
InteractionEdgeKind::SendMessage,
|
||||
target_thread_id,
|
||||
/*unresolved_spawn_thread_id*/ None,
|
||||
),
|
||||
(ToolCallKind::CloseAgent, SubAgentActivityKind::Interrupted) => self
|
||||
.upsert_close_agent_interaction(
|
||||
tool_call_id,
|
||||
target_thread_id,
|
||||
Some(wall_time_unix_ms),
|
||||
),
|
||||
_ => bail!(
|
||||
"sub-agent activity {:?} does not match tool call kind {tool_kind:?}",
|
||||
payload.kind
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
fn queue_sub_agent_activity_message_edge(
|
||||
&mut self,
|
||||
wall_time_unix_ms: i64,
|
||||
tool_call_id: &str,
|
||||
edge_id: String,
|
||||
edge_kind: InteractionEdgeKind,
|
||||
target_thread_id: String,
|
||||
unresolved_spawn_thread_id: Option<String>,
|
||||
) -> Result<()> {
|
||||
let tool_call = self.rollout.tool_calls.get(tool_call_id).with_context(|| {
|
||||
format!("agent activity referenced unknown tool call {tool_call_id}")
|
||||
})?;
|
||||
let started_at_unix_ms = tool_call.execution.started_at_unix_ms;
|
||||
let message_content = self.agent_message_content_from_invocation(tool_call_id)?;
|
||||
let carried_raw_payload_ids = self.agent_tool_payload_ids(tool_call_id)?;
|
||||
self.queue_or_resolve_agent_interaction_edge(PendingAgentInteractionEdge {
|
||||
edge_id,
|
||||
kind: edge_kind,
|
||||
source: TraceAnchor::ToolCall {
|
||||
tool_call_id: tool_call_id.to_string(),
|
||||
},
|
||||
target_thread_id,
|
||||
message_content,
|
||||
unresolved_spawn_thread_id,
|
||||
started_at_unix_ms,
|
||||
ended_at_unix_ms: Some(wall_time_unix_ms),
|
||||
carried_raw_payload_ids,
|
||||
})
|
||||
}
|
||||
|
||||
fn agent_message_content_from_invocation(&self, tool_call_id: &str) -> Result<String> {
|
||||
let tool_call = self.rollout.tool_calls.get(tool_call_id).with_context(|| {
|
||||
format!("agent activity referenced unknown tool call {tool_call_id}")
|
||||
})?;
|
||||
let invocation_payload_id = tool_call
|
||||
.raw_invocation_payload_id
|
||||
.as_deref()
|
||||
.with_context(|| {
|
||||
format!("agent activity tool call {tool_call_id} missing invocation payload")
|
||||
})?;
|
||||
let invocation_payload = self
|
||||
.rollout
|
||||
.raw_payloads
|
||||
.get(invocation_payload_id)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"agent activity tool call {tool_call_id} referenced missing invocation payload {invocation_payload_id}"
|
||||
)
|
||||
})?;
|
||||
let invocation = self.read_payload_json(invocation_payload)?;
|
||||
let arguments = invocation
|
||||
.get("payload")
|
||||
.and_then(|payload| payload.get("arguments"))
|
||||
.and_then(Value::as_str)
|
||||
.with_context(|| {
|
||||
format!("agent activity tool call {tool_call_id} missing function arguments")
|
||||
})?;
|
||||
let args: AgentMessageInvocationArgs = serde_json::from_str(arguments)
|
||||
.with_context(|| format!("parse agent activity tool call {tool_call_id} arguments"))?;
|
||||
Ok(args.message)
|
||||
}
|
||||
|
||||
/// Adds the canonical tool result payload to an already reduced multi-agent edge.
|
||||
pub(super) fn attach_agent_interaction_tool_result(
|
||||
&mut self,
|
||||
|
||||
@@ -194,6 +194,92 @@ fn spawn_runtime_payload_falls_back_to_child_thread_without_delivery_item() -> a
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sub_agent_started_activity_creates_spawn_edge() -> anyhow::Result<()> {
|
||||
let temp = TempDir::new()?;
|
||||
let writer = create_started_agent_writer(&temp)?;
|
||||
start_agent_turn(&writer, "turn-1")?;
|
||||
let child_thread_id = "019d0000-0000-7000-8000-000000000002";
|
||||
let invocation_payload = writer.write_json_payload(
|
||||
RawPayloadKind::ToolInvocation,
|
||||
&json!({
|
||||
"tool_name": "spawn_agent",
|
||||
"payload": {
|
||||
"type": "function",
|
||||
"arguments": "{\"message\":\"review this\",\"task_name\":\"reviewer\"}"
|
||||
}
|
||||
}),
|
||||
)?;
|
||||
writer.append_with_context(
|
||||
trace_context_for_agent("turn-1"),
|
||||
RawTraceEventPayload::ToolCallStarted {
|
||||
tool_call_id: "call-spawn-v2".to_string(),
|
||||
model_visible_call_id: Some("call-spawn-v2".to_string()),
|
||||
code_mode_runtime_tool_id: None,
|
||||
requester: RawToolCallRequester::Model,
|
||||
kind: ToolCallKind::SpawnAgent,
|
||||
summary: ToolCallSummary::Generic {
|
||||
label: "spawn_agent".to_string(),
|
||||
input_preview: None,
|
||||
output_preview: None,
|
||||
},
|
||||
invocation_payload: Some(invocation_payload.clone()),
|
||||
},
|
||||
)?;
|
||||
let activity_payload = writer.write_json_payload(
|
||||
RawPayloadKind::ToolRuntimeEvent,
|
||||
&json!({
|
||||
"event_id": "call-spawn-v2",
|
||||
"occurred_at_ms": 1234,
|
||||
"agent_thread_id": child_thread_id,
|
||||
"agent_path": "/root/reviewer",
|
||||
"kind": "started"
|
||||
}),
|
||||
)?;
|
||||
writer.append_with_context(
|
||||
trace_context_for_agent("turn-1"),
|
||||
RawTraceEventPayload::ToolCallRuntimeEnded {
|
||||
tool_call_id: "call-spawn-v2".to_string(),
|
||||
status: ExecutionStatus::Completed,
|
||||
runtime_payload: activity_payload.clone(),
|
||||
},
|
||||
)?;
|
||||
start_thread(&writer, child_thread_id, "/root/reviewer")?;
|
||||
start_turn_for_thread(&writer, child_thread_id, "turn-child-1")?;
|
||||
let delivered = inter_agent_message(
|
||||
"/root",
|
||||
"/root/reviewer",
|
||||
"review this",
|
||||
/*trigger_turn*/ true,
|
||||
);
|
||||
append_inference_request(
|
||||
&writer,
|
||||
child_thread_id,
|
||||
"turn-child-1",
|
||||
"inference-child-1",
|
||||
vec![message("assistant", &delivered)],
|
||||
)?;
|
||||
|
||||
let replayed = replay_bundle(temp.path())?;
|
||||
let edge_id = format!("edge:spawn:019d0000-0000-7000-8000-000000000001:{child_thread_id}");
|
||||
let edge = &replayed.interaction_edges[&edge_id];
|
||||
assert_eq!(edge.kind, InteractionEdgeKind::SpawnAgent);
|
||||
let target_item_id = target_conversation_item_id(&edge.target);
|
||||
assert_eq!(edge.carried_item_ids, vec![target_item_id.clone()]);
|
||||
assert_eq!(
|
||||
replayed.conversation_items[target_item_id].thread_id,
|
||||
child_thread_id
|
||||
);
|
||||
assert_eq!(
|
||||
edge.carried_raw_payload_ids,
|
||||
vec![
|
||||
invocation_payload.raw_payload_id,
|
||||
activity_payload.raw_payload_id,
|
||||
]
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn send_message_runtime_payload_targets_delivered_child_message() -> anyhow::Result<()> {
|
||||
let temp = TempDir::new()?;
|
||||
@@ -300,6 +386,178 @@ fn send_message_runtime_payload_targets_delivered_child_message() -> anyhow::Res
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn send_message_activity_targets_delivered_child_message() -> anyhow::Result<()> {
|
||||
let temp = TempDir::new()?;
|
||||
let writer = create_started_agent_writer(&temp)?;
|
||||
start_agent_turn(&writer, "turn-1")?;
|
||||
let child_thread_id = "019d0000-0000-7000-8000-000000000002";
|
||||
let invocation_payload = writer.write_json_payload(
|
||||
RawPayloadKind::ToolInvocation,
|
||||
&json!({
|
||||
"tool_name": "send_message",
|
||||
"payload": {
|
||||
"type": "function",
|
||||
"arguments": "{\"target\":\"/root/child\",\"message\":\"hello again\"}"
|
||||
}
|
||||
}),
|
||||
)?;
|
||||
writer.append_with_context(
|
||||
trace_context_for_agent("turn-1"),
|
||||
RawTraceEventPayload::ToolCallStarted {
|
||||
tool_call_id: "call-send-v2".to_string(),
|
||||
model_visible_call_id: Some("call-send-v2".to_string()),
|
||||
code_mode_runtime_tool_id: None,
|
||||
requester: RawToolCallRequester::Model,
|
||||
kind: ToolCallKind::SendMessage,
|
||||
summary: ToolCallSummary::Generic {
|
||||
label: "send_message".to_string(),
|
||||
input_preview: None,
|
||||
output_preview: None,
|
||||
},
|
||||
invocation_payload: Some(invocation_payload.clone()),
|
||||
},
|
||||
)?;
|
||||
let activity_payload = writer.write_json_payload(
|
||||
RawPayloadKind::ToolRuntimeEvent,
|
||||
&json!({
|
||||
"event_id": "call-send-v2",
|
||||
"occurred_at_ms": 1234,
|
||||
"agent_thread_id": child_thread_id,
|
||||
"agent_path": "/root/child",
|
||||
"kind": "interacted"
|
||||
}),
|
||||
)?;
|
||||
writer.append_with_context(
|
||||
trace_context_for_agent("turn-1"),
|
||||
RawTraceEventPayload::ToolCallRuntimeEnded {
|
||||
tool_call_id: "call-send-v2".to_string(),
|
||||
status: ExecutionStatus::Completed,
|
||||
runtime_payload: activity_payload.clone(),
|
||||
},
|
||||
)?;
|
||||
start_thread(&writer, child_thread_id, "/root/child")?;
|
||||
start_turn_for_thread(&writer, child_thread_id, "turn-child-1")?;
|
||||
let delivered = inter_agent_message(
|
||||
"/root",
|
||||
"/root/child",
|
||||
"hello again",
|
||||
/*trigger_turn*/ false,
|
||||
);
|
||||
append_inference_request(
|
||||
&writer,
|
||||
child_thread_id,
|
||||
"turn-child-1",
|
||||
"inference-child-1",
|
||||
vec![message("assistant", &delivered)],
|
||||
)?;
|
||||
|
||||
let replayed = replay_bundle(temp.path())?;
|
||||
let edge = &replayed.interaction_edges["edge:tool:call-send-v2"];
|
||||
assert_eq!(edge.kind, InteractionEdgeKind::SendMessage);
|
||||
let target_item_id = target_conversation_item_id(&edge.target);
|
||||
assert_eq!(edge.carried_item_ids, vec![target_item_id.clone()]);
|
||||
assert_eq!(
|
||||
replayed.conversation_items[target_item_id].thread_id,
|
||||
child_thread_id
|
||||
);
|
||||
assert_eq!(
|
||||
edge.carried_raw_payload_ids,
|
||||
vec![
|
||||
invocation_payload.raw_payload_id,
|
||||
activity_payload.raw_payload_id,
|
||||
]
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn followup_activity_targets_delivered_child_message() -> anyhow::Result<()> {
|
||||
let temp = TempDir::new()?;
|
||||
let writer = create_started_agent_writer(&temp)?;
|
||||
start_agent_turn(&writer, "turn-1")?;
|
||||
let child_thread_id = "019d0000-0000-7000-8000-000000000002";
|
||||
let invocation_payload = writer.write_json_payload(
|
||||
RawPayloadKind::ToolInvocation,
|
||||
&json!({
|
||||
"tool_name": "followup_task",
|
||||
"payload": {
|
||||
"type": "function",
|
||||
"arguments": "{\"target\":\"/root/child\",\"message\":\"continue\"}"
|
||||
}
|
||||
}),
|
||||
)?;
|
||||
writer.append_with_context(
|
||||
trace_context_for_agent("turn-1"),
|
||||
RawTraceEventPayload::ToolCallStarted {
|
||||
tool_call_id: "call-followup-v2".to_string(),
|
||||
model_visible_call_id: Some("call-followup-v2".to_string()),
|
||||
code_mode_runtime_tool_id: None,
|
||||
requester: RawToolCallRequester::Model,
|
||||
kind: ToolCallKind::AssignAgentTask,
|
||||
summary: ToolCallSummary::Generic {
|
||||
label: "followup_task".to_string(),
|
||||
input_preview: None,
|
||||
output_preview: None,
|
||||
},
|
||||
invocation_payload: Some(invocation_payload.clone()),
|
||||
},
|
||||
)?;
|
||||
let activity_payload = writer.write_json_payload(
|
||||
RawPayloadKind::ToolRuntimeEvent,
|
||||
&json!({
|
||||
"event_id": "call-followup-v2",
|
||||
"occurred_at_ms": 1234,
|
||||
"agent_thread_id": child_thread_id,
|
||||
"agent_path": "/root/child",
|
||||
"kind": "interacted"
|
||||
}),
|
||||
)?;
|
||||
writer.append_with_context(
|
||||
trace_context_for_agent("turn-1"),
|
||||
RawTraceEventPayload::ToolCallRuntimeEnded {
|
||||
tool_call_id: "call-followup-v2".to_string(),
|
||||
status: ExecutionStatus::Completed,
|
||||
runtime_payload: activity_payload.clone(),
|
||||
},
|
||||
)?;
|
||||
start_thread(&writer, child_thread_id, "/root/child")?;
|
||||
start_turn_for_thread(&writer, child_thread_id, "turn-child-1")?;
|
||||
let delivered = inter_agent_message(
|
||||
"/root",
|
||||
"/root/child",
|
||||
"continue",
|
||||
/*trigger_turn*/ true,
|
||||
);
|
||||
append_inference_request(
|
||||
&writer,
|
||||
child_thread_id,
|
||||
"turn-child-1",
|
||||
"inference-child-1",
|
||||
vec![message("assistant", &delivered)],
|
||||
)?;
|
||||
|
||||
let replayed = replay_bundle(temp.path())?;
|
||||
let edge = &replayed.interaction_edges["edge:tool:call-followup-v2"];
|
||||
assert_eq!(edge.kind, InteractionEdgeKind::AssignAgentTask);
|
||||
let target_item_id = target_conversation_item_id(&edge.target);
|
||||
assert_eq!(edge.carried_item_ids, vec![target_item_id.clone()]);
|
||||
assert_eq!(
|
||||
replayed.conversation_items[target_item_id].thread_id,
|
||||
child_thread_id
|
||||
);
|
||||
assert_eq!(
|
||||
edge.carried_raw_payload_ids,
|
||||
vec![
|
||||
invocation_payload.raw_payload_id,
|
||||
activity_payload.raw_payload_id,
|
||||
]
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn close_agent_runtime_payload_targets_thread() -> anyhow::Result<()> {
|
||||
let temp = TempDir::new()?;
|
||||
|
||||
@@ -269,7 +269,7 @@ fn dispatched_tool_kind(tool_name: &str, _payload: &ToolDispatchPayload) -> Tool
|
||||
"send_message" => ToolCallKind::SendMessage,
|
||||
"followup_task" | "assign_task" => ToolCallKind::AssignAgentTask,
|
||||
"wait_agent" => ToolCallKind::WaitAgent,
|
||||
"close_agent" => ToolCallKind::CloseAgent,
|
||||
"close_agent" | "interrupt_agent" => ToolCallKind::CloseAgent,
|
||||
other => ToolCallKind::Other {
|
||||
name: other.to_string(),
|
||||
},
|
||||
@@ -425,6 +425,19 @@ mod tests {
|
||||
)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn classifies_interrupt_agent_as_close_agent() {
|
||||
assert_eq!(
|
||||
dispatched_tool_kind(
|
||||
"interrupt_agent",
|
||||
&ToolDispatchPayload::Function {
|
||||
arguments: r#"{"target":"/root/child"}"#.to_string(),
|
||||
},
|
||||
),
|
||||
ToolCallKind::CloseAgent
|
||||
);
|
||||
}
|
||||
|
||||
fn invocation(
|
||||
tool_name: &str,
|
||||
tool_namespace: Option<String>,
|
||||
|
||||
@@ -91,7 +91,8 @@ pub fn should_persist_event_msg(ev: &EventMsg) -> bool {
|
||||
| EventMsg::TurnStarted(_)
|
||||
| EventMsg::TurnComplete(_)
|
||||
| EventMsg::WebSearchEnd(_)
|
||||
| EventMsg::ImageGenerationEnd(_) => true,
|
||||
| EventMsg::ImageGenerationEnd(_)
|
||||
| EventMsg::SubAgentActivity(_) => true,
|
||||
EventMsg::ItemCompleted(event) => {
|
||||
// Plan items are derived from streaming tags and are not part of the
|
||||
// raw ResponseItem history, so we persist their completion to replay
|
||||
|
||||
@@ -327,6 +327,7 @@ async fn run_turn(thread: &CodexThread, thread_id: &str, prompt: String) -> anyh
|
||||
| EventMsg::CollabCloseEnd(_)
|
||||
| EventMsg::CollabResumeBegin(_)
|
||||
| EventMsg::CollabResumeEnd(_)
|
||||
| EventMsg::SubAgentActivity(_)
|
||||
| EventMsg::AgentMessageContentDelta(_)
|
||||
| EventMsg::PlanDelta(_)
|
||||
| EventMsg::ReasoningContentDelta(_)
|
||||
|
||||
@@ -62,6 +62,7 @@ use crate::multi_agents::agent_picker_status_dot_spans;
|
||||
use crate::multi_agents::format_agent_picker_item_name;
|
||||
use crate::multi_agents::next_agent_shortcut_matches;
|
||||
use crate::multi_agents::previous_agent_shortcut_matches;
|
||||
use crate::multi_agents::sub_agent_activity_display;
|
||||
use crate::pager_overlay::Overlay;
|
||||
use crate::render::highlight::highlight_bash_to_lines;
|
||||
use crate::render::renderable::Renderable;
|
||||
@@ -198,6 +199,7 @@ use toml::Value as TomlValue;
|
||||
use uuid::Uuid;
|
||||
mod agent_message_consolidation;
|
||||
mod agent_navigation;
|
||||
mod agent_status_feed;
|
||||
mod app_server_event_targets;
|
||||
mod app_server_events;
|
||||
pub(crate) mod app_server_requests;
|
||||
@@ -267,6 +269,20 @@ fn collab_receiver_thread_ids(notification: &ServerNotification) -> Option<&[Str
|
||||
}
|
||||
}
|
||||
|
||||
fn sub_agent_activity_item(notification: &ServerNotification) -> Option<&ThreadItem> {
|
||||
match notification {
|
||||
ServerNotification::ItemStarted(notification) => match ¬ification.item {
|
||||
ThreadItem::SubAgentActivity { .. } => Some(¬ification.item),
|
||||
_ => None,
|
||||
},
|
||||
ServerNotification::ItemCompleted(notification) => match ¬ification.item {
|
||||
ThreadItem::SubAgentActivity { .. } => Some(¬ification.item),
|
||||
_ => None,
|
||||
},
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn collab_receiver_is_not_found(
|
||||
notification: &ServerNotification,
|
||||
receiver_thread_id: &str,
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
//! updated or marked closed.
|
||||
|
||||
use crate::multi_agents::AgentPickerThreadEntry;
|
||||
use crate::multi_agents::SubAgentActivityDisplay;
|
||||
use crate::multi_agents::format_agent_picker_item_name;
|
||||
use crate::multi_agents::next_agent_shortcut;
|
||||
use crate::multi_agents::previous_agent_shortcut;
|
||||
@@ -86,16 +87,56 @@ impl AgentNavigationState {
|
||||
if !self.threads.contains_key(&thread_id) {
|
||||
self.order.push(thread_id);
|
||||
}
|
||||
let (previous_agent_path, previous_is_running) = self
|
||||
.threads
|
||||
.get(&thread_id)
|
||||
.map(|entry| (entry.agent_path.clone(), entry.is_running))
|
||||
.unwrap_or((None, false));
|
||||
self.threads.insert(
|
||||
thread_id,
|
||||
AgentPickerThreadEntry {
|
||||
agent_nickname,
|
||||
agent_role,
|
||||
agent_path: previous_agent_path,
|
||||
is_running: previous_is_running && !is_closed,
|
||||
is_closed,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
pub(crate) fn record_sub_agent_activity(&mut self, activity: SubAgentActivityDisplay) {
|
||||
if !self.threads.contains_key(&activity.thread_id) {
|
||||
self.order.push(activity.thread_id);
|
||||
}
|
||||
let entry =
|
||||
self.threads
|
||||
.entry(activity.thread_id)
|
||||
.or_insert_with(|| AgentPickerThreadEntry {
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
agent_path: None,
|
||||
is_running: false,
|
||||
is_closed: false,
|
||||
});
|
||||
entry.agent_path = Some(activity.agent_path);
|
||||
entry.is_running = activity.is_running_hint;
|
||||
entry.is_closed = false;
|
||||
}
|
||||
|
||||
pub(crate) fn set_running(&mut self, thread_id: ThreadId, is_running: bool) {
|
||||
if let Some(entry) = self.threads.get_mut(&thread_id) {
|
||||
entry.is_running = is_running;
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn set_agent_path(&mut self, thread_id: ThreadId, agent_path: Option<String>) {
|
||||
if let Some(agent_path) = agent_path
|
||||
&& let Some(entry) = self.threads.get_mut(&thread_id)
|
||||
{
|
||||
entry.agent_path = Some(agent_path);
|
||||
}
|
||||
}
|
||||
|
||||
/// Marks a thread as closed without removing it from the traversal cache.
|
||||
///
|
||||
/// Closed threads stay in the picker and in spawn order so users can still review them and so
|
||||
@@ -105,6 +146,7 @@ impl AgentNavigationState {
|
||||
pub(crate) fn mark_closed(&mut self, thread_id: ThreadId) {
|
||||
if let Some(entry) = self.threads.get_mut(&thread_id) {
|
||||
entry.is_closed = true;
|
||||
entry.is_running = false;
|
||||
} else {
|
||||
self.upsert(
|
||||
thread_id, /*agent_nickname*/ None, /*agent_role*/ None,
|
||||
@@ -155,6 +197,22 @@ impl AgentNavigationState {
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub(crate) fn ordered_path_backed_subagent_threads(
|
||||
&self,
|
||||
primary_thread_id: Option<ThreadId>,
|
||||
) -> Vec<(ThreadId, &AgentPickerThreadEntry)> {
|
||||
self.ordered_threads()
|
||||
.into_iter()
|
||||
.filter(|(thread_id, entry)| {
|
||||
Some(*thread_id) != primary_thread_id
|
||||
&& entry
|
||||
.agent_path
|
||||
.as_deref()
|
||||
.is_some_and(|agent_path| !agent_path.trim().is_empty())
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Returns tracked thread ids in the same stable order used by the picker.
|
||||
pub(crate) fn tracked_thread_ids(&self) -> Vec<ThreadId> {
|
||||
self.ordered_threads()
|
||||
@@ -217,6 +275,14 @@ impl AgentNavigationState {
|
||||
self.threads
|
||||
.get(&thread_id)
|
||||
.map(|entry| {
|
||||
if !is_primary
|
||||
&& let Some(agent_path) = entry
|
||||
.agent_path
|
||||
.as_deref()
|
||||
.filter(|agent_path| !agent_path.trim().is_empty())
|
||||
{
|
||||
return format!("`{agent_path}`");
|
||||
}
|
||||
format_agent_picker_item_name(
|
||||
entry.agent_nickname.as_deref(),
|
||||
entry.agent_role.as_deref(),
|
||||
|
||||
@@ -0,0 +1,209 @@
|
||||
//! Bounded, best-effort previews for the v2 `/agent` status output.
|
||||
|
||||
use super::ThreadBufferedEvent;
|
||||
use super::ThreadEventStore;
|
||||
use crate::history_cell::HistoryCell;
|
||||
use crate::history_cell::plain_lines;
|
||||
use crate::text_formatting::truncate_text;
|
||||
use codex_app_server_protocol::CollabAgentTool;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::SubAgentActivityKind;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use ratatui::style::Stylize;
|
||||
use ratatui::text::Line;
|
||||
use std::collections::HashSet;
|
||||
|
||||
const AGENT_STATUS_PREVIEW_LINES: usize = 3;
|
||||
const AGENT_STATUS_PREVIEW_ITEMS: usize = 6;
|
||||
const AGENT_STATUS_PREVIEW_GRAPHEMES: usize = 240;
|
||||
const AGENT_STATUS_PREVIEW_INDENT: u16 = 4;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(super) struct AgentStatusHistoryCell {
|
||||
entries: Vec<AgentStatusThreadPreview>,
|
||||
}
|
||||
|
||||
impl AgentStatusHistoryCell {
|
||||
pub(super) fn new(entries: Vec<AgentStatusThreadPreview>) -> Self {
|
||||
Self { entries }
|
||||
}
|
||||
}
|
||||
|
||||
impl HistoryCell for AgentStatusHistoryCell {
|
||||
fn display_lines(&self, width: u16) -> Vec<Line<'static>> {
|
||||
let mut lines: Vec<Line<'static>> = vec![
|
||||
"/agent".magenta().into(),
|
||||
"Sub-agents running".bold().into(),
|
||||
"".into(),
|
||||
];
|
||||
|
||||
if self.entries.is_empty() {
|
||||
lines.push(" • No sub-agents running.".italic().into());
|
||||
return lines;
|
||||
}
|
||||
|
||||
for entry in &self.entries {
|
||||
lines.push(entry.title_line());
|
||||
let preview_width = width.saturating_sub(AGENT_STATUS_PREVIEW_INDENT).max(1);
|
||||
let preview_lines = entry.preview_lines(preview_width);
|
||||
if preview_lines.is_empty() {
|
||||
lines.push(vec![" ".into(), "No recent activity yet.".dim().italic()].into());
|
||||
} else {
|
||||
lines.extend(preview_lines.into_iter().map(indent_preview_line));
|
||||
}
|
||||
lines.push("".into());
|
||||
}
|
||||
let _ = lines.pop();
|
||||
lines
|
||||
}
|
||||
|
||||
fn raw_lines(&self) -> Vec<Line<'static>> {
|
||||
plain_lines(self.display_lines(u16::MAX))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(super) struct AgentStatusThreadPreview {
|
||||
agent_path: String,
|
||||
activity: Vec<String>,
|
||||
}
|
||||
|
||||
impl AgentStatusThreadPreview {
|
||||
pub(super) fn from_store(agent_path: String, store: &ThreadEventStore) -> Self {
|
||||
Self::from_events(agent_path, store.buffer.iter().rev())
|
||||
}
|
||||
|
||||
pub(super) fn empty(agent_path: String) -> Self {
|
||||
Self::from_events(agent_path, std::iter::empty())
|
||||
}
|
||||
|
||||
fn from_events<'a>(
|
||||
agent_path: String,
|
||||
events: impl Iterator<Item = &'a ThreadBufferedEvent>,
|
||||
) -> Self {
|
||||
let mut seen_item_ids = HashSet::new();
|
||||
let mut activity = Vec::new();
|
||||
for event in events {
|
||||
let item = match event {
|
||||
ThreadBufferedEvent::Notification(ServerNotification::ItemCompleted(event)) => {
|
||||
&event.item
|
||||
}
|
||||
ThreadBufferedEvent::Notification(ServerNotification::ItemStarted(event)) => {
|
||||
&event.item
|
||||
}
|
||||
ThreadBufferedEvent::Notification(_)
|
||||
| ThreadBufferedEvent::Request(_)
|
||||
| ThreadBufferedEvent::HistoryEntryResponse(_)
|
||||
| ThreadBufferedEvent::FeedbackSubmission(_) => continue,
|
||||
};
|
||||
if !seen_item_ids.insert(item.id().to_string()) {
|
||||
continue;
|
||||
}
|
||||
if let Some(summary) = activity_summary(item) {
|
||||
activity.push(summary);
|
||||
if activity.len() == AGENT_STATUS_PREVIEW_ITEMS {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
activity.reverse();
|
||||
Self {
|
||||
agent_path,
|
||||
activity,
|
||||
}
|
||||
}
|
||||
|
||||
fn title_line(&self) -> Line<'static> {
|
||||
vec![" • ".dim(), format!("`{}`", self.agent_path).cyan()].into()
|
||||
}
|
||||
|
||||
fn preview_lines(&self, width: u16) -> Vec<Line<'static>> {
|
||||
let mut lines = self
|
||||
.activity
|
||||
.iter()
|
||||
.flat_map(|activity| textwrap::wrap(activity, width as usize))
|
||||
.filter(|line| !line.trim().is_empty())
|
||||
.map(|line| line.into_owned().dim().into())
|
||||
.collect::<Vec<_>>();
|
||||
if lines.len() > AGENT_STATUS_PREVIEW_LINES {
|
||||
lines.drain(..lines.len() - AGENT_STATUS_PREVIEW_LINES);
|
||||
}
|
||||
lines
|
||||
}
|
||||
}
|
||||
|
||||
fn activity_summary(item: &ThreadItem) -> Option<String> {
|
||||
let summary = match item {
|
||||
ThreadItem::AgentMessage { text, .. } | ThreadItem::Plan { text, .. } => text,
|
||||
ThreadItem::Reasoning { summary, .. } => summary.last()?,
|
||||
ThreadItem::CommandExecution { command, .. } => {
|
||||
let command = truncate_text(
|
||||
command,
|
||||
AGENT_STATUS_PREVIEW_GRAPHEMES.saturating_sub("$ ".len()),
|
||||
);
|
||||
return bounded_summary(&format!("$ {command}"));
|
||||
}
|
||||
ThreadItem::FileChange { changes, .. } => {
|
||||
return bounded_summary(&format!("Updated {} file(s)", changes.len()));
|
||||
}
|
||||
ThreadItem::McpToolCall { server, tool, .. } => {
|
||||
return bounded_summary(&format!("MCP {server}/{tool}"));
|
||||
}
|
||||
ThreadItem::DynamicToolCall {
|
||||
namespace, tool, ..
|
||||
} => {
|
||||
let tool = namespace
|
||||
.as_ref()
|
||||
.map(|namespace| format!("{namespace}/{tool}"))
|
||||
.unwrap_or_else(|| tool.clone());
|
||||
return bounded_summary(&format!("Tool {tool}"));
|
||||
}
|
||||
ThreadItem::CollabAgentToolCall { tool, .. } => {
|
||||
let action = match tool {
|
||||
CollabAgentTool::SpawnAgent => "Spawned an agent",
|
||||
CollabAgentTool::SendInput => "Sent input to an agent",
|
||||
CollabAgentTool::ResumeAgent => "Resumed an agent",
|
||||
CollabAgentTool::Wait => "Waited for an agent",
|
||||
CollabAgentTool::CloseAgent => "Closed an agent",
|
||||
};
|
||||
return Some(action.to_string());
|
||||
}
|
||||
ThreadItem::SubAgentActivity {
|
||||
kind, agent_path, ..
|
||||
} => {
|
||||
let action = match kind {
|
||||
SubAgentActivityKind::Started => "Started",
|
||||
SubAgentActivityKind::Interacted => "Contacted",
|
||||
SubAgentActivityKind::Interrupted => "Interrupted",
|
||||
};
|
||||
return bounded_summary(&format!("{action} {agent_path}"));
|
||||
}
|
||||
ThreadItem::WebSearch { query, .. } => {
|
||||
return bounded_summary(&format!("Web search: {query}"));
|
||||
}
|
||||
ThreadItem::ImageView { path, .. } => {
|
||||
return bounded_summary(&format!("Viewed {}", path.display()));
|
||||
}
|
||||
ThreadItem::ImageGeneration { .. } => return Some("Generated an image".to_string()),
|
||||
ThreadItem::EnteredReviewMode { .. } => return Some("Entered review mode".to_string()),
|
||||
ThreadItem::ExitedReviewMode { .. } => return Some("Exited review mode".to_string()),
|
||||
ThreadItem::ContextCompaction { .. } => return Some("Compacted context".to_string()),
|
||||
ThreadItem::UserMessage { .. } | ThreadItem::HookPrompt { .. } => return None,
|
||||
};
|
||||
bounded_summary(summary)
|
||||
}
|
||||
|
||||
fn bounded_summary(summary: &str) -> Option<String> {
|
||||
let summary = truncate_text(summary, AGENT_STATUS_PREVIEW_GRAPHEMES);
|
||||
let summary = summary.split_whitespace().collect::<Vec<_>>().join(" ");
|
||||
(!summary.is_empty()).then_some(summary)
|
||||
}
|
||||
|
||||
fn indent_preview_line(mut line: Line<'static>) -> Line<'static> {
|
||||
line.spans.insert(0, " ".into());
|
||||
line
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "agent_status_feed_tests.rs"]
|
||||
mod tests;
|
||||
@@ -0,0 +1,109 @@
|
||||
use super::*;
|
||||
use codex_app_server_protocol::CommandExecutionSource;
|
||||
use codex_app_server_protocol::CommandExecutionStatus;
|
||||
use codex_app_server_protocol::ItemCompletedNotification;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
|
||||
#[test]
|
||||
fn agent_status_uses_bounded_buffered_activity() {
|
||||
let mut store = ThreadEventStore::new(/*capacity*/ 8);
|
||||
store.push_notification(ServerNotification::ItemCompleted(
|
||||
ItemCompletedNotification {
|
||||
item: ThreadItem::CommandExecution {
|
||||
id: "command-1".to_string(),
|
||||
command: "cargo test -p codex-tui".to_string(),
|
||||
cwd: AbsolutePathBuf::try_from("/workspace").expect("absolute path"),
|
||||
process_id: None,
|
||||
source: CommandExecutionSource::Agent,
|
||||
status: CommandExecutionStatus::Completed,
|
||||
command_actions: Vec::new(),
|
||||
aggregated_output: Some("unbounded output\n".repeat(10_000)),
|
||||
exit_code: Some(0),
|
||||
duration_ms: Some(42),
|
||||
},
|
||||
thread_id: "thread-child".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
completed_at_ms: 1,
|
||||
},
|
||||
));
|
||||
store.push_notification(ServerNotification::ItemCompleted(
|
||||
ItemCompletedNotification {
|
||||
item: ThreadItem::AgentMessage {
|
||||
id: "message-1".to_string(),
|
||||
text: "Finished checking the focused TUI tests.".to_string(),
|
||||
phase: None,
|
||||
memory_citation: None,
|
||||
},
|
||||
thread_id: "thread-child".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
completed_at_ms: 2,
|
||||
},
|
||||
));
|
||||
|
||||
let preview = AgentStatusThreadPreview::from_store("/root/reviewer".to_string(), &store);
|
||||
let cell = AgentStatusHistoryCell::new(vec![preview]);
|
||||
let rendered = cell
|
||||
.display_lines(/*width*/ 80)
|
||||
.iter()
|
||||
.map(ToString::to_string)
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n");
|
||||
|
||||
insta::assert_snapshot!(rendered, @r###"
|
||||
/agent
|
||||
Sub-agents running
|
||||
|
||||
• `/root/reviewer`
|
||||
$ cargo test -p codex-tui
|
||||
Finished checking the focused TUI tests.
|
||||
"###);
|
||||
assert!(!rendered.contains("unbounded output"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn agent_status_uses_reasoning_summaries_only() {
|
||||
let mut store = ThreadEventStore::new(/*capacity*/ 8);
|
||||
store.push_notification(ServerNotification::ItemCompleted(
|
||||
ItemCompletedNotification {
|
||||
item: ThreadItem::Reasoning {
|
||||
id: "reasoning-with-summary".to_string(),
|
||||
summary: vec!["safe summary".to_string()],
|
||||
content: vec!["hidden raw reasoning".to_string()],
|
||||
},
|
||||
thread_id: "thread-child".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
completed_at_ms: 1,
|
||||
},
|
||||
));
|
||||
store.push_notification(ServerNotification::ItemCompleted(
|
||||
ItemCompletedNotification {
|
||||
item: ThreadItem::Reasoning {
|
||||
id: "reasoning-without-summary".to_string(),
|
||||
summary: Vec::new(),
|
||||
content: vec!["raw-only reasoning".to_string()],
|
||||
},
|
||||
thread_id: "thread-child".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
completed_at_ms: 2,
|
||||
},
|
||||
));
|
||||
|
||||
let preview = AgentStatusThreadPreview::from_store("/root/reviewer".to_string(), &store);
|
||||
let cell = AgentStatusHistoryCell::new(vec![preview]);
|
||||
let rendered = cell
|
||||
.display_lines(/*width*/ 80)
|
||||
.iter()
|
||||
.map(ToString::to_string)
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n");
|
||||
|
||||
insta::assert_snapshot!(rendered, @r###"
|
||||
/agent
|
||||
Sub-agents running
|
||||
|
||||
• `/root/reviewer`
|
||||
safe summary
|
||||
"###);
|
||||
assert!(!rendered.contains("hidden raw reasoning"));
|
||||
assert!(!rendered.contains("raw-only reasoning"));
|
||||
}
|
||||
@@ -14,8 +14,10 @@
|
||||
//! `SessionSource::SubAgent(ThreadSpawn { parent_thread_id, .. })` edges until no new children are
|
||||
//! found. The primary thread itself is never included in the output.
|
||||
|
||||
use codex_app_server_protocol::SessionSource;
|
||||
use codex_app_server_protocol::Thread;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
|
||||
@@ -26,6 +28,7 @@ pub(crate) struct LoadedSubagentThread {
|
||||
pub(crate) thread_id: ThreadId,
|
||||
pub(crate) agent_nickname: Option<String>,
|
||||
pub(crate) agent_role: Option<String>,
|
||||
pub(crate) agent_path: Option<String>,
|
||||
}
|
||||
|
||||
/// Walks the spawn tree rooted at `primary_thread_id` and returns every descendant subagent.
|
||||
@@ -84,6 +87,7 @@ pub(crate) fn find_loaded_subagent_threads_for_primary(
|
||||
thread_id,
|
||||
agent_nickname: thread.agent_nickname,
|
||||
agent_role: thread.agent_role,
|
||||
agent_path: thread_spawn_agent_path(&thread.source),
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
@@ -91,16 +95,22 @@ pub(crate) fn find_loaded_subagent_threads_for_primary(
|
||||
loaded_threads
|
||||
}
|
||||
|
||||
fn thread_spawn_parent_thread_id(
|
||||
source: &codex_app_server_protocol::SessionSource,
|
||||
) -> Option<ThreadId> {
|
||||
let value = serde_json::to_value(source).ok()?;
|
||||
let parent_thread_id = value
|
||||
.get("subAgent")?
|
||||
.get("thread_spawn")?
|
||||
.get("parent_thread_id")?
|
||||
.as_str()?;
|
||||
ThreadId::from_string(parent_thread_id).ok()
|
||||
fn thread_spawn_agent_path(source: &SessionSource) -> Option<String> {
|
||||
match source {
|
||||
SessionSource::SubAgent(SubAgentSource::ThreadSpawn { agent_path, .. }) => {
|
||||
agent_path.clone().map(String::from)
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn thread_spawn_parent_thread_id(source: &SessionSource) -> Option<ThreadId> {
|
||||
match source {
|
||||
SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id, ..
|
||||
}) => Some(*parent_thread_id),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -208,11 +218,13 @@ mod tests {
|
||||
thread_id: child_thread_id,
|
||||
agent_nickname: Some("Scout".to_string()),
|
||||
agent_role: Some("explorer".to_string()),
|
||||
agent_path: None,
|
||||
},
|
||||
LoadedSubagentThread {
|
||||
thread_id: grandchild_thread_id,
|
||||
agent_nickname: Some("Atlas".to_string()),
|
||||
agent_role: Some("worker".to_string()),
|
||||
agent_path: None,
|
||||
},
|
||||
]
|
||||
);
|
||||
|
||||
@@ -8,6 +8,60 @@ use super::*;
|
||||
|
||||
impl App {
|
||||
pub(super) async fn open_agent_picker(&mut self, app_server: &mut AppServerSession) {
|
||||
self.backfill_loaded_subagent_threads(app_server).await;
|
||||
// V2 subagents are identified by canonical paths observed from activity events or loaded
|
||||
// thread metadata. Prefer local buffered turn state for liveness, and fall back to
|
||||
// thread/read only when no local event channel exists.
|
||||
let path_backed_thread_ids: Vec<_> = self
|
||||
.agent_navigation
|
||||
.ordered_path_backed_subagent_threads(self.primary_thread_id)
|
||||
.into_iter()
|
||||
.map(|(thread_id, _)| thread_id)
|
||||
.collect();
|
||||
for thread_id in path_backed_thread_ids {
|
||||
if let Some(channel) = self.thread_event_channels.get(&thread_id)
|
||||
&& channel.attachment() == ThreadEventAttachment::Live
|
||||
{
|
||||
let is_running = channel.store.lock().await.active_turn_id().is_some();
|
||||
self.agent_navigation.set_running(thread_id, is_running);
|
||||
} else {
|
||||
self.refresh_agent_picker_thread_liveness(app_server, thread_id)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
let path_backed_threads = self
|
||||
.agent_navigation
|
||||
.ordered_path_backed_subagent_threads(self.primary_thread_id);
|
||||
if !path_backed_threads.is_empty() {
|
||||
let running_threads: Vec<_> = path_backed_threads
|
||||
.into_iter()
|
||||
.filter_map(|(thread_id, entry)| {
|
||||
if !entry.is_running || entry.is_closed {
|
||||
return None;
|
||||
}
|
||||
Some((thread_id, entry.agent_path.as_deref()?.trim().to_string()))
|
||||
})
|
||||
.collect();
|
||||
let mut entries = Vec::new();
|
||||
for (thread_id, agent_path) in running_threads {
|
||||
let preview = if let Some(channel) = self.thread_event_channels.get(&thread_id) {
|
||||
let store = channel.store.lock().await;
|
||||
super::agent_status_feed::AgentStatusThreadPreview::from_store(
|
||||
agent_path, &store,
|
||||
)
|
||||
} else {
|
||||
super::agent_status_feed::AgentStatusThreadPreview::empty(agent_path)
|
||||
};
|
||||
entries.push(preview);
|
||||
}
|
||||
|
||||
self.chat_widget
|
||||
.add_to_history(super::agent_status_feed::AgentStatusHistoryCell::new(
|
||||
entries,
|
||||
));
|
||||
return;
|
||||
}
|
||||
|
||||
let mut thread_ids = self.agent_navigation.tracked_thread_ids();
|
||||
for thread_id in self.thread_event_channels.keys().copied() {
|
||||
if !thread_ids.contains(&thread_id) {
|
||||
@@ -44,14 +98,14 @@ impl App {
|
||||
let items: Vec<SelectionItem> = self
|
||||
.agent_navigation
|
||||
.ordered_threads()
|
||||
.iter()
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(idx, (thread_id, entry))| {
|
||||
if self.active_thread_id == Some(*thread_id) {
|
||||
if self.active_thread_id == Some(thread_id) {
|
||||
initial_selected_idx = Some(idx);
|
||||
}
|
||||
let id = *thread_id;
|
||||
let is_primary = self.primary_thread_id == Some(*thread_id);
|
||||
let id = thread_id;
|
||||
let is_primary = self.primary_thread_id == Some(thread_id);
|
||||
let name = format_agent_picker_item_name(
|
||||
entry.agent_nickname.as_deref(),
|
||||
entry.agent_role.as_deref(),
|
||||
@@ -62,7 +116,7 @@ impl App {
|
||||
name: name.clone(),
|
||||
name_prefix_spans: agent_picker_status_dot_spans(entry.is_closed),
|
||||
description: Some(uuid.clone()),
|
||||
is_current: self.active_thread_id == Some(*thread_id),
|
||||
is_current: self.active_thread_id == Some(thread_id),
|
||||
actions: vec![Box::new(move |tx| {
|
||||
tx.send(AppEvent::SelectAgentThread(id));
|
||||
})],
|
||||
@@ -145,6 +199,14 @@ impl App {
|
||||
.await
|
||||
{
|
||||
Ok(thread) => {
|
||||
let is_running = matches!(
|
||||
thread.status,
|
||||
codex_app_server_protocol::ThreadStatus::Active { .. }
|
||||
);
|
||||
let is_closed = matches!(
|
||||
thread.status,
|
||||
codex_app_server_protocol::ThreadStatus::NotLoaded
|
||||
);
|
||||
self.upsert_agent_picker_thread(
|
||||
thread_id,
|
||||
thread.agent_nickname.or_else(|| {
|
||||
@@ -157,11 +219,9 @@ impl App {
|
||||
.as_ref()
|
||||
.and_then(|entry| entry.agent_role.clone())
|
||||
}),
|
||||
matches!(
|
||||
thread.status,
|
||||
codex_app_server_protocol::ThreadStatus::NotLoaded
|
||||
),
|
||||
is_closed,
|
||||
);
|
||||
self.agent_navigation.set_running(thread_id, is_running);
|
||||
true
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -186,6 +246,8 @@ impl App {
|
||||
is_closed,
|
||||
);
|
||||
}
|
||||
self.agent_navigation
|
||||
.set_running(thread_id, /*is_running*/ false);
|
||||
true
|
||||
}
|
||||
}
|
||||
@@ -248,6 +310,9 @@ impl App {
|
||||
}
|
||||
};
|
||||
let channel = self.ensure_thread_channel(thread_id);
|
||||
if !live_attached {
|
||||
channel.mark_replay_only();
|
||||
}
|
||||
let mut store = channel.store.lock().await;
|
||||
store.set_session(session, turns);
|
||||
Ok(live_attached)
|
||||
@@ -610,13 +675,17 @@ impl App {
|
||||
}
|
||||
|
||||
for thread in find_loaded_subagent_threads_for_primary(threads, primary_thread_id) {
|
||||
let agent_path = thread.agent_path;
|
||||
self.upsert_agent_picker_thread(
|
||||
thread.thread_id,
|
||||
thread.agent_nickname,
|
||||
thread.agent_role,
|
||||
/*is_closed*/ false,
|
||||
);
|
||||
self.agent_navigation
|
||||
.set_agent_path(thread.thread_id, agent_path);
|
||||
}
|
||||
self.sync_active_agent_label();
|
||||
|
||||
!had_read_error
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ use crate::history_cell::PlainHistoryCell;
|
||||
use crate::history_cell::UserHistoryCell;
|
||||
use crate::history_cell::new_session_info;
|
||||
use crate::multi_agents::AgentPickerThreadEntry;
|
||||
use crate::multi_agents::SubAgentActivityDisplay;
|
||||
use assert_matches::assert_matches;
|
||||
|
||||
use crate::app_command::AppCommand as Op;
|
||||
@@ -1164,6 +1165,8 @@ async fn collab_receiver_notification_caches_thread_without_app_server_read() {
|
||||
Some(&AgentPickerThreadEntry {
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
agent_path: None,
|
||||
is_running: false,
|
||||
is_closed: false,
|
||||
})
|
||||
);
|
||||
@@ -1223,6 +1226,8 @@ async fn open_agent_picker_keeps_missing_threads_for_replay() -> Result<()> {
|
||||
Some(&AgentPickerThreadEntry {
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
agent_path: None,
|
||||
is_running: false,
|
||||
is_closed: true,
|
||||
})
|
||||
);
|
||||
@@ -1256,6 +1261,88 @@ async fn open_agent_picker_preserves_cached_metadata_for_replay_threads() -> Res
|
||||
Some(&AgentPickerThreadEntry {
|
||||
agent_nickname: Some("Robie".to_string()),
|
||||
agent_role: Some("explorer".to_string()),
|
||||
agent_path: None,
|
||||
is_running: false,
|
||||
is_closed: true,
|
||||
})
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn open_agent_picker_clears_completed_path_backed_agent_running_state() -> Result<()> {
|
||||
let mut app = Box::pin(make_test_app()).await;
|
||||
let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker(
|
||||
app.chat_widget.config_ref(),
|
||||
))
|
||||
.await
|
||||
.expect("embedded app server");
|
||||
let thread_id = ThreadId::new();
|
||||
let channel = ThreadEventChannel::new(/*capacity*/ 4);
|
||||
{
|
||||
let mut store = channel.store.lock().await;
|
||||
store.push_notification(turn_started_notification(thread_id, "turn-1"));
|
||||
store.push_notification(turn_completed_notification(
|
||||
thread_id,
|
||||
"turn-1",
|
||||
TurnStatus::Completed,
|
||||
));
|
||||
}
|
||||
app.thread_event_channels.insert(thread_id, channel);
|
||||
app.agent_navigation
|
||||
.record_sub_agent_activity(SubAgentActivityDisplay {
|
||||
thread_id,
|
||||
agent_path: "/root/child".to_string(),
|
||||
is_running_hint: true,
|
||||
});
|
||||
|
||||
Box::pin(app.open_agent_picker(&mut app_server)).await;
|
||||
|
||||
assert_eq!(
|
||||
app.agent_navigation.get(&thread_id),
|
||||
Some(&AgentPickerThreadEntry {
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
agent_path: Some("/root/child".to_string()),
|
||||
is_running: false,
|
||||
is_closed: false,
|
||||
})
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn open_agent_picker_refreshes_replay_only_path_backed_liveness() -> Result<()> {
|
||||
let mut app = Box::pin(make_test_app()).await;
|
||||
let mut app_server = Box::pin(crate::start_embedded_app_server_for_picker(
|
||||
app.chat_widget.config_ref(),
|
||||
))
|
||||
.await
|
||||
.expect("embedded app server");
|
||||
let thread_id = ThreadId::new();
|
||||
let mut channel = ThreadEventChannel::new(/*capacity*/ 4);
|
||||
channel.mark_replay_only();
|
||||
{
|
||||
let mut store = channel.store.lock().await;
|
||||
store.push_notification(turn_started_notification(thread_id, "turn-1"));
|
||||
}
|
||||
app.thread_event_channels.insert(thread_id, channel);
|
||||
app.agent_navigation
|
||||
.record_sub_agent_activity(SubAgentActivityDisplay {
|
||||
thread_id,
|
||||
agent_path: "/root/child".to_string(),
|
||||
is_running_hint: true,
|
||||
});
|
||||
|
||||
Box::pin(app.open_agent_picker(&mut app_server)).await;
|
||||
|
||||
assert_eq!(
|
||||
app.agent_navigation.get(&thread_id),
|
||||
Some(&AgentPickerThreadEntry {
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
agent_path: Some("/root/child".to_string()),
|
||||
is_running: false,
|
||||
is_closed: true,
|
||||
})
|
||||
);
|
||||
@@ -1310,6 +1397,8 @@ async fn open_agent_picker_marks_terminal_read_errors_closed() -> Result<()> {
|
||||
Some(&AgentPickerThreadEntry {
|
||||
agent_nickname: Some("Robie".to_string()),
|
||||
agent_role: Some("explorer".to_string()),
|
||||
agent_path: None,
|
||||
is_running: false,
|
||||
is_closed: true,
|
||||
})
|
||||
);
|
||||
@@ -1348,6 +1437,8 @@ fn open_agent_picker_marks_loaded_threads_open() -> Result<()> {
|
||||
Some(&AgentPickerThreadEntry {
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
agent_path: None,
|
||||
is_running: false,
|
||||
is_closed: false,
|
||||
})
|
||||
);
|
||||
@@ -2819,6 +2910,8 @@ async fn inactive_thread_started_notification_initializes_replay_session() -> Re
|
||||
Some(&AgentPickerThreadEntry {
|
||||
agent_nickname: Some("Robie".to_string()),
|
||||
agent_role: Some("explorer".to_string()),
|
||||
agent_path: None,
|
||||
is_running: false,
|
||||
is_closed: false,
|
||||
})
|
||||
);
|
||||
|
||||
@@ -31,6 +31,12 @@ pub(super) struct FeedbackThreadEvent {
|
||||
pub(super) result: Result<String, String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub(super) enum ThreadEventAttachment {
|
||||
Live,
|
||||
ReplayOnly,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(super) struct ThreadEventStore {
|
||||
pub(super) session: Option<ThreadSessionState>,
|
||||
@@ -285,6 +291,7 @@ pub(super) struct ThreadEventChannel {
|
||||
pub(super) sender: mpsc::Sender<ThreadBufferedEvent>,
|
||||
pub(super) receiver: Option<mpsc::Receiver<ThreadBufferedEvent>>,
|
||||
pub(super) store: Arc<Mutex<ThreadEventStore>>,
|
||||
attachment: ThreadEventAttachment,
|
||||
}
|
||||
|
||||
impl ThreadEventChannel {
|
||||
@@ -294,9 +301,18 @@ impl ThreadEventChannel {
|
||||
sender,
|
||||
receiver: Some(receiver),
|
||||
store: Arc::new(Mutex::new(ThreadEventStore::new(capacity))),
|
||||
attachment: ThreadEventAttachment::Live,
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn mark_replay_only(&mut self) {
|
||||
self.attachment = ThreadEventAttachment::ReplayOnly;
|
||||
}
|
||||
|
||||
pub(super) fn attachment(&self) -> ThreadEventAttachment {
|
||||
self.attachment
|
||||
}
|
||||
|
||||
#[cfg_attr(not(test), allow(dead_code))]
|
||||
pub(super) fn new_with_session(
|
||||
capacity: usize,
|
||||
@@ -310,6 +326,7 @@ impl ThreadEventChannel {
|
||||
store: Arc::new(Mutex::new(ThreadEventStore::new_with_session(
|
||||
capacity, session, turns,
|
||||
))),
|
||||
attachment: ThreadEventAttachment::Live,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -896,6 +896,14 @@ impl App {
|
||||
&mut self,
|
||||
notification: &ServerNotification,
|
||||
) {
|
||||
if let Some(activity) =
|
||||
sub_agent_activity_item(notification).and_then(sub_agent_activity_display)
|
||||
{
|
||||
self.agent_navigation.record_sub_agent_activity(activity);
|
||||
self.sync_active_agent_label();
|
||||
return;
|
||||
}
|
||||
|
||||
let Some(receiver_thread_ids) = collab_receiver_thread_ids(notification) else {
|
||||
return;
|
||||
};
|
||||
|
||||
@@ -327,6 +327,7 @@ impl ChatWidget {
|
||||
reasoning_effort,
|
||||
agents_states,
|
||||
}),
|
||||
item @ ThreadItem::SubAgentActivity { .. } => self.on_sub_agent_activity(item),
|
||||
ThreadItem::EnteredReviewMode { review, .. } if !from_replay => {
|
||||
self.enter_review_mode_with_hint(review, /*from_replay*/ false);
|
||||
}
|
||||
|
||||
@@ -190,6 +190,7 @@ impl ChatWidget {
|
||||
reasoning_effort,
|
||||
agents_states,
|
||||
}),
|
||||
item @ ThreadItem::SubAgentActivity { .. } => self.on_sub_agent_activity(item),
|
||||
ThreadItem::DynamicToolCall { .. } => {}
|
||||
}
|
||||
|
||||
|
||||
@@ -144,6 +144,13 @@ impl ChatWidget {
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn on_sub_agent_activity(&mut self, item: ThreadItem) {
|
||||
self.record_visible_turn_activity();
|
||||
if let Some(cell) = multi_agents::sub_agent_activity_history_cell(&item) {
|
||||
self.on_collab_event(cell);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn handle_file_change_completed_now(&mut self, item: ThreadItem) {
|
||||
let ThreadItem::FileChange { status, .. } = item else {
|
||||
return;
|
||||
|
||||
@@ -192,6 +192,7 @@ mod terminal_title;
|
||||
mod terminal_visualization_instructions;
|
||||
mod text_formatting;
|
||||
mod theme_picker;
|
||||
mod thread_transcript;
|
||||
mod token_usage;
|
||||
mod tooltips;
|
||||
mod transcript_reflow;
|
||||
|
||||
@@ -11,6 +11,7 @@ use codex_app_server_protocol::CollabAgentState;
|
||||
use codex_app_server_protocol::CollabAgentStatus;
|
||||
use codex_app_server_protocol::CollabAgentTool;
|
||||
use codex_app_server_protocol::CollabAgentToolCallStatus;
|
||||
use codex_app_server_protocol::SubAgentActivityKind;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
|
||||
@@ -35,10 +36,21 @@ pub(crate) struct AgentPickerThreadEntry {
|
||||
pub(crate) agent_nickname: Option<String>,
|
||||
/// Agent type shown in brackets when present, for example `worker`.
|
||||
pub(crate) agent_role: Option<String>,
|
||||
/// Canonical v2 agent path, when the thread was observed through v2 activity.
|
||||
pub(crate) agent_path: Option<String>,
|
||||
/// Whether the latest liveness refresh says the agent thread is actively working.
|
||||
pub(crate) is_running: bool,
|
||||
/// Whether the thread has emitted a close event and should render dimmed.
|
||||
pub(crate) is_closed: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) struct SubAgentActivityDisplay {
|
||||
pub(crate) thread_id: ThreadId,
|
||||
pub(crate) agent_path: String,
|
||||
pub(crate) is_running_hint: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default, PartialEq, Eq)]
|
||||
pub(crate) struct AgentMetadata {
|
||||
/// Human-friendly nickname shown in rendered tool-call rows.
|
||||
@@ -266,6 +278,56 @@ pub(crate) fn tool_call_history_cell(
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn sub_agent_activity_display(item: &ThreadItem) -> Option<SubAgentActivityDisplay> {
|
||||
let ThreadItem::SubAgentActivity {
|
||||
kind,
|
||||
agent_thread_id,
|
||||
agent_path,
|
||||
..
|
||||
} = item
|
||||
else {
|
||||
return None;
|
||||
};
|
||||
Some(SubAgentActivityDisplay {
|
||||
thread_id: parse_thread_id(agent_thread_id)?,
|
||||
agent_path: agent_path.clone(),
|
||||
is_running_hint: !matches!(kind, SubAgentActivityKind::Interrupted),
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn sub_agent_activity_history_cell(item: &ThreadItem) -> Option<PlainHistoryCell> {
|
||||
let ThreadItem::SubAgentActivity {
|
||||
kind, agent_path, ..
|
||||
} = item
|
||||
else {
|
||||
return None;
|
||||
};
|
||||
Some(collab_event(
|
||||
sub_agent_activity_title(*kind, agent_path),
|
||||
Vec::new(),
|
||||
))
|
||||
}
|
||||
|
||||
pub(crate) fn sub_agent_activity_summary(kind: SubAgentActivityKind, agent_path: &str) -> String {
|
||||
match kind {
|
||||
SubAgentActivityKind::Started => format!("Started `{agent_path}`"),
|
||||
SubAgentActivityKind::Interacted => format!("Interacted with `{agent_path}`"),
|
||||
SubAgentActivityKind::Interrupted => format!("Interrupted `{agent_path}`"),
|
||||
}
|
||||
}
|
||||
|
||||
fn sub_agent_activity_title(kind: SubAgentActivityKind, agent_path: &str) -> Line<'static> {
|
||||
let (prefix, path) = match kind {
|
||||
SubAgentActivityKind::Started => ("Started ", agent_path),
|
||||
SubAgentActivityKind::Interacted => ("Interacted with ", agent_path),
|
||||
SubAgentActivityKind::Interrupted => ("Interrupted ", agent_path),
|
||||
};
|
||||
title_spans_line(vec![
|
||||
Span::from(prefix).bold(),
|
||||
Span::from(format!("`{path}`")).cyan(),
|
||||
])
|
||||
}
|
||||
|
||||
fn spawn_end(
|
||||
new_thread_id: Option<ThreadId>,
|
||||
prompt: &str,
|
||||
|
||||
@@ -4,8 +4,6 @@ use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
mod transcript;
|
||||
|
||||
use crate::app_server_session::AppServerSession;
|
||||
use crate::clipboard_paste::normalize_pasted_search_query;
|
||||
use crate::color::blend;
|
||||
@@ -25,6 +23,9 @@ use crate::status::format_directory_display;
|
||||
use crate::terminal_palette::best_color;
|
||||
use crate::terminal_palette::default_bg;
|
||||
use crate::text_formatting::truncate_text;
|
||||
use crate::thread_transcript::RawReasoningVisibility;
|
||||
use crate::thread_transcript::TranscriptCells;
|
||||
use crate::thread_transcript::load_session_transcript;
|
||||
use crate::tui::FrameRequester;
|
||||
use crate::tui::Tui;
|
||||
use crate::tui::TuiEvent;
|
||||
@@ -60,9 +61,6 @@ use tokio::sync::mpsc;
|
||||
use tokio_stream::StreamExt;
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
use tracing::warn;
|
||||
use transcript::RawReasoningVisibility;
|
||||
use transcript::TranscriptCells;
|
||||
use transcript::load_session_transcript;
|
||||
use unicode_width::UnicodeWidthStr;
|
||||
|
||||
const PAGE_SIZE: usize = 25;
|
||||
@@ -5751,7 +5749,7 @@ session_picker_view = "dense"
|
||||
|
||||
#[test]
|
||||
fn thread_to_transcript_cells_renders_core_message_types() {
|
||||
use transcript::thread_to_transcript_cells;
|
||||
use crate::thread_transcript::thread_to_transcript_cells;
|
||||
|
||||
let thread_id = ThreadId::new();
|
||||
let thread = Thread {
|
||||
@@ -5820,7 +5818,7 @@ session_picker_view = "dense"
|
||||
|
||||
#[test]
|
||||
fn thread_to_transcript_cells_hides_raw_reasoning_when_not_enabled() {
|
||||
use transcript::thread_to_transcript_cells;
|
||||
use crate::thread_transcript::thread_to_transcript_cells;
|
||||
|
||||
let thread_id = ThreadId::new();
|
||||
let thread = Thread {
|
||||
@@ -5878,7 +5876,7 @@ session_picker_view = "dense"
|
||||
|
||||
#[test]
|
||||
fn thread_to_transcript_cells_shows_raw_reasoning_over_summary_when_enabled() {
|
||||
use transcript::thread_to_transcript_cells;
|
||||
use crate::thread_transcript::thread_to_transcript_cells;
|
||||
|
||||
let thread_id = ThreadId::new();
|
||||
let thread = Thread {
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
//! Render persisted thread turns into history-cell building blocks.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::app_server_session::AppServerSession;
|
||||
@@ -7,6 +9,7 @@ use crate::history_cell::HistoryCell;
|
||||
use crate::history_cell::PlainHistoryCell;
|
||||
use crate::history_cell::ReasoningSummaryCell;
|
||||
use crate::history_cell::UserHistoryCell;
|
||||
use crate::multi_agents::sub_agent_activity_summary;
|
||||
use codex_app_server_protocol::Thread;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_protocol::ThreadId;
|
||||
@@ -191,6 +194,11 @@ fn fallback_transcript_cell(item: &ThreadItem) -> Option<PlainHistoryCell> {
|
||||
ThreadItem::CollabAgentToolCall { tool, status, .. } => {
|
||||
vec![format!("agent tool: {tool:?} · {status:?}").dim().into()]
|
||||
}
|
||||
ThreadItem::SubAgentActivity {
|
||||
kind, agent_path, ..
|
||||
} => {
|
||||
vec![sub_agent_activity_summary(*kind, agent_path).dim().into()]
|
||||
}
|
||||
ThreadItem::WebSearch { query, .. } => {
|
||||
vec![vec!["web search: ".dim(), query.clone().into()].into()]
|
||||
}
|
||||
Reference in New Issue
Block a user