mirror of
https://github.com/pchuan98/codex.git
synced 2026-07-01 00:31:56 +08:00
[2/3] core: track starting environments in snapshots (#28683)
## Why Remote environments may still be resolving when Codex creates a session or turn. Waiting for the existing all-or-nothing environment snapshot can hold startup until the selected environment is usable. Behind the default-off `deferred_executor` feature, let callers take a useful snapshot immediately: completed environments remain available normally, while unfinished environments are reported without blocking startup. With the feature disabled, snapshots preserve the existing blocking behavior. Depends on #28674. ## What changed - Store one ordered list of selected environments in `ThreadEnvironments`. Each selection owns one shared resolution that produces its complete `TurnEnvironment`. - Start new resolutions in the background with `remote_handle()`, allowing snapshots and the future wait tool to share the same result while cancellation follows the retained handles. - Make `snapshot()` a read-only operation: nonblocking snapshots collect completed resolutions and retain handles for unfinished ones, while blocking snapshots await every resolution. - Replace completed failed resolutions from the current manager entry and log when failed environments are omitted. - Return attached and starting environments as a point-in-time view, and count starting environments when deciding whether a snapshot is local-only. - Keep existing consumers attached-only. `to_selections()` derives from attached environments, so child threads do not inherit an environment that is still starting. ## Test plan - `just test -p codex-core environment_selection` - `just test -p codex-core deferred_executor_reaches_model_before_remote_environment_is_ready` ## Landing note Keep `deferred_executor` disabled for slow-starting executors until configurable `environment/add` connection timeouts and caller support land. When enabled, an environment that attaches after session startup may remain absent from environment-derived model context, tools, instructions, skills, and related state until follow-up refresh work lands.
This commit is contained in:
committed by
GitHub
Unverified
parent
41988e6a24
commit
45a133bae0
@@ -479,6 +479,9 @@
|
||||
"default_mode_request_user_input": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"deferred_executor": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"elevated_windows_sandbox": {
|
||||
"type": "boolean"
|
||||
},
|
||||
@@ -4712,6 +4715,9 @@
|
||||
"default_mode_request_user_input": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"deferred_executor": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"elevated_windows_sandbox": {
|
||||
"type": "boolean"
|
||||
},
|
||||
|
||||
@@ -277,6 +277,7 @@ fn resolved_local_environments<const N: usize>(
|
||||
)
|
||||
})
|
||||
.collect(),
|
||||
starting: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
use std::collections::HashSet;
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
|
||||
use arc_swap::ArcSwap;
|
||||
use codex_exec_server::Environment;
|
||||
use codex_exec_server::EnvironmentManager;
|
||||
use codex_exec_server::ExecServerError;
|
||||
use codex_exec_server::ExecutorFileSystem;
|
||||
use codex_protocol::error::CodexErr;
|
||||
use codex_protocol::error::Result as CodexResult;
|
||||
use codex_protocol::protocol::TurnEnvironmentSelection;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use codex_utils_path_uri::PathUri;
|
||||
@@ -31,13 +32,37 @@ pub(crate) fn default_thread_environment_selections(
|
||||
.collect()
|
||||
}
|
||||
|
||||
type SnapshotTask = Shared<BoxFuture<'static, TurnEnvironmentSnapshot>>;
|
||||
type TurnEnvironmentResult = Result<TurnEnvironment, Arc<ExecServerError>>;
|
||||
type TurnEnvironmentResolution = Shared<BoxFuture<'static, TurnEnvironmentResult>>;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct SelectedTurnEnvironment {
|
||||
selection: TurnEnvironmentSelection,
|
||||
resolution: TurnEnvironmentResolution,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct StartingTurnEnvironment {
|
||||
pub(crate) selection: TurnEnvironmentSelection,
|
||||
resolution: TurnEnvironmentResolution,
|
||||
}
|
||||
|
||||
impl fmt::Debug for StartingTurnEnvironment {
|
||||
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
formatter
|
||||
.debug_struct("StartingTurnEnvironment")
|
||||
.field("selection", &self.selection)
|
||||
.field("resolved", &self.resolution.peek().is_some())
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct ThreadEnvironments {
|
||||
environment_manager: Arc<EnvironmentManager>,
|
||||
local_shell: Shell,
|
||||
shell_snapshot: ShellSnapshot,
|
||||
snapshot_task: ArcSwap<SnapshotTask>,
|
||||
non_blocking_snapshots: bool,
|
||||
environments: ArcSwap<Vec<SelectedTurnEnvironment>>,
|
||||
}
|
||||
|
||||
impl ThreadEnvironments {
|
||||
@@ -46,132 +71,142 @@ impl ThreadEnvironments {
|
||||
local_shell: Shell,
|
||||
shell_snapshot: ShellSnapshot,
|
||||
current: TurnEnvironmentSnapshot,
|
||||
non_blocking_snapshots: bool,
|
||||
) -> Self {
|
||||
// Reuse only attached environments from the supplied snapshot; drop starting entries.
|
||||
let environments = current
|
||||
.turn_environments
|
||||
.into_iter()
|
||||
.map(|environment| {
|
||||
let selection = environment.selection();
|
||||
let resolution: TurnEnvironmentResolution =
|
||||
futures::future::ready(Ok(environment)).boxed().shared();
|
||||
SelectedTurnEnvironment {
|
||||
selection,
|
||||
resolution,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
Self {
|
||||
environment_manager,
|
||||
local_shell,
|
||||
shell_snapshot,
|
||||
snapshot_task: ArcSwap::from_pointee(futures::future::ready(current).boxed().shared()),
|
||||
non_blocking_snapshots,
|
||||
environments: ArcSwap::from_pointee(environments),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn update_selections(&self, environments: &[TurnEnvironmentSelection]) {
|
||||
let previous = self
|
||||
.snapshot_task
|
||||
.load()
|
||||
.peek()
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
let environment_manager = Arc::clone(&self.environment_manager);
|
||||
let local_shell = self.local_shell.clone();
|
||||
let shell_snapshot = self.shell_snapshot.clone();
|
||||
let environments = environments.to_vec();
|
||||
let (snapshot_task, snapshot) = async move {
|
||||
Self::resolve_snapshot(
|
||||
environment_manager,
|
||||
local_shell,
|
||||
shell_snapshot,
|
||||
previous,
|
||||
environments,
|
||||
)
|
||||
.await
|
||||
}
|
||||
.remote_handle();
|
||||
self.snapshot_task
|
||||
.store(Arc::new(snapshot.boxed().shared()));
|
||||
drop(tokio::spawn(snapshot_task));
|
||||
}
|
||||
|
||||
async fn resolve_snapshot(
|
||||
environment_manager: Arc<EnvironmentManager>,
|
||||
local_shell: Shell,
|
||||
shell_snapshot: ShellSnapshot,
|
||||
current: TurnEnvironmentSnapshot,
|
||||
environments: Vec<TurnEnvironmentSelection>,
|
||||
) -> TurnEnvironmentSnapshot {
|
||||
let previous = self.environments.load();
|
||||
let mut seen_environment_ids = HashSet::with_capacity(environments.len());
|
||||
let mut turn_environments = Vec::with_capacity(environments.len());
|
||||
for selected_environment in &environments {
|
||||
let mut next = Vec::with_capacity(environments.len());
|
||||
for selected_environment in environments {
|
||||
if !seen_environment_ids.insert(selected_environment.environment_id.as_str()) {
|
||||
continue;
|
||||
}
|
||||
let turn_environment = match current.turn_environments.iter().find(|environment| {
|
||||
environment.environment_id == selected_environment.environment_id
|
||||
&& environment.cwd() == &selected_environment.cwd
|
||||
}) {
|
||||
Some(environment) => environment.clone(),
|
||||
None => match Self::resolve_selection(
|
||||
&environment_manager,
|
||||
&local_shell,
|
||||
&shell_snapshot,
|
||||
selected_environment,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(environment) => environment,
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
"skipping unresolved turn environment `{}`: {err}",
|
||||
selected_environment.environment_id
|
||||
);
|
||||
continue;
|
||||
}
|
||||
},
|
||||
if let Some(environment) = previous
|
||||
.iter()
|
||||
.find(|environment| environment.selection == *selected_environment)
|
||||
&& !matches!(environment.resolution.clone().now_or_never(), Some(Err(_)))
|
||||
{
|
||||
next.push(environment.clone());
|
||||
continue;
|
||||
}
|
||||
|
||||
let environment_id = &selected_environment.environment_id;
|
||||
let Some(environment) = self.environment_manager.get_environment(environment_id) else {
|
||||
tracing::warn!("skipping unknown turn environment `{environment_id}`");
|
||||
continue;
|
||||
};
|
||||
turn_environments.push(turn_environment);
|
||||
let (resolution_task, resolution) = Self::resolve_environment(
|
||||
selected_environment.clone(),
|
||||
environment,
|
||||
self.local_shell.clone(),
|
||||
self.shell_snapshot.clone(),
|
||||
)
|
||||
.remote_handle();
|
||||
drop(tokio::spawn(resolution_task));
|
||||
let resolution = resolution.boxed().shared();
|
||||
next.push(SelectedTurnEnvironment {
|
||||
selection: selected_environment.clone(),
|
||||
resolution,
|
||||
});
|
||||
}
|
||||
TurnEnvironmentSnapshot { turn_environments }
|
||||
self.environments.store(Arc::new(next));
|
||||
}
|
||||
|
||||
async fn resolve_selection(
|
||||
environment_manager: &EnvironmentManager,
|
||||
local_shell: &Shell,
|
||||
shell_snapshot: &ShellSnapshot,
|
||||
selected_environment: &TurnEnvironmentSelection,
|
||||
) -> CodexResult<TurnEnvironment> {
|
||||
let environment_id = selected_environment.environment_id.clone();
|
||||
let environment = environment_manager
|
||||
.get_environment(&environment_id)
|
||||
.ok_or_else(|| {
|
||||
CodexErr::InvalidRequest(format!("unknown turn environment id `{environment_id}`"))
|
||||
})?;
|
||||
let shell = if environment.is_remote() {
|
||||
match environment.info().await {
|
||||
Ok(info) => match Shell::from_environment_shell_info(info.shell) {
|
||||
Ok(shell) => Some(shell),
|
||||
fn resolve_environment(
|
||||
selection: TurnEnvironmentSelection,
|
||||
environment: Arc<Environment>,
|
||||
local_shell: Shell,
|
||||
shell_snapshot: ShellSnapshot,
|
||||
) -> BoxFuture<'static, TurnEnvironmentResult> {
|
||||
async move {
|
||||
let environment_id = &selection.environment_id;
|
||||
if let Err(err) = environment.wait_until_ready().await {
|
||||
tracing::warn!("turn environment `{environment_id}` failed to start: {err}");
|
||||
return Err(Arc::new(err));
|
||||
}
|
||||
let shell = if environment.is_remote() {
|
||||
match environment.info().await {
|
||||
Ok(info) => match Shell::from_environment_shell_info(info.shell) {
|
||||
Ok(shell) => Some(shell),
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
"failed to resolve shell for environment `{environment_id}`: {err}"
|
||||
);
|
||||
None
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
"failed to resolve shell for environment `{environment_id}`: {err}"
|
||||
"failed to get info for environment `{environment_id}`: {err}"
|
||||
);
|
||||
None
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
tracing::warn!("failed to get info for environment `{environment_id}`: {err}");
|
||||
None
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Some(local_shell.clone())
|
||||
};
|
||||
let mut turn_environment = TurnEnvironment::new(
|
||||
environment_id,
|
||||
environment,
|
||||
selected_environment.cwd.clone(),
|
||||
shell,
|
||||
);
|
||||
let task = shell_snapshot
|
||||
.clone()
|
||||
.build(turn_environment.clone())
|
||||
.boxed()
|
||||
.shared();
|
||||
drop(tokio::spawn(task.clone()));
|
||||
turn_environment.shell_snapshot = task;
|
||||
Ok(turn_environment)
|
||||
} else {
|
||||
Some(local_shell)
|
||||
};
|
||||
let mut turn_environment =
|
||||
TurnEnvironment::new(selection.environment_id, environment, selection.cwd, shell);
|
||||
let task = shell_snapshot
|
||||
.build(turn_environment.clone())
|
||||
.boxed()
|
||||
.shared();
|
||||
drop(tokio::spawn(task.clone()));
|
||||
turn_environment.shell_snapshot = task;
|
||||
Ok(turn_environment)
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
|
||||
pub(crate) async fn snapshot(&self) -> TurnEnvironmentSnapshot {
|
||||
self.snapshot_task.load_full().as_ref().clone().await
|
||||
let current = self.environments.load_full();
|
||||
let mut turn_environments = Vec::with_capacity(current.len());
|
||||
let mut starting = Vec::new();
|
||||
for environment in current.iter() {
|
||||
let resolved = if self.non_blocking_snapshots {
|
||||
environment.resolution.clone().now_or_never()
|
||||
} else {
|
||||
Some(environment.resolution.clone().await)
|
||||
};
|
||||
match resolved {
|
||||
Some(Ok(turn_environment)) => turn_environments.push(turn_environment),
|
||||
Some(Err(err)) => tracing::debug!(
|
||||
environment_id = %environment.selection.environment_id,
|
||||
"skipping failed turn environment: {err}"
|
||||
),
|
||||
None => starting.push(StartingTurnEnvironment {
|
||||
selection: environment.selection.clone(),
|
||||
resolution: environment.resolution.clone(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
TurnEnvironmentSnapshot {
|
||||
turn_environments,
|
||||
starting,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn environment_manager(&self) -> Arc<EnvironmentManager> {
|
||||
@@ -182,6 +217,7 @@ impl ThreadEnvironments {
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub(crate) struct TurnEnvironmentSnapshot {
|
||||
pub(crate) turn_environments: Vec<TurnEnvironment>,
|
||||
pub(crate) starting: Vec<StartingTurnEnvironment>,
|
||||
}
|
||||
|
||||
impl TurnEnvironmentSnapshot {
|
||||
@@ -214,6 +250,9 @@ impl TurnEnvironmentSnapshot {
|
||||
}
|
||||
|
||||
pub(crate) fn single_local_environment(&self) -> Option<&TurnEnvironment> {
|
||||
if !self.starting.is_empty() {
|
||||
return None;
|
||||
}
|
||||
let [environment] = self.turn_environments.as_slice() else {
|
||||
return None;
|
||||
};
|
||||
@@ -230,6 +269,8 @@ impl TurnEnvironmentSnapshot {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_exec_server::Environment;
|
||||
use codex_exec_server::ExecServerRuntimePaths;
|
||||
use codex_exec_server::LOCAL_ENVIRONMENT_ID;
|
||||
@@ -237,7 +278,16 @@ mod tests {
|
||||
use codex_protocol::protocol::TurnEnvironmentSelection;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use codex_utils_path_uri::PathUri;
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::time::timeout;
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
use tokio_tungstenite::accept_async;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -250,6 +300,7 @@ mod tests {
|
||||
crate::shell::default_user_shell(),
|
||||
ShellSnapshot::disabled(),
|
||||
TurnEnvironmentSnapshot::default(),
|
||||
/*non_blocking_snapshots*/ false,
|
||||
));
|
||||
turn_environments.update_selections(selections);
|
||||
turn_environments.snapshot().await;
|
||||
@@ -264,6 +315,61 @@ mod tests {
|
||||
.expect("runtime paths")
|
||||
}
|
||||
|
||||
async fn read_websocket_json(websocket: &mut WebSocketStream<TcpStream>) -> Value {
|
||||
loop {
|
||||
match timeout(std::time::Duration::from_secs(5), websocket.next())
|
||||
.await
|
||||
.expect("websocket read should not time out")
|
||||
.expect("websocket should stay open")
|
||||
.expect("websocket frame should read")
|
||||
{
|
||||
Message::Text(text) => {
|
||||
return serde_json::from_str(text.as_ref()).expect("valid JSON-RPC message");
|
||||
}
|
||||
Message::Binary(bytes) => {
|
||||
return serde_json::from_slice(bytes.as_ref()).expect("valid JSON-RPC message");
|
||||
}
|
||||
Message::Ping(_) | Message::Pong(_) => {}
|
||||
other => panic!("expected JSON-RPC message, got {other:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn serve_environment_info(listener: TcpListener) {
|
||||
let (stream, _) = listener.accept().await.expect("connection");
|
||||
let mut websocket = accept_async(stream).await.expect("websocket handshake");
|
||||
|
||||
let initialize = read_websocket_json(&mut websocket).await;
|
||||
assert_eq!(initialize["method"], "initialize");
|
||||
websocket
|
||||
.send(Message::Text(
|
||||
serde_json::json!({
|
||||
"id": initialize["id"],
|
||||
"result": { "sessionId": "test-session" }
|
||||
})
|
||||
.to_string()
|
||||
.into(),
|
||||
))
|
||||
.await
|
||||
.expect("initialize response");
|
||||
let initialized = read_websocket_json(&mut websocket).await;
|
||||
assert_eq!(initialized["method"], "initialized");
|
||||
|
||||
let info = read_websocket_json(&mut websocket).await;
|
||||
assert_eq!(info["method"], "environment/info");
|
||||
websocket
|
||||
.send(Message::Text(
|
||||
serde_json::json!({
|
||||
"id": info["id"],
|
||||
"result": { "shell": { "name": "zsh", "path": "/bin/zsh" } }
|
||||
})
|
||||
.to_string()
|
||||
.into(),
|
||||
))
|
||||
.await
|
||||
.expect("environment info response");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn default_thread_environment_selections_use_manager_default_id() {
|
||||
let cwd = AbsolutePathBuf::current_dir().expect("cwd");
|
||||
@@ -340,6 +446,7 @@ url = "ws://127.0.0.1:8765"
|
||||
local_shell.clone(),
|
||||
ShellSnapshot::disabled(),
|
||||
TurnEnvironmentSnapshot::default(),
|
||||
/*non_blocking_snapshots*/ false,
|
||||
);
|
||||
turn_environments.update_selections(&[TurnEnvironmentSelection {
|
||||
environment_id: LOCAL_ENVIRONMENT_ID.to_string(),
|
||||
@@ -448,8 +555,53 @@ url = "ws://127.0.0.1:8765"
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn latest_environment_update_wins_while_previous_resolution_is_pending() {
|
||||
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
|
||||
async fn blocking_snapshot_waits_for_starting_environment() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0")
|
||||
.await
|
||||
.expect("bind websocket listener");
|
||||
let manager = Arc::new(
|
||||
EnvironmentManager::create_for_tests(
|
||||
Some(format!(
|
||||
"ws://{}",
|
||||
listener.local_addr().expect("listener address")
|
||||
)),
|
||||
Some(test_runtime_paths()),
|
||||
)
|
||||
.await,
|
||||
);
|
||||
let selection = TurnEnvironmentSelection {
|
||||
environment_id: REMOTE_ENVIRONMENT_ID.to_string(),
|
||||
cwd: PathUri::from_abs_path(&AbsolutePathBuf::current_dir().expect("cwd")),
|
||||
};
|
||||
let environments = Arc::new(ThreadEnvironments::new(
|
||||
manager,
|
||||
crate::shell::default_user_shell(),
|
||||
ShellSnapshot::disabled(),
|
||||
TurnEnvironmentSnapshot::default(),
|
||||
/*non_blocking_snapshots*/ false,
|
||||
));
|
||||
environments.update_selections(std::slice::from_ref(&selection));
|
||||
let snapshot_task = tokio::spawn({
|
||||
let environments = Arc::clone(&environments);
|
||||
async move { environments.snapshot().await }
|
||||
});
|
||||
tokio::task::yield_now().await;
|
||||
assert!(!snapshot_task.is_finished());
|
||||
|
||||
let server = tokio::spawn(serve_environment_info(listener));
|
||||
let snapshot = timeout(Duration::from_secs(5), snapshot_task)
|
||||
.await
|
||||
.expect("snapshot should finish after the environment starts")
|
||||
.expect("snapshot task");
|
||||
|
||||
assert!(snapshot.starting.is_empty());
|
||||
assert_eq!(snapshot.to_selections(), vec![selection]);
|
||||
server.await.expect("server task");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn snapshot_keeps_starting_environment_until_it_can_be_attached() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0")
|
||||
.await
|
||||
.expect("bind websocket listener");
|
||||
let manager = Arc::new(
|
||||
@@ -463,43 +615,115 @@ url = "ws://127.0.0.1:8765"
|
||||
.await,
|
||||
);
|
||||
let cwd = AbsolutePathBuf::current_dir().expect("cwd");
|
||||
let turn_environments = Arc::new(ThreadEnvironments::new(
|
||||
let cwd = PathUri::from_abs_path(&cwd);
|
||||
let remote = TurnEnvironmentSelection {
|
||||
environment_id: REMOTE_ENVIRONMENT_ID.to_string(),
|
||||
cwd: cwd.clone(),
|
||||
};
|
||||
let local = TurnEnvironmentSelection {
|
||||
environment_id: LOCAL_ENVIRONMENT_ID.to_string(),
|
||||
cwd,
|
||||
};
|
||||
let turn_environments = ThreadEnvironments::new(
|
||||
manager,
|
||||
crate::shell::default_user_shell(),
|
||||
ShellSnapshot::disabled(),
|
||||
TurnEnvironmentSnapshot::default(),
|
||||
));
|
||||
turn_environments.update_selections(&[TurnEnvironmentSelection {
|
||||
environment_id: REMOTE_ENVIRONMENT_ID.to_string(),
|
||||
cwd: PathUri::from_abs_path(&cwd),
|
||||
}]);
|
||||
let (_connection, _) =
|
||||
tokio::time::timeout(std::time::Duration::from_secs(5), listener.accept())
|
||||
.await
|
||||
.expect("remote resolution should connect")
|
||||
.expect("accept remote resolution connection");
|
||||
let local = TurnEnvironmentSelection {
|
||||
environment_id: LOCAL_ENVIRONMENT_ID.to_string(),
|
||||
cwd: PathUri::from_abs_path(&cwd),
|
||||
};
|
||||
/*non_blocking_snapshots*/ true,
|
||||
);
|
||||
turn_environments.update_selections(&[remote.clone(), local.clone()]);
|
||||
|
||||
turn_environments.update_selections(std::slice::from_ref(&local));
|
||||
let snapshot = tokio::time::timeout(
|
||||
let starting = turn_environments.snapshot().await;
|
||||
assert!(starting.turn_environments.is_empty());
|
||||
assert_eq!(
|
||||
starting
|
||||
.starting
|
||||
.iter()
|
||||
.map(|environment| environment.selection.clone())
|
||||
.collect::<Vec<_>>(),
|
||||
vec![remote.clone(), local.clone()]
|
||||
);
|
||||
assert!(starting.to_selections().is_empty());
|
||||
assert!(starting.single_local_environment().is_none());
|
||||
|
||||
let server = tokio::spawn(serve_environment_info(listener));
|
||||
timeout(
|
||||
std::time::Duration::from_secs(5),
|
||||
turn_environments.snapshot(),
|
||||
starting.starting[0].resolution.clone(),
|
||||
)
|
||||
.await
|
||||
.expect("latest environment resolution should complete");
|
||||
.expect("environment resolution should finish")
|
||||
.expect("environment resolution should succeed");
|
||||
let attached = turn_environments.snapshot().await;
|
||||
|
||||
assert_eq!(snapshot.to_selections(), vec![local]);
|
||||
assert!(attached.starting.is_empty());
|
||||
assert_eq!(
|
||||
attached
|
||||
.turn_environments
|
||||
.iter()
|
||||
.map(TurnEnvironment::selection)
|
||||
.collect::<Vec<_>>(),
|
||||
vec![remote.clone(), local.clone()]
|
||||
);
|
||||
assert_eq!(attached.to_selections(), vec![remote, local]);
|
||||
server.await.expect("server task");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn matching_environment_id_and_cwd_reuse_resolved_environment() {
|
||||
let cwd = AbsolutePathBuf::current_dir().expect("cwd");
|
||||
async fn failed_resolution_is_replaced_from_the_environment_manager() {
|
||||
let manager = Arc::new(
|
||||
EnvironmentManager::create_for_tests(
|
||||
Some("ws://127.0.0.1:8765".to_string()),
|
||||
Some("http://example.com".to_string()),
|
||||
Some(test_runtime_paths()),
|
||||
)
|
||||
.await,
|
||||
);
|
||||
let selection = TurnEnvironmentSelection {
|
||||
environment_id: REMOTE_ENVIRONMENT_ID.to_string(),
|
||||
cwd: PathUri::from_abs_path(&AbsolutePathBuf::current_dir().expect("cwd")),
|
||||
};
|
||||
let environments = ThreadEnvironments::new(
|
||||
Arc::clone(&manager),
|
||||
crate::shell::default_user_shell(),
|
||||
ShellSnapshot::disabled(),
|
||||
TurnEnvironmentSnapshot::default(),
|
||||
/*non_blocking_snapshots*/ true,
|
||||
);
|
||||
environments.update_selections(std::slice::from_ref(&selection));
|
||||
let failed_resolution = environments.environments.load()[0].resolution.clone();
|
||||
assert!(failed_resolution.clone().await.is_err());
|
||||
|
||||
let listener = TcpListener::bind("127.0.0.1:0")
|
||||
.await
|
||||
.expect("bind replacement listener");
|
||||
manager
|
||||
.upsert_environment(
|
||||
REMOTE_ENVIRONMENT_ID.to_string(),
|
||||
format!("ws://{}", listener.local_addr().expect("listener address")),
|
||||
)
|
||||
.expect("replacement environment");
|
||||
environments.update_selections(std::slice::from_ref(&selection));
|
||||
|
||||
let replacement = environments.snapshot().await;
|
||||
let [replacement] = replacement.starting.as_slice() else {
|
||||
panic!("expected the replacement environment to be starting");
|
||||
};
|
||||
assert_eq!(replacement.selection, selection);
|
||||
assert!(!failed_resolution.ptr_eq(&replacement.resolution));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn matching_environment_id_and_cwd_reuse_resolution() {
|
||||
let cwd = AbsolutePathBuf::current_dir().expect("cwd");
|
||||
let first_listener = TcpListener::bind("127.0.0.1:0")
|
||||
.await
|
||||
.expect("bind first listener");
|
||||
let manager = Arc::new(
|
||||
EnvironmentManager::create_for_tests(
|
||||
Some(format!(
|
||||
"ws://{}",
|
||||
first_listener.local_addr().expect("first listener address")
|
||||
)),
|
||||
Some(test_runtime_paths()),
|
||||
)
|
||||
.await,
|
||||
@@ -508,43 +732,98 @@ url = "ws://127.0.0.1:8765"
|
||||
environment_id: REMOTE_ENVIRONMENT_ID.to_string(),
|
||||
cwd: PathUri::from_abs_path(&cwd),
|
||||
};
|
||||
let initial =
|
||||
resolve_turn_environments(Arc::clone(&manager), std::slice::from_ref(&selection)).await;
|
||||
let environments = ThreadEnvironments::new(
|
||||
Arc::clone(&manager),
|
||||
crate::shell::default_user_shell(),
|
||||
ShellSnapshot::disabled(),
|
||||
TurnEnvironmentSnapshot::default(),
|
||||
/*non_blocking_snapshots*/ true,
|
||||
);
|
||||
environments.update_selections(std::slice::from_ref(&selection));
|
||||
let initial_snapshot = environments.snapshot().await;
|
||||
let second_listener = TcpListener::bind("127.0.0.1:0")
|
||||
.await
|
||||
.expect("bind second listener");
|
||||
manager
|
||||
.upsert_environment(
|
||||
REMOTE_ENVIRONMENT_ID.to_string(),
|
||||
format!(
|
||||
"ws://{}",
|
||||
second_listener
|
||||
.local_addr()
|
||||
.expect("second listener address")
|
||||
),
|
||||
)
|
||||
.expect("replace environment");
|
||||
|
||||
environments.update_selections(std::slice::from_ref(&selection));
|
||||
let reused_snapshot = environments.snapshot().await;
|
||||
environments.update_selections(&[TurnEnvironmentSelection {
|
||||
cwd: PathUri::from_abs_path(&cwd.join("changed")),
|
||||
..selection
|
||||
}]);
|
||||
let changed_snapshot = environments.snapshot().await;
|
||||
|
||||
let initial = initial_snapshot
|
||||
.starting
|
||||
.first()
|
||||
.expect("initial environment");
|
||||
let reused = reused_snapshot
|
||||
.starting
|
||||
.first()
|
||||
.expect("reused environment");
|
||||
let changed = changed_snapshot
|
||||
.starting
|
||||
.first()
|
||||
.expect("changed environment");
|
||||
assert!(initial.resolution.ptr_eq(&reused.resolution));
|
||||
assert!(!reused.resolution.ptr_eq(&changed.resolution));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn inherited_environment_reuses_parent_handle() {
|
||||
let cwd = AbsolutePathBuf::current_dir().expect("cwd");
|
||||
let selection = TurnEnvironmentSelection {
|
||||
environment_id: REMOTE_ENVIRONMENT_ID.to_string(),
|
||||
cwd: PathUri::from_abs_path(&cwd),
|
||||
};
|
||||
let inherited_environment = Arc::new(
|
||||
Environment::create_for_tests(Some("ws://127.0.0.1:8765".to_string()))
|
||||
.expect("inherited environment"),
|
||||
);
|
||||
let inherited = TurnEnvironment::new(
|
||||
selection.environment_id.clone(),
|
||||
Arc::clone(&inherited_environment),
|
||||
selection.cwd.clone(),
|
||||
/*shell*/ None,
|
||||
);
|
||||
let manager = Arc::new(EnvironmentManager::without_environments());
|
||||
manager
|
||||
.upsert_environment(
|
||||
REMOTE_ENVIRONMENT_ID.to_string(),
|
||||
"ws://127.0.0.1:9876".to_string(),
|
||||
)
|
||||
.expect("replace environment");
|
||||
.expect("replacement environment");
|
||||
let environments = ThreadEnvironments::new(
|
||||
manager,
|
||||
crate::shell::default_user_shell(),
|
||||
ShellSnapshot::disabled(),
|
||||
TurnEnvironmentSnapshot {
|
||||
turn_environments: vec![inherited],
|
||||
starting: Vec::new(),
|
||||
},
|
||||
/*non_blocking_snapshots*/ false,
|
||||
);
|
||||
|
||||
let initial_snapshot = initial.snapshot().await;
|
||||
initial.update_selections(std::slice::from_ref(&selection));
|
||||
let reused_snapshot = initial.snapshot().await;
|
||||
initial.update_selections(&[TurnEnvironmentSelection {
|
||||
cwd: PathUri::from_abs_path(&cwd.join("changed")),
|
||||
..selection
|
||||
}]);
|
||||
let changed_snapshot = initial.snapshot().await;
|
||||
environments.update_selections(std::slice::from_ref(&selection));
|
||||
let snapshot = environments.snapshot().await;
|
||||
|
||||
assert!(Arc::ptr_eq(
|
||||
&initial_snapshot
|
||||
&snapshot
|
||||
.primary()
|
||||
.expect("initial environment")
|
||||
.environment,
|
||||
&reused_snapshot
|
||||
.primary()
|
||||
.expect("reused environment")
|
||||
.environment,
|
||||
));
|
||||
assert!(!Arc::ptr_eq(
|
||||
&reused_snapshot
|
||||
.primary()
|
||||
.expect("reused environment")
|
||||
.environment,
|
||||
&changed_snapshot
|
||||
.primary()
|
||||
.expect("changed environment")
|
||||
.expect("inherited environment")
|
||||
.environment,
|
||||
&inherited_environment,
|
||||
));
|
||||
}
|
||||
|
||||
@@ -573,6 +852,7 @@ url = "ws://127.0.0.1:8765"
|
||||
cwd_uri.clone(),
|
||||
/*shell*/ None,
|
||||
)],
|
||||
starting: Vec::new(),
|
||||
};
|
||||
let multiple = TurnEnvironmentSnapshot {
|
||||
turn_environments: vec![
|
||||
@@ -584,6 +864,7 @@ url = "ws://127.0.0.1:8765"
|
||||
/*shell*/ None,
|
||||
),
|
||||
],
|
||||
starting: Vec::new(),
|
||||
};
|
||||
|
||||
assert_eq!(local.single_local_environment_cwd(), Some(cwd));
|
||||
|
||||
@@ -826,6 +826,7 @@ impl Session {
|
||||
default_shell.clone(),
|
||||
shell_snapshot,
|
||||
inherited_environments.unwrap_or_default(),
|
||||
config.features.enabled(Feature::DeferredExecutor),
|
||||
));
|
||||
turn_environments.update_selections(session_configuration.environment_selections());
|
||||
let resolved_environments = turn_environments.snapshot().await;
|
||||
|
||||
@@ -4101,6 +4101,7 @@ async fn resolved_environments_for_configuration(
|
||||
default_user_shell(),
|
||||
ShellSnapshot::disabled(),
|
||||
TurnEnvironmentSnapshot::default(),
|
||||
/*non_blocking_snapshots*/ false,
|
||||
);
|
||||
turn_environments.update_selections(session_configuration.environment_selections());
|
||||
(environment_manager, turn_environments.snapshot().await)
|
||||
@@ -5010,6 +5011,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
|
||||
default_user_shell(),
|
||||
ShellSnapshot::disabled(),
|
||||
resolved_environments,
|
||||
/*non_blocking_snapshots*/ false,
|
||||
));
|
||||
let environment = Arc::clone(
|
||||
&resolved_turn_environments
|
||||
@@ -7065,6 +7067,7 @@ where
|
||||
default_user_shell(),
|
||||
ShellSnapshot::disabled(),
|
||||
resolved_turn_environments.clone(),
|
||||
/*non_blocking_snapshots*/ false,
|
||||
));
|
||||
let environment = Arc::clone(
|
||||
&resolved_turn_environments
|
||||
|
||||
@@ -55,6 +55,7 @@ use serde_json::json;
|
||||
use std::fs;
|
||||
use std::path::PathBuf;
|
||||
use std::process::Command;
|
||||
use std::time::Duration;
|
||||
use std::time::SystemTime;
|
||||
use std::time::UNIX_EPOCH;
|
||||
use tempfile::TempDir;
|
||||
@@ -299,6 +300,39 @@ async fn explicit_remote_shell_runs_in_remote_cwd() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn deferred_executor_reaches_model_before_remote_environment_is_ready() -> Result<()> {
|
||||
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
|
||||
let server = start_mock_server().await;
|
||||
let response_mock = mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_assistant_message("msg-1", "done"),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let mut builder = test_codex()
|
||||
.with_exec_server_url(format!("ws://{}", listener.local_addr()?))
|
||||
.with_config(|config| {
|
||||
assert!(config.features.enable(Feature::DeferredExecutor).is_ok());
|
||||
});
|
||||
|
||||
let test = tokio::time::timeout(Duration::from_secs(5), builder.build(&server))
|
||||
.await
|
||||
.context("thread startup should not wait for the remote environment")??;
|
||||
tokio::time::timeout(
|
||||
Duration::from_secs(5),
|
||||
test.submit_turn("respond before the environment is ready"),
|
||||
)
|
||||
.await
|
||||
.context("turn should reach the model before the remote environment is ready")??;
|
||||
|
||||
response_mock.single_request();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn absolute_path(path: PathBuf) -> AbsolutePathBuf {
|
||||
AbsolutePathBuf::try_from(path).expect("path should be absolute")
|
||||
}
|
||||
|
||||
@@ -124,6 +124,8 @@ pub enum Feature {
|
||||
UseLegacyLandlock,
|
||||
/// Experimental shell snapshotting.
|
||||
ShellSnapshot,
|
||||
/// Allow turns to start while selected executors are still starting.
|
||||
DeferredExecutor,
|
||||
/// Enable runtime metrics snapshots via a manual reader.
|
||||
RuntimeMetrics,
|
||||
/// Enable startup memory extraction and file-backed memory consolidation.
|
||||
@@ -816,6 +818,12 @@ pub const FEATURES: &[FeatureSpec] = &[
|
||||
stage: Stage::Stable,
|
||||
default_enabled: true,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::DeferredExecutor,
|
||||
key: "deferred_executor",
|
||||
stage: Stage::UnderDevelopment,
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::JsRepl,
|
||||
key: "js_repl",
|
||||
|
||||
Reference in New Issue
Block a user