mirror of
https://github.com/pchuan98/codex.git
synced 2026-07-01 00:31:56 +08:00
Remove ApiPrompt (#11265)
Keep things simple and build a full Responses API request request right in the model client
This commit is contained in:
committed by
GitHub
Unverified
parent
59c625458b
commit
3322b99900
@@ -2,7 +2,7 @@
|
||||
|
||||
Typed clients for Codex/OpenAI APIs built on top of the generic transport in `codex-client`.
|
||||
|
||||
- Hosts the request/response models and prompt helpers for Responses and Compact APIs.
|
||||
- Hosts the request/response models and request builders for Responses and Compact APIs.
|
||||
- Owns provider configuration (base URLs, headers, query params), auth header injection, retry tuning, and stream idle settings.
|
||||
- Parses SSE streams into `ResponseEvent`/`ResponseStream`, including rate-limit snapshots and API-specific error mapping.
|
||||
- Serves as the wire-level layer consumed by `codex-core`; higher layers handle auth refresh and business logic.
|
||||
@@ -11,14 +11,10 @@ Typed clients for Codex/OpenAI APIs built on top of the generic transport in `co
|
||||
|
||||
The public interface of this crate is intentionally small and uniform:
|
||||
|
||||
- **Prompted endpoints (Responses)**
|
||||
- Input: a single `Prompt` plus endpoint-specific options.
|
||||
- `Prompt` (re-exported as `codex_api::Prompt`) carries:
|
||||
- `instructions: String` – the fully-resolved system prompt for this turn.
|
||||
- `input: Vec<ResponseItem>` – conversation history and user/tool messages.
|
||||
- `tools: Vec<serde_json::Value>` – JSON tools compatible with the target API.
|
||||
- `parallel_tool_calls: bool`.
|
||||
- `output_schema: Option<Value>` – used to build `text.format` when present.
|
||||
- **Responses endpoint**
|
||||
- Input:
|
||||
- `ResponsesApiRequest` for the request body (`model`, `instructions`, `input`, `tools`, `parallel_tool_calls`, reasoning/text controls).
|
||||
- `ResponsesOptions` for transport/header concerns (`conversation_id`, `session_source`, `extra_headers`, `compression`, `turn_state`).
|
||||
- Output: a `ResponseStream` of `ResponseEvent` (both re-exported from `common`).
|
||||
|
||||
- **Compaction endpoint**
|
||||
|
||||
@@ -14,22 +14,6 @@ use std::task::Context;
|
||||
use std::task::Poll;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
/// Canonical prompt input for Responses endpoints.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Prompt {
|
||||
/// Fully-resolved system instructions for this turn.
|
||||
pub instructions: String,
|
||||
/// Conversation history and user/tool messages.
|
||||
pub input: Vec<ResponseItem>,
|
||||
/// JSON-encoded tool definitions compatible with the target API.
|
||||
// TODO(jif) have a proper type here
|
||||
pub tools: Vec<Value>,
|
||||
/// Whether parallel tool calls are permitted.
|
||||
pub parallel_tool_calls: bool,
|
||||
/// Optional output schema used to build the `text.format` controls.
|
||||
pub output_schema: Option<Value>,
|
||||
}
|
||||
|
||||
/// Canonical input payload for the compaction endpoint.
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct CompactionInput<'a> {
|
||||
@@ -152,13 +136,13 @@ impl From<VerbosityConfig> for OpenAiVerbosity {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct ResponsesApiRequest<'a> {
|
||||
pub model: &'a str,
|
||||
pub instructions: &'a str,
|
||||
pub input: &'a [ResponseItem],
|
||||
pub tools: &'a [serde_json::Value],
|
||||
pub tool_choice: &'static str,
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
pub struct ResponsesApiRequest {
|
||||
pub model: String,
|
||||
pub instructions: String,
|
||||
pub input: Vec<ResponseItem>,
|
||||
pub tools: Vec<serde_json::Value>,
|
||||
pub tool_choice: String,
|
||||
pub parallel_tool_calls: bool,
|
||||
pub reasoning: Option<Reasoning>,
|
||||
pub store: bool,
|
||||
@@ -170,6 +154,26 @@ pub struct ResponsesApiRequest<'a> {
|
||||
pub text: Option<TextControls>,
|
||||
}
|
||||
|
||||
impl From<&ResponsesApiRequest> for ResponseCreateWsRequest {
|
||||
fn from(request: &ResponsesApiRequest) -> Self {
|
||||
Self {
|
||||
model: request.model.clone(),
|
||||
instructions: request.instructions.clone(),
|
||||
previous_response_id: None,
|
||||
input: request.input.clone(),
|
||||
tools: request.tools.clone(),
|
||||
tool_choice: request.tool_choice.clone(),
|
||||
parallel_tool_calls: request.parallel_tool_calls,
|
||||
reasoning: request.reasoning.clone(),
|
||||
store: request.store,
|
||||
stream: request.stream,
|
||||
include: request.include.clone(),
|
||||
prompt_cache_key: request.prompt_cache_key.clone(),
|
||||
text: request.text.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct ResponseCreateWsRequest {
|
||||
pub model: String,
|
||||
|
||||
@@ -1,14 +1,14 @@
|
||||
use crate::auth::AuthProvider;
|
||||
use crate::common::Prompt as ApiPrompt;
|
||||
use crate::common::Reasoning;
|
||||
use crate::common::ResponseStream;
|
||||
use crate::common::TextControls;
|
||||
use crate::common::ResponsesApiRequest;
|
||||
use crate::endpoint::session::EndpointSession;
|
||||
use crate::error::ApiError;
|
||||
use crate::provider::Provider;
|
||||
use crate::requests::ResponsesRequest;
|
||||
use crate::requests::ResponsesRequestBuilder;
|
||||
use crate::requests::headers::build_conversation_headers;
|
||||
use crate::requests::headers::insert_header;
|
||||
use crate::requests::headers::subagent_header;
|
||||
use crate::requests::responses::Compression;
|
||||
use crate::requests::responses::attach_item_ids;
|
||||
use crate::sse::spawn_response_stream;
|
||||
use crate::telemetry::SseTelemetry;
|
||||
use codex_client::HttpTransport;
|
||||
@@ -21,7 +21,6 @@ use http::Method;
|
||||
use serde_json::Value;
|
||||
use std::sync::Arc;
|
||||
use std::sync::OnceLock;
|
||||
use tracing::instrument;
|
||||
|
||||
pub struct ResponsesClient<T: HttpTransport, A: AuthProvider> {
|
||||
session: EndpointSession<T, A>,
|
||||
@@ -30,11 +29,6 @@ pub struct ResponsesClient<T: HttpTransport, A: AuthProvider> {
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct ResponsesOptions {
|
||||
pub reasoning: Option<Reasoning>,
|
||||
pub include: Vec<String>,
|
||||
pub prompt_cache_key: Option<String>,
|
||||
pub text: Option<TextControls>,
|
||||
pub store_override: Option<bool>,
|
||||
pub conversation_id: Option<String>,
|
||||
pub session_source: Option<SessionSource>,
|
||||
pub extra_headers: HeaderMap,
|
||||
@@ -63,31 +57,10 @@ impl<T: HttpTransport, A: AuthProvider> ResponsesClient<T, A> {
|
||||
|
||||
pub async fn stream_request(
|
||||
&self,
|
||||
request: ResponsesRequest,
|
||||
turn_state: Option<Arc<OnceLock<String>>>,
|
||||
) -> Result<ResponseStream, ApiError> {
|
||||
self.stream(
|
||||
request.body,
|
||||
request.headers,
|
||||
request.compression,
|
||||
turn_state,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip_all, err)]
|
||||
pub async fn stream_prompt(
|
||||
&self,
|
||||
model: &str,
|
||||
prompt: &ApiPrompt,
|
||||
request: ResponsesApiRequest,
|
||||
options: ResponsesOptions,
|
||||
) -> Result<ResponseStream, ApiError> {
|
||||
let ResponsesOptions {
|
||||
reasoning,
|
||||
include,
|
||||
prompt_cache_key,
|
||||
text,
|
||||
store_override,
|
||||
conversation_id,
|
||||
session_source,
|
||||
extra_headers,
|
||||
@@ -95,21 +68,19 @@ impl<T: HttpTransport, A: AuthProvider> ResponsesClient<T, A> {
|
||||
turn_state,
|
||||
} = options;
|
||||
|
||||
let request = ResponsesRequestBuilder::new(model, &prompt.instructions, &prompt.input)
|
||||
.tools(&prompt.tools)
|
||||
.parallel_tool_calls(prompt.parallel_tool_calls)
|
||||
.reasoning(reasoning)
|
||||
.include(include)
|
||||
.prompt_cache_key(prompt_cache_key)
|
||||
.text(text)
|
||||
.conversation(conversation_id)
|
||||
.session_source(session_source)
|
||||
.store_override(store_override)
|
||||
.extra_headers(extra_headers)
|
||||
.compression(compression)
|
||||
.build(self.session.provider())?;
|
||||
let mut body = serde_json::to_value(&request)
|
||||
.map_err(|e| ApiError::Stream(format!("failed to encode responses request: {e}")))?;
|
||||
if request.store && self.session.provider().is_azure_responses_endpoint() {
|
||||
attach_item_ids(&mut body, &request.input);
|
||||
}
|
||||
|
||||
self.stream_request(request, turn_state).await
|
||||
let mut headers = extra_headers;
|
||||
headers.extend(build_conversation_headers(conversation_id));
|
||||
if let Some(subagent) = subagent_header(&session_source) {
|
||||
insert_header(&mut headers, "x-openai-subagent", &subagent);
|
||||
}
|
||||
|
||||
self.stream(body, headers, compression, turn_state).await
|
||||
}
|
||||
|
||||
fn path() -> &'static str {
|
||||
|
||||
@@ -17,7 +17,6 @@ pub use crate::auth::AuthProvider;
|
||||
pub use crate::common::CompactionInput;
|
||||
pub use crate::common::MemorySummarizeInput;
|
||||
pub use crate::common::MemorySummarizeOutput;
|
||||
pub use crate::common::Prompt;
|
||||
pub use crate::common::RawMemory;
|
||||
pub use crate::common::RawMemoryMetadata;
|
||||
pub use crate::common::ResponseAppendWsRequest;
|
||||
@@ -37,8 +36,6 @@ pub use crate::endpoint::responses_websocket::ResponsesWebsocketConnection;
|
||||
pub use crate::error::ApiError;
|
||||
pub use crate::provider::Provider;
|
||||
pub use crate::provider::is_azure_responses_wire_base_url;
|
||||
pub use crate::requests::ResponsesRequest;
|
||||
pub use crate::requests::ResponsesRequestBuilder;
|
||||
pub use crate::sse::stream_from_fixture;
|
||||
pub use crate::telemetry::SseTelemetry;
|
||||
pub use crate::telemetry::WebsocketTelemetry;
|
||||
|
||||
@@ -1,5 +1,2 @@
|
||||
pub(crate) mod headers;
|
||||
pub mod responses;
|
||||
|
||||
pub use responses::ResponsesRequest;
|
||||
pub use responses::ResponsesRequestBuilder;
|
||||
|
||||
@@ -1,14 +1,4 @@
|
||||
use crate::common::Reasoning;
|
||||
use crate::common::ResponsesApiRequest;
|
||||
use crate::common::TextControls;
|
||||
use crate::error::ApiError;
|
||||
use crate::provider::Provider;
|
||||
use crate::requests::headers::build_conversation_headers;
|
||||
use crate::requests::headers::insert_header;
|
||||
use crate::requests::headers::subagent_header;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use http::HeaderMap;
|
||||
use serde_json::Value;
|
||||
|
||||
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
|
||||
@@ -18,149 +8,7 @@ pub enum Compression {
|
||||
Zstd,
|
||||
}
|
||||
|
||||
/// Assembled request body plus headers for a Responses stream request.
|
||||
pub struct ResponsesRequest {
|
||||
pub body: Value,
|
||||
pub headers: HeaderMap,
|
||||
pub compression: Compression,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct ResponsesRequestBuilder<'a> {
|
||||
model: Option<&'a str>,
|
||||
instructions: Option<&'a str>,
|
||||
input: Option<&'a [ResponseItem]>,
|
||||
tools: Option<&'a [Value]>,
|
||||
parallel_tool_calls: bool,
|
||||
reasoning: Option<Reasoning>,
|
||||
include: Vec<String>,
|
||||
prompt_cache_key: Option<String>,
|
||||
text: Option<TextControls>,
|
||||
conversation_id: Option<String>,
|
||||
session_source: Option<SessionSource>,
|
||||
store_override: Option<bool>,
|
||||
headers: HeaderMap,
|
||||
compression: Compression,
|
||||
}
|
||||
|
||||
impl<'a> ResponsesRequestBuilder<'a> {
|
||||
pub fn new(model: &'a str, instructions: &'a str, input: &'a [ResponseItem]) -> Self {
|
||||
Self {
|
||||
model: Some(model),
|
||||
instructions: Some(instructions),
|
||||
input: Some(input),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn tools(mut self, tools: &'a [Value]) -> Self {
|
||||
self.tools = Some(tools);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn parallel_tool_calls(mut self, enabled: bool) -> Self {
|
||||
self.parallel_tool_calls = enabled;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn reasoning(mut self, reasoning: Option<Reasoning>) -> Self {
|
||||
self.reasoning = reasoning;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn include(mut self, include: Vec<String>) -> Self {
|
||||
self.include = include;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn prompt_cache_key(mut self, key: Option<String>) -> Self {
|
||||
self.prompt_cache_key = key;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn text(mut self, text: Option<TextControls>) -> Self {
|
||||
self.text = text;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn conversation(mut self, conversation_id: Option<String>) -> Self {
|
||||
self.conversation_id = conversation_id;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn session_source(mut self, source: Option<SessionSource>) -> Self {
|
||||
self.session_source = source;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn store_override(mut self, store: Option<bool>) -> Self {
|
||||
self.store_override = store;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn extra_headers(mut self, headers: HeaderMap) -> Self {
|
||||
self.headers = headers;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn compression(mut self, compression: Compression) -> Self {
|
||||
self.compression = compression;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn build(self, provider: &Provider) -> Result<ResponsesRequest, ApiError> {
|
||||
let model = self
|
||||
.model
|
||||
.ok_or_else(|| ApiError::Stream("missing model for responses request".into()))?;
|
||||
let instructions = self
|
||||
.instructions
|
||||
.ok_or_else(|| ApiError::Stream("missing instructions for responses request".into()))?;
|
||||
let input = self
|
||||
.input
|
||||
.ok_or_else(|| ApiError::Stream("missing input for responses request".into()))?;
|
||||
let tools = self.tools.unwrap_or_default();
|
||||
|
||||
let store = self
|
||||
.store_override
|
||||
.unwrap_or_else(|| provider.is_azure_responses_endpoint());
|
||||
|
||||
let req = ResponsesApiRequest {
|
||||
model,
|
||||
instructions,
|
||||
input,
|
||||
tools,
|
||||
tool_choice: "auto",
|
||||
parallel_tool_calls: self.parallel_tool_calls,
|
||||
reasoning: self.reasoning,
|
||||
store,
|
||||
stream: true,
|
||||
include: self.include,
|
||||
prompt_cache_key: self.prompt_cache_key,
|
||||
text: self.text,
|
||||
};
|
||||
|
||||
let mut body = serde_json::to_value(&req)
|
||||
.map_err(|e| ApiError::Stream(format!("failed to encode responses request: {e}")))?;
|
||||
|
||||
if store && provider.is_azure_responses_endpoint() {
|
||||
attach_item_ids(&mut body, input);
|
||||
}
|
||||
|
||||
let mut headers = self.headers;
|
||||
headers.extend(build_conversation_headers(self.conversation_id));
|
||||
if let Some(subagent) = subagent_header(&self.session_source) {
|
||||
insert_header(&mut headers, "x-openai-subagent", &subagent);
|
||||
}
|
||||
|
||||
Ok(ResponsesRequest {
|
||||
body,
|
||||
headers,
|
||||
compression: self.compression,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn attach_item_ids(payload_json: &mut Value, original_items: &[ResponseItem]) {
|
||||
pub(crate) fn attach_item_ids(payload_json: &mut Value, original_items: &[ResponseItem]) {
|
||||
let Some(input_value) = payload_json.get_mut("input") else {
|
||||
return;
|
||||
};
|
||||
@@ -186,78 +34,3 @@ fn attach_item_ids(payload_json: &mut Value, original_items: &[ResponseItem]) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::provider::RetryConfig;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use http::HeaderValue;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::time::Duration;
|
||||
|
||||
fn provider(name: &str, base_url: &str) -> Provider {
|
||||
Provider {
|
||||
name: name.to_string(),
|
||||
base_url: base_url.to_string(),
|
||||
query_params: None,
|
||||
headers: HeaderMap::new(),
|
||||
retry: RetryConfig {
|
||||
max_attempts: 1,
|
||||
base_delay: Duration::from_millis(50),
|
||||
retry_429: false,
|
||||
retry_5xx: true,
|
||||
retry_transport: true,
|
||||
},
|
||||
stream_idle_timeout: Duration::from_secs(5),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn azure_default_store_attaches_ids_and_headers() {
|
||||
let provider = provider("azure", "https://example.openai.azure.com/v1");
|
||||
let input = vec![
|
||||
ResponseItem::Message {
|
||||
id: Some("m1".into()),
|
||||
role: "assistant".into(),
|
||||
content: Vec::new(),
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
},
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".into(),
|
||||
content: Vec::new(),
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
},
|
||||
];
|
||||
|
||||
let request = ResponsesRequestBuilder::new("gpt-test", "inst", &input)
|
||||
.conversation(Some("conv-1".into()))
|
||||
.session_source(Some(SessionSource::SubAgent(SubAgentSource::Review)))
|
||||
.build(&provider)
|
||||
.expect("request");
|
||||
|
||||
assert_eq!(request.body.get("store"), Some(&Value::Bool(true)));
|
||||
|
||||
let ids: Vec<Option<String>> = request
|
||||
.body
|
||||
.get("input")
|
||||
.and_then(|v| v.as_array())
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.map(|item| item.get("id").and_then(|v| v.as_str().map(str::to_string)))
|
||||
.collect();
|
||||
assert_eq!(ids, vec![Some("m1".to_string()), None]);
|
||||
|
||||
assert_eq!(
|
||||
request.headers.get("session_id"),
|
||||
Some(&HeaderValue::from_static("conv-1"))
|
||||
);
|
||||
assert_eq!(
|
||||
request.headers.get("x-openai-subagent"),
|
||||
Some(&HeaderValue::from_static("review"))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use codex_api::AuthProvider;
|
||||
use codex_api::Provider;
|
||||
use codex_api::ResponsesApiRequest;
|
||||
use codex_api::ResponsesClient;
|
||||
use codex_api::ResponsesOptions;
|
||||
use codex_api::requests::responses::Compression;
|
||||
@@ -17,10 +18,12 @@ use codex_client::StreamResponse;
|
||||
use codex_client::TransportError;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use http::HeaderMap;
|
||||
use http::HeaderValue;
|
||||
use http::StatusCode;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
|
||||
fn assert_path_ends_with(requests: &[Request], suffix: &str) {
|
||||
assert_eq!(requests.len(), 1);
|
||||
@@ -251,27 +254,106 @@ async fn streaming_client_retries_on_transport_error() -> Result<()> {
|
||||
let mut provider = provider("openai");
|
||||
provider.retry.max_attempts = 2;
|
||||
|
||||
let request = ResponsesApiRequest {
|
||||
model: "gpt-test".into(),
|
||||
instructions: "Say hi".into(),
|
||||
input: Vec::new(),
|
||||
tools: Vec::new(),
|
||||
tool_choice: "auto".into(),
|
||||
parallel_tool_calls: false,
|
||||
reasoning: None,
|
||||
store: false,
|
||||
stream: true,
|
||||
include: Vec::new(),
|
||||
prompt_cache_key: None,
|
||||
text: None,
|
||||
};
|
||||
let client = ResponsesClient::new(transport.clone(), provider, NoAuth);
|
||||
|
||||
let prompt = codex_api::Prompt {
|
||||
instructions: "Say hi".to_string(),
|
||||
input: vec![ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::InputText {
|
||||
text: "hi".to_string(),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
}],
|
||||
tools: Vec::<Value>::new(),
|
||||
parallel_tool_calls: false,
|
||||
output_schema: None,
|
||||
};
|
||||
|
||||
let options = ResponsesOptions::default();
|
||||
|
||||
let _stream = client.stream_prompt("gpt-test", &prompt, options).await?;
|
||||
let _stream = client
|
||||
.stream_request(
|
||||
request,
|
||||
ResponsesOptions {
|
||||
compression: Compression::None,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
assert_eq!(transport.attempts(), 2);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn azure_default_store_attaches_ids_and_headers() -> Result<()> {
|
||||
let state = RecordingState::default();
|
||||
let transport = RecordingTransport::new(state.clone());
|
||||
let client = ResponsesClient::new(transport, provider("azure"), NoAuth);
|
||||
|
||||
let request = ResponsesApiRequest {
|
||||
model: "gpt-test".into(),
|
||||
instructions: "Say hi".into(),
|
||||
input: vec![ResponseItem::Message {
|
||||
id: Some("msg_1".into()),
|
||||
role: "user".into(),
|
||||
content: vec![ContentItem::InputText { text: "hi".into() }],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
}],
|
||||
tools: Vec::new(),
|
||||
tool_choice: "auto".into(),
|
||||
parallel_tool_calls: false,
|
||||
reasoning: None,
|
||||
store: true,
|
||||
stream: true,
|
||||
include: Vec::new(),
|
||||
prompt_cache_key: None,
|
||||
text: None,
|
||||
};
|
||||
|
||||
let mut extra_headers = HeaderMap::new();
|
||||
extra_headers.insert("x-test-header", HeaderValue::from_static("present"));
|
||||
let _stream = client
|
||||
.stream_request(
|
||||
request,
|
||||
ResponsesOptions {
|
||||
conversation_id: Some("sess_123".into()),
|
||||
session_source: Some(SessionSource::SubAgent(SubAgentSource::Review)),
|
||||
extra_headers,
|
||||
compression: Compression::None,
|
||||
turn_state: None,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
let requests = state.take_stream_requests();
|
||||
assert_eq!(requests.len(), 1);
|
||||
let req = &requests[0];
|
||||
|
||||
assert_eq!(
|
||||
req.headers.get("session_id").and_then(|v| v.to_str().ok()),
|
||||
Some("sess_123")
|
||||
);
|
||||
assert_eq!(
|
||||
req.headers
|
||||
.get("x-openai-subagent")
|
||||
.and_then(|v| v.to_str().ok()),
|
||||
Some("review")
|
||||
);
|
||||
assert_eq!(
|
||||
req.headers
|
||||
.get("x-test-header")
|
||||
.and_then(|v| v.to_str().ok()),
|
||||
Some("present")
|
||||
);
|
||||
|
||||
let input_id = req
|
||||
.body
|
||||
.as_ref()
|
||||
.and_then(|body| body.get("input"))
|
||||
.and_then(|input| input.get(0))
|
||||
.and_then(|item| item.get("id"))
|
||||
.and_then(|id| id.as_str());
|
||||
assert_eq!(input_id, Some("msg_1"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
+69
-119
@@ -40,12 +40,12 @@ use codex_api::CompactionInput as ApiCompactionInput;
|
||||
use codex_api::MemoriesClient as ApiMemoriesClient;
|
||||
use codex_api::MemorySummarizeInput as ApiMemorySummarizeInput;
|
||||
use codex_api::MemorySummarizeOutput as ApiMemorySummarizeOutput;
|
||||
use codex_api::Prompt as ApiPrompt;
|
||||
use codex_api::RawMemory as ApiRawMemory;
|
||||
use codex_api::RequestTelemetry;
|
||||
use codex_api::ReqwestTransport;
|
||||
use codex_api::ResponseAppendWsRequest;
|
||||
use codex_api::ResponseCreateWsRequest;
|
||||
use codex_api::ResponsesApiRequest;
|
||||
use codex_api::ResponsesClient as ApiResponsesClient;
|
||||
use codex_api::ResponsesOptions as ApiResponsesOptions;
|
||||
use codex_api::ResponsesWebsocketClient as ApiWebSocketResponsesClient;
|
||||
@@ -75,7 +75,6 @@ use http::HeaderMap as ApiHeaderMap;
|
||||
use http::HeaderValue;
|
||||
use http::StatusCode as HttpStatusCode;
|
||||
use reqwest::StatusCode;
|
||||
use serde_json::Value;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
@@ -443,28 +442,17 @@ impl ModelClientSession {
|
||||
.swap(true, Ordering::Relaxed)
|
||||
}
|
||||
|
||||
fn build_responses_request(prompt: &Prompt) -> Result<ApiPrompt> {
|
||||
let instructions = prompt.base_instructions.text.clone();
|
||||
let tools_json: Vec<Value> = create_tools_json_for_responses_api(&prompt.tools)?;
|
||||
Ok(build_api_prompt(prompt, instructions, tools_json))
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
/// Builds shared Responses API request options for both HTTP and WebSocket streaming.
|
||||
///
|
||||
/// Keeping option construction in one place ensures request-scoped headers are consistent
|
||||
/// regardless of transport choice.
|
||||
fn build_responses_options(
|
||||
fn build_responses_request(
|
||||
&self,
|
||||
provider: &codex_api::Provider,
|
||||
prompt: &Prompt,
|
||||
model_info: &ModelInfo,
|
||||
effort: Option<ReasoningEffortConfig>,
|
||||
summary: ReasoningSummaryConfig,
|
||||
turn_metadata_header: Option<&str>,
|
||||
compression: Compression,
|
||||
) -> ApiResponsesOptions {
|
||||
let turn_metadata_header = parse_turn_metadata_header(turn_metadata_header);
|
||||
|
||||
) -> Result<ResponsesApiRequest> {
|
||||
let instructions = &prompt.base_instructions.text;
|
||||
let input = prompt.get_formatted_input();
|
||||
let tools = create_tools_json_for_responses_api(&prompt.tools)?;
|
||||
let default_reasoning_effort = model_info.default_reasoning_level;
|
||||
let reasoning = if model_info.supports_reasoning_summaries {
|
||||
Some(Reasoning {
|
||||
@@ -478,13 +466,11 @@ impl ModelClientSession {
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let include = if reasoning.is_some() {
|
||||
vec!["reasoning.encrypted_content".to_string()]
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
let verbosity = if model_info.support_verbosity {
|
||||
self.client
|
||||
.state
|
||||
@@ -499,16 +485,39 @@ impl ModelClientSession {
|
||||
}
|
||||
None
|
||||
};
|
||||
|
||||
let text = create_text_param_for_request(verbosity, &prompt.output_schema);
|
||||
let prompt_cache_key = Some(self.client.state.conversation_id.to_string());
|
||||
let request = ResponsesApiRequest {
|
||||
model: model_info.slug.clone(),
|
||||
instructions: instructions.clone(),
|
||||
input,
|
||||
tools,
|
||||
tool_choice: "auto".to_string(),
|
||||
parallel_tool_calls: prompt.parallel_tool_calls,
|
||||
reasoning,
|
||||
store: provider.is_azure_responses_endpoint(),
|
||||
stream: true,
|
||||
include,
|
||||
prompt_cache_key,
|
||||
text,
|
||||
};
|
||||
Ok(request)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
/// Builds shared Responses API transport options and request-body options.
|
||||
///
|
||||
/// Keeping option construction in one place ensures request-scoped headers are consistent
|
||||
/// regardless of transport choice.
|
||||
fn build_responses_options(
|
||||
&self,
|
||||
turn_metadata_header: Option<&str>,
|
||||
compression: Compression,
|
||||
) -> ApiResponsesOptions {
|
||||
let turn_metadata_header = parse_turn_metadata_header(turn_metadata_header);
|
||||
let conversation_id = self.client.state.conversation_id.to_string();
|
||||
|
||||
ApiResponsesOptions {
|
||||
reasoning,
|
||||
include,
|
||||
prompt_cache_key: Some(conversation_id.clone()),
|
||||
text,
|
||||
store_override: None,
|
||||
conversation_id: Some(conversation_id),
|
||||
session_source: Some(self.client.state.session_source.clone()),
|
||||
extra_headers: build_responses_headers(
|
||||
@@ -559,78 +568,36 @@ impl ModelClientSession {
|
||||
.filter(|id| !id.is_empty())
|
||||
}
|
||||
|
||||
fn prepare_websocket_create_request(
|
||||
&self,
|
||||
model_slug: &str,
|
||||
api_prompt: &ApiPrompt,
|
||||
options: &ApiResponsesOptions,
|
||||
input: Vec<ResponseItem>,
|
||||
previous_response_id: Option<String>,
|
||||
) -> ResponsesWsRequest {
|
||||
let ApiResponsesOptions {
|
||||
reasoning,
|
||||
include,
|
||||
prompt_cache_key,
|
||||
text,
|
||||
store_override,
|
||||
..
|
||||
} = options;
|
||||
|
||||
let store = store_override.unwrap_or(false);
|
||||
let payload = ResponseCreateWsRequest {
|
||||
model: model_slug.to_string(),
|
||||
instructions: api_prompt.instructions.clone(),
|
||||
previous_response_id,
|
||||
input,
|
||||
tools: api_prompt.tools.clone(),
|
||||
tool_choice: "auto".to_string(),
|
||||
parallel_tool_calls: api_prompt.parallel_tool_calls,
|
||||
reasoning: reasoning.clone(),
|
||||
store,
|
||||
stream: true,
|
||||
include: include.clone(),
|
||||
prompt_cache_key: prompt_cache_key.clone(),
|
||||
text: text.clone(),
|
||||
};
|
||||
|
||||
ResponsesWsRequest::ResponseCreate(payload)
|
||||
}
|
||||
|
||||
fn prepare_websocket_request(
|
||||
&mut self,
|
||||
model_slug: &str,
|
||||
api_prompt: &ApiPrompt,
|
||||
options: &ApiResponsesOptions,
|
||||
) -> ResponsesWsRequest {
|
||||
payload: ResponseCreateWsRequest,
|
||||
) -> (ResponsesWsRequest, Vec<ResponseItem>) {
|
||||
let full_input = payload.input.clone();
|
||||
let responses_websockets_v2_enabled = self.client.responses_websockets_v2_enabled();
|
||||
let incremental_items = self.get_incremental_items(&api_prompt.input);
|
||||
let incremental_items = self.get_incremental_items(&full_input);
|
||||
if let Some(append_items) = incremental_items {
|
||||
if responses_websockets_v2_enabled
|
||||
&& let Some(previous_response_id) = self.websocket_previous_response_id()
|
||||
{
|
||||
return self.prepare_websocket_create_request(
|
||||
model_slug,
|
||||
api_prompt,
|
||||
options,
|
||||
append_items,
|
||||
Some(previous_response_id),
|
||||
);
|
||||
let payload = ResponseCreateWsRequest {
|
||||
previous_response_id: Some(previous_response_id),
|
||||
input: append_items,
|
||||
..payload
|
||||
};
|
||||
return (ResponsesWsRequest::ResponseCreate(payload), full_input);
|
||||
}
|
||||
|
||||
if !responses_websockets_v2_enabled {
|
||||
return ResponsesWsRequest::ResponseAppend(ResponseAppendWsRequest {
|
||||
input: append_items,
|
||||
});
|
||||
return (
|
||||
ResponsesWsRequest::ResponseAppend(ResponseAppendWsRequest {
|
||||
input: append_items,
|
||||
}),
|
||||
full_input,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
self.prepare_websocket_create_request(
|
||||
model_slug,
|
||||
api_prompt,
|
||||
options,
|
||||
api_prompt.input.clone(),
|
||||
None,
|
||||
)
|
||||
(ResponsesWsRequest::ResponseCreate(payload), full_input)
|
||||
}
|
||||
|
||||
/// Opportunistically warms a websocket for this turn-scoped client session.
|
||||
@@ -744,8 +711,6 @@ impl ModelClientSession {
|
||||
}
|
||||
|
||||
let auth_manager = self.client.state.auth_manager.clone();
|
||||
let api_prompt = Self::build_responses_request(prompt)?;
|
||||
|
||||
let mut auth_recovery = auth_manager
|
||||
.as_ref()
|
||||
.map(super::auth::AuthManager::unauthorized_recovery);
|
||||
@@ -754,26 +719,22 @@ impl ModelClientSession {
|
||||
let transport = ReqwestTransport::new(build_reqwest_client());
|
||||
let (request_telemetry, sse_telemetry) = Self::build_streaming_telemetry(otel_manager);
|
||||
let compression = self.responses_request_compression(client_setup.auth.as_ref());
|
||||
let options = self.build_responses_options(turn_metadata_header, compression);
|
||||
|
||||
let request = self.build_responses_request(
|
||||
&client_setup.api_provider,
|
||||
prompt,
|
||||
model_info,
|
||||
effort,
|
||||
summary,
|
||||
)?;
|
||||
let client = ApiResponsesClient::new(
|
||||
transport,
|
||||
client_setup.api_provider,
|
||||
client_setup.api_auth,
|
||||
)
|
||||
.with_telemetry(Some(request_telemetry), Some(sse_telemetry));
|
||||
|
||||
let options = self.build_responses_options(
|
||||
prompt,
|
||||
model_info,
|
||||
effort,
|
||||
summary,
|
||||
turn_metadata_header,
|
||||
compression,
|
||||
);
|
||||
|
||||
let stream_result = client
|
||||
.stream_prompt(&model_info.slug, &api_prompt, options)
|
||||
.await;
|
||||
let stream_result = client.stream_request(request, options).await;
|
||||
|
||||
match stream_result {
|
||||
Ok(stream) => {
|
||||
@@ -802,7 +763,6 @@ impl ModelClientSession {
|
||||
turn_metadata_header: Option<&str>,
|
||||
) -> Result<WebsocketStreamOutcome> {
|
||||
let auth_manager = self.client.state.auth_manager.clone();
|
||||
let api_prompt = Self::build_responses_request(prompt)?;
|
||||
|
||||
let mut auth_recovery = auth_manager
|
||||
.as_ref()
|
||||
@@ -811,14 +771,15 @@ impl ModelClientSession {
|
||||
let client_setup = self.client.current_client_setup().await?;
|
||||
let compression = self.responses_request_compression(client_setup.auth.as_ref());
|
||||
|
||||
let options = self.build_responses_options(
|
||||
let options = self.build_responses_options(turn_metadata_header, compression);
|
||||
let request = self.build_responses_request(
|
||||
&client_setup.api_provider,
|
||||
prompt,
|
||||
model_info,
|
||||
effort,
|
||||
summary,
|
||||
turn_metadata_header,
|
||||
compression,
|
||||
);
|
||||
)?;
|
||||
let ws_payload = ResponseCreateWsRequest::from(&request);
|
||||
|
||||
match self
|
||||
.websocket_connection(
|
||||
@@ -845,7 +806,7 @@ impl ModelClientSession {
|
||||
Err(err) => return Err(map_api_error(err)),
|
||||
}
|
||||
|
||||
let request = self.prepare_websocket_request(&model_info.slug, &api_prompt, &options);
|
||||
let (request, request_input) = self.prepare_websocket_request(ws_payload);
|
||||
|
||||
let stream_result = self
|
||||
.connection
|
||||
@@ -858,7 +819,7 @@ impl ModelClientSession {
|
||||
.stream_request(request)
|
||||
.await
|
||||
.map_err(map_api_error)?;
|
||||
self.websocket_last_items = api_prompt.input.clone();
|
||||
self.websocket_last_items = request_input;
|
||||
let (last_response_id_sender, last_response_id_receiver) = oneshot::channel();
|
||||
self.websocket_last_response_id_rx = Some(last_response_id_receiver);
|
||||
let mut last_response_id_sender = Some(last_response_id_sender);
|
||||
@@ -973,17 +934,6 @@ impl ModelClientSession {
|
||||
}
|
||||
}
|
||||
|
||||
/// Adapts the core `Prompt` type into the `codex-api` payload shape.
|
||||
fn build_api_prompt(prompt: &Prompt, instructions: String, tools_json: Vec<Value>) -> ApiPrompt {
|
||||
ApiPrompt {
|
||||
instructions,
|
||||
input: prompt.get_formatted_input(),
|
||||
tools: tools_json,
|
||||
parallel_tool_calls: prompt.parallel_tool_calls,
|
||||
output_schema: prompt.output_schema.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Parses per-turn metadata into an HTTP header value.
|
||||
///
|
||||
/// Invalid values are treated as absent so callers can compare and propagate
|
||||
|
||||
@@ -249,11 +249,11 @@ mod tests {
|
||||
let input: Vec<ResponseItem> = vec![];
|
||||
let tools: Vec<serde_json::Value> = vec![];
|
||||
let req = ResponsesApiRequest {
|
||||
model: "gpt-5.1",
|
||||
instructions: "i",
|
||||
input: &input,
|
||||
tools: &tools,
|
||||
tool_choice: "auto",
|
||||
model: "gpt-5.1".to_string(),
|
||||
instructions: "i".to_string(),
|
||||
input,
|
||||
tools,
|
||||
tool_choice: "auto".to_string(),
|
||||
parallel_tool_calls: true,
|
||||
reasoning: None,
|
||||
store: false,
|
||||
@@ -290,11 +290,11 @@ mod tests {
|
||||
create_text_param_for_request(None, &Some(schema.clone())).expect("text controls");
|
||||
|
||||
let req = ResponsesApiRequest {
|
||||
model: "gpt-5.1",
|
||||
instructions: "i",
|
||||
input: &input,
|
||||
tools: &tools,
|
||||
tool_choice: "auto",
|
||||
model: "gpt-5.1".to_string(),
|
||||
instructions: "i".to_string(),
|
||||
input,
|
||||
tools,
|
||||
tool_choice: "auto".to_string(),
|
||||
parallel_tool_calls: true,
|
||||
reasoning: None,
|
||||
store: false,
|
||||
@@ -326,11 +326,11 @@ mod tests {
|
||||
let input: Vec<ResponseItem> = vec![];
|
||||
let tools: Vec<serde_json::Value> = vec![];
|
||||
let req = ResponsesApiRequest {
|
||||
model: "gpt-5.1",
|
||||
instructions: "i",
|
||||
input: &input,
|
||||
tools: &tools,
|
||||
tool_choice: "auto",
|
||||
model: "gpt-5.1".to_string(),
|
||||
instructions: "i".to_string(),
|
||||
input,
|
||||
tools,
|
||||
tool_choice: "auto".to_string(),
|
||||
parallel_tool_calls: true,
|
||||
reasoning: None,
|
||||
store: false,
|
||||
|
||||
Reference in New Issue
Block a user