mirror of
https://github.com/pchuan98/codex.git
synced 2026-07-01 00:31:56 +08:00
core: refresh environment context before sampling (#29073)
## Why

Nonblocking environment snapshots allow a turn to reach the model while a remote environment is still starting. The initial context can describe that environment as still loading, but nothing currently refreshes the model-visible environment context when startup finishes during the same turn.

This adds the first request-scoped reconciliation slice on top of #28683. It is gated by `DeferredExecutor` and intentionally updates only model-visible environment context; tools and other environment-derived state will migrate separately.

## What

- Add a minimal `StepContext` containing the environment snapshot captured before each sampling request.
- Render attached environments with their resolved shell and starting environments with `still loading`.
- Track the latest environment state recorded in model history and append a bounded update only when it changes.
- Seed that baseline from full initial context so ready-at-start environments are not duplicated.
- Clear the in-memory baseline when history is rewritten so replacement history can be refreshed safely.

## Testing

- `just test -p codex-core deferred_executor`
- `just test -p codex-core environment_context_baseline_deduplicates_until_history_is_replaced`

The integration coverage verifies that a pending environment reaches the first request, the ready state reaches the next request, later requests do not duplicate it, and ready-at-start environments remain single-injected.

<details>
<summary>Live verification</summary>

- Connected to a real remote executor with startup deliberately delayed and forced three sampling requests in one turn.
- Inspected the raw model inputs: request 1 showed the remote environment as `still loading`, request 2 appended its ready shell and cwd, and request 3 contained no duplicate ready update.
- With the feature disabled, startup waited for the delayed executor and the first request contained only the ready environment.
- With a synchronously ready environment and the feature enabled, the first request contained one environment context with no duplicate.
- Executed `pwd` and read a marker file through the remote process runner; the command exited successfully and returned the remote cwd and marker contents.

</details>
This commit is contained in:
committed by
GitHub
Unverified
parent
91e6da943b
commit
22886f21de
@@ -1,3 +1,4 @@
|
||||
use crate::session::step_context::StepContext;
|
||||
use crate::session::turn_context::TurnContext;
|
||||
use crate::session::turn_context::TurnEnvironment;
|
||||
use crate::shell::Shell;
|
||||
@@ -30,34 +31,50 @@ pub(crate) struct EnvironmentContext {
|
||||
pub(crate) struct EnvironmentContextEnvironment {
|
||||
pub(crate) id: String,
|
||||
pub(crate) cwd: PathUri,
|
||||
pub(crate) shell: String,
|
||||
status: EnvironmentContextEnvironmentStatus,
|
||||
}
|
||||
|
||||
/// Startup state of a single environment as it is described to the model.
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
enum EnvironmentContextEnvironmentStatus {
|
||||
/// The environment has not finished starting; rendered as
/// `<status>starting</status>`.
Starting,
|
||||
/// The environment has a known shell name; rendered as
/// `<shell>...</shell>`.
Ready { shell: String },
|
||||
}
|
||||
|
||||
impl EnvironmentContextEnvironment {
|
||||
fn legacy(cwd: PathUri, shell: String) -> Self {
|
||||
Self {
|
||||
id: String::new(),
|
||||
cwd,
|
||||
shell,
|
||||
}
|
||||
}
|
||||
|
||||
fn from_turn_environments(environments: &[TurnEnvironment], shell: &Shell) -> Vec<Self> {
|
||||
environments
|
||||
.iter()
|
||||
.map(|environment| Self {
|
||||
id: environment.environment_id.clone(),
|
||||
cwd: environment.cwd().clone(),
|
||||
shell: environment
|
||||
.shell
|
||||
.as_ref()
|
||||
.map(|shell| shell.name().to_string())
|
||||
.unwrap_or_else(|| shell.name().to_string()),
|
||||
status: EnvironmentContextEnvironmentStatus::Ready {
|
||||
shell: environment
|
||||
.shell
|
||||
.as_ref()
|
||||
.map(|shell| shell.name().to_string())
|
||||
.unwrap_or_else(|| shell.name().to_string()),
|
||||
},
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
impl EnvironmentContextEnvironmentStatus {
|
||||
/// Returns `true` when both statuses are the same variant.
///
/// The `shell` payload of `Ready` is deliberately ignored, so two ready
/// environments compare equal here even if their shell names differ.
fn equals_except_shell(&self, other: &Self) -> bool {
|
||||
matches!(
|
||||
(self, other),
|
||||
(Self::Starting, Self::Starting) | (Self::Ready { .. }, Self::Ready { .. })
|
||||
)
|
||||
}
|
||||
|
||||
/// Renders this status as one XML-like line prefixed with `indent`.
///
/// `Starting` becomes a `<status>starting</status>` element; `Ready`
/// becomes a `<shell>` element containing the shell name.
fn render(&self, indent: &str) -> String {
|
||||
match self {
|
||||
Self::Starting => format!("{indent}<status>starting</status>"),
|
||||
Self::Ready { shell } => format!("{indent}<shell>{shell}</shell>"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) enum EnvironmentContextEnvironments {
|
||||
None,
|
||||
@@ -81,13 +98,16 @@ impl EnvironmentContextEnvironments {
|
||||
fn equals_except_shell(&self, other: &Self) -> bool {
|
||||
match (self, other) {
|
||||
(Self::None, Self::None) => true,
|
||||
(Self::Single(left), Self::Single(right)) => left.cwd == right.cwd,
|
||||
(Self::Single(left), Self::Single(right)) => {
|
||||
left.cwd == right.cwd && left.status.equals_except_shell(&right.status)
|
||||
}
|
||||
(Self::Multiple(left), Self::Multiple(right)) => {
|
||||
left.len() == right.len()
|
||||
&& left
|
||||
.iter()
|
||||
.zip(right.iter())
|
||||
.all(|(left, right)| left.id == right.id && left.cwd == right.cwd)
|
||||
&& left.iter().zip(right.iter()).all(|(left, right)| {
|
||||
left.id == right.id
|
||||
&& left.cwd == right.cwd
|
||||
&& left.status.equals_except_shell(&right.status)
|
||||
})
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
@@ -365,6 +385,48 @@ impl EnvironmentContext {
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds an environment-only context from a step snapshot.
///
/// Entries from `turn_environments` come first (converted by
/// `from_turn_environments`, using `shell` as the fallback shell), followed
/// by every entry in `starting`, each marked `Starting`. The result comes
/// from `environment_only`, which yields `None` for an empty list.
pub(crate) fn from_step_context(step_context: &StepContext, shell: &Shell) -> Option<Self> {
|
||||
let mut environments = EnvironmentContextEnvironment::from_turn_environments(
|
||||
&step_context.environments.turn_environments,
|
||||
shell,
|
||||
);
|
||||
// Append environments that are still starting after the attached ones.
environments.extend(
|
||||
step_context
|
||||
.environments
|
||||
.starting
|
||||
.iter()
|
||||
.map(|environment| EnvironmentContextEnvironment {
|
||||
id: environment.selection.environment_id.clone(),
|
||||
cwd: environment.selection.cwd.clone(),
|
||||
status: EnvironmentContextEnvironmentStatus::Starting,
|
||||
}),
|
||||
);
|
||||
|
||||
Self::environment_only(environments)
|
||||
}
|
||||
|
||||
/// Builds an environment-only context from the given turn environments.
///
/// Each entry is converted by `from_turn_environments`, with `shell` passed
/// through as the fallback shell. The result comes from `environment_only`,
/// which yields `None` for an empty list.
pub(crate) fn from_attached_environments(
|
||||
environments: &[TurnEnvironment],
|
||||
shell: &Shell,
|
||||
) -> Option<Self> {
|
||||
Self::environment_only(EnvironmentContextEnvironment::from_turn_environments(
|
||||
environments,
|
||||
shell,
|
||||
))
|
||||
}
|
||||
|
||||
/// Wraps `environments` in a context whose date, timezone, network, and
/// subagents arguments are all `None`.
///
/// Returns `None` when `environments` is empty.
fn environment_only(environments: Vec<EnvironmentContextEnvironment>) -> Option<Self> {
|
||||
(!environments.is_empty()).then(|| {
|
||||
Self::new(
|
||||
environments,
|
||||
/*current_date*/ None,
|
||||
/*timezone*/ None,
|
||||
/*network*/ None,
|
||||
/*subagents*/ None,
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
/// Compares two environment contexts, ignoring the shell. Useful when
|
||||
/// comparing turn to turn, since the initial environment_context will
|
||||
/// include the shell, and then it is not configurable from turn to turn.
|
||||
@@ -386,10 +448,11 @@ impl EnvironmentContext {
|
||||
let environments = match &after.environments {
|
||||
EnvironmentContextEnvironments::Single(environment) => {
|
||||
if PathUri::from_abs_path(&before.cwd) != environment.cwd {
|
||||
EnvironmentContextEnvironments::Single(EnvironmentContextEnvironment::legacy(
|
||||
environment.cwd.clone(),
|
||||
environment.shell.clone(),
|
||||
))
|
||||
EnvironmentContextEnvironments::Single(EnvironmentContextEnvironment {
|
||||
id: String::new(),
|
||||
cwd: environment.cwd.clone(),
|
||||
status: environment.status.clone(),
|
||||
})
|
||||
} else {
|
||||
EnvironmentContextEnvironments::None
|
||||
}
|
||||
@@ -442,10 +505,11 @@ impl EnvironmentContext {
|
||||
shell: String,
|
||||
) -> Self {
|
||||
Self::new_with_environments(
|
||||
EnvironmentContextEnvironments::from_vec(vec![EnvironmentContextEnvironment::legacy(
|
||||
PathUri::from_abs_path(&turn_context_item.cwd),
|
||||
shell,
|
||||
)]),
|
||||
EnvironmentContextEnvironments::from_vec(vec![EnvironmentContextEnvironment {
|
||||
id: String::new(),
|
||||
cwd: PathUri::from_abs_path(&turn_context_item.cwd),
|
||||
status: EnvironmentContextEnvironmentStatus::Ready { shell },
|
||||
}]),
|
||||
turn_context_item.current_date.clone(),
|
||||
turn_context_item.timezone.clone(),
|
||||
Self::network_from_turn_context_item(turn_context_item),
|
||||
@@ -538,7 +602,7 @@ impl ContextualUserFragment for EnvironmentContext {
|
||||
EnvironmentContextEnvironments::Single(environment) => {
|
||||
let cwd = environment.cwd.inferred_native_path_string();
|
||||
lines.push(format!(" <cwd>{cwd}</cwd>"));
|
||||
lines.push(format!(" <shell>{}</shell>", environment.shell));
|
||||
lines.push(environment.status.render(" "));
|
||||
}
|
||||
EnvironmentContextEnvironments::Multiple(environments) => {
|
||||
lines.push(" <environments>".to_string());
|
||||
@@ -546,7 +610,7 @@ impl ContextualUserFragment for EnvironmentContext {
|
||||
lines.push(format!(" <environment id=\"{}\">", environment.id));
|
||||
let cwd = environment.cwd.inferred_native_path_string();
|
||||
lines.push(format!(" <cwd>{cwd}</cwd>"));
|
||||
lines.push(format!(" <shell>{}</shell>", environment.shell));
|
||||
lines.push(environment.status.render(" "));
|
||||
lines.push(" </environment>".to_string());
|
||||
}
|
||||
lines.push(" </environments>".to_string());
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use crate::shell::ShellType;
|
||||
|
||||
use super::EnvironmentContextEnvironmentStatus::Ready;
|
||||
use super::*;
|
||||
use codex_protocol::models::PermissionProfile;
|
||||
use codex_protocol::permissions::FileSystemAccessMode;
|
||||
@@ -37,7 +38,9 @@ fn serialize_workspace_write_environment_context() {
|
||||
vec![EnvironmentContextEnvironment {
|
||||
id: "local".to_string(),
|
||||
cwd: PathUri::from_abs_path(&cwd.abs()),
|
||||
shell: fake_shell_name(),
|
||||
status: Ready {
|
||||
shell: fake_shell_name(),
|
||||
},
|
||||
}],
|
||||
Some("2026-02-26".to_string()),
|
||||
Some("America/Los_Angeles".to_string()),
|
||||
@@ -64,7 +67,9 @@ fn serialize_environment_context_with_foreign_windows_cwd() {
|
||||
vec![EnvironmentContextEnvironment {
|
||||
id: "remote".to_string(),
|
||||
cwd: PathUri::parse("file:///C:/windows").expect("Windows cwd URI"),
|
||||
shell: "powershell".to_string(),
|
||||
status: Ready {
|
||||
shell: "powershell".to_string(),
|
||||
},
|
||||
}],
|
||||
/*current_date*/ None,
|
||||
/*timezone*/ None,
|
||||
@@ -91,7 +96,9 @@ fn serialize_environment_context_with_network() {
|
||||
vec![EnvironmentContextEnvironment {
|
||||
id: "local".to_string(),
|
||||
cwd: PathUri::from_abs_path(&test_abs_path("/repo")),
|
||||
shell: fake_shell_name(),
|
||||
status: Ready {
|
||||
shell: fake_shell_name(),
|
||||
},
|
||||
}],
|
||||
Some("2026-02-26".to_string()),
|
||||
Some("America/Los_Angeles".to_string()),
|
||||
@@ -153,7 +160,9 @@ fn serialize_environment_context_with_full_filesystem_profile() {
|
||||
vec![EnvironmentContextEnvironment {
|
||||
id: "local".to_string(),
|
||||
cwd: PathUri::from_abs_path(&test_abs_path("/repo")),
|
||||
shell: fake_shell_name(),
|
||||
status: Ready {
|
||||
shell: fake_shell_name(),
|
||||
},
|
||||
}],
|
||||
/*current_date*/ None,
|
||||
/*timezone*/ None,
|
||||
@@ -259,7 +268,9 @@ fn equals_except_shell_compares_cwd() {
|
||||
vec![EnvironmentContextEnvironment {
|
||||
id: "local".to_string(),
|
||||
cwd: PathUri::from_abs_path(&test_abs_path("/repo")),
|
||||
shell: fake_shell_name(),
|
||||
status: Ready {
|
||||
shell: fake_shell_name(),
|
||||
},
|
||||
}],
|
||||
/*current_date*/ None,
|
||||
/*timezone*/ None,
|
||||
@@ -270,7 +281,9 @@ fn equals_except_shell_compares_cwd() {
|
||||
vec![EnvironmentContextEnvironment {
|
||||
id: "local".to_string(),
|
||||
cwd: PathUri::from_abs_path(&test_abs_path("/repo")),
|
||||
shell: fake_shell_name(),
|
||||
status: Ready {
|
||||
shell: fake_shell_name(),
|
||||
},
|
||||
}],
|
||||
/*current_date*/ None,
|
||||
/*timezone*/ None,
|
||||
@@ -286,7 +299,9 @@ fn equals_except_shell_compares_cwd_differences() {
|
||||
vec![EnvironmentContextEnvironment {
|
||||
id: "local".to_string(),
|
||||
cwd: PathUri::from_abs_path(&test_abs_path("/repo1")),
|
||||
shell: fake_shell_name(),
|
||||
status: Ready {
|
||||
shell: fake_shell_name(),
|
||||
},
|
||||
}],
|
||||
/*current_date*/ None,
|
||||
/*timezone*/ None,
|
||||
@@ -297,7 +312,9 @@ fn equals_except_shell_compares_cwd_differences() {
|
||||
vec![EnvironmentContextEnvironment {
|
||||
id: "local".to_string(),
|
||||
cwd: PathUri::from_abs_path(&test_abs_path("/repo2")),
|
||||
shell: fake_shell_name(),
|
||||
status: Ready {
|
||||
shell: fake_shell_name(),
|
||||
},
|
||||
}],
|
||||
/*current_date*/ None,
|
||||
/*timezone*/ None,
|
||||
@@ -314,7 +331,9 @@ fn equals_except_shell_ignores_shell() {
|
||||
vec![EnvironmentContextEnvironment {
|
||||
id: "local".to_string(),
|
||||
cwd: PathUri::from_abs_path(&test_abs_path("/repo")),
|
||||
shell: "bash".to_string(),
|
||||
status: Ready {
|
||||
shell: "bash".to_string(),
|
||||
},
|
||||
}],
|
||||
/*current_date*/ None,
|
||||
/*timezone*/ None,
|
||||
@@ -325,7 +344,9 @@ fn equals_except_shell_ignores_shell() {
|
||||
vec![EnvironmentContextEnvironment {
|
||||
id: "other".to_string(),
|
||||
cwd: PathUri::from_abs_path(&test_abs_path("/repo")),
|
||||
shell: "zsh".to_string(),
|
||||
status: Ready {
|
||||
shell: "zsh".to_string(),
|
||||
},
|
||||
}],
|
||||
/*current_date*/ None,
|
||||
/*timezone*/ None,
|
||||
@@ -342,7 +363,9 @@ fn serialize_environment_context_with_subagents() {
|
||||
vec![EnvironmentContextEnvironment {
|
||||
id: "local".to_string(),
|
||||
cwd: PathUri::from_abs_path(&test_abs_path("/repo")),
|
||||
shell: fake_shell_name(),
|
||||
status: Ready {
|
||||
shell: fake_shell_name(),
|
||||
},
|
||||
}],
|
||||
Some("2026-02-26".to_string()),
|
||||
Some("America/Los_Angeles".to_string()),
|
||||
@@ -376,12 +399,16 @@ fn serialize_environment_context_with_multiple_selected_environments() {
|
||||
EnvironmentContextEnvironment {
|
||||
id: "local".to_string(),
|
||||
cwd: PathUri::from_abs_path(&local_cwd.abs()),
|
||||
shell: "bash".to_string(),
|
||||
status: Ready {
|
||||
shell: "bash".to_string(),
|
||||
},
|
||||
},
|
||||
EnvironmentContextEnvironment {
|
||||
id: "remote".to_string(),
|
||||
cwd: PathUri::from_abs_path(&remote_cwd.abs()),
|
||||
shell: "bash".to_string(),
|
||||
status: Ready {
|
||||
shell: "bash".to_string(),
|
||||
},
|
||||
},
|
||||
],
|
||||
Some("2026-02-26".to_string()),
|
||||
@@ -421,12 +448,16 @@ fn serialize_environment_context_prefers_environment_shell_when_present() {
|
||||
EnvironmentContextEnvironment {
|
||||
id: "local".to_string(),
|
||||
cwd: PathUri::from_abs_path(&local_cwd.abs()),
|
||||
shell: "powershell".to_string(),
|
||||
status: Ready {
|
||||
shell: "powershell".to_string(),
|
||||
},
|
||||
},
|
||||
EnvironmentContextEnvironment {
|
||||
id: "remote".to_string(),
|
||||
cwd: PathUri::from_abs_path(&remote_cwd.abs()),
|
||||
shell: "cmd".to_string(),
|
||||
status: Ready {
|
||||
shell: "cmd".to_string(),
|
||||
},
|
||||
},
|
||||
],
|
||||
/*current_date*/ None,
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use crate::context::EnvironmentContext;
|
||||
use crate::context_manager::normalize;
|
||||
use crate::event_mapping::has_non_contextual_dev_message_content;
|
||||
use crate::event_mapping::is_contextual_dev_message_content;
|
||||
@@ -48,6 +49,8 @@ pub(crate) struct ContextManager {
|
||||
/// also clear this when it trims a mixed initial-context developer bundle
|
||||
/// whose non-diff fragments no longer exist in the surviving history.
|
||||
reference_context_item: Option<TurnContextItem>,
|
||||
/// Environment state most recently appended to model-visible history.
|
||||
environment_context_baseline: Option<EnvironmentContext>,
|
||||
}
|
||||
|
||||
impl ContextManager {
|
||||
@@ -59,6 +62,7 @@ impl ContextManager {
|
||||
&None, &None, /*model_context_window*/ None,
|
||||
),
|
||||
reference_context_item: None,
|
||||
environment_context_baseline: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -78,6 +82,17 @@ impl ContextManager {
|
||||
self.reference_context_item.clone()
|
||||
}
|
||||
|
||||
/// Stores `context` as the environment baseline if it differs from the
/// current one.
///
/// Returns `false` and leaves the baseline untouched when the stored
/// baseline already equals `context`. Otherwise stores a clone of
/// `context` and returns `true`.
pub(crate) fn update_environment_context_baseline(
|
||||
&mut self,
|
||||
context: &EnvironmentContext,
|
||||
) -> bool {
|
||||
if self.environment_context_baseline.as_ref() == Some(context) {
|
||||
return false;
|
||||
}
|
||||
self.environment_context_baseline = Some(context.clone());
|
||||
true
|
||||
}
|
||||
|
||||
pub(crate) fn set_token_usage_full(&mut self, context_window: i64) {
|
||||
match &mut self.token_info {
|
||||
Some(info) => info.fill_to_context_window(context_window),
|
||||
@@ -163,12 +178,14 @@ impl ContextManager {
|
||||
// its corresponding counterpart to keep the invariants intact without
|
||||
// running a full normalization pass.
|
||||
normalize::remove_corresponding_for(&mut self.items, &removed);
|
||||
self.environment_context_baseline = None;
|
||||
}
|
||||
}
|
||||
|
||||
/// Replaces the stored history with `items`.
///
/// Also increments `history_version` (saturating) and clears the
/// environment context baseline.
pub(crate) fn replace(&mut self, items: Vec<ResponseItem>) {
|
||||
self.items = items;
|
||||
self.history_version = self.history_version.saturating_add(1);
|
||||
// With the baseline cleared, the next `update_environment_context_baseline`
// call reports a change even for an identical context.
self.environment_context_baseline = None;
|
||||
}
|
||||
|
||||
/// Replace image content in the last turn if it originated from a tool output.
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use super::*;
|
||||
use crate::context::EnvironmentContext;
|
||||
use base64::Engine;
|
||||
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
|
||||
use codex_protocol::AgentPath;
|
||||
@@ -73,6 +74,20 @@ fn create_history_with_items(items: Vec<ResponseItem>) -> ContextManager {
|
||||
h
|
||||
}
|
||||
|
||||
/// Checks that the baseline reports a change only on the first update with a
/// given context, and reports a change again after `replace` clears it.
#[test]
|
||||
fn environment_context_baseline_deduplicates_until_history_is_replaced() {
|
||||
let context =
|
||||
EnvironmentContext::from_turn_context_item(&reference_context_item(), "bash".to_string());
|
||||
let mut history = ContextManager::new();
|
||||
|
||||
// First update stores the baseline; the identical second update is a no-op.
assert!(history.update_environment_context_baseline(&context));
|
||||
assert!(!history.update_environment_context_baseline(&context));
|
||||
|
||||
history.replace(Vec::new());
|
||||
|
||||
// Replacing history cleared the baseline, so the same context counts as changed.
assert!(history.update_environment_context_baseline(&context));
|
||||
}
|
||||
|
||||
fn user_msg(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
|
||||
@@ -218,6 +218,7 @@ mod rollout_budget;
|
||||
mod rollout_reconstruction;
|
||||
#[allow(clippy::module_inception)]
|
||||
pub(crate) mod session;
|
||||
pub(crate) mod step_context;
|
||||
pub(crate) mod time_reminder;
|
||||
mod token_budget;
|
||||
pub(crate) mod turn;
|
||||
@@ -2772,6 +2773,35 @@ impl Session {
|
||||
self.send_raw_response_items(turn_context, items).await;
|
||||
}
|
||||
|
||||
/// Records the step's environment context as a conversation item when it
/// differs from the baseline held by the history.
///
/// Does nothing when `include_environment_context` is disabled in the turn
/// config, when `from_step_context` yields no context, or when the history
/// baseline already equals the new context.
pub(crate) async fn record_step_environment_context_if_changed(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
step_context: &step_context::StepContext,
|
||||
) {
|
||||
if !turn_context.config.include_environment_context {
|
||||
return;
|
||||
}
|
||||
|
||||
let shell = self.user_shell();
|
||||
let Some(environment_context) =
|
||||
crate::context::EnvironmentContext::from_step_context(step_context, shell.as_ref())
|
||||
else {
|
||||
return;
|
||||
};
|
||||
// The state guard lives only inside this block, so it is dropped before
// the recording call below is awaited.
let changed = {
|
||||
let mut state = self.state.lock().await;
|
||||
state
|
||||
.history
|
||||
.update_environment_context_baseline(&environment_context)
|
||||
};
|
||||
if !changed {
|
||||
return;
|
||||
}
|
||||
|
||||
let item = ContextualUserFragment::into(environment_context);
|
||||
self.record_conversation_items(turn_context, &[item]).await;
|
||||
}
|
||||
|
||||
pub(crate) async fn record_inter_agent_communication(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
@@ -3438,6 +3468,22 @@ impl Session {
|
||||
.await,
|
||||
);
|
||||
}
|
||||
let initial_environment_context = if should_inject_full_context
|
||||
&& !context_items.is_empty()
|
||||
&& turn_context.config.include_environment_context
|
||||
&& turn_context
|
||||
.config
|
||||
.features
|
||||
.enabled(Feature::DeferredExecutor)
|
||||
{
|
||||
let shell = self.user_shell();
|
||||
crate::context::EnvironmentContext::from_attached_environments(
|
||||
&turn_context.environments.turn_environments,
|
||||
shell.as_ref(),
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
if !context_items.is_empty() {
|
||||
self.record_conversation_items(turn_context, &context_items)
|
||||
.await;
|
||||
@@ -3451,6 +3497,11 @@ impl Session {
|
||||
// context items. This keeps later runtime diffing aligned with the current turn state.
|
||||
let mut state = self.state.lock().await;
|
||||
state.set_reference_context_item(Some(turn_context_item));
|
||||
if let Some(environment_context) = initial_environment_context {
|
||||
state
|
||||
.history
|
||||
.update_environment_context_baseline(&environment_context);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn update_token_usage_info(
|
||||
|
||||
@@ -0,0 +1,7 @@
|
||||
use crate::environment_selection::TurnEnvironmentSnapshot;
|
||||
|
||||
/// Request-scoped state that may change between model sampling requests.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct StepContext {
|
||||
/// Environment snapshot for this step; converted into model-visible
/// environment context by `EnvironmentContext::from_step_context`.
pub(crate) environments: TurnEnvironmentSnapshot,
|
||||
}
|
||||
@@ -42,6 +42,7 @@ use crate::responses_retry::handle_retryable_response_stream_error;
|
||||
use crate::session::PreviousTurnSettings;
|
||||
use crate::session::TurnInput;
|
||||
use crate::session::session::Session;
|
||||
use crate::session::step_context::StepContext;
|
||||
use crate::session::turn_context::TurnContext;
|
||||
use crate::stream_events_utils::HandleOutputCtx;
|
||||
use crate::stream_events_utils::TurnItemContributorPolicy;
|
||||
@@ -241,6 +242,21 @@ pub(crate) async fn run_turn(
|
||||
)
|
||||
.await?;
|
||||
|
||||
if turn_context
|
||||
.config
|
||||
.features
|
||||
.enabled(Feature::DeferredExecutor)
|
||||
{
|
||||
let step_context = StepContext {
|
||||
environments: sess.services.turn_environments.snapshot().await,
|
||||
};
|
||||
sess.record_step_environment_context_if_changed(
|
||||
turn_context.as_ref(),
|
||||
&step_context,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Construct the input that we will send to the model.
|
||||
let sampling_request_input: Vec<ResponseItem> = async {
|
||||
sess.clone_history()
|
||||
|
||||
@@ -26,6 +26,8 @@ use codex_protocol::protocol::TurnEnvironmentSelection;
|
||||
use codex_protocol::request_permissions::PermissionGrantScope;
|
||||
use codex_protocol::request_permissions::RequestPermissionProfile;
|
||||
use codex_protocol::request_permissions::RequestPermissionsResponse;
|
||||
use codex_protocol::request_user_input::RequestUserInputAnswer;
|
||||
use codex_protocol::request_user_input::RequestUserInputResponse;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use codex_utils_path_uri::PathUri;
|
||||
@@ -49,9 +51,13 @@ use core_test_support::test_codex::local;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::test_codex::test_env;
|
||||
use core_test_support::wait_for_event;
|
||||
use core_test_support::wait_for_event_match;
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::path::PathBuf;
|
||||
use std::process::Command;
|
||||
@@ -59,6 +65,12 @@ use std::time::Duration;
|
||||
use std::time::SystemTime;
|
||||
use std::time::UNIX_EPOCH;
|
||||
use tempfile::TempDir;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::time::timeout;
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
use tokio_tungstenite::accept_async;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
async fn unified_exec_test(server: &wiremock::MockServer) -> Result<TestCodex> {
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.use_experimental_unified_exec_tool = true;
|
||||
@@ -301,8 +313,7 @@ async fn explicit_remote_shell_runs_in_remote_cwd() -> Result<()> {
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn deferred_executor_reaches_model_before_remote_environment_is_ready() -> Result<()> {
|
||||
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
|
||||
async fn deferred_executor_does_not_duplicate_initial_environment_context() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
let response_mock = mount_sse_once(
|
||||
&server,
|
||||
@@ -313,23 +324,216 @@ async fn deferred_executor_reaches_model_before_remote_environment_is_ready() ->
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
assert!(config.features.enable(Feature::DeferredExecutor).is_ok());
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
test.submit_turn("report the environment").await?;
|
||||
|
||||
let user_context = response_mock.single_request().message_input_texts("user");
|
||||
assert_eq!(
|
||||
user_context
|
||||
.iter()
|
||||
.filter(|text| text.contains("<environment_context>"))
|
||||
.count(),
|
||||
1
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Test helper: reads frames from `websocket` until a text or binary frame
/// arrives, then parses that frame as JSON and returns it.
///
/// Ping and pong frames are skipped. Panics if a read takes longer than
/// 5 seconds, the stream ends, a frame fails to read, the payload is not
/// valid JSON, or any other frame type is received.
async fn read_exec_server_json(websocket: &mut WebSocketStream<TcpStream>) -> Value {
|
||||
loop {
|
||||
match timeout(Duration::from_secs(5), websocket.next())
|
||||
.await
|
||||
.expect("websocket read should not time out")
|
||||
.expect("websocket should stay open")
|
||||
.expect("websocket frame should read")
|
||||
{
|
||||
Message::Text(text) => {
|
||||
return serde_json::from_str(text.as_ref()).expect("valid JSON-RPC message");
|
||||
}
|
||||
Message::Binary(bytes) => {
|
||||
return serde_json::from_slice(bytes.as_ref()).expect("valid JSON-RPC message");
|
||||
}
|
||||
// Keep-alive frames carry no message; keep reading.
Message::Ping(_) | Message::Pong(_) => {}
|
||||
other => panic!("expected JSON-RPC message, got {other:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Test helper: accepts one connection on `listener` and plays the server
/// side of a fixed exchange over a websocket.
///
/// The expected sequence is an `initialize` request (answered with session id
/// `test-session`), an `initialized` message, and an `environment/info`
/// request (answered with a `zsh` shell at `/bin/zsh`). Panics if the
/// connection, handshake, or any send fails, or if a message arrives with an
/// unexpected `method`.
async fn serve_environment_info(listener: TcpListener) {
|
||||
let (stream, _) = listener.accept().await.expect("connection");
|
||||
let mut websocket = accept_async(stream).await.expect("websocket handshake");
|
||||
|
||||
let initialize = read_exec_server_json(&mut websocket).await;
|
||||
assert_eq!(initialize["method"], "initialize");
|
||||
// Echo the request id back so the reply matches the request.
websocket
|
||||
.send(Message::Text(
|
||||
json!({
|
||||
"id": initialize["id"],
|
||||
"result": { "sessionId": "test-session" }
|
||||
})
|
||||
.to_string()
|
||||
.into(),
|
||||
))
|
||||
.await
|
||||
.expect("initialize response");
|
||||
let initialized = read_exec_server_json(&mut websocket).await;
|
||||
assert_eq!(initialized["method"], "initialized");
|
||||
|
||||
let info = read_exec_server_json(&mut websocket).await;
|
||||
assert_eq!(info["method"], "environment/info");
|
||||
websocket
|
||||
.send(Message::Text(
|
||||
json!({
|
||||
"id": info["id"],
|
||||
"result": { "shell": { "name": "zsh", "path": "/bin/zsh" } }
|
||||
})
|
||||
.to_string()
|
||||
.into(),
|
||||
))
|
||||
.await
|
||||
.expect("environment info response");
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn deferred_executor_updates_model_context_after_startup() -> Result<()> {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await?;
|
||||
let server = start_mock_server().await;
|
||||
let user_input_call_id = "wait-for-startup";
|
||||
let response_mock = mount_sse_sequence(
|
||||
&server,
|
||||
vec![
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_function_call(
|
||||
user_input_call_id,
|
||||
"request_user_input",
|
||||
&json!({
|
||||
"questions": [{
|
||||
"id": "continue",
|
||||
"header": "Continue",
|
||||
"question": "Continue after startup?",
|
||||
"options": [{
|
||||
"label": "Yes (Recommended)",
|
||||
"description": "Continue the test."
|
||||
}, {
|
||||
"label": "No",
|
||||
"description": "Stop the test."
|
||||
}]
|
||||
}]
|
||||
})
|
||||
.to_string(),
|
||||
),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_response_created("resp-2"),
|
||||
ev_function_call(
|
||||
"update-plan",
|
||||
"update_plan",
|
||||
&json!({
|
||||
"explanation": "Continue after startup.",
|
||||
"plan": [{"step": "Finish", "status": "completed"}]
|
||||
})
|
||||
.to_string(),
|
||||
),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_response_created("resp-3"),
|
||||
ev_assistant_message("msg-3", "done"),
|
||||
ev_completed("resp-3"),
|
||||
]),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
let mut builder = test_codex()
|
||||
.with_exec_server_url(format!("ws://{}", listener.local_addr()?))
|
||||
.with_config(|config| {
|
||||
assert!(config.features.enable(Feature::DeferredExecutor).is_ok());
|
||||
assert!(
|
||||
config
|
||||
.features
|
||||
.enable(Feature::DefaultModeRequestUserInput)
|
||||
.is_ok()
|
||||
);
|
||||
});
|
||||
|
||||
let test = tokio::time::timeout(Duration::from_secs(5), builder.build(&server))
|
||||
let test = timeout(Duration::from_secs(5), builder.build(&server))
|
||||
.await
|
||||
.context("thread startup should not wait for the remote environment")??;
|
||||
tokio::time::timeout(
|
||||
Duration::from_secs(5),
|
||||
test.submit_turn("respond before the environment is ready"),
|
||||
)
|
||||
.await
|
||||
.context("turn should reach the model before the remote environment is ready")??;
|
||||
|
||||
response_mock.single_request();
|
||||
test.codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "wait for the environment".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
responsesapi_client_metadata: None,
|
||||
additional_context: Default::default(),
|
||||
thread_settings: Default::default(),
|
||||
})
|
||||
.await?;
|
||||
let request = wait_for_event_match(&test.codex, |event| match event {
|
||||
EventMsg::RequestUserInput(request) => Some(request.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
serve_environment_info(listener).await;
|
||||
test.codex
|
||||
.submit(Op::UserInputAnswer {
|
||||
id: request.turn_id,
|
||||
response: RequestUserInputResponse {
|
||||
answers: HashMap::from([(
|
||||
"continue".to_string(),
|
||||
RequestUserInputAnswer {
|
||||
answers: vec!["Yes (Recommended)".to_string()],
|
||||
},
|
||||
)]),
|
||||
},
|
||||
})
|
||||
.await?;
|
||||
wait_for_event(&test.codex, |event| {
|
||||
matches!(event, EventMsg::TurnComplete(_))
|
||||
})
|
||||
.await;
|
||||
|
||||
let requests = response_mock.requests();
|
||||
assert_eq!(requests.len(), 3);
|
||||
assert!(
|
||||
requests[0]
|
||||
.message_input_texts("user")
|
||||
.iter()
|
||||
.any(|text| text.contains("<status>starting</status>"))
|
||||
);
|
||||
let ready_user_context = requests[1].message_input_texts("user");
|
||||
assert_eq!(
|
||||
ready_user_context
|
||||
.iter()
|
||||
.filter(|text| text.contains("<shell>zsh</shell>"))
|
||||
.count(),
|
||||
1
|
||||
);
|
||||
let final_user_context = requests[2].message_input_texts("user");
|
||||
assert_eq!(
|
||||
final_user_context
|
||||
.iter()
|
||||
.filter(|text| text.contains("<status>starting</status>"))
|
||||
.count(),
|
||||
1
|
||||
);
|
||||
assert_eq!(
|
||||
final_user_context
|
||||
.iter()
|
||||
.filter(|text| text.contains("<shell>zsh</shell>"))
|
||||
.count(),
|
||||
1
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user