[codex-analytics] emit goal lifecycle analytics (#27078)

## Why
- Currently, there is no analytics event for `/goal` behavior
- Existing events cannot identify goal execution or its resulting
outcome
- The original update in
[#26182](https://github.com/openai/codex/pull/26182) was implemented
before `/goal` moved into `codex-goal-extension`.

## What Changed
- Adds `codex_goal_event` serialization and enrichment to
`codex-analytics`
- Emits goal events from the canonical `codex-goal-extension` mutation
and accounting paths:
  - `created` when a new logical goal is persisted
  - `usage_accounted` when cumulative goal usage is persisted
  - `status_changed` when the stored goal status changes
  - `cleared` when the goal is deleted
- Preserves causal `turn_id` for turn driven events and uses null
attribution for external or idle lifecycle events
- Changes goal deletion to return the deleted row so `cleared` retains
the stable goal ID

## Event Details

Includes standard analytics metadata along with goal specific fields:
- `goal_id`: Stable ID stored in the local SQLite goal row and shared
across the goal's events
- `event_kind`: Observed operation (see the 4 lifecycle events cited in
the above bullet)
- `goal_status`: Resulting or last stored status: `active`, `paused`,
`blocked`, `usage_limited`, etc.
  - `has_token_budget`: Indicates whether a token budget is configured
  - `turn_id`: Causal turn ID, or null when no causal turn exists
- `cumulative_tokens_accounted`: Cumulative tokens on `usage_accounted`
events; null otherwise
- `cumulative_time_accounted_seconds`: Cumulative active time on
`usage_accounted` events; null otherwise

## Validation
- `just test -p codex-analytics -p codex-state -p codex-goal-extension`
- `just test -p codex-core -E 'test(/goal/)'`
- `just test -p codex-app-server`
- `cargo build -p codex-analytics -p codex-core -p codex-state -p
codex-app-server`
This commit is contained in:
marksteinbrick-oai
2026-06-09 18:45:54 -07:00
committed by GitHub
Unverified
parent 4a3eac2144
commit 608b8b1cc6
23 changed files with 412 additions and 24 deletions
+2
View File
@@ -1892,6 +1892,7 @@ dependencies = [
"codex-model-provider",
"codex-plugin",
"codex-protocol",
"codex-state",
"codex-utils-absolute-path",
"os_info",
"pretty_assertions",
@@ -3049,6 +3050,7 @@ dependencies = [
"anyhow",
"async-trait",
"chrono",
"codex-analytics",
"codex-core",
"codex-extension-api",
"codex-otel",
+1
View File
@@ -19,6 +19,7 @@ codex-login = { workspace = true }
codex-model-provider = { workspace = true }
codex-plugin = { workspace = true }
codex-protocol = { workspace = true }
codex-state = { workspace = true }
os_info = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
+7
View File
@@ -9,6 +9,7 @@ use crate::facts::AnalyticsJsonRpcError;
use crate::facts::AppInvocation;
use crate::facts::AppMentionedInput;
use crate::facts::AppUsedInput;
use crate::facts::CodexGoalEvent;
use crate::facts::CustomAnalyticsFact;
use crate::facts::HookRunFact;
use crate::facts::HookRunInput;
@@ -246,6 +247,12 @@ impl AnalyticsEventsClient {
)));
}
pub fn track_goal_event(&self, event: CodexGoalEvent) {
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::Goal(Box::new(
event,
))));
}
pub fn track_turn_resolved_config(&self, fact: TurnResolvedConfigFact) {
self.record_fact(AnalyticsFact::Custom(
CustomAnalyticsFact::TurnResolvedConfig(Box::new(fact)),
+54
View File
@@ -4,12 +4,14 @@ use crate::facts::AcceptedLineFingerprint;
use crate::facts::AppInvocation;
use crate::facts::CodexCompactionEvent;
use crate::facts::CodexErrKind;
use crate::facts::CodexGoalEvent;
use crate::facts::CompactionImplementation;
use crate::facts::CompactionPhase;
use crate::facts::CompactionReason;
use crate::facts::CompactionStatus;
use crate::facts::CompactionStrategy;
use crate::facts::CompactionTrigger;
use crate::facts::GoalEventKind;
use crate::facts::HookRunFact;
use crate::facts::InvocationType;
use crate::facts::PluginState;
@@ -63,6 +65,7 @@ pub(crate) enum TrackEventRequest {
AppUsed(CodexAppUsedEventRequest),
HookRun(CodexHookRunEventRequest),
Compaction(Box<CodexCompactionEventRequest>),
Goal(Box<CodexGoalEventRequest>),
TurnEvent(Box<CodexTurnEventRequest>),
TurnSteer(CodexTurnSteerEventRequest),
CommandExecution(CodexCommandExecutionEventRequest),
@@ -771,6 +774,30 @@ pub(crate) struct CodexCompactionEventRequest {
pub(crate) event_params: CodexCompactionEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexGoalEventParams {
pub(crate) thread_id: String,
pub(crate) session_id: String,
pub(crate) turn_id: Option<String>,
pub(crate) app_server_client: CodexAppServerClientMetadata,
pub(crate) runtime: CodexRuntimeMetadata,
pub(crate) thread_source: Option<ThreadSource>,
pub(crate) subagent_source: Option<String>,
pub(crate) parent_thread_id: Option<String>,
pub(crate) goal_id: String,
pub(crate) event_kind: GoalEventKind,
pub(crate) goal_status: codex_state::ThreadGoalStatus,
pub(crate) has_token_budget: bool,
pub(crate) cumulative_tokens_accounted: Option<i64>,
pub(crate) cumulative_time_accounted_seconds: Option<i64>,
}
#[derive(Serialize)]
pub(crate) struct CodexGoalEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexGoalEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexTurnEventParams {
pub(crate) thread_id: String,
@@ -979,6 +1006,33 @@ pub(crate) fn codex_compaction_event_params(
}
}
pub(crate) fn codex_goal_event_params(
input: CodexGoalEvent,
session_id: String,
app_server_client: CodexAppServerClientMetadata,
runtime: CodexRuntimeMetadata,
thread_source: Option<ThreadSource>,
subagent_source: Option<String>,
parent_thread_id: Option<String>,
) -> CodexGoalEventParams {
CodexGoalEventParams {
thread_id: input.thread_id,
session_id,
turn_id: input.turn_id,
app_server_client,
runtime,
thread_source,
subagent_source,
parent_thread_id,
goal_id: input.goal_id,
event_kind: input.event_kind,
goal_status: input.goal_status,
has_token_budget: input.has_token_budget,
cumulative_tokens_accounted: input.cumulative_tokens_accounted,
cumulative_time_accounted_seconds: input.cumulative_time_accounted_seconds,
}
}
pub(crate) fn codex_plugin_used_metadata(
tracking: &TrackEventsContext,
plugin: PluginTelemetryMetadata,
+22
View File
@@ -417,6 +417,27 @@ pub struct CodexCompactionEvent {
pub duration_ms: Option<u64>,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum GoalEventKind {
Created,
UsageAccounted,
StatusChanged,
Cleared,
}
#[derive(Clone)]
pub struct CodexGoalEvent {
pub thread_id: String,
pub turn_id: Option<String>,
pub goal_id: String,
pub event_kind: GoalEventKind,
pub goal_status: codex_state::ThreadGoalStatus,
pub has_token_budget: bool,
pub cumulative_tokens_accounted: Option<i64>,
pub cumulative_time_accounted_seconds: Option<i64>,
}
#[allow(dead_code)]
pub(crate) enum AnalyticsFact {
Initialize {
@@ -468,6 +489,7 @@ pub(crate) enum AnalyticsFact {
pub(crate) enum CustomAnalyticsFact {
SubAgentThreadStarted(SubAgentThreadStartedInput),
Compaction(Box<CodexCompactionEvent>),
Goal(Box<CodexGoalEvent>),
GuardianReview(Box<GuardianReviewEventParams>),
TurnResolvedConfig(Box<TurnResolvedConfigFact>),
TurnTokenUsage(Box<TurnTokenUsageFact>),
+2
View File
@@ -24,6 +24,7 @@ pub use facts::AcceptedLineFingerprint;
pub use facts::AnalyticsJsonRpcError;
pub use facts::AppInvocation;
pub use facts::CodexCompactionEvent;
pub use facts::CodexGoalEvent;
pub use facts::CodexTurnSteerEvent;
pub use facts::CompactionImplementation;
pub use facts::CompactionPhase;
@@ -31,6 +32,7 @@ pub use facts::CompactionReason;
pub use facts::CompactionStatus;
pub use facts::CompactionStrategy;
pub use facts::CompactionTrigger;
pub use facts::GoalEventKind;
pub use facts::HookRunFact;
pub use facts::InputError;
pub use facts::InvocationType;
+36
View File
@@ -15,6 +15,7 @@ use crate::events::CodexDynamicToolCallEventParams;
use crate::events::CodexDynamicToolCallEventRequest;
use crate::events::CodexFileChangeEventParams;
use crate::events::CodexFileChangeEventRequest;
use crate::events::CodexGoalEventRequest;
use crate::events::CodexHookRunEventRequest;
use crate::events::CodexImageGenerationEventParams;
use crate::events::CodexImageGenerationEventRequest;
@@ -51,6 +52,7 @@ use crate::events::TrackEventRequest;
use crate::events::WebSearchActionKind;
use crate::events::codex_app_metadata;
use crate::events::codex_compaction_event_params;
use crate::events::codex_goal_event_params;
use crate::events::codex_hook_run_metadata;
use crate::events::codex_plugin_metadata;
use crate::events::codex_plugin_used_metadata;
@@ -62,6 +64,7 @@ use crate::facts::AnalyticsJsonRpcError;
use crate::facts::AppMentionedInput;
use crate::facts::AppUsedInput;
use crate::facts::CodexCompactionEvent;
use crate::facts::CodexGoalEvent;
use crate::facts::CustomAnalyticsFact;
use crate::facts::HookRunInput;
use crate::facts::PluginState;
@@ -192,6 +195,16 @@ impl<'a> AnalyticsDropSite<'a> {
}
}
fn goal(input: &'a CodexGoalEvent) -> Self {
Self {
event_name: "goal",
thread_id: &input.thread_id,
turn_id: input.turn_id.as_deref(),
review_id: None,
item_id: None,
}
}
fn tool_item(
notification: &'a codex_app_server_protocol::ItemCompletedNotification,
item_id: &'a str,
@@ -461,6 +474,9 @@ impl AnalyticsReducer {
CustomAnalyticsFact::Compaction(input) => {
self.ingest_compaction(*input, out);
}
CustomAnalyticsFact::Goal(input) => {
self.ingest_goal(*input, out);
}
CustomAnalyticsFact::GuardianReview(input) => {
self.ingest_guardian_review(*input, out);
}
@@ -1271,6 +1287,26 @@ impl AnalyticsReducer {
)));
}
fn ingest_goal(&mut self, input: CodexGoalEvent, out: &mut Vec<TrackEventRequest>) {
let Some((connection_state, thread_metadata)) =
self.thread_context_or_warn(AnalyticsDropSite::goal(&input))
else {
return;
};
out.push(TrackEventRequest::Goal(Box::new(CodexGoalEventRequest {
event_type: "codex_goal_event",
event_params: codex_goal_event_params(
input,
thread_metadata.session_id.clone(),
connection_state.app_server_client.clone(),
connection_state.runtime.clone(),
thread_metadata.thread_source,
thread_metadata.subagent_source.clone(),
thread_metadata.parent_thread_id.clone(),
),
})));
}
fn ingest_guardian_review_completed(
&mut self,
notification: codex_app_server_protocol::ItemGuardianApprovalReviewCompletedNotification,
+3 -1
View File
@@ -1,6 +1,7 @@
use std::sync::Arc;
use std::sync::Weak;
use codex_analytics::AnalyticsEventsClient;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ThreadGoal;
use codex_app_server_protocol::ThreadGoalUpdatedNotification;
@@ -30,6 +31,7 @@ pub(crate) fn thread_extensions<S>(
event_sink: Arc<dyn ExtensionEventSink>,
auth_manager: Arc<AuthManager>,
state_db: Option<StateDbHandle>,
analytics_events_client: AnalyticsEventsClient,
thread_manager: Weak<ThreadManager>,
goal_service: Arc<GoalService>,
executor_skill_provider: Arc<dyn codex_skills_extension::SkillProvider>,
@@ -42,6 +44,7 @@ where
codex_goal_extension::install_with_backend(
&mut builder,
state_db,
analytics_events_client,
codex_otel::global(),
thread_manager,
goal_service,
@@ -140,7 +143,6 @@ pub(crate) fn guardian_agent_spawner(
mod tests {
use std::time::Duration;
use codex_analytics::AnalyticsEventsClient;
use codex_protocol::protocol::ThreadGoal as CoreThreadGoal;
use codex_protocol::protocol::ThreadGoalStatus;
use codex_protocol::protocol::ThreadGoalUpdatedEvent;
+1
View File
@@ -196,6 +196,7 @@ mod tests {
Arc::new(NoopExtensionEventSink),
auth_manager.clone(),
Some(state_db.clone()),
codex_analytics::AnalyticsEventsClient::disabled(),
thread_manager.clone(),
Arc::new(codex_goal_extension::GoalService::new()),
Arc::clone(&executor_skill_provider),
@@ -327,6 +327,7 @@ impl MessageProcessor {
app_server_extension_event_sink(outgoing.clone(), thread_state_manager.clone()),
auth_manager.clone(),
state_db.clone(),
analytics_events_client.clone(),
thread_manager.clone(),
Arc::clone(&goal_service),
Arc::clone(&executor_skill_provider),
@@ -124,6 +124,31 @@ pub(crate) async fn wait_for_analytics_event(
server: &MockServer,
read_timeout: Duration,
event_type: &str,
) -> Result<Value> {
wait_for_matching_analytics_event(server, read_timeout, |event| {
event["event_type"] == event_type
})
.await
}
pub(crate) async fn wait_for_goal_event(
server: &MockServer,
read_timeout: Duration,
event_kind: &str,
goal_status: &str,
) -> Result<Value> {
wait_for_matching_analytics_event(server, read_timeout, |event| {
event["event_type"] == "codex_goal_event"
&& event["event_params"]["event_kind"] == event_kind
&& event["event_params"]["goal_status"] == goal_status
})
.await
}
async fn wait_for_matching_analytics_event(
server: &MockServer,
read_timeout: Duration,
matches: impl Fn(&Value) -> bool,
) -> Result<Value> {
timeout(read_timeout, async {
loop {
@@ -142,10 +167,7 @@ pub(crate) async fn wait_for_analytics_event(
let Some(events) = payload["events"].as_array() else {
continue;
};
if let Some(event) = events
.iter()
.find(|event| event["event_type"] == event_type)
{
if let Some(event) = events.iter().find(|event| matches(event)) {
return Ok::<Value, anyhow::Error>(event.clone());
}
}
@@ -103,6 +103,7 @@ use super::analytics::assert_basic_thread_initialized_event;
use super::analytics::mount_analytics_capture;
use super::analytics::thread_initialized_event;
use super::analytics::wait_for_analytics_payload;
use super::analytics::wait_for_goal_event;
#[cfg(windows)]
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25);
@@ -1316,19 +1317,30 @@ async fn thread_goal_set_edits_objective_without_resetting_usage() -> Result<()>
}
#[tokio::test]
async fn thread_goal_clear_deletes_goal_and_notifies() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
async fn thread_goal_lifecycle_emits_analytics_and_clear_deletes_goal() -> Result<()> {
let server = create_mock_responses_server_sequence_unchecked(vec![
responses::sse(vec![
responses::ev_response_created("materialize-thread"),
responses::ev_completed("materialize-thread"),
]),
responses::sse(vec![
responses::ev_response_created("goal-continuation"),
responses::ev_completed_with_tokens("goal-continuation", /*total_tokens*/ 200),
]),
])
.await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
create_config_toml_with_chatgpt_base_url(codex_home.path(), &server.uri(), &server.uri())?;
let config_path = codex_home.path().join("config.toml");
let config = std::fs::read_to_string(&config_path)?;
std::fs::write(
&config_path,
config.replace("personality = true\n", "personality = true\ngoals = true\n"),
)?;
mount_analytics_capture(&server, codex_home.path()).await?;
let mut mcp = TestAppServer::new_without_managed_config(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
timeout(DEFAULT_READ_TIMEOUT.saturating_mul(2), mcp.initialize()).await??;
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
@@ -1370,7 +1382,8 @@ async fn thread_goal_clear_deletes_goal_and_notifies() -> Result<()> {
"thread/goal/set",
Some(json!({
"threadId": thread.id,
"objective": "keep polishing",
"objective": "do not serialize this objective",
"tokenBudget": 100,
})),
)
.await?;
@@ -1386,6 +1399,55 @@ async fn thread_goal_clear_deletes_goal_and_notifies() -> Result<()> {
)
.await??;
let created = wait_for_goal_event(&server, DEFAULT_READ_TIMEOUT, "created", "active").await?;
let persisted_goal_id = created["event_params"]["goal_id"]
.as_str()
.expect("created goal id");
assert_eq!(created["event_params"]["thread_id"], thread.id);
assert_eq!(created["event_params"]["turn_id"], serde_json::Value::Null);
assert_eq!(created["event_params"]["has_token_budget"], true);
assert!(created["event_params"]["session_id"].is_string());
assert!(created["event_params"]["app_server_client"].is_object());
assert!(created["event_params"]["runtime"].is_object());
assert!(created["event_params"].get("objective").is_none());
assert!(created["event_params"].get("token_budget").is_none());
let usage = wait_for_goal_event(
&server,
DEFAULT_READ_TIMEOUT,
"usage_accounted",
"budget_limited",
)
.await?;
let causal_turn_id = usage["event_params"]["turn_id"]
.as_str()
.expect("accounted usage turn id");
assert_eq!(usage["event_params"]["goal_id"], persisted_goal_id);
assert_eq!(usage["event_params"]["cumulative_tokens_accounted"], 200);
assert!(
usage["event_params"]["cumulative_time_accounted_seconds"]
.as_i64()
.is_some()
);
let status = wait_for_goal_event(
&server,
DEFAULT_READ_TIMEOUT,
"status_changed",
"budget_limited",
)
.await?;
assert_eq!(status["event_params"]["goal_id"], persisted_goal_id);
assert_eq!(status["event_params"]["turn_id"], causal_turn_id);
assert_eq!(
status["event_params"]["cumulative_tokens_accounted"],
serde_json::Value::Null
);
assert_eq!(
status["event_params"]["cumulative_time_accounted_seconds"],
serde_json::Value::Null
);
let clear_id = mcp
.send_raw_request(
"thread/goal/clear",
@@ -1408,6 +1470,11 @@ async fn thread_goal_clear_deletes_goal_and_notifies() -> Result<()> {
)
.await??;
let cleared =
wait_for_goal_event(&server, DEFAULT_READ_TIMEOUT, "cleared", "budget_limited").await?;
assert_eq!(cleared["event_params"]["goal_id"], persisted_goal_id);
assert_eq!(cleared["event_params"]["turn_id"], serde_json::Value::Null);
let get_id = mcp
.send_raw_request(
"thread/goal/get",
+1
View File
@@ -15,6 +15,7 @@ workspace = true
[dependencies]
async-trait = { workspace = true }
codex-analytics = { workspace = true }
codex-core = { workspace = true }
codex-extension-api = { workspace = true }
codex-otel = { workspace = true }
+77
View File
@@ -0,0 +1,77 @@
use codex_analytics::AnalyticsEventsClient;
use codex_analytics::CodexGoalEvent;
use codex_analytics::GoalEventKind;
#[derive(Clone)]
pub(crate) struct GoalAnalytics {
client: AnalyticsEventsClient,
}
pub(crate) enum GoalEventAttribution<'a> {
Turn(&'a str),
NoTurn,
}
impl GoalAnalytics {
pub(crate) fn new(client: AnalyticsEventsClient) -> Self {
Self { client }
}
pub(crate) fn created(
&self,
goal: &codex_state::ThreadGoal,
attribution: GoalEventAttribution<'_>,
) {
self.track(goal, attribution, GoalEventKind::Created);
}
pub(crate) fn usage_accounted(
&self,
goal: &codex_state::ThreadGoal,
attribution: GoalEventAttribution<'_>,
) {
self.track(goal, attribution, GoalEventKind::UsageAccounted);
}
pub(crate) fn status_changed(
&self,
goal: &codex_state::ThreadGoal,
previous_status: Option<codex_state::ThreadGoalStatus>,
attribution: GoalEventAttribution<'_>,
) {
if previous_status.is_some_and(|status| status != goal.status) {
self.track(goal, attribution, GoalEventKind::StatusChanged);
}
}
pub(crate) fn cleared(&self, goal: &codex_state::ThreadGoal) {
self.track(goal, GoalEventAttribution::NoTurn, GoalEventKind::Cleared);
}
fn track(
&self,
goal: &codex_state::ThreadGoal,
attribution: GoalEventAttribution<'_>,
event_kind: GoalEventKind,
) {
let (cumulative_tokens_accounted, cumulative_time_accounted_seconds) = match event_kind {
GoalEventKind::UsageAccounted => (Some(goal.tokens_used), Some(goal.time_used_seconds)),
GoalEventKind::Created | GoalEventKind::StatusChanged | GoalEventKind::Cleared => {
(None, None)
}
};
self.client.track_goal_event(CodexGoalEvent {
thread_id: goal.thread_id.to_string(),
turn_id: match attribution {
GoalEventAttribution::Turn(turn_id) => Some(turn_id.to_string()),
GoalEventAttribution::NoTurn => None,
},
goal_id: goal.goal_id.clone(),
event_kind,
goal_status: goal.status,
has_token_budget: goal.token_budget.is_some(),
cumulative_tokens_accounted,
cumulative_time_accounted_seconds,
});
}
}
+4 -4
View File
@@ -259,19 +259,19 @@ impl GoalService {
tracing::warn!("failed to prepare external goal mutation: {err}");
}
let cleared = state_db
let cleared_goal = state_db
.thread_goals()
.delete_thread_goal(thread_id)
.await
.map_err(|err| {
GoalServiceError::Internal(format!("failed to clear thread goal: {err}"))
})?;
let cleared = cleared_goal.is_some();
drop(goal_state_permit);
drop(runtime);
if cleared
&& let Some(runtime) = self.runtime_for_thread(thread_id)
&& let Err(err) = runtime.apply_external_goal_clear().await
if let (Some(runtime), Some(goal)) = (self.runtime_for_thread(thread_id), cleared_goal)
&& let Err(err) = runtime.apply_external_goal_clear(goal).await
{
tracing::warn!("failed to apply external goal clear runtime effects: {err}");
}
+11
View File
@@ -2,6 +2,7 @@ use std::sync::Arc;
use std::sync::Weak;
use async_trait::async_trait;
use codex_analytics::AnalyticsEventsClient;
use codex_core::ThreadManager;
use codex_extension_api::ConfigContributor;
use codex_extension_api::ExtensionData;
@@ -33,6 +34,7 @@ use codex_protocol::protocol::TokenUsageInfo;
use crate::accounting::BudgetLimitedGoalDisposition;
use crate::accounting::GoalAccountingState;
use crate::analytics::GoalAnalytics;
use crate::api::GoalService;
use crate::events::GoalEventEmitter;
use crate::metrics::GoalMetrics;
@@ -57,6 +59,7 @@ impl GoalExtensionConfig {
#[derive(Clone)]
pub struct GoalExtension<C> {
state_dbs: Arc<codex_state::StateRuntime>,
analytics: GoalAnalytics,
event_emitter: GoalEventEmitter,
metrics: GoalMetrics,
thread_manager: Weak<ThreadManager>,
@@ -73,6 +76,7 @@ impl<C> std::fmt::Debug for GoalExtension<C> {
impl<C> GoalExtension<C> {
pub(crate) fn new_with_host_capabilities(
state_dbs: Arc<codex_state::StateRuntime>,
analytics_events_client: AnalyticsEventsClient,
event_sink: Arc<dyn ExtensionEventSink>,
metrics_client: Option<MetricsClient>,
thread_manager: Weak<ThreadManager>,
@@ -81,6 +85,7 @@ impl<C> GoalExtension<C> {
) -> Self {
Self {
state_dbs,
analytics: GoalAnalytics::new(analytics_events_client),
event_emitter: GoalEventEmitter::new(event_sink),
metrics: GoalMetrics::new(metrics_client),
thread_manager,
@@ -120,6 +125,7 @@ where
self.thread_manager.clone(),
accounting_state,
GoalRuntimeConfig {
analytics: self.analytics.clone(),
enabled,
tools_available_for_thread,
},
@@ -403,6 +409,7 @@ where
runtime.thread_id(),
Arc::clone(&self.state_dbs),
runtime.accounting_state(),
self.analytics.clone(),
self.event_emitter.clone(),
self.metrics.clone(),
)),
@@ -410,6 +417,7 @@ where
runtime.thread_id(),
Arc::clone(&self.state_dbs),
runtime.accounting_state(),
self.analytics.clone(),
self.event_emitter.clone(),
self.metrics.clone(),
)),
@@ -417,6 +425,7 @@ where
runtime.thread_id(),
Arc::clone(&self.state_dbs),
runtime.accounting_state(),
self.analytics.clone(),
self.event_emitter.clone(),
self.metrics.clone(),
)),
@@ -427,6 +436,7 @@ where
pub fn install_with_backend<C>(
registry: &mut ExtensionRegistryBuilder<C>,
state_dbs: Arc<codex_state::StateRuntime>,
analytics_events_client: AnalyticsEventsClient,
metrics_client: Option<MetricsClient>,
thread_manager: Weak<ThreadManager>,
goal_service: Arc<GoalService>,
@@ -436,6 +446,7 @@ pub fn install_with_backend<C>(
{
let extension = Arc::new(GoalExtension::new_with_host_capabilities(
state_dbs,
analytics_events_client,
registry.event_sink(),
metrics_client,
thread_manager,
+1
View File
@@ -1,6 +1,7 @@
//! Extension crate for the `/goal` feature.
mod accounting;
mod analytics;
mod api;
mod events;
mod extension;
+37 -1
View File
@@ -10,6 +10,8 @@ use codex_protocol::protocol::ThreadGoal;
use crate::accounting::BudgetLimitedGoalDisposition;
use crate::accounting::GoalAccountingState;
use crate::analytics::GoalAnalytics;
use crate::analytics::GoalEventAttribution;
use crate::events::GoalEventEmitter;
use crate::metrics::GoalMetrics;
use crate::steering::continuation_steering_item;
@@ -24,6 +26,7 @@ pub struct GoalRuntimeHandle {
}
pub(crate) struct GoalRuntimeConfig {
pub(crate) analytics: GoalAnalytics,
pub(crate) enabled: bool,
pub(crate) tools_available_for_thread: bool,
}
@@ -36,6 +39,7 @@ pub(crate) enum ActiveGoalStopReason {
struct GoalRuntimeInner {
thread_id: ThreadId,
state_dbs: Arc<codex_state::StateRuntime>,
analytics: GoalAnalytics,
event_emitter: GoalEventEmitter,
metrics: GoalMetrics,
thread_manager: Weak<ThreadManager>,
@@ -87,6 +91,7 @@ impl GoalRuntimeHandle {
inner: Arc::new(GoalRuntimeInner {
thread_id,
state_dbs,
analytics: config.analytics,
event_emitter,
metrics,
thread_manager,
@@ -165,6 +170,9 @@ impl GoalRuntimeHandle {
.is_some_and(|previous_goal| previous_goal.goal_id != goal.goal_id);
if previous_goal.is_none() || replaced_existing_goal {
self.inner.metrics.record_created();
self.inner
.analytics
.created(&goal, GoalEventAttribution::NoTurn);
}
let previous_status = previous_goal
.as_ref()
@@ -175,6 +183,9 @@ impl GoalRuntimeHandle {
self.inner
.metrics
.record_terminal_if_status_changed(previous_status, &goal);
self.inner
.analytics
.status_changed(&goal, previous_status, GoalEventAttribution::NoTurn);
let objective_changed = previous_goal.as_ref().is_some_and(|previous_goal| {
!replaced_existing_goal && previous_goal.objective != goal.objective
});
@@ -211,11 +222,15 @@ impl GoalRuntimeHandle {
Ok(())
}
pub async fn apply_external_goal_clear(&self) -> Result<(), String> {
pub async fn apply_external_goal_clear(
&self,
goal: codex_state::ThreadGoal,
) -> Result<(), String> {
if !self.is_enabled() {
return Ok(());
}
self.inner.analytics.cleared(&goal);
self.inner.accounting_state.clear_active_goal();
Ok(())
}
@@ -302,6 +317,11 @@ impl GoalRuntimeHandle {
self.inner
.metrics
.record_terminal_if_status_changed(previous_status, &goal);
self.inner.analytics.status_changed(
&goal,
previous_status,
GoalEventAttribution::Turn(turn_id),
);
self.inner.accounting_state.clear_active_goal();
let goal = protocol_goal_from_state(goal);
self.inner.event_emitter.thread_goal_updated(
@@ -445,6 +465,14 @@ impl GoalRuntimeHandle {
self.inner
.metrics
.record_terminal_if_status_changed(previous_status, &goal);
self.inner
.analytics
.usage_accounted(&goal, GoalEventAttribution::Turn(turn_id));
self.inner.analytics.status_changed(
&goal,
previous_status,
GoalEventAttribution::Turn(turn_id),
);
accounting.mark_progress_accounted_for_status(
turn_id,
&snapshot,
@@ -499,6 +527,14 @@ impl GoalRuntimeHandle {
self.inner
.metrics
.record_terminal_if_status_changed(previous_status, &goal);
self.inner
.analytics
.usage_accounted(&goal, GoalEventAttribution::NoTurn);
self.inner.analytics.status_changed(
&goal,
previous_status,
GoalEventAttribution::NoTurn,
);
accounting.mark_idle_progress_accounted_for_status(
&snapshot,
goal.status,
+25
View File
@@ -17,6 +17,8 @@ use serde::Serialize;
use crate::accounting::BudgetLimitedGoalDisposition;
use crate::accounting::GoalAccountingState;
use crate::analytics::GoalAnalytics;
use crate::analytics::GoalEventAttribution;
use crate::events::GoalEventEmitter;
use crate::metrics::GoalMetrics;
use crate::spec::CREATE_GOAL_TOOL_NAME;
@@ -32,6 +34,7 @@ pub(crate) struct GoalToolExecutor {
thread_id: ThreadId,
state_db: Arc<codex_state::StateRuntime>,
accounting_state: Arc<GoalAccountingState>,
analytics: GoalAnalytics,
event_emitter: GoalEventEmitter,
metrics: GoalMetrics,
}
@@ -75,6 +78,7 @@ impl GoalToolExecutor {
thread_id: ThreadId,
state_db: Arc<codex_state::StateRuntime>,
accounting_state: Arc<GoalAccountingState>,
analytics: GoalAnalytics,
event_emitter: GoalEventEmitter,
metrics: GoalMetrics,
) -> Self {
@@ -83,6 +87,7 @@ impl GoalToolExecutor {
thread_id,
state_db,
accounting_state,
analytics,
event_emitter,
metrics,
}
@@ -92,6 +97,7 @@ impl GoalToolExecutor {
thread_id: ThreadId,
state_db: Arc<codex_state::StateRuntime>,
accounting_state: Arc<GoalAccountingState>,
analytics: GoalAnalytics,
event_emitter: GoalEventEmitter,
metrics: GoalMetrics,
) -> Self {
@@ -100,6 +106,7 @@ impl GoalToolExecutor {
thread_id,
state_db,
accounting_state,
analytics,
event_emitter,
metrics,
}
@@ -109,6 +116,7 @@ impl GoalToolExecutor {
thread_id: ThreadId,
state_db: Arc<codex_state::StateRuntime>,
accounting_state: Arc<GoalAccountingState>,
analytics: GoalAnalytics,
event_emitter: GoalEventEmitter,
metrics: GoalMetrics,
) -> Self {
@@ -117,6 +125,7 @@ impl GoalToolExecutor {
thread_id,
state_db,
accounting_state,
analytics,
event_emitter,
metrics,
}
@@ -200,6 +209,10 @@ impl GoalToolExecutor {
.accounting_state
.mark_current_turn_goal_active(goal.goal_id.clone());
self.metrics.record_created();
self.analytics.created(
&goal,
GoalEventAttribution::Turn(invocation.turn_id.as_str()),
);
let goal = protocol_goal_from_state(goal);
self.emit_goal_updated_from_tool_call(&invocation, turn_id, goal.clone());
goal_response(Some(goal), CompletionBudgetReport::Omit)
@@ -259,6 +272,11 @@ impl GoalToolExecutor {
})?;
self.metrics
.record_terminal_if_status_changed(previous_status, &goal);
self.analytics.status_changed(
&goal,
previous_status,
GoalEventAttribution::Turn(invocation.turn_id.as_str()),
);
let goal = protocol_goal_from_state(goal);
let turn_id = self.accounting_state.clear_current_turn_goal();
self.emit_goal_updated_from_tool_call(&invocation, turn_id, goal.clone());
@@ -324,6 +342,13 @@ impl GoalToolExecutor {
codex_state::GoalAccountingOutcome::Updated(goal) => {
self.metrics
.record_terminal_if_status_changed(previous_status, &goal);
self.analytics
.usage_accounted(&goal, GoalEventAttribution::Turn(turn_id.as_str()));
self.analytics.status_changed(
&goal,
previous_status,
GoalEventAttribution::Turn(turn_id.as_str()),
);
self.accounting_state.mark_progress_accounted_for_status(
turn_id.as_str(),
&snapshot,
@@ -4,6 +4,7 @@ use std::sync::PoisonError;
use std::sync::Weak;
use std::time::Duration;
use codex_analytics::AnalyticsEventsClient;
use codex_extension_api::ExtensionData;
use codex_extension_api::ExtensionEventSink;
use codex_extension_api::ExtensionRegistryBuilder;
@@ -1114,6 +1115,7 @@ async fn installed_tools_with_start(
install_with_backend(
&mut builder,
runtime,
AnalyticsEventsClient::disabled(),
/*metrics_client*/ None,
Weak::new(),
goal_service,
@@ -1164,6 +1166,7 @@ impl GoalExtensionHarness {
install_with_backend(
&mut builder,
runtime,
AnalyticsEventsClient::disabled(),
/*metrics_client*/ None,
Weak::new(),
Arc::clone(&goal_service),
+3 -1
View File
@@ -3,12 +3,14 @@ use anyhow::anyhow;
use chrono::DateTime;
use chrono::Utc;
use codex_protocol::ThreadId;
use serde::Serialize;
use sqlx::Row;
use sqlx::sqlite::SqliteRow;
use super::epoch_millis_to_datetime;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ThreadGoalStatus {
Active,
Paused,
+22 -7
View File
@@ -377,18 +377,31 @@ WHERE thread_id = ?
self.get_thread_goal(thread_id).await
}
pub async fn delete_thread_goal(&self, thread_id: ThreadId) -> anyhow::Result<bool> {
let result = sqlx::query(
pub async fn delete_thread_goal(
&self,
thread_id: ThreadId,
) -> anyhow::Result<Option<crate::ThreadGoal>> {
let row = sqlx::query(
r#"
DELETE FROM thread_goals
WHERE thread_id = ?
RETURNING
thread_id,
goal_id,
objective,
status,
token_budget,
tokens_used,
time_used_seconds,
created_at_ms,
updated_at_ms
"#,
)
.bind(thread_id.to_string())
.execute(self.pool.as_ref())
.fetch_optional(self.pool.as_ref())
.await?;
Ok(result.rows_affected() > 0)
row.map(|row| thread_goal_from_row(&row)).transpose()
}
pub async fn account_thread_goal_usage(
@@ -622,7 +635,8 @@ mod tests {
assert_eq!(0, replaced.tokens_used);
assert_eq!(0, replaced.time_used_seconds);
assert!(
assert_eq!(
Some(replaced),
runtime
.thread_goals()
.delete_thread_goal(thread_id)
@@ -637,8 +651,9 @@ mod tests {
.await
.unwrap()
);
assert!(
!runtime
assert_eq!(
None,
runtime
.thread_goals()
.delete_thread_goal(thread_id)
.await
+1 -1
View File
@@ -891,7 +891,7 @@ ON CONFLICT(id) DO UPDATE SET
let rows_affected = result.rows_affected();
self.memories.delete_thread_memory(thread_id).await?;
if rows_affected > 0 {
self.thread_goals.delete_thread_goal(thread_id).await?;
let _ = self.thread_goals.delete_thread_goal(thread_id).await?;
}
Ok(rows_affected)
}