diff --git a/codex-rs/debug-client/src/reader.rs b/codex-rs/debug-client/src/reader.rs deleted file mode 100644 index cb5164163..000000000 --- a/codex-rs/debug-client/src/reader.rs +++ /dev/null @@ -1,338 +0,0 @@ -#![allow(clippy::expect_used)] -use std::io::BufRead; -use std::io::BufReader; -use std::process::ChildStdout; -use std::sync::Arc; -use std::sync::Mutex; -use std::sync::mpsc::Sender; -use std::thread; -use std::thread::JoinHandle; - -use anyhow::Context; -use codex_app_server_protocol::CommandExecutionApprovalDecision; -use codex_app_server_protocol::CommandExecutionRequestApprovalResponse; -use codex_app_server_protocol::FileChangeApprovalDecision; -use codex_app_server_protocol::FileChangeRequestApprovalResponse; -use codex_app_server_protocol::JSONRPCMessage; -use codex_app_server_protocol::JSONRPCNotification; -use codex_app_server_protocol::JSONRPCRequest; -use codex_app_server_protocol::JSONRPCResponse; -use codex_app_server_protocol::ServerNotification; -use codex_app_server_protocol::ServerRequest; -use codex_app_server_protocol::ThreadItem; -use codex_app_server_protocol::ThreadListResponse; -use codex_app_server_protocol::ThreadResumeResponse; -use codex_app_server_protocol::ThreadStartResponse; -use serde::Serialize; -use std::io::Write; - -use crate::output::LabelColor; -use crate::output::Output; -use crate::state::PendingRequest; -use crate::state::ReaderEvent; -use crate::state::State; - -pub fn start_reader( - mut stdout: BufReader, - stdin: Arc>>, - state: Arc>, - events: Sender, - output: Output, - auto_approve: bool, - filtered_output: bool, -) -> JoinHandle<()> { - thread::spawn(move || { - let command_decision = if auto_approve { - CommandExecutionApprovalDecision::Accept - } else { - CommandExecutionApprovalDecision::Decline - }; - let file_decision = if auto_approve { - FileChangeApprovalDecision::Accept - } else { - FileChangeApprovalDecision::Decline - }; - - let mut buffer = String::new(); - - loop { - buffer.clear(); - match stdout.read_line(&mut buffer) { - Ok(0) => break, - Ok(_) => {} - Err(err) => { - let _ = output.client_line(&format!("failed to read from server: {err}")); - break; - } - } - - let line = buffer.trim_end_matches(['\n', '\r']); - if !line.is_empty() { - let _ = output.server_json_line(line, filtered_output); - } - - let Ok(message) = serde_json::from_str::(line) else { - continue; - }; - - match message { - JSONRPCMessage::Request(request) => { - if let Err(err) = handle_server_request( - request, - &command_decision, - &file_decision, - &stdin, - &output, - ) { - let _ = - output.client_line(&format!("failed to handle server request: {err}")); - } - } - JSONRPCMessage::Response(response) => { - if let Err(err) = handle_response(response, &state, &events) { - let _ = output.client_line(&format!("failed to handle response: {err}")); - } - } - JSONRPCMessage::Notification(notification) => { - if filtered_output - && let Err(err) = handle_filtered_notification(notification, &output) - { - let _ = - output.client_line(&format!("failed to filter notification: {err}")); - } - } - _ => {} - } - } - }) -} - -fn handle_server_request( - request: JSONRPCRequest, - command_decision: &CommandExecutionApprovalDecision, - file_decision: &FileChangeApprovalDecision, - stdin: &Arc>>, - output: &Output, -) -> anyhow::Result<()> { - let server_request = match ServerRequest::try_from(request) { - Ok(server_request) => server_request, - Err(_) => return Ok(()), - }; - - match server_request { - ServerRequest::CommandExecutionRequestApproval { request_id, params } => { - let response = CommandExecutionRequestApprovalResponse { - decision: command_decision.clone(), - }; - output.client_line(&format!( - "auto-response for command approval {request_id:?}: {command_decision:?} ({params:?})" - ))?; - send_response(stdin, request_id, response) - } - ServerRequest::FileChangeRequestApproval { request_id, params } => { - let response = FileChangeRequestApprovalResponse { - decision: file_decision.clone(), - }; - output.client_line(&format!( - "auto-response for file change approval {request_id:?}: {file_decision:?} ({params:?})" - ))?; - send_response(stdin, request_id, response) - } - _ => Ok(()), - } -} - -fn handle_response( - response: JSONRPCResponse, - state: &Arc>, - events: &Sender, -) -> anyhow::Result<()> { - let pending = { - let mut state = state.lock().expect("state lock poisoned"); - state.pending.remove(&response.id) - }; - - let Some(pending) = pending else { - return Ok(()); - }; - - match pending { - PendingRequest::Start => { - let parsed = serde_json::from_value::(response.result) - .context("decode thread/start response")?; - let thread_id = parsed.thread.id; - { - let mut state = state.lock().expect("state lock poisoned"); - state.thread_id = Some(thread_id.clone()); - if !state.known_threads.iter().any(|id| id == &thread_id) { - state.known_threads.push(thread_id.clone()); - } - } - events.send(ReaderEvent::ThreadReady { thread_id }).ok(); - } - PendingRequest::Resume => { - let parsed = serde_json::from_value::(response.result) - .context("decode thread/resume response")?; - let thread_id = parsed.thread.id; - { - let mut state = state.lock().expect("state lock poisoned"); - state.thread_id = Some(thread_id.clone()); - if !state.known_threads.iter().any(|id| id == &thread_id) { - state.known_threads.push(thread_id.clone()); - } - } - events.send(ReaderEvent::ThreadReady { thread_id }).ok(); - } - PendingRequest::List => { - let parsed = serde_json::from_value::(response.result) - .context("decode thread/list response")?; - let thread_ids: Vec = parsed.data.into_iter().map(|thread| thread.id).collect(); - { - let mut state = state.lock().expect("state lock poisoned"); - for thread_id in &thread_ids { - if !state.known_threads.iter().any(|id| id == thread_id) { - state.known_threads.push(thread_id.clone()); - } - } - } - events - .send(ReaderEvent::ThreadList { - thread_ids, - next_cursor: parsed.next_cursor, - }) - .ok(); - } - } - - Ok(()) -} - -fn handle_filtered_notification( - notification: JSONRPCNotification, - output: &Output, -) -> anyhow::Result<()> { - let Ok(server_notification) = ServerNotification::try_from(notification) else { - return Ok(()); - }; - - match server_notification { - ServerNotification::ItemCompleted(payload) => { - emit_filtered_item(payload.item, &payload.thread_id, output) - } - _ => Ok(()), - } -} - -fn emit_filtered_item(item: ThreadItem, thread_id: &str, output: &Output) -> anyhow::Result<()> { - let thread_label = output.format_label(thread_id, LabelColor::Thread); - match item { - ThreadItem::AgentMessage { text, .. } => { - let label = output.format_label("assistant", LabelColor::Assistant); - output.server_line(&format!("{thread_label} {label}: {text}"))?; - } - ThreadItem::Plan { text, .. } => { - let label = output.format_label("assistant", LabelColor::Assistant); - output.server_line(&format!("{thread_label} {label}: plan"))?; - write_multiline(output, &thread_label, &format!("{label}:"), &text)?; - } - ThreadItem::CommandExecution { - command, - status, - exit_code, - aggregated_output, - .. - } => { - let label = output.format_label("tool", LabelColor::Tool); - output.server_line(&format!( - "{thread_label} {label}: command {command} ({status:?})" - ))?; - if let Some(exit_code) = exit_code { - let label = output.format_label("tool exit", LabelColor::ToolMeta); - output.server_line(&format!("{thread_label} {label}: {exit_code}"))?; - } - if let Some(aggregated_output) = aggregated_output { - let label = output.format_label("tool output", LabelColor::ToolMeta); - write_multiline( - output, - &thread_label, - &format!("{label}:"), - &aggregated_output, - )?; - } - } - ThreadItem::FileChange { - changes, status, .. - } => { - let label = output.format_label("tool", LabelColor::Tool); - output.server_line(&format!( - "{thread_label} {label}: file change ({status:?}, {} files)", - changes.len() - ))?; - } - ThreadItem::McpToolCall { - server, - tool, - status, - arguments, - result, - error, - .. - } => { - let label = output.format_label("tool", LabelColor::Tool); - output.server_line(&format!( - "{thread_label} {label}: {server}.{tool} ({status:?})" - ))?; - if !arguments.is_null() { - let label = output.format_label("tool args", LabelColor::ToolMeta); - output.server_line(&format!("{thread_label} {label}: {arguments}"))?; - } - if let Some(result) = result { - let label = output.format_label("tool result", LabelColor::ToolMeta); - output.server_line(&format!("{thread_label} {label}: {result:?}"))?; - } - if let Some(error) = error { - let label = output.format_label("tool error", LabelColor::ToolMeta); - output.server_line(&format!("{thread_label} {label}: {error:?}"))?; - } - } - _ => {} - } - - Ok(()) -} - -fn write_multiline( - output: &Output, - thread_label: &str, - header: &str, - text: &str, -) -> anyhow::Result<()> { - output.server_line(&format!("{thread_label} {header}"))?; - for line in text.lines() { - output.server_line(&format!("{thread_label} {line}"))?; - } - Ok(()) -} - -fn send_response( - stdin: &Arc>>, - request_id: codex_app_server_protocol::RequestId, - response: T, -) -> anyhow::Result<()> { - let result = serde_json::to_value(response).context("serialize response")?; - let message = JSONRPCResponse { - id: request_id, - result, - }; - let json = serde_json::to_string(&message).context("serialize response message")?; - let mut line = json; - line.push('\n'); - - let mut stdin = stdin.lock().expect("stdin lock poisoned"); - let Some(stdin) = stdin.as_mut() else { - anyhow::bail!("stdin already closed"); - }; - stdin.write_all(line.as_bytes()).context("write response")?; - stdin.flush().context("flush response")?; - Ok(()) -}