pakrym-oai a4711b88dd [codex] exec-server: stream files in chunks (#28354)
## Why

`fs/readFile` buffers the entire file in one response, which makes large
remote reads expensive and prevents callers from applying backpressure.
We need an opt-in streaming path with bounded block sizes while
preserving the existing single-call API for small and sandboxed reads.

## What changed

- Add `ExecServerClient::stream`, returning a named `FileReadStream`
that implements `futures::Stream` and yields immutable byte blocks of at
most 1 MiB.
- Add internal `fs/open`, `fs/readBlock`, and `fs/close` RPCs.
`fs/readBlock` accepts an explicit offset and length.
- Keep unsandboxed files open between block reads, cap open handles per
connection, and clean them up on EOF, error, stream drop, explicit
close, or connection shutdown.
- Reject platform-sandboxed streaming opens instead of turning the
one-shot sandbox helper into a persistent server. Existing `fs/readFile`
behavior is unchanged.
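Only the client half of this change appears in the file below. The following is a minimal in-memory sketch of the server-side handle rules listed above: readers stay open between block reads, there is a per-connection cap, and each read takes an explicit offset and length. `HandleManager`, `ReadBlock`, and `MAX_OPEN_HANDLES` are hypothetical names. The cap of 128 comes from the testing notes. The rule that a short read signals EOF is an assumption and is not confirmed by the PR.

```rust
use std::collections::HashMap;
use std::io::{self, Read, Seek, SeekFrom};

/// Hypothetical per-connection cap; 128 matches the limit named in the testing notes.
const MAX_OPEN_HANDLES: usize = 128;

/// Hypothetical response shape for one block read.
#[derive(Debug, PartialEq)]
struct ReadBlock {
    chunk: Vec<u8>,
    eof: bool,
}

/// In-memory sketch of a per-connection handle table. `R` stands in for an
/// open file (for example `std::fs::File`) that stays open between reads.
struct HandleManager<R> {
    handles: HashMap<String, R>,
}

impl<R: Read + Seek> HandleManager<R> {
    fn new() -> Self {
        Self { handles: HashMap::new() }
    }

    /// Registers an already opened reader under a client-chosen handle id.
    fn open(&mut self, handle_id: &str, reader: R) -> io::Result<()> {
        if self.handles.len() >= MAX_OPEN_HANDLES {
            return Err(io::Error::new(io::ErrorKind::Other, "too many open file handles"));
        }
        if self.handles.contains_key(handle_id) {
            return Err(io::Error::new(io::ErrorKind::AlreadyExists, "handle id already in use"));
        }
        self.handles.insert(handle_id.to_string(), reader);
        Ok(())
    }

    /// Reads up to `len` bytes at an explicit `offset`. Offsets need not be
    /// sequential, and `len` may differ from call to call.
    fn read_block(&mut self, handle_id: &str, offset: u64, len: usize) -> io::Result<ReadBlock> {
        let reader = self
            .handles
            .get_mut(handle_id)
            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "unknown handle id"))?;
        reader.seek(SeekFrom::Start(offset))?;
        let mut chunk = Vec::with_capacity(len);
        // `take` bounds the read at `len` bytes; `read_to_end` keeps reading
        // through short reads until that bound or the end of the file.
        reader.by_ref().take(len as u64).read_to_end(&mut chunk)?;
        // Assumption: fewer than `len` bytes means the end of the file was reached.
        let eof = chunk.len() < len;
        Ok(ReadBlock { chunk, eof })
    }

    /// Removes the handle. Dropping the reader frees its slot under the cap.
    fn close(&mut self, handle_id: &str) -> bool {
        self.handles.remove(handle_id).is_some()
    }
}
```

Because `close` removes the entry, a full table accepts a new `open` again once any handle is closed. This corresponds to the capacity-release behavior the tests describe.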

## Testing

- `just test -p codex-exec-server`
- Integration coverage for 1 MiB chunking, exact block-boundary EOF,
sandbox rejection, and continued reads from the opened file after path
replacement.
- Handle-manager coverage for non-sequential offsets, variable block
lengths, the 128-handle limit, and capacity release after close.
2026-06-16 09:50:55 -07:00
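The following sketch illustrates the exact block-boundary EOF case from the testing notes. It lists the `(len, eof)` responses a server would send for a file read from offset 0. It assumes two things: that `FILE_READ_CHUNK_SIZE` is 1 MiB, as the PR describes, and that the server sets `eof` on the first short read. The helper `expected_blocks` and that EOF rule are hypothetical and do not come from the PR.

```rust
/// Assumed value of `FILE_READ_CHUNK_SIZE`; the PR describes 1 MiB blocks.
const CHUNK: u64 = 1024 * 1024;

/// Hypothetical helper: the `(len, eof)` pairs a server would return for a
/// `size`-byte file read from offset 0 in `chunk`-byte blocks, assuming the
/// server reports EOF on the first short read.
fn expected_blocks(size: u64, chunk: u64) -> Vec<(u64, bool)> {
    assert!(chunk > 0, "chunk size must be positive");
    let mut blocks = Vec::new();
    let mut offset = 0;
    loop {
        // Never read past the end of the file.
        let len = chunk.min(size - offset);
        let eof = len < chunk;
        blocks.push((len, eof));
        if eof {
            return blocks;
        }
        offset += len;
    }
}
```

Under these assumptions, a 2 MiB file costs three `fs/readBlock` calls. The last call returns an empty block with `eof` set, and the client's `response.eof` branch turns that into end-of-stream without yielding an empty chunk. A 2.5 MiB file also takes three calls, with the 0.5 MiB tail arriving in the terminal block.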


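```rust
use bytes::Bytes;
use codex_utils_path_uri::PathUri;
use tokio::io;
use uuid::Uuid;

use super::map_remote_error;
use crate::ExecServerClient;
use crate::FILE_READ_CHUNK_SIZE;
use crate::FileSystemReadStream;
use crate::FileSystemResult;
use crate::FileSystemSandboxContext;
use crate::protocol::FS_READ_BLOCK_METHOD;
use crate::protocol::FsCloseParams;
use crate::protocol::FsOpenParams;
use crate::protocol::FsReadBlockParams;

/// Client-side record of a server-side read handle. While `active` is true,
/// dropping the registration sends a best-effort `fs/close` for `handle_id`.
struct FileReadRegistration {
    client: ExecServerClient,
    handle_id: String,
    /// Runtime captured at open time so `Drop` can still spawn the close
    /// when the stream is dropped outside a Tokio context.
    runtime: Option<tokio::runtime::Handle>,
    active: bool,
}

/// Opens `path` on the exec server and returns a stream that yields the file
/// contents in blocks of at most `FILE_READ_CHUNK_SIZE` bytes.
pub(super) async fn open(
    client: ExecServerClient,
    path: PathUri,
    sandbox: Option<FileSystemSandboxContext>,
) -> FileSystemResult<FileSystemReadStream> {
    // The handle id is generated on the client and sent with `fs/open`.
    let registration = FileReadRegistration {
        client,
        handle_id: Uuid::new_v4().simple().to_string(),
        runtime: tokio::runtime::Handle::try_current().ok(),
        active: true,
    };
    // If `fs_open` fails, `registration` is dropped while still active, so a
    // best-effort close is sent for the handle id anyway.
    registration
        .client
        .fs_open(FsOpenParams {
            handle_id: registration.handle_id.clone(),
            path,
            sandbox,
        })
        .await
        .map_err(map_remote_error)?;
    // Unfold state is `Some((registration, next_offset))` while reading and
    // `None` once the terminal block has been yielded.
    Ok(FileSystemReadStream::new(futures::stream::try_unfold(
        Some((registration, 0_u64)),
        |state| async move {
            let Some((mut registration, offset)) = state else {
                return Ok(None);
            };
            let response = registration
                .client
                .fs_read_block(FsReadBlockParams {
                    handle_id: registration.handle_id.clone(),
                    offset,
                    len: FILE_READ_CHUNK_SIZE,
                })
                .await
                .map_err(map_remote_error)?;
            let chunk = Bytes::from(response.chunk.into_inner());
            // Reject a response that exceeds the requested block length.
            if chunk.len() > FILE_READ_CHUNK_SIZE {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!(
                        "{FS_READ_BLOCK_METHOD} returned {} bytes, maximum is {}",
                        chunk.len(),
                        FILE_READ_CHUNK_SIZE
                    ),
                ));
            }
            if response.eof {
                // Close eagerly at EOF. Only mark the handle inactive if the
                // close succeeded; otherwise `Drop` retries it.
                if registration
                    .client
                    .fs_close(FsCloseParams {
                        handle_id: registration.handle_id.clone(),
                    })
                    .await
                    .is_ok()
                {
                    registration.active = false;
                }
                // A terminal block may still carry the final bytes of the file.
                return if chunk.is_empty() {
                    Ok(None)
                } else {
                    Ok(Some((chunk, None)))
                };
            }
            // A non-terminal block must advance the offset, or the stream
            // would request the same block forever.
            if chunk.is_empty() {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("{FS_READ_BLOCK_METHOD} returned an empty non-terminal block"),
                ));
            }
            let next_offset = offset.checked_add(chunk.len() as u64).ok_or_else(|| {
                io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("{FS_READ_BLOCK_METHOD} offset overflowed after {offset} bytes"),
                )
            })?;
            Ok(Some((chunk, Some((registration, next_offset)))))
        },
    )))
}

impl Drop for FileReadRegistration {
    // Best-effort close for streams dropped early, ended by an error, or whose
    // EOF close failed.
    fn drop(&mut self) {
        if !self.active {
            return;
        }
        let client = self.client.clone();
        let handle_id = self.handle_id.clone();
        // `drop` cannot `.await`, so spawn the close on the captured runtime,
        // falling back to the current one. With no runtime the close is
        // skipped and the handle is left to connection-shutdown cleanup.
        let runtime = self
            .runtime
            .clone()
            .or_else(|| tokio::runtime::Handle::try_current().ok());
        if let Some(runtime) = runtime {
            runtime.spawn(async move {
                let _ = client.fs_close(FsCloseParams { handle_id }).await;
            });
        }
    }
}
```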
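The async plumbing makes the `try_unfold` state machine above hard to follow, so the following synchronous sketch reproduces its checks using only `std`:

- Oversized blocks and empty non-terminal blocks are rejected.
- The offset advances with `checked_add`.
- EOF triggers an eager close.
- `Drop` closes the handle if the stream ends any other way.

`BlockSource` and `BlockIter` are hypothetical stand-ins for the RPC client and `FileSystemReadStream`. Unlike the real code, this close runs synchronously instead of being spawned on a Tokio runtime.

```rust
use std::io;

/// Hypothetical stand-in for the `fs/readBlock` and `fs/close` RPCs on one handle.
trait BlockSource {
    /// Returns up to `len` bytes starting at `offset`, plus an EOF flag.
    fn read_block(&mut self, offset: u64, len: usize) -> io::Result<(Vec<u8>, bool)>;
    fn close(&mut self) -> io::Result<()>;
}

/// Synchronous model of the client-side read stream.
struct BlockIter<S: BlockSource> {
    source: S,
    max_len: usize,
    /// Next offset to read; `None` once the stream has finished or failed.
    offset: Option<u64>,
    /// Mirrors `active == false`: the handle was closed successfully at EOF.
    closed: bool,
}

impl<S: BlockSource> BlockIter<S> {
    fn new(source: S, max_len: usize) -> Self {
        Self { source, max_len, offset: Some(0), closed: false }
    }
}

fn invalid_data(msg: &str) -> io::Error {
    io::Error::new(io::ErrorKind::InvalidData, msg.to_string())
}

impl<S: BlockSource> Iterator for BlockIter<S> {
    type Item = io::Result<Vec<u8>>;

    fn next(&mut self) -> Option<Self::Item> {
        // Taking the offset means every early return below ends the stream.
        let offset = self.offset.take()?;
        let (chunk, eof) = match self.source.read_block(offset, self.max_len) {
            Ok(block) => block,
            Err(err) => return Some(Err(err)),
        };
        if chunk.len() > self.max_len {
            return Some(Err(invalid_data("block exceeds requested length")));
        }
        if eof {
            // Close eagerly; if that fails, `Drop` tries again.
            self.closed = self.source.close().is_ok();
            return if chunk.is_empty() { None } else { Some(Ok(chunk)) };
        }
        if chunk.is_empty() {
            return Some(Err(invalid_data("empty non-terminal block")));
        }
        match offset.checked_add(chunk.len() as u64) {
            Some(next) => {
                self.offset = Some(next);
                Some(Ok(chunk))
            }
            None => Some(Err(invalid_data("offset overflowed"))),
        }
    }
}

impl<S: BlockSource> Drop for BlockIter<S> {
    fn drop(&mut self) {
        // Best-effort cleanup for streams dropped early or ended by an error.
        if !self.closed {
            let _ = self.source.close();
        }
    }
}
```

Taking the offset out of the state before each read matches how `try_unfold` consumes its state. After an error or the terminal block, the next poll yields nothing.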