core: use turn-owned world state for inline compaction (#29527)

## Why

Follow-up to #29249 and its [compaction review
thread](https://github.com/openai/codex/pull/29249#discussion_r3455055101).

During a turn, environment readiness can change between sampling
requests. Inline compaction must render the same model-visible
`WorldState` used by the request it follows. Rebuilding that state
during compaction can observe a newer environment, make replacement
history disagree with what the model saw, and suppress the next
environment update.

## What changed

- Make `run_turn` own the current `Arc<WorldState>` and replace it only
between sampling requests.
- Build each state from an explicitly chosen environment snapshot, diff
deferred-executor steps against the turn-owned state, and retain the
latest state in `ContextManager` only for cross-turn and resume
tracking.
- Pass the exact turn-owned state into inline compaction and explicit
new-context-window replacement.
- Carry that state with
`InitialContextInjection::BeforeLastUserMessage`, so replacement context
and its stored baseline cannot come from different snapshots.
- Remove obsolete state-recapture helpers and ambiguous TurnContext-only
WorldState builders.
- Add an integration test that moves an environment from starting to
ready during a paused turn, triggers compaction, and verifies the next
request receives the readiness update exactly once.

## Test plan

- `just test -p codex-core
deferred_executor_compaction_preserves_then_updates_environment_once`
- `just test -p codex-core process_compacted_history`
- `just test -p codex-core mid_turn_continuation_compaction`
- `just test -p codex-core build_initial_context`
- `just test -p codex-core
ignores_session_prefix_messages_when_truncating`
This commit is contained in:
sayan-oai
2026-06-23 10:33:19 -07:00
committed by GitHub
Unverified
parent 8751fd3fcb
commit d1d11cac05
15 changed files with 323 additions and 137 deletions
+31 -8
View File
@@ -4,6 +4,7 @@ use std::time::Instant;
use crate::Prompt;
use crate::client::ModelClientSession;
use crate::client_common::ResponseEvent;
use crate::context::world_state::WorldState;
use crate::hook_runtime::PostCompactHookOutcome;
use crate::hook_runtime::PreCompactHookOutcome;
use crate::hook_runtime::run_post_compact_hooks;
@@ -60,12 +61,29 @@ const COMPACT_USER_MESSAGE_MAX_TOKENS: usize = 20_000;
/// Mid-turn compaction must use `BeforeLastUserMessage` because the model is trained to see the
/// compaction summary as the last item in history after mid-turn compaction; we therefore inject
/// initial context into the replacement history just above the last real user message.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[derive(Debug)]
pub(crate) enum InitialContextInjection {
BeforeLastUserMessage,
BeforeLastUserMessage(Arc<WorldState>),
DoNotInject,
}
pub(crate) async fn build_compaction_initial_context(
sess: &Session,
turn_context: &TurnContext,
initial_context_injection: &InitialContextInjection,
) -> (Vec<ResponseItem>, Option<Arc<WorldState>>) {
// Return the rendered state with its items so history and its baseline stay identical.
match initial_context_injection {
InitialContextInjection::BeforeLastUserMessage(world_state) => {
let items = sess
.build_initial_context_with_world_state(turn_context, world_state.as_ref())
.await;
(items, Some(Arc::clone(world_state)))
}
InitialContextInjection::DoNotInject => (Vec::new(), None),
}
}
pub(crate) fn should_use_remote_compact_task(provider: &ModelProviderInfo) -> bool {
provider.supports_remote_compaction()
}
@@ -309,17 +327,21 @@ async fn run_compact_task_inner_impl(
}
let (window_number, window_ids) = sess.advance_auto_compact_window().await;
if matches!(
initial_context_injection,
InitialContextInjection::BeforeLastUserMessage
) {
let initial_context = sess.build_initial_context(turn_context.as_ref()).await;
let (initial_context, world_state_baseline) = build_compaction_initial_context(
sess.as_ref(),
turn_context.as_ref(),
&initial_context_injection,
)
.await;
if !initial_context.is_empty() {
new_history =
insert_initial_context_before_last_real_user_or_summary(new_history, initial_context);
}
let reference_context_item = match initial_context_injection {
InitialContextInjection::DoNotInject => None,
InitialContextInjection::BeforeLastUserMessage => Some(turn_context.to_turn_context_item()),
InitialContextInjection::BeforeLastUserMessage(_) => {
Some(turn_context.to_turn_context_item())
}
};
let compacted_item = CompactedItem {
message: summary_text.clone(),
@@ -333,6 +355,7 @@ async fn run_compact_task_inner_impl(
turn_context.as_ref(),
new_history,
reference_context_item,
world_state_baseline,
compacted_item,
)
.await;
+17 -15
View File
@@ -6,8 +6,10 @@ use crate::client::CompactConversationRequestSettings;
use crate::compact::CompactionAnalyticsAttempt;
use crate::compact::CompactionAnalyticsDetails;
use crate::compact::InitialContextInjection;
use crate::compact::build_compaction_initial_context;
use crate::compact::compaction_status_from_result;
use crate::compact::insert_initial_context_before_last_real_user_or_summary;
use crate::context::world_state::WorldState;
use crate::context_manager::ContextManager;
use crate::hook_runtime::PostCompactHookOutcome;
use crate::hook_runtime::PreCompactHookOutcome;
@@ -237,7 +239,7 @@ async fn run_remote_compact_task_inner_impl(
window_id,
CodexResponsesRequestKind::Compaction(compaction_metadata),
);
let mut new_history = sess
let new_history = sess
.services
.model_client
.compact_conversation_history(
@@ -259,17 +261,19 @@ async fn run_remote_compact_task_inner_impl(
)
.await?;
let (new_window_number, new_window_ids) = sess.advance_auto_compact_window().await;
new_history = process_compacted_history(
let (new_history, world_state_baseline) = process_compacted_history(
sess.as_ref(),
turn_context.as_ref(),
new_history,
initial_context_injection,
&initial_context_injection,
)
.await;
let reference_context_item = match initial_context_injection {
InitialContextInjection::DoNotInject => None,
InitialContextInjection::BeforeLastUserMessage => Some(turn_context.to_turn_context_item()),
InitialContextInjection::BeforeLastUserMessage(_) => {
Some(turn_context.to_turn_context_item())
}
};
let compacted_item = CompactedItem {
message: String::new(),
@@ -290,6 +294,7 @@ async fn run_remote_compact_task_inner_impl(
turn_context.as_ref(),
new_history,
reference_context_item,
world_state_baseline,
compacted_item,
)
.await;
@@ -304,22 +309,19 @@ pub(crate) async fn process_compacted_history(
sess: &Session,
turn_context: &TurnContext,
mut compacted_history: Vec<ResponseItem>,
initial_context_injection: InitialContextInjection,
) -> Vec<ResponseItem> {
initial_context_injection: &InitialContextInjection,
) -> (Vec<ResponseItem>, Option<Arc<WorldState>>) {
// Mid-turn compaction is the only path that must inject initial context above the last user
// message in the replacement history. Pre-turn compaction instead injects context after the
// compaction item, but mid-turn compaction keeps the compaction item last for model training.
let initial_context = if matches!(
initial_context_injection,
InitialContextInjection::BeforeLastUserMessage
) {
sess.build_initial_context(turn_context).await
} else {
Vec::new()
};
let (initial_context, world_state_baseline) =
build_compaction_initial_context(sess, turn_context, initial_context_injection).await;
compacted_history.retain(should_keep_compacted_history_item);
insert_initial_context_before_last_real_user_or_summary(compacted_history, initial_context)
(
insert_initial_context_before_last_real_user_or_summary(compacted_history, initial_context),
world_state_baseline,
)
}
/// Returns whether an item from remote compaction output should be preserved.
+6 -3
View File
@@ -293,17 +293,19 @@ async fn run_remote_compact_task_inner_impl(
build_v2_compacted_history(&prompt_input, compaction_output);
analytics_details.retained_image_count = Some(retained_images);
let (new_window_number, new_window_ids) = sess.advance_auto_compact_window().await;
let new_history = process_compacted_history(
let (new_history, world_state_baseline) = process_compacted_history(
sess.as_ref(),
turn_context.as_ref(),
compacted_history,
initial_context_injection,
&initial_context_injection,
)
.await;
let reference_context_item = match initial_context_injection {
InitialContextInjection::DoNotInject => None,
InitialContextInjection::BeforeLastUserMessage => Some(turn_context.to_turn_context_item()),
InitialContextInjection::BeforeLastUserMessage(_) => {
Some(turn_context.to_turn_context_item())
}
};
let compacted_item = CompactedItem {
message: String::new(),
@@ -321,6 +323,7 @@ async fn run_remote_compact_task_inner_impl(
turn_context.as_ref(),
new_history,
reference_context_item,
world_state_baseline,
compacted_item,
)
.await;
+9 -3
View File
@@ -1,9 +1,11 @@
use super::*;
use crate::session::tests::build_world_state_from_turn_context;
use codex_model_provider_info::ModelProviderInfo;
use codex_model_provider_info::WireApi;
use codex_protocol::models::DEFAULT_IMAGE_DETAIL;
use codex_protocol::models::InternalChatMessageMetadataPassthrough;
use pretty_assertions::assert_eq;
use std::sync::Arc;
async fn process_compacted_history_with_test_session(
compacted_history: Vec<ResponseItem>,
@@ -13,12 +15,16 @@ async fn process_compacted_history_with_test_session(
session
.set_previous_turn_settings(previous_turn_settings.cloned())
.await;
let initial_context = session.build_initial_context(&turn_context).await;
let refreshed = crate::compact_remote::process_compacted_history(
let world_state = Arc::new(build_world_state_from_turn_context(&session, &turn_context).await);
let initial_context = session
.build_initial_context_with_world_state(&turn_context, world_state.as_ref())
.await;
let initial_context_injection = InitialContextInjection::BeforeLastUserMessage(world_state);
let (refreshed, _) = crate::compact_remote::process_compacted_history(
&session,
&turn_context,
compacted_history,
InitialContextInjection::BeforeLastUserMessage,
&initial_context_injection,
)
.await;
(refreshed, initial_context)
@@ -24,10 +24,6 @@ pub(crate) struct EnvironmentsState {
}
impl EnvironmentsState {
pub(crate) fn from_turn_context(turn_context: &TurnContext) -> Self {
Self::from_turn_context_with_environments(turn_context, &turn_context.environments)
}
pub(crate) fn from_turn_context_with_environments(
turn_context: &TurnContext,
environments: &TurnEnvironmentSnapshot,
+4 -4
View File
@@ -86,18 +86,18 @@ impl ContextManager {
pub(crate) fn update_world_state(
&mut self,
world_state: WorldState,
world_state: Arc<WorldState>,
) -> Vec<Box<dyn ContextualUserFragment>> {
let fragments = self.world_state_baseline.as_deref().map_or_else(
|| world_state.render_full(),
|previous| world_state.render_diff(previous),
);
self.world_state_baseline = Some(Arc::new(world_state));
self.world_state_baseline = Some(world_state);
fragments
}
pub(crate) fn set_world_state_baseline(&mut self, world_state: WorldState) {
self.world_state_baseline = Some(Arc::new(world_state));
pub(crate) fn set_world_state_baseline(&mut self, world_state: Arc<WorldState>) {
self.world_state_baseline = Some(world_state);
}
pub(crate) fn set_token_usage_full(&mut self, context_window: i64) {
@@ -32,6 +32,7 @@ use image::Luma;
use image::Rgba;
use pretty_assertions::assert_eq;
use regex_lite::Regex;
use std::sync::Arc;
const EXEC_FORMAT_MAX_BYTES: usize = 10_000;
const EXEC_FORMAT_MAX_TOKENS: usize = 2_500;
@@ -82,7 +83,7 @@ fn world_state_baseline_deduplicates_until_history_is_replaced() {
state.add_section(EnvironmentsState::from_turn_context_item(
&reference_context_item(),
));
state
Arc::new(state)
};
let mut history = ContextManager::new();
+43 -41
View File
@@ -248,7 +248,6 @@ use self::turn::realtime_text_for_event;
use self::turn_context::TurnContext;
use self::turn_context::TurnSkillsContext;
use self::world_state::build_world_state_from_environment_snapshot;
use self::world_state::build_world_state_from_turn_context;
use self::world_state::build_world_state_from_turn_context_item;
#[cfg(test)]
mod rollout_reconstruction_tests;
@@ -1390,7 +1389,9 @@ impl Session {
let mut state = self.state.lock().await;
state.replace_history(history, reference_context_item);
if let Some(world_state) = world_state_baseline {
state.history.set_world_state_baseline(world_state);
state
.history
.set_world_state_baseline(Arc::new(world_state));
}
let fallback_ids = state.auto_compact_window_ids();
let window_id = window_id.unwrap_or(fallback_ids.window_id);
@@ -2766,25 +2767,27 @@ impl Session {
pub(crate) async fn record_step_environment_context_if_changed(
&self,
turn_context: &TurnContext,
previous_world_state: &Arc<WorldState>,
step_context: &step_context::StepContext,
) {
if !turn_context.config.include_environment_context {
return;
) -> Arc<WorldState> {
let world_state = Arc::new(
self.build_world_state_for_environments(turn_context, &step_context.environments)
.await,
);
let items = crate::context_manager::updates::merge_contextual_fragments(
world_state.render_diff(previous_world_state.as_ref()),
);
if !items.is_empty() {
self.record_conversation_items(turn_context, &items).await;
}
let world_state =
build_world_state_from_environment_snapshot(turn_context, &step_context.environments);
let items = {
let mut state = self.state.lock().await;
crate::context_manager::updates::merge_contextual_fragments(
state.history.update_world_state(world_state),
)
};
if items.is_empty() {
return;
}
self.record_conversation_items(turn_context, &items).await;
// ContextManager remembers this for later turns; run_turn owns the live value.
self.state
.lock()
.await
.history
.set_world_state_baseline(Arc::clone(&world_state));
world_state
}
pub(crate) async fn record_inter_agent_communication(
@@ -2886,6 +2889,7 @@ impl Session {
turn_context: &TurnContext,
items: Vec<ResponseItem>,
reference_context_item: Option<TurnContextItem>,
world_state_baseline: Option<Arc<WorldState>>,
compacted_item: CompactedItem,
) {
let items = if turn_context.config.features.enabled(Feature::ItemIds) {
@@ -2897,11 +2901,6 @@ impl Session {
replacement_history: Some(items.clone()),
..compacted_item
};
let world_state_baseline = if reference_context_item.is_some() {
Some(self.build_world_state(turn_context).await)
} else {
None
};
{
let mut state = self.state.lock().await;
state.replace_history(items, reference_context_item.clone());
@@ -3033,16 +3032,11 @@ impl Session {
items
}
pub(crate) async fn build_initial_context(
pub(crate) async fn build_world_state_for_environments(
&self,
turn_context: &TurnContext,
) -> Vec<ResponseItem> {
let world_state = self.build_world_state(turn_context).await;
self.build_initial_context_with_world_state(turn_context, &world_state)
.await
}
async fn build_world_state(&self, turn_context: &TurnContext) -> WorldState {
environments: &TurnEnvironmentSnapshot,
) -> WorldState {
let environment_subagents = if turn_context.config.include_environment_context {
self.services
.agent_control
@@ -3051,10 +3045,14 @@ impl Session {
} else {
String::new()
};
build_world_state_from_turn_context(turn_context, &environment_subagents)
build_world_state_from_environment_snapshot(
turn_context,
environments,
&environment_subagents,
)
}
async fn build_initial_context_with_world_state(
pub(crate) async fn build_initial_context_with_world_state(
&self,
turn_context: &TurnContext,
world_state: &WorldState,
@@ -3403,15 +3401,15 @@ impl Session {
pub(crate) async fn maybe_start_new_context_window(
&self,
turn_context: &TurnContext,
world_state: Arc<WorldState>,
) -> Option<u64> {
let window = {
let mut state = self.state.lock().await;
state.start_new_context_window_if_requested()
};
let (window_number, window_ids) = window?;
let world_state = self.build_world_state(turn_context).await;
let context_items = self
.build_initial_context_with_world_state(turn_context, &world_state)
.build_initial_context_with_world_state(turn_context, world_state.as_ref())
.await;
let turn_context_item = turn_context.to_turn_context_item();
let replacement_history = context_items;
@@ -3462,7 +3460,7 @@ impl Session {
pub(crate) async fn record_context_updates_and_set_reference_context_item(
&self,
turn_context: &TurnContext,
) {
) -> Arc<WorldState> {
let reference_context_item = {
let state = self.state.lock().await;
state.reference_context_item()
@@ -3470,16 +3468,19 @@ impl Session {
let turn_context_item = turn_context.to_turn_context_item();
let turn_context_changed = reference_context_item.as_ref() != Some(&turn_context_item);
let should_inject_full_context = reference_context_item.is_none();
let world_state = self.build_world_state(turn_context).await;
let world_state = Arc::new(
self.build_world_state_for_environments(turn_context, &turn_context.environments)
.await,
);
let mut context_items = if should_inject_full_context {
let context_items = self
.build_initial_context_with_world_state(turn_context, &world_state)
.build_initial_context_with_world_state(turn_context, world_state.as_ref())
.await;
self.state
.lock()
.await
.history
.set_world_state_baseline(world_state);
.set_world_state_baseline(Arc::clone(&world_state));
context_items
} else {
// Steady-state path: append only built-in context diffs here; turn-scoped extension
@@ -3490,7 +3491,7 @@ impl Session {
let world_state_items = {
let mut state = self.state.lock().await;
crate::context_manager::updates::merge_contextual_fragments(
state.history.update_world_state(world_state),
state.history.update_world_state(Arc::clone(&world_state)),
)
};
context_items.extend(world_state_items);
@@ -3503,7 +3504,7 @@ impl Session {
);
}
if !turn_context_changed && context_items.is_empty() {
return;
return world_state;
}
if !context_items.is_empty() {
self.record_conversation_items(turn_context, &context_items)
@@ -3518,6 +3519,7 @@ impl Session {
// context items.
let mut state = self.state.lock().await;
state.set_reference_context_item(Some(turn_context_item));
world_state
}
pub(crate) async fn update_token_usage_info(
+45 -30
View File
@@ -2002,7 +2002,7 @@ async fn resumed_history_injects_initial_context_on_first_context_update_only()
session
.record_context_updates_and_set_reference_context_item(&turn_context)
.await;
let initial_context = session.build_initial_context(&turn_context).await;
let initial_context = build_initial_context(&session, &turn_context).await;
expected.extend(initial_context);
let history_after_seed = session.clone_history().await;
assert_eq!(expected, history_after_seed.raw_items());
@@ -2887,7 +2887,7 @@ async fn thread_rollback_drops_last_turn_from_history() {
)
.await;
let initial_context = sess.build_initial_context(tc.as_ref()).await;
let initial_context = build_initial_context(&sess, tc.as_ref()).await;
let turn_1 = vec![
user_message("turn 1 user"),
assistant_message("turn 1 assistant"),
@@ -2955,7 +2955,7 @@ async fn thread_rollback_clears_history_when_num_turns_exceeds_existing_turns()
)
.await;
let initial_context = sess.build_initial_context(tc.as_ref()).await;
let initial_context = build_initial_context(&sess, tc.as_ref()).await;
let turn_1 = vec![user_message("turn 1 user")];
let mut full_history = Vec::new();
full_history.extend(initial_context.clone());
@@ -2981,7 +2981,7 @@ async fn thread_rollback_clears_history_when_num_turns_exceeds_existing_turns()
async fn thread_rollback_fails_without_persisted_thread_history() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
let initial_context = sess.build_initial_context(tc.as_ref()).await;
let initial_context = build_initial_context(&sess, tc.as_ref()).await;
sess.record_conversation_items(tc.as_ref(), &initial_context)
.await;
@@ -3391,7 +3391,7 @@ async fn thread_rollback_persists_marker_and_replays_cumulatively() {
async fn thread_rollback_fails_when_turn_in_progress() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
let initial_context = sess.build_initial_context(tc.as_ref()).await;
let initial_context = build_initial_context(&sess, tc.as_ref()).await;
sess.record_conversation_items(tc.as_ref(), &initial_context)
.await;
@@ -3412,7 +3412,7 @@ async fn thread_rollback_fails_when_turn_in_progress() {
async fn thread_rollback_fails_when_num_turns_is_zero() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
let initial_context = sess.build_initial_context(tc.as_ref()).await;
let initial_context = build_initial_context(&sess, tc.as_ref()).await;
sess.record_conversation_items(tc.as_ref(), &initial_context)
.await;
@@ -5033,6 +5033,22 @@ async fn session_new_fails_when_zsh_fork_enabled_without_packaged_zsh() {
assert!(msg.contains("zsh fork feature enabled, but no packaged zsh fork is available"));
}
async fn build_initial_context(session: &Session, turn_context: &TurnContext) -> Vec<ResponseItem> {
let world_state = build_world_state_from_turn_context(session, turn_context).await;
session
.build_initial_context_with_world_state(turn_context, &world_state)
.await
}
pub(crate) async fn build_world_state_from_turn_context(
session: &Session,
turn_context: &TurnContext,
) -> WorldState {
session
.build_world_state_for_environments(turn_context, &turn_context.environments)
.await
}
// todo: use online model info
pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
let (tx_event, _rx_event) = async_channel::unbounded();
@@ -7693,7 +7709,7 @@ async fn build_initial_context_uses_previous_realtime_state() {
let (session, mut turn_context) = make_session_and_context().await;
turn_context.realtime_active = true;
let initial_context = session.build_initial_context(&turn_context).await;
let initial_context = build_initial_context(&session, &turn_context).await;
let developer_texts = developer_input_texts(&initial_context);
assert!(
developer_texts
@@ -7707,7 +7723,7 @@ async fn build_initial_context_uses_previous_realtime_state() {
let mut state = session.state.lock().await;
state.set_reference_context_item(Some(previous_context_item));
}
let resumed_context = session.build_initial_context(&turn_context).await;
let resumed_context = build_initial_context(&session, &turn_context).await;
let resumed_developer_texts = developer_input_texts(&resumed_context);
assert!(
!resumed_developer_texts
@@ -7806,7 +7822,7 @@ async fn build_initial_context_includes_prompt_fragments_from_extensions() {
.thread_extension_data
.insert(PromptExtensionTestState);
let initial_context = session.build_initial_context(&turn_context).await;
let initial_context = build_initial_context(&session, &turn_context).await;
let developer_messages = developer_message_texts(&initial_context);
assert!(
@@ -7832,7 +7848,7 @@ async fn build_initial_context_includes_turn_context_fragments_from_extensions()
expected_model_context_window: Some(50),
});
let initial_context = session.build_initial_context(&turn_context).await;
let initial_context = build_initial_context(&session, &turn_context).await;
let developer_messages = developer_message_texts(&initial_context);
assert!(
@@ -7859,7 +7875,7 @@ async fn record_context_updates_includes_turn_context_fragments_on_steady_state_
});
let mut previous_context_item = turn_context.to_turn_context_item();
previous_context_item.turn_id = Some("previous-turn-id".to_string());
let world_state = session.build_world_state(&turn_context).await;
let world_state = Arc::new(build_world_state_from_turn_context(&session, &turn_context).await);
{
let mut state = session.state.lock().await;
state.set_reference_context_item(Some(previous_context_item));
@@ -7886,7 +7902,7 @@ async fn build_initial_context_omits_prompt_fragments_without_extension_state()
let (mut session, turn_context) = make_session_and_context().await;
session.services.extensions = prompt_extension_test_registry();
let initial_context = session.build_initial_context(&turn_context).await;
let initial_context = build_initial_context(&session, &turn_context).await;
let developer_messages = developer_message_texts(&initial_context);
assert!(
@@ -7903,7 +7919,7 @@ async fn build_initial_context_adds_multi_agent_v2_root_usage_hint_as_developer_
let (session, turn_context) =
make_multi_agent_v2_usage_hint_test_session(/*enable_multi_agent_v2*/ true).await;
let initial_context = session.build_initial_context(turn_context.as_ref()).await;
let initial_context = build_initial_context(&session, turn_context.as_ref()).await;
let developer_messages = developer_message_texts(&initial_context);
assert!(
@@ -7941,7 +7957,7 @@ async fn build_initial_context_adds_multi_agent_v2_subagent_usage_hint_as_develo
.expect("thread settings should not be shared")
.session_source = session_source;
let initial_context = session.build_initial_context(turn_context.as_ref()).await;
let initial_context = build_initial_context(&session, turn_context.as_ref()).await;
let developer_messages = developer_message_texts(&initial_context);
assert!(
@@ -7963,7 +7979,7 @@ async fn build_initial_context_omits_multi_agent_v2_usage_hints_when_feature_dis
let (session, turn_context) =
make_multi_agent_v2_usage_hint_test_session(/*enable_multi_agent_v2*/ false).await;
let initial_context = session.build_initial_context(turn_context.as_ref()).await;
let initial_context = build_initial_context(&session, turn_context.as_ref()).await;
let developer_messages = developer_message_texts(&initial_context);
assert!(
@@ -7990,7 +8006,7 @@ async fn build_initial_context_omits_multi_agent_v2_usage_hints_when_hint_is_emp
)
.await;
let initial_context = session.build_initial_context(turn_context.as_ref()).await;
let initial_context = build_initial_context(&session, turn_context.as_ref()).await;
let developer_messages = developer_message_texts(&initial_context);
assert!(
@@ -8020,7 +8036,7 @@ async fn build_initial_context_omits_default_image_save_location_with_image_hist
)
.await;
let initial_context = session.build_initial_context(&turn_context).await;
let initial_context = build_initial_context(&session, &turn_context).await;
let developer_texts = developer_input_texts(&initial_context);
assert!(
!developer_texts
@@ -8034,7 +8050,7 @@ async fn build_initial_context_omits_default_image_save_location_with_image_hist
async fn build_initial_context_omits_default_image_save_location_without_image_history() {
let (session, turn_context) = make_session_and_context().await;
let initial_context = session.build_initial_context(&turn_context).await;
let initial_context = build_initial_context(&session, &turn_context).await;
let developer_texts = developer_input_texts(&initial_context);
assert!(
@@ -8076,7 +8092,7 @@ async fn build_initial_context_trims_skill_metadata_from_context_window_budget()
turn_context.model_info.context_window = Some(100);
turn_context.turn_skills = TurnSkillsContext::new(HostSkillsSnapshot::new(Arc::new(outcome)));
let initial_context = session.build_initial_context(&turn_context).await;
let initial_context = build_initial_context(&session, &turn_context).await;
let developer_texts = developer_input_texts(&initial_context);
assert!(
@@ -8227,7 +8243,7 @@ async fn build_initial_context_emits_thread_start_skill_warning_on_repeated_buil
turn_context.model_info.context_window = Some(100);
turn_context.turn_skills = TurnSkillsContext::new(HostSkillsSnapshot::new(Arc::new(outcome)));
let _ = session.build_initial_context(&turn_context).await;
let _ = build_initial_context(&session, &turn_context).await;
let warning_event = timeout(Duration::from_secs(1), rx.recv())
.await
.expect("warning event should arrive")
@@ -8238,7 +8254,7 @@ async fn build_initial_context_emits_thread_start_skill_warning_on_repeated_buil
if message == "Exceeded skills context budget of 2%. All skill descriptions were removed and 2 additional skills were not included in the model-visible skills list."
));
let _ = session.build_initial_context(&turn_context).await;
let _ = build_initial_context(&session, &turn_context).await;
let warning_event = timeout(Duration::from_secs(1), rx.recv())
.await
.expect("warning event should arrive on repeated build")
@@ -8358,7 +8374,7 @@ async fn build_initial_context_uses_previous_turn_settings_for_realtime_end() {
session
.set_previous_turn_settings(Some(previous_turn_settings))
.await;
let initial_context = session.build_initial_context(&turn_context).await;
let initial_context = build_initial_context(&session, &turn_context).await;
let developer_texts = developer_input_texts(&initial_context);
assert!(
developer_texts
@@ -8381,7 +8397,7 @@ async fn build_initial_context_restates_realtime_start_when_reference_context_is
session
.set_previous_turn_settings(Some(previous_turn_settings))
.await;
let initial_context = session.build_initial_context(&turn_context).await;
let initial_context = build_initial_context(&session, &turn_context).await;
let developer_texts = developer_input_texts(&initial_context);
assert!(
developer_texts
@@ -8468,7 +8484,7 @@ async fn record_context_updates_and_set_reference_context_item_injects_full_cont
.record_context_updates_and_set_reference_context_item(&turn_context)
.await;
let history = session.clone_history().await;
let initial_context = session.build_initial_context(&turn_context).await;
let initial_context = build_initial_context(&session, &turn_context).await;
assert_eq!(history.raw_items().to_vec(), initial_context);
let current_context = session.reference_context_item().await;
@@ -8515,7 +8531,7 @@ async fn record_context_updates_and_set_reference_context_item_reinjects_full_co
let history = session.clone_history().await;
let mut expected_history = vec![compacted_summary];
let initial_context = session.build_initial_context(&turn_context).await;
let initial_context = build_initial_context(&session, &turn_context).await;
expected_history.extend(initial_context);
assert_eq!(history.raw_items().to_vec(), expected_history);
}
@@ -8533,7 +8549,8 @@ async fn record_context_updates_and_set_reference_context_item_persists_baseline
.with_model(next_model.to_string(), &session.services.models_manager)
.await;
let previous_context_item = previous_context.to_turn_context_item();
let world_state = session.build_world_state(&previous_context).await;
let world_state =
Arc::new(build_world_state_from_turn_context(&session, &previous_context).await);
{
let mut state = session.state.lock().await;
state.set_reference_context_item(Some(previous_context_item.clone()));
@@ -8627,7 +8644,7 @@ async fn build_initial_context_prepends_model_switch_message() {
session
.set_previous_turn_settings(Some(previous_turn_settings))
.await;
let initial_context = session.build_initial_context(&turn_context).await;
let initial_context = build_initial_context(&session, &turn_context).await;
let ResponseItem::Message { role, content, .. } = &initial_context[0] else {
panic!("expected developer message");
@@ -9952,9 +9969,7 @@ async fn sample_rollout(
// Use the same turn_context source as record_initial_history so model_info (and thus
// personality_spec) matches reconstruction.
let reconstruction_turn = session.new_default_turn().await;
let mut initial_context = session
.build_initial_context(reconstruction_turn.as_ref())
.await;
let mut initial_context = build_initial_context(session, reconstruction_turn.as_ref()).await;
// Ensure personality_spec is present when Personality is enabled, so expected matches
// what reconstruction produces (build_initial_context may omit it when baked into model).
if !initial_context.iter().any(|m| {
@@ -523,8 +523,10 @@ async fn process_compacted_history_preserves_separate_guardian_developer_message
}
turn_context.session_source = guardian_source;
turn_context.developer_instructions = Some(guardian_policy.clone());
let world_state = Arc::new(build_world_state_from_turn_context(&session, &turn_context).await);
let initial_context_injection = InitialContextInjection::BeforeLastUserMessage(world_state);
let refreshed = crate::compact_remote::process_compacted_history(
let (refreshed, _) = crate::compact_remote::process_compacted_history(
&session,
&turn_context,
vec![
@@ -547,7 +549,7 @@ async fn process_compacted_history_preserves_separate_guardian_developer_message
internal_chat_message_metadata_passthrough: None,
},
],
InitialContextInjection::BeforeLastUserMessage,
&initial_context_injection,
)
.await;
+12 -8
View File
@@ -164,7 +164,9 @@ pub(crate) async fn run_turn(
return Ok(None);
}
sess.record_context_updates_and_set_reference_context_item(turn_context.as_ref())
// Keep the exact model-visible state used by this turn and its inline compactions.
let mut world_state = sess
.record_context_updates_and_set_reference_context_item(turn_context.as_ref())
.await;
let Some((injection_items, explicitly_enabled_connectors)) =
@@ -250,11 +252,13 @@ pub(crate) async fn run_turn(
let step_context = StepContext {
environments: sess.services.turn_environments.snapshot().await,
};
sess.record_step_environment_context_if_changed(
turn_context.as_ref(),
&step_context,
)
.await;
world_state = sess
.record_step_environment_context_if_changed(
turn_context.as_ref(),
&world_state,
&step_context,
)
.await;
}
// Construct the input that we will send to the model.
@@ -341,7 +345,7 @@ pub(crate) async fn run_turn(
.await;
let started_new_context_window = sess
.maybe_start_new_context_window(turn_context.as_ref())
.maybe_start_new_context_window(turn_context.as_ref(), Arc::clone(&world_state))
.await
.is_some();
if started_new_context_window && needs_follow_up {
@@ -361,7 +365,7 @@ pub(crate) async fn run_turn(
&sess,
&turn_context,
&mut client_session,
InitialContextInjection::BeforeLastUserMessage,
InitialContextInjection::BeforeLastUserMessage(Arc::clone(&world_state)),
CompactionReason::ContextLimit,
CompactionPhase::MidTurn,
)
+3 -16
View File
@@ -4,34 +4,21 @@ use crate::context::world_state::WorldState;
use crate::environment_selection::TurnEnvironmentSnapshot;
use codex_protocol::protocol::TurnContextItem;
pub(super) fn build_world_state_from_turn_context(
pub(super) fn build_world_state_from_environment_snapshot(
turn_context: &TurnContext,
environments: &TurnEnvironmentSnapshot,
environment_subagents: &str,
) -> WorldState {
let mut world_state = WorldState::default();
if turn_context.config.include_environment_context {
world_state.add_section(
EnvironmentsState::from_turn_context(turn_context)
EnvironmentsState::from_turn_context_with_environments(turn_context, environments)
.with_subagents(environment_subagents.to_string()),
);
}
world_state
}
pub(super) fn build_world_state_from_environment_snapshot(
turn_context: &TurnContext,
environments: &TurnEnvironmentSnapshot,
) -> WorldState {
let mut world_state = WorldState::default();
if turn_context.config.include_environment_context {
world_state.add_section(EnvironmentsState::from_turn_context_with_environments(
turn_context,
environments,
));
}
world_state
}
pub(super) fn build_world_state_from_turn_context_item(
turn_context_item: &TurnContextItem,
) -> WorldState {
+5 -1
View File
@@ -4,6 +4,7 @@ use crate::init_state_db;
use crate::installation_id::INSTALLATION_ID_FILENAME;
use crate::rollout::RolloutRecorder;
use crate::session::session::SessionSettingsUpdate;
use crate::session::tests::build_world_state_from_turn_context;
use crate::session::tests::make_session_and_context;
use crate::tasks::InterruptedTurnHistoryMarker;
use crate::tasks::interrupted_turn_history_marker;
@@ -225,7 +226,10 @@ fn out_of_range_truncation_drops_pre_user_active_turn_prefix() {
#[tokio::test]
async fn ignores_session_prefix_messages_when_truncating() {
let (session, turn_context) = make_session_and_context().await;
let mut items = session.build_initial_context(&turn_context).await;
let world_state = build_world_state_from_turn_context(&session, &turn_context).await;
let mut items = session
.build_initial_context_with_world_state(&turn_context, &world_state)
.await;
items.push(user_msg("feature request"));
items.push(assistant_msg("ack"));
items.push(user_msg("second question"));
@@ -1,4 +1,5 @@
use super::*;
use crate::session::tests::build_world_state_from_turn_context;
use crate::session::tests::make_session_and_context;
use codex_protocol::AgentPath;
use codex_protocol::models::ContentItem;
@@ -166,7 +167,10 @@ fn truncates_rollout_from_start_applies_thread_rollback_markers() {
#[tokio::test]
async fn ignores_session_prefix_messages_when_truncating_rollout_from_start() {
let (session, turn_context) = make_session_and_context().await;
let mut items = session.build_initial_context(&turn_context).await;
let world_state = build_world_state_from_turn_context(&session, &turn_context).await;
let mut items = session
.build_initial_context_with_world_state(&turn_context, &world_state)
.await;
items.push(user_msg("feature request"));
items.push(assistant_msg("ack"));
items.push(user_msg("second question"));
+137
View File
@@ -1,6 +1,7 @@
use anyhow::Context;
use anyhow::Result;
use codex_config::types::ApprovalsReviewer;
use codex_core::compact::SUMMARIZATION_PROMPT;
use codex_core::config::Constrained;
use codex_exec_server::CopyOptions;
use codex_exec_server::CreateDirectoryOptions;
@@ -38,6 +39,7 @@ use core_test_support::get_remote_test_env;
use core_test_support::responses::ev_apply_patch_custom_tool_call;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_completed_with_tokens;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_once;
@@ -683,6 +685,141 @@ async fn deferred_executor_updates_model_context_after_startup() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn deferred_executor_compaction_preserves_then_updates_environment_once() -> Result<()> {
let listener = TcpListener::bind("127.0.0.1:0").await?;
let server = start_mock_server().await;
let response_mock = mount_sse_sequence(
&server,
vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(
"wait-for-startup",
"request_user_input",
&json!({
"questions": [{
"id": "continue",
"header": "Continue",
"question": "Continue after startup?",
"options": [{
"label": "Yes (Recommended)",
"description": "Continue the test."
}, {
"label": "No",
"description": "Stop the test."
}]
}]
})
.to_string(),
),
ev_completed_with_tokens("resp-1", /*total_tokens*/ 96),
]),
sse(vec![
ev_assistant_message("msg-compact", "AUTO_COMPACT_SUMMARY"),
ev_completed_with_tokens("resp-compact", /*total_tokens*/ 10),
]),
sse(vec![
ev_response_created("resp-2"),
ev_assistant_message("msg-2", "done"),
ev_completed("resp-2"),
]),
],
)
.await;
let mut builder = test_codex()
.with_exec_server_url(format!("ws://{}", listener.local_addr()?))
.with_config(|config| {
assert!(config.features.enable(Feature::DeferredExecutor).is_ok());
assert!(
config
.features
.enable(Feature::DefaultModeRequestUserInput)
.is_ok()
);
config.model_provider.name = "OpenAI (test)".to_string();
config.compact_prompt = Some(SUMMARIZATION_PROMPT.to_string());
config.model_context_window = Some(100);
config.model_auto_compact_token_limit = Some(90);
});
let test = timeout(Duration::from_secs(5), builder.build(&server))
.await
.context("thread startup should not wait for the remote environment")??;
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "wait for the environment".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
let request = wait_for_event_match(&test.codex, |event| match event {
EventMsg::RequestUserInput(request) => Some(request.clone()),
_ => None,
})
.await;
serve_environment_info(listener).await;
test.codex
.submit(Op::UserInputAnswer {
id: request.turn_id,
response: RequestUserInputResponse {
answers: HashMap::from([(
"continue".to_string(),
RequestUserInputAnswer {
answers: vec!["Yes (Recommended)".to_string()],
},
)]),
},
})
.await?;
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let requests = response_mock.requests();
assert_eq!(requests.len(), 3);
let initial_context = requests[0].message_input_texts("user");
assert!(
initial_context
.iter()
.any(|text| text.contains("<status>starting</status>"))
);
let post_compaction_context = requests[2].message_input_texts("user");
assert_eq!(
post_compaction_context
.iter()
.filter(|text| text.contains("<status>starting</status>"))
.count(),
1
);
assert_eq!(
post_compaction_context
.iter()
.filter(|text| text.contains("<shell>zsh</shell>"))
.count(),
1
);
let starting_index = post_compaction_context
.iter()
.position(|text| text.contains("<status>starting</status>"))
.expect("compaction should preserve the prior environment state");
let ready_index = post_compaction_context
.iter()
.position(|text| text.contains("<shell>zsh</shell>"))
.expect("the next sampling step should report that the environment is ready");
assert!(starting_index < ready_index);
Ok(())
}
fn absolute_path(path: PathBuf) -> AbsolutePathBuf {
AbsolutePathBuf::try_from(path).expect("path should be absolute")
}