Files
codex/codex-rs/core/src/compact.rs
T
ningyi-oai bee78806a9 [codex] add compaction metadata to turn headers (#24368)
## Summary
- Add `request_kind` values for foreground turn, startup prewarm,
compaction, and detached memory model requests.
- Attach compaction dispatch metadata to local Responses, legacy
`/v1/responses/compact`, and remote v2 compact requests.
- Add the existing logical context-window identifier as `window_id` on
turn-owned model request metadata.
- Keep identity fields optional for detached memory requests, while
still emitting `request_kind="memory"` in non-git/no-sandbox workspaces.

## Root Cause
`x-codex-turn-metadata` has more than one producer. Foreground turns and
compaction requests own a real turn and should carry that turn identity.
Detached memory stage-one requests do not own a foreground turn, so
absent identity fields are valid rather than missing data. Startup
websocket prewarm is also a model request, but it has `generate=false`
and must not be counted as a foreground turn.

`thread_source` or session source identifies where a thread came from
(for example review, guardian, or another subagent). `request_kind`
identifies what the current outbound model request is doing (`turn`,
`prewarm`, `compaction`, or `memory`). A review or guardian thread can
issue either a normal turn request or a compaction request, so source
cannot replace request kind.

## Behavior / Impact
- Ordinary foreground requests send `request_kind="turn"`, their real
identity fields, and `window_id="<thread_id>:<window_generation>"`.
- Startup websocket warmup requests send `request_kind="prewarm"` so
they are not counted as foreground turns.
- Compaction requests send `request_kind="compaction"`, their real
owning turn identity, the existing `window_id`, and
`compaction.{trigger,reason,implementation,phase,strategy}`.
- Detached memory stage-one requests send `request_kind="memory"`
without `session_id`, `thread_id`, `turn_id`, or `window_id`; when no
workspace metadata exists, the kind-only header is still emitted.
- `session_id`, `thread_id`, `turn_id`, and `window_id` remain optional
in the header schema because detached memory requests do not own a
foreground turn or context window.
- `window_id` is not a new ID system: it is copied from the already-sent
`x-codex-window-id` / WS client metadata value at model-request dispatch
time.
- Existing `x-codex-window-id` HTTP/WS emission, value format,
generation advancement, resume behavior, and fork reset behavior are
unchanged.
- `request_kind`, `window_id`, and upstream turn-owned identity fields
remain schema-owned; input `responsesapi_client_metadata` cannot replace
their canonical values.
- No table, DAG, export, app-server API, or MCP `_meta` schema changes
are included.

A compaction attempt stopped by a pre-compact hook issues no model
request and therefore has no request header; its outcome remains in
analytics events. Status, error, duration, and token deltas also remain
analytics fields rather than request-header fields.

Future detached-memory attribution using a real initiating turn ID as
`trigger_turn_id` is intentionally not part of this PR.

## Sync With Main
- Final pushed head `716342e79` is rebased onto `origin/main@0d37db4b2`.
- The metadata conflict came from upstream `#24160`, which added
`forked_from_thread_id` on the same `turn_metadata` surface. Resolution
preserves that field and its protection from client metadata override
alongside this PR's request-kind, compaction, and window-id fields.
- While resolving the overlapping commits, I removed an accidental
recursive model-request overlay and a duplicate detached-memory header
builder before completing the rebase.

## Latency / User Experience Boundary
- Foreground turns perform no new filesystem, git, or network work. New
fields are inserted into metadata already serialized for outgoing
requests.
- Compaction issues the same model/HTTP requests with the same prompt,
model, service tier, and sampling settings; only metadata bytes change.
- Startup prewarm already sent metadata; it is now correctly classified
as `prewarm`.
- Non-git detached memory now sends a small kind-only metadata header
rather than no header.
- This client diff adds no user-visible latency mechanism beyond
negligible serialization and header bytes on already-existing requests.

## Validation
On conflict-resolved head `1d35c2cfb` based on `origin/main@487521733`:
- `just fmt` (passed)
- `just fix -p codex-core` (passed)
- `git diff --check origin/main...HEAD` (passed)
- `just test -p codex-core -E 'test(turn_metadata) |
test(websocket_first_turn_uses_startup_prewarm_and_create) |
test(responses_stream_includes_turn_metadata_header_for_git_workspace_e2e)
|
test(responses_websocket_forwards_turn_metadata_on_initial_and_incremental_create)
| test(remote_compact_v2_retries_failures_with_stream_retry_budget) |
test(window_id_advances_after_compact_persists_on_resume_and_resets_on_fork)'`
(`23 passed`; `bench-smoke` passed)
- `just test -p codex-app-server -E
'test(turn_start_forwards_client_metadata_to_responses_request_v2) |
test(turn_start_forwards_client_metadata_to_responses_websocket_request_body_v2)
| test(auto_compaction_remote_emits_started_and_completed_items)'` (`3
passed`; `bench-smoke` passed)
- `just test -p codex-memories-write` (`29 passed`; `bench-smoke`
passed)
2026-05-27 11:09:33 -07:00

594 lines
21 KiB
Rust

use std::sync::Arc;
use std::time::Instant;
use crate::Prompt;
use crate::client::ModelClientSession;
use crate::client_common::ResponseEvent;
use crate::hook_runtime::PostCompactHookOutcome;
use crate::hook_runtime::PreCompactHookOutcome;
use crate::hook_runtime::run_post_compact_hooks;
use crate::hook_runtime::run_pre_compact_hooks;
#[cfg(test)]
use crate::session::PreviousTurnSettings;
use crate::session::session::Session;
use crate::session::turn::get_last_assistant_message_from_turn;
use crate::session::turn_context::TurnContext;
use crate::turn_metadata::CompactionTurnMetadata;
use crate::util::backoff;
use codex_analytics::CodexCompactionEvent;
use codex_analytics::CompactionImplementation;
use codex_analytics::CompactionPhase;
use codex_analytics::CompactionReason;
use codex_analytics::CompactionStatus;
use codex_analytics::CompactionStrategy;
use codex_analytics::CompactionTrigger;
use codex_analytics::now_unix_seconds;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::items::ContextCompactionItem;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::CompactedItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::TurnStartedEvent;
use codex_protocol::protocol::WarningEvent;
use codex_protocol::user_input::UserInput;
use codex_rollout_trace::InferenceTraceContext;
use codex_utils_output_truncation::TruncationPolicy;
use codex_utils_output_truncation::approx_token_count;
use codex_utils_output_truncation::truncate_text;
use futures::prelude::*;
use tracing::error;
use codex_model_provider_info::ModelProviderInfo;
pub const SUMMARIZATION_PROMPT: &str = include_str!("../templates/compact/prompt.md");
pub const SUMMARY_PREFIX: &str = include_str!("../templates/compact/summary_prefix.md");
const COMPACT_USER_MESSAGE_MAX_TOKENS: usize = 20_000;
/// Controls whether compaction replacement history must include initial context.
///
/// Pre-turn/manual compaction variants use `DoNotInject`: they replace history with a summary and
/// clear `reference_context_item`, so the next regular turn will fully reinject initial context
/// after compaction.
///
/// Mid-turn compaction must use `BeforeLastUserMessage` because the model is trained to see the
/// compaction summary as the last item in history after mid-turn compaction; we therefore inject
/// initial context into the replacement history just above the last real user message.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum InitialContextInjection {
BeforeLastUserMessage,
DoNotInject,
}
pub(crate) fn should_use_remote_compact_task(provider: &ModelProviderInfo) -> bool {
provider.supports_remote_compaction()
}
pub(crate) async fn run_inline_auto_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
initial_context_injection: InitialContextInjection,
reason: CompactionReason,
phase: CompactionPhase,
) -> CodexResult<()> {
let prompt = turn_context.compact_prompt().to_string();
let input = vec![UserInput::Text {
text: prompt,
// Compaction prompt is synthesized; no UI element ranges to preserve.
text_elements: Vec::new(),
}];
run_compact_task_inner(
sess,
turn_context,
input,
initial_context_injection,
CompactionTrigger::Auto,
reason,
phase,
)
.await?;
Ok(())
}
pub(crate) async fn run_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
) -> CodexResult<()> {
let start_event = EventMsg::TurnStarted(TurnStartedEvent {
turn_id: turn_context.sub_id.clone(),
trace_id: turn_context.trace_id.clone(),
started_at: turn_context.turn_timing_state.started_at_unix_secs().await,
model_context_window: turn_context.model_context_window(),
collaboration_mode_kind: turn_context.collaboration_mode.mode,
});
sess.send_event(&turn_context, start_event).await;
run_compact_task_inner(
sess.clone(),
turn_context,
input,
InitialContextInjection::DoNotInject,
CompactionTrigger::Manual,
CompactionReason::UserRequested,
CompactionPhase::StandaloneTurn,
)
.await?;
Ok(())
}
async fn run_compact_task_inner(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
initial_context_injection: InitialContextInjection,
trigger: CompactionTrigger,
reason: CompactionReason,
phase: CompactionPhase,
) -> CodexResult<()> {
let compaction_metadata =
CompactionTurnMetadata::new(trigger, reason, CompactionImplementation::Responses, phase);
let attempt = CompactionAnalyticsAttempt::begin(
sess.as_ref(),
turn_context.as_ref(),
trigger,
reason,
CompactionImplementation::Responses,
phase,
)
.await;
let pre_compact_outcome = run_pre_compact_hooks(&sess, &turn_context, trigger).await;
match pre_compact_outcome {
PreCompactHookOutcome::Continue => {}
PreCompactHookOutcome::Stopped { reason } => {
let error = reason.unwrap_or_else(|| "PreCompact hook stopped execution".to_string());
attempt
.track(sess.as_ref(), CompactionStatus::Interrupted, Some(error))
.await;
return Err(CodexErr::TurnAborted);
}
}
let result = run_compact_task_inner_impl(
Arc::clone(&sess),
Arc::clone(&turn_context),
input,
initial_context_injection,
compaction_metadata,
)
.await;
let status = compaction_status_from_result(&result);
let error = result.as_ref().err().map(ToString::to_string);
if result.is_ok() {
let post_compact_outcome = run_post_compact_hooks(&sess, &turn_context, trigger).await;
if let PostCompactHookOutcome::Stopped = post_compact_outcome {
attempt.track(sess.as_ref(), status, error).await;
return Err(CodexErr::TurnAborted);
}
}
attempt.track(sess.as_ref(), status, error).await;
result.map(|_| ())
}
async fn run_compact_task_inner_impl(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
initial_context_injection: InitialContextInjection,
compaction_metadata: CompactionTurnMetadata,
) -> CodexResult<String> {
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
sess.emit_turn_item_started(&turn_context, &compaction_item)
.await;
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
let mut history = sess.clone_history().await;
history.record_items(
&[initial_input_for_turn.into()],
turn_context.truncation_policy,
);
let max_retries = turn_context.provider.info().stream_max_retries();
let mut retries = 0;
let mut client_session = sess.services.model_client.new_session();
// Reuse one client session so turn-scoped state (sticky routing, websocket incremental
// request tracking)
// survives retries within this compact turn.
loop {
// Clone is required because of the loop
let turn_input = history
.clone()
.for_prompt(&turn_context.model_info.input_modalities);
let turn_input_len = turn_input.len();
let prompt = Prompt {
input: turn_input,
base_instructions: sess.get_base_instructions().await,
personality: turn_context.personality,
..Default::default()
};
let window_id = sess.services.model_client.current_window_id();
let turn_metadata_header = turn_context
.turn_metadata_state
.current_header_value_for_compaction(&window_id, compaction_metadata);
let attempt_result = drain_to_completed(
&sess,
turn_context.as_ref(),
&mut client_session,
turn_metadata_header.as_deref(),
&prompt,
)
.await;
match attempt_result {
Ok(()) => {
break;
}
Err(CodexErr::Interrupted) => {
return Err(CodexErr::Interrupted);
}
Err(e @ CodexErr::ContextWindowExceeded) => {
if turn_input_len > 1 {
// Trim from the beginning to preserve cache (prefix-based) and keep recent messages intact.
error!(
"Context window exceeded while compacting; removing oldest history item. Error: {e}"
);
history.remove_first_item();
retries = 0;
continue;
}
sess.set_total_tokens_full(turn_context.as_ref()).await;
let event = EventMsg::Error(e.to_error_event(/*message_prefix*/ None));
sess.send_event(&turn_context, event).await;
return Err(e);
}
Err(e) => {
if retries < max_retries {
retries += 1;
let delay = backoff(retries);
sess.notify_stream_error(
turn_context.as_ref(),
format!("Reconnecting... {retries}/{max_retries}"),
e,
)
.await;
tokio::time::sleep(delay).await;
continue;
} else {
let event = EventMsg::Error(e.to_error_event(/*message_prefix*/ None));
sess.send_event(&turn_context, event).await;
return Err(e);
}
}
}
}
let history_snapshot = sess.clone_history().await;
let history_items = history_snapshot.raw_items();
let summary_suffix = get_last_assistant_message_from_turn(history_items).unwrap_or_default();
let summary_text = format!("{SUMMARY_PREFIX}\n{summary_suffix}");
let user_messages = collect_user_messages(history_items);
let mut new_history = build_compacted_history(Vec::new(), &user_messages, &summary_text);
if matches!(
initial_context_injection,
InitialContextInjection::BeforeLastUserMessage
) {
let initial_context = sess.build_initial_context(turn_context.as_ref()).await;
new_history =
insert_initial_context_before_last_real_user_or_summary(new_history, initial_context);
}
let reference_context_item = match initial_context_injection {
InitialContextInjection::DoNotInject => None,
InitialContextInjection::BeforeLastUserMessage => Some(turn_context.to_turn_context_item()),
};
let compacted_item = CompactedItem {
message: summary_text.clone(),
replacement_history: Some(new_history.clone()),
};
sess.replace_compacted_history(new_history, reference_context_item, compacted_item)
.await;
sess.recompute_token_usage(&turn_context).await;
sess.emit_turn_item_completed(&turn_context, compaction_item)
.await;
let warning = EventMsg::Warning(WarningEvent {
message: "Heads up: Long threads and multiple compactions can cause the model to be less accurate. Start a new thread when possible to keep threads small and targeted.".to_string(),
});
sess.send_event(&turn_context, warning).await;
Ok(summary_suffix)
}
pub(crate) struct CompactionAnalyticsAttempt {
thread_id: String,
turn_id: String,
trigger: CompactionTrigger,
reason: CompactionReason,
implementation: CompactionImplementation,
phase: CompactionPhase,
active_context_tokens_before: i64,
started_at: u64,
start_instant: Instant,
}
impl CompactionAnalyticsAttempt {
pub(crate) async fn begin(
sess: &Session,
turn_context: &TurnContext,
trigger: CompactionTrigger,
reason: CompactionReason,
implementation: CompactionImplementation,
phase: CompactionPhase,
) -> Self {
let active_context_tokens_before = sess.get_total_token_usage().await;
Self {
thread_id: sess.conversation_id.to_string(),
turn_id: turn_context.sub_id.clone(),
trigger,
reason,
implementation,
phase,
active_context_tokens_before,
started_at: now_unix_seconds(),
start_instant: Instant::now(),
}
}
pub(crate) async fn track(
self,
sess: &Session,
status: CompactionStatus,
error: Option<String>,
) {
let active_context_tokens_after = sess.get_total_token_usage().await;
sess.services
.analytics_events_client
.track_compaction(CodexCompactionEvent {
thread_id: self.thread_id,
turn_id: self.turn_id,
trigger: self.trigger,
reason: self.reason,
implementation: self.implementation,
phase: self.phase,
strategy: CompactionStrategy::Memento,
status,
error,
active_context_tokens_before: self.active_context_tokens_before,
active_context_tokens_after,
started_at: self.started_at,
completed_at: now_unix_seconds(),
duration_ms: Some(
u64::try_from(self.start_instant.elapsed().as_millis()).unwrap_or(u64::MAX),
),
});
}
}
pub(crate) fn compaction_status_from_result<T>(result: &CodexResult<T>) -> CompactionStatus {
match result {
Ok(_) => CompactionStatus::Completed,
Err(CodexErr::Interrupted | CodexErr::TurnAborted) => CompactionStatus::Interrupted,
Err(_) => CompactionStatus::Failed,
}
}
pub fn content_items_to_text(content: &[ContentItem]) -> Option<String> {
let mut pieces = Vec::new();
for item in content {
match item {
ContentItem::InputText { text } | ContentItem::OutputText { text } => {
if !text.is_empty() {
pieces.push(text.as_str());
}
}
ContentItem::InputImage { .. } => {}
}
}
if pieces.is_empty() {
None
} else {
Some(pieces.join("\n"))
}
}
pub(crate) fn collect_user_messages(items: &[ResponseItem]) -> Vec<String> {
items
.iter()
.filter_map(|item| match crate::event_mapping::parse_turn_item(item) {
Some(TurnItem::UserMessage(user)) => {
if is_summary_message(&user.message()) {
None
} else {
Some(user.message())
}
}
_ => None,
})
.collect()
}
pub(crate) fn is_summary_message(message: &str) -> bool {
message.starts_with(format!("{SUMMARY_PREFIX}\n").as_str())
}
/// Inserts canonical initial context into compacted replacement history at the
/// model-expected boundary.
///
/// Placement rules:
/// - Prefer immediately before the last real user message.
/// - If no real user messages remain, insert before the compaction summary so
/// the summary stays last.
/// - If there are no user messages, insert before the last compaction item so
/// that item remains last (remote compaction may return only compaction items).
/// - If there are no user messages or compaction items, append the context.
pub(crate) fn insert_initial_context_before_last_real_user_or_summary(
mut compacted_history: Vec<ResponseItem>,
initial_context: Vec<ResponseItem>,
) -> Vec<ResponseItem> {
let mut last_user_or_summary_index = None;
let mut last_real_user_index = None;
for (i, item) in compacted_history.iter().enumerate().rev() {
let Some(TurnItem::UserMessage(user)) = crate::event_mapping::parse_turn_item(item) else {
continue;
};
// Compaction summaries are encoded as user messages, so track both:
// the last real user message (preferred insertion point) and the last
// user-message-like item (fallback summary insertion point).
last_user_or_summary_index.get_or_insert(i);
if !is_summary_message(&user.message()) {
last_real_user_index = Some(i);
break;
}
}
let last_compaction_index = compacted_history
.iter()
.enumerate()
.rev()
.find_map(|(i, item)| {
matches!(
item,
ResponseItem::Compaction { .. } | ResponseItem::ContextCompaction { .. }
)
.then_some(i)
});
let insertion_index = last_real_user_index
.or(last_user_or_summary_index)
.or(last_compaction_index);
// Re-inject canonical context from the current session since we stripped it
// from the pre-compaction history. Prefer placing it before the last real
// user message; if there is no real user message left, place it before the
// summary or compaction item so the compaction item remains last.
if let Some(insertion_index) = insertion_index {
compacted_history.splice(insertion_index..insertion_index, initial_context);
} else {
compacted_history.extend(initial_context);
}
compacted_history
}
pub(crate) fn build_compacted_history(
initial_context: Vec<ResponseItem>,
user_messages: &[String],
summary_text: &str,
) -> Vec<ResponseItem> {
build_compacted_history_with_limit(
initial_context,
user_messages,
summary_text,
COMPACT_USER_MESSAGE_MAX_TOKENS,
)
}
fn build_compacted_history_with_limit(
mut history: Vec<ResponseItem>,
user_messages: &[String],
summary_text: &str,
max_tokens: usize,
) -> Vec<ResponseItem> {
let mut selected_messages: Vec<String> = Vec::new();
if max_tokens > 0 {
let mut remaining = max_tokens;
for message in user_messages.iter().rev() {
if remaining == 0 {
break;
}
let tokens = approx_token_count(message);
if tokens <= remaining {
selected_messages.push(message.clone());
remaining = remaining.saturating_sub(tokens);
} else {
let truncated = truncate_text(message, TruncationPolicy::Tokens(remaining));
selected_messages.push(truncated);
break;
}
}
selected_messages.reverse();
}
for message in &selected_messages {
history.push(ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: message.clone(),
}],
phase: None,
});
}
let summary_text = if summary_text.is_empty() {
"(no summary available)".to_string()
} else {
summary_text.to_string()
};
history.push(ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText { text: summary_text }],
phase: None,
});
history
}
async fn drain_to_completed(
sess: &Session,
turn_context: &TurnContext,
client_session: &mut ModelClientSession,
turn_metadata_header: Option<&str>,
prompt: &Prompt,
) -> CodexResult<()> {
let mut stream = client_session
.stream(
prompt,
&turn_context.model_info,
&turn_context.session_telemetry,
turn_context.reasoning_effort,
turn_context.reasoning_summary,
turn_context.config.service_tier.clone(),
turn_metadata_header,
// Rollout tracing currently models remote compaction only; local compaction streams
// are left untraced until the reducer has a first-class local compaction lifecycle.
&InferenceTraceContext::disabled(),
)
.await?;
loop {
let maybe_event = stream.next().await;
let Some(event) = maybe_event else {
return Err(CodexErr::Stream(
"stream closed before response.completed".into(),
None,
));
};
match event {
Ok(ResponseEvent::OutputItemDone(item)) => {
sess.record_into_history(std::slice::from_ref(&item), turn_context)
.await;
}
Ok(ResponseEvent::ServerReasoningIncluded(included)) => {
sess.set_server_reasoning_included(included).await;
}
Ok(ResponseEvent::RateLimits(snapshot)) => {
sess.update_rate_limits(turn_context, snapshot).await;
}
Ok(ResponseEvent::Completed { token_usage, .. }) => {
sess.update_token_usage_info(turn_context, token_usage.as_ref())
.await;
return Ok(());
}
Ok(_) => continue,
Err(e) => return Err(e),
}
}
}
#[cfg(test)]
#[path = "compact_tests.rs"]
mod tests;