From f94157a4b23ed5e90da583aca26a1e551a2219b0 Mon Sep 17 00:00:00 2001 From: Channing Conger Date: Fri, 22 May 2026 19:09:02 -0700 Subject: [PATCH] code-mode: merge stored values by key (#24159) ## Summary Change code-mode stored value updates to merge writes by key instead of replacing the session's complete stored-value map after each cell completes. Previously, each cell received a snapshot of stored values and returned the complete resulting map. When multiple cells ran concurrently, a later completion could overwrite values written by another cell because it committed an older snapshot. This change moves stored-value ownership into `CodeModeService`: - Each runtime starts from the service's current stored values. - Runtime completion reports only keys written by that cell. - The service merges those writes into the current stored-value map on successful completion. - Core no longer replaces its stored-value state from a cell result. As a result, concurrently executing cells can update different stored keys without clobbering one another. The move into CodeModeService is motivated by a desire to have this lifetime tied to a new lifetime object on that side in a subsequent PR. --- codex-rs/code-mode/src/runtime/callbacks.rs | 3 +- codex-rs/code-mode/src/runtime/mod.rs | 39 +++-- .../code-mode/src/runtime/module_loader.rs | 10 +- codex-rs/code-mode/src/service.rs | 36 ++--- .../src/tools/code_mode/execute_handler.rs | 7 - codex-rs/core/src/tools/code_mode/mod.rs | 17 -- codex-rs/core/tests/suite/code_mode.rs | 153 ++++++++++++++++++ 7 files changed, 188 insertions(+), 77 deletions(-) diff --git a/codex-rs/code-mode/src/runtime/callbacks.rs b/codex-rs/code-mode/src/runtime/callbacks.rs index c3a648ae3..7d8a28662 100644 --- a/codex-rs/code-mode/src/runtime/callbacks.rs +++ b/codex-rs/code-mode/src/runtime/callbacks.rs @@ -157,7 +157,8 @@ pub(super) fn store_callback( } }; if let Some(state) = scope.get_slot_mut::() { - state.stored_values.insert(key, serialized); + state.stored_values.insert(key.clone(), serialized.clone()); + state.stored_value_writes.insert(key, serialized); } } diff --git a/codex-rs/code-mode/src/runtime/mod.rs b/codex-rs/code-mode/src/runtime/mod.rs index 2b605fa57..89893d2b2 100644 --- a/codex-rs/code-mode/src/runtime/mod.rs +++ b/codex-rs/code-mode/src/runtime/mod.rs @@ -35,7 +35,6 @@ pub struct ExecuteRequest { pub tool_call_id: String, pub enabled_tools: Vec, pub source: String, - pub stored_values: HashMap, pub yield_time_ms: Option, pub max_output_tokens: Option, } @@ -116,7 +115,6 @@ pub enum RuntimeResponse { Result { cell_id: String, content_items: Vec, - stored_values: HashMap, error_text: Option, }, } @@ -182,12 +180,13 @@ pub(crate) enum RuntimeEvent { text: String, }, Result { - stored_values: HashMap, + stored_value_writes: HashMap, error_text: Option, }, } pub(crate) fn spawn_runtime( + stored_values: HashMap, request: ExecuteRequest, event_tx: mpsc::UnboundedSender, pending_mode: PendingRuntimeMode, @@ -214,7 +213,7 @@ pub(crate) fn spawn_runtime( tool_call_id: request.tool_call_id, enabled_tools, source: request.source, - stored_values: request.stored_values, + stored_values, }; thread::spawn(move || { @@ -248,6 +247,7 @@ pub(super) struct RuntimeState { pending_tool_calls: HashMap>, pending_timeouts: HashMap, stored_values: HashMap, + stored_value_writes: HashMap, enabled_tools: Vec, next_tool_call_id: u64, next_timeout_id: u64, @@ -259,7 +259,7 @@ pub(super) struct RuntimeState { pub(super) enum CompletionState { Pending, Completed { - stored_values: HashMap, + stored_value_writes: HashMap, error_text: Option, }, } @@ -305,6 +305,7 @@ fn run_runtime( pending_tool_calls: HashMap::new(), pending_timeouts: HashMap::new(), stored_values: config.stored_values, + stored_value_writes: HashMap::new(), enabled_tools: config.enabled_tools, next_tool_call_id: 1, next_timeout_id: 1, @@ -330,10 +331,10 @@ fn run_runtime( match module_loader::completion_state(scope, pending_promise.as_ref()) { CompletionState::Completed { - stored_values, + stored_value_writes, error_text, } => { - send_result(&event_tx, stored_values, error_text); + send_result(&event_tx, stored_value_writes, error_text); return; } CompletionState::Pending => {} @@ -375,10 +376,10 @@ fn run_runtime( scope.perform_microtask_checkpoint(); match module_loader::completion_state(scope, pending_promise.as_ref()) { CompletionState::Completed { - stored_values, + stored_value_writes, error_text, } => { - send_result(&event_tx, stored_values, error_text); + send_result(&event_tx, stored_value_writes, error_text); return; } CompletionState::Pending => {} @@ -422,21 +423,21 @@ fn capture_scope_send_error( event_tx: &mpsc::UnboundedSender, error_text: Option, ) { - let stored_values = scope + let stored_value_writes = scope .get_slot::() - .map(|state| state.stored_values.clone()) + .map(|state| state.stored_value_writes.clone()) .unwrap_or_default(); - send_result(event_tx, stored_values, error_text); + send_result(event_tx, stored_value_writes, error_text); } fn send_result( event_tx: &mpsc::UnboundedSender, - stored_values: HashMap, + stored_value_writes: HashMap, error_text: Option, ) { let _ = event_tx.send(RuntimeEvent::Result { - stored_values, + stored_value_writes, error_text, }); } @@ -463,7 +464,6 @@ mod tests { tool_call_id: "call_1".to_string(), enabled_tools: Vec::new(), source: source.to_string(), - stored_values: HashMap::new(), yield_time_ms: Some(1), max_output_tokens: None, } @@ -473,6 +473,7 @@ mod tests { async fn terminate_execution_stops_cpu_bound_module() { let (event_tx, mut event_rx) = mpsc::unbounded_channel(); let (_runtime_tx, _runtime_control_tx, runtime_terminate_handle) = spawn_runtime( + HashMap::new(), execute_request("while (true) {}"), event_tx, PendingRuntimeMode::Continue, @@ -491,14 +492,9 @@ mod tests { .await .unwrap() .unwrap(); - let RuntimeEvent::Result { - stored_values, - error_text, - } = result_event - else { + let RuntimeEvent::Result { error_text, .. } = result_event else { panic!("expected runtime result after termination"); }; - assert_eq!(stored_values, HashMap::new()); assert!(error_text.is_some()); assert!( @@ -513,6 +509,7 @@ mod tests { async fn pending_mode_freezes_runtime_commands_until_resume() { let (event_tx, mut event_rx) = mpsc::unbounded_channel(); let (runtime_tx, runtime_control_tx, _runtime_terminate_handle) = spawn_runtime( + HashMap::new(), execute_request( r#" await new Promise((resolve) => setTimeout(resolve, 60_000)); diff --git a/codex-rs/code-mode/src/runtime/module_loader.rs b/codex-rs/code-mode/src/runtime/module_loader.rs index 83ce3d347..00185edb5 100644 --- a/codex-rs/code-mode/src/runtime/module_loader.rs +++ b/codex-rs/code-mode/src/runtime/module_loader.rs @@ -104,14 +104,14 @@ pub(super) fn completion_state( scope: &mut v8::PinScope<'_, '_>, pending_promise: Option<&v8::Global>, ) -> CompletionState { - let stored_values = scope + let stored_value_writes = scope .get_slot::() - .map(|state| state.stored_values.clone()) + .map(|state| state.stored_value_writes.clone()) .unwrap_or_default(); let Some(pending_promise) = pending_promise else { return CompletionState::Completed { - stored_values, + stored_value_writes, error_text: None, }; }; @@ -120,7 +120,7 @@ pub(super) fn completion_state( match promise.state() { v8::PromiseState::Pending => CompletionState::Pending, v8::PromiseState::Fulfilled => CompletionState::Completed { - stored_values, + stored_value_writes, error_text: None, }, v8::PromiseState::Rejected => { @@ -131,7 +131,7 @@ pub(super) fn completion_state( Some(value_to_error_text(scope, result)) }; CompletionState::Completed { - stored_values, + stored_value_writes, error_text, } } diff --git a/codex-rs/code-mode/src/service.rs b/codex-rs/code-mode/src/service.rs index 1544f84f3..de4ed13e5 100644 --- a/codex-rs/code-mode/src/service.rs +++ b/codex-rs/code-mode/src/service.rs @@ -73,14 +73,10 @@ impl CodeModeService { } } - pub async fn stored_values(&self) -> HashMap { + async fn stored_values(&self) -> HashMap { self.inner.stored_values.lock().await.clone() } - pub async fn replace_stored_values(&self, values: HashMap) { - *self.inner.stored_values.lock().await = values; - } - /// Reserves the runtime cell id for a future `execute` request. /// /// The runtime can issue nested tool calls before the first `execute` @@ -138,6 +134,7 @@ impl CodeModeService { let cell_id = request.cell_id.clone(); let (event_tx, event_rx) = mpsc::unbounded_channel(); let (control_tx, control_rx) = mpsc::unbounded_channel(); + let stored_values = self.stored_values().await; let (runtime_tx, runtime_control_tx, runtime_terminate_handle) = { let mut sessions = self.inner.sessions.lock().await; if sessions.contains_key(&cell_id) { @@ -145,7 +142,7 @@ impl CodeModeService { } let (runtime_tx, runtime_control_tx, runtime_terminate_handle) = - spawn_runtime(request, event_tx, pending_mode)?; + spawn_runtime(stored_values, request, event_tx, pending_mode)?; // Keep the session registry locked through insertion so a // caller-owned cell id cannot race with another execute and replace @@ -349,7 +346,6 @@ enum SessionResponseSender { struct PendingResult { content_items: Vec, - stored_values: HashMap, error_text: Option, } @@ -366,7 +362,6 @@ fn missing_cell_response(cell_id: String) -> RuntimeResponse { error_text: Some(format!("exec cell {cell_id} not found")), cell_id, content_items: Vec::new(), - stored_values: HashMap::new(), } } @@ -374,7 +369,6 @@ fn pending_result_response(cell_id: &str, result: PendingResult) -> RuntimeRespo RuntimeResponse::Result { cell_id: cell_id.to_string(), content_items: result.content_items, - stored_values: result.stored_values, error_text: result.error_text, } } @@ -476,7 +470,6 @@ async fn run_session_control( if pending_result.is_none() { let result = PendingResult { content_items: std::mem::take(&mut content_items), - stored_values: HashMap::new(), error_text: Some("exec runtime ended unexpectedly".to_string()), }; if send_or_buffer_result( @@ -551,7 +544,7 @@ async fn run_session_control( .await; } RuntimeEvent::Result { - stored_values, + stored_value_writes, error_text, } => { yield_timer = None; @@ -565,9 +558,13 @@ async fn run_session_control( } break; } + inner + .stored_values + .lock() + .await + .extend(stored_value_writes); let result = PendingResult { content_items: std::mem::take(&mut content_items), - stored_values, error_text, }; if send_or_buffer_result( @@ -715,7 +712,6 @@ mod tests { tool_call_id: "call_1".to_string(), enabled_tools: Vec::new(), source: source.to_string(), - stored_values: HashMap::new(), yield_time_ms: Some(1), max_output_tokens: None, } @@ -752,7 +748,6 @@ mod tests { content_items: vec![FunctionCallOutputContentItem::InputText { text: "before".to_string(), }], - stored_values: HashMap::new(), error_text: None, } ); @@ -778,7 +773,6 @@ mod tests { content_items: vec![FunctionCallOutputContentItem::InputText { text: "done".to_string(), }], - stored_values: HashMap::new(), error_text: None, }) ); @@ -1108,7 +1102,6 @@ text("done"); content_items: vec![FunctionCallOutputContentItem::InputText { text: "done".to_string(), }], - stored_values: HashMap::new(), error_text: None, } )) @@ -1135,7 +1128,6 @@ text("done"); content_items: vec![FunctionCallOutputContentItem::InputText { text: "false".to_string(), }], - stored_values: HashMap::new(), error_text: None, } ); @@ -1175,7 +1167,6 @@ text(value); content_items: vec![FunctionCallOutputContentItem::InputText { text: "jeudi 2 janvier \u{e0} 03:04:05".to_string(), }], - stored_values: HashMap::new(), error_text: None, } ); @@ -1214,7 +1205,6 @@ text(formatter.format(new Date("2025-01-02T03:04:05Z"))); content_items: vec![FunctionCallOutputContentItem::InputText { text: "jeudi 2 janvier \u{e0} 03:04:05".to_string(), }], - stored_values: HashMap::new(), error_text: None, } ); @@ -1257,7 +1247,6 @@ text(JSON.stringify(returnsUndefined)); text: "[true,true,true]".to_string(), }, ], - stored_values: HashMap::new(), error_text: None, } ); @@ -1292,7 +1281,6 @@ image({ image_url: "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR4nGP4z8DwHwAFAAH/iZk9HQAAAABJRU5ErkJggg==".to_string(), detail: Some(crate::ImageDetail::Original), }], - stored_values: HashMap::new(), error_text: None, } ); @@ -1328,7 +1316,6 @@ image( image_url: "https://example.com/image.jpg".to_string(), detail: Some(crate::ImageDetail::Original), }], - stored_values: HashMap::new(), error_text: None, } ); @@ -1366,7 +1353,6 @@ image( image_url: "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR4nGP4z8DwHwAFAAH/iZk9HQAAAABJRU5ErkJggg==".to_string(), detail: Some(crate::ImageDetail::High), }], - stored_values: HashMap::new(), error_text: None, } ); @@ -1396,7 +1382,6 @@ image({ RuntimeResponse::Result { cell_id: "1".to_string(), content_items: Vec::new(), - stored_values: HashMap::new(), error_text: Some("image detail must be one of: high, original".to_string()), } ); @@ -1433,7 +1418,6 @@ image({ RuntimeResponse::Result { cell_id: "1".to_string(), content_items: Vec::new(), - stored_values: HashMap::new(), error_text: Some( "image expects a non-empty image URL string, an object with image_url and optional detail, or a raw MCP image block".to_string(), ), @@ -1459,7 +1443,6 @@ image({ WaitOutcome::MissingCell(RuntimeResponse::Result { cell_id: "missing".to_string(), content_items: Vec::new(), - stored_values: HashMap::new(), error_text: Some("exec cell missing not found".to_string()), }) ); @@ -1473,6 +1456,7 @@ image({ let (initial_response_tx, initial_response_rx) = oneshot::channel(); let (runtime_event_tx, _runtime_event_rx) = mpsc::unbounded_channel(); let (runtime_tx, runtime_control_tx, runtime_terminate_handle) = spawn_runtime( + HashMap::new(), ExecuteRequest { source: "await new Promise(() => {})".to_string(), yield_time_ms: None, diff --git a/codex-rs/core/src/tools/code_mode/execute_handler.rs b/codex-rs/core/src/tools/code_mode/execute_handler.rs index 02b60a426..29f562384 100644 --- a/codex-rs/core/src/tools/code_mode/execute_handler.rs +++ b/codex-rs/core/src/tools/code_mode/execute_handler.rs @@ -38,12 +38,6 @@ impl CodeModeExecuteHandler { let exec = ExecContext { session, turn }; let enabled_tools = codex_tools::collect_code_mode_tool_definitions(&self.nested_tool_specs); - let stored_values = exec - .session - .services - .code_mode_service - .stored_values() - .await; // Allocate before starting V8 so the trace can create the parent // CodeCell before model-authored JavaScript issues nested tool calls. let runtime_cell_id = exec.session.services.code_mode_service.allocate_cell_id(); @@ -67,7 +61,6 @@ impl CodeModeExecuteHandler { tool_call_id: call_id, enabled_tools, source: args.code, - stored_values, yield_time_ms: args.yield_time_ms, max_output_tokens: args.max_output_tokens, }) diff --git a/codex-rs/core/src/tools/code_mode/mod.rs b/codex-rs/core/src/tools/code_mode/mod.rs index ff9f8c889..09d58ea8f 100644 --- a/codex-rs/core/src/tools/code_mode/mod.rs +++ b/codex-rs/core/src/tools/code_mode/mod.rs @@ -66,17 +66,6 @@ impl CodeModeService { } } - pub(crate) async fn stored_values(&self) -> std::collections::HashMap { - self.inner.stored_values().await - } - - pub(crate) async fn replace_stored_values( - &self, - values: std::collections::HashMap, - ) { - self.inner.replace_stored_values(values).await; - } - pub(crate) fn allocate_cell_id(&self) -> String { self.inner.allocate_cell_id() } @@ -182,17 +171,11 @@ pub(super) async fn handle_runtime_response( } RuntimeResponse::Result { content_items, - stored_values, error_text, .. } => { let mut content_items = into_function_call_output_content_items(content_items); sanitize_runtime_image_detail(exec.turn.as_ref(), &mut content_items); - exec.session - .services - .code_mode_service - .replace_stored_values(stored_values) - .await; let success = error_text.is_none(); if let Some(error_text) = error_text { content_items.push(FunctionCallOutputContentItem::InputText { diff --git a/codex-rs/core/tests/suite/code_mode.rs b/codex-rs/core/tests/suite/code_mode.rs index 406981fad..a1dda52ec 100644 --- a/codex-rs/core/tests/suite/code_mode.rs +++ b/codex-rs/core/tests/suite/code_mode.rs @@ -1359,6 +1359,159 @@ text("session b done"); Ok(()) } +#[cfg_attr(windows, ignore = "no exec_command on Windows")] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn code_mode_concurrent_cells_merge_only_the_stored_values_they_write() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = responses::start_mock_server().await; + let mut builder = test_codex().with_config(move |config| { + let _ = config.features.enable(Feature::CodeMode); + }); + let test = builder.build(&server).await?; + let first_gate = test.workspace_path("code-mode-first-store.ready"); + let first_wait = wait_for_file_source(&first_gate)?; + + responses::mount_sse_once( + &server, + sse(vec![ + ev_response_created("resp-1"), + ev_custom_tool_call( + "call-init", + "exec", + r#" +store("a", 1); +store("b", 2); +"#, + ), + ev_completed("resp-1"), + ]), + ) + .await; + responses::mount_sse_once( + &server, + sse(vec![ + ev_assistant_message("msg-1", "initialized"), + ev_completed("resp-2"), + ]), + ) + .await; + + test.submit_turn("initialize stored values").await?; + + let first_code = format!( + r#" +store("a", 3); +yield_control(); +{first_wait} +"# + ); + responses::mount_sse_once( + &server, + sse(vec![ + ev_response_created("resp-3"), + ev_custom_tool_call("call-first", "exec", &first_code), + ev_completed("resp-3"), + ]), + ) + .await; + let first_started = responses::mount_sse_once( + &server, + sse(vec![ + ev_assistant_message("msg-2", "first pending"), + ev_completed("resp-4"), + ]), + ) + .await; + + test.submit_turn("start first store").await?; + + let first_request = first_started.single_request(); + let first_items = custom_tool_output_items(&first_request, "call-first"); + let first_cell_id = extract_running_cell_id(text_item(&first_items, /*index*/ 0)); + + responses::mount_sse_once( + &server, + sse(vec![ + ev_response_created("resp-5"), + ev_custom_tool_call("call-second", "exec", r#"store("b", 4);"#), + ev_completed("resp-5"), + ]), + ) + .await; + responses::mount_sse_once( + &server, + sse(vec![ + ev_assistant_message("msg-3", "second complete"), + ev_completed("resp-6"), + ]), + ) + .await; + + test.submit_turn("write the second key").await?; + + fs::write(&first_gate, "ready")?; + responses::mount_sse_once( + &server, + sse(vec![ + ev_response_created("resp-7"), + responses::ev_function_call( + "call-wait", + "wait", + &serde_json::to_string(&serde_json::json!({ + "cell_id": first_cell_id, + "yield_time_ms": 1_000, + }))?, + ), + ev_completed("resp-7"), + ]), + ) + .await; + responses::mount_sse_once( + &server, + sse(vec![ + ev_assistant_message("msg-4", "first completed"), + ev_completed("resp-8"), + ]), + ) + .await; + + test.submit_turn("complete the first store").await?; + + responses::mount_sse_once( + &server, + sse(vec![ + ev_response_created("resp-9"), + ev_custom_tool_call( + "call-check", + "exec", + r#"text(JSON.stringify({ a: load("a"), b: load("b") }));"#, + ), + ev_completed("resp-9"), + ]), + ) + .await; + let check_response = responses::mount_sse_once( + &server, + sse(vec![ + ev_assistant_message("msg-5", "checked"), + ev_completed("resp-10"), + ]), + ) + .await; + + test.submit_turn("check merged stored values").await?; + + let check_request = check_response.single_request(); + let stored_values: Value = serde_json::from_str( + &custom_tool_output_last_non_empty_text(&check_request, "call-check") + .expect("checking stored values should emit JSON"), + )?; + assert_eq!(stored_values, serde_json::json!({ "a": 3, "b": 4 })); + + Ok(()) +} + #[cfg_attr(windows, ignore = "no exec_command on Windows")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn code_mode_wait_can_terminate_and_continue() -> Result<()> {