diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 74161018b..6a1e6fbae 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -5671,24 +5671,6 @@ impl CodexMessageProcessor { } } - async fn send_internal_error(&self, request_id: ConnectionRequestId, message: String) { - let error = JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message, - data: None, - }; - self.outgoing.send_error(request_id, error).await; - } - - async fn send_invalid_request_error(&self, request_id: ConnectionRequestId, message: String) { - let error = JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message, - data: None, - }; - self.outgoing.send_error(request_id, error).await; - } - fn input_too_large_error(actual_chars: usize) -> JSONRPCErrorError { JSONRPCErrorError { code: INVALID_PARAMS_ERROR_CODE, @@ -7056,139 +7038,104 @@ impl CodexMessageProcessor { target, delivery, } = params; - let (parent_thread_id, parent_thread) = match self.load_thread(&thread_id).await { - Ok(v) => v, - Err(error) => { - self.outgoing.send_error(request_id, error).await; - return; - } - }; - - let (review_request, display_text) = match Self::review_request_from_target(target) { - Ok(value) => value, - Err(err) => { - self.outgoing.send_error(request_id, err).await; - return; - } - }; - - let delivery = delivery.unwrap_or(ApiReviewDelivery::Inline).to_core(); - match delivery { - CoreReviewDelivery::Inline => { - if let Err(err) = self - .start_inline_review( + let result = async { + let (parent_thread_id, parent_thread) = self.load_thread(&thread_id).await?; + let (review_request, display_text) = Self::review_request_from_target(target)?; + match delivery.unwrap_or(ApiReviewDelivery::Inline).to_core() { + CoreReviewDelivery::Inline => { + self.start_inline_review( &request_id, parent_thread, review_request, display_text.as_str(), - thread_id.clone(), + thread_id, ) - .await - { - self.outgoing.send_error(request_id, err).await; + .await?; } - } - CoreReviewDelivery::Detached => { - if let Err(err) = self - .start_detached_review( + CoreReviewDelivery::Detached => { + self.start_detached_review( &request_id, parent_thread_id, parent_thread, review_request, display_text.as_str(), ) - .await - { - self.outgoing.send_error(request_id, err).await; + .await?; } } + Ok::<_, JSONRPCErrorError>(None::) } + .await; + self.send_optional_result(request_id, result).await; } async fn turn_interrupt(&self, request_id: ConnectionRequestId, params: TurnInterruptParams) { let TurnInterruptParams { thread_id, turn_id } = params; let is_startup_interrupt = turn_id.is_empty(); - let (thread_uuid, thread) = match self.load_thread(&thread_id).await { - Ok(v) => v, - Err(error) => { - self.outgoing.send_error(request_id, error).await; - return; - } - }; + let result = async { + let (thread_uuid, thread) = self.load_thread(&thread_id).await?; - // Record turn interrupts so we can reply when TurnAborted arrives. Startup - // interrupts do not have a turn and are acknowledged after submission. - if !is_startup_interrupt { - let thread_state = self.thread_state_manager.thread_state(thread_uuid).await; - let is_running = matches!(thread.agent_status().await, AgentStatus::Running); - let interrupt_outcome = { - let mut thread_state = thread_state.lock().await; - if let Some(active_turn) = thread_state.active_turn_snapshot() { - if active_turn.id != turn_id { - Err(format!( - "expected active turn id {turn_id} but found {}", - active_turn.id - )) - } else { - thread_state - .pending_interrupts - .push((request_id.clone(), ApiVersion::V2)); - Ok(()) + // Record turn interrupts so we can reply when TurnAborted arrives. Startup + // interrupts do not have a turn and are acknowledged after submission. + if !is_startup_interrupt { + let thread_state = self.thread_state_manager.thread_state(thread_uuid).await; + let is_running = matches!(thread.agent_status().await, AgentStatus::Running); + { + let mut thread_state = thread_state.lock().await; + if let Some(active_turn) = thread_state.active_turn_snapshot() { + if active_turn.id != turn_id { + return Err(invalid_request(format!( + "expected active turn id {turn_id} but found {}", + active_turn.id + ))); + } + } else if thread_state.last_terminal_turn_id.as_deref() + == Some(turn_id.as_str()) + || !is_running + { + return Err(invalid_request("no active turn to interrupt")); } - } else if thread_state.last_terminal_turn_id.as_deref() == Some(turn_id.as_str()) { - Err("no active turn to interrupt".to_string()) - } else if is_running { thread_state .pending_interrupts .push((request_id.clone(), ApiVersion::V2)); - Ok(()) - } else { - Err("no active turn to interrupt".to_string()) } - }; - if let Err(message) = interrupt_outcome { - self.send_invalid_request_error(request_id, message).await; - return; - } - self.outgoing - .record_request_turn_id(&request_id, &turn_id) - .await; - } - - // Submit the interrupt. Turn interrupts respond upon TurnAborted; startup - // interrupts respond here because startup cancellation has no turn event. - let submit_result = self - .submit_core_op(&request_id, thread.as_ref(), Op::Interrupt) - .await; - match submit_result { - Ok(_) if is_startup_interrupt => { self.outgoing - .send_response(request_id, TurnInterruptResponse {}) + .record_request_turn_id(&request_id, &turn_id) .await; } - Ok(_) => {} - Err(err) => { - if !is_startup_interrupt { - let thread_state = self.thread_state_manager.thread_state(thread_uuid).await; - let mut thread_state = thread_state.lock().await; - thread_state - .pending_interrupts - .retain(|(pending_request_id, _)| pending_request_id != &request_id); + + // Submit the interrupt. Turn interrupts respond upon TurnAborted; startup + // interrupts respond here because startup cancellation has no turn event. + match self + .submit_core_op(&request_id, thread.as_ref(), Op::Interrupt) + .await + { + Ok(_) if is_startup_interrupt => Ok(Some(TurnInterruptResponse {})), + Ok(_) => Ok(None), + Err(err) => { + if !is_startup_interrupt { + let thread_state = + self.thread_state_manager.thread_state(thread_uuid).await; + let mut thread_state = thread_state.lock().await; + thread_state + .pending_interrupts + .retain(|(pending_request_id, _)| pending_request_id != &request_id); + } + let interrupt_target = if is_startup_interrupt { + "startup" + } else { + "turn" + }; + Err(internal_error(format!( + "failed to interrupt {interrupt_target}: {err}" + ))) } - let interrupt_target = if is_startup_interrupt { - "startup" - } else { - "turn" - }; - self.send_internal_error( - request_id, - format!("failed to interrupt {interrupt_target}: {err}"), - ) - .await; } } + .await; + self.send_optional_result(request_id, result).await; } async fn ensure_conversation_listener( @@ -7499,24 +7446,18 @@ impl CodexMessageProcessor { Ok(()) } async fn git_diff_to_origin(&self, request_id: ConnectionRequestId, cwd: PathBuf) { - let diff = git_diff_to_remote(&cwd).await; - match diff { - Some(value) => { - let response = GitDiffToRemoteResponse { - sha: value.sha, - diff: value.diff, - }; - self.outgoing.send_response(request_id, response).await; - } - None => { - let error = JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!("failed to compute git diff to remote for cwd: {cwd:?}"), - data: None, - }; - self.outgoing.send_error(request_id, error).await; - } - } + let result = git_diff_to_remote(&cwd) + .await + .map(|value| GitDiffToRemoteResponse { + sha: value.sha, + diff: value.diff, + }) + .ok_or_else(|| { + invalid_request(format!( + "failed to compute git diff to remote for cwd: {cwd:?}" + )) + }); + self.outgoing.send_result(request_id, result).await; } async fn fuzzy_file_search( @@ -7568,38 +7509,29 @@ impl CodexMessageProcessor { request_id: ConnectionRequestId, params: FuzzyFileSearchSessionStartParams, ) { + let result = self.fuzzy_file_search_session_start_response(params).await; + self.outgoing.send_result(request_id, result).await; + } + + async fn fuzzy_file_search_session_start_response( + &self, + params: FuzzyFileSearchSessionStartParams, + ) -> Result { let FuzzyFileSearchSessionStartParams { session_id, roots } = params; if session_id.is_empty() { - let error = JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: "sessionId must not be empty".to_string(), - data: None, - }; - self.outgoing.send_error(request_id, error).await; - return; + return Err(invalid_request("sessionId must not be empty")); } let session = - start_fuzzy_file_search_session(session_id.clone(), roots, self.outgoing.clone()); - match session { - Ok(session) => { - self.fuzzy_search_sessions - .lock() - .await - .insert(session_id, session); - self.outgoing - .send_response(request_id, FuzzyFileSearchSessionStartResponse {}) - .await; - } - Err(err) => { - let error = JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message: format!("failed to start fuzzy file search session: {err}"), - data: None, - }; - self.outgoing.send_error(request_id, error).await; - } - } + start_fuzzy_file_search_session(session_id.clone(), roots, self.outgoing.clone()) + .map_err(|err| { + internal_error(format!("failed to start fuzzy file search session: {err}")) + })?; + self.fuzzy_search_sessions + .lock() + .await + .insert(session_id, session); + Ok(FuzzyFileSearchSessionStartResponse {}) } async fn fuzzy_file_search_session_update( @@ -7607,6 +7539,14 @@ impl CodexMessageProcessor { request_id: ConnectionRequestId, params: FuzzyFileSearchSessionUpdateParams, ) { + let result = self.fuzzy_file_search_session_update_response(params).await; + self.outgoing.send_result(request_id, result).await; + } + + async fn fuzzy_file_search_session_update_response( + &self, + params: FuzzyFileSearchSessionUpdateParams, + ) -> Result { let FuzzyFileSearchSessionUpdateParams { session_id, query } = params; let found = { let sessions = self.fuzzy_search_sessions.lock().await; @@ -7618,18 +7558,12 @@ impl CodexMessageProcessor { } }; if !found { - let error = JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!("fuzzy file search session not found: {session_id}"), - data: None, - }; - self.outgoing.send_error(request_id, error).await; - return; + return Err(invalid_request(format!( + "fuzzy file search session not found: {session_id}" + ))); } - self.outgoing - .send_response(request_id, FuzzyFileSearchSessionUpdateResponse {}) - .await; + Ok(FuzzyFileSearchSessionUpdateResponse {}) } async fn fuzzy_file_search_session_stop( @@ -7649,14 +7583,18 @@ impl CodexMessageProcessor { } async fn upload_feedback(&self, request_id: ConnectionRequestId, params: FeedbackUploadParams) { + let result = self.upload_feedback_response(params).await; + self.outgoing.send_result(request_id, result).await; + } + + async fn upload_feedback_response( + &self, + params: FeedbackUploadParams, + ) -> Result { if !self.config.feedback_enabled { - let error = JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: "sending feedback is disabled by configuration".to_string(), - data: None, - }; - self.outgoing.send_error(request_id, error).await; - return; + return Err(invalid_request( + "sending feedback is disabled by configuration", + )); } let FeedbackUploadParams { @@ -7671,15 +7609,7 @@ impl CodexMessageProcessor { let conversation_id = match thread_id.as_deref() { Some(thread_id) => match ThreadId::from_string(thread_id) { Ok(conversation_id) => Some(conversation_id), - Err(err) => { - let error = JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!("invalid thread id: {err}"), - data: None, - }; - self.outgoing.send_error(request_id, error).await; - return; - } + Err(err) => return Err(invalid_request(format!("invalid thread id: {err}"))), }, None => None, }; @@ -7808,30 +7738,14 @@ impl CodexMessageProcessor { let upload_result = match upload_result { Ok(result) => result, Err(join_err) => { - let error = JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message: format!("failed to upload feedback: {join_err}"), - data: None, - }; - self.outgoing.send_error(request_id, error).await; - return; + return Err(internal_error(format!( + "failed to upload feedback: {join_err}" + ))); } }; - match upload_result { - Ok(()) => { - let response = FeedbackUploadResponse { thread_id }; - self.outgoing.send_response(request_id, response).await; - } - Err(err) => { - let error = JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message: format!("failed to upload feedback: {err}"), - data: None, - }; - self.outgoing.send_error(request_id, error).await; - } - } + upload_result.map_err(|err| internal_error(format!("failed to upload feedback: {err}")))?; + Ok(FeedbackUploadResponse { thread_id }) } async fn windows_sandbox_setup_start( @@ -7924,6 +7838,26 @@ impl CodexMessageProcessor { None }) } + + async fn send_invalid_request_error( + &self, + request_id: ConnectionRequestId, + message: impl Into, + ) { + self.outgoing + .send_error(request_id, invalid_request(message)) + .await; + } + + async fn send_internal_error( + &self, + request_id: ConnectionRequestId, + message: impl Into, + ) { + self.outgoing + .send_error(request_id, internal_error(message)) + .await; + } } fn normalize_thread_list_cwd_filters(