From 608b8b1cc6ce91064e1fd12e0810e1772b5e4710 Mon Sep 17 00:00:00 2001 From: marksteinbrick-oai Date: Tue, 9 Jun 2026 18:45:54 -0700 Subject: [PATCH] [codex-analytics] emit goal lifecycle analytics (#27078) ## Why - Currently, there is no analytics event for `/goal` behavior - Existing events cannot identify goal execution or its resulting outcome - The original update in [#26182](https://github.com/openai/codex/pull/26182) was implemented before `/goal` moved into `codex-goal-extension`. ## What Changed - Adds `codex_goal_event` serialization and enrichment to `codex-analytics` - Emits goal events from the canonical `codex-goal-extension` mutation and accounting paths: - `created` when a new logical goal is persisted - `usage_accounted` when cumulative goal usage is persisted - `status_changed` when the stored goal status changes - `cleared` when the goal is deleted - Preserves causal `turn_id` for turn driven events and uses null attribution for external or idle lifecycle events - Changes goal deletion to return the deleted row so `cleared` retains the stable goal ID ## Event Details Includes standard analytics metadata along with goal specific fields: - `goal_id`: Stable ID stored in the local SQLite goal row and shared across the goal's events - `event_kind`: Observed operation (see the 4 lifecycle events cited in the above bullet) - `goal_status`: Resulting or last stored status: `active`, `paused`, `blocked`, `usage_limited`, etc. - `has_token_budget`: Indicates whether a token budget is configured - `turn_id`: Causal turn ID, or null when no causal turn exists - `cumulative_tokens_accounted`: Cumulative tokens on `usage_accounted` events; null otherwise - `cumulative_time_accounted_seconds`: Cumulative active time on `usage_accounted` events; null otherwise ## Validation - `just test -p codex-analytics -p codex-state -p codex-goal-extension` - `just test -p codex-core -E 'test(/goal/)'` - `just test -p codex-app-server` - `cargo build -p codex-analytics -p codex-core -p codex-state -p codex-app-server` --- codex-rs/Cargo.lock | 2 + codex-rs/analytics/Cargo.toml | 1 + codex-rs/analytics/src/client.rs | 7 ++ codex-rs/analytics/src/events.rs | 54 +++++++++++++ codex-rs/analytics/src/facts.rs | 22 ++++++ codex-rs/analytics/src/lib.rs | 2 + codex-rs/analytics/src/reducer.rs | 36 +++++++++ codex-rs/app-server/src/extensions.rs | 4 +- codex-rs/app-server/src/mcp_refresh.rs | 1 + codex-rs/app-server/src/message_processor.rs | 1 + .../app-server/tests/suite/v2/analytics.rs | 30 +++++++- .../tests/suite/v2/thread_resume.rs | 77 +++++++++++++++++-- codex-rs/ext/goal/Cargo.toml | 1 + codex-rs/ext/goal/src/analytics.rs | 77 +++++++++++++++++++ codex-rs/ext/goal/src/api.rs | 8 +- codex-rs/ext/goal/src/extension.rs | 11 +++ codex-rs/ext/goal/src/lib.rs | 1 + codex-rs/ext/goal/src/runtime.rs | 38 ++++++++- codex-rs/ext/goal/src/tool.rs | 25 ++++++ .../ext/goal/tests/goal_extension_backend.rs | 3 + codex-rs/state/src/model/thread_goal.rs | 4 +- codex-rs/state/src/runtime/goals.rs | 29 +++++-- codex-rs/state/src/runtime/threads.rs | 2 +- 23 files changed, 412 insertions(+), 24 deletions(-) create mode 100644 codex-rs/ext/goal/src/analytics.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 511fca725..46bb21bf6 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1892,6 +1892,7 @@ dependencies = [ "codex-model-provider", "codex-plugin", "codex-protocol", + "codex-state", "codex-utils-absolute-path", "os_info", "pretty_assertions", @@ -3049,6 +3050,7 @@ dependencies = [ "anyhow", "async-trait", "chrono", + "codex-analytics", "codex-core", "codex-extension-api", "codex-otel", diff --git a/codex-rs/analytics/Cargo.toml b/codex-rs/analytics/Cargo.toml index 918e7edc7..854642746 100644 --- a/codex-rs/analytics/Cargo.toml +++ b/codex-rs/analytics/Cargo.toml @@ -19,6 +19,7 @@ codex-login = { workspace = true } codex-model-provider = { workspace = true } codex-plugin = { workspace = true } codex-protocol = { workspace = true } +codex-state = { workspace = true } os_info = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } diff --git a/codex-rs/analytics/src/client.rs b/codex-rs/analytics/src/client.rs index b99a2ec86..193de9ef6 100644 --- a/codex-rs/analytics/src/client.rs +++ b/codex-rs/analytics/src/client.rs @@ -9,6 +9,7 @@ use crate::facts::AnalyticsJsonRpcError; use crate::facts::AppInvocation; use crate::facts::AppMentionedInput; use crate::facts::AppUsedInput; +use crate::facts::CodexGoalEvent; use crate::facts::CustomAnalyticsFact; use crate::facts::HookRunFact; use crate::facts::HookRunInput; @@ -246,6 +247,12 @@ impl AnalyticsEventsClient { ))); } + pub fn track_goal_event(&self, event: CodexGoalEvent) { + self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::Goal(Box::new( + event, + )))); + } + pub fn track_turn_resolved_config(&self, fact: TurnResolvedConfigFact) { self.record_fact(AnalyticsFact::Custom( CustomAnalyticsFact::TurnResolvedConfig(Box::new(fact)), diff --git a/codex-rs/analytics/src/events.rs b/codex-rs/analytics/src/events.rs index 0b6c9a012..8a25215ec 100644 --- a/codex-rs/analytics/src/events.rs +++ b/codex-rs/analytics/src/events.rs @@ -4,12 +4,14 @@ use crate::facts::AcceptedLineFingerprint; use crate::facts::AppInvocation; use crate::facts::CodexCompactionEvent; use crate::facts::CodexErrKind; +use crate::facts::CodexGoalEvent; use crate::facts::CompactionImplementation; use crate::facts::CompactionPhase; use crate::facts::CompactionReason; use crate::facts::CompactionStatus; use crate::facts::CompactionStrategy; use crate::facts::CompactionTrigger; +use crate::facts::GoalEventKind; use crate::facts::HookRunFact; use crate::facts::InvocationType; use crate::facts::PluginState; @@ -63,6 +65,7 @@ pub(crate) enum TrackEventRequest { AppUsed(CodexAppUsedEventRequest), HookRun(CodexHookRunEventRequest), Compaction(Box), + Goal(Box), TurnEvent(Box), TurnSteer(CodexTurnSteerEventRequest), CommandExecution(CodexCommandExecutionEventRequest), @@ -771,6 +774,30 @@ pub(crate) struct CodexCompactionEventRequest { pub(crate) event_params: CodexCompactionEventParams, } +#[derive(Serialize)] +pub(crate) struct CodexGoalEventParams { + pub(crate) thread_id: String, + pub(crate) session_id: String, + pub(crate) turn_id: Option, + pub(crate) app_server_client: CodexAppServerClientMetadata, + pub(crate) runtime: CodexRuntimeMetadata, + pub(crate) thread_source: Option, + pub(crate) subagent_source: Option, + pub(crate) parent_thread_id: Option, + pub(crate) goal_id: String, + pub(crate) event_kind: GoalEventKind, + pub(crate) goal_status: codex_state::ThreadGoalStatus, + pub(crate) has_token_budget: bool, + pub(crate) cumulative_tokens_accounted: Option, + pub(crate) cumulative_time_accounted_seconds: Option, +} + +#[derive(Serialize)] +pub(crate) struct CodexGoalEventRequest { + pub(crate) event_type: &'static str, + pub(crate) event_params: CodexGoalEventParams, +} + #[derive(Serialize)] pub(crate) struct CodexTurnEventParams { pub(crate) thread_id: String, @@ -979,6 +1006,33 @@ pub(crate) fn codex_compaction_event_params( } } +pub(crate) fn codex_goal_event_params( + input: CodexGoalEvent, + session_id: String, + app_server_client: CodexAppServerClientMetadata, + runtime: CodexRuntimeMetadata, + thread_source: Option, + subagent_source: Option, + parent_thread_id: Option, +) -> CodexGoalEventParams { + CodexGoalEventParams { + thread_id: input.thread_id, + session_id, + turn_id: input.turn_id, + app_server_client, + runtime, + thread_source, + subagent_source, + parent_thread_id, + goal_id: input.goal_id, + event_kind: input.event_kind, + goal_status: input.goal_status, + has_token_budget: input.has_token_budget, + cumulative_tokens_accounted: input.cumulative_tokens_accounted, + cumulative_time_accounted_seconds: input.cumulative_time_accounted_seconds, + } +} + pub(crate) fn codex_plugin_used_metadata( tracking: &TrackEventsContext, plugin: PluginTelemetryMetadata, diff --git a/codex-rs/analytics/src/facts.rs b/codex-rs/analytics/src/facts.rs index a035bc98b..a19fb05d6 100644 --- a/codex-rs/analytics/src/facts.rs +++ b/codex-rs/analytics/src/facts.rs @@ -417,6 +417,27 @@ pub struct CodexCompactionEvent { pub duration_ms: Option, } +#[derive(Clone, Copy, Debug, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum GoalEventKind { + Created, + UsageAccounted, + StatusChanged, + Cleared, +} + +#[derive(Clone)] +pub struct CodexGoalEvent { + pub thread_id: String, + pub turn_id: Option, + pub goal_id: String, + pub event_kind: GoalEventKind, + pub goal_status: codex_state::ThreadGoalStatus, + pub has_token_budget: bool, + pub cumulative_tokens_accounted: Option, + pub cumulative_time_accounted_seconds: Option, +} + #[allow(dead_code)] pub(crate) enum AnalyticsFact { Initialize { @@ -468,6 +489,7 @@ pub(crate) enum AnalyticsFact { pub(crate) enum CustomAnalyticsFact { SubAgentThreadStarted(SubAgentThreadStartedInput), Compaction(Box), + Goal(Box), GuardianReview(Box), TurnResolvedConfig(Box), TurnTokenUsage(Box), diff --git a/codex-rs/analytics/src/lib.rs b/codex-rs/analytics/src/lib.rs index e2e16dfac..35bd539cd 100644 --- a/codex-rs/analytics/src/lib.rs +++ b/codex-rs/analytics/src/lib.rs @@ -24,6 +24,7 @@ pub use facts::AcceptedLineFingerprint; pub use facts::AnalyticsJsonRpcError; pub use facts::AppInvocation; pub use facts::CodexCompactionEvent; +pub use facts::CodexGoalEvent; pub use facts::CodexTurnSteerEvent; pub use facts::CompactionImplementation; pub use facts::CompactionPhase; @@ -31,6 +32,7 @@ pub use facts::CompactionReason; pub use facts::CompactionStatus; pub use facts::CompactionStrategy; pub use facts::CompactionTrigger; +pub use facts::GoalEventKind; pub use facts::HookRunFact; pub use facts::InputError; pub use facts::InvocationType; diff --git a/codex-rs/analytics/src/reducer.rs b/codex-rs/analytics/src/reducer.rs index f8abe36e4..f0eb0ac9a 100644 --- a/codex-rs/analytics/src/reducer.rs +++ b/codex-rs/analytics/src/reducer.rs @@ -15,6 +15,7 @@ use crate::events::CodexDynamicToolCallEventParams; use crate::events::CodexDynamicToolCallEventRequest; use crate::events::CodexFileChangeEventParams; use crate::events::CodexFileChangeEventRequest; +use crate::events::CodexGoalEventRequest; use crate::events::CodexHookRunEventRequest; use crate::events::CodexImageGenerationEventParams; use crate::events::CodexImageGenerationEventRequest; @@ -51,6 +52,7 @@ use crate::events::TrackEventRequest; use crate::events::WebSearchActionKind; use crate::events::codex_app_metadata; use crate::events::codex_compaction_event_params; +use crate::events::codex_goal_event_params; use crate::events::codex_hook_run_metadata; use crate::events::codex_plugin_metadata; use crate::events::codex_plugin_used_metadata; @@ -62,6 +64,7 @@ use crate::facts::AnalyticsJsonRpcError; use crate::facts::AppMentionedInput; use crate::facts::AppUsedInput; use crate::facts::CodexCompactionEvent; +use crate::facts::CodexGoalEvent; use crate::facts::CustomAnalyticsFact; use crate::facts::HookRunInput; use crate::facts::PluginState; @@ -192,6 +195,16 @@ impl<'a> AnalyticsDropSite<'a> { } } + fn goal(input: &'a CodexGoalEvent) -> Self { + Self { + event_name: "goal", + thread_id: &input.thread_id, + turn_id: input.turn_id.as_deref(), + review_id: None, + item_id: None, + } + } + fn tool_item( notification: &'a codex_app_server_protocol::ItemCompletedNotification, item_id: &'a str, @@ -461,6 +474,9 @@ impl AnalyticsReducer { CustomAnalyticsFact::Compaction(input) => { self.ingest_compaction(*input, out); } + CustomAnalyticsFact::Goal(input) => { + self.ingest_goal(*input, out); + } CustomAnalyticsFact::GuardianReview(input) => { self.ingest_guardian_review(*input, out); } @@ -1271,6 +1287,26 @@ impl AnalyticsReducer { ))); } + fn ingest_goal(&mut self, input: CodexGoalEvent, out: &mut Vec) { + let Some((connection_state, thread_metadata)) = + self.thread_context_or_warn(AnalyticsDropSite::goal(&input)) + else { + return; + }; + out.push(TrackEventRequest::Goal(Box::new(CodexGoalEventRequest { + event_type: "codex_goal_event", + event_params: codex_goal_event_params( + input, + thread_metadata.session_id.clone(), + connection_state.app_server_client.clone(), + connection_state.runtime.clone(), + thread_metadata.thread_source, + thread_metadata.subagent_source.clone(), + thread_metadata.parent_thread_id.clone(), + ), + }))); + } + fn ingest_guardian_review_completed( &mut self, notification: codex_app_server_protocol::ItemGuardianApprovalReviewCompletedNotification, diff --git a/codex-rs/app-server/src/extensions.rs b/codex-rs/app-server/src/extensions.rs index a0a44fd80..9e0a6e1c4 100644 --- a/codex-rs/app-server/src/extensions.rs +++ b/codex-rs/app-server/src/extensions.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use std::sync::Weak; +use codex_analytics::AnalyticsEventsClient; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ThreadGoal; use codex_app_server_protocol::ThreadGoalUpdatedNotification; @@ -30,6 +31,7 @@ pub(crate) fn thread_extensions( event_sink: Arc, auth_manager: Arc, state_db: Option, + analytics_events_client: AnalyticsEventsClient, thread_manager: Weak, goal_service: Arc, executor_skill_provider: Arc, @@ -42,6 +44,7 @@ where codex_goal_extension::install_with_backend( &mut builder, state_db, + analytics_events_client, codex_otel::global(), thread_manager, goal_service, @@ -140,7 +143,6 @@ pub(crate) fn guardian_agent_spawner( mod tests { use std::time::Duration; - use codex_analytics::AnalyticsEventsClient; use codex_protocol::protocol::ThreadGoal as CoreThreadGoal; use codex_protocol::protocol::ThreadGoalStatus; use codex_protocol::protocol::ThreadGoalUpdatedEvent; diff --git a/codex-rs/app-server/src/mcp_refresh.rs b/codex-rs/app-server/src/mcp_refresh.rs index 2cd102775..ff7104883 100644 --- a/codex-rs/app-server/src/mcp_refresh.rs +++ b/codex-rs/app-server/src/mcp_refresh.rs @@ -196,6 +196,7 @@ mod tests { Arc::new(NoopExtensionEventSink), auth_manager.clone(), Some(state_db.clone()), + codex_analytics::AnalyticsEventsClient::disabled(), thread_manager.clone(), Arc::new(codex_goal_extension::GoalService::new()), Arc::clone(&executor_skill_provider), diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 78ba2c12c..2069d61c7 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -327,6 +327,7 @@ impl MessageProcessor { app_server_extension_event_sink(outgoing.clone(), thread_state_manager.clone()), auth_manager.clone(), state_db.clone(), + analytics_events_client.clone(), thread_manager.clone(), Arc::clone(&goal_service), Arc::clone(&executor_skill_provider), diff --git a/codex-rs/app-server/tests/suite/v2/analytics.rs b/codex-rs/app-server/tests/suite/v2/analytics.rs index 8e8f2a755..ea876940a 100644 --- a/codex-rs/app-server/tests/suite/v2/analytics.rs +++ b/codex-rs/app-server/tests/suite/v2/analytics.rs @@ -124,6 +124,31 @@ pub(crate) async fn wait_for_analytics_event( server: &MockServer, read_timeout: Duration, event_type: &str, +) -> Result { + wait_for_matching_analytics_event(server, read_timeout, |event| { + event["event_type"] == event_type + }) + .await +} + +pub(crate) async fn wait_for_goal_event( + server: &MockServer, + read_timeout: Duration, + event_kind: &str, + goal_status: &str, +) -> Result { + wait_for_matching_analytics_event(server, read_timeout, |event| { + event["event_type"] == "codex_goal_event" + && event["event_params"]["event_kind"] == event_kind + && event["event_params"]["goal_status"] == goal_status + }) + .await +} + +async fn wait_for_matching_analytics_event( + server: &MockServer, + read_timeout: Duration, + matches: impl Fn(&Value) -> bool, ) -> Result { timeout(read_timeout, async { loop { @@ -142,10 +167,7 @@ pub(crate) async fn wait_for_analytics_event( let Some(events) = payload["events"].as_array() else { continue; }; - if let Some(event) = events - .iter() - .find(|event| event["event_type"] == event_type) - { + if let Some(event) = events.iter().find(|event| matches(event)) { return Ok::(event.clone()); } } diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index 582177ee7..cf797ff6b 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -103,6 +103,7 @@ use super::analytics::assert_basic_thread_initialized_event; use super::analytics::mount_analytics_capture; use super::analytics::thread_initialized_event; use super::analytics::wait_for_analytics_payload; +use super::analytics::wait_for_goal_event; #[cfg(windows)] const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25); @@ -1316,19 +1317,30 @@ async fn thread_goal_set_edits_objective_without_resetting_usage() -> Result<()> } #[tokio::test] -async fn thread_goal_clear_deletes_goal_and_notifies() -> Result<()> { - let server = create_mock_responses_server_repeating_assistant("Done").await; +async fn thread_goal_lifecycle_emits_analytics_and_clear_deletes_goal() -> Result<()> { + let server = create_mock_responses_server_sequence_unchecked(vec![ + responses::sse(vec![ + responses::ev_response_created("materialize-thread"), + responses::ev_completed("materialize-thread"), + ]), + responses::sse(vec![ + responses::ev_response_created("goal-continuation"), + responses::ev_completed_with_tokens("goal-continuation", /*total_tokens*/ 200), + ]), + ]) + .await; let codex_home = TempDir::new()?; - create_config_toml(codex_home.path(), &server.uri())?; + create_config_toml_with_chatgpt_base_url(codex_home.path(), &server.uri(), &server.uri())?; let config_path = codex_home.path().join("config.toml"); let config = std::fs::read_to_string(&config_path)?; std::fs::write( &config_path, config.replace("personality = true\n", "personality = true\ngoals = true\n"), )?; + mount_analytics_capture(&server, codex_home.path()).await?; let mut mcp = TestAppServer::new_without_managed_config(codex_home.path()).await?; - timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + timeout(DEFAULT_READ_TIMEOUT.saturating_mul(2), mcp.initialize()).await??; let start_id = mcp .send_thread_start_request(ThreadStartParams { @@ -1370,7 +1382,8 @@ async fn thread_goal_clear_deletes_goal_and_notifies() -> Result<()> { "thread/goal/set", Some(json!({ "threadId": thread.id, - "objective": "keep polishing", + "objective": "do not serialize this objective", + "tokenBudget": 100, })), ) .await?; @@ -1386,6 +1399,55 @@ async fn thread_goal_clear_deletes_goal_and_notifies() -> Result<()> { ) .await??; + let created = wait_for_goal_event(&server, DEFAULT_READ_TIMEOUT, "created", "active").await?; + let persisted_goal_id = created["event_params"]["goal_id"] + .as_str() + .expect("created goal id"); + assert_eq!(created["event_params"]["thread_id"], thread.id); + assert_eq!(created["event_params"]["turn_id"], serde_json::Value::Null); + assert_eq!(created["event_params"]["has_token_budget"], true); + assert!(created["event_params"]["session_id"].is_string()); + assert!(created["event_params"]["app_server_client"].is_object()); + assert!(created["event_params"]["runtime"].is_object()); + assert!(created["event_params"].get("objective").is_none()); + assert!(created["event_params"].get("token_budget").is_none()); + + let usage = wait_for_goal_event( + &server, + DEFAULT_READ_TIMEOUT, + "usage_accounted", + "budget_limited", + ) + .await?; + let causal_turn_id = usage["event_params"]["turn_id"] + .as_str() + .expect("accounted usage turn id"); + assert_eq!(usage["event_params"]["goal_id"], persisted_goal_id); + assert_eq!(usage["event_params"]["cumulative_tokens_accounted"], 200); + assert!( + usage["event_params"]["cumulative_time_accounted_seconds"] + .as_i64() + .is_some() + ); + + let status = wait_for_goal_event( + &server, + DEFAULT_READ_TIMEOUT, + "status_changed", + "budget_limited", + ) + .await?; + assert_eq!(status["event_params"]["goal_id"], persisted_goal_id); + assert_eq!(status["event_params"]["turn_id"], causal_turn_id); + assert_eq!( + status["event_params"]["cumulative_tokens_accounted"], + serde_json::Value::Null + ); + assert_eq!( + status["event_params"]["cumulative_time_accounted_seconds"], + serde_json::Value::Null + ); + let clear_id = mcp .send_raw_request( "thread/goal/clear", @@ -1408,6 +1470,11 @@ async fn thread_goal_clear_deletes_goal_and_notifies() -> Result<()> { ) .await??; + let cleared = + wait_for_goal_event(&server, DEFAULT_READ_TIMEOUT, "cleared", "budget_limited").await?; + assert_eq!(cleared["event_params"]["goal_id"], persisted_goal_id); + assert_eq!(cleared["event_params"]["turn_id"], serde_json::Value::Null); + let get_id = mcp .send_raw_request( "thread/goal/get", diff --git a/codex-rs/ext/goal/Cargo.toml b/codex-rs/ext/goal/Cargo.toml index 611638324..1e21f5ac8 100644 --- a/codex-rs/ext/goal/Cargo.toml +++ b/codex-rs/ext/goal/Cargo.toml @@ -15,6 +15,7 @@ workspace = true [dependencies] async-trait = { workspace = true } +codex-analytics = { workspace = true } codex-core = { workspace = true } codex-extension-api = { workspace = true } codex-otel = { workspace = true } diff --git a/codex-rs/ext/goal/src/analytics.rs b/codex-rs/ext/goal/src/analytics.rs new file mode 100644 index 000000000..82d34962d --- /dev/null +++ b/codex-rs/ext/goal/src/analytics.rs @@ -0,0 +1,77 @@ +use codex_analytics::AnalyticsEventsClient; +use codex_analytics::CodexGoalEvent; +use codex_analytics::GoalEventKind; + +#[derive(Clone)] +pub(crate) struct GoalAnalytics { + client: AnalyticsEventsClient, +} + +pub(crate) enum GoalEventAttribution<'a> { + Turn(&'a str), + NoTurn, +} + +impl GoalAnalytics { + pub(crate) fn new(client: AnalyticsEventsClient) -> Self { + Self { client } + } + + pub(crate) fn created( + &self, + goal: &codex_state::ThreadGoal, + attribution: GoalEventAttribution<'_>, + ) { + self.track(goal, attribution, GoalEventKind::Created); + } + + pub(crate) fn usage_accounted( + &self, + goal: &codex_state::ThreadGoal, + attribution: GoalEventAttribution<'_>, + ) { + self.track(goal, attribution, GoalEventKind::UsageAccounted); + } + + pub(crate) fn status_changed( + &self, + goal: &codex_state::ThreadGoal, + previous_status: Option, + attribution: GoalEventAttribution<'_>, + ) { + if previous_status.is_some_and(|status| status != goal.status) { + self.track(goal, attribution, GoalEventKind::StatusChanged); + } + } + + pub(crate) fn cleared(&self, goal: &codex_state::ThreadGoal) { + self.track(goal, GoalEventAttribution::NoTurn, GoalEventKind::Cleared); + } + + fn track( + &self, + goal: &codex_state::ThreadGoal, + attribution: GoalEventAttribution<'_>, + event_kind: GoalEventKind, + ) { + let (cumulative_tokens_accounted, cumulative_time_accounted_seconds) = match event_kind { + GoalEventKind::UsageAccounted => (Some(goal.tokens_used), Some(goal.time_used_seconds)), + GoalEventKind::Created | GoalEventKind::StatusChanged | GoalEventKind::Cleared => { + (None, None) + } + }; + self.client.track_goal_event(CodexGoalEvent { + thread_id: goal.thread_id.to_string(), + turn_id: match attribution { + GoalEventAttribution::Turn(turn_id) => Some(turn_id.to_string()), + GoalEventAttribution::NoTurn => None, + }, + goal_id: goal.goal_id.clone(), + event_kind, + goal_status: goal.status, + has_token_budget: goal.token_budget.is_some(), + cumulative_tokens_accounted, + cumulative_time_accounted_seconds, + }); + } +} diff --git a/codex-rs/ext/goal/src/api.rs b/codex-rs/ext/goal/src/api.rs index 5123e6a5c..a4dc93409 100644 --- a/codex-rs/ext/goal/src/api.rs +++ b/codex-rs/ext/goal/src/api.rs @@ -259,19 +259,19 @@ impl GoalService { tracing::warn!("failed to prepare external goal mutation: {err}"); } - let cleared = state_db + let cleared_goal = state_db .thread_goals() .delete_thread_goal(thread_id) .await .map_err(|err| { GoalServiceError::Internal(format!("failed to clear thread goal: {err}")) })?; + let cleared = cleared_goal.is_some(); drop(goal_state_permit); drop(runtime); - if cleared - && let Some(runtime) = self.runtime_for_thread(thread_id) - && let Err(err) = runtime.apply_external_goal_clear().await + if let (Some(runtime), Some(goal)) = (self.runtime_for_thread(thread_id), cleared_goal) + && let Err(err) = runtime.apply_external_goal_clear(goal).await { tracing::warn!("failed to apply external goal clear runtime effects: {err}"); } diff --git a/codex-rs/ext/goal/src/extension.rs b/codex-rs/ext/goal/src/extension.rs index 78ae0b5b4..0696c2b86 100644 --- a/codex-rs/ext/goal/src/extension.rs +++ b/codex-rs/ext/goal/src/extension.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use std::sync::Weak; use async_trait::async_trait; +use codex_analytics::AnalyticsEventsClient; use codex_core::ThreadManager; use codex_extension_api::ConfigContributor; use codex_extension_api::ExtensionData; @@ -33,6 +34,7 @@ use codex_protocol::protocol::TokenUsageInfo; use crate::accounting::BudgetLimitedGoalDisposition; use crate::accounting::GoalAccountingState; +use crate::analytics::GoalAnalytics; use crate::api::GoalService; use crate::events::GoalEventEmitter; use crate::metrics::GoalMetrics; @@ -57,6 +59,7 @@ impl GoalExtensionConfig { #[derive(Clone)] pub struct GoalExtension { state_dbs: Arc, + analytics: GoalAnalytics, event_emitter: GoalEventEmitter, metrics: GoalMetrics, thread_manager: Weak, @@ -73,6 +76,7 @@ impl std::fmt::Debug for GoalExtension { impl GoalExtension { pub(crate) fn new_with_host_capabilities( state_dbs: Arc, + analytics_events_client: AnalyticsEventsClient, event_sink: Arc, metrics_client: Option, thread_manager: Weak, @@ -81,6 +85,7 @@ impl GoalExtension { ) -> Self { Self { state_dbs, + analytics: GoalAnalytics::new(analytics_events_client), event_emitter: GoalEventEmitter::new(event_sink), metrics: GoalMetrics::new(metrics_client), thread_manager, @@ -120,6 +125,7 @@ where self.thread_manager.clone(), accounting_state, GoalRuntimeConfig { + analytics: self.analytics.clone(), enabled, tools_available_for_thread, }, @@ -403,6 +409,7 @@ where runtime.thread_id(), Arc::clone(&self.state_dbs), runtime.accounting_state(), + self.analytics.clone(), self.event_emitter.clone(), self.metrics.clone(), )), @@ -410,6 +417,7 @@ where runtime.thread_id(), Arc::clone(&self.state_dbs), runtime.accounting_state(), + self.analytics.clone(), self.event_emitter.clone(), self.metrics.clone(), )), @@ -417,6 +425,7 @@ where runtime.thread_id(), Arc::clone(&self.state_dbs), runtime.accounting_state(), + self.analytics.clone(), self.event_emitter.clone(), self.metrics.clone(), )), @@ -427,6 +436,7 @@ where pub fn install_with_backend( registry: &mut ExtensionRegistryBuilder, state_dbs: Arc, + analytics_events_client: AnalyticsEventsClient, metrics_client: Option, thread_manager: Weak, goal_service: Arc, @@ -436,6 +446,7 @@ pub fn install_with_backend( { let extension = Arc::new(GoalExtension::new_with_host_capabilities( state_dbs, + analytics_events_client, registry.event_sink(), metrics_client, thread_manager, diff --git a/codex-rs/ext/goal/src/lib.rs b/codex-rs/ext/goal/src/lib.rs index bb640c6ce..ccd091aff 100644 --- a/codex-rs/ext/goal/src/lib.rs +++ b/codex-rs/ext/goal/src/lib.rs @@ -1,6 +1,7 @@ //! Extension crate for the `/goal` feature. mod accounting; +mod analytics; mod api; mod events; mod extension; diff --git a/codex-rs/ext/goal/src/runtime.rs b/codex-rs/ext/goal/src/runtime.rs index 2641dfb94..2f2202cbb 100644 --- a/codex-rs/ext/goal/src/runtime.rs +++ b/codex-rs/ext/goal/src/runtime.rs @@ -10,6 +10,8 @@ use codex_protocol::protocol::ThreadGoal; use crate::accounting::BudgetLimitedGoalDisposition; use crate::accounting::GoalAccountingState; +use crate::analytics::GoalAnalytics; +use crate::analytics::GoalEventAttribution; use crate::events::GoalEventEmitter; use crate::metrics::GoalMetrics; use crate::steering::continuation_steering_item; @@ -24,6 +26,7 @@ pub struct GoalRuntimeHandle { } pub(crate) struct GoalRuntimeConfig { + pub(crate) analytics: GoalAnalytics, pub(crate) enabled: bool, pub(crate) tools_available_for_thread: bool, } @@ -36,6 +39,7 @@ pub(crate) enum ActiveGoalStopReason { struct GoalRuntimeInner { thread_id: ThreadId, state_dbs: Arc, + analytics: GoalAnalytics, event_emitter: GoalEventEmitter, metrics: GoalMetrics, thread_manager: Weak, @@ -87,6 +91,7 @@ impl GoalRuntimeHandle { inner: Arc::new(GoalRuntimeInner { thread_id, state_dbs, + analytics: config.analytics, event_emitter, metrics, thread_manager, @@ -165,6 +170,9 @@ impl GoalRuntimeHandle { .is_some_and(|previous_goal| previous_goal.goal_id != goal.goal_id); if previous_goal.is_none() || replaced_existing_goal { self.inner.metrics.record_created(); + self.inner + .analytics + .created(&goal, GoalEventAttribution::NoTurn); } let previous_status = previous_goal .as_ref() @@ -175,6 +183,9 @@ impl GoalRuntimeHandle { self.inner .metrics .record_terminal_if_status_changed(previous_status, &goal); + self.inner + .analytics + .status_changed(&goal, previous_status, GoalEventAttribution::NoTurn); let objective_changed = previous_goal.as_ref().is_some_and(|previous_goal| { !replaced_existing_goal && previous_goal.objective != goal.objective }); @@ -211,11 +222,15 @@ impl GoalRuntimeHandle { Ok(()) } - pub async fn apply_external_goal_clear(&self) -> Result<(), String> { + pub async fn apply_external_goal_clear( + &self, + goal: codex_state::ThreadGoal, + ) -> Result<(), String> { if !self.is_enabled() { return Ok(()); } + self.inner.analytics.cleared(&goal); self.inner.accounting_state.clear_active_goal(); Ok(()) } @@ -302,6 +317,11 @@ impl GoalRuntimeHandle { self.inner .metrics .record_terminal_if_status_changed(previous_status, &goal); + self.inner.analytics.status_changed( + &goal, + previous_status, + GoalEventAttribution::Turn(turn_id), + ); self.inner.accounting_state.clear_active_goal(); let goal = protocol_goal_from_state(goal); self.inner.event_emitter.thread_goal_updated( @@ -445,6 +465,14 @@ impl GoalRuntimeHandle { self.inner .metrics .record_terminal_if_status_changed(previous_status, &goal); + self.inner + .analytics + .usage_accounted(&goal, GoalEventAttribution::Turn(turn_id)); + self.inner.analytics.status_changed( + &goal, + previous_status, + GoalEventAttribution::Turn(turn_id), + ); accounting.mark_progress_accounted_for_status( turn_id, &snapshot, @@ -499,6 +527,14 @@ impl GoalRuntimeHandle { self.inner .metrics .record_terminal_if_status_changed(previous_status, &goal); + self.inner + .analytics + .usage_accounted(&goal, GoalEventAttribution::NoTurn); + self.inner.analytics.status_changed( + &goal, + previous_status, + GoalEventAttribution::NoTurn, + ); accounting.mark_idle_progress_accounted_for_status( &snapshot, goal.status, diff --git a/codex-rs/ext/goal/src/tool.rs b/codex-rs/ext/goal/src/tool.rs index 8c0f51300..c45ed81b0 100644 --- a/codex-rs/ext/goal/src/tool.rs +++ b/codex-rs/ext/goal/src/tool.rs @@ -17,6 +17,8 @@ use serde::Serialize; use crate::accounting::BudgetLimitedGoalDisposition; use crate::accounting::GoalAccountingState; +use crate::analytics::GoalAnalytics; +use crate::analytics::GoalEventAttribution; use crate::events::GoalEventEmitter; use crate::metrics::GoalMetrics; use crate::spec::CREATE_GOAL_TOOL_NAME; @@ -32,6 +34,7 @@ pub(crate) struct GoalToolExecutor { thread_id: ThreadId, state_db: Arc, accounting_state: Arc, + analytics: GoalAnalytics, event_emitter: GoalEventEmitter, metrics: GoalMetrics, } @@ -75,6 +78,7 @@ impl GoalToolExecutor { thread_id: ThreadId, state_db: Arc, accounting_state: Arc, + analytics: GoalAnalytics, event_emitter: GoalEventEmitter, metrics: GoalMetrics, ) -> Self { @@ -83,6 +87,7 @@ impl GoalToolExecutor { thread_id, state_db, accounting_state, + analytics, event_emitter, metrics, } @@ -92,6 +97,7 @@ impl GoalToolExecutor { thread_id: ThreadId, state_db: Arc, accounting_state: Arc, + analytics: GoalAnalytics, event_emitter: GoalEventEmitter, metrics: GoalMetrics, ) -> Self { @@ -100,6 +106,7 @@ impl GoalToolExecutor { thread_id, state_db, accounting_state, + analytics, event_emitter, metrics, } @@ -109,6 +116,7 @@ impl GoalToolExecutor { thread_id: ThreadId, state_db: Arc, accounting_state: Arc, + analytics: GoalAnalytics, event_emitter: GoalEventEmitter, metrics: GoalMetrics, ) -> Self { @@ -117,6 +125,7 @@ impl GoalToolExecutor { thread_id, state_db, accounting_state, + analytics, event_emitter, metrics, } @@ -200,6 +209,10 @@ impl GoalToolExecutor { .accounting_state .mark_current_turn_goal_active(goal.goal_id.clone()); self.metrics.record_created(); + self.analytics.created( + &goal, + GoalEventAttribution::Turn(invocation.turn_id.as_str()), + ); let goal = protocol_goal_from_state(goal); self.emit_goal_updated_from_tool_call(&invocation, turn_id, goal.clone()); goal_response(Some(goal), CompletionBudgetReport::Omit) @@ -259,6 +272,11 @@ impl GoalToolExecutor { })?; self.metrics .record_terminal_if_status_changed(previous_status, &goal); + self.analytics.status_changed( + &goal, + previous_status, + GoalEventAttribution::Turn(invocation.turn_id.as_str()), + ); let goal = protocol_goal_from_state(goal); let turn_id = self.accounting_state.clear_current_turn_goal(); self.emit_goal_updated_from_tool_call(&invocation, turn_id, goal.clone()); @@ -324,6 +342,13 @@ impl GoalToolExecutor { codex_state::GoalAccountingOutcome::Updated(goal) => { self.metrics .record_terminal_if_status_changed(previous_status, &goal); + self.analytics + .usage_accounted(&goal, GoalEventAttribution::Turn(turn_id.as_str())); + self.analytics.status_changed( + &goal, + previous_status, + GoalEventAttribution::Turn(turn_id.as_str()), + ); self.accounting_state.mark_progress_accounted_for_status( turn_id.as_str(), &snapshot, diff --git a/codex-rs/ext/goal/tests/goal_extension_backend.rs b/codex-rs/ext/goal/tests/goal_extension_backend.rs index b922f5332..60c992ec7 100644 --- a/codex-rs/ext/goal/tests/goal_extension_backend.rs +++ b/codex-rs/ext/goal/tests/goal_extension_backend.rs @@ -4,6 +4,7 @@ use std::sync::PoisonError; use std::sync::Weak; use std::time::Duration; +use codex_analytics::AnalyticsEventsClient; use codex_extension_api::ExtensionData; use codex_extension_api::ExtensionEventSink; use codex_extension_api::ExtensionRegistryBuilder; @@ -1114,6 +1115,7 @@ async fn installed_tools_with_start( install_with_backend( &mut builder, runtime, + AnalyticsEventsClient::disabled(), /*metrics_client*/ None, Weak::new(), goal_service, @@ -1164,6 +1166,7 @@ impl GoalExtensionHarness { install_with_backend( &mut builder, runtime, + AnalyticsEventsClient::disabled(), /*metrics_client*/ None, Weak::new(), Arc::clone(&goal_service), diff --git a/codex-rs/state/src/model/thread_goal.rs b/codex-rs/state/src/model/thread_goal.rs index 480625006..12d2a78da 100644 --- a/codex-rs/state/src/model/thread_goal.rs +++ b/codex-rs/state/src/model/thread_goal.rs @@ -3,12 +3,14 @@ use anyhow::anyhow; use chrono::DateTime; use chrono::Utc; use codex_protocol::ThreadId; +use serde::Serialize; use sqlx::Row; use sqlx::sqlite::SqliteRow; use super::epoch_millis_to_datetime; -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde(rename_all = "snake_case")] pub enum ThreadGoalStatus { Active, Paused, diff --git a/codex-rs/state/src/runtime/goals.rs b/codex-rs/state/src/runtime/goals.rs index 50e8cf068..d0fab47d0 100644 --- a/codex-rs/state/src/runtime/goals.rs +++ b/codex-rs/state/src/runtime/goals.rs @@ -377,18 +377,31 @@ WHERE thread_id = ? self.get_thread_goal(thread_id).await } - pub async fn delete_thread_goal(&self, thread_id: ThreadId) -> anyhow::Result { - let result = sqlx::query( + pub async fn delete_thread_goal( + &self, + thread_id: ThreadId, + ) -> anyhow::Result> { + let row = sqlx::query( r#" DELETE FROM thread_goals WHERE thread_id = ? +RETURNING + thread_id, + goal_id, + objective, + status, + token_budget, + tokens_used, + time_used_seconds, + created_at_ms, + updated_at_ms "#, ) .bind(thread_id.to_string()) - .execute(self.pool.as_ref()) + .fetch_optional(self.pool.as_ref()) .await?; - Ok(result.rows_affected() > 0) + row.map(|row| thread_goal_from_row(&row)).transpose() } pub async fn account_thread_goal_usage( @@ -622,7 +635,8 @@ mod tests { assert_eq!(0, replaced.tokens_used); assert_eq!(0, replaced.time_used_seconds); - assert!( + assert_eq!( + Some(replaced), runtime .thread_goals() .delete_thread_goal(thread_id) @@ -637,8 +651,9 @@ mod tests { .await .unwrap() ); - assert!( - !runtime + assert_eq!( + None, + runtime .thread_goals() .delete_thread_goal(thread_id) .await diff --git a/codex-rs/state/src/runtime/threads.rs b/codex-rs/state/src/runtime/threads.rs index 6f3c45f0e..2c00500ca 100644 --- a/codex-rs/state/src/runtime/threads.rs +++ b/codex-rs/state/src/runtime/threads.rs @@ -891,7 +891,7 @@ ON CONFLICT(id) DO UPDATE SET let rows_affected = result.rows_affected(); self.memories.delete_thread_memory(thread_id).await?; if rows_affected > 0 { - self.thread_goals.delete_thread_goal(thread_id).await?; + let _ = self.thread_goals.delete_thread_goal(thread_id).await?; } Ok(rows_affected) }