[codex-analytics] add extensible feature thread sources (#27063)

## Why
- `ThreadSource` currently defines a closed set of core-owned values
- Product features also create threads for background or scheduled work
- Adding every product-specific value to the core enum would require
repeated `codex-rs` protocol changes
- Feature-backed values let product callers provide precise attribution
while preserving the existing core classifications

## What Changed
- Adds `ThreadSource::Feature(String)` for app-owned thread source
values
- Represents all app-server v2 thread sources as scalar strings, so a
feature source is supplied as `"automation"`
- Persists and emits the feature's plain string label, so `"automation"`
produces `thread_source="automation"` in analytics
- Keeps `user`, `subagent`, and `memory_consolidation` as explicit
core-owned values and regenerates the app-server schemas and TypeScript
bindings

## Verification
- `just write-app-server-schema`
- `cargo check --workspace`
- `just test -p codex-protocol
feature_thread_source_serializes_as_its_app_owned_label`
- `just test -p codex-app-server-protocol
thread_sources_round_trip_as_scalar_labels`
- `cargo test -p codex-analytics
thread_initialized_event_serializes_expected_shape`
- `just fmt`
This commit is contained in:
marksteinbrick-oai
2026-06-09 12:27:10 -07:00
committed by GitHub
Unverified
parent 99da697e4c
commit a71e040df5
32 changed files with 123 additions and 113 deletions
@@ -1374,7 +1374,7 @@ fn thread_initialized_event_serializes_expected_shape() {
},
model: "gpt-5".to_string(),
ephemeral: true,
thread_source: Some(ThreadSource::User),
thread_source: Some(ThreadSource::Feature("automation".to_string())),
initialization_mode: ThreadInitializationMode::New,
subagent_source: None,
parent_thread_id: None,
@@ -1407,7 +1407,7 @@ fn thread_initialized_event_serializes_expected_shape() {
},
"model": "gpt-5",
"ephemeral": true,
"thread_source": "user",
"thread_source": "automation",
"initialization_mode": "new",
"subagent_source": null,
"parent_thread_id": null,
+5 -5
View File
@@ -1263,7 +1263,7 @@ impl AnalyticsReducer {
thread_metadata.session_id.clone(),
connection_state.app_server_client.clone(),
connection_state.runtime.clone(),
thread_metadata.thread_source,
thread_metadata.thread_source.clone(),
thread_metadata.subagent_source.clone(),
thread_metadata.parent_thread_id.clone(),
),
@@ -1368,7 +1368,7 @@ impl AnalyticsReducer {
accepted_turn_id,
app_server_client: connection_state.app_server_client.clone(),
runtime: connection_state.runtime.clone(),
thread_source: thread_metadata.thread_source,
thread_source: thread_metadata.thread_source.clone(),
subagent_source: thread_metadata.subagent_source.clone(),
parent_thread_id: thread_metadata.parent_thread_id.clone(),
num_input_images: pending_request.num_input_images,
@@ -1411,7 +1411,7 @@ impl AnalyticsReducer {
review_id: pending_review.review_id,
app_server_client: connection_state.app_server_client.clone(),
runtime: connection_state.runtime.clone(),
thread_source: thread_metadata.thread_source,
thread_source: thread_metadata.thread_source.clone(),
subagent_source: thread_metadata.subagent_source.clone(),
parent_thread_id: thread_metadata.parent_thread_id.clone(),
subject_kind: pending_review.subject_kind,
@@ -1979,7 +1979,7 @@ fn tool_item_base(
item_id,
app_server_client: context.connection_state.app_server_client.clone(),
runtime: context.connection_state.runtime.clone(),
thread_source: thread_metadata.thread_source,
thread_source: thread_metadata.thread_source.clone(),
subagent_source: thread_metadata.subagent_source.clone(),
parent_thread_id: thread_metadata.parent_thread_id.clone(),
tool_name,
@@ -2459,7 +2459,7 @@ fn codex_turn_event_params(
runtime,
submission_type,
ephemeral,
thread_source: thread_metadata.thread_source,
thread_source: thread_metadata.thread_source.clone(),
initialization_mode: thread_metadata.initialization_mode,
subagent_source: thread_metadata.subagent_source.clone(),
parent_thread_id: thread_metadata.parent_thread_id.clone(),
@@ -3847,11 +3847,6 @@
"type": "string"
},
"ThreadSource": {
"enum": [
"user",
"subagent",
"memory_consolidation"
],
"type": "string"
},
"ThreadSourceKind": {
@@ -4615,11 +4615,6 @@
"type": "object"
},
"ThreadSource": {
"enum": [
"user",
"subagent",
"memory_consolidation"
],
"type": "string"
},
"ThreadStartedNotification": {
@@ -18109,11 +18109,6 @@
"type": "string"
},
"ThreadSource": {
"enum": [
"user",
"subagent",
"memory_consolidation"
],
"type": "string"
},
"ThreadSourceKind": {
@@ -15926,11 +15926,6 @@
"type": "string"
},
"ThreadSource": {
"enum": [
"user",
"subagent",
"memory_consolidation"
],
"type": "string"
},
"ThreadSourceKind": {
@@ -73,11 +73,6 @@
"type": "string"
},
"ThreadSource": {
"enum": [
"user",
"subagent",
"memory_consolidation"
],
"type": "string"
}
},
@@ -1830,11 +1830,6 @@
]
},
"ThreadSource": {
"enum": [
"user",
"subagent",
"memory_consolidation"
],
"type": "string"
},
"ThreadStatus": {
@@ -1645,11 +1645,6 @@
]
},
"ThreadSource": {
"enum": [
"user",
"subagent",
"memory_consolidation"
],
"type": "string"
},
"ThreadStatus": {
@@ -1645,11 +1645,6 @@
]
},
"ThreadSource": {
"enum": [
"user",
"subagent",
"memory_consolidation"
],
"type": "string"
},
"ThreadStatus": {
@@ -1645,11 +1645,6 @@
]
},
"ThreadSource": {
"enum": [
"user",
"subagent",
"memory_consolidation"
],
"type": "string"
},
"ThreadStatus": {
@@ -1830,11 +1830,6 @@
]
},
"ThreadSource": {
"enum": [
"user",
"subagent",
"memory_consolidation"
],
"type": "string"
},
"ThreadStatus": {
@@ -1645,11 +1645,6 @@
]
},
"ThreadSource": {
"enum": [
"user",
"subagent",
"memory_consolidation"
],
"type": "string"
},
"ThreadStatus": {
@@ -159,11 +159,6 @@
"type": "object"
},
"ThreadSource": {
"enum": [
"user",
"subagent",
"memory_consolidation"
],
"type": "string"
},
"ThreadStartSource": {
@@ -1830,11 +1830,6 @@
]
},
"ThreadSource": {
"enum": [
"user",
"subagent",
"memory_consolidation"
],
"type": "string"
},
"ThreadStatus": {
@@ -1645,11 +1645,6 @@
]
},
"ThreadSource": {
"enum": [
"user",
"subagent",
"memory_consolidation"
],
"type": "string"
},
"ThreadStatus": {
@@ -1645,11 +1645,6 @@
]
},
"ThreadSource": {
"enum": [
"user",
"subagent",
"memory_consolidation"
],
"type": "string"
},
"ThreadStatus": {
@@ -2,4 +2,4 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type ThreadSource = "user" | "subagent" | "memory_consolidation";
export type ThreadSource = string;
@@ -57,6 +57,30 @@ fn test_absolute_path() -> AbsolutePathBuf {
absolute_path("readable")
}
#[test]
fn thread_sources_round_trip_as_scalar_labels() {
for (source, label) in [
(ThreadSource::User, "user"),
(ThreadSource::Subagent, "subagent"),
(
ThreadSource::Feature("automation".to_string()),
"automation",
),
(ThreadSource::MemoryConsolidation, "memory_consolidation"),
] {
let value = serde_json::to_value(&source).expect("serialize thread source");
assert_eq!(value, json!(label));
assert_eq!(
serde_json::from_value::<ThreadSource>(value).expect("deserialize thread source"),
source
);
let core_source: codex_protocol::protocol::ThreadSource = source.clone().into();
assert_eq!(ThreadSource::from(core_source), source);
}
}
#[test]
fn approvals_reviewer_serializes_auto_review_and_accepts_legacy_guardian_subagent() {
assert_eq!(
@@ -7,6 +7,8 @@ use codex_protocol::protocol::SubAgentSource as CoreSubAgentSource;
use codex_protocol::protocol::ThreadSource as CoreThreadSource;
use codex_utils_absolute_path::AbsolutePathBuf;
use schemars::JsonSchema;
use schemars::r#gen::SchemaGenerator;
use schemars::schema::Schema;
use serde::Deserialize;
use serde::Serialize;
use std::path::PathBuf;
@@ -61,20 +63,47 @@ impl From<SessionSource> for CoreSessionSource {
}
}
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema, TS)]
#[serde(rename_all = "snake_case")]
#[ts(rename_all = "snake_case", export_to = "v2/")]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, TS)]
#[serde(try_from = "String", into = "String")]
#[ts(type = "string")]
#[ts(export_to = "v2/")]
pub enum ThreadSource {
User,
Subagent,
Feature(String),
MemoryConsolidation,
}
impl JsonSchema for ThreadSource {
fn schema_name() -> String {
"ThreadSource".to_string()
}
fn json_schema(generator: &mut SchemaGenerator) -> Schema {
String::json_schema(generator)
}
}
impl TryFrom<String> for ThreadSource {
type Error = String;
fn try_from(value: String) -> Result<Self, Self::Error> {
value.parse::<CoreThreadSource>().map(Into::into)
}
}
impl From<ThreadSource> for String {
fn from(value: ThreadSource) -> Self {
CoreThreadSource::from(value).into()
}
}
impl From<CoreThreadSource> for ThreadSource {
fn from(value: CoreThreadSource) -> Self {
match value {
CoreThreadSource::User => ThreadSource::User,
CoreThreadSource::Subagent => ThreadSource::Subagent,
CoreThreadSource::Feature(feature) => ThreadSource::Feature(feature),
CoreThreadSource::MemoryConsolidation => ThreadSource::MemoryConsolidation,
}
}
@@ -85,6 +114,7 @@ impl From<ThreadSource> for CoreThreadSource {
match value {
ThreadSource::User => CoreThreadSource::User,
ThreadSource::Subagent => CoreThreadSource::Subagent,
ThreadSource::Feature(feature) => CoreThreadSource::Feature(feature),
ThreadSource::MemoryConsolidation => CoreThreadSource::MemoryConsolidation,
}
}
@@ -4154,7 +4154,7 @@ fn summary_from_thread_metadata(metadata: &ThreadMetadata) -> ConversationSummar
metadata.cwd.clone(),
metadata.cli_version.clone(),
metadata.source.clone(),
metadata.thread_source,
metadata.thread_source.clone(),
metadata.agent_nickname.clone(),
metadata.agent_role.clone(),
metadata.git_sha.clone(),
@@ -4243,7 +4243,7 @@ fn build_thread_from_snapshot(
agent_nickname: config_snapshot.session_source.get_nickname(),
agent_role: config_snapshot.session_source.get_agent_role(),
source: config_snapshot.session_source.clone().into(),
thread_source: config_snapshot.thread_source.map(Into::into),
thread_source: config_snapshot.thread_source.clone().map(Into::into),
git_info: None,
name: None,
turns: Vec::new(),
+2 -2
View File
@@ -91,7 +91,7 @@ pub(super) async fn spawn_review_thread(
forked_from_thread_id,
parent_turn_context.parent_thread_id,
&session_source,
parent_turn_context.thread_source,
parent_turn_context.thread_source.clone(),
review_turn_id.clone(),
#[allow(deprecated)]
parent_turn_context.cwd.clone(),
@@ -121,7 +121,7 @@ pub(super) async fn spawn_review_thread(
reasoning_summary,
session_source,
parent_thread_id: parent_turn_context.parent_thread_id,
thread_source: parent_turn_context.thread_source,
thread_source: parent_turn_context.thread_source.clone(),
environments: parent_turn_context.environments.clone(),
available_models,
unified_exec_shell_mode,
+3 -3
View File
@@ -193,7 +193,7 @@ impl SessionConfiguration {
session_source: self.session_source.clone(),
forked_from_thread_id: self.forked_from_thread_id,
parent_thread_id: self.parent_thread_id,
thread_source: self.thread_source,
thread_source: self.thread_source.clone(),
}
}
@@ -549,7 +549,7 @@ impl Session {
forked_from_id,
parent_thread_id,
source: session_source,
thread_source: session_configuration.thread_source,
thread_source: session_configuration.thread_source.clone(),
base_instructions: BaseInstructions {
text: session_configuration.base_instructions.clone(),
},
@@ -1089,7 +1089,7 @@ impl Session {
thread_id,
forked_from_id,
parent_thread_id,
thread_source: session_configuration.thread_source,
thread_source: session_configuration.thread_source.clone(),
thread_name: session_configuration.thread_name.clone(),
model: session_configuration.collaboration_mode.model().to_string(),
model_provider_id: config.model_provider_id.clone(),
+3 -3
View File
@@ -239,7 +239,7 @@ impl TurnContext {
reasoning_summary: self.reasoning_summary,
session_source: self.session_source.clone(),
parent_thread_id: self.parent_thread_id,
thread_source: self.thread_source,
thread_source: self.thread_source.clone(),
environments: self.environments.clone(),
#[allow(deprecated)]
cwd: self.cwd.clone(),
@@ -514,7 +514,7 @@ impl Session {
session_configuration.forked_from_thread_id,
session_configuration.parent_thread_id,
&session_configuration.session_source,
session_configuration.thread_source,
session_configuration.thread_source.clone(),
sub_id.clone(),
cwd.clone(),
&session_configuration.permission_profile(),
@@ -538,7 +538,7 @@ impl Session {
reasoning_summary,
session_source,
parent_thread_id: session_configuration.parent_thread_id,
thread_source: session_configuration.thread_source,
thread_source: session_configuration.thread_source.clone(),
environments,
#[allow(deprecated)]
cwd,
+2 -2
View File
@@ -1146,7 +1146,7 @@ fn session_configured_from_thread_start_response(
&response.thread.session_id,
&response.thread.id,
response.thread.parent_thread_id.as_deref(),
response.thread.thread_source.map(Into::into),
response.thread.thread_source.clone().map(Into::into),
response.thread.name.clone(),
response.thread.path.clone(),
response.model.clone(),
@@ -1169,7 +1169,7 @@ fn session_configured_from_thread_resume_response(
&response.thread.session_id,
&response.thread.id,
response.thread.parent_thread_id.as_deref(),
response.thread.thread_source.map(Into::into),
response.thread.thread_source.clone().map(Into::into),
response.thread.name.clone(),
response.thread.path.clone(),
response.model.clone(),
+36 -7
View File
@@ -2480,12 +2480,12 @@ impl InitialHistory {
pub fn get_resumed_session_sources(&self) -> Option<(SessionSource, Option<ThreadSource>)> {
let meta = self.get_resumed_session_meta()?;
Some((meta.source.clone(), meta.thread_source))
Some((meta.source.clone(), meta.thread_source.clone()))
}
pub fn get_resumed_thread_source(&self) -> Option<ThreadSource> {
self.get_resumed_session_meta()
.and_then(|meta| meta.thread_source)
.and_then(|meta| meta.thread_source.clone())
}
pub fn get_resumed_parent_thread_id(&self) -> Option<ThreadId> {
@@ -2529,20 +2529,23 @@ pub enum SessionSource {
Unknown,
}
#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq, JsonSchema, TS)]
#[serde(rename_all = "snake_case")]
#[ts(rename_all = "snake_case")]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, JsonSchema, TS)]
#[serde(try_from = "String", into = "String")]
#[schemars(with = "String")]
#[ts(type = "string")]
pub enum ThreadSource {
User,
Subagent,
Feature(String),
MemoryConsolidation,
}
impl ThreadSource {
pub fn as_str(self) -> &'static str {
pub fn as_str(&self) -> &str {
match self {
ThreadSource::User => "user",
ThreadSource::Subagent => "subagent",
ThreadSource::Feature(feature) => feature,
ThreadSource::MemoryConsolidation => "memory_consolidation",
}
}
@@ -2554,6 +2557,20 @@ impl fmt::Display for ThreadSource {
}
}
impl TryFrom<String> for ThreadSource {
type Error = String;
fn try_from(value: String) -> Result<Self, Self::Error> {
value.parse()
}
}
impl From<ThreadSource> for String {
fn from(value: ThreadSource) -> Self {
value.to_string()
}
}
impl FromStr for ThreadSource {
type Err = String;
@@ -2562,7 +2579,7 @@ impl FromStr for ThreadSource {
"user" => Ok(ThreadSource::User),
"subagent" => Ok(ThreadSource::Subagent),
"memory_consolidation" => Ok(ThreadSource::MemoryConsolidation),
other => Err(format!("unknown thread source: {other}")),
other => Ok(ThreadSource::Feature(other.to_string())),
}
}
}
@@ -4048,6 +4065,18 @@ mod tests {
use tempfile::NamedTempFile;
use tempfile::TempDir;
#[test]
fn feature_thread_source_serializes_as_its_app_owned_label() -> Result<()> {
let source = ThreadSource::Feature("automation".to_string());
assert_eq!(serde_json::to_value(&source)?, json!("automation"));
assert_eq!(
serde_json::from_value::<ThreadSource>(json!("automation"))?,
source
);
Ok(())
}
fn sorted_writable_roots(roots: Vec<WritableRoot>) -> Vec<(PathBuf, Vec<PathBuf>)> {
let mut sorted_roots: Vec<(PathBuf, Vec<PathBuf>)> = roots
.into_iter()
+1 -1
View File
@@ -50,7 +50,7 @@ fn apply_session_meta_from_item(metadata: &mut ThreadMetadata, meta_line: &Sessi
}
metadata.id = meta_line.meta.id;
metadata.source = enum_to_string(&meta_line.meta.source);
metadata.thread_source = meta_line.meta.thread_source;
metadata.thread_source = meta_line.meta.thread_source.clone();
metadata.agent_nickname = meta_line.meta.agent_nickname.clone();
metadata.agent_role = meta_line.meta.agent_role.clone();
metadata.agent_path = meta_line.meta.agent_path.clone();
+1 -1
View File
@@ -196,7 +196,7 @@ impl ThreadMetadataBuilder {
created_at,
updated_at,
source,
thread_source: self.thread_source,
thread_source: self.thread_source.clone(),
agent_nickname: self.agent_nickname.clone(),
agent_role: self.agent_role.clone(),
agent_path: self
+2
View File
@@ -521,6 +521,7 @@ ON CONFLICT(id) DO NOTHING
.bind(
metadata
.thread_source
.as_ref()
.map(codex_protocol::protocol::ThreadSource::as_str),
)
.bind(metadata.agent_nickname.as_deref())
@@ -753,6 +754,7 @@ ON CONFLICT(id) DO UPDATE SET
.bind(
metadata
.thread_source
.as_ref()
.map(codex_protocol::protocol::ThreadSource::as_str),
)
.bind(metadata.agent_nickname.as_deref())
+3 -3
View File
@@ -177,7 +177,7 @@ impl ThreadStore for InMemoryThreadStore {
agent_role: params.source.get_agent_role(),
agent_path: params.source.get_agent_path().map(Into::into),
source: params.source.clone(),
thread_source: params.thread_source,
thread_source: params.thread_source.clone(),
model_provider: Some(params.metadata.model_provider.clone()),
base_instructions: Some(params.base_instructions.clone()),
dynamic_tools: (!params.dynamic_tools.is_empty()).then(|| params.dynamic_tools.clone()),
@@ -392,8 +392,8 @@ fn stored_thread_from_state(
.and_then(|metadata| metadata.source.clone())
.unwrap_or_else(|| created.source.clone()),
thread_source: metadata
.and_then(|metadata| metadata.thread_source)
.unwrap_or(created.thread_source),
.and_then(|metadata| metadata.thread_source.clone())
.unwrap_or_else(|| created.thread_source.clone()),
agent_nickname: metadata.and_then(|metadata| metadata.agent_nickname.clone().flatten()),
agent_role: metadata.and_then(|metadata| metadata.agent_role.clone().flatten()),
agent_path: metadata.and_then(|metadata| metadata.agent_path.clone().flatten()),
@@ -221,7 +221,7 @@ async fn apply_metadata_update(
patch.source.clone().unwrap_or(SessionSource::Unknown),
);
builder.model_provider = patch.model_provider.clone();
builder.thread_source = patch.thread_source.flatten();
builder.thread_source = patch.thread_source.clone().flatten();
builder.agent_nickname = patch.agent_nickname.clone().flatten();
builder.agent_role = patch.agent_role.clone().flatten();
builder.agent_path = patch.agent_path.clone().flatten();
@@ -66,7 +66,7 @@ impl ThreadMetadataSync {
created_at: Some(created_at),
updated_at: Some(created_at),
source: Some(params.source.clone()),
thread_source: Some(params.thread_source),
thread_source: Some(params.thread_source.clone()),
agent_nickname: Some(params.source.get_nickname()),
agent_role: Some(params.source.get_agent_role()),
agent_path: Some(params.source.get_agent_path().map(Into::into)),
@@ -201,7 +201,7 @@ impl ThreadMetadataSync {
RolloutItem::SessionMeta(meta_line) if meta_line.meta.id == self.thread_id => {
update.created_at = parse_session_timestamp(meta_line.meta.timestamp.as_str());
update.source = Some(meta_line.meta.source.clone());
update.thread_source = Some(meta_line.meta.thread_source);
update.thread_source = Some(meta_line.meta.thread_source.clone());
update.agent_nickname = Some(meta_line.meta.agent_nickname.clone());
update.agent_role = Some(meta_line.meta.agent_role.clone());
update.agent_path = Some(meta_line.meta.agent_path.clone());