Streamline review and feedback handlers (#19498)

## Why

The remaining review, interrupt, fuzzy search, feedback, and git-diff
handlers still had local send-error branches that obscured otherwise
simple request handling. This final slice flattens those handlers
without changing the public protocol behavior.

## What Changed

- Streamlined review start, turn interrupt, fuzzy search session,
feedback upload, and git diff handlers in
`codex-rs/app-server/src/codex_message_processor.rs`.
- Converted validation and upload failures into returned JSON-RPC errors
where that avoids nested `send_error`/`return` blocks.
- Left unrelated sandbox setup and notification code untouched.

## Verification

- `cargo check -p codex-app-server`
- `cargo test -p codex-app-server --test all v2::review --
--test-threads=1`
This commit is contained in:
pakrym-oai
2026-04-27 16:36:04 -07:00
committed by GitHub
Unverified
parent dcd139b7c4
commit c5a495c2cd
+143 -209
View File
@@ -5671,24 +5671,6 @@ impl CodexMessageProcessor {
}
}
async fn send_internal_error(&self, request_id: ConnectionRequestId, message: String) {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message,
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
async fn send_invalid_request_error(&self, request_id: ConnectionRequestId, message: String) {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message,
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
fn input_too_large_error(actual_chars: usize) -> JSONRPCErrorError {
JSONRPCErrorError {
code: INVALID_PARAMS_ERROR_CODE,
@@ -7056,139 +7038,104 @@ impl CodexMessageProcessor {
target,
delivery,
} = params;
let (parent_thread_id, parent_thread) = match self.load_thread(&thread_id).await {
Ok(v) => v,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
let (review_request, display_text) = match Self::review_request_from_target(target) {
Ok(value) => value,
Err(err) => {
self.outgoing.send_error(request_id, err).await;
return;
}
};
let delivery = delivery.unwrap_or(ApiReviewDelivery::Inline).to_core();
match delivery {
CoreReviewDelivery::Inline => {
if let Err(err) = self
.start_inline_review(
let result = async {
let (parent_thread_id, parent_thread) = self.load_thread(&thread_id).await?;
let (review_request, display_text) = Self::review_request_from_target(target)?;
match delivery.unwrap_or(ApiReviewDelivery::Inline).to_core() {
CoreReviewDelivery::Inline => {
self.start_inline_review(
&request_id,
parent_thread,
review_request,
display_text.as_str(),
thread_id.clone(),
thread_id,
)
.await
{
self.outgoing.send_error(request_id, err).await;
.await?;
}
}
CoreReviewDelivery::Detached => {
if let Err(err) = self
.start_detached_review(
CoreReviewDelivery::Detached => {
self.start_detached_review(
&request_id,
parent_thread_id,
parent_thread,
review_request,
display_text.as_str(),
)
.await
{
self.outgoing.send_error(request_id, err).await;
.await?;
}
}
Ok::<_, JSONRPCErrorError>(None::<ReviewStartResponse>)
}
.await;
self.send_optional_result(request_id, result).await;
}
async fn turn_interrupt(&self, request_id: ConnectionRequestId, params: TurnInterruptParams) {
let TurnInterruptParams { thread_id, turn_id } = params;
let is_startup_interrupt = turn_id.is_empty();
let (thread_uuid, thread) = match self.load_thread(&thread_id).await {
Ok(v) => v,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
let result = async {
let (thread_uuid, thread) = self.load_thread(&thread_id).await?;
// Record turn interrupts so we can reply when TurnAborted arrives. Startup
// interrupts do not have a turn and are acknowledged after submission.
if !is_startup_interrupt {
let thread_state = self.thread_state_manager.thread_state(thread_uuid).await;
let is_running = matches!(thread.agent_status().await, AgentStatus::Running);
let interrupt_outcome = {
let mut thread_state = thread_state.lock().await;
if let Some(active_turn) = thread_state.active_turn_snapshot() {
if active_turn.id != turn_id {
Err(format!(
"expected active turn id {turn_id} but found {}",
active_turn.id
))
} else {
thread_state
.pending_interrupts
.push((request_id.clone(), ApiVersion::V2));
Ok(())
// Record turn interrupts so we can reply when TurnAborted arrives. Startup
// interrupts do not have a turn and are acknowledged after submission.
if !is_startup_interrupt {
let thread_state = self.thread_state_manager.thread_state(thread_uuid).await;
let is_running = matches!(thread.agent_status().await, AgentStatus::Running);
{
let mut thread_state = thread_state.lock().await;
if let Some(active_turn) = thread_state.active_turn_snapshot() {
if active_turn.id != turn_id {
return Err(invalid_request(format!(
"expected active turn id {turn_id} but found {}",
active_turn.id
)));
}
} else if thread_state.last_terminal_turn_id.as_deref()
== Some(turn_id.as_str())
|| !is_running
{
return Err(invalid_request("no active turn to interrupt"));
}
} else if thread_state.last_terminal_turn_id.as_deref() == Some(turn_id.as_str()) {
Err("no active turn to interrupt".to_string())
} else if is_running {
thread_state
.pending_interrupts
.push((request_id.clone(), ApiVersion::V2));
Ok(())
} else {
Err("no active turn to interrupt".to_string())
}
};
if let Err(message) = interrupt_outcome {
self.send_invalid_request_error(request_id, message).await;
return;
}
self.outgoing
.record_request_turn_id(&request_id, &turn_id)
.await;
}
// Submit the interrupt. Turn interrupts respond upon TurnAborted; startup
// interrupts respond here because startup cancellation has no turn event.
let submit_result = self
.submit_core_op(&request_id, thread.as_ref(), Op::Interrupt)
.await;
match submit_result {
Ok(_) if is_startup_interrupt => {
self.outgoing
.send_response(request_id, TurnInterruptResponse {})
.record_request_turn_id(&request_id, &turn_id)
.await;
}
Ok(_) => {}
Err(err) => {
if !is_startup_interrupt {
let thread_state = self.thread_state_manager.thread_state(thread_uuid).await;
let mut thread_state = thread_state.lock().await;
thread_state
.pending_interrupts
.retain(|(pending_request_id, _)| pending_request_id != &request_id);
// Submit the interrupt. Turn interrupts respond upon TurnAborted; startup
// interrupts respond here because startup cancellation has no turn event.
match self
.submit_core_op(&request_id, thread.as_ref(), Op::Interrupt)
.await
{
Ok(_) if is_startup_interrupt => Ok(Some(TurnInterruptResponse {})),
Ok(_) => Ok(None),
Err(err) => {
if !is_startup_interrupt {
let thread_state =
self.thread_state_manager.thread_state(thread_uuid).await;
let mut thread_state = thread_state.lock().await;
thread_state
.pending_interrupts
.retain(|(pending_request_id, _)| pending_request_id != &request_id);
}
let interrupt_target = if is_startup_interrupt {
"startup"
} else {
"turn"
};
Err(internal_error(format!(
"failed to interrupt {interrupt_target}: {err}"
)))
}
let interrupt_target = if is_startup_interrupt {
"startup"
} else {
"turn"
};
self.send_internal_error(
request_id,
format!("failed to interrupt {interrupt_target}: {err}"),
)
.await;
}
}
.await;
self.send_optional_result(request_id, result).await;
}
async fn ensure_conversation_listener(
@@ -7499,24 +7446,18 @@ impl CodexMessageProcessor {
Ok(())
}
async fn git_diff_to_origin(&self, request_id: ConnectionRequestId, cwd: PathBuf) {
let diff = git_diff_to_remote(&cwd).await;
match diff {
Some(value) => {
let response = GitDiffToRemoteResponse {
sha: value.sha,
diff: value.diff,
};
self.outgoing.send_response(request_id, response).await;
}
None => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("failed to compute git diff to remote for cwd: {cwd:?}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
}
let result = git_diff_to_remote(&cwd)
.await
.map(|value| GitDiffToRemoteResponse {
sha: value.sha,
diff: value.diff,
})
.ok_or_else(|| {
invalid_request(format!(
"failed to compute git diff to remote for cwd: {cwd:?}"
))
});
self.outgoing.send_result(request_id, result).await;
}
async fn fuzzy_file_search(
@@ -7568,38 +7509,29 @@ impl CodexMessageProcessor {
request_id: ConnectionRequestId,
params: FuzzyFileSearchSessionStartParams,
) {
let result = self.fuzzy_file_search_session_start_response(params).await;
self.outgoing.send_result(request_id, result).await;
}
async fn fuzzy_file_search_session_start_response(
&self,
params: FuzzyFileSearchSessionStartParams,
) -> Result<FuzzyFileSearchSessionStartResponse, JSONRPCErrorError> {
let FuzzyFileSearchSessionStartParams { session_id, roots } = params;
if session_id.is_empty() {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "sessionId must not be empty".to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
return Err(invalid_request("sessionId must not be empty"));
}
let session =
start_fuzzy_file_search_session(session_id.clone(), roots, self.outgoing.clone());
match session {
Ok(session) => {
self.fuzzy_search_sessions
.lock()
.await
.insert(session_id, session);
self.outgoing
.send_response(request_id, FuzzyFileSearchSessionStartResponse {})
.await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to start fuzzy file search session: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
}
start_fuzzy_file_search_session(session_id.clone(), roots, self.outgoing.clone())
.map_err(|err| {
internal_error(format!("failed to start fuzzy file search session: {err}"))
})?;
self.fuzzy_search_sessions
.lock()
.await
.insert(session_id, session);
Ok(FuzzyFileSearchSessionStartResponse {})
}
async fn fuzzy_file_search_session_update(
@@ -7607,6 +7539,14 @@ impl CodexMessageProcessor {
request_id: ConnectionRequestId,
params: FuzzyFileSearchSessionUpdateParams,
) {
let result = self.fuzzy_file_search_session_update_response(params).await;
self.outgoing.send_result(request_id, result).await;
}
async fn fuzzy_file_search_session_update_response(
&self,
params: FuzzyFileSearchSessionUpdateParams,
) -> Result<FuzzyFileSearchSessionUpdateResponse, JSONRPCErrorError> {
let FuzzyFileSearchSessionUpdateParams { session_id, query } = params;
let found = {
let sessions = self.fuzzy_search_sessions.lock().await;
@@ -7618,18 +7558,12 @@ impl CodexMessageProcessor {
}
};
if !found {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("fuzzy file search session not found: {session_id}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
return Err(invalid_request(format!(
"fuzzy file search session not found: {session_id}"
)));
}
self.outgoing
.send_response(request_id, FuzzyFileSearchSessionUpdateResponse {})
.await;
Ok(FuzzyFileSearchSessionUpdateResponse {})
}
async fn fuzzy_file_search_session_stop(
@@ -7649,14 +7583,18 @@ impl CodexMessageProcessor {
}
async fn upload_feedback(&self, request_id: ConnectionRequestId, params: FeedbackUploadParams) {
let result = self.upload_feedback_response(params).await;
self.outgoing.send_result(request_id, result).await;
}
async fn upload_feedback_response(
&self,
params: FeedbackUploadParams,
) -> Result<FeedbackUploadResponse, JSONRPCErrorError> {
if !self.config.feedback_enabled {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "sending feedback is disabled by configuration".to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
return Err(invalid_request(
"sending feedback is disabled by configuration",
));
}
let FeedbackUploadParams {
@@ -7671,15 +7609,7 @@ impl CodexMessageProcessor {
let conversation_id = match thread_id.as_deref() {
Some(thread_id) => match ThreadId::from_string(thread_id) {
Ok(conversation_id) => Some(conversation_id),
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("invalid thread id: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
Err(err) => return Err(invalid_request(format!("invalid thread id: {err}"))),
},
None => None,
};
@@ -7808,30 +7738,14 @@ impl CodexMessageProcessor {
let upload_result = match upload_result {
Ok(result) => result,
Err(join_err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to upload feedback: {join_err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
return Err(internal_error(format!(
"failed to upload feedback: {join_err}"
)));
}
};
match upload_result {
Ok(()) => {
let response = FeedbackUploadResponse { thread_id };
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to upload feedback: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
}
upload_result.map_err(|err| internal_error(format!("failed to upload feedback: {err}")))?;
Ok(FeedbackUploadResponse { thread_id })
}
async fn windows_sandbox_setup_start(
@@ -7924,6 +7838,26 @@ impl CodexMessageProcessor {
None
})
}
async fn send_invalid_request_error(
&self,
request_id: ConnectionRequestId,
message: impl Into<String>,
) {
self.outgoing
.send_error(request_id, invalid_request(message))
.await;
}
async fn send_internal_error(
&self,
request_id: ConnectionRequestId,
message: impl Into<String>,
) {
self.outgoing
.send_error(request_id, internal_error(message))
.await;
}
}
fn normalize_thread_list_cwd_filters(