code-mode: merge stored values by key (#24159)

## Summary

Change code-mode stored value updates to merge writes by key instead of
replacing the session's complete stored-value map after each cell
completes.

Previously, each cell received a snapshot of stored values and returned
the complete resulting map. When multiple cells ran concurrently, a
later completion could overwrite values written by another cell because
it committed an older snapshot.

This change moves stored-value ownership into `CodeModeService`:

- Each runtime starts from the service's current stored values.
- Runtime completion reports only keys written by that cell.
- The service merges those writes into the current stored-value map on
successful completion.
- Core no longer replaces its stored-value state from a cell result.

As a result, concurrently executing cells can update different stored
keys without clobbering one another.

The move into CodeModeService is motivated by a desire to have this
lifetime tied to a new lifetime object on that side in a subsequent PR.
This commit is contained in:
Channing Conger
2026-05-22 19:09:02 -07:00
committed by GitHub
Unverified
parent 0febb1100f
commit f94157a4b2
7 changed files with 188 additions and 77 deletions
+2 -1
View File
@@ -157,7 +157,8 @@ pub(super) fn store_callback(
}
};
if let Some(state) = scope.get_slot_mut::<RuntimeState>() {
state.stored_values.insert(key, serialized);
state.stored_values.insert(key.clone(), serialized.clone());
state.stored_value_writes.insert(key, serialized);
}
}
+18 -21
View File
@@ -35,7 +35,6 @@ pub struct ExecuteRequest {
pub tool_call_id: String,
pub enabled_tools: Vec<ToolDefinition>,
pub source: String,
pub stored_values: HashMap<String, JsonValue>,
pub yield_time_ms: Option<u64>,
pub max_output_tokens: Option<usize>,
}
@@ -116,7 +115,6 @@ pub enum RuntimeResponse {
Result {
cell_id: String,
content_items: Vec<FunctionCallOutputContentItem>,
stored_values: HashMap<String, JsonValue>,
error_text: Option<String>,
},
}
@@ -182,12 +180,13 @@ pub(crate) enum RuntimeEvent {
text: String,
},
Result {
stored_values: HashMap<String, JsonValue>,
stored_value_writes: HashMap<String, JsonValue>,
error_text: Option<String>,
},
}
pub(crate) fn spawn_runtime(
stored_values: HashMap<String, JsonValue>,
request: ExecuteRequest,
event_tx: mpsc::UnboundedSender<RuntimeEvent>,
pending_mode: PendingRuntimeMode,
@@ -214,7 +213,7 @@ pub(crate) fn spawn_runtime(
tool_call_id: request.tool_call_id,
enabled_tools,
source: request.source,
stored_values: request.stored_values,
stored_values,
};
thread::spawn(move || {
@@ -248,6 +247,7 @@ pub(super) struct RuntimeState {
pending_tool_calls: HashMap<String, v8::Global<v8::PromiseResolver>>,
pending_timeouts: HashMap<u64, timers::ScheduledTimeout>,
stored_values: HashMap<String, JsonValue>,
stored_value_writes: HashMap<String, JsonValue>,
enabled_tools: Vec<EnabledToolMetadata>,
next_tool_call_id: u64,
next_timeout_id: u64,
@@ -259,7 +259,7 @@ pub(super) struct RuntimeState {
pub(super) enum CompletionState {
Pending,
Completed {
stored_values: HashMap<String, JsonValue>,
stored_value_writes: HashMap<String, JsonValue>,
error_text: Option<String>,
},
}
@@ -305,6 +305,7 @@ fn run_runtime(
pending_tool_calls: HashMap::new(),
pending_timeouts: HashMap::new(),
stored_values: config.stored_values,
stored_value_writes: HashMap::new(),
enabled_tools: config.enabled_tools,
next_tool_call_id: 1,
next_timeout_id: 1,
@@ -330,10 +331,10 @@ fn run_runtime(
match module_loader::completion_state(scope, pending_promise.as_ref()) {
CompletionState::Completed {
stored_values,
stored_value_writes,
error_text,
} => {
send_result(&event_tx, stored_values, error_text);
send_result(&event_tx, stored_value_writes, error_text);
return;
}
CompletionState::Pending => {}
@@ -375,10 +376,10 @@ fn run_runtime(
scope.perform_microtask_checkpoint();
match module_loader::completion_state(scope, pending_promise.as_ref()) {
CompletionState::Completed {
stored_values,
stored_value_writes,
error_text,
} => {
send_result(&event_tx, stored_values, error_text);
send_result(&event_tx, stored_value_writes, error_text);
return;
}
CompletionState::Pending => {}
@@ -422,21 +423,21 @@ fn capture_scope_send_error(
event_tx: &mpsc::UnboundedSender<RuntimeEvent>,
error_text: Option<String>,
) {
let stored_values = scope
let stored_value_writes = scope
.get_slot::<RuntimeState>()
.map(|state| state.stored_values.clone())
.map(|state| state.stored_value_writes.clone())
.unwrap_or_default();
send_result(event_tx, stored_values, error_text);
send_result(event_tx, stored_value_writes, error_text);
}
fn send_result(
event_tx: &mpsc::UnboundedSender<RuntimeEvent>,
stored_values: HashMap<String, JsonValue>,
stored_value_writes: HashMap<String, JsonValue>,
error_text: Option<String>,
) {
let _ = event_tx.send(RuntimeEvent::Result {
stored_values,
stored_value_writes,
error_text,
});
}
@@ -463,7 +464,6 @@ mod tests {
tool_call_id: "call_1".to_string(),
enabled_tools: Vec::new(),
source: source.to_string(),
stored_values: HashMap::new(),
yield_time_ms: Some(1),
max_output_tokens: None,
}
@@ -473,6 +473,7 @@ mod tests {
async fn terminate_execution_stops_cpu_bound_module() {
let (event_tx, mut event_rx) = mpsc::unbounded_channel();
let (_runtime_tx, _runtime_control_tx, runtime_terminate_handle) = spawn_runtime(
HashMap::new(),
execute_request("while (true) {}"),
event_tx,
PendingRuntimeMode::Continue,
@@ -491,14 +492,9 @@ mod tests {
.await
.unwrap()
.unwrap();
let RuntimeEvent::Result {
stored_values,
error_text,
} = result_event
else {
let RuntimeEvent::Result { error_text, .. } = result_event else {
panic!("expected runtime result after termination");
};
assert_eq!(stored_values, HashMap::new());
assert!(error_text.is_some());
assert!(
@@ -513,6 +509,7 @@ mod tests {
async fn pending_mode_freezes_runtime_commands_until_resume() {
let (event_tx, mut event_rx) = mpsc::unbounded_channel();
let (runtime_tx, runtime_control_tx, _runtime_terminate_handle) = spawn_runtime(
HashMap::new(),
execute_request(
r#"
await new Promise((resolve) => setTimeout(resolve, 60_000));
@@ -104,14 +104,14 @@ pub(super) fn completion_state(
scope: &mut v8::PinScope<'_, '_>,
pending_promise: Option<&v8::Global<v8::Promise>>,
) -> CompletionState {
let stored_values = scope
let stored_value_writes = scope
.get_slot::<RuntimeState>()
.map(|state| state.stored_values.clone())
.map(|state| state.stored_value_writes.clone())
.unwrap_or_default();
let Some(pending_promise) = pending_promise else {
return CompletionState::Completed {
stored_values,
stored_value_writes,
error_text: None,
};
};
@@ -120,7 +120,7 @@ pub(super) fn completion_state(
match promise.state() {
v8::PromiseState::Pending => CompletionState::Pending,
v8::PromiseState::Fulfilled => CompletionState::Completed {
stored_values,
stored_value_writes,
error_text: None,
},
v8::PromiseState::Rejected => {
@@ -131,7 +131,7 @@ pub(super) fn completion_state(
Some(value_to_error_text(scope, result))
};
CompletionState::Completed {
stored_values,
stored_value_writes,
error_text,
}
}
+10 -26
View File
@@ -73,14 +73,10 @@ impl CodeModeService {
}
}
pub async fn stored_values(&self) -> HashMap<String, JsonValue> {
async fn stored_values(&self) -> HashMap<String, JsonValue> {
self.inner.stored_values.lock().await.clone()
}
pub async fn replace_stored_values(&self, values: HashMap<String, JsonValue>) {
*self.inner.stored_values.lock().await = values;
}
/// Reserves the runtime cell id for a future `execute` request.
///
/// The runtime can issue nested tool calls before the first `execute`
@@ -138,6 +134,7 @@ impl CodeModeService {
let cell_id = request.cell_id.clone();
let (event_tx, event_rx) = mpsc::unbounded_channel();
let (control_tx, control_rx) = mpsc::unbounded_channel();
let stored_values = self.stored_values().await;
let (runtime_tx, runtime_control_tx, runtime_terminate_handle) = {
let mut sessions = self.inner.sessions.lock().await;
if sessions.contains_key(&cell_id) {
@@ -145,7 +142,7 @@ impl CodeModeService {
}
let (runtime_tx, runtime_control_tx, runtime_terminate_handle) =
spawn_runtime(request, event_tx, pending_mode)?;
spawn_runtime(stored_values, request, event_tx, pending_mode)?;
// Keep the session registry locked through insertion so a
// caller-owned cell id cannot race with another execute and replace
@@ -349,7 +346,6 @@ enum SessionResponseSender {
struct PendingResult {
content_items: Vec<FunctionCallOutputContentItem>,
stored_values: HashMap<String, JsonValue>,
error_text: Option<String>,
}
@@ -366,7 +362,6 @@ fn missing_cell_response(cell_id: String) -> RuntimeResponse {
error_text: Some(format!("exec cell {cell_id} not found")),
cell_id,
content_items: Vec::new(),
stored_values: HashMap::new(),
}
}
@@ -374,7 +369,6 @@ fn pending_result_response(cell_id: &str, result: PendingResult) -> RuntimeRespo
RuntimeResponse::Result {
cell_id: cell_id.to_string(),
content_items: result.content_items,
stored_values: result.stored_values,
error_text: result.error_text,
}
}
@@ -476,7 +470,6 @@ async fn run_session_control(
if pending_result.is_none() {
let result = PendingResult {
content_items: std::mem::take(&mut content_items),
stored_values: HashMap::new(),
error_text: Some("exec runtime ended unexpectedly".to_string()),
};
if send_or_buffer_result(
@@ -551,7 +544,7 @@ async fn run_session_control(
.await;
}
RuntimeEvent::Result {
stored_values,
stored_value_writes,
error_text,
} => {
yield_timer = None;
@@ -565,9 +558,13 @@ async fn run_session_control(
}
break;
}
inner
.stored_values
.lock()
.await
.extend(stored_value_writes);
let result = PendingResult {
content_items: std::mem::take(&mut content_items),
stored_values,
error_text,
};
if send_or_buffer_result(
@@ -715,7 +712,6 @@ mod tests {
tool_call_id: "call_1".to_string(),
enabled_tools: Vec::new(),
source: source.to_string(),
stored_values: HashMap::new(),
yield_time_ms: Some(1),
max_output_tokens: None,
}
@@ -752,7 +748,6 @@ mod tests {
content_items: vec![FunctionCallOutputContentItem::InputText {
text: "before".to_string(),
}],
stored_values: HashMap::new(),
error_text: None,
}
);
@@ -778,7 +773,6 @@ mod tests {
content_items: vec![FunctionCallOutputContentItem::InputText {
text: "done".to_string(),
}],
stored_values: HashMap::new(),
error_text: None,
})
);
@@ -1108,7 +1102,6 @@ text("done");
content_items: vec![FunctionCallOutputContentItem::InputText {
text: "done".to_string(),
}],
stored_values: HashMap::new(),
error_text: None,
}
))
@@ -1135,7 +1128,6 @@ text("done");
content_items: vec![FunctionCallOutputContentItem::InputText {
text: "false".to_string(),
}],
stored_values: HashMap::new(),
error_text: None,
}
);
@@ -1175,7 +1167,6 @@ text(value);
content_items: vec![FunctionCallOutputContentItem::InputText {
text: "jeudi 2 janvier \u{e0} 03:04:05".to_string(),
}],
stored_values: HashMap::new(),
error_text: None,
}
);
@@ -1214,7 +1205,6 @@ text(formatter.format(new Date("2025-01-02T03:04:05Z")));
content_items: vec![FunctionCallOutputContentItem::InputText {
text: "jeudi 2 janvier \u{e0} 03:04:05".to_string(),
}],
stored_values: HashMap::new(),
error_text: None,
}
);
@@ -1257,7 +1247,6 @@ text(JSON.stringify(returnsUndefined));
text: "[true,true,true]".to_string(),
},
],
stored_values: HashMap::new(),
error_text: None,
}
);
@@ -1292,7 +1281,6 @@ image({
image_url: "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR4nGP4z8DwHwAFAAH/iZk9HQAAAABJRU5ErkJggg==".to_string(),
detail: Some(crate::ImageDetail::Original),
}],
stored_values: HashMap::new(),
error_text: None,
}
);
@@ -1328,7 +1316,6 @@ image(
image_url: "https://example.com/image.jpg".to_string(),
detail: Some(crate::ImageDetail::Original),
}],
stored_values: HashMap::new(),
error_text: None,
}
);
@@ -1366,7 +1353,6 @@ image(
image_url: "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR4nGP4z8DwHwAFAAH/iZk9HQAAAABJRU5ErkJggg==".to_string(),
detail: Some(crate::ImageDetail::High),
}],
stored_values: HashMap::new(),
error_text: None,
}
);
@@ -1396,7 +1382,6 @@ image({
RuntimeResponse::Result {
cell_id: "1".to_string(),
content_items: Vec::new(),
stored_values: HashMap::new(),
error_text: Some("image detail must be one of: high, original".to_string()),
}
);
@@ -1433,7 +1418,6 @@ image({
RuntimeResponse::Result {
cell_id: "1".to_string(),
content_items: Vec::new(),
stored_values: HashMap::new(),
error_text: Some(
"image expects a non-empty image URL string, an object with image_url and optional detail, or a raw MCP image block".to_string(),
),
@@ -1459,7 +1443,6 @@ image({
WaitOutcome::MissingCell(RuntimeResponse::Result {
cell_id: "missing".to_string(),
content_items: Vec::new(),
stored_values: HashMap::new(),
error_text: Some("exec cell missing not found".to_string()),
})
);
@@ -1473,6 +1456,7 @@ image({
let (initial_response_tx, initial_response_rx) = oneshot::channel();
let (runtime_event_tx, _runtime_event_rx) = mpsc::unbounded_channel();
let (runtime_tx, runtime_control_tx, runtime_terminate_handle) = spawn_runtime(
HashMap::new(),
ExecuteRequest {
source: "await new Promise(() => {})".to_string(),
yield_time_ms: None,