Files
Adrian 51864b0b4b feat: use run agent task auth for inference (#19051)
## Stack

This is PR 3 of the simplified HAI single-run-task stack:

- [#19047](https://github.com/openai/codex/pull/19047) Agent Identity
assertion and task-registration primitives, including the shared
run-task helper used by existing Agent Identity JWT auth.
- [#19049](https://github.com/openai/codex/pull/19049)
Disabled-by-default ChatGPT auth opt-in that provisions/reuses persisted
Agent Identity runtime auth and its single run task.
- [#19051](https://github.com/openai/codex/pull/19051) Run-scoped
provider auth that uses one backend-owned task id for first-party
inference and compaction requests.

[#19054](https://github.com/openai/codex/pull/19054) collapsed out of
the active stack because the simplified design no longer needs a
separate background/control-plane task helper.

## Summary

This PR moves Agent Identity usage into provider auth resolution. That
keeps `AgentAssertion` auth tied to first-party OpenAI provider requests
instead of applying a late session-wide override that could affect
local, custom, Bedrock, API-key, or external-bearer providers.

What changed:

- adds a small `ProviderAuthScope` struct carrying the run auth policy
and session source needed by provider-scoped auth resolution
- lets `Session` opt the existing `ModelClient` into `ChatGptAuth`
policy when `use_agent_identity` is enabled, without adding a second
model-client constructor
- resolves Agent Identity only for first-party OpenAI provider auth
paths
- uses the persisted run task id from the `AgentIdentityAuth` record to
build `AgentAssertion` auth for Responses requests
- routes shared request setup through scoped provider auth so unary
compact requests use the same run-task assertion path as inference turns
- keeps local/custom/Bedrock/env-key/external-bearer provider auth
unchanged
- lets missing run-task state surface through the existing model-request
error path instead of silently falling back to bearer auth

This PR intentionally does not create thread-scoped, target-scoped, or
background-scoped task identities. The run task is the only task Codex
registers in this POC shape.

## Testing

- `just test -p codex-model-provider`
- `just test -p codex-core client::tests::provider_auth_scope_uses`
- `just test -p codex-core remote_compact_uses_agent_identity_assertion`
2026-06-24 22:31:41 -07:00

4371 lines
154 KiB
Rust

use core_test_support::test_codex::local_selections;
use std::fs;
use anyhow::Result;
use codex_core::compact::SUMMARY_PREFIX;
use codex_features::Feature;
use codex_login::CodexAuth;
use codex_login::auth::AgentIdentityAuth;
use codex_login::auth::AgentIdentityAuthRecord;
use codex_protocol::account::PlanType as AccountPlanType;
use codex_protocol::config_types::ServiceTier;
use codex_protocol::dynamic_tools::DynamicToolFunctionSpec;
use codex_protocol::dynamic_tools::DynamicToolNamespaceSpec;
use codex_protocol::dynamic_tools::DynamicToolNamespaceTool;
use codex_protocol::dynamic_tools::DynamicToolSpec;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::ConversationStartParams;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ItemCompletedEvent;
use codex_protocol::protocol::ItemStartedEvent;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
use codex_protocol::protocol::RealtimeEvent;
use codex_protocol::protocol::RealtimeOutputModality;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::user_input::UserInput;
use core_test_support::PathBufExt;
use core_test_support::apps_test_server::configure_search_capable_model;
use core_test_support::context_snapshot;
use core_test_support::context_snapshot::ContextSnapshotOptions;
use core_test_support::context_snapshot::ContextSnapshotRenderMode;
use core_test_support::responses;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::sse;
use core_test_support::responses::start_websocket_server;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::TestCodexBuilder;
use core_test_support::test_codex::TestCodexHarness;
use core_test_support::test_codex::test_codex as base_test_codex;
use core_test_support::test_path_buf;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_match;
use core_test_support::wait_for_event_with_timeout;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use tokio::time::Duration;
use wiremock::ResponseTemplate;
const CONTEXT_WINDOW_TRUNCATED_OUTPUT_MESSAGE: &str =
"Output exceeded the available model context and was truncated";
fn approx_token_count(text: &str) -> i64 {
i64::try_from(text.len().saturating_add(3) / 4).unwrap_or(i64::MAX)
}
fn estimate_compact_input_tokens(request: &responses::ResponsesRequest) -> i64 {
request.input().into_iter().fold(0i64, |acc, item| {
acc.saturating_add(approx_token_count(&item.to_string()))
})
}
fn estimate_compact_payload_tokens(request: &responses::ResponsesRequest) -> i64 {
estimate_compact_input_tokens(request)
.saturating_add(approx_token_count(&request.instructions_text()))
}
fn assert_tools_payload_does_not_defer(body: &Value) {
if let Some(tools) = body.get("tools") {
assert!(
!contains_defer_loading(tools),
"model-visible tools should not include deferred declarations: {tools}"
);
}
}
fn namespace_child_tool_names(body: &Value, namespace: &str) -> Vec<String> {
body.get("tools")
.and_then(Value::as_array)
.and_then(|tools| {
tools.iter().find_map(|tool| {
if tool.get("type").and_then(Value::as_str) == Some("namespace")
&& tool.get("name").and_then(Value::as_str) == Some(namespace)
{
tool.get("tools").and_then(Value::as_array).map(|children| {
children
.iter()
.filter_map(|child| {
child
.get("name")
.and_then(Value::as_str)
.map(str::to_string)
})
.collect()
})
} else {
None
}
})
})
.unwrap_or_default()
}
fn contains_defer_loading(value: &Value) -> bool {
match value {
Value::Object(map) => {
map.get("defer_loading").and_then(Value::as_bool) == Some(true)
|| map.values().any(contains_defer_loading)
}
Value::Array(values) => values.iter().any(contains_defer_loading),
Value::Null | Value::Bool(_) | Value::Number(_) | Value::String(_) => false,
}
}
fn canonical_json(value: &Value) -> Value {
match value {
Value::Object(map) => {
let mut entries = map.iter().collect::<Vec<_>>();
entries.sort_by_key(|(left_key, _)| *left_key);
Value::Object(
entries
.into_iter()
.map(|(key, value)| (key.clone(), canonical_json(value)))
.collect(),
)
}
Value::Array(values) => Value::Array(values.iter().map(canonical_json).collect()),
Value::Null | Value::Bool(_) | Value::Number(_) | Value::String(_) => value.clone(),
}
}
const PRETURN_CONTEXT_DIFF_CWD: &str = "/tmp/PRETURN_CONTEXT_DIFF_CWD";
const DUMMY_FUNCTION_NAME: &str = "test_tool";
const TURN_STATE_HEADER: &str = "x-codex-turn-state";
const REMOTE_COMPACT_TURN_COMPLETE_TIMEOUT: Duration = Duration::from_secs(30);
const TEST_AGENT_IDENTITY_PRIVATE_KEY: &str =
"MC4CAQAwBQYDK2VwBCIEIJ7kFBaOujmoz1gvBNEC+BeM2IX87FFB0xmISOZ/XO0c";
fn summary_with_prefix(summary: &str) -> String {
format!("{SUMMARY_PREFIX}\n{summary}")
}
fn context_snapshot_options() -> ContextSnapshotOptions {
ContextSnapshotOptions::default()
.strip_capability_instructions()
.render_mode(ContextSnapshotRenderMode::KindWithTextPrefix { max_chars: 64 })
}
fn format_labeled_requests_snapshot(
scenario: &str,
sections: &[(&str, &responses::ResponsesRequest)],
) -> String {
context_snapshot::format_labeled_requests_snapshot(
scenario,
sections,
&context_snapshot_options(),
)
}
fn compacted_summary_only_output(summary: &str) -> Vec<ResponseItem> {
vec![ResponseItem::Compaction {
id: None,
encrypted_content: summary_with_prefix(summary),
internal_chat_message_metadata_passthrough: None,
}]
}
fn test_codex() -> TestCodexBuilder {
base_test_codex().with_config(|config| {
let _ = config.features.disable(Feature::RemoteCompactionV2);
})
}
fn remote_realtime_test_codex_builder(
realtime_server: &responses::WebSocketTestServer,
) -> TestCodexBuilder {
let realtime_base_url = realtime_server.uri().to_string();
test_codex()
.with_auth(CodexAuth::from_api_key("dummy"))
.with_config(move |config| {
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
})
}
async fn start_remote_realtime_server() -> responses::WebSocketTestServer {
start_websocket_server(vec![vec![
vec![json!({
"type": "session.updated",
"session": { "id": "sess_remote_compact", "instructions": "backend prompt" }
})],
// Keep the websocket open after startup so routed transcript items during the test do not
// exhaust the scripted responses and mark realtime inactive before the assertions run.
vec![],
vec![],
vec![],
vec![],
vec![],
vec![],
vec![],
vec![],
]])
.await
}
async fn start_realtime_conversation(codex: &codex_core::CodexThread) -> Result<()> {
codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
client_managed_handoffs: false,
codex_responses_as_items: false,
codex_response_item_prefix: None,
codex_response_handoff_prefix: None,
model: None,
output_modality: RealtimeOutputModality::Audio,
include_startup_context: true,
prompt: Some(Some("backend prompt".to_string())),
realtime_session_id: None,
transport: None,
version: None,
voice: None,
}))
.await?;
wait_for_event_match(codex, |msg| match msg {
EventMsg::RealtimeConversationStarted(started) => Some(Ok(started.clone())),
EventMsg::Error(err) => Some(Err(err.clone())),
_ => None,
})
.await
.expect("conversation start failed");
wait_for_event_match(codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload:
RealtimeEvent::SessionUpdated {
realtime_session_id: session_id,
..
},
}) => Some(session_id.clone()),
_ => None,
})
.await;
Ok(())
}
async fn close_realtime_conversation(codex: &codex_core::CodexThread) -> Result<()> {
codex.submit(Op::RealtimeConversationClose).await?;
wait_for_event_match(codex, |msg| match msg {
EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()),
_ => None,
})
.await;
Ok(())
}
fn assert_request_contains_realtime_start(request: &responses::ResponsesRequest) {
let body = request.body_json().to_string();
assert!(
body.contains("<realtime_conversation>"),
"expected request to restate realtime instructions"
);
assert!(
!body.contains("Reason: inactive"),
"expected request to use realtime start instructions"
);
}
fn assert_request_contains_custom_realtime_start(
request: &responses::ResponsesRequest,
instructions: &str,
) {
let body = request.body_json().to_string();
assert!(
body.contains("<realtime_conversation>"),
"expected request to preserve the realtime wrapper"
);
assert!(
body.contains(instructions),
"expected request to use custom realtime start instructions"
);
assert!(
!body.contains("Realtime conversation started."),
"expected request to replace the default realtime start instructions"
);
}
fn assert_request_contains_realtime_end(request: &responses::ResponsesRequest) {
let body = request.body_json().to_string();
assert!(
body.contains("<realtime_conversation>"),
"expected request to restate realtime instructions"
);
assert!(
body.contains("Reason: inactive"),
"expected request to use realtime end instructions"
);
}
async fn wait_for_turn_complete(codex: &codex_core::CodexThread) {
wait_for_event_with_timeout(
codex,
|ev| matches!(ev, EventMsg::TurnComplete(_)),
REMOTE_COMPACT_TURN_COMPLETE_TIMEOUT,
)
.await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_replaces_history_for_followups() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()),
)
.await?;
let codex = harness.test().codex.clone();
let session_id = harness.test().session_configured.session_id.to_string();
let thread_id = harness.test().session_configured.thread_id.to_string();
let responses_mock = responses::mount_sse_sequence(
harness.server(),
vec![
responses::sse(vec![
responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"),
responses::ev_completed("resp-1"),
]),
responses::sse(vec![
responses::ev_assistant_message("m2", "AFTER_COMPACT_REPLY"),
responses::ev_completed("resp-2"),
]),
],
)
.await;
let compacted_history = vec![ResponseItem::Compaction {
id: None,
encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(),
internal_chat_message_metadata_passthrough: None,
}];
let compact_mock = responses::mount_compact_json_once(
harness.server(),
serde_json::json!({ "output": compacted_history.clone() }),
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "hello remote compact".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_turn_complete(&codex).await;
codex.submit(Op::Compact).await?;
wait_for_turn_complete(&codex).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "after compact".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_turn_complete(&codex).await;
let compact_request = compact_mock.single_request();
assert_eq!(compact_request.path(), "/v1/responses/compact");
assert_eq!(
compact_request.header("chatgpt-account-id").as_deref(),
Some("account_id")
);
assert_eq!(
compact_request.header("authorization").as_deref(),
Some("Bearer Access Token")
);
assert_eq!(
compact_request.header("session-id").as_deref(),
Some(session_id.as_str())
);
assert_eq!(
compact_request.header("thread-id").as_deref(),
Some(thread_id.as_str())
);
let compact_metadata: Value = serde_json::from_str(
&compact_request
.header("x-codex-turn-metadata")
.expect("remote compact request should include turn metadata"),
)
.expect("remote compact turn metadata should be valid json");
assert_eq!(
compact_request.header("x-codex-installation-id").as_deref(),
compact_metadata["installation_id"].as_str()
);
assert!(
compact_metadata["turn_id"]
.as_str()
.is_some_and(|id| !id.is_empty()),
"remote compact turn metadata should include its turn id"
);
assert_eq!(
compact_metadata["request_kind"].as_str(),
Some("compaction")
);
assert_eq!(
compact_metadata["window_id"].as_str(),
compact_request.header("x-codex-window-id").as_deref()
);
assert_eq!(
compact_metadata["compaction"],
json!({
"trigger": "manual",
"reason": "user_requested",
"implementation": "responses_compact",
"phase": "standalone_turn",
"strategy": "memento",
})
);
let compact_body = compact_request.body_json();
assert_eq!(
compact_body.get("model").and_then(|v| v.as_str()),
Some(harness.test().session_configured.model.as_str())
);
let response_requests = responses_mock.requests();
let first_response_request = response_requests.first().expect("initial request missing");
let first_response_metadata: Value = serde_json::from_str(
&first_response_request
.header("x-codex-turn-metadata")
.expect("initial request should include turn metadata"),
)
.expect("initial turn metadata should be valid json");
assert_ne!(
first_response_metadata["turn_id"], compact_metadata["turn_id"],
"manual compaction should use its own turn id"
);
assert_eq!(
compact_body["tools"],
first_response_request.body_json()["tools"],
"compact requests should send the same tools payload as /v1/responses"
);
assert_eq!(
compact_body["parallel_tool_calls"],
first_response_request.body_json()["parallel_tool_calls"],
"compact requests should match /v1/responses parallel_tool_calls"
);
assert_eq!(
compact_body["reasoning"],
first_response_request.body_json()["reasoning"],
"compact requests should match /v1/responses reasoning"
);
assert_eq!(
compact_body["text"],
first_response_request.body_json()["text"],
"compact requests should match /v1/responses text controls"
);
let compact_body_text = compact_body.to_string();
assert!(
compact_body_text.contains("hello remote compact"),
"expected compact request to include user history"
);
assert!(
compact_body_text.contains("FIRST_REMOTE_REPLY"),
"expected compact request to include assistant history"
);
let response_requests = responses_mock.requests();
let follow_up_request = response_requests.last().expect("follow-up request missing");
let follow_up_metadata: Value = serde_json::from_str(
&follow_up_request
.header("x-codex-turn-metadata")
.expect("follow-up request should include turn metadata"),
)
.expect("follow-up turn metadata should be valid json");
assert_eq!(
follow_up_metadata["request_kind"].as_str(),
Some("turn"),
"regular requests after compaction should remain turn requests"
);
assert!(
follow_up_metadata.get("compaction").is_none(),
"regular requests after compaction should not be marked as compact requests"
);
assert_ne!(
follow_up_metadata["turn_id"], compact_metadata["turn_id"],
"the following user turn should not reuse a manual compact turn id"
);
assert_eq!(
follow_up_metadata["window_id"].as_str(),
follow_up_request.header("x-codex-window-id").as_deref()
);
assert_ne!(
follow_up_metadata["window_id"], compact_metadata["window_id"],
"the following user turn should use the new compacted context window"
);
let follow_up_body = follow_up_request.body_json().to_string();
assert!(
follow_up_body.contains("\"type\":\"compaction\""),
"expected follow-up request to use compacted history"
);
assert!(
follow_up_body.contains("ENCRYPTED_COMPACTION_SUMMARY"),
"expected follow-up request to include compaction summary item"
);
assert!(
!follow_up_body.contains("FIRST_REMOTE_REPLY"),
"expected follow-up request to drop pre-compaction assistant messages"
);
assert!(
!follow_up_body.contains("hello remote compact"),
"expected follow-up request to drop compacted-away user turns when remote output omits them"
);
insta::assert_snapshot!(
"remote_manual_compact_with_history_shapes",
format_labeled_requests_snapshot(
"Remote manual /compact where remote compact output is compaction-only: follow-up layout uses the returned compaction item plus new user message.",
&[
("Remote Compaction Request", &compact_request),
("Remote Post-Compaction History Layout", follow_up_request),
]
)
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_uses_agent_identity_assertion() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex().with_auth(CodexAuth::AgentIdentity(
AgentIdentityAuth::from_record(
AgentIdentityAuthRecord {
agent_runtime_id: "agent-runtime-compact".to_string(),
agent_private_key: TEST_AGENT_IDENTITY_PRIVATE_KEY.to_string(),
account_id: "account-compact".to_string(),
chatgpt_user_id: "user-compact".to_string(),
email: Some("agent@example.com".to_string()),
plan_type: AccountPlanType::Plus,
chatgpt_account_is_fedramp: false,
task_id: Some("task-compact".to_string()),
},
"https://auth.openai.com/api/accounts",
/*auth_route_config*/ None,
)
.await?,
)),
)
.await?;
let codex = harness.test().codex.clone();
let _responses_mock = responses::mount_sse_once(
harness.server(),
responses::sse(vec![
responses::ev_assistant_message("m1", "REMOTE_REPLY"),
responses::ev_completed("resp-1"),
]),
)
.await;
let compact_mock = responses::mount_compact_json_once(
harness.server(),
serde_json::json!({ "output": compacted_summary_only_output("COMPACTED") }),
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "hello remote compact".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_turn_complete(&codex).await;
codex.submit(Op::Compact).await?;
wait_for_turn_complete(&codex).await;
let compact_request = compact_mock.single_request();
assert_eq!(compact_request.path(), "/v1/responses/compact");
assert!(
compact_request
.header("authorization")
.is_some_and(|value| value.starts_with("AgentAssertion ")),
"compact request should use task-scoped AgentAssertion auth"
);
assert_eq!(
compact_request.header("chatgpt-account-id").as_deref(),
Some("account-compact")
);
Ok(())
}
async fn assert_remote_manual_compact_request_parity(
auth: CodexAuth,
configured_service_tier: Option<ServiceTier>,
expected_service_tier: Option<&str>,
snapshot_name: &str,
scenario: &str,
) -> Result<()> {
let mut builder = test_codex().with_auth(auth);
if let Some(service_tier) = configured_service_tier {
builder = builder.with_config(move |config| {
config.service_tier = Some(service_tier.request_value().to_string());
});
}
let harness = TestCodexHarness::with_builder(builder).await?;
let codex = harness.test().codex.clone();
let image_url =
"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR4nGNgYAAAAAMAASsJTYQAAAAASUVORK5CYII="
.to_string();
let responses_mock = responses::mount_sse_sequence(
harness.server(),
vec![
responses::sse(vec![
responses::ev_assistant_message("turn-one-assistant", "TURN_ONE_ASSISTANT"),
responses::ev_completed("turn-one-response"),
]),
responses::sse(vec![
responses::ev_reasoning_item(
"turn-two-reasoning",
&["TURN_TWO_REASONING"],
&["turn two raw content"],
),
responses::ev_assistant_message("turn-two-assistant", "TURN_TWO_ASSISTANT"),
responses::ev_completed("turn-two-response"),
]),
responses::sse(vec![
responses::ev_function_call("turn-three-call", DUMMY_FUNCTION_NAME, "{}"),
responses::ev_completed("turn-three-call-response"),
]),
responses::sse(vec![
responses::ev_assistant_message("turn-three-assistant", "TURN_THREE_ASSISTANT"),
responses::ev_completed("turn-three-final-response"),
]),
responses::sse(vec![
responses::ev_shell_command_call(
"turn-four-shell-command",
"echo TURN_FOUR_LOCAL_SHELL",
),
responses::ev_completed("turn-four-local-shell-response"),
]),
responses::sse(vec![
responses::ev_assistant_message("turn-four-assistant", "TURN_FOUR_ASSISTANT"),
responses::ev_completed("turn-four-final-response"),
]),
responses::sse(vec![
responses::ev_reasoning_item(
"turn-five-reasoning",
&["TURN_FIVE_REASONING"],
&["turn five raw content"],
),
responses::ev_assistant_message("turn-five-assistant", "TURN_FIVE_ASSISTANT"),
responses::ev_completed("turn-five-response"),
]),
],
)
.await;
let compact_mock = responses::mount_compact_user_history_with_summary_once(
harness.server(),
"REMOTE_CACHE_TIER_SUMMARY",
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "TURN_ONE_USER".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_turn_complete(&codex).await;
codex
.submit(Op::UserInput {
items: vec![
UserInput::Text {
text: "TURN_TWO_PREFIX".to_string(),
text_elements: Vec::new(),
},
UserInput::Text {
text: "TURN_TWO_SUFFIX".to_string(),
text_elements: Vec::new(),
},
],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_turn_complete(&codex).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "TURN_THREE_TOOL_USER".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_turn_complete(&codex).await;
codex
.submit(Op::UserInput {
items: vec![
UserInput::Image {
image_url,
detail: None,
},
UserInput::Text {
text: "TURN_FOUR_IMAGE_USER".to_string(),
text_elements: Vec::new(),
},
],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_turn_complete(&codex).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "TURN_FIVE_USER".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_turn_complete(&codex).await;
codex.submit(Op::Compact).await?;
wait_for_turn_complete(&codex).await;
let response_requests = responses_mock.requests();
assert_eq!(
response_requests.len(),
7,
"expected five turns with one unsupported tool continuation and one shell command continuation"
);
assert_eq!(
compact_mock.requests().len(),
1,
"expected exactly one remote compact request"
);
let normal_request = response_requests
.last()
.cloned()
.expect("last turn request missing");
let compact_request = compact_mock.single_request();
let normal_body = normal_request.body_json();
let compact_body = compact_request.body_json();
let mut expected_compact_body_without_input = normal_body.clone();
let expected_compact_object = expected_compact_body_without_input
.as_object_mut()
.expect("responses request body should be an object");
for field in [
"input",
"client_metadata",
"include",
"store",
"stream",
"tool_choice",
] {
expected_compact_object.remove(field);
}
if expected_service_tier.is_none() {
expected_compact_object.remove("service_tier");
}
let mut compact_body_without_input = compact_body.clone();
compact_body_without_input
.as_object_mut()
.expect("compact request body should be an object")
.remove("input");
let canonical_compact_body_without_input = canonical_json(&compact_body_without_input);
let canonical_expected_compact_body_without_input =
canonical_json(&expected_compact_body_without_input);
assert_eq!(
json!({
"compact_body_without_input": canonical_compact_body_without_input,
"expected_compact_body_without_input": canonical_expected_compact_body_without_input,
"prompt_cache_key_matches_responses": compact_body["prompt_cache_key"] == normal_body["prompt_cache_key"],
"prompt_cache_key_present": compact_body["prompt_cache_key"].is_string(),
"service_tier": compact_body.get("service_tier").and_then(Value::as_str),
}),
json!({
"compact_body_without_input": canonical_expected_compact_body_without_input,
"expected_compact_body_without_input": canonical_expected_compact_body_without_input,
"prompt_cache_key_matches_responses": true,
"prompt_cache_key_present": true,
"service_tier": expected_service_tier,
}),
"compact requests should carry the same shared request fields as /responses"
);
insta::assert_snapshot!(
snapshot_name,
context_snapshot::format_request_body_diff_snapshot(
scenario,
"Last Normal /responses Request",
&normal_request,
"Remote /responses/compact Request",
&compact_request,
&ContextSnapshotOptions::default(),
)
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_manual_compact_api_auth_omits_service_tier_and_reuses_prompt_cache_key()
-> Result<()> {
skip_if_no_network!(Ok(()));
assert_remote_manual_compact_request_parity(
CodexAuth::from_api_key("dummy"),
Some(ServiceTier::Fast),
/*expected_service_tier*/ None,
"remote_manual_compact_api_auth_prompt_cache_key_request_diff",
"After five varied API-key-auth turns, remote manual compaction omits service_tier, reuses prompt_cache_key, and still omits responses-only fields.",
)
.await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_manual_compact_chatgpt_auth_reuses_service_tier_and_prompt_cache_key() -> Result<()>
{
skip_if_no_network!(Ok(()));
assert_remote_manual_compact_request_parity(
CodexAuth::create_dummy_chatgpt_auth_for_testing(),
Some(ServiceTier::Fast),
Some("priority"),
"remote_manual_compact_chatgpt_auth_service_tier_prompt_cache_key_request_diff",
"After five varied ChatGPT-auth turns, remote manual compaction reuses service_tier and prompt_cache_key while omitting responses-only fields.",
)
.await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_v2_reuses_compaction_trigger_for_followups() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
let _ = config.features.enable(Feature::RemoteCompactionV2);
}),
)
.await?;
let codex = harness.test().codex.clone();
let responses_mock = responses::mount_sse_sequence(
harness.server(),
vec![
responses::sse(vec![
responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"),
responses::ev_completed("resp-1"),
]),
responses::sse(vec![
serde_json::json!({
"type": "response.output_item.done",
"item": {
"type": "compaction",
"encrypted_content": "ENCRYPTED_CONTEXT_COMPACTION_SUMMARY",
}
}),
responses::ev_completed("resp-compact"),
]),
responses::sse(vec![
responses::ev_assistant_message("m2", "AFTER_COMPACT_REPLY"),
responses::ev_completed("resp-2"),
]),
],
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "hello remote compact".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_turn_complete(&codex).await;
codex.submit(Op::Compact).await?;
wait_for_turn_complete(&codex).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "after compact".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_turn_complete(&codex).await;
let response_requests = responses_mock.requests();
let compact_request = &response_requests[1];
assert!(
compact_request
.header("x-codex-beta-features")
.as_deref()
.is_some_and(|value| value
.split(',')
.any(|feature| feature == "remote_compaction_v2")),
"expected compact request to advertise the remote_compaction_v2 beta feature"
);
assert_eq!(compact_request.path(), "/v1/responses");
let compact_metadata: Value = serde_json::from_str(
&compact_request
.header("x-codex-turn-metadata")
.expect("v2 compact request should include turn metadata"),
)
.expect("v2 compact turn metadata should be valid json");
assert_eq!(
compact_metadata["request_kind"].as_str(),
Some("compaction")
);
assert_eq!(
compact_metadata["window_id"].as_str(),
compact_request.header("x-codex-window-id").as_deref()
);
assert_eq!(
compact_request.body_json()["client_metadata"]["x-codex-window-id"].as_str(),
compact_metadata["window_id"].as_str()
);
assert_eq!(
compact_metadata["compaction"],
json!({
"trigger": "manual",
"reason": "user_requested",
"implementation": "responses_compaction_v2",
"phase": "standalone_turn",
"strategy": "memento",
})
);
let compact_body = compact_request.body_json().to_string();
assert!(
compact_body.contains("\"type\":\"compaction_trigger\""),
"expected v2 compaction request to include the compaction_trigger item"
);
assert!(
!compact_body.contains("ENCRYPTED_CONTEXT_COMPACTION_SUMMARY"),
"expected v2 compaction trigger item to omit encrypted_content"
);
let follow_up_request = response_requests.last().expect("follow-up request missing");
let follow_up_body = follow_up_request.body_json().to_string();
assert!(
follow_up_body.contains("\"type\":\"compaction\""),
"expected follow-up request to preserve the compaction item"
);
assert!(
follow_up_body.contains("ENCRYPTED_CONTEXT_COMPACTION_SUMMARY"),
"expected follow-up request to include the compaction payload"
);
assert!(
follow_up_body.contains("hello remote compact"),
"expected v2 follow-up request to preserve retained original user messages"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_v2_retries_failures_with_stream_retry_budget() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
let _ = config.features.enable(Feature::RemoteCompactionV2);
config.model_provider.request_max_retries = Some(0);
config.model_provider.stream_max_retries = Some(2);
}),
)
.await?;
let codex = harness.test().codex.clone();
let responses_mock = responses::mount_response_sequence(
harness.server(),
vec![
responses::sse_response(responses::sse(vec![
responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"),
responses::ev_completed("resp-1"),
])),
ResponseTemplate::new(500).set_body_string("first compact open failed"),
responses::sse_response(responses::sse(vec![serde_json::json!({
"type": "response.output_item.done",
"item": {
"type": "compaction",
"encrypted_content": "FAILED_COMPACT_SUMMARY",
}
})])),
responses::sse_response(responses::sse(vec![
serde_json::json!({
"type": "response.output_item.done",
"item": {
"type": "compaction",
"encrypted_content": "RETRIED_COMPACT_SUMMARY",
}
}),
responses::ev_completed("resp-compact-retry"),
])),
responses::sse_response(responses::sse(vec![
responses::ev_assistant_message("m2", "AFTER_COMPACT_REPLY"),
responses::ev_completed("resp-2"),
])),
],
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "hello remote compact".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_turn_complete(&codex).await;
codex.submit(Op::Compact).await?;
wait_for_turn_complete(&codex).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "after compact".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_turn_complete(&codex).await;
let response_requests = responses_mock.requests();
assert_eq!(
5,
response_requests.len(),
"expected initial turn, failed open, failed stream, compact retry, and follow-up turn"
);
for compact_request in &response_requests[1..=3] {
assert_eq!("/v1/responses", compact_request.path());
assert!(
compact_request
.body_json()
.to_string()
.contains("\"type\":\"compaction_trigger\""),
"expected v2 compaction request to include the compaction_trigger item"
);
}
let follow_up_request = response_requests.last().expect("follow-up request missing");
let follow_up_body = follow_up_request.body_json().to_string();
assert!(
follow_up_body.contains("RETRIED_COMPACT_SUMMARY"),
"expected follow-up request to include the retried compaction payload"
);
assert!(
!follow_up_body.contains("FAILED_COMPACT_SUMMARY"),
"expected failed compaction attempt output to be discarded"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_v2_accepts_additional_output_items_before_compaction() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
let _ = config.features.enable(Feature::RemoteCompactionV2);
}),
)
.await?;
let codex = harness.test().codex.clone();
let responses_mock = responses::mount_sse_sequence(
harness.server(),
vec![
responses::sse(vec![
responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"),
responses::ev_completed("resp-1"),
]),
responses::sse(vec![
responses::ev_assistant_message("m-compact-noise", "IGNORED_COMPACT_REPLY"),
serde_json::json!({
"type": "response.output_item.done",
"item": {
"type": "compaction",
"encrypted_content": "ENCRYPTED_CONTEXT_COMPACTION_SUMMARY",
}
}),
responses::ev_completed("resp-compact"),
]),
responses::sse(vec![
responses::ev_assistant_message("m2", "AFTER_COMPACT_REPLY"),
responses::ev_completed("resp-2"),
]),
],
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "hello remote compact".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_turn_complete(&codex).await;
codex.submit(Op::Compact).await?;
wait_for_turn_complete(&codex).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "after compact".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_turn_complete(&codex).await;
let response_requests = responses_mock.requests();
let follow_up_request = response_requests.last().expect("follow-up request missing");
let follow_up_body = follow_up_request.body_json().to_string();
assert!(
follow_up_body.contains("\"type\":\"compaction\""),
"expected follow-up request to preserve the compaction item"
);
assert!(
follow_up_body.contains("ENCRYPTED_CONTEXT_COMPACTION_SUMMARY"),
"expected follow-up request to include the compaction payload"
);
assert!(
!follow_up_body.contains("IGNORED_COMPACT_REPLY"),
"expected follow-up request to ignore unrelated output items from the compaction stream"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_filters_deferred_dynamic_tools() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
let mut builder = test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let mut test = builder.build(&server).await?;
let hidden_tool = "hidden_dynamic_tool";
let visible_tool = "visible_dynamic_tool";
let input_schema = json!({
"type": "object",
"properties": {},
"additionalProperties": false,
});
let dynamic_tools = vec![DynamicToolSpec::Namespace(DynamicToolNamespaceSpec {
name: "codex_app".to_string(),
description: "Codex app tools.".to_string(),
tools: vec![
DynamicToolNamespaceTool::Function(DynamicToolFunctionSpec {
name: hidden_tool.to_string(),
description: "Hidden until discovered.".to_string(),
input_schema: input_schema.clone(),
defer_loading: true,
}),
DynamicToolNamespaceTool::Function(DynamicToolFunctionSpec {
name: visible_tool.to_string(),
description: "Visible immediately.".to_string(),
input_schema,
defer_loading: false,
}),
],
})];
let new_thread = test
.thread_manager
.start_thread_with_tools(test.config.clone(), dynamic_tools)
.await?;
test.codex = new_thread.thread;
test.session_configured = new_thread.session_configured;
let codex = test.codex.clone();
let responses_mock = mount_sse_once(
&server,
sse(vec![
responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"),
responses::ev_completed("resp-1"),
]),
)
.await;
let compact_mock = responses::mount_compact_json_once(
&server,
serde_json::json!({
"output": compacted_summary_only_output("compact summary"),
}),
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "hello remote compact".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_turn_complete(&codex).await;
codex.submit(Op::Compact).await?;
wait_for_turn_complete(&codex).await;
let first_response_body = responses_mock.single_request().body_json();
let compact_body = compact_mock.single_request().body_json();
assert_eq!(
compact_body["tools"], first_response_body["tools"],
"compact requests should send the same model-visible tools payload as /v1/responses"
);
assert_tools_payload_does_not_defer(&first_response_body);
assert_tools_payload_does_not_defer(&compact_body);
assert_eq!(
namespace_child_tool_names(&first_response_body, "codex_app"),
vec![visible_tool.to_string()]
);
assert_eq!(
namespace_child_tool_names(&compact_body, "codex_app"),
vec![visible_tool.to_string()]
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_runs_automatically() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()),
)
.await?;
let codex = harness.test().codex.clone();
let session_id = harness.test().session_configured.session_id.to_string();
let thread_id = harness.test().session_configured.thread_id.to_string();
let initial_request = mount_sse_once(
harness.server(),
sse(vec![
responses::ev_shell_command_call("m1", "echo 'hi'"),
responses::ev_completed_with_tokens("resp-1", /*total_tokens*/ 100000000), // over token limit
]),
)
.await;
let responses_mock = mount_sse_once(
harness.server(),
responses::sse(vec![
responses::ev_assistant_message("m2", "AFTER_COMPACT_REPLY"),
responses::ev_completed("resp-2"),
]),
)
.await;
let compact_mock = responses::mount_compact_user_history_with_summary_once(
harness.server(),
"REMOTE_COMPACTED_SUMMARY",
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "hello remote compact".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
let message = wait_for_event_match(&codex, |event| match event {
EventMsg::ContextCompacted(_) => Some(true),
_ => None,
})
.await;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
assert!(message);
assert_eq!(compact_mock.requests().len(), 1);
assert_eq!(
compact_mock
.single_request()
.header("session-id")
.as_deref(),
Some(session_id.as_str())
);
assert_eq!(
compact_mock.single_request().header("thread-id").as_deref(),
Some(thread_id.as_str())
);
let compact_metadata: Value = serde_json::from_str(
&compact_mock
.single_request()
.header("x-codex-turn-metadata")
.expect("auto remote compact request should include turn metadata"),
)
.expect("auto remote compact turn metadata should be valid json");
assert_eq!(
compact_metadata["request_kind"].as_str(),
Some("compaction")
);
assert_eq!(
compact_metadata["compaction"],
json!({
"trigger": "auto",
"reason": "context_limit",
"implementation": "responses_compact",
"phase": "mid_turn",
"strategy": "memento",
})
);
let initial_metadata: Value = serde_json::from_str(
&initial_request
.single_request()
.header("x-codex-turn-metadata")
.expect("initial request should include turn metadata"),
)
.expect("initial turn metadata should be valid json");
assert_eq!(
initial_metadata["turn_id"], compact_metadata["turn_id"],
"automatic mid-turn compaction should keep the current turn id"
);
assert_eq!(
initial_metadata["window_id"], compact_metadata["window_id"],
"automatic mid-turn compaction summarizes the current context window"
);
let follow_up_request = responses_mock.single_request();
let follow_up_metadata: Value = serde_json::from_str(
&follow_up_request
.header("x-codex-turn-metadata")
.expect("post-compaction continuation should include turn metadata"),
)
.expect("post-compaction turn metadata should be valid json");
assert_eq!(
follow_up_metadata["request_kind"].as_str(),
Some("turn"),
"post-compaction continuation should be a regular request"
);
assert!(follow_up_metadata.get("compaction").is_none());
assert_eq!(
follow_up_metadata["turn_id"], compact_metadata["turn_id"],
"automatic mid-turn continuation should keep the current turn id"
);
assert_ne!(
follow_up_metadata["window_id"], compact_metadata["window_id"],
"post-compaction continuation should use the next context window"
);
let follow_up_body = follow_up_request.body_json().to_string();
assert!(follow_up_body.contains("REMOTE_COMPACTED_SUMMARY"));
Ok(())
}
#[cfg_attr(target_os = "windows", ignore)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_trims_function_call_history_to_fit_context_window() -> Result<()> {
skip_if_no_network!(Ok(()));
let first_user_message = "turn with retained shell call";
let second_user_message = "turn with trimmed shell call";
let retained_call_id = "retained-call";
let trimmed_call_id = "trimmed-call";
let retained_command = "echo retained-shell-output";
let trimmed_command = "yes x | head -n 3000";
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.model_context_window = Some(2_000);
config.model_auto_compact_token_limit = Some(200_000);
}),
)
.await?;
let codex = harness.test().codex.clone();
responses::mount_sse_sequence(
harness.server(),
vec![
sse(vec![
responses::ev_shell_command_call(retained_call_id, retained_command),
responses::ev_completed("retained-call-response"),
]),
sse(vec![
responses::ev_assistant_message("retained-assistant", "retained complete"),
responses::ev_completed("retained-final-response"),
]),
sse(vec![
responses::ev_shell_command_call(trimmed_call_id, trimmed_command),
responses::ev_completed("trimmed-call-response"),
]),
],
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: first_user_message.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: second_user_message.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
let compact_mock = responses::mount_compact_user_history_with_summary_once(
harness.server(),
"REMOTE_COMPACT_SUMMARY",
)
.await;
codex.submit(Op::Compact).await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
let compact_request = compact_mock.single_request();
let user_messages = compact_request.message_input_texts("user");
assert!(
user_messages
.iter()
.any(|message| message == first_user_message),
"expected compact request to retain earlier user history"
);
assert!(
user_messages
.iter()
.any(|message| message == second_user_message),
"expected compact request to retain the user boundary message"
);
assert!(
compact_request.has_function_call(retained_call_id)
&& compact_request
.function_call_output_text(retained_call_id)
.is_some(),
"expected compact request to keep the older function call/result pair"
);
assert!(
compact_request.has_function_call(trimmed_call_id),
"expected compact request to retain the trailing function call"
);
assert_eq!(
compact_request.function_call_output_text(trimmed_call_id),
Some(CONTEXT_WINDOW_TRUNCATED_OUTPUT_MESSAGE.to_string()),
"expected compact request to rewrite the trailing function call output past the boundary"
);
assert_eq!(
compact_request.inputs_of_type("function_call").len(),
2,
"expected both function calls after rewriting the trailing output"
);
assert_eq!(
compact_request.inputs_of_type("function_call_output").len(),
2,
"expected both function call outputs after rewriting the trailing output"
);
Ok(())
}
#[cfg_attr(target_os = "windows", ignore)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_rewrites_multiple_trailing_function_call_outputs() -> Result<()> {
skip_if_no_network!(Ok(()));
let first_user_message = "turn with retained shell call";
let second_user_message = "turn with parallel shell calls";
let retained_call_id = "retained-call";
let first_trimmed_call_id = "first-trimmed-call";
let second_trimmed_call_id = "second-trimmed-call";
let retained_command = "echo retained-shell-output";
let first_trimmed_command = "yes x | head -n 3000";
let second_trimmed_command = "yes y | head -n 3000";
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.model_context_window = Some(2_000);
config.model_auto_compact_token_limit = Some(200_000);
}),
)
.await?;
let codex = harness.test().codex.clone();
responses::mount_sse_sequence(
harness.server(),
vec![
sse(vec![
responses::ev_shell_command_call(retained_call_id, retained_command),
responses::ev_completed("retained-call-response"),
]),
sse(vec![
responses::ev_assistant_message("retained-assistant", "retained complete"),
responses::ev_completed("retained-final-response"),
]),
sse(vec![
responses::ev_shell_command_call(first_trimmed_call_id, first_trimmed_command),
responses::ev_shell_command_call(second_trimmed_call_id, second_trimmed_command),
responses::ev_completed("parallel-call-response"),
]),
],
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: first_user_message.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: second_user_message.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
let compact_mock = responses::mount_compact_user_history_with_summary_once(
harness.server(),
"REMOTE_COMPACT_SUMMARY",
)
.await;
codex.submit(Op::Compact).await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
let compact_request = compact_mock.single_request();
assert!(
compact_request.has_function_call(retained_call_id)
&& compact_request
.function_call_output_text(retained_call_id)
.is_some(),
"expected compact request to keep the older function call/result pair"
);
assert!(
compact_request.has_function_call(first_trimmed_call_id)
&& compact_request.has_function_call(second_trimmed_call_id),
"expected compact request to retain both trailing parallel function calls"
);
assert_eq!(
compact_request.function_call_output_text(first_trimmed_call_id),
Some(CONTEXT_WINDOW_TRUNCATED_OUTPUT_MESSAGE.to_string()),
"expected compact request to rewrite the first trailing function call output"
);
assert_eq!(
compact_request.function_call_output_text(second_trimmed_call_id),
Some(CONTEXT_WINDOW_TRUNCATED_OUTPUT_MESSAGE.to_string()),
"expected compact request to rewrite the second trailing function call output"
);
assert_eq!(
compact_request.inputs_of_type("function_call").len(),
3,
"expected all function calls after rewriting trailing outputs"
);
assert_eq!(
compact_request.inputs_of_type("function_call_output").len(),
3,
"expected all function call outputs after rewriting trailing outputs"
);
Ok(())
}
#[cfg_attr(target_os = "windows", ignore)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn auto_remote_compact_trims_function_call_history_to_fit_context_window() -> Result<()> {
skip_if_no_network!(Ok(()));
let first_user_message = "turn with retained shell call";
let second_user_message = "turn with trimmed shell call";
let retained_call_id = "retained-call";
let trimmed_call_id = "trimmed-call";
let retained_command = "echo retained-shell-output";
let trimmed_command = "yes x | head -n 3000";
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.model_context_window = Some(2_000);
config.model_auto_compact_token_limit = Some(200_000);
}),
)
.await?;
let codex = harness.test().codex.clone();
responses::mount_sse_sequence(
harness.server(),
vec![
sse(vec![
responses::ev_shell_command_call(retained_call_id, retained_command),
responses::ev_completed_with_tokens(
"retained-call-response",
/*total_tokens*/ 100,
),
]),
sse(vec![
responses::ev_assistant_message("retained-assistant", "retained complete"),
responses::ev_completed("retained-final-response"),
]),
sse(vec![
responses::ev_shell_command_call(trimmed_call_id, trimmed_command),
responses::ev_completed_with_tokens(
"trimmed-call-response",
/*total_tokens*/ 100,
),
]),
sse(vec![responses::ev_completed_with_tokens(
"trimmed-final-response",
/*total_tokens*/ 500_000,
)]),
],
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: first_user_message.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: second_user_message.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
let compact_mock = responses::mount_compact_user_history_with_summary_once(
harness.server(),
"REMOTE_AUTO_COMPACT_SUMMARY",
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "turn that triggers auto compact".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
assert_eq!(
compact_mock.requests().len(),
1,
"expected exactly one remote compact request"
);
let compact_request = compact_mock.single_request();
let user_messages = compact_request.message_input_texts("user");
assert!(
user_messages
.iter()
.any(|message| message == first_user_message),
"expected compact request to retain earlier user history"
);
assert!(
user_messages
.iter()
.any(|message| message == second_user_message),
"expected compact request to retain the user boundary message"
);
assert!(
compact_request.has_function_call(retained_call_id)
&& compact_request
.function_call_output_text(retained_call_id)
.is_some(),
"expected compact request to keep the older function call/result pair"
);
assert!(
compact_request.has_function_call(trimmed_call_id),
"expected compact request to retain the trailing function call"
);
assert_eq!(
compact_request.function_call_output_text(trimmed_call_id),
Some(CONTEXT_WINDOW_TRUNCATED_OUTPUT_MESSAGE.to_string()),
"expected compact request to rewrite the trailing function call output past the boundary"
);
assert_eq!(
compact_request.inputs_of_type("function_call").len(),
2,
"expected both function calls after rewriting the trailing output"
);
assert_eq!(
compact_request.inputs_of_type("function_call_output").len(),
2,
"expected both function call outputs after rewriting the trailing output"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_trims_tool_search_output_to_empty_tools_array() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
let search_call_id = "tool-search-1";
let tool_name = "oversized_dynamic_tool";
let tool_description = format!(
"Oversized deferred tool for remote compaction. {}",
"x".repeat(20_000)
);
let _responses_mock = mount_sse_once(
&server,
sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_tool_search_call(
search_call_id,
&json!({
"query": "oversized deferred tool",
"limit": 8,
}),
),
responses::ev_completed("resp-1"),
]),
)
.await;
let input_schema = json!({
"type": "object",
"properties": {
"mode": { "type": "string" },
},
"required": ["mode"],
"additionalProperties": false,
});
let dynamic_tool = DynamicToolSpec::Namespace(DynamicToolNamespaceSpec {
name: "codex_app".to_string(),
description: "Codex app tools.".to_string(),
tools: vec![DynamicToolNamespaceTool::Function(
DynamicToolFunctionSpec {
name: tool_name.to_string(),
description: tool_description,
input_schema,
defer_loading: true,
},
)],
});
let mut builder = test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
configure_search_capable_model(config);
config.model_context_window = Some(2_000);
});
let mut test = builder.build(&server).await?;
let new_thread = test
.thread_manager
.start_thread_with_tools(test.config.clone(), vec![dynamic_tool])
.await?;
test.codex = new_thread.thread;
test.session_configured = new_thread.session_configured;
let codex = test.codex.clone();
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "Find the oversized deferred tool".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_turn_complete(&codex).await;
let compact_mock =
responses::mount_compact_user_history_with_summary_once(&server, "REMOTE_COMPACT_SUMMARY")
.await;
codex.submit(Op::Compact).await?;
wait_for_turn_complete(&codex).await;
let compact_request = compact_mock.single_request();
let compact_tools = compact_request
.tool_search_output(search_call_id)
.get("tools")
.and_then(Value::as_array)
.cloned()
.unwrap_or_default();
assert!(
compact_request
.inputs_of_type("tool_search_output")
.iter()
.any(|item| item.get("call_id").and_then(Value::as_str) == Some(search_call_id)),
"expected compact request to retain the tool_search_output item"
);
assert!(
compact_tools.is_empty(),
"expected compact request to rewrite trailing tool_search output to an empty tools array"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn auto_remote_compact_failure_stops_agent_loop() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.model_auto_compact_token_limit = Some(120);
}),
)
.await?;
let codex = harness.test().codex.clone();
mount_sse_once(
harness.server(),
sse(vec![
responses::ev_assistant_message("initial-assistant", "initial turn complete"),
responses::ev_completed_with_tokens("initial-response", /*total_tokens*/ 500_000),
]),
)
.await;
let first_compact_mock = responses::mount_compact_json_once(
harness.server(),
serde_json::json!({ "output": "invalid compact payload shape" }),
)
.await;
let post_compact_turn_mock = mount_sse_once(
harness.server(),
sse(vec![
responses::ev_assistant_message("post-compact-assistant", "should not run"),
responses::ev_completed("post-compact-response"),
]),
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "turn that exceeds token threshold".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "turn that triggers auto compact".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
let error_message = wait_for_event_match(&codex, |event| match event {
EventMsg::Error(err) => Some(err.message.clone()),
_ => None,
})
.await;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
assert!(
error_message.contains("Error running remote compact task"),
"expected remote compact task error prefix, got {error_message}"
);
assert_eq!(
first_compact_mock.requests().len(),
1,
"expected first remote compact attempt with incoming items"
);
assert!(
post_compact_turn_mock.requests().is_empty(),
"expected agent loop to stop after compaction failure"
);
insta::assert_snapshot!(
"remote_pre_turn_compaction_failure_shapes",
format_labeled_requests_snapshot(
"Remote pre-turn auto-compaction parse failure: compaction request excludes the incoming user message and the turn stops.",
&[(
"Remote Compaction Request (Incoming User Excluded)",
&first_compact_mock.single_request()
),]
)
);
Ok(())
}
#[cfg_attr(target_os = "windows", ignore)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_trim_estimate_uses_session_base_instructions() -> Result<()> {
skip_if_no_network!(Ok(()));
let first_user_message = "turn with baseline shell call";
let second_user_message = "turn with trailing shell call";
let baseline_retained_call_id = "baseline-retained-call";
let baseline_trailing_call_id = "baseline-trailing-call";
let override_retained_call_id = "override-retained-call";
let override_trailing_call_id = "override-trailing-call";
let retained_command = "printf retained-shell-output";
let trailing_command = "printf '%020000d' 0";
let baseline_harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.model_context_window = Some(200_000);
}),
)
.await?;
let baseline_codex = baseline_harness.test().codex.clone();
responses::mount_sse_sequence(
baseline_harness.server(),
vec![
sse(vec![
responses::ev_shell_command_call(baseline_retained_call_id, retained_command),
responses::ev_completed("baseline-retained-call-response"),
]),
sse(vec![
responses::ev_assistant_message("baseline-retained-assistant", "retained complete"),
responses::ev_completed("baseline-retained-final-response"),
]),
sse(vec![
responses::ev_shell_command_call(baseline_trailing_call_id, trailing_command),
responses::ev_completed("baseline-trailing-call-response"),
]),
sse(vec![responses::ev_completed(
"baseline-trailing-final-response",
)]),
],
)
.await;
baseline_codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: first_user_message.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&baseline_codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
baseline_codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: second_user_message.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&baseline_codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let baseline_compact_mock = responses::mount_compact_user_history_with_summary_once(
baseline_harness.server(),
"REMOTE_BASELINE_SUMMARY",
)
.await;
baseline_codex.submit(Op::Compact).await?;
wait_for_event(&baseline_codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let baseline_compact_request = baseline_compact_mock.single_request();
assert!(
baseline_compact_request.has_function_call(baseline_retained_call_id),
"expected baseline compact request to retain older function call history"
);
assert!(
baseline_compact_request.has_function_call(baseline_trailing_call_id),
"expected baseline compact request to retain trailing function call history"
);
let baseline_input_tokens = estimate_compact_input_tokens(&baseline_compact_request);
let baseline_payload_tokens = estimate_compact_payload_tokens(&baseline_compact_request);
let override_base_instructions = format!(
"{}\nREMOTE_BASE_INSTRUCTIONS_OVERRIDE {}",
baseline_compact_request.instructions_text(),
"x".repeat(8_000)
);
let override_context_window = baseline_payload_tokens.saturating_add(500);
let pretrim_override_estimate =
baseline_input_tokens.saturating_add(approx_token_count(&override_base_instructions));
assert!(
pretrim_override_estimate > override_context_window,
"expected override instructions to push pre-trim estimate past the context window"
);
let override_harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config({
let override_base_instructions = override_base_instructions.clone();
move |config| {
config.model_context_window = Some(override_context_window);
config.base_instructions = Some(override_base_instructions);
}
}),
)
.await?;
let override_codex = override_harness.test().codex.clone();
responses::mount_sse_sequence(
override_harness.server(),
vec![
sse(vec![
responses::ev_shell_command_call(override_retained_call_id, retained_command),
responses::ev_completed("override-retained-call-response"),
]),
sse(vec![
responses::ev_assistant_message("override-retained-assistant", "retained complete"),
responses::ev_completed("override-retained-final-response"),
]),
sse(vec![
responses::ev_shell_command_call(override_trailing_call_id, trailing_command),
responses::ev_completed("override-trailing-call-response"),
]),
sse(vec![responses::ev_completed(
"override-trailing-final-response",
)]),
],
)
.await;
override_codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: first_user_message.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&override_codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
override_codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: second_user_message.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&override_codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let override_compact_mock = responses::mount_compact_user_history_with_summary_once(
override_harness.server(),
"REMOTE_OVERRIDE_SUMMARY",
)
.await;
override_codex.submit(Op::Compact).await?;
wait_for_event(&override_codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let override_compact_request = override_compact_mock.single_request();
assert_eq!(
override_compact_request.instructions_text(),
override_base_instructions
);
assert!(
override_compact_request.has_function_call(override_retained_call_id),
"expected remote compact request to preserve older function call history"
);
assert!(
override_compact_request.has_function_call(override_trailing_call_id),
"expected remote compact request to preserve trailing function call history with override instructions"
);
assert_eq!(
override_compact_request.function_call_output_text(override_trailing_call_id),
Some(CONTEXT_WINDOW_TRUNCATED_OUTPUT_MESSAGE.to_string()),
"expected remote compact request to rewrite trailing function call output with override instructions"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_manual_compact_emits_context_compaction_items() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()),
)
.await?;
let codex = harness.test().codex.clone();
mount_sse_once(
harness.server(),
sse(vec![
responses::ev_assistant_message("m1", "REMOTE_REPLY"),
responses::ev_completed("resp-1"),
]),
)
.await;
let compact_mock = responses::mount_compact_user_history_with_summary_once(
harness.server(),
"REMOTE_COMPACTED_SUMMARY",
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "manual remote compact".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
codex.submit(Op::Compact).await?;
let mut started_item = None;
let mut completed_item = None;
let mut legacy_event = false;
let mut saw_turn_complete = false;
while !saw_turn_complete || started_item.is_none() || completed_item.is_none() || !legacy_event
{
let event = codex.next_event().await.unwrap();
match event.msg {
EventMsg::ItemStarted(ItemStartedEvent {
item: TurnItem::ContextCompaction(item),
..
}) => {
started_item = Some(item);
}
EventMsg::ItemCompleted(ItemCompletedEvent {
item: TurnItem::ContextCompaction(item),
..
}) => {
completed_item = Some(item);
}
EventMsg::ContextCompacted(_) => {
legacy_event = true;
}
EventMsg::TurnComplete(_) => {
saw_turn_complete = true;
}
_ => {}
}
}
let started_item = started_item.expect("context compaction item started");
let completed_item = completed_item.expect("context compaction item completed");
assert_eq!(started_item.id, completed_item.id);
assert!(legacy_event);
assert_eq!(compact_mock.requests().len(), 1);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_manual_compact_failure_emits_task_error_event() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()),
)
.await?;
let codex = harness.test().codex.clone();
mount_sse_once(
harness.server(),
sse(vec![
responses::ev_assistant_message("m1", "REMOTE_REPLY"),
responses::ev_completed("resp-1"),
]),
)
.await;
let compact_mock = responses::mount_compact_json_once(
harness.server(),
serde_json::json!({ "output": "invalid compact payload shape" }),
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "manual remote compact".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
codex.submit(Op::Compact).await?;
let error_message = wait_for_event_match(&codex, |event| match event {
EventMsg::Error(err) => Some(err.message.clone()),
_ => None,
})
.await;
assert!(
error_message.contains("Error running remote compact task"),
"expected remote compact task error prefix, got {error_message}"
);
assert!(
error_message.contains("invalid compact payload shape")
|| error_message.contains("invalid type: string"),
"expected invalid compact payload details, got {error_message}"
);
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
assert_eq!(compact_mock.requests().len(), 1);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// TODO(ccunningham): Re-enable after the follow-up compaction behavior PR lands.
// Current main behavior for rollout replacement-history persistence is known-incorrect.
#[ignore = "behavior change covered in follow-up compaction PR"]
async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()),
)
.await?;
let codex = harness.test().codex.clone();
let rollout_path = harness
.test()
.session_configured
.rollout_path
.clone()
.expect("rollout path");
let responses_mock = responses::mount_sse_once(
harness.server(),
responses::sse(vec![
responses::ev_assistant_message("m1", "COMPACT_BASELINE_REPLY"),
responses::ev_completed("resp-1"),
]),
)
.await;
let compacted_history = vec![
ResponseItem::Compaction {
id: None,
encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(),
internal_chat_message_metadata_passthrough: None,
},
ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: "COMPACTED_ASSISTANT_NOTE".to_string(),
}],
phase: None,
internal_chat_message_metadata_passthrough: None,
},
];
let compact_mock = responses::mount_compact_json_once(
harness.server(),
serde_json::json!({ "output": compacted_history.clone() }),
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "needs compaction".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex.submit(Op::Compact).await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex.submit(Op::Shutdown).await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
assert_eq!(responses_mock.requests().len(), 1);
assert_eq!(compact_mock.requests().len(), 1);
let rollout_text = fs::read_to_string(&rollout_path)?;
let mut saw_compacted_history = false;
for line in rollout_text
.lines()
.map(str::trim)
.filter(|l| !l.is_empty())
{
let Ok(entry) = serde_json::from_str::<RolloutLine>(line) else {
continue;
};
if let RolloutItem::Compacted(compacted) = entry.item
&& compacted.message.is_empty()
&& let Some(replacement_history) = compacted.replacement_history.as_ref()
{
let has_compaction_item = replacement_history.iter().any(|item| {
matches!(
item,
ResponseItem::Compaction {
encrypted_content, ..
}
if encrypted_content == "ENCRYPTED_COMPACTION_SUMMARY"
)
});
let has_compacted_assistant_note = replacement_history.iter().any(|item| {
matches!(
item,
ResponseItem::Message { role, content, .. }
if role == "assistant"
&& content.iter().any(|part| matches!(
part,
ContentItem::OutputText { text } if text == "COMPACTED_ASSISTANT_NOTE"
))
)
});
let has_permissions_developer_message = replacement_history.iter().any(|item| {
matches!(
item,
ResponseItem::Message { role, content, .. }
if role == "developer"
&& content.iter().any(|part| matches!(
part,
ContentItem::InputText { text }
if text.contains("<permissions instructions>")
))
)
});
if has_compaction_item && has_compacted_assistant_note {
assert!(
!has_permissions_developer_message,
"manual remote compact rollout replacement history should not inject permissions context"
);
saw_compacted_history = true;
break;
}
}
}
assert!(
saw_compacted_history,
"expected rollout to persist remote compaction history"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_and_resume_refresh_stale_developer_instructions() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = wiremock::MockServer::start().await;
let stale_developer_message = "STALE_DEVELOPER_INSTRUCTIONS_SHOULD_BE_REMOVED";
let mut start_builder =
test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let initial = start_builder.build(&server).await?;
let home = initial.home.clone();
let rollout_path = initial
.session_configured
.rollout_path
.clone()
.expect("rollout path");
let responses_mock = responses::mount_sse_sequence(
&server,
vec![
responses::sse(vec![
responses::ev_assistant_message("m1", "BASELINE_REPLY"),
responses::ev_completed("resp-1"),
]),
responses::sse(vec![
responses::ev_assistant_message("m2", "AFTER_COMPACT_REPLY"),
responses::ev_completed("resp-2"),
]),
responses::sse(vec![
responses::ev_assistant_message("m3", "AFTER_RESUME_REPLY"),
responses::ev_completed("resp-3"),
]),
],
)
.await;
let compacted_history = vec![
ResponseItem::Message {
id: None,
role: "developer".to_string(),
content: vec![ContentItem::InputText {
text: stale_developer_message.to_string(),
}],
phase: None,
internal_chat_message_metadata_passthrough: None,
},
ResponseItem::Compaction {
id: None,
encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(),
internal_chat_message_metadata_passthrough: None,
},
];
let compact_mock = responses::mount_compact_json_once(
&server,
serde_json::json!({ "output": compacted_history }),
)
.await;
initial
.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "start remote compact flow".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&initial.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
initial.codex.submit(Op::Compact).await?;
wait_for_event(&initial.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
initial
.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "after compact in same session".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&initial.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
initial.codex.submit(Op::Shutdown).await?;
wait_for_event(&initial.codex, |ev| {
matches!(ev, EventMsg::ShutdownComplete)
})
.await;
let mut resume_builder =
test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let resumed = resume_builder.resume(&server, home, rollout_path).await?;
resumed
.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "after resume".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&resumed.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_eq!(compact_mock.requests().len(), 1);
let requests = responses_mock.requests();
assert_eq!(requests.len(), 3, "expected three model requests");
let after_compact_request = &requests[1];
let after_resume_request = &requests[2];
let after_compact_body = after_compact_request.body_json().to_string();
assert!(
!after_compact_body.contains(stale_developer_message),
"stale developer instructions should be removed immediately after compaction"
);
assert!(
after_compact_body.contains("<permissions instructions>"),
"fresh developer instructions should be present after compaction"
);
assert!(
after_compact_body.contains("ENCRYPTED_COMPACTION_SUMMARY"),
"compaction item should be present after compaction"
);
let after_resume_body = after_resume_request.body_json().to_string();
assert!(
!after_resume_body.contains(stale_developer_message),
"stale developer instructions should be removed after resume"
);
assert!(
after_resume_body.contains("<permissions instructions>"),
"fresh developer instructions should be present after resume"
);
assert!(
after_resume_body.contains("ENCRYPTED_COMPACTION_SUMMARY"),
"compaction item should persist after resume"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_refreshes_stale_developer_instructions_without_resume() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = wiremock::MockServer::start().await;
let stale_developer_message = "STALE_DEVELOPER_INSTRUCTIONS_SHOULD_BE_REMOVED";
let mut builder = test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let test = builder.build(&server).await?;
let responses_mock = responses::mount_sse_sequence(
&server,
vec![
responses::sse(vec![
responses::ev_assistant_message("m1", "BASELINE_REPLY"),
responses::ev_completed("resp-1"),
]),
responses::sse(vec![
responses::ev_assistant_message("m2", "AFTER_COMPACT_REPLY"),
responses::ev_completed("resp-2"),
]),
],
)
.await;
let compacted_history = vec![
ResponseItem::Message {
id: None,
role: "developer".to_string(),
content: vec![ContentItem::InputText {
text: stale_developer_message.to_string(),
}],
phase: None,
internal_chat_message_metadata_passthrough: None,
},
ResponseItem::Compaction {
id: None,
encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(),
internal_chat_message_metadata_passthrough: None,
},
];
let compact_mock = responses::mount_compact_json_once(
&server,
serde_json::json!({ "output": compacted_history }),
)
.await;
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "start remote compact flow".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
test.codex.submit(Op::Compact).await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "after compact in same session".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_eq!(compact_mock.requests().len(), 1);
let requests = responses_mock.requests();
assert_eq!(requests.len(), 2, "expected two model requests");
let after_compact_body = requests[1].body_json().to_string();
assert!(
!after_compact_body.contains(stale_developer_message),
"stale developer instructions should be removed immediately after compaction"
);
assert!(
after_compact_body.contains("<permissions instructions>"),
"fresh developer instructions should be present after compaction"
);
assert!(
after_compact_body.contains("ENCRYPTED_COMPACTION_SUMMARY"),
"compaction item should be present after compaction"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_remote_pre_turn_compaction_restates_realtime_start() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = wiremock::MockServer::start().await;
let realtime_server = start_remote_realtime_server().await;
let mut builder = remote_realtime_test_codex_builder(&realtime_server).with_config(|config| {
config.model_auto_compact_token_limit = Some(200);
});
let test = builder.build(&server).await?;
let responses_mock = responses::mount_sse_sequence(
&server,
vec![
responses::sse(vec![
responses::ev_assistant_message("m1", "REMOTE_FIRST_REPLY"),
responses::ev_completed_with_tokens("r1", /*total_tokens*/ 500),
]),
responses::sse(vec![
responses::ev_assistant_message("m2", "REMOTE_SECOND_REPLY"),
responses::ev_completed_with_tokens("r2", /*total_tokens*/ 80),
]),
],
)
.await;
let compact_mock = responses::mount_compact_json_once(
&server,
serde_json::json!({
"output": compacted_summary_only_output(
"REMOTE_PRETURN_REALTIME_STILL_ACTIVE_SUMMARY"
)
}),
)
.await;
start_realtime_conversation(test.codex.as_ref()).await?;
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_TWO".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_eq!(compact_mock.requests().len(), 1);
let requests = responses_mock.requests();
assert_eq!(requests.len(), 2, "expected two model requests");
let compact_request = compact_mock.single_request();
let post_compact_request = &requests[1];
assert_request_contains_realtime_start(post_compact_request);
insta::assert_snapshot!(
"remote_pre_turn_compaction_restates_realtime_start_shapes",
format_labeled_requests_snapshot(
"Remote pre-turn auto-compaction while realtime remains active: compaction clears the reference baseline, so the follow-up request restates realtime-start instructions.",
&[
("Remote Compaction Request", &compact_request),
(
"Remote Post-Compaction History Layout",
post_compact_request
),
]
)
);
close_realtime_conversation(test.codex.as_ref()).await?;
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_request_uses_custom_experimental_realtime_start_instructions() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = wiremock::MockServer::start().await;
let realtime_server = start_remote_realtime_server().await;
let custom_instructions = "custom realtime start instructions";
let mut builder = remote_realtime_test_codex_builder(&realtime_server).with_config({
let custom_instructions = custom_instructions.to_string();
move |config| {
config.experimental_realtime_start_instructions = Some(custom_instructions);
}
});
let test = builder.build(&server).await?;
let responses_mock = responses::mount_sse_once(
&server,
responses::sse(vec![
responses::ev_assistant_message("m1", "REMOTE_FIRST_REPLY"),
responses::ev_completed("r1"),
]),
)
.await;
start_realtime_conversation(test.codex.as_ref()).await?;
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_request_contains_custom_realtime_start(
&responses_mock.single_request(),
custom_instructions,
);
close_realtime_conversation(test.codex.as_ref()).await?;
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_remote_pre_turn_compaction_restates_realtime_end() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = wiremock::MockServer::start().await;
let realtime_server = start_remote_realtime_server().await;
let mut builder = remote_realtime_test_codex_builder(&realtime_server).with_config(|config| {
config.model_auto_compact_token_limit = Some(200);
});
let test = builder.build(&server).await?;
let responses_mock = responses::mount_sse_sequence(
&server,
vec![
responses::sse(vec![
responses::ev_assistant_message("m1", "REMOTE_FIRST_REPLY"),
responses::ev_completed_with_tokens("r1", /*total_tokens*/ 500),
]),
responses::sse(vec![
responses::ev_assistant_message("m2", "REMOTE_SECOND_REPLY"),
responses::ev_completed_with_tokens("r2", /*total_tokens*/ 80),
]),
],
)
.await;
let compact_mock = responses::mount_compact_json_once(
&server,
serde_json::json!({
"output": compacted_summary_only_output(
"REMOTE_PRETURN_REALTIME_CLOSED_SUMMARY"
)
}),
)
.await;
start_realtime_conversation(test.codex.as_ref()).await?;
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
close_realtime_conversation(test.codex.as_ref()).await?;
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_TWO".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_eq!(compact_mock.requests().len(), 1);
let requests = responses_mock.requests();
assert_eq!(requests.len(), 2, "expected two model requests");
let compact_request = compact_mock.single_request();
let post_compact_request = &requests[1];
assert_request_contains_realtime_end(post_compact_request);
insta::assert_snapshot!(
"remote_pre_turn_compaction_restates_realtime_end_shapes",
format_labeled_requests_snapshot(
"Remote pre-turn auto-compaction after realtime was closed between turns: the follow-up request emits realtime-end instructions from previous-turn settings even though compaction cleared the reference baseline.",
&[
("Remote Compaction Request", &compact_request),
(
"Remote Post-Compaction History Layout",
post_compact_request
),
]
)
);
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_remote_manual_compact_restates_realtime_start() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = wiremock::MockServer::start().await;
let realtime_server = start_remote_realtime_server().await;
let mut builder = remote_realtime_test_codex_builder(&realtime_server);
let test = builder.build(&server).await?;
let responses_mock = responses::mount_sse_sequence(
&server,
vec![
responses::sse(vec![
responses::ev_assistant_message("m1", "REMOTE_FIRST_REPLY"),
responses::ev_completed_with_tokens("r1", /*total_tokens*/ 60),
]),
responses::sse(vec![
responses::ev_assistant_message("m2", "REMOTE_SECOND_REPLY"),
responses::ev_completed_with_tokens("r2", /*total_tokens*/ 80),
]),
],
)
.await;
let compact_mock = responses::mount_compact_json_once(
&server,
serde_json::json!({
"output": compacted_summary_only_output(
"REMOTE_MANUAL_REALTIME_STILL_ACTIVE_SUMMARY"
)
}),
)
.await;
start_realtime_conversation(test.codex.as_ref()).await?;
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
test.codex.submit(Op::Compact).await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_TWO".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_eq!(compact_mock.requests().len(), 1);
let requests = responses_mock.requests();
assert_eq!(requests.len(), 2, "expected two model requests");
let compact_request = compact_mock.single_request();
let post_compact_request = &requests[1];
assert_request_contains_realtime_start(post_compact_request);
insta::assert_snapshot!(
"remote_manual_compact_restates_realtime_start_shapes",
format_labeled_requests_snapshot(
"Remote manual /compact while realtime remains active: the next regular turn restates realtime-start instructions after compaction clears the baseline.",
&[
("Remote Compaction Request", &compact_request),
(
"Remote Post-Compaction History Layout",
post_compact_request
),
]
)
);
close_realtime_conversation(test.codex.as_ref()).await?;
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_remote_mid_turn_compaction_does_not_restate_realtime_end()
-> Result<()> {
skip_if_no_network!(Ok(()));
let server = wiremock::MockServer::start().await;
let realtime_server = start_remote_realtime_server().await;
let mut builder = remote_realtime_test_codex_builder(&realtime_server).with_config(|config| {
config.model_auto_compact_token_limit = Some(200);
});
let test = builder.build(&server).await?;
let responses_mock = responses::mount_sse_sequence(
&server,
vec![
responses::sse(vec![
responses::ev_assistant_message("setup", "REMOTE_SETUP_REPLY"),
responses::ev_completed_with_tokens("setup-response", /*total_tokens*/ 60),
]),
responses::sse(vec![
responses::ev_function_call("call-remote-mid-turn", DUMMY_FUNCTION_NAME, "{}"),
responses::ev_completed_with_tokens("r1", /*total_tokens*/ 500),
]),
responses::sse(vec![
responses::ev_assistant_message("m2", "REMOTE_MID_TURN_FINAL_REPLY"),
responses::ev_completed_with_tokens("r2", /*total_tokens*/ 80),
]),
],
)
.await;
let compact_mock = responses::mount_compact_json_once(
&server,
serde_json::json!({
"output": compacted_summary_only_output(
"REMOTE_MID_TURN_REALTIME_CLOSED_SUMMARY"
)
}),
)
.await;
start_realtime_conversation(test.codex.as_ref()).await?;
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "SETUP_USER".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
close_realtime_conversation(test.codex.as_ref()).await?;
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_TWO".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_eq!(compact_mock.requests().len(), 1);
let requests = responses_mock.requests();
assert_eq!(requests.len(), 3, "expected three model requests");
let second_turn_request = &requests[1];
let compact_request = compact_mock.single_request();
let post_compact_request = &requests[2];
assert_request_contains_realtime_end(second_turn_request);
assert!(
!post_compact_request
.body_json()
.to_string()
.contains("<realtime_conversation>"),
"did not expect post-compaction history to restate realtime instructions once the current turn had already established an inactive baseline"
);
insta::assert_snapshot!(
"remote_mid_turn_compaction_does_not_restate_realtime_end_shapes",
format_labeled_requests_snapshot(
"Remote mid-turn continuation compaction after realtime was closed before the turn: the initial second-turn request emits realtime-end instructions, but the continuation request does not restate them after compaction because the current turn already established the inactive baseline.",
&[
("Second Turn Initial Request", second_turn_request),
("Remote Compaction Request", &compact_request),
(
"Remote Post-Compaction History Layout",
post_compact_request
),
]
)
);
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_remote_compact_resume_restates_realtime_end() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = wiremock::MockServer::start().await;
let realtime_server = start_remote_realtime_server().await;
let mut builder = remote_realtime_test_codex_builder(&realtime_server);
let initial = builder.build(&server).await?;
let home = initial.home.clone();
let rollout_path = initial
.session_configured
.rollout_path
.clone()
.expect("rollout path");
let responses_mock = responses::mount_sse_sequence(
&server,
vec![
responses::sse(vec![
responses::ev_assistant_message("m1", "REMOTE_FIRST_REPLY"),
responses::ev_completed_with_tokens("r1", /*total_tokens*/ 60),
]),
responses::sse(vec![
responses::ev_assistant_message("m2", "REMOTE_AFTER_RESUME_REPLY"),
responses::ev_completed_with_tokens("r2", /*total_tokens*/ 80),
]),
],
)
.await;
let compact_mock = responses::mount_compact_json_once(
&server,
serde_json::json!({
"output": compacted_summary_only_output(
"REMOTE_RESUME_REALTIME_CLOSED_SUMMARY"
)
}),
)
.await;
start_realtime_conversation(initial.codex.as_ref()).await?;
initial
.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&initial.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
close_realtime_conversation(initial.codex.as_ref()).await?;
initial.codex.submit(Op::Compact).await?;
wait_for_event(&initial.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
initial.codex.submit(Op::Shutdown).await?;
wait_for_event(&initial.codex, |ev| {
matches!(ev, EventMsg::ShutdownComplete)
})
.await;
let mut resume_builder =
test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let resumed = resume_builder.resume(&server, home, rollout_path).await?;
resumed
.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_TWO".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&resumed.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_eq!(compact_mock.requests().len(), 1);
let requests = responses_mock.requests();
assert_eq!(requests.len(), 2, "expected two model requests");
let compact_request = compact_mock.single_request();
let after_resume_request = &requests[1];
assert_request_contains_realtime_end(after_resume_request);
insta::assert_snapshot!(
"remote_compact_resume_restates_realtime_end_shapes",
format_labeled_requests_snapshot(
"After remote manual /compact and resume, the first resumed turn rebuilds history from the compaction item and restates realtime-end instructions from reconstructed previous-turn settings.",
&[
("Remote Compaction Request", &compact_request),
("Remote Post-Resume History Layout", after_resume_request),
]
)
);
realtime_server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// TODO(ccunningham): Update once remote pre-turn compaction includes incoming user input.
async fn snapshot_request_shape_remote_pre_turn_compaction_including_incoming_user_message()
-> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.model_auto_compact_token_limit = Some(200);
}),
)
.await?;
let codex = harness.test().codex.clone();
let responses_mock = responses::mount_sse_sequence(
harness.server(),
vec![
responses::sse(vec![
responses::ev_assistant_message("m1", "REMOTE_FIRST_REPLY"),
responses::ev_completed_with_tokens("r1", /*total_tokens*/ 60),
]),
responses::sse(vec![
responses::ev_assistant_message("m2", "REMOTE_SECOND_REPLY"),
responses::ev_completed_with_tokens("r2", /*total_tokens*/ 500),
]),
responses::sse(vec![
responses::ev_assistant_message("m3", "REMOTE_FINAL_REPLY"),
responses::ev_completed_with_tokens("r3", /*total_tokens*/ 80),
]),
],
)
.await;
let compact_mock = responses::mount_compact_user_history_with_summary_once(
harness.server(),
&summary_with_prefix("REMOTE_PRE_TURN_SUMMARY"),
)
.await;
for user in ["USER_ONE", "USER_TWO", "USER_THREE"] {
if user == "USER_THREE" {
core_test_support::submit_thread_settings(
&codex,
codex_protocol::protocol::ThreadSettingsOverrides {
environments: Some(local_selections(
test_path_buf(PRETURN_CONTEXT_DIFF_CWD).abs(),
)),
..Default::default()
},
)
.await?;
}
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: user.to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
}
assert_eq!(compact_mock.requests().len(), 1);
let requests = responses_mock.requests();
assert_eq!(
requests.len(),
3,
"expected user, user, and post-compact turn"
);
let compact_request = compact_mock.single_request();
insta::assert_snapshot!(
"remote_pre_turn_compaction_including_incoming_shapes",
format_labeled_requests_snapshot(
"Remote pre-turn auto-compaction with a context override emits the context diff in the compact request while excluding the incoming user message.",
&[
("Remote Compaction Request", &compact_request),
("Remote Post-Compaction History Layout", &requests[2]),
]
)
);
assert_eq!(
requests[2]
.message_input_texts("user")
.iter()
.filter(|text| text.as_str() == "USER_THREE")
.count(),
1,
"post-compaction request should contain incoming user exactly once from runtime append"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_remote_pre_turn_compaction_strips_incoming_model_switch()
-> Result<()> {
skip_if_no_network!(Ok(()));
let previous_model = "gpt-5.4";
let next_model = "gpt-5.3-codex";
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_model(previous_model)
.with_config(|config| {
config.model_auto_compact_token_limit = Some(200);
}),
)
.await?;
let codex = harness.test().codex.clone();
let initial_turn_request_mock = responses::mount_sse_once(
harness.server(),
responses::sse(vec![
responses::ev_assistant_message("m1", "BEFORE_SWITCH_REPLY"),
responses::ev_completed_with_tokens("r1", /*total_tokens*/ 500),
]),
)
.await;
let post_compact_turn_request_mock = responses::mount_sse_once(
harness.server(),
responses::sse(vec![
responses::ev_assistant_message("m2", "AFTER_SWITCH_REPLY"),
responses::ev_completed_with_tokens("r2", /*total_tokens*/ 80),
]),
)
.await;
let compact_mock = responses::mount_compact_user_history_with_summary_once(
harness.server(),
&summary_with_prefix("REMOTE_SWITCH_SUMMARY"),
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "BEFORE_SWITCH_USER".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
core_test_support::submit_thread_settings(
&codex,
codex_protocol::protocol::ThreadSettingsOverrides {
model: Some(next_model.to_string()),
..Default::default()
},
)
.await?;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "AFTER_SWITCH_USER".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_eq!(
compact_mock.requests().len(),
1,
"expected a single remote pre-turn compaction request"
);
assert_eq!(
initial_turn_request_mock.requests().len(),
1,
"expected initial turn request"
);
assert_eq!(
post_compact_turn_request_mock.requests().len(),
1,
"expected post-compaction follow-up request"
);
let initial_turn_request = initial_turn_request_mock.single_request();
let compact_request = compact_mock.single_request();
let post_compact_turn_request = post_compact_turn_request_mock.single_request();
let compact_body = compact_request.body_json().to_string();
assert!(
!compact_body.contains("AFTER_SWITCH_USER"),
"current behavior excludes incoming user from the pre-turn remote compaction request"
);
assert!(
!compact_body.contains("<model_switch>"),
"pre-turn remote compaction request should strip incoming model-switch update item"
);
let follow_up_body = post_compact_turn_request.body_json().to_string();
assert!(
follow_up_body.contains("BEFORE_SWITCH_USER"),
"post-compaction follow-up should preserve older user messages when they fit"
);
assert!(
follow_up_body.contains("AFTER_SWITCH_USER"),
"post-compaction follow-up should preserve incoming user message via runtime append"
);
assert!(
follow_up_body.contains("<model_switch>"),
"post-compaction follow-up should include the model-switch update item"
);
insta::assert_snapshot!(
"remote_pre_turn_compaction_strips_incoming_model_switch_shapes",
format_labeled_requests_snapshot(
"Remote pre-turn compaction during model switch currently excludes incoming user input, strips incoming <model_switch> from the compact request payload, and restores it in the post-compaction follow-up request.",
&[
("Initial Request (Previous Model)", &initial_turn_request),
("Remote Compaction Request", &compact_request),
(
"Remote Post-Compaction History Layout",
&post_compact_turn_request
),
]
)
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// TODO(ccunningham): Update once remote pre-turn compaction context-overflow handling includes
// incoming user input and emits richer oversized-input messaging.
async fn snapshot_request_shape_remote_pre_turn_compaction_context_window_exceeded() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.model_auto_compact_token_limit = Some(200);
}),
)
.await?;
let codex = harness.test().codex.clone();
let responses_mock = responses::mount_sse_sequence(
harness.server(),
vec![responses::sse(vec![
responses::ev_assistant_message("m1", "REMOTE_FIRST_REPLY"),
responses::ev_completed_with_tokens("r1", /*total_tokens*/ 500),
])],
)
.await;
let compact_mock = responses::mount_compact_response_once(
harness.server(),
ResponseTemplate::new(400).set_body_json(serde_json::json!({
"error": {
"code": "context_length_exceeded",
"message": "Your input exceeds the context window of this model. Please adjust your input and try again."
}
})),
)
.await;
let post_compact_turn_mock = responses::mount_sse_once(
harness.server(),
responses::sse(vec![
responses::ev_assistant_message("m2", "REMOTE_POST_COMPACT_SHOULD_NOT_RUN"),
responses::ev_completed_with_tokens("r2", /*total_tokens*/ 80),
]),
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_TWO".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
let error_message = wait_for_event_match(&codex, |event| match event {
EventMsg::Error(err) => Some(err.message.clone()),
_ => None,
})
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_eq!(compact_mock.requests().len(), 1);
let requests = responses_mock.requests();
assert_eq!(
requests.len(),
1,
"expected no post-compaction follow-up turn request after compact failure"
);
assert!(
post_compact_turn_mock.requests().is_empty(),
"expected turn to stop after compaction failure"
);
let include_attempt_request = compact_mock.single_request();
insta::assert_snapshot!(
"remote_pre_turn_compaction_context_window_exceeded_shapes",
format_labeled_requests_snapshot(
"Remote pre-turn auto-compaction context-window failure: compaction request excludes the incoming user message and the turn errors.",
&[(
"Remote Compaction Request (Incoming User Excluded)",
&include_attempt_request
),]
)
);
assert!(
error_message.to_lowercase().contains("context window"),
"expected context window failure to surface, got {error_message}"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_pre_turn_compact_response_seeds_turn_state() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.model_auto_compact_token_limit = Some(200);
}),
)
.await?;
let codex = harness.test().codex.clone();
let responses_mock = responses::mount_response_sequence(
harness.server(),
vec![
responses::sse_response(responses::sse(vec![
responses::ev_assistant_message("m1", "BEFORE_COMPACT_REPLY"),
responses::ev_completed_with_tokens("r1", /*total_tokens*/ 500),
])),
responses::sse_response(responses::sse(vec![
responses::ev_assistant_message("m2", "AFTER_COMPACT_REPLY"),
responses::ev_completed_with_tokens("r2", /*total_tokens*/ 80),
])),
],
)
.await;
let compact_mock = responses::mount_compact_response_once(
harness.server(),
ResponseTemplate::new(200)
.insert_header("content-type", "application/json")
.insert_header(TURN_STATE_HEADER, "compact-state")
.set_body_json(json!({
"output": compacted_summary_only_output("PRE_TURN_COMPACT_SUMMARY"),
})),
)
.await;
// Phase 1: the first turn raises usage above the pre-turn compact threshold.
// Phase 2: the next turn compacts before sampling and establishes turn state.
for text in ["BEFORE_COMPACT_USER", "AFTER_COMPACT_USER"] {
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: text.to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_turn_complete(&codex).await;
}
// Phase 3: compact starts empty, and its returned state is sent to the first sample.
assert_eq!(
compact_mock.single_request().header(TURN_STATE_HEADER),
None
);
let requests = responses_mock.requests();
assert_eq!(requests.len(), 2);
assert_eq!(requests[0].header(TURN_STATE_HEADER), None);
assert_eq!(
requests[1].header(TURN_STATE_HEADER).as_deref(),
Some("compact-state")
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_mid_turn_compact_v1_sends_turn_state_over_http() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.model_auto_compact_token_limit = Some(200);
}),
)
.await?;
let codex = harness.test().codex.clone();
let responses_mock = responses::mount_response_sequence(
harness.server(),
vec![
responses::sse_response(responses::sse(vec![
responses::ev_function_call("call-before-compact", DUMMY_FUNCTION_NAME, "{}"),
responses::ev_completed_with_tokens("r1", /*total_tokens*/ 500),
]))
.insert_header(TURN_STATE_HEADER, "sampling-state"),
responses::sse_response(responses::sse(vec![
responses::ev_function_call("call-after-compact", DUMMY_FUNCTION_NAME, "{}"),
responses::ev_completed_with_tokens("r2", /*total_tokens*/ 80),
]))
.insert_header(TURN_STATE_HEADER, "continuation-state"),
responses::sse_response(responses::sse(vec![
responses::ev_assistant_message("m1", "FINAL_REPLY"),
responses::ev_completed_with_tokens("r3", /*total_tokens*/ 80),
])),
],
)
.await;
let compact_mock = responses::mount_compact_response_once(
harness.server(),
ResponseTemplate::new(200)
.insert_header("content-type", "application/json")
.insert_header(TURN_STATE_HEADER, "compact-state")
.set_body_json(json!({
"output": compacted_summary_only_output("MID_TURN_COMPACT_SUMMARY"),
})),
)
.await;
// Phase 1: sampling mints state and crosses the token limit with a pending tool follow-up.
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "RUN_WITH_MID_TURN_COMPACT".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_turn_complete(&codex).await;
// Phase 2: v1 compact receives the state established by sampling.
let compact_request = compact_mock.single_request();
assert_eq!(compact_request.path(), "/v1/responses/compact");
assert_eq!(
compact_request.header(TURN_STATE_HEADER).as_deref(),
Some("sampling-state")
);
// Phase 3: every remaining request keeps replaying that first value.
let requests = responses_mock.requests();
assert_eq!(requests.len(), 3);
assert_eq!(requests[0].header(TURN_STATE_HEADER), None);
assert_eq!(
requests[1].header(TURN_STATE_HEADER).as_deref(),
Some("sampling-state")
);
assert_eq!(
requests[2].header(TURN_STATE_HEADER).as_deref(),
Some("sampling-state")
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_mid_turn_compact_v2_sends_turn_state_over_http() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
let _ = config.features.enable(Feature::RemoteCompactionV2);
config.model_auto_compact_token_limit = Some(200);
}),
)
.await?;
let codex = harness.test().codex.clone();
let responses_mock = responses::mount_response_sequence(
harness.server(),
vec![
responses::sse_response(responses::sse(vec![
responses::ev_function_call("call-before-compact", DUMMY_FUNCTION_NAME, "{}"),
responses::ev_completed_with_tokens("r1", /*total_tokens*/ 500),
]))
.insert_header(TURN_STATE_HEADER, "sampling-state"),
responses::sse_response(responses::sse(vec![
json!({
"type": "response.output_item.done",
"item": {
"type": "compaction",
"encrypted_content": "V2_COMPACT_SUMMARY",
}
}),
responses::ev_completed("r-compact"),
]))
.insert_header(TURN_STATE_HEADER, "compact-state"),
responses::sse_response(responses::sse(vec![
responses::ev_function_call("call-after-compact", DUMMY_FUNCTION_NAME, "{}"),
responses::ev_completed_with_tokens("r2", /*total_tokens*/ 80),
]))
.insert_header(TURN_STATE_HEADER, "continuation-state"),
responses::sse_response(responses::sse(vec![
responses::ev_assistant_message("m1", "FINAL_REPLY"),
responses::ev_completed_with_tokens("r3", /*total_tokens*/ 80),
])),
],
)
.await;
// Phase 1: sampling mints state and schedules inline v2 compaction.
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "RUN_WITH_MID_TURN_COMPACT_V2".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_turn_complete(&codex).await;
let requests = responses_mock.requests();
assert_eq!(requests.len(), 4);
assert!(
requests
.iter()
.all(|request| request.path() == "/v1/responses")
);
assert_eq!(requests[0].header(TURN_STATE_HEADER), None);
// Phase 2: the v2 compaction request replays the state already established by sampling.
assert!(
requests[1]
.body_json()
.to_string()
.contains("\"type\":\"compaction_trigger\"")
);
assert_eq!(
requests[1].header(TURN_STATE_HEADER).as_deref(),
Some("sampling-state")
);
// Phase 3: later response headers do not replace the first value in the OnceLock.
assert_eq!(
requests[2].header(TURN_STATE_HEADER).as_deref(),
Some("sampling-state")
);
assert_eq!(
requests[3].header(TURN_STATE_HEADER).as_deref(),
Some("sampling-state")
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_mid_turn_compact_v2_sends_turn_state_over_websocket() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_websocket_server(vec![vec![
vec![
responses::ev_response_created("warm-1"),
responses::ev_completed("warm-1"),
],
vec![
json!({
"type": "response.metadata",
"headers": {(TURN_STATE_HEADER): "sampling-state"},
}),
responses::ev_function_call("call-before-compact", DUMMY_FUNCTION_NAME, "{}"),
responses::ev_completed_with_tokens("r1", /*total_tokens*/ 500),
],
vec![
json!({
"type": "response.metadata",
"headers": {(TURN_STATE_HEADER): "compact-state"},
}),
json!({
"type": "response.output_item.done",
"item": {
"type": "compaction",
"encrypted_content": "V2_WS_COMPACT_SUMMARY",
}
}),
responses::ev_completed("r-compact"),
],
vec![
json!({
"type": "response.metadata",
"headers": {(TURN_STATE_HEADER): "continuation-state"},
}),
responses::ev_function_call("call-after-compact", DUMMY_FUNCTION_NAME, "{}"),
responses::ev_completed_with_tokens("r2", /*total_tokens*/ 80),
],
vec![
responses::ev_assistant_message("m1", "FINAL_REPLY"),
responses::ev_completed_with_tokens("r3", /*total_tokens*/ 80),
],
]])
.await;
let mut builder = test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
let _ = config.features.enable(Feature::RemoteCompactionV2);
config.model_auto_compact_token_limit = Some(200);
});
let test = builder.build_with_websocket_server(&server).await?;
// Phase 1: startup prewarm stays empty, then WebSocket sampling mints state and schedules
// inline v2 compaction.
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "RUN_WITH_WS_MID_TURN_COMPACT_V2".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_turn_complete(&test.codex).await;
let requests = server.single_connection();
assert_eq!(requests.len(), 5);
assert_eq!(requests[0].body_json()["generate"].as_bool(), Some(false));
// Phase 2: the v2 compact request replays the state already established by sampling.
assert!(
requests[2]
.body_json()
.to_string()
.contains("\"type\":\"compaction_trigger\"")
);
// Phase 3: both post-compact requests keep replaying that first value.
assert_eq!(
requests
.iter()
.map(|request| request.body_json()["client_metadata"][TURN_STATE_HEADER].clone())
.collect::<Vec<_>>(),
vec![
json!(null),
json!(null),
json!("sampling-state"),
json!("sampling-state"),
json!("sampling-state"),
]
);
server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_remote_mid_turn_continuation_compaction() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.model_auto_compact_token_limit = Some(200);
}),
)
.await?;
let codex = harness.test().codex.clone();
let responses_mock = responses::mount_sse_sequence(
harness.server(),
vec![
responses::sse(vec![
responses::ev_function_call("call-remote-mid-turn", DUMMY_FUNCTION_NAME, "{}"),
responses::ev_completed_with_tokens("r1", /*total_tokens*/ 500),
]),
responses::sse(vec![
responses::ev_assistant_message("m2", "REMOTE_MID_TURN_FINAL_REPLY"),
responses::ev_completed_with_tokens("r2", /*total_tokens*/ 80),
]),
],
)
.await;
let compact_mock = responses::mount_compact_user_history_with_summary_once(
harness.server(),
&summary_with_prefix("REMOTE_MID_TURN_SUMMARY"),
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_eq!(compact_mock.requests().len(), 1);
let requests = responses_mock.requests();
assert_eq!(
requests.len(),
2,
"expected initial and post-compact requests"
);
let compact_request = compact_mock.single_request();
insta::assert_snapshot!(
"remote_mid_turn_compaction_shapes",
format_labeled_requests_snapshot(
"Remote mid-turn continuation compaction after tool output: compact request includes tool artifacts and the follow-up request includes the returned compaction item.",
&[
("Remote Compaction Request", &compact_request),
("Remote Post-Compaction History Layout", &requests[1]),
]
)
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_remote_mid_turn_compaction_summary_only_reinjects_context()
-> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.model_auto_compact_token_limit = Some(200);
}),
)
.await?;
let codex = harness.test().codex.clone();
let initial_turn_request_mock = responses::mount_sse_once(
harness.server(),
responses::sse(vec![
responses::ev_function_call("call-remote-summary-only", DUMMY_FUNCTION_NAME, "{}"),
responses::ev_completed_with_tokens("r1", /*total_tokens*/ 500),
]),
)
.await;
let post_compact_turn_request_mock = responses::mount_sse_once(
harness.server(),
responses::sse(vec![
responses::ev_assistant_message("m2", "REMOTE_SUMMARY_ONLY_FINAL_REPLY"),
responses::ev_completed_with_tokens("r2", /*total_tokens*/ 80),
]),
)
.await;
let compacted_history = vec![ResponseItem::Compaction {
id: None,
encrypted_content: summary_with_prefix("REMOTE_SUMMARY_ONLY"),
internal_chat_message_metadata_passthrough: None,
}];
let compact_mock = responses::mount_compact_json_once(
harness.server(),
serde_json::json!({ "output": compacted_history }),
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_eq!(compact_mock.requests().len(), 1);
assert_eq!(
initial_turn_request_mock.requests().len(),
1,
"expected initial turn request"
);
assert_eq!(
post_compact_turn_request_mock.requests().len(),
1,
"expected post-compaction request"
);
let compact_request = compact_mock.single_request();
let post_compact_turn_request = post_compact_turn_request_mock.single_request();
insta::assert_snapshot!(
"remote_mid_turn_compaction_summary_only_reinjects_context_shapes",
format_labeled_requests_snapshot(
"Remote mid-turn compaction where compact output has only a compaction item: continuation layout reinjects context before that compaction item.",
&[
("Remote Compaction Request", &compact_request),
(
"Remote Post-Compaction History Layout",
&post_compact_turn_request
),
]
)
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_remote_mid_turn_compaction_multi_summary_reinjects_above_last_summary()
-> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.model_auto_compact_token_limit = Some(200);
}),
)
.await?;
let codex = harness.test().codex.clone();
let setup_turn_request_mock = responses::mount_sse_once(
harness.server(),
responses::sse(vec![
responses::ev_assistant_message("setup", "REMOTE_SETUP_REPLY"),
responses::ev_completed_with_tokens("setup-response", /*total_tokens*/ 60),
]),
)
.await;
let second_turn_request_mock = responses::mount_sse_once(
harness.server(),
responses::sse(vec![
responses::ev_shell_command_call("call-remote-multi-summary", "echo multi-summary"),
responses::ev_completed_with_tokens("r1", /*total_tokens*/ 1_000),
]),
)
.await;
let compact_mock = responses::mount_compact_user_history_with_summary_sequence(
harness.server(),
vec![
summary_with_prefix("REMOTE_OLDER_SUMMARY"),
summary_with_prefix("REMOTE_LATEST_SUMMARY"),
],
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex.submit(Op::Compact).await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_TWO".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_eq!(compact_mock.requests().len(), 2);
assert_eq!(
setup_turn_request_mock.requests().len(),
1,
"expected setup turn request"
);
assert_eq!(
second_turn_request_mock.requests().len(),
1,
"expected second-turn pre-compaction request"
);
let compact_requests = compact_mock.requests();
assert_eq!(
compact_requests.len(),
2,
"expected one setup compact and one mid-turn compact request"
);
let compact_request = compact_requests[1].clone();
let second_turn_request = second_turn_request_mock.single_request();
assert!(
compact_request.body_contains_text("REMOTE_OLDER_SUMMARY"),
"older summary should round-trip from conversation history into the next compact request"
);
insta::assert_snapshot!(
"remote_mid_turn_compaction_multi_summary_reinjects_above_last_summary_shapes",
format_labeled_requests_snapshot(
"After a prior manual /compact produced an older remote compaction item, the next turn hits remote auto-compaction before the next sampling request. The compact request carries forward that earlier compaction item, and the next sampling request shows the latest compaction item with context reinjected before USER_TWO.",
&[
("Remote Compaction Request", &compact_request),
(
"Second Turn Request (After Compaction)",
&second_turn_request
),
]
)
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_remote_manual_compact_without_previous_user_messages() -> Result<()>
{
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()),
)
.await?;
let codex = harness.test().codex.clone();
let responses_mock = responses::mount_sse_once(
harness.server(),
responses::sse(vec![
responses::ev_assistant_message("m1", "REMOTE_MANUAL_EMPTY_FOLLOW_UP_REPLY"),
responses::ev_completed_with_tokens("r1", /*total_tokens*/ 80),
]),
)
.await;
let compact_mock =
responses::mount_compact_json_once(harness.server(), serde_json::json!({ "output": [] }))
.await;
codex.submit(Op::Compact).await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
additional_context: Default::default(),
thread_settings: Default::default(),
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_eq!(
compact_mock.requests().len(),
0,
"manual /compact without prior user should not issue a remote compaction request"
);
let follow_up_request = responses_mock.single_request();
insta::assert_snapshot!(
"remote_manual_compact_without_prev_user_shapes",
format_labeled_requests_snapshot(
"Remote manual /compact with no prior user turn skips the remote compact request; the follow-up turn carries canonical context and new user message.",
&[("Remote Post-Compaction History Layout", &follow_up_request)]
)
);
Ok(())
}