[2 of 2] Finish moving goal runtime to extension (#26548)

## Stack

1. [#26547](https://github.com/openai/codex/pull/26547) - [1 of 2] Align
goal extension with core behavior
2. [#26548](https://github.com/openai/codex/pull/26548) - [2 of 2] Move
goal runtime to extension

## Why

This PR completes the switch of the goal behavior to the
extension-backed runtime and removes the old core goal implementation.

## What Changed

- Installs the goal extension for app-server `ThreadManager` sessions.
- Routes app-server thread goal `get`, `set`, and `clear` through
`GoalService`.
- Emits the thread-idle lifecycle event only after the resume response and
goal snapshot have been sent, so the extension can decide whether to continue the goal.
- Forwards extension goal updates through a FIFO async app-server
notification path so backpressure neither drops nor reorders them.
- Keeps review turns from enabling goal runtime behavior.
- Plans extension tools before dynamic tools so built-in goal tool names
keep their old precedence when goals are enabled.
- Removes the old core goal runtime, core goal tool handlers, and core
goal tool specs.
- Updates tests that were coupled to the core-owned goal runtime while
leaving the legacy `<goal_context>` compatibility path in core for old
threads.
- Removes the stale cargo-shear ignore now that `codex-goal-extension`
is used by the workspace.
- Keeps realtime event matching exhaustive after removing the old
goal-specific realtime text path.


## Validation

- Ran `/goal` manually in the TUI. Verified that time accounting matched
wall-clock time, and checked the goal lifecycle state transitions.
This commit is contained in:
Eric Traut
2026-06-05 14:17:30 -07:00
committed by GitHub
Unverified
parent 679cc08445
commit 479a14cf59
34 changed files with 280 additions and 3908 deletions
+1
View File
@@ -1926,6 +1926,7 @@ dependencies = [
"codex-file-search",
"codex-file-watcher",
"codex-git-utils",
"codex-goal-extension",
"codex-guardian",
"codex-hooks",
"codex-image-generation-extension",
-1
View File
@@ -483,7 +483,6 @@ unwrap_used = "deny"
[workspace.metadata.cargo-shear]
ignored = [
"codex-agent-graph-store",
"codex-goal-extension",
"icu_provider",
"openssl-sys",
"codex-v8-poc",
+1
View File
@@ -41,6 +41,7 @@ codex-extension-api = { workspace = true }
codex-external-agent-migration = { workspace = true }
codex-external-agent-sessions = { workspace = true }
codex-features = { workspace = true }
codex-goal-extension = { workspace = true }
codex-guardian = { workspace = true }
codex-git-utils = { workspace = true }
codex-file-watcher = { workspace = true }
+101 -55
View File
@@ -2,6 +2,7 @@ use std::sync::Arc;
use std::sync::Weak;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ThreadGoal;
use codex_app_server_protocol::ThreadGoalUpdatedNotification;
use codex_core::NewThread;
use codex_core::StartThreadOptions;
@@ -12,23 +13,40 @@ use codex_extension_api::AgentSpawner;
use codex_extension_api::ExtensionEventSink;
use codex_extension_api::ExtensionRegistry;
use codex_extension_api::ExtensionRegistryBuilder;
use codex_goal_extension::GoalService;
use codex_login::AuthManager;
use codex_protocol::ThreadId;
use codex_protocol::error::CodexErr;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_rollout::state_db::StateDbHandle;
use crate::outgoing_message::OutgoingMessageSender;
use crate::thread_state::ThreadListenerCommand;
use crate::thread_state::ThreadStateManager;
pub(crate) fn thread_extensions<S>(
guardian_agent_spawner: S,
event_sink: Arc<dyn ExtensionEventSink>,
auth_manager: Arc<AuthManager>,
state_db: Option<StateDbHandle>,
thread_manager: Weak<ThreadManager>,
goal_service: Arc<GoalService>,
) -> Arc<ExtensionRegistry<Config>>
where
S: AgentSpawner<StartThreadOptions, Spawned = NewThread, Error = CodexErr> + 'static,
{
let mut builder = ExtensionRegistryBuilder::<Config>::with_event_sink(event_sink);
if let Some(state_db) = state_db {
codex_goal_extension::install_with_backend(
&mut builder,
state_db,
codex_otel::global(),
thread_manager,
goal_service,
|config: &Config| config.features.enabled(codex_features::Feature::Goals),
);
}
codex_guardian::install(&mut builder, guardian_agent_spawner);
codex_memories_extension::install(&mut builder, codex_otel::global());
codex_web_search_extension::install(&mut builder, auth_manager.clone());
@@ -38,26 +56,53 @@ where
pub(crate) fn app_server_extension_event_sink(
outgoing: Arc<OutgoingMessageSender>,
thread_state_manager: ThreadStateManager,
) -> Arc<dyn ExtensionEventSink> {
Arc::new(AppServerExtensionEventSink { outgoing })
Arc::new(AppServerExtensionEventSink {
outgoing,
thread_state_manager,
})
}
/// Extension event sink used by the app server.
///
/// Its `emit` implementation forwards `ThreadGoalUpdated` events to clients and
/// drops every other event with a debug log.
struct AppServerExtensionEventSink {
// Used for the fallback path: sends the notification directly when no
// listener command sender is registered or its channel is closed.
outgoing: Arc<OutgoingMessageSender>,
// Queried per event for the thread's current listener command sender, so
// goal updates can be enqueued on that listener.
thread_state_manager: ThreadStateManager,
}
impl ExtensionEventSink for AppServerExtensionEventSink {
fn emit(&self, event: Event) {
match event.msg {
EventMsg::ThreadGoalUpdated(thread_goal_event) => {
self.outgoing
.try_send_server_notification(ServerNotification::ThreadGoalUpdated(
ThreadGoalUpdatedNotification {
thread_id: thread_goal_event.thread_id.to_string(),
turn_id: thread_goal_event.turn_id,
goal: thread_goal_event.goal.into(),
},
));
let thread_id = thread_goal_event.thread_id;
let turn_id = thread_goal_event.turn_id;
let goal: ThreadGoal = thread_goal_event.goal.into();
if let Some(listener_command_tx) = self
.thread_state_manager
.current_listener_command_tx(thread_id)
{
let command = ThreadListenerCommand::EmitThreadGoalUpdated {
turn_id: turn_id.clone(),
goal: goal.clone(),
};
if listener_command_tx.send(command).is_ok() {
return;
}
tracing::warn!(
"failed to enqueue extension goal update for {thread_id}: listener command channel is closed"
);
}
let outgoing = Arc::clone(&self.outgoing);
tokio::spawn(async move {
outgoing
.send_server_notification(ServerNotification::ThreadGoalUpdated(
ThreadGoalUpdatedNotification {
thread_id: thread_id.to_string(),
turn_id,
goal,
},
))
.await;
});
}
msg => {
tracing::debug!(event_id = %event.id, ?msg, "dropping unsupported extension event");
@@ -89,10 +134,7 @@ mod tests {
use std::time::Duration;
use codex_analytics::AnalyticsEventsClient;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ThreadGoal as AppServerThreadGoal;
use codex_app_server_protocol::ThreadGoalStatus as AppServerThreadGoalStatus;
use codex_protocol::protocol::ThreadGoal;
use codex_protocol::protocol::ThreadGoal as CoreThreadGoal;
use codex_protocol::protocol::ThreadGoalStatus;
use codex_protocol::protocol::ThreadGoalUpdatedEvent;
use pretty_assertions::assert_eq;
@@ -100,25 +142,61 @@ mod tests {
use tokio::time::timeout;
use super::*;
use crate::outgoing_message::OutgoingEnvelope;
use crate::outgoing_message::OutgoingMessage;
#[tokio::test]
async fn app_server_event_sink_forwards_thread_goal_updates() {
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(4);
async fn app_server_event_sink_uses_listener_fifo_for_goal_updates_and_clears() {
let (outgoing_tx, _outgoing_rx) = mpsc::channel(4);
let outgoing = Arc::new(OutgoingMessageSender::new(
outgoing_tx,
AnalyticsEventsClient::disabled(),
));
let sink = app_server_extension_event_sink(outgoing);
let thread_state_manager = ThreadStateManager::new();
let thread_id = ThreadId::default();
let (listener_command_tx, mut listener_command_rx) = mpsc::unbounded_channel();
thread_state_manager.register_listener_command_tx(thread_id, listener_command_tx.clone());
let sink = app_server_extension_event_sink(outgoing, thread_state_manager);
sink.emit(Event {
id: "call-1".to_string(),
for turn_id in ["turn-1", "turn-2"] {
sink.emit(thread_goal_updated_event(thread_id, turn_id));
}
listener_command_tx
.send(ThreadListenerCommand::EmitThreadGoalCleared)
.expect("listener command channel should be open");
let mut observed = Vec::new();
for _ in 0..3 {
let command = timeout(Duration::from_secs(1), listener_command_rx.recv())
.await
.expect("timed out waiting for listener command")
.expect("listener command channel closed unexpectedly");
match command {
ThreadListenerCommand::EmitThreadGoalUpdated { turn_id, .. } => {
observed.push(turn_id.expect("extension goal updates should include turn ids"));
}
ThreadListenerCommand::EmitThreadGoalCleared => {
observed.push("cleared".to_string())
}
_ => panic!("unexpected listener command"),
}
}
assert_eq!(
vec![
"turn-1".to_string(),
"turn-2".to_string(),
"cleared".to_string()
],
observed
);
}
fn thread_goal_updated_event(thread_id: ThreadId, turn_id: &str) -> Event {
Event {
id: turn_id.to_string(),
msg: EventMsg::ThreadGoalUpdated(ThreadGoalUpdatedEvent {
thread_id,
turn_id: Some("turn-1".to_string()),
goal: ThreadGoal {
turn_id: Some(turn_id.to_string()),
goal: CoreThreadGoal {
thread_id,
objective: "wire extension events".to_string(),
status: ThreadGoalStatus::Active,
@@ -129,38 +207,6 @@ mod tests {
updated_at: 8,
},
}),
});
let envelope = timeout(Duration::from_secs(1), outgoing_rx.recv())
.await
.expect("timed out waiting for forwarded extension event")
.expect("outgoing channel closed unexpectedly");
let OutgoingEnvelope::Broadcast { message } = envelope else {
panic!("expected broadcast notification");
};
let OutgoingMessage::AppServerNotification(ServerNotification::ThreadGoalUpdated(
notification,
)) = message
else {
panic!("expected thread goal updated notification");
};
assert_eq!(
ThreadGoalUpdatedNotification {
thread_id: thread_id.to_string(),
turn_id: Some("turn-1".to_string()),
goal: AppServerThreadGoal {
thread_id: thread_id.to_string(),
objective: "wire extension events".to_string(),
status: AppServerThreadGoalStatus::Active,
token_budget: Some(123),
tokens_used: 45,
time_used_seconds: 6,
created_at: 7,
updated_at: 8,
},
},
notification
);
}
}
}
+3
View File
@@ -191,6 +191,9 @@ mod tests {
guardian_agent_spawner(thread_manager.clone()),
Arc::new(NoopExtensionEventSink),
auth_manager.clone(),
Some(state_db.clone()),
thread_manager.clone(),
Arc::new(codex_goal_extension::GoalService::new()),
),
/*analytics_events_client*/ None,
Arc::clone(&thread_store),
+7 -1
View File
@@ -70,6 +70,7 @@ use codex_core::ThreadManager;
use codex_core::config::Config;
use codex_exec_server::EnvironmentManager;
use codex_feedback::CodexFeedback;
use codex_goal_extension::GoalService;
use codex_login::AuthManager;
use codex_login::auth::ExternalAuth;
use codex_login::auth::ExternalAuthRefreshContext;
@@ -305,6 +306,7 @@ impl MessageProcessor {
// resumed, or forked threads to a different persistence backend/root.
let thread_store = codex_core::thread_store_from_config(config.as_ref(), state_db.clone());
let environment_manager_for_requests = Arc::clone(&environment_manager);
let goal_service = Arc::new(GoalService::new());
let thread_manager = Arc::new_cyclic(|thread_manager| {
ThreadManager::new(
config.as_ref(),
@@ -313,8 +315,11 @@ impl MessageProcessor {
environment_manager,
thread_extensions(
guardian_agent_spawner(thread_manager.clone()),
app_server_extension_event_sink(outgoing.clone()),
app_server_extension_event_sink(outgoing.clone(), thread_state_manager.clone()),
auth_manager.clone(),
state_db.clone(),
thread_manager.clone(),
Arc::clone(&goal_service),
),
Some(analytics_events_client.clone()),
Arc::clone(&thread_store),
@@ -416,6 +421,7 @@ impl MessageProcessor {
Arc::clone(&config),
thread_state_manager.clone(),
state_db.clone(),
Arc::clone(&goal_service),
);
let thread_processor = ThreadRequestProcessor::new(
auth_manager.clone(),
@@ -555,16 +555,6 @@ impl OutgoingMessageSender {
.await;
}
pub(crate) fn try_send_server_notification(&self, notification: ServerNotification) {
tracing::trace!("app-server event: {notification}");
let outgoing_message = OutgoingMessage::AppServerNotification(notification);
if let Err(err) = self.sender.try_send(OutgoingEnvelope::Broadcast {
message: outgoing_message,
}) {
warn!("failed to send server notification to client without waiting: {err:?}");
}
}
pub(crate) async fn send_server_notification_to_connections(
&self,
connection_ids: &[ConnectionId],
@@ -279,8 +279,6 @@ use codex_config::loader::project_trust_key;
use codex_config::types::McpServerTransportConfig;
use codex_core::CodexThread;
use codex_core::CodexThreadSettingsOverrides;
use codex_core::ExternalGoalPreviousStatus;
use codex_core::ExternalGoalSet;
use codex_core::ForkSnapshot;
use codex_core::NewThread;
#[cfg(test)]
@@ -1,5 +1,9 @@
use super::*;
use codex_protocol::protocol::validate_thread_goal_objective;
use codex_goal_extension::GoalObjectiveUpdate;
use codex_goal_extension::GoalService;
use codex_goal_extension::GoalServiceError;
use codex_goal_extension::GoalSetRequest;
use codex_goal_extension::GoalTokenBudgetUpdate;
#[derive(Clone)]
pub(crate) struct ThreadGoalRequestProcessor {
@@ -8,6 +12,7 @@ pub(crate) struct ThreadGoalRequestProcessor {
config: Arc<Config>,
thread_state_manager: ThreadStateManager,
state_db: Option<StateDbHandle>,
goal_service: Arc<GoalService>,
}
impl ThreadGoalRequestProcessor {
@@ -17,6 +22,7 @@ impl ThreadGoalRequestProcessor {
config: Arc<Config>,
thread_state_manager: ThreadStateManager,
state_db: Option<StateDbHandle>,
goal_service: Arc<GoalService>,
) -> Self {
Self {
thread_manager,
@@ -24,6 +30,7 @@ impl ThreadGoalRequestProcessor {
config,
thread_state_manager,
state_db,
goal_service,
}
}
@@ -66,10 +73,8 @@ impl ThreadGoalRequestProcessor {
}
self.emit_thread_goal_snapshot(thread_id).await;
// App-server owns resume response and snapshot ordering, so wait until
// those are sent before letting core start goal continuation.
if let Err(err) = thread.continue_active_goal_if_idle().await {
tracing::warn!("failed to continue active goal after resume: {err}");
}
// those are sent before letting extensions react to the idle thread.
thread.emit_thread_idle_lifecycle_if_idle().await;
}
pub(crate) async fn pending_resume_goal_state(
@@ -100,140 +105,36 @@ impl ThreadGoalRequestProcessor {
let thread_id = parse_thread_id_for_request(params.thread_id.as_str())?;
let state_db = self.state_db_for_materialized_thread(thread_id).await?;
let running_thread = self.thread_manager.get_thread(thread_id).await.ok();
let rollout_path = match running_thread.as_ref() {
Some(thread) => thread.rollout_path().ok_or_else(|| {
invalid_request(format!(
"ephemeral thread does not support goals: {thread_id}"
))
})?,
None => codex_rollout::find_thread_path_by_id_str(
&self.config.codex_home,
&thread_id.to_string(),
self.state_db.as_deref(),
)
.await
.map_err(|err| {
internal_error(format!("failed to locate thread id {thread_id}: {err}"))
})?
.ok_or_else(|| invalid_request(format!("thread not found: {thread_id}")))?,
};
reconcile_rollout(
Some(&state_db),
rollout_path.as_path(),
self.config.model_provider_id.as_str(),
/*builder*/ None,
&[],
/*archived_only*/ None,
/*new_thread_memory_mode*/ None,
)
.await;
self.reconcile_thread_goal_rollout(thread_id, &state_db)
.await?;
let listener_command_tx = {
let thread_state = self.thread_state_manager.thread_state(thread_id).await;
let thread_state = thread_state.lock().await;
thread_state.listener_command_tx()
};
let status = params.status.map(thread_goal_status_to_state);
let objective = params.objective.as_deref().map(str::trim);
let status = params.status.map(ThreadGoalStatus::to_core);
let objective = params.objective.as_deref();
if let Some(objective) = objective {
validate_thread_goal_objective(objective).map_err(invalid_request)?;
}
if objective.is_some() || params.token_budget.is_some() {
validate_goal_budget(params.token_budget.flatten()).map_err(invalid_request)?;
}
if let Some(thread) = running_thread.as_ref() {
thread.prepare_external_goal_mutation().await;
}
let should_set_thread_preview = objective.is_some();
let (goal, previous_status) = (if let Some(objective) = objective {
let existing_goal = state_db
.thread_goals()
.get_thread_goal(thread_id)
.await
.map_err(|err| invalid_request(err.to_string()))?;
if let Some(goal) = existing_goal.as_ref() {
let previous_status = ExternalGoalPreviousStatus::from(goal);
state_db
.thread_goals()
.update_thread_goal(
thread_id,
codex_state::GoalUpdate {
objective: Some(objective.to_string()),
status,
token_budget: params.token_budget,
expected_goal_id: Some(goal.goal_id.clone()),
},
)
.await
.and_then(|goal| {
goal.ok_or_else(|| {
anyhow::anyhow!(
"cannot update goal for thread {thread_id}: no goal exists"
)
})
})
.map(|goal| (goal, previous_status))
} else {
let previous_status = ExternalGoalPreviousStatus::NewGoal;
state_db
.thread_goals()
.replace_thread_goal(
thread_id,
objective,
status.unwrap_or(codex_state::ThreadGoalStatus::Active),
params.token_budget.flatten(),
)
.await
.map(|goal| (goal, previous_status))
}
} else {
let existing_goal = state_db
.thread_goals()
.get_thread_goal(thread_id)
.await
.map_err(|err| invalid_request(err.to_string()))?;
let Some(existing_goal) = existing_goal else {
return Err(invalid_request(format!(
"cannot update goal for thread {thread_id}: no goal exists"
)));
};
let previous_status = ExternalGoalPreviousStatus::from(&existing_goal);
state_db
.thread_goals()
.update_thread_goal(
let outcome = self
.goal_service
.set_thread_goal(
&state_db,
GoalSetRequest {
thread_id,
codex_state::GoalUpdate {
objective: None,
status,
token_budget: params.token_budget,
expected_goal_id: None,
objective: objective
.map(GoalObjectiveUpdate::Set)
.unwrap_or(GoalObjectiveUpdate::Keep),
status,
token_budget: match params.token_budget {
Some(token_budget) => GoalTokenBudgetUpdate::Set(token_budget),
None => GoalTokenBudgetUpdate::Keep,
},
)
.await
.and_then(|goal| {
goal.ok_or_else(|| {
anyhow::anyhow!("cannot update goal for thread {thread_id}: no goal exists")
})
})
.map(|goal| (goal, previous_status))
})
.map_err(|err| invalid_request(err.to_string()))?;
if should_set_thread_preview
&& let Err(err) = state_db
.set_thread_preview_if_empty(thread_id, goal.objective.as_str())
.await
{
warn!("failed to set empty thread preview from goal objective for {thread_id}: {err}");
}
let external_goal_set = ExternalGoalSet {
goal: goal.clone(),
previous_status,
};
let goal = api_thread_goal_from_state(goal);
},
)
.await
.map_err(goal_service_error)?;
let goal = ThreadGoal::from(outcome.goal.clone());
self.outgoing
.send_response(
request_id.clone(),
@@ -242,9 +143,7 @@ impl ThreadGoalRequestProcessor {
.await;
self.emit_thread_goal_updated_ordered(thread_id, goal, listener_command_tx)
.await;
if let Some(thread) = running_thread.as_ref() {
thread.apply_external_goal_set(external_goal_set).await;
}
outcome.apply_runtime_effects(&self.goal_service).await;
Ok(())
}
@@ -258,12 +157,12 @@ impl ThreadGoalRequestProcessor {
let thread_id = parse_thread_id_for_request(params.thread_id.as_str())?;
let state_db = self.state_db_for_materialized_thread(thread_id).await?;
let goal = state_db
.thread_goals()
.get_thread_goal(thread_id)
let goal = self
.goal_service
.get_thread_goal(&state_db, thread_id)
.await
.map_err(|err| internal_error(format!("failed to read thread goal: {err}")))?
.map(api_thread_goal_from_state);
.map_err(goal_service_error)?
.map(ThreadGoal::from);
Ok(ThreadGoalGetResponse { goal })
}
@@ -278,53 +177,19 @@ impl ThreadGoalRequestProcessor {
let thread_id = parse_thread_id_for_request(params.thread_id.as_str())?;
let state_db = self.state_db_for_materialized_thread(thread_id).await?;
let running_thread = self.thread_manager.get_thread(thread_id).await.ok();
let rollout_path = match running_thread.as_ref() {
Some(thread) => thread.rollout_path().ok_or_else(|| {
invalid_request(format!(
"ephemeral thread does not support goals: {thread_id}"
))
})?,
None => codex_rollout::find_thread_path_by_id_str(
&self.config.codex_home,
&thread_id.to_string(),
self.state_db.as_deref(),
)
.await
.map_err(|err| {
internal_error(format!("failed to locate thread id {thread_id}: {err}"))
})?
.ok_or_else(|| invalid_request(format!("thread not found: {thread_id}")))?,
};
reconcile_rollout(
Some(&state_db),
rollout_path.as_path(),
self.config.model_provider_id.as_str(),
/*builder*/ None,
&[],
/*archived_only*/ None,
/*new_thread_memory_mode*/ None,
)
.await;
if let Some(thread) = running_thread.as_ref() {
thread.prepare_external_goal_mutation().await;
}
self.reconcile_thread_goal_rollout(thread_id, &state_db)
.await?;
let listener_command_tx = {
let thread_state = self.thread_state_manager.thread_state(thread_id).await;
let thread_state = thread_state.lock().await;
thread_state.listener_command_tx()
};
let cleared = state_db
.thread_goals()
.delete_thread_goal(thread_id)
let cleared = self
.goal_service
.clear_thread_goal(&state_db, thread_id)
.await
.map_err(|err| internal_error(format!("failed to clear thread goal: {err}")))?;
if cleared && let Some(thread) = running_thread.as_ref() {
thread.apply_external_goal_clear().await;
}
.map_err(goal_service_error)?;
self.outgoing
.send_response(request_id, ThreadGoalClearResponse { cleared })
@@ -367,6 +232,42 @@ impl ThreadGoalRequestProcessor {
.ok_or_else(|| internal_error("sqlite state db unavailable for thread goals"))
}
/// Reconciles the thread's rollout file into `state_db` before a goal mutation.
///
/// The rollout path is taken from the running thread when the thread manager
/// has one for `thread_id`; otherwise it is looked up by id under
/// `codex_home`.
///
/// # Errors
///
/// - Invalid request: the running thread has no rollout path (ephemeral
///   thread), or no rollout exists for the id.
/// - Internal error: the rollout lookup itself failed.
async fn reconcile_thread_goal_rollout(
    &self,
    thread_id: ThreadId,
    state_db: &StateDbHandle,
) -> Result<(), JSONRPCErrorError> {
    let running_thread = self.thread_manager.get_thread(thread_id).await.ok();

    let rollout_path = if let Some(thread) = running_thread.as_ref() {
        // A running thread without a rollout path is ephemeral and cannot carry goals.
        let Some(path) = thread.rollout_path() else {
            return Err(invalid_request(format!(
                "ephemeral thread does not support goals: {thread_id}"
            )));
        };
        path
    } else {
        // Thread is not loaded: resolve its rollout on disk by id.
        let located = codex_rollout::find_thread_path_by_id_str(
            &self.config.codex_home,
            &thread_id.to_string(),
            self.state_db.as_deref(),
        )
        .await
        .map_err(|err| {
            internal_error(format!("failed to locate thread id {thread_id}: {err}"))
        })?;
        let Some(path) = located else {
            return Err(invalid_request(format!("thread not found: {thread_id}")));
        };
        path
    };

    reconcile_rollout(
        Some(state_db),
        rollout_path.as_path(),
        self.config.model_provider_id.as_str(),
        /*builder*/ None,
        &[],
        /*archived_only*/ None,
        /*new_thread_memory_mode*/ None,
    )
    .await;

    Ok(())
}
async fn emit_thread_goal_snapshot(&self, thread_id: ThreadId) {
let state_db = match self.state_db_for_materialized_thread(thread_id).await {
Ok(state_db) => state_db,
@@ -405,6 +306,7 @@ impl ThreadGoalRequestProcessor {
) {
if let Some(listener_command_tx) = listener_command_tx {
let command = crate::thread_state::ThreadListenerCommand::EmitThreadGoalUpdated {
turn_id: None,
goal: goal.clone(),
};
if listener_command_tx.send(command).is_ok() {
@@ -449,27 +351,20 @@ impl ThreadGoalRequestProcessor {
}
}
fn validate_goal_budget(value: Option<i64>) -> Result<(), String> {
if let Some(value) = value
&& value <= 0
{
return Err("goal budgets must be positive when provided".to_string());
}
Ok(())
}
fn thread_goal_status_to_state(status: ThreadGoalStatus) -> codex_state::ThreadGoalStatus {
match status {
ThreadGoalStatus::Active => codex_state::ThreadGoalStatus::Active,
ThreadGoalStatus::Paused => codex_state::ThreadGoalStatus::Paused,
ThreadGoalStatus::Blocked => codex_state::ThreadGoalStatus::Blocked,
ThreadGoalStatus::UsageLimited => codex_state::ThreadGoalStatus::UsageLimited,
ThreadGoalStatus::BudgetLimited => codex_state::ThreadGoalStatus::BudgetLimited,
ThreadGoalStatus::Complete => codex_state::ThreadGoalStatus::Complete,
/// Converts a persisted `codex_state::ThreadGoal` into the app-server API `ThreadGoal`.
///
/// The thread id is rendered as a string, the status is mapped through
/// `api_thread_goal_status_from_state`, and both timestamps are taken from
/// their `timestamp()` values. The remaining fields are copied over as-is.
pub(super) fn api_thread_goal_from_state(goal: codex_state::ThreadGoal) -> ThreadGoal {
    // Derive the converted fields first; the rest move straight across below.
    let thread_id = goal.thread_id.to_string();
    let created_at = goal.created_at.timestamp();
    let updated_at = goal.updated_at.timestamp();
    let status = api_thread_goal_status_from_state(goal.status);

    ThreadGoal {
        thread_id,
        objective: goal.objective,
        status,
        token_budget: goal.token_budget,
        tokens_used: goal.tokens_used,
        time_used_seconds: goal.time_used_seconds,
        created_at,
        updated_at,
    }
}
fn thread_goal_status_from_state(status: codex_state::ThreadGoalStatus) -> ThreadGoalStatus {
fn api_thread_goal_status_from_state(status: codex_state::ThreadGoalStatus) -> ThreadGoalStatus {
match status {
codex_state::ThreadGoalStatus::Active => ThreadGoalStatus::Active,
codex_state::ThreadGoalStatus::Paused => ThreadGoalStatus::Paused,
@@ -480,16 +375,10 @@ fn thread_goal_status_from_state(status: codex_state::ThreadGoalStatus) -> Threa
}
}
pub(super) fn api_thread_goal_from_state(goal: codex_state::ThreadGoal) -> ThreadGoal {
ThreadGoal {
thread_id: goal.thread_id.to_string(),
objective: goal.objective,
status: thread_goal_status_from_state(goal.status),
token_budget: goal.token_budget,
tokens_used: goal.tokens_used,
time_used_seconds: goal.time_used_seconds,
created_at: goal.created_at.timestamp(),
updated_at: goal.updated_at.timestamp(),
/// Maps a `GoalServiceError` onto the matching JSON-RPC error.
///
/// `InvalidRequest` becomes an invalid-request error and `Internal` becomes
/// an internal error; the message is passed through unchanged.
fn goal_service_error(err: GoalServiceError) -> JSONRPCErrorError {
    match err {
        GoalServiceError::Internal(reason) => internal_error(reason),
        GoalServiceError::InvalidRequest(reason) => invalid_request(reason),
    }
}
@@ -244,12 +244,22 @@ pub(super) async fn ensure_listener_task_running(
if thread_state.listener_matches(&conversation) {
return Ok(());
}
thread_state.set_listener(
let (listener_command_rx, listener_generation) = thread_state.set_listener(
cancel_tx,
&conversation,
watch_registration,
thread_settings_baseline,
)
);
let Some(listener_command_tx) = thread_state.listener_command_tx() else {
tracing::warn!(
"thread listener command sender missing immediately after listener registration"
);
return Ok(());
};
listener_task_context
.thread_state_manager
.register_listener_command_tx(conversation_id, listener_command_tx);
(listener_command_rx, listener_generation)
};
let ListenerTaskContext {
outgoing,
@@ -378,6 +388,7 @@ pub(super) async fn ensure_listener_task_running(
let mut thread_state = thread_state.lock().await;
if thread_state.listener_generation == listener_generation {
thread_state_manager.unregister_listener_command_tx(conversation_id);
thread_state.clear_listener();
}
});
@@ -471,12 +482,12 @@ pub(super) async fn handle_thread_listener_command(
)
.await;
}
ThreadListenerCommand::EmitThreadGoalUpdated { goal } => {
ThreadListenerCommand::EmitThreadGoalUpdated { turn_id, goal } => {
outgoing
.send_server_notification(ServerNotification::ThreadGoalUpdated(
ThreadGoalUpdatedNotification {
thread_id: conversation_id.to_string(),
turn_id: None,
turn_id,
goal,
},
))
@@ -616,12 +627,6 @@ pub(super) async fn handle_pending_thread_resume_request(
}
}
if pending.emit_thread_goal_update
&& let Err(err) = conversation.apply_goal_resume_runtime_effects().await
{
tracing::warn!("failed to apply goal resume runtime effects: {err}");
}
let ThreadConfigSnapshot {
model,
model_provider_id,
@@ -691,11 +696,9 @@ pub(super) async fn handle_pending_thread_resume_request(
.replay_requests_to_connection_for_thread(connection_id, conversation_id)
.await;
// App-server owns resume response and snapshot ordering, so wait until
// replay completes before letting core start goal continuation.
if pending.emit_thread_goal_update
&& let Err(err) = conversation.continue_active_goal_if_idle().await
{
tracing::warn!("failed to continue active goal after running-thread resume: {err}");
// replay completes before letting extensions react to the idle thread.
if pending.emit_thread_goal_update {
conversation.emit_thread_idle_lifecycle_if_idle().await;
}
}
+38 -1
View File
@@ -17,6 +17,7 @@ use codex_utils_absolute_path::AbsolutePathBuf;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::Weak;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
@@ -44,8 +45,9 @@ pub(crate) struct PendingThreadResumeRequest {
pub(crate) enum ThreadListenerCommand {
// SendThreadResumeResponse is used to resume an already running thread by sending the thread's history to the client and atomically subscribing for new updates.
SendThreadResumeResponse(Box<PendingThreadResumeRequest>),
// EmitThreadGoalUpdated is used to order app-server goal updates with running-thread resume responses.
// EmitThreadGoalUpdated is used to order goal updates with running-thread resume responses and goal clears.
EmitThreadGoalUpdated {
turn_id: Option<String>,
goal: ThreadGoal,
},
// EmitThreadGoalCleared is used to order app-server goal clears with running-thread resume responses.
@@ -284,6 +286,10 @@ pub(crate) struct ConnectionCapabilities {
/// Cloneable handle to the app server's thread state.
///
/// Both fields are behind `Arc`, so clones share the same underlying data.
#[derive(Clone, Default)]
pub(crate) struct ThreadStateManager {
// Async-locked inner state; it holds the per-thread entries (`threads`).
state: Arc<Mutex<ThreadStateManagerInner>>,
// Extension event sinks are synchronous, so they need an await-free way to
// enqueue work on the active per-thread listener.
listener_commands:
Arc<StdMutex<HashMap<ThreadId, mpsc::UnboundedSender<ThreadListenerCommand>>>>,
}
impl ThreadStateManager {
@@ -337,6 +343,35 @@ impl ThreadStateManager {
state.threads.entry(thread_id).or_default().state.clone()
}
/// Returns a clone of the listener command sender registered for `thread_id`, if any.
///
/// Synchronous: it takes the std mutex, so it can be called from non-async
/// code. A poisoned lock is recovered instead of panicking.
pub(crate) fn current_listener_command_tx(
    &self,
    thread_id: ThreadId,
) -> Option<mpsc::UnboundedSender<ThreadListenerCommand>> {
    let registered = self
        .listener_commands
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    registered.get(&thread_id).cloned()
}
/// Records `tx` as the listener command sender for `thread_id`.
///
/// Any sender previously registered for the same thread is replaced. A
/// poisoned lock is recovered instead of panicking.
pub(crate) fn register_listener_command_tx(
    &self,
    thread_id: ThreadId,
    tx: mpsc::UnboundedSender<ThreadListenerCommand>,
) {
    let mut registered = self
        .listener_commands
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    registered.insert(thread_id, tx);
}
/// Removes the listener command sender registered for `thread_id`, if present.
///
/// Does nothing when no sender is registered. A poisoned lock is recovered
/// instead of panicking.
pub(crate) fn unregister_listener_command_tx(&self, thread_id: ThreadId) {
    let mut registered = self
        .listener_commands
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    registered.remove(&thread_id);
}
pub(crate) async fn remove_thread_state(&self, thread_id: ThreadId) {
let thread_state = {
let mut state = self.state.lock().await;
@@ -350,6 +385,7 @@ impl ThreadStateManager {
});
thread_state
};
self.unregister_listener_command_tx(thread_id);
if let Some(thread_state) = thread_state {
let mut thread_state = thread_state.lock().await;
@@ -375,6 +411,7 @@ impl ThreadStateManager {
};
for (thread_id, thread_state) in thread_states {
self.unregister_listener_command_tx(thread_id);
let mut thread_state = thread_state.lock().await;
tracing::debug!(
thread_id = %thread_id,
+3 -45
View File
@@ -1,7 +1,5 @@
use crate::agent::AgentStatus;
use crate::config::ConstraintResult;
use crate::goals::ExternalGoalSet;
use crate::goals::GoalRuntimeEvent;
use crate::session::Codex;
use crate::session::SessionSettingsUpdate;
use crate::session::SteerInputError;
@@ -205,51 +203,11 @@ impl CodexThread {
}
}
pub async fn apply_goal_resume_runtime_effects(&self) -> anyhow::Result<()> {
pub async fn emit_thread_idle_lifecycle_if_idle(&self) {
self.codex
.session
.goal_runtime_apply(GoalRuntimeEvent::ThreadResumed)
.await
}
pub async fn continue_active_goal_if_idle(&self) -> anyhow::Result<()> {
self.codex
.session
.goal_runtime_apply(GoalRuntimeEvent::MaybeContinueIfIdle)
.await
}
pub async fn prepare_external_goal_mutation(&self) {
if let Err(err) = self
.codex
.session
.goal_runtime_apply(GoalRuntimeEvent::ExternalMutationStarting)
.await
{
tracing::warn!("failed to prepare external goal mutation: {err}");
}
}
pub async fn apply_external_goal_set(&self, external_set: ExternalGoalSet) {
if let Err(err) = self
.codex
.session
.goal_runtime_apply(GoalRuntimeEvent::ExternalSet { external_set })
.await
{
tracing::warn!("failed to apply external goal status runtime effects: {err}");
}
}
pub async fn apply_external_goal_clear(&self) {
if let Err(err) = self
.codex
.session
.goal_runtime_apply(GoalRuntimeEvent::ExternalClear)
.await
{
tracing::warn!("failed to apply external goal clear runtime effects: {err}");
}
.emit_thread_idle_lifecycle_if_idle()
.await;
}
#[doc(hidden)]
@@ -33,14 +33,14 @@ fn detects_subagent_notification_fragment_case_insensitively() {
#[test]
fn detects_internal_model_context_fragment() {
let text = InternalModelContextFragment::new(
InternalContextSource::from_static("goal"),
"Continue working toward the active thread goal.",
InternalContextSource::from_static("extension"),
"Internal steering.",
)
.render();
assert_eq!(
text,
"<codex_internal_context source=\"goal\">\nContinue working toward the active thread goal.\n</codex_internal_context>"
"<codex_internal_context source=\"extension\">\nInternal steering.\n</codex_internal_context>"
);
assert!(is_contextual_user_fragment(&ContentItem::InputText {
text
@@ -65,7 +65,7 @@ fn does_not_hide_arbitrary_context_tags() {
#[test]
fn rejects_invalid_internal_model_context_source() {
assert!(!is_contextual_user_fragment(&ContentItem::InputText {
text: "<codex_internal_context source=\"Goal\">\nbody\n</codex_internal_context>"
text: "<codex_internal_context source=\"Extension\">\nbody\n</codex_internal_context>"
.to_string(),
}));
}
@@ -73,13 +73,13 @@ fn rejects_invalid_internal_model_context_source() {
#[test]
fn contextual_user_fragment_is_dyn_compatible() {
let fragment: Box<dyn ContextualUserFragment> = Box::new(InternalModelContextFragment::new(
InternalContextSource::from_static("goal"),
"Continue working toward the active thread goal.",
InternalContextSource::from_static("extension"),
"Internal steering.",
));
assert_eq!(
fragment.render(),
"<codex_internal_context source=\"goal\">\nContinue working toward the active thread goal.\n</codex_internal_context>"
"<codex_internal_context source=\"extension\">\nInternal steering.\n</codex_internal_context>"
);
}
+2 -2
View File
@@ -324,8 +324,8 @@ fn internal_model_context_does_not_parse_as_visible_turn_item() {
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: InternalModelContextFragment::new(
InternalContextSource::from_static("goal"),
"Continue working toward the active thread goal.",
InternalContextSource::from_static("extension"),
"Internal steering.",
)
.render(),
}],
File diff suppressed because it is too large Load Diff
-3
View File
@@ -39,9 +39,6 @@ pub mod exec_env;
mod exec_policy;
#[cfg(test)]
mod git_info_tests;
mod goals;
pub use goals::ExternalGoalPreviousStatus;
pub use goals::ExternalGoalSet;
mod guardian;
mod hook_runtime;
mod installation_id;
-2
View File
@@ -26,7 +26,6 @@ pub(super) async fn spawn_review_thread(
let _ = review_features.disable(Feature::WebSearchCached);
let _ = review_features.disable(Feature::Goals);
let review_web_search_mode = WebSearchMode::Disabled;
let goal_tools_supported = !config.ephemeral && parent_turn_context.goal_tools_enabled();
let available_models = sess
.services
.models_manager
@@ -126,7 +125,6 @@ pub(super) async fn spawn_review_thread(
environments: parent_turn_context.environments.clone(),
available_models,
unified_exec_shell_mode,
goal_tools_supported,
features: review_features,
ghost_snapshot: parent_turn_context.ghost_snapshot.clone(),
current_date: parent_turn_context.current_date.clone(),
-3
View File
@@ -2,7 +2,6 @@ use super::input_queue::InputQueue;
use super::*;
use crate::agents_md::LoadedAgentsMd;
use crate::config::ConstraintError;
use crate::goals::GoalRuntimeState;
use crate::skills::SkillError;
use crate::state::ActiveTurn;
use codex_protocol::SessionId;
@@ -37,7 +36,6 @@ pub(crate) struct Session {
pub(crate) conversation: Arc<RealtimeConversationManager>,
pub(crate) active_turn: Mutex<Option<ActiveTurn>>,
pub(crate) input_queue: InputQueue,
pub(crate) goal_runtime: GoalRuntimeState,
pub(crate) guardian_review_session: GuardianReviewSessionManager,
pub(crate) services: SessionServices,
pub(super) next_internal_sub_id: AtomicU64,
@@ -1059,7 +1057,6 @@ impl Session {
conversation: Arc::new(RealtimeConversationManager::new()),
active_turn: Mutex::new(None),
input_queue: InputQueue::new(),
goal_runtime: GoalRuntimeState::new(),
guardian_review_session: GuardianReviewSessionManager::default(),
services,
next_internal_sub_id: AtomicU64::new(0),
File diff suppressed because it is too large Load Diff
-30
View File
@@ -18,7 +18,6 @@ use crate::compact_remote_v2::run_inline_remote_auto_compact_task as run_inline_
use crate::connectors;
use crate::context::ContextualUserFragment;
use crate::feedback_tags;
use crate::goals::GoalRuntimeEvent;
use crate::hook_runtime::inspect_pending_input;
use crate::hook_runtime::record_additional_contexts;
use crate::hook_runtime::record_pending_input;
@@ -151,15 +150,6 @@ pub(crate) async fn run_turn(
let error = err.to_codex_protocol_error();
sess.emit_turn_error_lifecycle(turn_context.as_ref(), error.clone())
.await;
if error == CodexErrorInfo::UsageLimitExceeded
&& let Err(err) = sess
.goal_runtime_apply(GoalRuntimeEvent::UsageLimitReached {
turn_context: turn_context.as_ref(),
})
.await
{
warn!("failed to usage-limit active goal after usage-limit error: {err}");
}
error!("Failed to run pre-sampling compact");
return None;
}
@@ -294,17 +284,6 @@ pub(crate) async fn run_turn(
let error = err.to_codex_protocol_error();
sess.emit_turn_error_lifecycle(turn_context.as_ref(), error.clone())
.await;
if error == CodexErrorInfo::UsageLimitExceeded
&& let Err(err) = sess
.goal_runtime_apply(GoalRuntimeEvent::UsageLimitReached {
turn_context: turn_context.as_ref(),
})
.await
{
warn!(
"failed to usage-limit active goal after usage-limit error: {err}"
);
}
return None;
}
can_drain_pending_input = !model_needs_follow_up;
@@ -390,15 +369,6 @@ pub(crate) async fn run_turn(
let error = e.to_codex_protocol_error();
sess.emit_turn_error_lifecycle(turn_context.as_ref(), error.clone())
.await;
if error == CodexErrorInfo::UsageLimitExceeded
&& let Err(err) = sess
.goal_runtime_apply(GoalRuntimeEvent::UsageLimitReached {
turn_context: turn_context.as_ref(),
})
.await
{
warn!("failed to usage-limit active goal after usage-limit error: {err}");
}
sess.track_turn_codex_error(turn_context.as_ref(), &e);
let event = EventMsg::Error(e.to_error_event(/*message_prefix*/ None));
sess.send_event(&turn_context, event).await;
-10
View File
@@ -91,7 +91,6 @@ pub struct TurnContext {
pub(crate) shell_environment_policy: ShellEnvironmentPolicy,
pub(crate) available_models: Vec<ModelPreset>,
pub(crate) unified_exec_shell_mode: UnifiedExecShellMode,
pub(crate) goal_tools_supported: bool,
pub features: ManagedFeatures,
pub(crate) ghost_snapshot: GhostSnapshotConfig,
pub(crate) final_output_json_schema: Option<Value>,
@@ -170,10 +169,6 @@ impl TurnContext {
ToolEnvironmentMode::from_count(self.environments.turn_environments.len())
}
/// Returns `true` only when this turn has `goal_tools_supported` set and the
/// `Feature::Goals` feature is enabled.
pub(crate) fn goal_tools_enabled(&self) -> bool {
    // Check the per-turn flag first so the feature set is only consulted
    // when the flag allows goal tools at all.
    if !self.goal_tools_supported {
        return false;
    }
    self.features.get().enabled(Feature::Goals)
}
pub(crate) async fn with_model(
&self,
model: String,
@@ -264,7 +259,6 @@ impl TurnContext {
shell_environment_policy: self.shell_environment_policy.clone(),
available_models,
unified_exec_shell_mode: self.unified_exec_shell_mode.clone(),
goal_tools_supported: self.goal_tools_supported,
features,
ghost_snapshot: self.ghost_snapshot.clone(),
final_output_json_schema: self.final_output_json_schema.clone(),
@@ -477,7 +471,6 @@ impl Session {
cwd: AbsolutePathBuf,
sub_id: String,
skills_outcome: Arc<SkillLoadOutcome>,
goal_tools_supported: bool,
) -> TurnContext {
let reasoning_effort = session_configuration.collaboration_mode.reasoning_effort();
let reasoning_summary = session_configuration
@@ -568,7 +561,6 @@ impl Session {
shell_environment_policy: per_turn_config.permissions.shell_environment_policy.clone(),
available_models,
unified_exec_shell_mode,
goal_tools_supported,
features: per_turn_config.features.clone(),
ghost_snapshot: per_turn_config.ghost_snapshot.clone(),
final_output_json_schema: None,
@@ -781,7 +773,6 @@ impl Session {
.skills_for_config(&skills_input, fs)
.await,
);
let goal_tools_supported = !per_turn_config.ephemeral && self.state_db().is_some();
let mut turn_context: TurnContext = Self::make_turn_context(
self.thread_id(),
self.session_id(),
@@ -810,7 +801,6 @@ impl Session {
cwd,
sub_id,
skills_outcome,
goal_tools_supported,
);
turn_context.realtime_active = self.conversation.running_state().await.is_some();
-42
View File
@@ -23,7 +23,6 @@ use tracing::warn;
use crate::config::Config;
use crate::context::ContextualUserFragment;
use crate::goals::GoalRuntimeEvent;
use crate::hook_runtime::inspect_pending_input;
use crate::hook_runtime::record_additional_contexts;
use crate::hook_runtime::record_pending_input;
@@ -342,15 +341,6 @@ impl Session {
.await
.clear_turn(&turn_context.sub_id);
if let Err(err) = self
.goal_runtime_apply(GoalRuntimeEvent::TurnStarted {
turn_context: turn_context.as_ref(),
token_usage: token_usage_at_turn_start.clone(),
})
.await
{
warn!("failed to apply goal runtime turn-start event: {err}");
}
let pending_items = self.input_queue.get_pending_input(&self.active_turn).await;
let turn_state = {
let mut active = self.active_turn.lock().await;
@@ -504,15 +494,6 @@ impl Session {
self.emit_turn_abort_lifecycle(reason.clone(), turn_context.extension_data.as_ref())
.await;
}
if (aborted_turn || reason == TurnAbortReason::Interrupted)
&& let Err(err) = self
.goal_runtime_apply(GoalRuntimeEvent::TaskAborted {
turn_context: turn_context.as_deref(),
})
.await
{
warn!("failed to apply goal runtime abort event: {err}");
}
if let Some(active_turn) = active_turn_to_clear {
// Let interrupted tasks observe cancellation before dropping pending approvals, or an
// in-flight approval wait can surface as a model-visible rejection before TurnAborted.
@@ -553,14 +534,6 @@ impl Session {
self.emit_turn_abort_lifecycle(reason.clone(), turn_context.extension_data.as_ref())
.await;
}
if let Err(err) = self
.goal_runtime_apply(GoalRuntimeEvent::TaskAborted {
turn_context: turn_context.as_deref(),
})
.await
{
warn!("failed to apply goal runtime abort event: {err}");
}
// Let interrupted tasks observe cancellation before dropping pending approvals, or an
// in-flight approval wait can surface as a model-visible rejection before TurnAborted.
self.input_queue.clear_pending(&active_turn).await;
@@ -760,15 +733,6 @@ impl Session {
});
self.emit_turn_stop_lifecycle(turn_context.extension_data.as_ref())
.await;
if let Err(err) = self
.goal_runtime_apply(GoalRuntimeEvent::TurnFinished {
turn_context: turn_context.as_ref(),
turn_completed: true,
})
.await
{
warn!("failed to apply goal runtime turn-finished event: {err}");
}
let event = EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: turn_context.sub_id.clone(),
last_agent_message,
@@ -798,12 +762,6 @@ impl Session {
if !cleared_active_turn {
return;
}
if let Err(err) = self
.goal_runtime_apply(GoalRuntimeEvent::MaybeContinueIfIdle)
.await
{
warn!("failed to apply goal runtime maybe-continue event: {err}");
}
self.emit_thread_idle_lifecycle_if_idle().await;
}
-3
View File
@@ -1343,9 +1343,6 @@ impl ThreadManagerState {
.await?;
if is_resumed_thread {
new_thread.thread.emit_thread_resume_lifecycle().await;
if let Err(err) = new_thread.thread.apply_goal_resume_runtime_effects().await {
warn!("failed to apply goal resume runtime effects: {err}");
}
}
Ok(new_thread)
}
-101
View File
@@ -8,7 +8,6 @@ use crate::session::tests::make_session_and_context;
use crate::tasks::InterruptedTurnHistoryMarker;
use crate::tasks::interrupted_turn_history_marker;
use codex_extension_api::empty_extension_registry;
use codex_features::Feature;
use codex_models_manager::manager::RefreshStrategy;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ReasoningItemReasoningSummary;
@@ -1501,103 +1500,3 @@ async fn interrupted_fork_snapshot_uses_persisted_mid_turn_history_without_live_
1,
);
}
// Verifies that resuming a thread whose persisted goal is `Paused` leaves the
// goal paused and does not start a turn, both right after the resume and after
// an explicit `continue_active_goal_if_idle` call.
#[tokio::test]
async fn resumed_thread_keeps_paused_goal_paused() -> anyhow::Result<()> {
// Use a throwaway codex home and turn the Goals feature on for this config.
let temp_dir = tempdir().expect("tempdir");
let mut config = test_config().await;
config.codex_home = temp_dir.path().join("codex-home").abs();
config.cwd = config.codex_home.abs();
config
.features
.enable(Feature::Goals)
.expect("goals should be enableable in tests");
std::fs::create_dir_all(&config.codex_home).expect("create codex home");
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let state_db = init_state_db(&config).await;
// Build the manager with an empty extension registry and the test state db.
let manager = ThreadManager::new(
&config,
auth_manager.clone(),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
empty_extension_registry(),
/*analytics_events_client*/ None,
thread_store_from_config(&config, state_db.clone()),
state_db.clone(),
TEST_INSTALLATION_ID.to_string(),
/*attestation_provider*/ None,
);
// Create the source thread from a one-message forked history.
let source = manager
.resume_thread_with_history(
config.clone(),
InitialHistory::Forked(vec![RolloutItem::ResponseItem(user_msg("keep working"))]),
auth_manager.clone(),
/*parent_trace*/ None,
)
.await
.expect("create source thread");
let source_path = source
.thread
.rollout_path()
.expect("source rollout path should exist");
source.thread.flush_rollout().await?;
// From here on, use the state db handle exposed by the source thread.
let state_db = source
.thread
.state_db()
.expect("source thread should have a state db");
// Store a paused goal for the source thread directly in the state db.
state_db
.thread_goals()
.replace_thread_goal(
source.thread_id,
"Keep working until the task is done",
codex_state::ThreadGoalStatus::Paused,
/*token_budget*/ None,
)
.await?;
// Shut the source thread down and drop it from the manager before resuming.
source.thread.shutdown_and_wait().await?;
manager.remove_thread(&source.thread_id).await;
let resumed = manager
.resume_thread_from_rollout(
config.clone(),
source_path,
auth_manager,
/*parent_trace*/ None,
)
.await
.expect("resume source thread");
// The goal must still exist and still be paused after the resume.
let goal = state_db
.thread_goals()
.get_thread_goal(resumed.thread_id)
.await?
.expect("goal should still exist after resume");
assert_eq!(codex_state::ThreadGoalStatus::Paused, goal.status);
// Resuming must not have started a turn.
assert!(
resumed
.thread
.codex
.session
.active_turn
.lock()
.await
.is_none()
);
// An explicit continue request must not start a turn either.
resumed.thread.continue_active_goal_if_idle().await?;
assert!(
resumed
.thread
.codex
.session
.active_turn
.lock()
.await
.is_none()
);
resumed.thread.shutdown_and_wait().await?;
Ok(())
}
-158
View File
@@ -1,158 +0,0 @@
//! Built-in model tool handlers for persisted thread goals.
//!
//! The public tool contract intentionally splits goal creation from stopped
//! status updates: `create_goal` starts an active objective, while
//! `update_goal` can only mark the existing goal complete or blocked.
use crate::function_tool::FunctionCallError;
use crate::tools::context::FunctionToolOutput;
use codex_protocol::protocol::ThreadGoal;
use codex_protocol::protocol::ThreadGoalStatus;
use serde::Deserialize;
use serde::Serialize;
use std::fmt::Write as _;
mod create_goal;
mod get_goal;
mod update_goal;
pub use create_goal::CreateGoalHandler;
pub use get_goal::GetGoalHandler;
pub use update_goal::UpdateGoalHandler;
/// Arguments accepted by the `create_goal` tool, deserialized from the
/// tool call's argument payload using snake_case field names.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct CreateGoalArgs {
/// Objective text for the new goal.
objective: String,
/// Optional token budget for the new goal; `None` when the caller omits it.
token_budget: Option<i64>,
}
/// Arguments accepted by the `update_goal` tool, deserialized from the
/// tool call's argument payload using snake_case field names.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct UpdateGoalArgs {
/// Status the caller wants the existing goal to move to.
status: ThreadGoalStatus,
}
/// Payload returned to the model by the goal tools, serialized with
/// camelCase field names.
#[derive(Debug, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
struct GoalToolResponse {
/// The goal being reported, if any.
goal: Option<ThreadGoal>,
/// Token budget left for the goal; `None` when there is no goal or no budget.
remaining_tokens: Option<i64>,
/// Optional instruction text asking for a final usage report.
completion_budget_report: Option<String>,
}
/// Selects whether `GoalToolResponse::new` may attach a completion budget
/// report to the response.
#[derive(Clone, Copy)]
enum CompletionBudgetReport {
/// Attach the report when the goal qualifies for one.
Include,
/// Never attach a report.
Omit,
}
impl GoalToolResponse {
    /// Builds the tool response for `goal`.
    ///
    /// * `remaining_tokens` is the goal's token budget minus the tokens already
    ///   used, clamped to zero. It is `None` when there is no goal or the goal
    ///   has no token budget.
    /// * `completion_budget_report` is only considered when `report_mode` is
    ///   [`CompletionBudgetReport::Include`] and the goal's status is
    ///   `ThreadGoalStatus::Complete`; otherwise it is `None`.
    fn new(goal: Option<ThreadGoal>, report_mode: CompletionBudgetReport) -> Self {
        let remaining_tokens = goal.as_ref().and_then(|goal| {
            goal.token_budget
                // Saturating subtraction keeps extreme budget/usage values from
                // overflowing `i64` (a panic in debug builds, a wrap in release)
                // before the clamp to zero is applied.
                .map(|budget| budget.saturating_sub(goal.tokens_used).max(0))
        });
        let completion_budget_report = match report_mode {
            CompletionBudgetReport::Include => goal
                .as_ref()
                .filter(|goal| goal.status == ThreadGoalStatus::Complete)
                .and_then(completion_budget_report),
            CompletionBudgetReport::Omit => None,
        };
        Self {
            goal,
            remaining_tokens,
            completion_budget_report,
        }
    }
}
/// Renders `err` and every cause in its chain as one line, joining the
/// messages with `": "` (outermost message first).
fn format_goal_error(err: anyhow::Error) -> String {
    // `chain()` yields the error itself first, so skip it: its message is
    // already the seed of the fold.
    err.chain().skip(1).fold(err.to_string(), |mut rendered, cause| {
        rendered.push_str(": ");
        rendered.push_str(&cause.to_string());
        rendered
    })
}
/// Wraps `goal` in a [`GoalToolResponse`], serializes it as pretty-printed
/// JSON, and returns it as text tool output.
///
/// A serialization failure is reported as `FunctionCallError::Fatal` carrying
/// the serializer's message.
fn goal_response(
    goal: Option<ThreadGoal>,
    completion_budget_report: CompletionBudgetReport,
) -> Result<FunctionToolOutput, FunctionCallError> {
    let payload = GoalToolResponse::new(goal, completion_budget_report);
    match serde_json::to_string_pretty(&payload) {
        Ok(json_text) => Ok(FunctionToolOutput::from_text(json_text, Some(true))),
        Err(err) => Err(FunctionCallError::Fatal(err.to_string())),
    }
}
/// Returns the instruction text asking for a final usage report, or `None`
/// when the goal has neither a token budget nor any recorded elapsed time.
fn completion_budget_report(goal: &ThreadGoal) -> Option<String> {
    let has_token_budget = goal.token_budget.is_some();
    let has_elapsed_time = goal.time_used_seconds > 0;
    (has_token_budget || has_elapsed_time).then(|| {
        "Goal achieved. Report final usage from this tool result's structured goal fields. If `goal.tokenBudget` is present, include token usage from `goal.tokensUsed` and `goal.tokenBudget`. If `goal.timeUsedSeconds` is greater than 0, summarize elapsed time in a concise, human-friendly form appropriate to the response language."
            .to_string()
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use codex_protocol::ThreadId;
    use pretty_assertions::assert_eq;

    /// A completed goal with a token budget reports the remaining tokens and
    /// carries the final-usage instruction text.
    #[test]
    fn completed_budgeted_goal_response_reports_final_usage() {
        let goal = ThreadGoal {
            thread_id: ThreadId::new(),
            objective: "Keep optimizing".to_string(),
            status: ThreadGoalStatus::Complete,
            token_budget: Some(10_000),
            tokens_used: 3_250,
            time_used_seconds: 75,
            created_at: 1,
            updated_at: 2,
        };
        let expected = GoalToolResponse {
            goal: Some(goal.clone()),
            remaining_tokens: Some(6_750),
            completion_budget_report: Some(
                "Goal achieved. Report final usage from this tool result's structured goal fields. If `goal.tokenBudget` is present, include token usage from `goal.tokensUsed` and `goal.tokenBudget`. If `goal.timeUsedSeconds` is greater than 0, summarize elapsed time in a concise, human-friendly form appropriate to the response language."
                    .to_string(),
            ),
        };

        let response = GoalToolResponse::new(Some(goal), CompletionBudgetReport::Include);

        assert_eq!(response, expected);
    }

    /// A completed goal with no budget and no elapsed time gets neither
    /// remaining tokens nor a report, even when the report is requested.
    #[test]
    fn completed_unbudgeted_goal_response_omits_budget_report() {
        let goal = ThreadGoal {
            thread_id: ThreadId::new(),
            objective: "Write a poem".to_string(),
            status: ThreadGoalStatus::Complete,
            token_budget: None,
            tokens_used: 120,
            time_used_seconds: 0,
            created_at: 1,
            updated_at: 2,
        };
        let expected = GoalToolResponse {
            goal: Some(goal.clone()),
            remaining_tokens: None,
            completion_budget_report: None,
        };

        let response = GoalToolResponse::new(Some(goal), CompletionBudgetReport::Include);

        assert_eq!(response, expected);
    }
}
@@ -1,78 +0,0 @@
use crate::function_tool::FunctionCallError;
use crate::goals::CreateGoalRequest;
use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolPayload;
use crate::tools::context::boxed_tool_output;
use crate::tools::handlers::goal_spec::CREATE_GOAL_TOOL_NAME;
use crate::tools::handlers::goal_spec::create_create_goal_tool;
use crate::tools::handlers::parse_arguments;
use crate::tools::registry::CoreToolRuntime;
use crate::tools::registry::ToolExecutor;
use codex_tools::ToolName;
use codex_tools::ToolSpec;
use super::CompletionBudgetReport;
use super::CreateGoalArgs;
use super::format_goal_error;
use super::goal_response;
/// Tool handler backing the `create_goal` tool.
pub struct CreateGoalHandler;

#[async_trait::async_trait]
impl ToolExecutor<ToolInvocation> for CreateGoalHandler {
    fn tool_name(&self) -> ToolName {
        ToolName::plain(CREATE_GOAL_TOOL_NAME)
    }

    fn spec(&self) -> ToolSpec {
        create_create_goal_tool()
    }

    /// Parses the call arguments, asks the session to create the goal, and
    /// returns the created goal as a tool response without a budget report.
    ///
    /// Every failure is reported back to the model. An error whose chain
    /// mentions "already has a goal" is replaced with a fixed explanation.
    async fn handle(
        &self,
        invocation: ToolInvocation,
    ) -> Result<Box<dyn crate::tools::context::ToolOutput>, FunctionCallError> {
        let ToolInvocation {
            session,
            turn,
            payload,
            ..
        } = invocation;

        // Only function-style payloads carry the JSON arguments this tool needs.
        let ToolPayload::Function { arguments } = payload else {
            return Err(FunctionCallError::RespondToModel(
                "goal handler received unsupported payload".to_string(),
            ));
        };

        let args: CreateGoalArgs = parse_arguments(&arguments)?;
        let request = CreateGoalRequest {
            objective: args.objective,
            token_budget: args.token_budget,
        };

        let goal = match session.create_thread_goal(turn.as_ref(), request).await {
            Ok(goal) => goal,
            Err(err) => {
                let goal_already_exists = err
                    .chain()
                    .any(|cause| cause.to_string().contains("already has a goal"));
                let message = if goal_already_exists {
                    "cannot create a new goal because this thread already has a goal; use update_goal only when the existing goal is complete"
                        .to_string()
                } else {
                    format_goal_error(err)
                };
                return Err(FunctionCallError::RespondToModel(message));
            }
        };

        goal_response(Some(goal), CompletionBudgetReport::Omit).map(boxed_tool_output)
    }
}

impl CoreToolRuntime for CreateGoalHandler {}
@@ -1,51 +0,0 @@
use crate::function_tool::FunctionCallError;
use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolPayload;
use crate::tools::context::boxed_tool_output;
use crate::tools::handlers::goal_spec::GET_GOAL_TOOL_NAME;
use crate::tools::handlers::goal_spec::create_get_goal_tool;
use crate::tools::registry::CoreToolRuntime;
use crate::tools::registry::ToolExecutor;
use codex_tools::ToolName;
use codex_tools::ToolSpec;
use super::CompletionBudgetReport;
use super::format_goal_error;
use super::goal_response;
/// Tool handler backing the `get_goal` tool.
pub struct GetGoalHandler;

#[async_trait::async_trait]
impl ToolExecutor<ToolInvocation> for GetGoalHandler {
    fn tool_name(&self) -> ToolName {
        ToolName::plain(GET_GOAL_TOOL_NAME)
    }

    fn spec(&self) -> ToolSpec {
        create_get_goal_tool()
    }

    /// Reads the thread's goal from the session and returns it as a tool
    /// response without a budget report. Failures are reported to the model.
    async fn handle(
        &self,
        invocation: ToolInvocation,
    ) -> Result<Box<dyn crate::tools::context::ToolOutput>, FunctionCallError> {
        let ToolInvocation {
            session, payload, ..
        } = invocation;

        // The arguments themselves are ignored, but the payload must still be
        // a function call.
        if !matches!(payload, ToolPayload::Function { .. }) {
            return Err(FunctionCallError::RespondToModel(
                "get_goal handler received unsupported payload".to_string(),
            ));
        }

        let goal = match session.get_thread_goal().await {
            Ok(goal) => goal,
            Err(err) => {
                return Err(FunctionCallError::RespondToModel(format_goal_error(err)));
            }
        };
        goal_response(goal, CompletionBudgetReport::Omit).map(boxed_tool_output)
    }
}

impl CoreToolRuntime for GetGoalHandler {}
@@ -1,89 +0,0 @@
use crate::function_tool::FunctionCallError;
use crate::goals::GoalRuntimeEvent;
use crate::goals::SetGoalRequest;
use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolPayload;
use crate::tools::context::boxed_tool_output;
use crate::tools::handlers::goal_spec::UPDATE_GOAL_TOOL_NAME;
use crate::tools::handlers::goal_spec::create_update_goal_tool;
use crate::tools::handlers::parse_arguments;
use crate::tools::registry::CoreToolRuntime;
use crate::tools::registry::ToolExecutor;
use codex_protocol::protocol::ThreadGoalStatus;
use codex_tools::ToolName;
use codex_tools::ToolSpec;
use super::CompletionBudgetReport;
use super::UpdateGoalArgs;
use super::format_goal_error;
use super::goal_response;
pub struct UpdateGoalHandler;
#[async_trait::async_trait]
impl ToolExecutor<ToolInvocation> for UpdateGoalHandler {
fn tool_name(&self) -> ToolName {
ToolName::plain(UPDATE_GOAL_TOOL_NAME)
}
fn spec(&self) -> ToolSpec {
create_update_goal_tool()
}
async fn handle(
&self,
invocation: ToolInvocation,
) -> Result<Box<dyn crate::tools::context::ToolOutput>, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
..
} = invocation;
let arguments = match payload {
ToolPayload::Function { arguments } => arguments,
_ => {
return Err(FunctionCallError::RespondToModel(
"update_goal handler received unsupported payload".to_string(),
));
}
};
let args: UpdateGoalArgs = parse_arguments(&arguments)?;
if !matches!(
args.status,
ThreadGoalStatus::Complete | ThreadGoalStatus::Blocked
) {
return Err(FunctionCallError::RespondToModel(
"update_goal can only mark the existing goal complete or blocked; pause, resume, budget-limited, and usage-limited status changes are controlled by the user or system"
.to_string(),
));
}
session
.goal_runtime_apply(GoalRuntimeEvent::ToolCompletedGoal {
turn_context: turn.as_ref(),
})
.await
.map_err(|err| FunctionCallError::RespondToModel(format_goal_error(err)))?;
let goal = session
.set_thread_goal(
turn.as_ref(),
SetGoalRequest {
objective: None,
status: Some(args.status),
token_budget: None,
},
)
.await
.map_err(|err| FunctionCallError::RespondToModel(format_goal_error(err)))?;
let completion_budget_report = if args.status == ThreadGoalStatus::Complete {
CompletionBudgetReport::Include
} else {
CompletionBudgetReport::Omit
};
goal_response(Some(goal), completion_budget_report).map(boxed_tool_output)
}
}
impl CoreToolRuntime for UpdateGoalHandler {}
@@ -1,120 +0,0 @@
//! Responses API tool definitions for persisted thread goals.
//!
//! These specs expose goal read/update primitives to the model while keeping
//! usage accounting system-managed.
use codex_tools::JsonSchema;
use codex_tools::ResponsesApiTool;
use codex_tools::ToolSpec;
use serde_json::json;
use std::collections::BTreeMap;
pub const GET_GOAL_TOOL_NAME: &str = "get_goal";
pub const CREATE_GOAL_TOOL_NAME: &str = "create_goal";
pub const UPDATE_GOAL_TOOL_NAME: &str = "update_goal";
/// Builds the function-tool spec for `get_goal`, whose parameter object has
/// no properties and no required fields.
pub fn create_get_goal_tool() -> ToolSpec {
    let no_properties = BTreeMap::new();
    let no_required = Vec::new();
    let parameters = JsonSchema::object(no_properties, Some(no_required), Some(false.into()));
    let description = "Get the current goal for this thread, including status, budgets, token and elapsed-time usage, and remaining token budget."
        .to_string();

    ToolSpec::Function(ResponsesApiTool {
        name: GET_GOAL_TOOL_NAME.to_string(),
        description,
        strict: false,
        defer_loading: None,
        parameters,
        output_schema: None,
    })
}
/// Builds the function-tool spec for `create_goal`: a required `objective`
/// string and an optional integer `token_budget`.
pub fn create_create_goal_tool() -> ToolSpec {
    let objective_schema = JsonSchema::string(Some(
        "Required. The concrete objective to start pursuing. This starts a new active goal only when no goal is currently defined; if a goal already exists, this tool fails."
            .to_string(),
    ));
    let token_budget_schema = JsonSchema::integer(Some(
        "Positive token budget for the new goal. Omit unless explicitly requested."
            .to_string(),
    ));

    let mut properties = BTreeMap::new();
    properties.insert("objective".to_string(), objective_schema);
    properties.insert("token_budget".to_string(), token_budget_schema);

    let required_fields = vec!["objective".to_string()];
    // The description names the update tool through its constant.
    let description = format!(
        r#"Create a goal only when explicitly requested by the user or system/developer instructions; do not infer goals from ordinary tasks.
Set token_budget only when an explicit token budget is requested. Fails if a goal exists; use {UPDATE_GOAL_TOOL_NAME} only for status."#
    );

    ToolSpec::Function(ResponsesApiTool {
        name: CREATE_GOAL_TOOL_NAME.to_string(),
        description,
        strict: false,
        defer_loading: None,
        parameters: JsonSchema::object(properties, Some(required_fields), Some(false.into())),
        output_schema: None,
    })
}
/// Builds the function-tool spec for `update_goal`: a single required
/// `status` string restricted to `complete` or `blocked`.
pub fn create_update_goal_tool() -> ToolSpec {
    let allowed_statuses = vec![json!("complete"), json!("blocked")];
    let status_schema = JsonSchema::string_enum(
        allowed_statuses,
        Some(
            "Required. Set to `complete` only when the objective is achieved and no required work remains. Set to `blocked` only after the same blocking condition has recurred for at least three consecutive goal turns and the agent is at an impasse. After a previously blocked goal is resumed, the resumed run starts a fresh blocked audit."
                .to_string(),
        ),
    );

    let mut properties = BTreeMap::new();
    properties.insert("status".to_string(), status_schema);

    let required_fields = vec!["status".to_string()];
    let description = r#"Update the existing goal.
Use this tool only to mark the goal achieved or genuinely blocked.
Set status to `complete` only when the objective has actually been achieved and no required work remains.
Set status to `blocked` only when the same blocking condition has repeated for at least three consecutive goal turns, counting the original/user-triggered turn and any automatic continuations, and the agent cannot make meaningful progress without user input or an external-state change.
If the user resumes a goal that was previously marked `blocked`, treat the resumed run as a fresh blocked audit. If the same blocking condition then repeats for at least three consecutive resumed goal turns, set status to `blocked` again.
Once the blocked threshold is satisfied, do not keep reporting that you are still blocked while leaving the goal active; set status to `blocked`.
Do not use `blocked` merely because the work is hard, slow, uncertain, incomplete, or would benefit from clarification.
Do not mark a goal complete merely because its budget is nearly exhausted or because you are stopping work.
You cannot use this tool to pause, resume, budget-limit, or usage-limit a goal; those status changes are controlled by the user or system.
When marking a budgeted goal achieved with status `complete`, report the final token usage from the tool result to the user."#
        .to_string();

    ToolSpec::Function(ResponsesApiTool {
        name: UPDATE_GOAL_TOOL_NAME.to_string(),
        description,
        strict: false,
        defer_loading: None,
        parameters: JsonSchema::object(properties, Some(required_fields), Some(false.into())),
        output_schema: None,
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The `status` parameter of `update_goal` must offer exactly `complete`
    /// and `blocked`, in that order.
    #[test]
    fn update_goal_tool_exposes_complete_and_blocked_statuses() {
        let tool = match create_update_goal_tool() {
            ToolSpec::Function(tool) => tool,
            _ => panic!("update_goal should be a function tool"),
        };

        let status = tool
            .parameters
            .properties
            .as_ref()
            .and_then(|properties| properties.get("status"))
            .expect("status property should exist");

        let expected_statuses = Some(vec![json!("complete"), json!("blocked")]);
        assert_eq!(status.enum_values, expected_statuses);
    }
}
-5
View File
@@ -4,8 +4,6 @@ pub(crate) mod apply_patch;
pub(crate) mod apply_patch_spec;
mod dynamic;
pub(crate) mod extension_tools;
mod goal;
pub(crate) mod goal_spec;
mod list_available_plugins_to_install;
pub(crate) mod list_available_plugins_to_install_spec;
mod mcp;
@@ -53,9 +51,6 @@ pub use apply_patch::ApplyPatchHandler;
use codex_protocol::models::AdditionalPermissionProfile;
use codex_protocol::protocol::AskForApproval;
pub use dynamic::DynamicToolHandler;
pub use goal::CreateGoalHandler;
pub use goal::GetGoalHandler;
pub use goal::UpdateGoalHandler;
pub use list_available_plugins_to_install::ListAvailablePluginsToInstallHandler;
pub use mcp::McpHandler;
pub use mcp_resource::ListMcpResourceTemplatesHandler;
+1 -15
View File
@@ -5,7 +5,6 @@ use std::sync::atomic::Ordering;
use std::time::Duration;
use crate::function_tool::FunctionCallError;
use crate::goals::GoalRuntimeEvent;
use crate::hook_runtime::PreToolUseHookResult;
use crate::hook_runtime::record_additional_contexts;
use crate::hook_runtime::run_post_tool_use_hooks;
@@ -34,7 +33,6 @@ use codex_tools::ToolSearchInfo;
use codex_tools::ToolSpec;
use futures::future::BoxFuture;
use serde_json::Value;
use tracing::warn;
pub(crate) type ToolTelemetryTags = Vec<(&'static str, String)>;
@@ -649,25 +647,13 @@ impl ToolRegistry {
handler_executed: true,
},
};
let finished = notify_tool_finish_if_unclaimed(
notify_tool_finish_if_unclaimed(
&invocation,
terminal_outcome_reached.as_deref(),
lifecycle_outcome,
)
.await;
if finished
&& let Err(err) = invocation
.session
.goal_runtime_apply(GoalRuntimeEvent::ToolCompleted {
turn_context: invocation.turn.as_ref(),
tool_name: tool_name.name.as_str(),
})
.await
{
warn!("failed to account thread goal progress after tool call: {err}");
}
match result {
Ok(_) => {
let mut guard = response_cell.lock().await;
+1 -17
View File
@@ -6,11 +6,9 @@ use crate::tools::context::ToolInvocation;
use crate::tools::handlers::ApplyPatchHandler;
use crate::tools::handlers::CodeModeExecuteHandler;
use crate::tools::handlers::CodeModeWaitHandler;
use crate::tools::handlers::CreateGoalHandler;
use crate::tools::handlers::DynamicToolHandler;
use crate::tools::handlers::ExecCommandHandler;
use crate::tools::handlers::ExecCommandHandlerOptions;
use crate::tools::handlers::GetGoalHandler;
use crate::tools::handlers::ListAvailablePluginsToInstallHandler;
use crate::tools::handlers::ListMcpResourceTemplatesHandler;
use crate::tools::handlers::ListMcpResourcesHandler;
@@ -24,7 +22,6 @@ use crate::tools::handlers::ShellCommandHandler;
use crate::tools::handlers::ShellCommandHandlerOptions;
use crate::tools::handlers::TestSyncHandler;
use crate::tools::handlers::ToolSearchHandler;
use crate::tools::handlers::UpdateGoalHandler;
use crate::tools::handlers::ViewImageHandler;
use crate::tools::handlers::WriteStdinHandler;
use crate::tools::handlers::agent_jobs::ReportAgentJobResultHandler;
@@ -305,14 +302,6 @@ fn collab_tools_enabled(turn_context: &TurnContext) -> bool {
}
}
/// Returns `true` when the turn context reports goal tools as enabled and the
/// session is not a review sub-agent.
fn goal_tools_enabled(turn_context: &TurnContext) -> bool {
    if !turn_context.goal_tools_enabled() {
        return false;
    }
    // Review sub-agent sessions never get goal tools.
    !matches!(
        turn_context.session_source,
        SessionSource::SubAgent(SubAgentSource::Review)
    )
}
fn agent_jobs_tools_enabled(turn_context: &TurnContext) -> bool {
turn_context.features.get().enabled(Feature::SpawnCsv) && collab_tools_enabled(turn_context)
}
@@ -558,8 +547,8 @@ fn add_tool_sources(context: &CoreToolPlanContext<'_>, planned_tools: &mut Plann
add_core_utility_tools(context, planned_tools);
add_collaboration_tools(context, planned_tools);
add_mcp_runtime_tools(context, planned_tools);
add_dynamic_tools(context, planned_tools);
add_extension_tools(context, planned_tools);
add_dynamic_tools(context, planned_tools);
for spec in hosted_model_tool_specs(context) {
planned_tools.add_hosted_spec(spec);
}
@@ -639,11 +628,6 @@ fn add_core_utility_tools(context: &CoreToolPlanContext<'_>, planned_tools: &mut
let environment_mode = turn_context.tool_environment_mode();
planned_tools.add(PlanHandler);
if goal_tools_enabled(turn_context) {
planned_tools.add(GetGoalHandler);
planned_tools.add(CreateGoalHandler);
planned_tools.add(UpdateGoalHandler);
}
if turn_context.config.experimental_request_user_input_enabled {
planned_tools.add(RequestUserInputHandler {
+1 -30
View File
@@ -619,36 +619,7 @@ async fn environment_count_controls_environment_backed_tools() {
}
#[tokio::test]
async fn host_context_gates_goal_and_agent_job_tools() {
let feature_disabled = probe(|turn| {
set_feature(turn, Feature::Goals, /*enabled*/ false);
turn.goal_tools_supported = true;
})
.await;
feature_disabled.assert_visible_lacks(&["get_goal", "create_goal", "update_goal"]);
let host_disabled = probe(|turn| {
set_feature(turn, Feature::Goals, /*enabled*/ true);
turn.goal_tools_supported = false;
})
.await;
host_disabled.assert_visible_lacks(&["get_goal", "create_goal", "update_goal"]);
let enabled = probe(|turn| {
set_feature(turn, Feature::Goals, /*enabled*/ true);
turn.goal_tools_supported = true;
})
.await;
enabled.assert_visible_contains(&["get_goal", "create_goal", "update_goal"]);
let review_thread = probe(|turn| {
set_feature(turn, Feature::Goals, /*enabled*/ true);
turn.goal_tools_supported = true;
turn.session_source = SessionSource::SubAgent(SubAgentSource::Review);
})
.await;
review_thread.assert_visible_lacks(&["get_goal", "create_goal", "update_goal"]);
async fn host_context_gates_agent_job_tools() {
let normal_agent_job = probe(|turn| {
set_feature(turn, Feature::SpawnCsv, /*enabled*/ true);
})
@@ -188,9 +188,6 @@ async fn prompt_tools_are_consistent_across_requests() -> anyhow::Result<()> {
};
expected_tools_names.extend([
"update_plan",
"get_goal",
"create_goal",
"update_goal",
"request_user_input",
"apply_patch",
"view_image",