mirror of
https://github.com/pchuan98/codex.git
synced 2026-07-01 00:31:56 +08:00
a4711b88dd
## Why

`fs/readFile` buffers the entire file in one response, which makes large remote reads expensive and prevents callers from applying backpressure. We need an opt-in streaming path with bounded block sizes while preserving the existing single-call API for small and sandboxed reads.

## What changed

- Add `ExecServerClient::stream`, returning a named `FileReadStream` that implements `futures::Stream` and yields immutable 1 MiB byte blocks.
- Add internal `fs/open`, `fs/readBlock`, and `fs/close` RPCs. `fs/readBlock` accepts an explicit offset and length.
- Keep unsandboxed files open between block reads, cap open handles per connection, and clean them up on EOF, error, stream drop, explicit close, or connection shutdown.
- Reject platform-sandboxed streaming opens instead of turning the one-shot sandbox helper into a persistent server. Existing `fs/readFile` behavior is unchanged.

## Testing

- `just test -p codex-exec-server`
- Integration coverage for 1 MiB chunking, exact block-boundary EOF, sandbox rejection, and continued reads from the opened file after path replacement.
- Handle-manager coverage for non-sequential offsets, variable block lengths, the 128-handle limit, and capacity release after close.
122 lines
3.8 KiB
Rust
122 lines
3.8 KiB
Rust
use bytes::Bytes;
|
|
use codex_utils_path_uri::PathUri;
|
|
use tokio::io;
|
|
use uuid::Uuid;
|
|
|
|
use super::map_remote_error;
|
|
use crate::ExecServerClient;
|
|
use crate::FILE_READ_CHUNK_SIZE;
|
|
use crate::FileSystemReadStream;
|
|
use crate::FileSystemResult;
|
|
use crate::FileSystemSandboxContext;
|
|
use crate::protocol::FS_READ_BLOCK_METHOD;
|
|
use crate::protocol::FsCloseParams;
|
|
use crate::protocol::FsOpenParams;
|
|
use crate::protocol::FsReadBlockParams;
|
|
|
|
/// Client-side record of a read handle opened through `ExecServerClient::fs_open`.
///
/// While `active` is still `true`, dropping this value spawns a best-effort
/// `fs_close` for `handle_id` (see the `Drop` impl).
struct FileReadRegistration {
    /// Client used for the `fs_open`, `fs_read_block`, and `fs_close` requests.
    client: ExecServerClient,
    /// Identifier (a random v4 UUID in simple form) sent with every request for this handle.
    handle_id: String,
    /// Runtime captured when the handle was opened; `Drop` spawns the close onto
    /// it, falling back to the runtime current at drop time when this is `None`.
    runtime: Option<tokio::runtime::Handle>,
    /// Cleared only after an explicit `fs_close` succeeds; while set, `Drop`
    /// schedules a close.
    active: bool,
}
|
|
|
|
pub(super) async fn open(
|
|
client: ExecServerClient,
|
|
path: PathUri,
|
|
sandbox: Option<FileSystemSandboxContext>,
|
|
) -> FileSystemResult<FileSystemReadStream> {
|
|
let registration = FileReadRegistration {
|
|
client,
|
|
handle_id: Uuid::new_v4().simple().to_string(),
|
|
runtime: tokio::runtime::Handle::try_current().ok(),
|
|
active: true,
|
|
};
|
|
registration
|
|
.client
|
|
.fs_open(FsOpenParams {
|
|
handle_id: registration.handle_id.clone(),
|
|
path,
|
|
sandbox,
|
|
})
|
|
.await
|
|
.map_err(map_remote_error)?;
|
|
Ok(FileSystemReadStream::new(futures::stream::try_unfold(
|
|
Some((registration, 0_u64)),
|
|
|state| async move {
|
|
let Some((mut registration, offset)) = state else {
|
|
return Ok(None);
|
|
};
|
|
let response = registration
|
|
.client
|
|
.fs_read_block(FsReadBlockParams {
|
|
handle_id: registration.handle_id.clone(),
|
|
offset,
|
|
len: FILE_READ_CHUNK_SIZE,
|
|
})
|
|
.await
|
|
.map_err(map_remote_error)?;
|
|
let chunk = Bytes::from(response.chunk.into_inner());
|
|
if chunk.len() > FILE_READ_CHUNK_SIZE {
|
|
return Err(io::Error::new(
|
|
io::ErrorKind::InvalidData,
|
|
format!(
|
|
"{FS_READ_BLOCK_METHOD} returned {} bytes, maximum is {}",
|
|
chunk.len(),
|
|
FILE_READ_CHUNK_SIZE
|
|
),
|
|
));
|
|
}
|
|
if response.eof {
|
|
if registration
|
|
.client
|
|
.fs_close(FsCloseParams {
|
|
handle_id: registration.handle_id.clone(),
|
|
})
|
|
.await
|
|
.is_ok()
|
|
{
|
|
registration.active = false;
|
|
}
|
|
return if chunk.is_empty() {
|
|
Ok(None)
|
|
} else {
|
|
Ok(Some((chunk, None)))
|
|
};
|
|
}
|
|
if chunk.is_empty() {
|
|
return Err(io::Error::new(
|
|
io::ErrorKind::InvalidData,
|
|
format!("{FS_READ_BLOCK_METHOD} returned an empty non-terminal block"),
|
|
));
|
|
}
|
|
let next_offset = offset.checked_add(chunk.len() as u64).ok_or_else(|| {
|
|
io::Error::new(
|
|
io::ErrorKind::InvalidData,
|
|
format!("{FS_READ_BLOCK_METHOD} offset overflowed after {offset} bytes"),
|
|
)
|
|
})?;
|
|
Ok(Some((chunk, Some((registration, next_offset)))))
|
|
},
|
|
)))
|
|
}
|
|
|
|
impl Drop for FileReadRegistration {
|
|
fn drop(&mut self) {
|
|
if !self.active {
|
|
return;
|
|
}
|
|
let client = self.client.clone();
|
|
let handle_id = self.handle_id.clone();
|
|
let runtime = self
|
|
.runtime
|
|
.clone()
|
|
.or_else(|| tokio::runtime::Handle::try_current().ok());
|
|
if let Some(runtime) = runtime {
|
|
runtime.spawn(async move {
|
|
let _ = client.fs_close(FsCloseParams { handle_id }).await;
|
|
});
|
|
}
|
|
}
|
|
}
|