mirror of
https://github.com/pchuan98/codex.git
synced 2026-07-01 00:31:56 +08:00
829f5b6b59
## Why

The app-server and exec-server expose separate JSON-RPC APIs, but exec-server currently sources its serialized protocol and envelope types through app-server-oriented code. Giving each API an explicit owner makes the crate boundary legible without introducing shared generic envelopes.

## What changed

- Added `codex-exec-server-protocol` to own exec DTOs, process IDs, and JSON-RPC envelopes.
- Updated exec-server clients, transports, handlers, and tests to use the new crate.
- Exposed app-server's existing JSON-RPC types through a public `rpc` module while retaining root re-exports.
- Preserved existing wire shapes, including exec `PathUri` behavior.

## Stack

This is PR 1 of 6. Next: [PR #29721](https://github.com/openai/codex/pull/29721), which moves auth mode below the app wire boundary.

## Validation

- Exec-server protocol and server coverage passed in the focused protocol test runs.
- App-server protocol schema fixtures passed.
444 lines
14 KiB
Rust
444 lines
14 KiB
Rust
#![cfg(unix)]
|
|
|
|
mod common;
|
|
|
|
use codex_exec_server::ExecResponse;
|
|
use codex_exec_server::InitializeParams;
|
|
use codex_exec_server::InitializeResponse;
|
|
use codex_exec_server::ProcessId;
|
|
use codex_exec_server::ReadResponse;
|
|
use codex_exec_server::TerminateResponse;
|
|
use codex_exec_server::WriteResponse;
|
|
use codex_exec_server::WriteStatus;
|
|
use codex_exec_server_protocol::JSONRPCMessage;
|
|
use codex_exec_server_protocol::JSONRPCResponse;
|
|
use codex_utils_path_uri::PathUri;
|
|
use common::exec_server::exec_server;
|
|
use pretty_assertions::assert_eq;
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn exec_server_starts_process_over_websocket() -> anyhow::Result<()> {
|
|
let mut server = exec_server().await?;
|
|
let initialize_id = server
|
|
.send_request(
|
|
"initialize",
|
|
serde_json::to_value(InitializeParams {
|
|
client_name: "exec-server-test".to_string(),
|
|
resume_session_id: None,
|
|
})?,
|
|
)
|
|
.await?;
|
|
let _ = server
|
|
.wait_for_event(|event| {
|
|
matches!(
|
|
event,
|
|
JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &initialize_id
|
|
)
|
|
})
|
|
.await?;
|
|
|
|
server
|
|
.send_notification("initialized", serde_json::json!({}))
|
|
.await?;
|
|
|
|
let process_start_id = server
|
|
.send_request(
|
|
"process/start",
|
|
serde_json::json!({
|
|
"processId": "proc-1",
|
|
"argv": ["true"],
|
|
"cwd": PathUri::from_host_native_path(std::env::current_dir()?)?,
|
|
"env": {},
|
|
"tty": false,
|
|
"pipeStdin": false,
|
|
"arg0": null
|
|
}),
|
|
)
|
|
.await?;
|
|
let response = server
|
|
.wait_for_event(|event| {
|
|
matches!(
|
|
event,
|
|
JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &process_start_id
|
|
)
|
|
})
|
|
.await?;
|
|
let JSONRPCMessage::Response(JSONRPCResponse { id, result }) = response else {
|
|
panic!("expected process/start response");
|
|
};
|
|
assert_eq!(id, process_start_id);
|
|
let process_start_response: ExecResponse = serde_json::from_value(result)?;
|
|
assert_eq!(
|
|
process_start_response,
|
|
ExecResponse {
|
|
process_id: ProcessId::from("proc-1"),
|
|
}
|
|
);
|
|
|
|
server.shutdown().await?;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn exec_server_defaults_omitted_pipe_stdin_to_closed_stdin() -> anyhow::Result<()> {
|
|
let mut server = exec_server().await?;
|
|
let initialize_id = server
|
|
.send_request(
|
|
"initialize",
|
|
serde_json::to_value(InitializeParams {
|
|
client_name: "exec-server-test".to_string(),
|
|
resume_session_id: None,
|
|
})?,
|
|
)
|
|
.await?;
|
|
let _ = server
|
|
.wait_for_event(|event| {
|
|
matches!(
|
|
event,
|
|
JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &initialize_id
|
|
)
|
|
})
|
|
.await?;
|
|
|
|
server
|
|
.send_notification("initialized", serde_json::json!({}))
|
|
.await?;
|
|
|
|
let process_start_id = server
|
|
.send_request(
|
|
"process/start",
|
|
serde_json::json!({
|
|
"processId": "proc-default-stdin",
|
|
"argv": [
|
|
"/bin/sh",
|
|
"-c",
|
|
"sleep 0.3; if IFS= read -r line; then printf 'read:%s\\n' \"$line\"; else printf 'eof\\n'; fi"
|
|
],
|
|
"cwd": PathUri::from_host_native_path(std::env::current_dir()?)?,
|
|
"env": {},
|
|
"tty": false,
|
|
"arg0": null
|
|
}),
|
|
)
|
|
.await?;
|
|
let response = server
|
|
.wait_for_event(|event| {
|
|
matches!(
|
|
event,
|
|
JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &process_start_id
|
|
)
|
|
})
|
|
.await?;
|
|
let JSONRPCMessage::Response(JSONRPCResponse { result, .. }) = response else {
|
|
panic!("expected process/start response");
|
|
};
|
|
let process_start_response: ExecResponse = serde_json::from_value(result)?;
|
|
assert_eq!(
|
|
process_start_response,
|
|
ExecResponse {
|
|
process_id: ProcessId::from("proc-default-stdin"),
|
|
}
|
|
);
|
|
|
|
let write_id = server
|
|
.send_request(
|
|
"process/write",
|
|
serde_json::json!({
|
|
"processId": "proc-default-stdin",
|
|
"chunk": "aWdub3JlZAo=",
|
|
"writeId": "write-default-stdin"
|
|
}),
|
|
)
|
|
.await?;
|
|
let response = server
|
|
.wait_for_event(|event| {
|
|
matches!(
|
|
event,
|
|
JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &write_id
|
|
)
|
|
})
|
|
.await?;
|
|
let JSONRPCMessage::Response(JSONRPCResponse { result, .. }) = response else {
|
|
panic!("expected process/write response");
|
|
};
|
|
let write_response: WriteResponse = serde_json::from_value(result)?;
|
|
assert_eq!(
|
|
write_response,
|
|
WriteResponse {
|
|
status: WriteStatus::StdinClosed
|
|
}
|
|
);
|
|
|
|
server.shutdown().await?;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn exec_server_dedupes_retried_process_write_ids() -> anyhow::Result<()> {
|
|
let mut server = exec_server().await?;
|
|
let initialize_id = server
|
|
.send_request(
|
|
"initialize",
|
|
serde_json::to_value(InitializeParams {
|
|
client_name: "exec-server-test".to_string(),
|
|
resume_session_id: None,
|
|
})?,
|
|
)
|
|
.await?;
|
|
let _ = server
|
|
.wait_for_event(|event| {
|
|
matches!(
|
|
event,
|
|
JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &initialize_id
|
|
)
|
|
})
|
|
.await?;
|
|
|
|
server
|
|
.send_notification("initialized", serde_json::json!({}))
|
|
.await?;
|
|
|
|
let process_start_id = server
|
|
.send_request(
|
|
"process/start",
|
|
serde_json::json!({
|
|
"processId": "proc-write-id",
|
|
"argv": [
|
|
"/bin/sh",
|
|
"-c",
|
|
"IFS= read -r first; printf 'line:%s\\n' \"$first\"; IFS= read -r second; printf 'line:%s\\n' \"$second\""
|
|
],
|
|
"cwd": PathUri::from_host_native_path(std::env::current_dir()?)?,
|
|
"env": {},
|
|
"tty": false,
|
|
"pipeStdin": true,
|
|
"arg0": null
|
|
}),
|
|
)
|
|
.await?;
|
|
let _ = server
|
|
.wait_for_event(|event| {
|
|
matches!(
|
|
event,
|
|
JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &process_start_id
|
|
)
|
|
})
|
|
.await?;
|
|
|
|
for (write_id, chunk) in [
|
|
("write-1", "Zmlyc3QK"),
|
|
("write-1", "Zmlyc3QK"),
|
|
("write-2", "c2Vjb25kCg=="),
|
|
] {
|
|
let request_id = server
|
|
.send_request(
|
|
"process/write",
|
|
serde_json::json!({
|
|
"processId": "proc-write-id",
|
|
"chunk": chunk,
|
|
"writeId": write_id
|
|
}),
|
|
)
|
|
.await?;
|
|
let response = server
|
|
.wait_for_event(|event| {
|
|
matches!(
|
|
event,
|
|
JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &request_id
|
|
)
|
|
})
|
|
.await?;
|
|
let JSONRPCMessage::Response(JSONRPCResponse { result, .. }) = response else {
|
|
panic!("expected process/write response");
|
|
};
|
|
let write_response: WriteResponse = serde_json::from_value(result)?;
|
|
assert_eq!(
|
|
write_response,
|
|
WriteResponse {
|
|
status: WriteStatus::Accepted
|
|
}
|
|
);
|
|
}
|
|
|
|
let mut after_seq = None;
|
|
let mut output = Vec::new();
|
|
for _ in 0..5 {
|
|
let read_id = server
|
|
.send_request(
|
|
"process/read",
|
|
serde_json::json!({
|
|
"processId": "proc-write-id",
|
|
"afterSeq": after_seq,
|
|
"maxBytes": null,
|
|
"waitMs": 1000
|
|
}),
|
|
)
|
|
.await?;
|
|
let response = server
|
|
.wait_for_event(|event| {
|
|
matches!(
|
|
event,
|
|
JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &read_id
|
|
)
|
|
})
|
|
.await?;
|
|
let JSONRPCMessage::Response(JSONRPCResponse { result, .. }) = response else {
|
|
panic!("expected process/read response");
|
|
};
|
|
let read_response: ReadResponse = serde_json::from_value(result)?;
|
|
output.extend(
|
|
read_response
|
|
.chunks
|
|
.into_iter()
|
|
.flat_map(|chunk| chunk.chunk.into_inner()),
|
|
);
|
|
after_seq = Some(read_response.next_seq.saturating_sub(1));
|
|
if read_response.closed || output.ends_with(b"line:second\n") {
|
|
break;
|
|
}
|
|
}
|
|
|
|
assert_eq!(
|
|
String::from_utf8(output)?,
|
|
"line:first\nline:second\n".to_string()
|
|
);
|
|
|
|
server.shutdown().await?;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn exec_server_resumes_detached_session_without_killing_processes() -> anyhow::Result<()> {
|
|
let mut server = exec_server().await?;
|
|
let initialize_id = server
|
|
.send_request(
|
|
"initialize",
|
|
serde_json::to_value(InitializeParams {
|
|
client_name: "exec-server-test".to_string(),
|
|
resume_session_id: None,
|
|
})?,
|
|
)
|
|
.await?;
|
|
let response = server
|
|
.wait_for_event(|event| {
|
|
matches!(
|
|
event,
|
|
JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &initialize_id
|
|
)
|
|
})
|
|
.await?;
|
|
let JSONRPCMessage::Response(JSONRPCResponse { result, .. }) = response else {
|
|
panic!("expected initialize response");
|
|
};
|
|
let initialize_response: InitializeResponse = serde_json::from_value(result)?;
|
|
|
|
server
|
|
.send_notification("initialized", serde_json::json!({}))
|
|
.await?;
|
|
|
|
let process_start_id = server
|
|
.send_request(
|
|
"process/start",
|
|
serde_json::json!({
|
|
"processId": "proc-resume",
|
|
"argv": ["/bin/sh", "-c", "sleep 5"],
|
|
"cwd": PathUri::from_host_native_path(std::env::current_dir()?)?,
|
|
"env": {},
|
|
"tty": false,
|
|
"pipeStdin": false,
|
|
"arg0": null
|
|
}),
|
|
)
|
|
.await?;
|
|
let _ = server
|
|
.wait_for_event(|event| {
|
|
matches!(
|
|
event,
|
|
JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &process_start_id
|
|
)
|
|
})
|
|
.await?;
|
|
|
|
server.disconnect_websocket().await?;
|
|
server.reconnect_websocket().await?;
|
|
|
|
let resume_initialize_id = server
|
|
.send_request(
|
|
"initialize",
|
|
serde_json::to_value(InitializeParams {
|
|
client_name: "exec-server-test".to_string(),
|
|
resume_session_id: Some(initialize_response.session_id.clone()),
|
|
})?,
|
|
)
|
|
.await?;
|
|
let response = server
|
|
.wait_for_event(|event| {
|
|
matches!(
|
|
event,
|
|
JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &resume_initialize_id
|
|
)
|
|
})
|
|
.await?;
|
|
let JSONRPCMessage::Response(JSONRPCResponse { result, .. }) = response else {
|
|
panic!("expected resume initialize response");
|
|
};
|
|
let resumed_response: InitializeResponse = serde_json::from_value(result)?;
|
|
assert_eq!(resumed_response, initialize_response);
|
|
|
|
server
|
|
.send_notification("initialized", serde_json::json!({}))
|
|
.await?;
|
|
|
|
let process_read_id = server
|
|
.send_request(
|
|
"process/read",
|
|
serde_json::json!({
|
|
"processId": "proc-resume",
|
|
"afterSeq": null,
|
|
"maxBytes": null,
|
|
"waitMs": 0
|
|
}),
|
|
)
|
|
.await?;
|
|
let response = server
|
|
.wait_for_event(|event| {
|
|
matches!(
|
|
event,
|
|
JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &process_read_id
|
|
)
|
|
})
|
|
.await?;
|
|
let JSONRPCMessage::Response(JSONRPCResponse { result, .. }) = response else {
|
|
panic!("expected process/read response");
|
|
};
|
|
let process_read_response: ReadResponse = serde_json::from_value(result)?;
|
|
assert!(process_read_response.failure.is_none());
|
|
assert!(!process_read_response.exited);
|
|
assert!(!process_read_response.closed);
|
|
|
|
let terminate_id = server
|
|
.send_request(
|
|
"process/terminate",
|
|
serde_json::json!({
|
|
"processId": "proc-resume"
|
|
}),
|
|
)
|
|
.await?;
|
|
let response = server
|
|
.wait_for_event(|event| {
|
|
matches!(
|
|
event,
|
|
JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &terminate_id
|
|
)
|
|
})
|
|
.await?;
|
|
let JSONRPCMessage::Response(JSONRPCResponse { result, .. }) = response else {
|
|
panic!("expected process/terminate response");
|
|
};
|
|
let terminate_response: TerminateResponse = serde_json::from_value(result)?;
|
|
assert_eq!(terminate_response, TerminateResponse { running: true });
|
|
|
|
server.shutdown().await?;
|
|
Ok(())
|
|
}
|