diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index f895d3fe7..f34ee2897 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -3391,6 +3391,15 @@ ], "type": "object" }, + "ThreadGoalStatus": { + "enum": [ + "active", + "paused", + "budgetLimited", + "complete" + ], + "type": "string" + }, "ThreadInjectItemsParams": { "properties": { "items": { diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index 59b3f5b45..629c0b97f 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -3028,6 +3028,93 @@ ], "type": "object" }, + "ThreadGoal": { + "properties": { + "createdAt": { + "format": "int64", + "type": "integer" + }, + "objective": { + "type": "string" + }, + "status": { + "$ref": "#/definitions/ThreadGoalStatus" + }, + "threadId": { + "type": "string" + }, + "timeUsedSeconds": { + "format": "int64", + "type": "integer" + }, + "tokenBudget": { + "format": "int64", + "type": [ + "integer", + "null" + ] + }, + "tokensUsed": { + "format": "int64", + "type": "integer" + }, + "updatedAt": { + "format": "int64", + "type": "integer" + } + }, + "required": [ + "createdAt", + "objective", + "status", + "threadId", + "timeUsedSeconds", + "tokensUsed", + "updatedAt" + ], + "type": "object" + }, + "ThreadGoalClearedNotification": { + "properties": { + "threadId": { + "type": "string" + } + }, + "required": [ + "threadId" + ], + "type": "object" + }, + "ThreadGoalStatus": { + "enum": [ + "active", + "paused", + "budgetLimited", + "complete" + ], + "type": "string" + }, + "ThreadGoalUpdatedNotification": { + "properties": { + "goal": { + "$ref": "#/definitions/ThreadGoal" + }, + "threadId": { + "type": "string" + }, + "turnId": { + "type": [ + "string", + "null" + ] + } + }, + "required": [ + "goal", + "threadId" + ], + "type": "object" + }, "ThreadId": { "type": "string" }, @@ -4727,6 +4814,46 @@ "title": "Thread/name/updatedNotification", "type": "object" }, + { + "properties": { + "method": { + "enum": [ + "thread/goal/updated" + ], + "title": "Thread/goal/updatedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/ThreadGoalUpdatedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Thread/goal/updatedNotification", + "type": "object" + }, + { + "properties": { + "method": { + "enum": [ + "thread/goal/cleared" + ], + "title": "Thread/goal/clearedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/ThreadGoalClearedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Thread/goal/clearedNotification", + "type": "object" + }, { "properties": { "method": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 0c76232d9..2fc1be346 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -3806,6 +3806,46 @@ "title": "Thread/name/updatedNotification", "type": "object" }, + { + "properties": { + "method": { + "enum": [ + "thread/goal/updated" + ], + "title": "Thread/goal/updatedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/v2/ThreadGoalUpdatedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Thread/goal/updatedNotification", + "type": "object" + }, + { + "properties": { + "method": { + "enum": [ + "thread/goal/cleared" + ], + "title": "Thread/goal/clearedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/v2/ThreadGoalClearedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Thread/goal/clearedNotification", + "type": "object" + }, { "properties": { "method": { @@ -14611,6 +14651,97 @@ "title": "ThreadForkResponse", "type": "object" }, + "ThreadGoal": { + "properties": { + "createdAt": { + "format": "int64", + "type": "integer" + }, + "objective": { + "type": "string" + }, + "status": { + "$ref": "#/definitions/v2/ThreadGoalStatus" + }, + "threadId": { + "type": "string" + }, + "timeUsedSeconds": { + "format": "int64", + "type": "integer" + }, + "tokenBudget": { + "format": "int64", + "type": [ + "integer", + "null" + ] + }, + "tokensUsed": { + "format": "int64", + "type": "integer" + }, + "updatedAt": { + "format": "int64", + "type": "integer" + } + }, + "required": [ + "createdAt", + "objective", + "status", + "threadId", + "timeUsedSeconds", + "tokensUsed", + "updatedAt" + ], + "type": "object" + }, + "ThreadGoalClearedNotification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "threadId": { + "type": "string" + } + }, + "required": [ + "threadId" + ], + "title": "ThreadGoalClearedNotification", + "type": "object" + }, + "ThreadGoalStatus": { + "enum": [ + "active", + "paused", + "budgetLimited", + "complete" + ], + "type": "string" + }, + "ThreadGoalUpdatedNotification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "goal": { + "$ref": "#/definitions/v2/ThreadGoal" + }, + "threadId": { + "type": "string" + }, + "turnId": { + "type": [ + "string", + "null" + ] + } + }, + "required": [ + "goal", + "threadId" + ], + "title": "ThreadGoalUpdatedNotification", + "type": "object" + }, "ThreadId": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index 55f33badd..87e133a07 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -10444,6 +10444,46 @@ "title": "Thread/name/updatedNotification", "type": "object" }, + { + "properties": { + "method": { + "enum": [ + "thread/goal/updated" + ], + "title": "Thread/goal/updatedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/ThreadGoalUpdatedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Thread/goal/updatedNotification", + "type": "object" + }, + { + "properties": { + "method": { + "enum": [ + "thread/goal/cleared" + ], + "title": "Thread/goal/clearedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/ThreadGoalClearedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Thread/goal/clearedNotification", + "type": "object" + }, { "properties": { "method": { @@ -12497,6 +12537,97 @@ "title": "ThreadForkResponse", "type": "object" }, + "ThreadGoal": { + "properties": { + "createdAt": { + "format": "int64", + "type": "integer" + }, + "objective": { + "type": "string" + }, + "status": { + "$ref": "#/definitions/ThreadGoalStatus" + }, + "threadId": { + "type": "string" + }, + "timeUsedSeconds": { + "format": "int64", + "type": "integer" + }, + "tokenBudget": { + "format": "int64", + "type": [ + "integer", + "null" + ] + }, + "tokensUsed": { + "format": "int64", + "type": "integer" + }, + "updatedAt": { + "format": "int64", + "type": "integer" + } + }, + "required": [ + "createdAt", + "objective", + "status", + "threadId", + "timeUsedSeconds", + "tokensUsed", + "updatedAt" + ], + "type": "object" + }, + "ThreadGoalClearedNotification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "threadId": { + "type": "string" + } + }, + "required": [ + "threadId" + ], + "title": "ThreadGoalClearedNotification", + "type": "object" + }, + "ThreadGoalStatus": { + "enum": [ + "active", + "paused", + "budgetLimited", + "complete" + ], + "type": "string" + }, + "ThreadGoalUpdatedNotification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "goal": { + "$ref": "#/definitions/ThreadGoal" + }, + "threadId": { + "type": "string" + }, + "turnId": { + "type": [ + "string", + "null" + ] + } + }, + "required": [ + "goal", + "threadId" + ], + "title": "ThreadGoalUpdatedNotification", + "type": "object" + }, "ThreadId": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadGoalClearedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadGoalClearedNotification.json new file mode 100644 index 000000000..c1fe94b91 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadGoalClearedNotification.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "threadId": { + "type": "string" + } + }, + "required": [ + "threadId" + ], + "title": "ThreadGoalClearedNotification", + "type": "object" +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadGoalUpdatedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadGoalUpdatedNotification.json new file mode 100644 index 000000000..52a2e905a --- /dev/null +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadGoalUpdatedNotification.json @@ -0,0 +1,80 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "definitions": { + "ThreadGoal": { + "properties": { + "createdAt": { + "format": "int64", + "type": "integer" + }, + "objective": { + "type": "string" + }, + "status": { + "$ref": "#/definitions/ThreadGoalStatus" + }, + "threadId": { + "type": "string" + }, + "timeUsedSeconds": { + "format": "int64", + "type": "integer" + }, + "tokenBudget": { + "format": "int64", + "type": [ + "integer", + "null" + ] + }, + "tokensUsed": { + "format": "int64", + "type": "integer" + }, + "updatedAt": { + "format": "int64", + "type": "integer" + } + }, + "required": [ + "createdAt", + "objective", + "status", + "threadId", + "timeUsedSeconds", + "tokensUsed", + "updatedAt" + ], + "type": "object" + }, + "ThreadGoalStatus": { + "enum": [ + "active", + "paused", + "budgetLimited", + "complete" + ], + "type": "string" + } + }, + "properties": { + "goal": { + "$ref": "#/definitions/ThreadGoal" + }, + "threadId": { + "type": "string" + }, + "turnId": { + "type": [ + "string", + "null" + ] + } + }, + "required": [ + "goal", + "threadId" + ], + "title": "ThreadGoalUpdatedNotification", + "type": "object" +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts b/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts index 031527e3a..41d4754bc 100644 --- a/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts +++ b/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts @@ -40,6 +40,8 @@ import type { SkillsChangedNotification } from "./v2/SkillsChangedNotification"; import type { TerminalInteractionNotification } from "./v2/TerminalInteractionNotification"; import type { ThreadArchivedNotification } from "./v2/ThreadArchivedNotification"; import type { ThreadClosedNotification } from "./v2/ThreadClosedNotification"; +import type { ThreadGoalClearedNotification } from "./v2/ThreadGoalClearedNotification"; +import type { ThreadGoalUpdatedNotification } from "./v2/ThreadGoalUpdatedNotification"; import type { ThreadNameUpdatedNotification } from "./v2/ThreadNameUpdatedNotification"; import type { ThreadRealtimeClosedNotification } from "./v2/ThreadRealtimeClosedNotification"; import type { ThreadRealtimeErrorNotification } from "./v2/ThreadRealtimeErrorNotification"; @@ -64,4 +66,4 @@ import type { WindowsWorldWritableWarningNotification } from "./v2/WindowsWorldW /** * Notification sent from the server to the client. */ -export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/status/changed", "params": ThreadStatusChangedNotification } | { "method": "thread/archived", "params": ThreadArchivedNotification } | { "method": "thread/unarchived", "params": ThreadUnarchivedNotification } | { "method": "thread/closed", "params": ThreadClosedNotification } | { "method": "skills/changed", "params": SkillsChangedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "hook/started", "params": HookStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "hook/completed", "params": HookCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/autoApprovalReview/started", "params": ItemGuardianApprovalReviewStartedNotification } | { "method": "item/autoApprovalReview/completed", "params": ItemGuardianApprovalReviewCompletedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "command/exec/outputDelta", "params": CommandExecOutputDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/fileChange/patchUpdated", "params": FileChangePatchUpdatedNotification } | { "method": "serverRequest/resolved", "params": ServerRequestResolvedNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "mcpServer/startupStatus/updated", "params": McpServerStatusUpdatedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "externalAgentConfig/import/completed", "params": ExternalAgentConfigImportCompletedNotification } | { "method": "fs/changed", "params": FsChangedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "model/rerouted", "params": ModelReroutedNotification } | { "method": "model/verification", "params": ModelVerificationNotification } | { "method": "warning", "params": WarningNotification } | { "method": "guardianWarning", "params": GuardianWarningNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "fuzzyFileSearch/sessionUpdated", "params": FuzzyFileSearchSessionUpdatedNotification } | { "method": "fuzzyFileSearch/sessionCompleted", "params": FuzzyFileSearchSessionCompletedNotification } | { "method": "thread/realtime/started", "params": ThreadRealtimeStartedNotification } | { "method": "thread/realtime/itemAdded", "params": ThreadRealtimeItemAddedNotification } | { "method": "thread/realtime/transcript/delta", "params": ThreadRealtimeTranscriptDeltaNotification } | { "method": "thread/realtime/transcript/done", "params": ThreadRealtimeTranscriptDoneNotification } | { "method": "thread/realtime/outputAudio/delta", "params": ThreadRealtimeOutputAudioDeltaNotification } | { "method": "thread/realtime/sdp", "params": ThreadRealtimeSdpNotification } | { "method": "thread/realtime/error", "params": ThreadRealtimeErrorNotification } | { "method": "thread/realtime/closed", "params": ThreadRealtimeClosedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "windowsSandbox/setupCompleted", "params": WindowsSandboxSetupCompletedNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification }; +export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/status/changed", "params": ThreadStatusChangedNotification } | { "method": "thread/archived", "params": ThreadArchivedNotification } | { "method": "thread/unarchived", "params": ThreadUnarchivedNotification } | { "method": "thread/closed", "params": ThreadClosedNotification } | { "method": "skills/changed", "params": SkillsChangedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/goal/updated", "params": ThreadGoalUpdatedNotification } | { "method": "thread/goal/cleared", "params": ThreadGoalClearedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "hook/started", "params": HookStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "hook/completed", "params": HookCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/autoApprovalReview/started", "params": ItemGuardianApprovalReviewStartedNotification } | { "method": "item/autoApprovalReview/completed", "params": ItemGuardianApprovalReviewCompletedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "command/exec/outputDelta", "params": CommandExecOutputDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/fileChange/patchUpdated", "params": FileChangePatchUpdatedNotification } | { "method": "serverRequest/resolved", "params": ServerRequestResolvedNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "mcpServer/startupStatus/updated", "params": McpServerStatusUpdatedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "externalAgentConfig/import/completed", "params": ExternalAgentConfigImportCompletedNotification } | { "method": "fs/changed", "params": FsChangedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "model/rerouted", "params": ModelReroutedNotification } | { "method": "model/verification", "params": ModelVerificationNotification } | { "method": "warning", "params": WarningNotification } | { "method": "guardianWarning", "params": GuardianWarningNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "fuzzyFileSearch/sessionUpdated", "params": FuzzyFileSearchSessionUpdatedNotification } | { "method": "fuzzyFileSearch/sessionCompleted", "params": FuzzyFileSearchSessionCompletedNotification } | { "method": "thread/realtime/started", "params": ThreadRealtimeStartedNotification } | { "method": "thread/realtime/itemAdded", "params": ThreadRealtimeItemAddedNotification } | { "method": "thread/realtime/transcript/delta", "params": ThreadRealtimeTranscriptDeltaNotification } | { "method": "thread/realtime/transcript/done", "params": ThreadRealtimeTranscriptDoneNotification } | { "method": "thread/realtime/outputAudio/delta", "params": ThreadRealtimeOutputAudioDeltaNotification } | { "method": "thread/realtime/sdp", "params": ThreadRealtimeSdpNotification } | { "method": "thread/realtime/error", "params": ThreadRealtimeErrorNotification } | { "method": "thread/realtime/closed", "params": ThreadRealtimeClosedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "windowsSandbox/setupCompleted", "params": WindowsSandboxSetupCompletedNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadGoal.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadGoal.ts new file mode 100644 index 000000000..c68732324 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadGoal.ts @@ -0,0 +1,6 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { ThreadGoalStatus } from "./ThreadGoalStatus"; + +export type ThreadGoal = { threadId: string, objective: string, status: ThreadGoalStatus, tokenBudget: number | null, tokensUsed: number, timeUsedSeconds: number, createdAt: number, updatedAt: number, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadGoalClearedNotification.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadGoalClearedNotification.ts new file mode 100644 index 000000000..e8e5a8b6e --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadGoalClearedNotification.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type ThreadGoalClearedNotification = { threadId: string, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadGoalStatus.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadGoalStatus.ts new file mode 100644 index 000000000..7a4bf332f --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadGoalStatus.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type ThreadGoalStatus = "active" | "paused" | "budgetLimited" | "complete"; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadGoalUpdatedNotification.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadGoalUpdatedNotification.ts new file mode 100644 index 000000000..c9972afa8 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadGoalUpdatedNotification.ts @@ -0,0 +1,6 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { ThreadGoal } from "./ThreadGoal"; + +export type ThreadGoalUpdatedNotification = { threadId: string, turnId: string | null, goal: ThreadGoal, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts index 59c4fa673..0e43b5a4b 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts @@ -326,6 +326,10 @@ export type { ThreadCompactStartParams } from "./ThreadCompactStartParams"; export type { ThreadCompactStartResponse } from "./ThreadCompactStartResponse"; export type { ThreadForkParams } from "./ThreadForkParams"; export type { ThreadForkResponse } from "./ThreadForkResponse"; +export type { ThreadGoal } from "./ThreadGoal"; +export type { ThreadGoalClearedNotification } from "./ThreadGoalClearedNotification"; +export type { ThreadGoalStatus } from "./ThreadGoalStatus"; +export type { ThreadGoalUpdatedNotification } from "./ThreadGoalUpdatedNotification"; export type { ThreadInjectItemsParams } from "./ThreadInjectItemsParams"; export type { ThreadInjectItemsResponse } from "./ThreadInjectItemsResponse"; export type { ThreadItem } from "./ThreadItem"; diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 1c5be70da..016d6e16b 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -285,6 +285,21 @@ client_request_definitions! { params: v2::ThreadSetNameParams, response: v2::ThreadSetNameResponse, }, + #[experimental("thread/goal/set")] + ThreadGoalSet => "thread/goal/set" { + params: v2::ThreadGoalSetParams, + response: v2::ThreadGoalSetResponse, + }, + #[experimental("thread/goal/get")] + ThreadGoalGet => "thread/goal/get" { + params: v2::ThreadGoalGetParams, + response: v2::ThreadGoalGetResponse, + }, + #[experimental("thread/goal/clear")] + ThreadGoalClear => "thread/goal/clear" { + params: v2::ThreadGoalClearParams, + response: v2::ThreadGoalClearResponse, + }, ThreadMetadataUpdate => "thread/metadata/update" { params: v2::ThreadMetadataUpdateParams, response: v2::ThreadMetadataUpdateResponse, @@ -1027,6 +1042,10 @@ server_notification_definitions! { ThreadClosed => "thread/closed" (v2::ThreadClosedNotification), SkillsChanged => "skills/changed" (v2::SkillsChangedNotification), ThreadNameUpdated => "thread/name/updated" (v2::ThreadNameUpdatedNotification), + #[experimental("thread/goal/updated")] + ThreadGoalUpdated => "thread/goal/updated" (v2::ThreadGoalUpdatedNotification), + #[experimental("thread/goal/cleared")] + ThreadGoalCleared => "thread/goal/cleared" (v2::ThreadGoalClearedNotification), ThreadTokenUsageUpdated => "thread/tokenUsage/updated" (v2::ThreadTokenUsageUpdatedNotification), TurnStarted => "turn/started" (v2::TurnStartedNotification), HookStarted => "hook/started" (v2::HookStartedNotification), @@ -2046,6 +2065,76 @@ mod tests { let reason = crate::experimental_api::ExperimentalApi::experimental_reason(&request); assert_eq!(reason, Some("thread/realtime/start")); } + + #[test] + fn thread_goal_methods_are_marked_experimental() { + let set_request = ClientRequest::ThreadGoalSet { + request_id: RequestId::Integer(1), + params: v2::ThreadGoalSetParams { + thread_id: "thr_123".to_string(), + objective: Some("ship goal mode".to_string()), + status: Some(v2::ThreadGoalStatus::Active), + token_budget: Some(Some(10_000)), + }, + }; + let get_request = ClientRequest::ThreadGoalGet { + request_id: RequestId::Integer(2), + params: v2::ThreadGoalGetParams { + thread_id: "thr_123".to_string(), + }, + }; + let clear_request = ClientRequest::ThreadGoalClear { + request_id: RequestId::Integer(3), + params: v2::ThreadGoalClearParams { + thread_id: "thr_123".to_string(), + }, + }; + + assert_eq!( + crate::experimental_api::ExperimentalApi::experimental_reason(&set_request), + Some("thread/goal/set") + ); + assert_eq!( + crate::experimental_api::ExperimentalApi::experimental_reason(&get_request), + Some("thread/goal/get") + ); + assert_eq!( + crate::experimental_api::ExperimentalApi::experimental_reason(&clear_request), + Some("thread/goal/clear") + ); + } + + #[test] + fn thread_goal_notifications_are_marked_experimental() { + let goal = v2::ThreadGoal { + thread_id: "thr_123".to_string(), + objective: "ship goal mode".to_string(), + status: v2::ThreadGoalStatus::Active, + token_budget: Some(10_000), + tokens_used: 123, + time_used_seconds: 45, + created_at: 1_700_000_000, + updated_at: 1_700_000_123, + }; + let updated = ServerNotification::ThreadGoalUpdated(v2::ThreadGoalUpdatedNotification { + thread_id: "thr_123".to_string(), + turn_id: None, + goal, + }); + let cleared = ServerNotification::ThreadGoalCleared(v2::ThreadGoalClearedNotification { + thread_id: "thr_123".to_string(), + }); + + assert_eq!( + crate::experimental_api::ExperimentalApi::experimental_reason(&updated), + Some("thread/goal/updated") + ); + assert_eq!( + crate::experimental_api::ExperimentalApi::experimental_reason(&cleared), + Some("thread/goal/cleared") + ); + } + #[test] fn thread_realtime_started_notification_is_marked_experimental() { let notification = diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 505102e12..b7dccc861 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -96,6 +96,7 @@ use codex_protocol::protocol::SkillMetadata as CoreSkillMetadata; use codex_protocol::protocol::SkillScope as CoreSkillScope; use codex_protocol::protocol::SkillToolDependency as CoreSkillToolDependency; use codex_protocol::protocol::SubAgentSource as CoreSubAgentSource; +use codex_protocol::protocol::ThreadGoalStatus as CoreThreadGoalStatus; use codex_protocol::protocol::TokenUsage as CoreTokenUsage; use codex_protocol::protocol::TokenUsageInfo as CoreTokenUsageInfo; use codex_protocol::request_permissions::PermissionGrantScope as CorePermissionGrantScope; @@ -3747,6 +3748,103 @@ pub struct ThreadUnarchiveParams { #[ts(export_to = "v2/")] pub struct ThreadSetNameResponse {} +v2_enum_from_core! { + pub enum ThreadGoalStatus from CoreThreadGoalStatus { + Active, + Paused, + BudgetLimited, + Complete, + } +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadGoal { + pub thread_id: String, + pub objective: String, + pub status: ThreadGoalStatus, + #[ts(type = "number | null")] + pub token_budget: Option, + #[ts(type = "number")] + pub tokens_used: i64, + #[ts(type = "number")] + pub time_used_seconds: i64, + #[ts(type = "number")] + pub created_at: i64, + #[ts(type = "number")] + pub updated_at: i64, +} + +impl From for ThreadGoal { + fn from(value: codex_protocol::protocol::ThreadGoal) -> Self { + Self { + thread_id: value.thread_id.to_string(), + objective: value.objective, + status: value.status.into(), + token_budget: value.token_budget, + tokens_used: value.tokens_used, + time_used_seconds: value.time_used_seconds, + created_at: value.created_at, + updated_at: value.updated_at, + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadGoalSetParams { + pub thread_id: String, + #[ts(optional = nullable)] + pub objective: Option, + #[ts(optional = nullable)] + pub status: Option, + #[serde( + default, + deserialize_with = "super::serde_helpers::deserialize_double_option", + serialize_with = "super::serde_helpers::serialize_double_option", + skip_serializing_if = "Option::is_none" + )] + #[ts(optional = nullable, type = "number | null")] + pub token_budget: Option>, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadGoalSetResponse { + pub goal: ThreadGoal, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadGoalGetParams { + pub thread_id: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadGoalGetResponse { + pub goal: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadGoalClearParams { + pub thread_id: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadGoalClearResponse { + pub cleared: bool, +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] @@ -6270,6 +6368,22 @@ pub struct ThreadNameUpdatedNotification { pub thread_name: Option, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadGoalUpdatedNotification { + pub thread_id: String, + pub turn_id: Option, + pub goal: ThreadGoal, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadGoalClearedNotification { + pub thread_id: String, +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 174417805..35df7016c 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -152,6 +152,11 @@ Example with notification opt-out: - `thread/metadata/update` — patch stored thread metadata in sqlite; currently supports updating persisted `gitInfo` fields and returns the refreshed `thread`. - `thread/memoryMode/set` — experimental; set a thread’s persisted memory eligibility to `"enabled"` or `"disabled"` for either a loaded thread or a stored rollout; returns `{}` on success. - `memory/reset` — experimental; clear the current `CODEX_HOME/memories` directory and reset persisted memory stage data in sqlite while preserving existing thread memory modes; returns `{}` on success. +- `thread/goal/set` — create, replace, or update the single persisted goal for a materialized thread; returns the current goal and emits `thread/goal/updated`. Supplying a new `objective` replaces the goal and resets usage accounting. Supplying the current non-terminal objective or omitting `objective` updates the existing goal’s status and/or token budget while preserving usage. +- `thread/goal/get` — fetch the current persisted goal for a materialized thread; returns `goal: null` when no goal exists. +- `thread/goal/clear` — clear the current persisted goal for a materialized thread; returns whether a goal was removed and emits `thread/goal/cleared` when state changes. +- `thread/goal/updated` — notification emitted whenever a thread goal changes; includes the full current goal. +- `thread/goal/cleared` — notification emitted whenever a thread goal is removed. - `thread/status/changed` — notification emitted when a loaded thread’s status changes (`threadId` + new `status`). - `thread/archive` — move a thread’s rollout file into the archived directory and attempt to move any spawned descendant thread rollout files; returns `{}` on success and emits `thread/archived` for each archived thread. - `thread/unsubscribe` — unsubscribe this connection from thread turn/item events. If this was the last subscriber, the server keeps the thread loaded and unloads it only after it has had no subscribers and no thread activity for 30 minutes, then emits `thread/closed`. @@ -470,6 +475,70 @@ Experimental: use `memory/reset` to clear local memory artifacts and sqlite-back { "id": 27, "result": {} } ``` +### Example: Set and update a thread goal + +Use `thread/goal/set` with an `objective` to create or replace the current goal for a materialized thread. Supplying a new objective resets `tokensUsed`, `timeUsedSeconds`, and `createdAt`. Supplying the current non-terminal objective, or omitting `objective`, updates the existing goal’s status or token budget while preserving usage history. Clients can set `budgetLimited` when they stop because a token budget is exhausted or nearly exhausted; the system also sets it when accounting crosses a configured token budget. + +```json +{ "method": "thread/goal/set", "id": 27, "params": { + "threadId": "thr_123", + "objective": "Keep improving the benchmark until p95 latency is under 120ms", + "tokenBudget": 200000 +} } +{ "id": 27, "result": { "goal": { + "threadId": "thr_123", + "objective": "Keep improving the benchmark until p95 latency is under 120ms", + "status": "active", + "tokenBudget": 200000, + "tokensUsed": 0, + "timeUsedSeconds": 0, + "createdAt": 1776272400, + "updatedAt": 1776272400 +} } } +{ "method": "thread/goal/updated", "params": { "threadId": "thr_123", "goal": { + "threadId": "thr_123", + "objective": "Keep improving the benchmark until p95 latency is under 120ms", + "status": "active", + "tokenBudget": 200000, + "tokensUsed": 0, + "timeUsedSeconds": 0, + "createdAt": 1776272400, + "updatedAt": 1776272400 +} } } +``` + +```json +{ "method": "thread/goal/set", "id": 28, "params": { + "threadId": "thr_123", + "status": "paused" +} } +{ "id": 28, "result": { "goal": { + "threadId": "thr_123", + "objective": "Keep improving the benchmark until p95 latency is under 120ms", + "status": "paused", + "tokenBudget": 200000, + "tokensUsed": 10000, + "timeUsedSeconds": 60, + "createdAt": 1776272400, + "updatedAt": 1776272460 +} } } +``` + +Use `thread/goal/get` to read the current goal without changing it. + +```json +{ "method": "thread/goal/get", "id": 29, "params": { "threadId": "thr_123" } } +{ "id": 29, "result": { "goal": null } } +``` + +Use `thread/goal/clear` to remove the current goal. + +```json +{ "method": "thread/goal/clear", "id": 30, "params": { "threadId": "thr_123" } } +{ "id": 30, "result": { "cleared": true } } +{ "method": "thread/goal/cleared", "params": { "threadId": "thr_123" } } +``` + ### Example: Archive a thread Use `thread/archive` to move the persisted rollout (stored as a JSONL file on disk) into the archived sessions directory and attempt to move any spawned descendant thread rollouts. diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 8af3f8761..a1eba990c 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -78,6 +78,7 @@ use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequestPayload; use codex_app_server_protocol::SkillsChangedNotification; use codex_app_server_protocol::TerminalInteractionNotification; +use codex_app_server_protocol::ThreadGoalUpdatedNotification; use codex_app_server_protocol::ThreadItem; use codex_app_server_protocol::ThreadNameUpdatedNotification; use codex_app_server_protocol::ThreadRealtimeClosedNotification; @@ -1954,6 +1955,20 @@ pub(crate) async fn apply_bespoke_event_handling( .await; } } + EventMsg::ThreadGoalUpdated(thread_goal_event) => { + if let ApiVersion::V2 = api_version { + let notification = ThreadGoalUpdatedNotification { + thread_id: thread_goal_event.thread_id.to_string(), + turn_id: thread_goal_event.turn_id, + goal: thread_goal_event.goal.clone().into(), + }; + outgoing + .send_global_server_notification(ServerNotification::ThreadGoalUpdated( + notification, + )) + .await; + } + } EventMsg::TurnDiff(turn_diff_event) => { handle_turn_diff( conversation_id, diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index bf6b4bdf9..dfec182fd 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -150,6 +150,16 @@ use codex_app_server_protocol::ThreadDecrementElicitationParams; use codex_app_server_protocol::ThreadDecrementElicitationResponse; use codex_app_server_protocol::ThreadForkParams; use codex_app_server_protocol::ThreadForkResponse; +use codex_app_server_protocol::ThreadGoal; +use codex_app_server_protocol::ThreadGoalClearParams; +use codex_app_server_protocol::ThreadGoalClearResponse; +use codex_app_server_protocol::ThreadGoalClearedNotification; +use codex_app_server_protocol::ThreadGoalGetParams; +use codex_app_server_protocol::ThreadGoalGetResponse; +use codex_app_server_protocol::ThreadGoalSetParams; +use codex_app_server_protocol::ThreadGoalSetResponse; +use codex_app_server_protocol::ThreadGoalStatus; +use codex_app_server_protocol::ThreadGoalUpdatedNotification; use codex_app_server_protocol::ThreadIncrementElicitationParams; use codex_app_server_protocol::ThreadIncrementElicitationResponse; use codex_app_server_protocol::ThreadInjectItemsParams; @@ -482,6 +492,9 @@ enum ThreadReadViewError { Internal(String), } +mod thread_goal_handlers; +use self::thread_goal_handlers::api_thread_goal_from_state; + impl Drop for ActiveLogin { fn drop(&mut self) { self.cancel(); @@ -955,6 +968,18 @@ impl CodexMessageProcessor { self.thread_set_name(to_connection_request_id(request_id), params) .await; } + ClientRequest::ThreadGoalSet { request_id, params } => { + self.thread_goal_set(to_connection_request_id(request_id), params) + .await; + } + ClientRequest::ThreadGoalGet { request_id, params } => { + self.thread_goal_get(to_connection_request_id(request_id), params) + .await; + } + ClientRequest::ThreadGoalClear { request_id, params } => { + self.thread_goal_clear(to_connection_request_id(request_id), params) + .await; + } ClientRequest::ThreadMetadataUpdate { request_id, params } => { self.thread_metadata_update(to_connection_request_id(request_id), params) .await; @@ -4695,6 +4720,9 @@ impl CodexMessageProcessor { ) .await; } + if self.config.features.enabled(Feature::Goals) { + self.emit_thread_goal_snapshot(thread_id).await; + } } Err(err) => { let error = JSONRPCErrorError { @@ -4860,6 +4888,17 @@ impl CodexMessageProcessor { return true; }; + let emit_thread_goal_update = self.config.features.enabled(Feature::Goals); + let thread_goal_state_db = if emit_thread_goal_update { + if let Some(state_db) = existing_thread.state_db() { + Some(state_db) + } else { + open_state_db_for_direct_thread_lookup(&self.config).await + } + } else { + None + }; + let command = crate::thread_state::ThreadListenerCommand::SendThreadResumeResponse( Box::new(crate::thread_state::PendingThreadResumeRequest { request_id: request_id.clone(), @@ -4867,6 +4906,8 @@ impl CodexMessageProcessor { config_snapshot, instruction_sources, thread_summary, + emit_thread_goal_update, + thread_goal_state_db, include_turns: !params.exclude_turns, }), ); @@ -4879,6 +4920,7 @@ impl CodexMessageProcessor { data: None, }; self.outgoing.send_error(request_id, err).await; + return true; } return true; } @@ -8800,6 +8842,29 @@ async fn handle_thread_listener_command( ) .await; } + ThreadListenerCommand::EmitThreadGoalUpdated { goal } => { + outgoing + .send_server_notification(ServerNotification::ThreadGoalUpdated( + ThreadGoalUpdatedNotification { + thread_id: conversation_id.to_string(), + turn_id: None, + goal, + }, + )) + .await; + } + ThreadListenerCommand::EmitThreadGoalCleared => { + outgoing + .send_server_notification(ServerNotification::ThreadGoalCleared( + ThreadGoalClearedNotification { + thread_id: conversation_id.to_string(), + }, + )) + .await; + } + ThreadListenerCommand::EmitThreadGoalSnapshot { state_db } => { + send_thread_goal_snapshot_notification(outgoing, conversation_id, &state_db).await; + } ThreadListenerCommand::ResolveServerRequest { request_id, completion_tx, @@ -8964,11 +9029,56 @@ async fn handle_pending_thread_resume_request( ) .await; } + if pending.emit_thread_goal_update { + if let Some(state_db) = pending.thread_goal_state_db { + send_thread_goal_snapshot_notification(outgoing, conversation_id, &state_db).await; + } else { + tracing::warn!( + thread_id = %conversation_id, + "state db unavailable when reading thread goal for running thread resume" + ); + } + } outgoing .replay_requests_to_connection_for_thread(connection_id, conversation_id) .await; } +async fn send_thread_goal_snapshot_notification( + outgoing: &Arc, + thread_id: ThreadId, + state_db: &StateDbHandle, +) { + match state_db.get_thread_goal(thread_id).await { + Ok(Some(goal)) => { + outgoing + .send_server_notification(ServerNotification::ThreadGoalUpdated( + ThreadGoalUpdatedNotification { + thread_id: thread_id.to_string(), + turn_id: None, + goal: api_thread_goal_from_state(goal), + }, + )) + .await; + } + Ok(None) => { + outgoing + .send_server_notification(ServerNotification::ThreadGoalCleared( + ThreadGoalClearedNotification { + thread_id: thread_id.to_string(), + }, + )) + .await; + } + Err(err) => { + tracing::warn!( + thread_id = %thread_id, + "failed to read thread goal for resume snapshot: {err}" + ); + } + } +} + enum ThreadTurnSource<'a> { HistoryItems(&'a [RolloutItem]), } @@ -9459,6 +9569,27 @@ async fn open_state_db_for_direct_thread_lookup(config: &Config) -> Option) -> JSONRPCErrorError { + JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: message.into(), + data: None, + } +} + +fn internal_error(message: impl Into) -> JSONRPCErrorError { + JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: message.into(), + data: None, + } +} + +fn parse_thread_id_for_request(thread_id: &str) -> Result { + ThreadId::from_string(thread_id) + .map_err(|err| invalid_request(format!("invalid thread id: {err}"))) +} + fn non_empty_title(metadata: &ThreadMetadata) -> Option { let title = metadata.title.trim(); (!title.is_empty()).then(|| title.to_string()) diff --git a/codex-rs/app-server/src/codex_message_processor/thread_goal_handlers.rs b/codex-rs/app-server/src/codex_message_processor/thread_goal_handlers.rs new file mode 100644 index 000000000..f837ef9dc --- /dev/null +++ b/codex-rs/app-server/src/codex_message_processor/thread_goal_handlers.rs @@ -0,0 +1,466 @@ +use super::*; + +impl CodexMessageProcessor { + pub(super) async fn thread_goal_set( + &self, + request_id: ConnectionRequestId, + params: ThreadGoalSetParams, + ) { + if !self.config.features.enabled(Feature::Goals) { + self.send_invalid_request_error(request_id, "goals feature is disabled".to_string()) + .await; + return; + } + + let thread_id = match parse_thread_id_for_request(params.thread_id.as_str()) { + Ok(thread_id) => thread_id, + Err(error) => { + self.outgoing.send_error(request_id, error).await; + return; + } + }; + let state_db = match self.state_db_for_materialized_thread(thread_id).await { + Ok(state_db) => state_db, + Err(error) => { + self.outgoing.send_error(request_id, error).await; + return; + } + }; + let running_thread = self.thread_manager.get_thread(thread_id).await.ok(); + let rollout_path = match running_thread.as_ref() { + Some(thread) => match thread.rollout_path() { + Some(path) => path, + None => { + self.send_invalid_request_error( + request_id, + format!("ephemeral thread does not support goals: {thread_id}"), + ) + .await; + return; + } + }, + None => { + match find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string()) + .await + { + Ok(Some(path)) => path, + Ok(None) => { + self.send_invalid_request_error( + request_id, + format!("thread not found: {thread_id}"), + ) + .await; + return; + } + Err(err) => { + self.send_internal_error( + request_id, + format!("failed to locate thread id {thread_id}: {err}"), + ) + .await; + return; + } + } + } + }; + reconcile_rollout( + Some(&state_db), + rollout_path.as_path(), + self.config.model_provider_id.as_str(), + /*builder*/ None, + &[], + /*archived_only*/ None, + /*new_thread_memory_mode*/ None, + ) + .await; + + let listener_command_tx = { + let thread_state = self.thread_state_manager.thread_state(thread_id).await; + let thread_state = thread_state.lock().await; + thread_state.listener_command_tx() + }; + let status = params.status.map(thread_goal_status_to_state); + let objective = params.objective.as_deref().map(str::trim); + + if let Some(objective) = objective { + if objective.is_empty() { + self.send_invalid_request_error( + request_id, + "goal objective must not be empty".to_string(), + ) + .await; + return; + } + if let Err(message) = validate_goal_budget(params.token_budget.flatten()) { + self.send_invalid_request_error(request_id, message).await; + return; + } + } else if let Some(token_budget) = params.token_budget + && let Err(message) = validate_goal_budget(token_budget) + { + self.send_invalid_request_error(request_id, message).await; + return; + } + + let goal = if let Some(objective) = objective { + match state_db.get_thread_goal(thread_id).await { + Ok(goal) => { + if let Some(goal) = goal.as_ref().filter(|goal| { + goal.objective == objective + && goal.status != codex_state::ThreadGoalStatus::Complete + }) { + state_db + .update_thread_goal( + thread_id, + codex_state::ThreadGoalUpdate { + status, + token_budget: params.token_budget, + expected_goal_id: Some(goal.goal_id.clone()), + }, + ) + .await + .and_then(|goal| { + goal.ok_or_else(|| { + anyhow::anyhow!( + "cannot update goal for thread {thread_id}: no goal exists" + ) + }) + }) + } else { + state_db + .replace_thread_goal( + thread_id, + objective, + status.unwrap_or(codex_state::ThreadGoalStatus::Active), + params.token_budget.flatten(), + ) + .await + } + } + Err(err) => Err(err), + } + } else { + state_db + .update_thread_goal( + thread_id, + codex_state::ThreadGoalUpdate { + status, + token_budget: params.token_budget, + expected_goal_id: None, + }, + ) + .await + .and_then(|goal| { + goal.ok_or_else(|| { + anyhow::anyhow!("cannot update goal for thread {thread_id}: no goal exists") + }) + }) + }; + + let goal = match goal { + Ok(goal) => goal, + Err(err) => { + self.send_invalid_request_error(request_id, err.to_string()) + .await; + return; + } + }; + let goal = api_thread_goal_from_state(goal); + self.outgoing + .send_response( + request_id.clone(), + ThreadGoalSetResponse { goal: goal.clone() }, + ) + .await; + self.emit_thread_goal_updated_ordered(thread_id, goal, listener_command_tx) + .await; + } + + pub(super) async fn thread_goal_get( + &self, + request_id: ConnectionRequestId, + params: ThreadGoalGetParams, + ) { + if !self.config.features.enabled(Feature::Goals) { + self.send_invalid_request_error(request_id, "goals feature is disabled".to_string()) + .await; + return; + } + + let thread_id = match parse_thread_id_for_request(params.thread_id.as_str()) { + Ok(thread_id) => thread_id, + Err(error) => { + self.outgoing.send_error(request_id, error).await; + return; + } + }; + let state_db = match self.state_db_for_materialized_thread(thread_id).await { + Ok(state_db) => state_db, + Err(error) => { + self.outgoing.send_error(request_id, error).await; + return; + } + }; + let goal = match state_db.get_thread_goal(thread_id).await { + Ok(goal) => goal.map(api_thread_goal_from_state), + Err(err) => { + self.send_internal_error(request_id, format!("failed to read thread goal: {err}")) + .await; + return; + } + }; + self.outgoing + .send_response(request_id, ThreadGoalGetResponse { goal }) + .await; + } + + pub(super) async fn thread_goal_clear( + &self, + request_id: ConnectionRequestId, + params: ThreadGoalClearParams, + ) { + if !self.config.features.enabled(Feature::Goals) { + self.send_invalid_request_error(request_id, "goals feature is disabled".to_string()) + .await; + return; + } + + let thread_id = match parse_thread_id_for_request(params.thread_id.as_str()) { + Ok(thread_id) => thread_id, + Err(error) => { + self.outgoing.send_error(request_id, error).await; + return; + } + }; + let state_db = match self.state_db_for_materialized_thread(thread_id).await { + Ok(state_db) => state_db, + Err(error) => { + self.outgoing.send_error(request_id, error).await; + return; + } + }; + let running_thread = self.thread_manager.get_thread(thread_id).await.ok(); + let rollout_path = match running_thread.as_ref() { + Some(thread) => match thread.rollout_path() { + Some(path) => path, + None => { + self.send_invalid_request_error( + request_id, + format!("ephemeral thread does not support goals: {thread_id}"), + ) + .await; + return; + } + }, + None => { + match find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string()) + .await + { + Ok(Some(path)) => path, + Ok(None) => { + self.send_invalid_request_error( + request_id, + format!("thread not found: {thread_id}"), + ) + .await; + return; + } + Err(err) => { + self.send_internal_error( + request_id, + format!("failed to locate thread id {thread_id}: {err}"), + ) + .await; + return; + } + } + } + }; + reconcile_rollout( + Some(&state_db), + rollout_path.as_path(), + self.config.model_provider_id.as_str(), + /*builder*/ None, + &[], + /*archived_only*/ None, + /*new_thread_memory_mode*/ None, + ) + .await; + + let listener_command_tx = { + let thread_state = self.thread_state_manager.thread_state(thread_id).await; + let thread_state = thread_state.lock().await; + thread_state.listener_command_tx() + }; + let cleared = match state_db.delete_thread_goal(thread_id).await { + Ok(cleared) => cleared, + Err(err) => { + self.send_internal_error(request_id, format!("failed to clear thread goal: {err}")) + .await; + return; + } + }; + + self.outgoing + .send_response(request_id, ThreadGoalClearResponse { cleared }) + .await; + if cleared { + self.emit_thread_goal_cleared_ordered(thread_id, listener_command_tx) + .await; + } + } + + async fn state_db_for_materialized_thread( + &self, + thread_id: ThreadId, + ) -> Result { + if let Ok(thread) = self.thread_manager.get_thread(thread_id).await { + if thread.rollout_path().is_none() { + return Err(invalid_request(format!( + "ephemeral thread does not support goals: {thread_id}" + ))); + } + if let Some(state_db) = thread.state_db() { + return Ok(state_db); + } + } else { + match find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string()).await + { + Ok(Some(_)) => {} + Ok(None) => { + return Err(invalid_request(format!("thread not found: {thread_id}"))); + } + Err(err) => { + return Err(internal_error(format!( + "failed to locate thread id {thread_id}: {err}" + ))); + } + } + } + + open_state_db_for_direct_thread_lookup(&self.config) + .await + .ok_or_else(|| internal_error("sqlite state db unavailable for thread goals")) + } + + pub(super) async fn emit_thread_goal_snapshot(&self, thread_id: ThreadId) { + let state_db = match self.state_db_for_materialized_thread(thread_id).await { + Ok(state_db) => state_db, + Err(err) => { + warn!( + "failed to open state db before emitting thread goal resume snapshot for {thread_id}: {}", + err.message + ); + return; + } + }; + let listener_command_tx = { + let thread_state = self.thread_state_manager.thread_state(thread_id).await; + let thread_state = thread_state.lock().await; + thread_state.listener_command_tx() + }; + if let Some(listener_command_tx) = listener_command_tx { + let command = crate::thread_state::ThreadListenerCommand::EmitThreadGoalSnapshot { + state_db: state_db.clone(), + }; + if listener_command_tx.send(command).is_ok() { + return; + } + warn!( + "failed to enqueue thread goal snapshot for {thread_id}: listener command channel is closed" + ); + } + send_thread_goal_snapshot_notification(&self.outgoing, thread_id, &state_db).await; + } + + async fn emit_thread_goal_updated_ordered( + &self, + thread_id: ThreadId, + goal: ThreadGoal, + listener_command_tx: Option>, + ) { + if let Some(listener_command_tx) = listener_command_tx { + let command = crate::thread_state::ThreadListenerCommand::EmitThreadGoalUpdated { + goal: goal.clone(), + }; + if listener_command_tx.send(command).is_ok() { + return; + } + warn!( + "failed to enqueue thread goal update for {thread_id}: listener command channel is closed" + ); + } + self.outgoing + .send_server_notification(ServerNotification::ThreadGoalUpdated( + ThreadGoalUpdatedNotification { + thread_id: thread_id.to_string(), + turn_id: None, + goal, + }, + )) + .await; + } + + async fn emit_thread_goal_cleared_ordered( + &self, + thread_id: ThreadId, + listener_command_tx: Option>, + ) { + if let Some(listener_command_tx) = listener_command_tx { + let command = crate::thread_state::ThreadListenerCommand::EmitThreadGoalCleared; + if listener_command_tx.send(command).is_ok() { + return; + } + warn!( + "failed to enqueue thread goal clear for {thread_id}: listener command channel is closed" + ); + } + self.outgoing + .send_server_notification(ServerNotification::ThreadGoalCleared( + ThreadGoalClearedNotification { + thread_id: thread_id.to_string(), + }, + )) + .await; + } +} + +fn validate_goal_budget(value: Option) -> Result<(), String> { + if let Some(value) = value + && value <= 0 + { + return Err("goal budgets must be positive when provided".to_string()); + } + Ok(()) +} + +fn thread_goal_status_to_state(status: ThreadGoalStatus) -> codex_state::ThreadGoalStatus { + match status { + ThreadGoalStatus::Active => codex_state::ThreadGoalStatus::Active, + ThreadGoalStatus::Paused => codex_state::ThreadGoalStatus::Paused, + ThreadGoalStatus::BudgetLimited => codex_state::ThreadGoalStatus::BudgetLimited, + ThreadGoalStatus::Complete => codex_state::ThreadGoalStatus::Complete, + } +} + +fn thread_goal_status_from_state(status: codex_state::ThreadGoalStatus) -> ThreadGoalStatus { + match status { + codex_state::ThreadGoalStatus::Active => ThreadGoalStatus::Active, + codex_state::ThreadGoalStatus::Paused => ThreadGoalStatus::Paused, + codex_state::ThreadGoalStatus::BudgetLimited => ThreadGoalStatus::BudgetLimited, + codex_state::ThreadGoalStatus::Complete => ThreadGoalStatus::Complete, + } +} + +pub(super) fn api_thread_goal_from_state(goal: codex_state::ThreadGoal) -> ThreadGoal { + ThreadGoal { + thread_id: goal.thread_id.to_string(), + objective: goal.objective, + status: thread_goal_status_from_state(goal.status), + token_budget: goal.token_budget, + tokens_used: goal.tokens_used, + time_used_seconds: goal.time_used_seconds, + created_at: goal.created_at.timestamp(), + updated_at: goal.updated_at.timestamp(), + } +} diff --git a/codex-rs/app-server/src/thread_state.rs b/codex-rs/app-server/src/thread_state.rs index d4347933e..73d1c5961 100644 --- a/codex-rs/app-server/src/thread_state.rs +++ b/codex-rs/app-server/src/thread_state.rs @@ -1,6 +1,7 @@ use crate::outgoing_message::ConnectionId; use crate::outgoing_message::ConnectionRequestId; use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ThreadGoal; use codex_app_server_protocol::ThreadHistoryBuilder; use codex_app_server_protocol::Turn; use codex_app_server_protocol::TurnError; @@ -9,6 +10,7 @@ use codex_core::ThreadConfigSnapshot; use codex_protocol::ThreadId; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::RolloutItem; +use codex_rollout::state_db::StateDbHandle; use codex_utils_absolute_path::AbsolutePathBuf; use std::collections::HashMap; use std::collections::HashSet; @@ -31,6 +33,8 @@ pub(crate) struct PendingThreadResumeRequest { pub(crate) config_snapshot: ThreadConfigSnapshot, pub(crate) instruction_sources: Vec, pub(crate) thread_summary: codex_app_server_protocol::Thread, + pub(crate) emit_thread_goal_update: bool, + pub(crate) thread_goal_state_db: Option, pub(crate) include_turns: bool, } @@ -38,6 +42,16 @@ pub(crate) struct PendingThreadResumeRequest { pub(crate) enum ThreadListenerCommand { // SendThreadResumeResponse is used to resume an already running thread by sending the thread's history to the client and atomically subscribing for new updates. SendThreadResumeResponse(Box), + // EmitThreadGoalUpdated is used to order app-server goal updates with running-thread resume responses. + EmitThreadGoalUpdated { + goal: ThreadGoal, + }, + // EmitThreadGoalCleared is used to order app-server goal clears with running-thread resume responses. + EmitThreadGoalCleared, + // EmitThreadGoalSnapshot is used to read and emit the latest goal state in the listener order. + EmitThreadGoalSnapshot { + state_db: StateDbHandle, + }, // ResolveServerRequest is used to notify the client that the request has been resolved. // It is executed in the thread listener's context to ensure that the resolved notification is ordered with regard to the request itself. ResolveServerRequest { diff --git a/codex-rs/app-server/src/transport/mod.rs b/codex-rs/app-server/src/transport/mod.rs index 22e7a80a5..b610f099a 100644 --- a/codex-rs/app-server/src/transport/mod.rs +++ b/codex-rs/app-server/src/transport/mod.rs @@ -7,6 +7,7 @@ use crate::outgoing_message::OutgoingEnvelope; use crate::outgoing_message::OutgoingError; use crate::outgoing_message::OutgoingMessage; use crate::outgoing_message::QueuedOutgoingMessage; +use codex_app_server_protocol::ExperimentalApi; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::JSONRPCMessage; use codex_app_server_protocol::ServerRequest; @@ -337,6 +338,13 @@ fn should_skip_notification_for_connection( }; match message { OutgoingMessage::AppServerNotification(notification) => { + if notification.experimental_reason().is_some() + && !connection_state + .experimental_api_enabled + .load(Ordering::Acquire) + { + return true; + } let method = notification.to_string(); opted_out_notification_methods.contains(method.as_str()) } @@ -469,6 +477,9 @@ mod tests { use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; + use codex_app_server_protocol::ThreadGoal; + use codex_app_server_protocol::ThreadGoalStatus; + use codex_app_server_protocol::ThreadGoalUpdatedNotification; use codex_utils_absolute_path::AbsolutePathBuf; use pretty_assertions::assert_eq; use serde_json::json; @@ -479,6 +490,23 @@ mod tests { AbsolutePathBuf::from_absolute_path(path).expect("absolute path") } + fn thread_goal_updated_notification() -> ServerNotification { + ServerNotification::ThreadGoalUpdated(ThreadGoalUpdatedNotification { + thread_id: "thread-1".to_string(), + turn_id: None, + goal: ThreadGoal { + thread_id: "thread-1".to_string(), + objective: "ship goal mode".to_string(), + status: ThreadGoalStatus::Active, + token_budget: None, + tokens_used: 0, + time_used_seconds: 0, + created_at: 1, + updated_at: 1, + }, + }) + } + #[test] fn listen_off_parses_as_off_transport() { assert_eq!( @@ -810,6 +838,76 @@ mod tests { )); } + #[tokio::test] + async fn experimental_notifications_are_dropped_without_capability() { + let connection_id = ConnectionId(12); + let (writer_tx, mut writer_rx) = mpsc::channel(1); + + let mut connections = HashMap::new(); + connections.insert( + connection_id, + OutboundConnectionState::new( + writer_tx, + Arc::new(AtomicBool::new(true)), + Arc::new(AtomicBool::new(false)), + Arc::new(RwLock::new(HashSet::new())), + /*disconnect_sender*/ None, + ), + ); + + route_outgoing_envelope( + &mut connections, + OutgoingEnvelope::ToConnection { + connection_id, + message: OutgoingMessage::AppServerNotification(thread_goal_updated_notification()), + write_complete_tx: None, + }, + ) + .await; + + assert!( + writer_rx.try_recv().is_err(), + "experimental notifications should not reach clients without capability" + ); + } + + #[tokio::test] + async fn experimental_notifications_are_preserved_with_capability() { + let connection_id = ConnectionId(13); + let (writer_tx, mut writer_rx) = mpsc::channel(1); + + let mut connections = HashMap::new(); + connections.insert( + connection_id, + OutboundConnectionState::new( + writer_tx, + Arc::new(AtomicBool::new(true)), + Arc::new(AtomicBool::new(true)), + Arc::new(RwLock::new(HashSet::new())), + /*disconnect_sender*/ None, + ), + ); + + route_outgoing_envelope( + &mut connections, + OutgoingEnvelope::ToConnection { + connection_id, + message: OutgoingMessage::AppServerNotification(thread_goal_updated_notification()), + write_complete_tx: None, + }, + ) + .await; + + let message = writer_rx + .recv() + .await + .expect("experimental notification should reach opted-in client"); + assert!(matches!( + message.message, + OutgoingMessage::AppServerNotification(ServerNotification::ThreadGoalUpdated(_)) + )); + } + #[tokio::test] async fn command_execution_request_approval_strips_additional_permissions_without_capability() { let connection_id = ConnectionId(8); diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index e450dd50d..f3d392375 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -28,6 +28,9 @@ use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::SessionSource; +use codex_app_server_protocol::ThreadGoalClearResponse; +use codex_app_server_protocol::ThreadGoalSetResponse; +use codex_app_server_protocol::ThreadGoalStatus; use codex_app_server_protocol::ThreadItem; use codex_app_server_protocol::ThreadMetadataGitInfoUpdateParams; use codex_app_server_protocol::ThreadMetadataUpdateParams; @@ -168,6 +171,63 @@ async fn thread_resume_rejects_unmaterialized_thread() -> Result<()> { Ok(()) } +#[tokio::test] +async fn thread_goal_get_rejects_unmaterialized_thread() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + let config_path = codex_home.path().join("config.toml"); + let config = std::fs::read_to_string(&config_path)?; + std::fs::write( + &config_path, + config.replace( + "general_analytics = true\n", + "general_analytics = true\ngoals = true\n", + ), + )?; + + let mut mcp = McpProcess::new_without_managed_config(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let start_id = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("gpt-5.2-codex".to_string()), + ephemeral: Some(true), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let goal_id = mcp + .send_raw_request( + "thread/goal/get", + Some(json!({ + "threadId": thread.id, + })), + ) + .await?; + let goal_err: JSONRPCError = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_error_message(RequestId::Integer(goal_id)), + ) + .await??; + assert!( + goal_err + .error + .message + .contains("ephemeral thread does not support goals"), + "unexpected goal/get error: {}", + goal_err.error.message + ); + + Ok(()) +} + #[tokio::test] async fn thread_resume_tracks_thread_initialized_analytics() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; @@ -326,6 +386,337 @@ async fn thread_resume_can_skip_turns_for_metadata_only_resume() -> Result<()> { Ok(()) } +#[tokio::test] +async fn thread_resume_emits_paused_goal_update() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + let config_path = codex_home.path().join("config.toml"); + let config = std::fs::read_to_string(&config_path)?; + std::fs::write( + &config_path, + config.replace( + "general_analytics = true\n", + "general_analytics = true\ngoals = true\n", + ), + )?; + + let mut mcp = McpProcess::new_without_managed_config(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let start_id = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("gpt-5.2-codex".to_string()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let turn_id = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![UserInput::Text { + text: "materialize this thread".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + let _turn_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_id)), + ) + .await??; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + let goal_id = mcp + .send_raw_request( + "thread/goal/set", + Some(json!({ + "threadId": thread.id, + "objective": "keep polishing", + "status": "paused", + })), + ) + .await?; + let goal_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(goal_id)), + ) + .await??; + let _goal: ThreadGoalSetResponse = to_response(goal_resp)?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("thread/goal/updated"), + ) + .await??; + + let resume_id = mcp + .send_thread_resume_request(ThreadResumeParams { + thread_id: thread.id.clone(), + ..Default::default() + }) + .await?; + let resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(resume_id)), + ) + .await??; + let _resume: ThreadResumeResponse = to_response(resume_resp)?; + let notification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("thread/goal/updated"), + ) + .await??; + let notification: ServerNotification = notification.try_into()?; + let ServerNotification::ThreadGoalUpdated(notification) = notification else { + anyhow::bail!("expected thread goal update notification"); + }; + assert_eq!(notification.goal.status, ThreadGoalStatus::Paused); + + Ok(()) +} + +#[tokio::test] +async fn thread_goal_set_preserves_budget_limited_same_objective() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + let config_path = codex_home.path().join("config.toml"); + let config = std::fs::read_to_string(&config_path)?; + std::fs::write( + &config_path, + config.replace( + "general_analytics = true\n", + "general_analytics = true\ngoals = true\n", + ), + )?; + + let mut mcp = McpProcess::new_without_managed_config(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let start_id = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("gpt-5.2-codex".to_string()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let turn_id = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![UserInput::Text { + text: "materialize this thread".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + let _turn_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_id)), + ) + .await??; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + let goal_id = mcp + .send_raw_request( + "thread/goal/set", + Some(json!({ + "threadId": thread.id, + "objective": "keep polishing", + "status": "budgetLimited", + "tokenBudget": 10, + })), + ) + .await?; + let goal_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(goal_id)), + ) + .await??; + let goal: ThreadGoalSetResponse = to_response(goal_resp)?; + assert_eq!(goal.goal.status, ThreadGoalStatus::BudgetLimited); + + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("thread/goal/updated"), + ) + .await??; + + let replacement_id = mcp + .send_raw_request( + "thread/goal/set", + Some(json!({ + "threadId": thread.id, + "objective": "keep polishing", + })), + ) + .await?; + let replacement_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(replacement_id)), + ) + .await??; + let replacement: ThreadGoalSetResponse = to_response(replacement_resp)?; + + assert_eq!(replacement.goal.status, ThreadGoalStatus::BudgetLimited); + assert_eq!(replacement.goal.token_budget, Some(10)); + assert_eq!(replacement.goal.tokens_used, 0); + assert_eq!(replacement.goal.time_used_seconds, 0); + + Ok(()) +} + +#[tokio::test] +async fn thread_goal_clear_deletes_goal_and_notifies() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + let config_path = codex_home.path().join("config.toml"); + let config = std::fs::read_to_string(&config_path)?; + std::fs::write( + &config_path, + config.replace( + "general_analytics = true\n", + "general_analytics = true\ngoals = true\n", + ), + )?; + + let mut mcp = McpProcess::new_without_managed_config(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let start_id = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("gpt-5.2-codex".to_string()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let turn_id = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![UserInput::Text { + text: "materialize this thread".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + let _turn_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_id)), + ) + .await??; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + let goal_id = mcp + .send_raw_request( + "thread/goal/set", + Some(json!({ + "threadId": thread.id, + "objective": "keep polishing", + })), + ) + .await?; + let goal_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(goal_id)), + ) + .await??; + let _goal: ThreadGoalSetResponse = to_response(goal_resp)?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("thread/goal/updated"), + ) + .await??; + + let clear_id = mcp + .send_raw_request( + "thread/goal/clear", + Some(json!({ + "threadId": thread.id, + })), + ) + .await?; + let clear_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(clear_id)), + ) + .await??; + let clear: ThreadGoalClearResponse = to_response(clear_resp)?; + assert!(clear.cleared); + + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("thread/goal/cleared"), + ) + .await??; + + let get_id = mcp + .send_raw_request( + "thread/goal/get", + Some(json!({ + "threadId": thread.id, + })), + ) + .await?; + let get_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(get_id)), + ) + .await??; + let get: codex_app_server_protocol::ThreadGoalGetResponse = to_response(get_resp)?; + assert_eq!(None, get.goal); + + let clear_again_id = mcp + .send_raw_request( + "thread/goal/clear", + Some(json!({ + "threadId": thread.id, + })), + ) + .await?; + let clear_again_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(clear_again_id)), + ) + .await??; + let clear_again: ThreadGoalClearResponse = to_response(clear_again_resp)?; + assert!(!clear_again.cleared); + + Ok(()) +} + #[tokio::test] async fn thread_resume_by_path_uses_remote_thread_store_error() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; diff --git a/codex-rs/core/src/agent/status.rs b/codex-rs/core/src/agent/status.rs index c343e1950..43be71886 100644 --- a/codex-rs/core/src/agent/status.rs +++ b/codex-rs/core/src/agent/status.rs @@ -8,7 +8,8 @@ pub(crate) fn agent_status_from_event(msg: &EventMsg) -> Option { EventMsg::TurnStarted(_) => Some(AgentStatus::Running), EventMsg::TurnComplete(ev) => Some(AgentStatus::Completed(ev.last_agent_message.clone())), EventMsg::TurnAborted(ev) => match ev.reason { - codex_protocol::protocol::TurnAbortReason::Interrupted => { + codex_protocol::protocol::TurnAbortReason::Interrupted + | codex_protocol::protocol::TurnAbortReason::BudgetLimited => { Some(AgentStatus::Interrupted) } _ => Some(AgentStatus::Errored(format!("{:?}", ev.reason))), diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index db5df955d..fe9320b12 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -1502,6 +1502,7 @@ pub(super) fn realtime_text_for_event(msg: &EventMsg) -> Option { | EventMsg::AgentReasoningSectionBreak(_) | EventMsg::SessionConfigured(_) | EventMsg::ThreadNameUpdated(_) + | EventMsg::ThreadGoalUpdated(_) | EventMsg::McpStartupUpdate(_) | EventMsg::McpStartupComplete(_) | EventMsg::McpToolCallBegin(_) diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index 99fffb801..f759f8bb8 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -321,6 +321,9 @@ async fn run_codex_tool_session_inner( EventMsg::ThreadNameUpdated(_) => { // Ignore session metadata updates in MCP tool runner. } + EventMsg::ThreadGoalUpdated(_) => { + // Ignore thread goal metadata updates in MCP tool runner. + } EventMsg::AgentMessageDelta(_) => { // TODO: think how we want to support this in the MCP } diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 63e8ab0e5..94edb0cb0 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -1464,6 +1464,9 @@ pub enum EventMsg { /// Updated session metadata (e.g., thread name changes). ThreadNameUpdated(ThreadNameUpdatedEvent), + /// Updated long-running goal metadata for the thread. + ThreadGoalUpdated(ThreadGoalUpdatedEvent), + /// Incremental MCP startup progress updates. McpStartupUpdate(McpStartupUpdateEvent), @@ -3612,6 +3615,43 @@ pub struct ThreadNameUpdatedEvent { pub thread_name: Option, } +#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "protocol/")] +pub enum ThreadGoalStatus { + Active, + Paused, + BudgetLimited, + Complete, +} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "protocol/")] +pub struct ThreadGoal { + pub thread_id: ThreadId, + pub objective: String, + pub status: ThreadGoalStatus, + #[serde(default, skip_serializing_if = "Option::is_none")] + #[ts(optional)] + pub token_budget: Option, + pub tokens_used: i64, + pub time_used_seconds: i64, + pub created_at: i64, + pub updated_at: i64, +} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "protocol/")] +pub struct ThreadGoalUpdatedEvent { + pub thread_id: ThreadId, + #[serde(default, skip_serializing_if = "Option::is_none")] + #[ts(optional)] + pub turn_id: Option, + pub goal: ThreadGoal, +} + /// User's decision in response to an ExecApprovalRequest. #[derive(Debug, Default, Clone, Deserialize, Serialize, PartialEq, Eq, Display, JsonSchema, TS)] #[serde(rename_all = "snake_case")] @@ -3714,6 +3754,7 @@ pub enum TurnAbortReason { Interrupted, Replaced, ReviewEnded, + BudgetLimited, } #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)] diff --git a/codex-rs/rollout-trace/src/protocol_event.rs b/codex-rs/rollout-trace/src/protocol_event.rs index b3267a23e..2aa5af5af 100644 --- a/codex-rs/rollout-trace/src/protocol_event.rs +++ b/codex-rs/rollout-trace/src/protocol_event.rs @@ -226,6 +226,7 @@ pub(crate) fn tool_runtime_trace_event(event: &EventMsg) -> Option Option<&'static s | EventMsg::AgentReasoningRawContent(_) | EventMsg::AgentReasoningRawContentDelta(_) | EventMsg::AgentReasoningSectionBreak(_) + | EventMsg::ThreadGoalUpdated(_) | EventMsg::McpStartupUpdate(_) | EventMsg::McpStartupComplete(_) | EventMsg::McpToolCallBegin(_) @@ -403,8 +405,9 @@ impl TraceExecutionStatus for PatchApplyStatus { fn execution_status_for_abort_reason(reason: &TurnAbortReason) -> ExecutionStatus { match reason { - TurnAbortReason::Interrupted | TurnAbortReason::Replaced | TurnAbortReason::ReviewEnded => { - ExecutionStatus::Cancelled - } + TurnAbortReason::Interrupted + | TurnAbortReason::Replaced + | TurnAbortReason::ReviewEnded + | TurnAbortReason::BudgetLimited => ExecutionStatus::Cancelled, } } diff --git a/codex-rs/rollout/src/policy.rs b/codex-rs/rollout/src/policy.rs index ddd42e577..8459f96c1 100644 --- a/codex-rs/rollout/src/policy.rs +++ b/codex-rs/rollout/src/policy.rs @@ -145,6 +145,7 @@ fn event_msg_persistence_mode(ev: &EventMsg) -> Option { | EventMsg::AgentReasoningSectionBreak(_) | EventMsg::RawResponseItem(_) | EventMsg::SessionConfigured(_) + | EventMsg::ThreadGoalUpdated(_) | EventMsg::McpToolCallBegin(_) | EventMsg::WebSearchBegin(_) | EventMsg::ExecCommandBegin(_) diff --git a/codex-rs/tui/src/app/app_server_adapter.rs b/codex-rs/tui/src/app/app_server_adapter.rs index 2cfa3c9dd..0a90c2c9b 100644 --- a/codex-rs/tui/src/app/app_server_adapter.rs +++ b/codex-rs/tui/src/app/app_server_adapter.rs @@ -350,6 +350,12 @@ fn server_notification_thread_target( ServerNotification::ThreadTokenUsageUpdated(notification) => { Some(notification.thread_id.as_str()) } + ServerNotification::ThreadGoalUpdated(notification) => { + Some(notification.thread_id.as_str()) + } + ServerNotification::ThreadGoalCleared(notification) => { + Some(notification.thread_id.as_str()) + } ServerNotification::TurnStarted(notification) => Some(notification.thread_id.as_str()), ServerNotification::HookStarted(notification) => Some(notification.thread_id.as_str()), ServerNotification::TurnCompleted(notification) => Some(notification.thread_id.as_str()), diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index b748b11e6..4aa49d711 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -6533,6 +6533,8 @@ impl ChatWidget { notification.token_usage, ))); } + ServerNotification::ThreadGoalUpdated(_) => {} + ServerNotification::ThreadGoalCleared(_) => {} ServerNotification::ThreadNameUpdated(notification) => { match ThreadId::from_string(¬ification.thread_id) { Ok(thread_id) => self.on_thread_name_updated( @@ -7089,6 +7091,7 @@ impl ChatWidget { match msg { EventMsg::SessionConfigured(e) => self.on_session_configured(e), EventMsg::ThreadNameUpdated(e) => self.on_thread_name_updated(e), + EventMsg::ThreadGoalUpdated(_) => {} // NOTE: All three AgentMessage arms feed `record_agent_markdown` even // when the message is otherwise not rendered (thread-snapshot replay, // non-review live messages). This ensures the copy source stays @@ -7200,6 +7203,9 @@ impl ChatWidget { TurnAbortReason::ReviewEnded => { self.on_interrupted_turn(ev.reason); } + TurnAbortReason::BudgetLimited => { + self.on_interrupted_turn(ev.reason); + } }, EventMsg::PlanUpdate(update) => self.on_plan_update(update), EventMsg::ExecApprovalRequest(ev) => {