exec-server: add Noise relay transport (#26242)

## Why

Rendezvous forwards traffic between the orchestrator and exec-server.
The endpoints need to authenticate each other and encrypt that traffic
without trusting Rendezvous with plaintext or endpoint keys.

## Changes

- Adds a hybrid Noise IK channel through Clatter using X25519,
ML-KEM-768, AES-256-GCM, and SHA-256.
- Binds each handshake to `environment_id`, `executor_registration_id`,
and `stream_id`.
- Pins the registry-provided executor key and carries the harness
authorization inside the encrypted handshake.
- Orders relay frames before consuming Noise nonces and fragments large
JSON-RPC messages into bounded records.
- Bounds handshake payloads, frames, streams, and message reassembly.

Runtime activation is in
[openai/codex#26245](https://github.com/openai/codex/pull/26245).

## Stack

1. **[openai/codex#26242](https://github.com/openai/codex/pull/26242)**:
Noise channel and relay transport
2. [openai/codex#26245](https://github.com/openai/codex/pull/26245):
remote registration and runtime activation

## Verification

- `just test -p codex-exec-server`
- Oversized initiator payload regression coverage
- `just fix -p codex-exec-server`
- `just bazel-lock-check`
- `cargo shear`

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
viyatb-oai
2026-06-15 16:39:41 -07:00
committed by GitHub
Unverified
parent e512e884ed
commit 428cd44154
24 changed files with 1530 additions and 47 deletions
+1
View File
@@ -15,6 +15,7 @@ arc-swap = { workspace = true }
axum = { workspace = true, features = ["http1", "tokio", "ws"] }
base64 = { workspace = true }
bytes = { workspace = true }
clatter = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-api = { workspace = true }
codex-client = { workspace = true }
+1
View File
@@ -231,6 +231,7 @@ impl LazyRemoteExecServerClient {
if matches!(
&self.transport_params,
ExecServerTransportParams::WebSocketUrl { .. }
| ExecServerTransportParams::NoiseRendezvous { .. }
) =>
{
ExecServerClient::connect_for_transport(self.transport_params.clone()).await?
+73 -1
View File
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use futures::future::BoxFuture;
@@ -8,6 +9,8 @@ use crate::ExecServerError;
use crate::HttpRequestParams;
use crate::HttpRequestResponse;
use crate::HttpResponseBodyStream;
use crate::NoiseChannelIdentity;
use crate::NoiseChannelPublicKey;
pub(crate) const DEFAULT_REMOTE_EXEC_SERVER_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
pub(crate) const DEFAULT_REMOTE_EXEC_SERVER_INITIALIZE_TIMEOUT: Duration = Duration::from_secs(10);
@@ -30,6 +33,43 @@ pub struct RemoteExecServerConnectArgs {
pub resume_session_id: Option<String>,
}
/// Registry-authorized material for one Noise rendezvous connection attempt.
///
/// Treat this as an atomic, single-use bundle. The URL authorization, executor
/// registration, pinned executor key, and harness-key authorization describe one
/// physical connection attempt and must not be mixed with values from another
/// registry response.
pub struct NoiseRendezvousConnectBundle {
pub websocket_url: String,
pub environment_id: String,
pub executor_registration_id: String,
pub executor_public_key: NoiseChannelPublicKey,
pub harness_key_authorization: String,
}
/// Connection arguments for an authenticated Noise rendezvous exec-server.
///
/// `harness_identity` identifies the logical harness endpoint and may be reused
/// across reconnects. In contrast, callers must supply a fresh
/// [`NoiseRendezvousConnectBundle`] for each physical connection attempt.
pub struct NoiseRendezvousConnectArgs {
pub bundle: NoiseRendezvousConnectBundle,
pub harness_identity: NoiseChannelIdentity,
pub client_name: String,
pub connect_timeout: Duration,
pub initialize_timeout: Duration,
pub resume_session_id: Option<String>,
}
/// Supplies fresh registry-authorized material for Noise rendezvous connections.
pub trait NoiseRendezvousConnectProvider: Send + Sync {
/// Fetch a bundle authorizing this harness key for one physical connection.
fn connect_bundle(
&self,
harness_public_key: NoiseChannelPublicKey,
) -> BoxFuture<'_, Result<NoiseRendezvousConnectBundle, ExecServerError>>;
}
/// Stdio connection arguments for a command-backed exec-server.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct StdioExecServerConnectArgs {
@@ -49,13 +89,17 @@ pub(crate) struct StdioExecServerCommand {
}
/// Parameters used to connect to a remote exec-server environment.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Clone)]
pub(crate) enum ExecServerTransportParams {
WebSocketUrl {
websocket_url: String,
connect_timeout: Duration,
initialize_timeout: Duration,
},
NoiseRendezvous {
provider: Arc<dyn NoiseRendezvousConnectProvider>,
identity: NoiseChannelIdentity,
},
#[allow(dead_code)]
StdioCommand {
command: StdioExecServerCommand,
@@ -63,6 +107,34 @@ pub(crate) enum ExecServerTransportParams {
},
}
impl std::fmt::Debug for ExecServerTransportParams {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::WebSocketUrl {
websocket_url,
connect_timeout,
initialize_timeout,
} => f
.debug_struct("WebSocketUrl")
.field("websocket_url", websocket_url)
.field("connect_timeout", connect_timeout)
.field("initialize_timeout", initialize_timeout)
.finish(),
Self::NoiseRendezvous { .. } => {
f.debug_struct("NoiseRendezvous").finish_non_exhaustive()
}
Self::StdioCommand {
command,
initialize_timeout,
} => f
.debug_struct("StdioCommand")
.field("command", command)
.field("initialize_timeout", initialize_timeout)
.finish(),
}
}
}
impl ExecServerTransportParams {
pub(crate) fn websocket_url(websocket_url: String) -> Self {
Self::WebSocketUrl {
@@ -4,6 +4,7 @@ use tokio::io::BufReader;
use tokio::process::Command;
use tokio::time::timeout;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::connect_async_with_config;
use tracing::debug;
use tracing::warn;
@@ -11,15 +12,25 @@ use codex_utils_rustls_provider::ensure_rustls_crypto_provider;
use crate::ExecServerClient;
use crate::ExecServerError;
use crate::client_api::DEFAULT_REMOTE_EXEC_SERVER_CONNECT_TIMEOUT;
use crate::client_api::DEFAULT_REMOTE_EXEC_SERVER_INITIALIZE_TIMEOUT;
use crate::client_api::NoiseRendezvousConnectArgs;
use crate::client_api::NoiseRendezvousConnectBundle;
use crate::client_api::RemoteExecServerConnectArgs;
use crate::client_api::StdioExecServerCommand;
use crate::client_api::StdioExecServerConnectArgs;
use crate::connection::JsonRpcConnection;
use crate::noise_relay::NoiseHarnessConnectionArgs;
use crate::noise_relay::noise_harness_connection_from_websocket;
use crate::noise_relay::noise_relay_websocket_config;
use crate::relay::harness_connection_from_websocket;
const ENVIRONMENT_CLIENT_NAME: &str = "codex-environment";
impl ExecServerClient {
/// Open the selected transport and run the common JSON-RPC initialization.
/// Noise connection details are fetched here so reconnects get a fresh URL
/// and authorization without replacing the harness identity.
pub(crate) async fn connect_for_transport(
transport_params: crate::client_api::ExecServerTransportParams,
) -> Result<Self, ExecServerError> {
@@ -38,6 +49,21 @@ impl ExecServerClient {
})
.await
}
crate::client_api::ExecServerTransportParams::NoiseRendezvous {
provider,
identity,
} => {
let bundle = provider.connect_bundle(identity.public_key()).await?;
Self::connect_noise_rendezvous(NoiseRendezvousConnectArgs {
bundle,
harness_identity: identity,
client_name: ENVIRONMENT_CLIENT_NAME.to_string(),
connect_timeout: DEFAULT_REMOTE_EXEC_SERVER_CONNECT_TIMEOUT,
initialize_timeout: DEFAULT_REMOTE_EXEC_SERVER_INITIALIZE_TIMEOUT,
resume_session_id: None,
})
.await
}
crate::client_api::ExecServerTransportParams::StdioCommand {
command,
initialize_timeout,
@@ -79,6 +105,76 @@ impl ExecServerClient {
Self::connect(connection, args.into()).await
}
/// Connect to one exec-server through an authenticated rendezvous stream.
/// The executor key is pinned before JSON-RPC starts; the websocket carries
/// only ciphertext after that.
pub async fn connect_noise_rendezvous(
args: NoiseRendezvousConnectArgs,
) -> Result<Self, ExecServerError> {
ensure_rustls_crypto_provider();
// Keep the registry-issued URL, key, and authorization together for this
// connection attempt.
let NoiseRendezvousConnectArgs {
bundle,
harness_identity,
client_name,
connect_timeout,
initialize_timeout,
resume_session_id,
} = args;
let NoiseRendezvousConnectBundle {
websocket_url,
environment_id,
executor_registration_id,
executor_public_key,
harness_key_authorization,
} = bundle;
let diagnostic_url = websocket_url
.split(['?', '#'])
.next()
.unwrap_or(websocket_url.as_str())
.to_string();
let (stream, _) = timeout(
connect_timeout,
connect_async_with_config(
websocket_url.as_str(),
Some(noise_relay_websocket_config()),
/*disable_nagle*/ false,
),
)
.await
.map_err(|_| ExecServerError::WebSocketConnectTimeout {
url: diagnostic_url.clone(),
timeout: connect_timeout,
})?
.map_err(|source| ExecServerError::WebSocketConnect {
url: diagnostic_url.clone(),
source,
})?;
let connection_label = format!("Noise exec-server rendezvous websocket {diagnostic_url}");
let connection = noise_harness_connection_from_websocket(
stream,
NoiseHarnessConnectionArgs {
connection_label,
environment_id,
executor_registration_id,
identity: harness_identity,
responder_public_key: executor_public_key,
harness_key_authorization,
},
);
Self::connect(
connection,
crate::client_api::ExecServerClientConnectOptions {
client_name,
initialize_timeout,
resume_session_id,
},
)
.await
}
pub(crate) async fn connect_stdio_command(
args: StdioExecServerConnectArgs,
) -> Result<Self, ExecServerError> {
+1
View File
@@ -44,6 +44,7 @@ pub(crate) enum JsonRpcConnectionEvent {
#[derive(Clone)]
pub(crate) enum JsonRpcTransport {
// Plain means no child process; transport bytes may still be encrypted.
Plain,
Stdio { transport: StdioTransport },
}
+34
View File
@@ -9,6 +9,8 @@ use crate::ExecServerError;
use crate::ExecServerRuntimePaths;
use crate::ExecutorFileSystem;
use crate::HttpClient;
use crate::NoiseChannelIdentity;
use crate::NoiseRendezvousConnectProvider;
use crate::client::LazyRemoteExecServerClient;
use crate::client::http_client::ReqwestHttpClient;
use crate::client_api::ExecServerTransportParams;
@@ -282,6 +284,37 @@ impl EnvironmentManager {
.insert(environment_id, Arc::new(environment));
Ok(())
}
/// Adds or replaces a named remote environment that connects through an
/// authenticated, end-to-end encrypted rendezvous stream.
///
/// The provider is retained so every reconnect obtains fresh authorization.
/// This transport never falls back to the URL-only remote environment path.
pub fn upsert_noise_environment(
&self,
environment_id: String,
provider: Arc<dyn NoiseRendezvousConnectProvider>,
) -> Result<(), ExecServerError> {
if environment_id.is_empty() {
return Err(ExecServerError::Protocol(
"environment id cannot be empty".to_string(),
));
}
let identity = NoiseChannelIdentity::generate().map_err(|error| {
ExecServerError::Protocol(format!(
"failed to generate Noise harness identity: {error}"
))
})?;
let environment = Environment::remote_with_transport(
ExecServerTransportParams::NoiseRendezvous { provider, identity },
self.local_runtime_paths.clone(),
);
self.environments
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.insert(environment_id, Arc::new(environment));
Ok(())
}
}
/// Concrete execution/filesystem environment selected for a session.
@@ -420,6 +453,7 @@ impl Environment {
websocket_url: exec_server_url,
..
} => Some(exec_server_url.clone()),
ExecServerTransportParams::NoiseRendezvous { .. } => None,
ExecServerTransportParams::StdioCommand { .. } => None,
};
let client = LazyRemoteExecServerClient::new(remote_transport.clone());
+44 -27
View File
@@ -48,7 +48,7 @@ struct EnvironmentToml {
initialize_timeout_sec: Option<Duration>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug)]
struct TomlEnvironmentProvider {
default: EnvironmentDefault,
include_local: bool,
@@ -580,18 +580,26 @@ mod tests {
)
.expect("provider");
let ExecServerTransportParams::StdioCommand {
command,
initialize_timeout,
} = &provider.environments[0].1
else {
panic!("expected stdio transport");
};
assert_eq!(
provider.environments[0].1,
ExecServerTransportParams::StdioCommand {
command: StdioExecServerCommand {
program: "ssh".to_string(),
args: Vec::new(),
env: HashMap::new(),
cwd: Some(config_dir.path().join("workspace")),
},
initialize_timeout: DEFAULT_REMOTE_EXEC_SERVER_INITIALIZE_TIMEOUT,
command,
&StdioExecServerCommand {
program: "ssh".to_string(),
args: Vec::new(),
env: HashMap::new(),
cwd: Some(config_dir.path().join("workspace")),
}
);
assert_eq!(
*initialize_timeout,
DEFAULT_REMOTE_EXEC_SERVER_INITIALIZE_TIMEOUT
);
}
#[test]
@@ -617,26 +625,35 @@ mod tests {
})
.expect("provider");
let ExecServerTransportParams::WebSocketUrl {
websocket_url,
connect_timeout,
initialize_timeout,
} = &provider.environments[0].1
else {
panic!("expected websocket transport");
};
assert_eq!(websocket_url, "ws://127.0.0.1:8765");
assert_eq!(*connect_timeout, Duration::from_secs(12));
assert_eq!(*initialize_timeout, Duration::from_secs(34));
let ExecServerTransportParams::StdioCommand {
command,
initialize_timeout,
} = &provider.environments[1].1
else {
panic!("expected stdio transport");
};
assert_eq!(
provider.environments[0].1,
ExecServerTransportParams::WebSocketUrl {
websocket_url: "ws://127.0.0.1:8765".to_string(),
connect_timeout: Duration::from_secs(12),
initialize_timeout: Duration::from_secs(34),
}
);
assert_eq!(
provider.environments[1].1,
ExecServerTransportParams::StdioCommand {
command: StdioExecServerCommand {
program: "ssh".to_string(),
args: Vec::new(),
env: HashMap::new(),
cwd: None,
},
initialize_timeout: Duration::from_secs(56),
command,
&StdioExecServerCommand {
program: "ssh".to_string(),
args: Vec::new(),
env: HashMap::new(),
cwd: None,
}
);
assert_eq!(*initialize_timeout, Duration::from_secs(56));
}
#[test]
+8
View File
@@ -10,6 +10,8 @@ mod fs_helper_main;
mod fs_sandbox;
mod local_file_system;
mod local_process;
mod noise_channel;
mod noise_relay;
mod process;
mod process_id;
mod protocol;
@@ -29,6 +31,9 @@ pub use client::http_client::HttpResponseBodyStream;
pub use client::http_client::ReqwestHttpClient;
pub use client_api::ExecServerClientConnectOptions;
pub use client_api::HttpClient;
pub use client_api::NoiseRendezvousConnectArgs;
pub use client_api::NoiseRendezvousConnectBundle;
pub use client_api::NoiseRendezvousConnectProvider;
pub use client_api::RemoteExecServerConnectArgs;
pub use codex_file_system::CopyOptions;
pub use codex_file_system::CreateDirectoryOptions;
@@ -51,6 +56,9 @@ pub use fs_helper::CODEX_FS_HELPER_ARG1;
pub use fs_helper_main::main as run_fs_helper_main;
pub use local_file_system::LOCAL_FS;
pub use local_file_system::LocalFileSystem;
pub use noise_channel::NoiseChannelError;
pub use noise_channel::NoiseChannelIdentity;
pub use noise_channel::NoiseChannelPublicKey;
pub use process::ExecBackend;
pub use process::ExecBackendFuture;
pub use process::ExecProcess;
+261
View File
@@ -0,0 +1,261 @@
//! Noise channel used by the remote exec-server relay.
//!
//! The harness initiates hybrid IK and pins the exec-server static key returned
//! by the registry. The first handshake message lets the exec-server authenticate
//! the harness static key; the exec-server then asks the registry whether that
//! key is authorized before completing the handshake.
//!
//! "Hybrid" means the session keys include both X25519 and ML-KEM-768 key
//! agreement. Once the two-message handshake finishes, AES-GCM protects the
//! ordered transport records carrying JSON-RPC.
use base64::Engine;
use base64::engine::general_purpose::STANDARD;
use clatter::HybridHandshake;
use clatter::HybridHandshakeParams;
use clatter::KeyPair;
use clatter::bytearray::ByteArray;
use clatter::constants::MAX_MESSAGE_LEN;
use clatter::crypto::cipher::AesGcm;
use clatter::crypto::dh::X25519;
use clatter::crypto::hash::Sha256;
use clatter::crypto::kem::rust_crypto_ml_kem::MlKem768;
use clatter::handshakepattern::noise_hybrid_ik;
use clatter::traits::Cipher;
use clatter::traits::Dh;
use clatter::traits::Handshaker;
use clatter::traits::Kem;
use clatter::transportstate::TransportState;
use serde::Deserialize;
use serde::Serialize;
/// Identifies the handshake pattern and algorithms used by this channel.
pub(crate) const NOISE_CHANNEL_SUITE: &str = "Noise_hybridIK_X25519+MLKEM768_AESGCM_SHA256";
const PROLOGUE_DOMAIN: &[u8] = b"codex-exec-server-relay-noise/v1";
type Handshake = HybridHandshake<X25519, MlKem768, MlKem768, AesGcm, Sha256>;
type Transport = TransportState<AesGcm, Sha256>;
type DhKeyPair = KeyPair<<X25519 as Dh>::PubKey, <X25519 as Dh>::PrivateKey>;
type MlKem768PublicKey = <MlKem768 as Kem>::PubKey;
type KemKeyPair = KeyPair<<MlKem768 as Kem>::PubKey, <MlKem768 as Kem>::SecretKey>;
/// Public key material for the exec-server Noise suite.
/// The suite tag prevents keys for another protocol from being accepted just
/// because their components have the expected lengths.
#[derive(Clone, Eq, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct NoiseChannelPublicKey {
suite: String,
x25519_public_key: String,
mlkem768_public_key: String,
}
impl std::fmt::Debug for NoiseChannelPublicKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NoiseChannelPublicKey")
.field("suite", &self.suite)
.field("x25519_public_key", &"<redacted>")
.field("mlkem768_public_key", &"<redacted>")
.finish()
}
}
impl NoiseChannelPublicKey {
/// Decode registry-provided key material before passing it to Clatter.
fn decode(&self) -> Result<(<X25519 as Dh>::PubKey, MlKem768PublicKey), NoiseChannelError> {
if self.suite != NOISE_CHANNEL_SUITE {
return Err(NoiseChannelError::InvalidPublicKey(
"unsupported Noise channel suite",
));
}
let dh = STANDARD
.decode(&self.x25519_public_key)
.map_err(|_| NoiseChannelError::InvalidPublicKey("invalid X25519 public key"))?;
let dh: <X25519 as Dh>::PubKey = dh
.try_into()
.map_err(|_| NoiseChannelError::InvalidPublicKey("invalid X25519 public key length"))?;
let kem = STANDARD
.decode(&self.mlkem768_public_key)
.map_err(|_| NoiseChannelError::InvalidPublicKey("invalid ML-KEM-768 public key"))?;
if kem.len() != MlKem768PublicKey::LENGTH {
return Err(NoiseChannelError::InvalidPublicKey(
"invalid ML-KEM-768 public key length",
));
}
Ok((dh, MlKem768PublicKey::from_slice(kem.as_slice())))
}
}
/// Static Noise identity kept for the lifetime of an executor or harness process.
#[derive(Clone)]
pub struct NoiseChannelIdentity {
dh: DhKeyPair,
kem: KemKeyPair,
}
impl NoiseChannelIdentity {
pub fn generate() -> Result<Self, NoiseChannelError> {
let dh = X25519::genkey()
.map_err(|error| NoiseChannelError::KeyGeneration(error.to_string()))?;
let kem = MlKem768::genkey()
.map_err(|error| NoiseChannelError::KeyGeneration(error.to_string()))?;
Ok(Self { dh, kem })
}
pub fn public_key(&self) -> NoiseChannelPublicKey {
NoiseChannelPublicKey {
suite: NOISE_CHANNEL_SUITE.to_string(),
x25519_public_key: STANDARD.encode(self.dh.public),
mlkem768_public_key: STANDARD.encode(self.kem.public.as_slice()),
}
}
}
/// Harness-side state between the two hybrid-IK messages.
/// Consuming it in [`Self::finish`] keeps a handshake tied to one relay stream.
pub(crate) struct InitiatorHandshake {
handshake: Handshake,
}
impl InitiatorHandshake {
/// Start hybrid IK and pin the expected executor key.
/// `payload` carries the short-lived registry authorization inside the first
/// encrypted handshake message.
pub(crate) fn start(
identity: &NoiseChannelIdentity,
responder_public_key: &NoiseChannelPublicKey,
prologue: &[u8],
payload: &[u8],
) -> Result<(Self, Vec<u8>), NoiseChannelError> {
let (responder_dh, responder_kem) = responder_public_key.decode()?;
// Both executor key components are pinned before any JSON-RPC is sent.
let params = HybridHandshakeParams::new(noise_hybrid_ik(), true)
.with_prologue(prologue)
.with_s(identity.dh.clone())
.with_s_kem(identity.kem.clone())
.with_rs(responder_dh)
.with_rs_kem(responder_kem);
let mut handshake = Handshake::new(params)?;
let overhead = handshake.get_next_message_overhead()?;
if payload.len() > MAX_MESSAGE_LEN - overhead {
return Err(NoiseChannelError::InvalidMessage(
"handshake payload is too large",
));
}
let mut output = [0u8; MAX_MESSAGE_LEN];
let output_len = handshake.write_message(payload, &mut output)?;
Ok((Self { handshake }, output[..output_len].to_vec()))
}
/// Consume the executor response and enter transport mode.
/// The v1 response does not carry an application payload.
pub(crate) fn finish(mut self, response: &[u8]) -> Result<NoiseTransport, NoiseChannelError> {
ensure_noise_frame_len(response.len(), "handshake response is too large")?;
let mut payload = [0u8; MAX_MESSAGE_LEN];
let payload_len = self.handshake.read_message(response, &mut payload)?;
if payload_len != 0 {
return Err(NoiseChannelError::InvalidMessage(
"handshake response payload must be empty",
));
}
Ok(NoiseTransport {
transport: self.handshake.finalize()?,
})
}
}
/// Established channel with independent implicit send and receive nonces.
/// Relay records must be ordered before decryption, and a logical record must
/// not be encrypted again for retry.
pub(crate) struct NoiseTransport {
transport: Transport,
}
impl NoiseTransport {
/// Encrypt the next transport record.
pub(crate) fn encrypt(&mut self, plaintext: &[u8]) -> Result<Vec<u8>, NoiseChannelError> {
let frame_len = plaintext.len().checked_add(AesGcm::tag_len()).ok_or(
NoiseChannelError::InvalidMessage("transport plaintext is too large"),
)?;
ensure_noise_frame_len(frame_len, "transport plaintext is too large")?;
Ok(self.transport.send_vec(plaintext)?)
}
/// Decrypt the next ordered transport record.
pub(crate) fn decrypt(&mut self, ciphertext: &[u8]) -> Result<Vec<u8>, NoiseChannelError> {
if ciphertext.len() < AesGcm::tag_len() {
return Err(NoiseChannelError::InvalidMessage(
"transport ciphertext is too short",
));
}
ensure_noise_frame_len(ciphertext.len(), "transport ciphertext is too large")?;
Ok(self.transport.receive_vec(ciphertext)?)
}
}
/// Bind the handshake to one environment registration and relay stream.
/// Both peers include these values in the Noise transcript before processing
/// the first handshake message.
pub(crate) fn noise_channel_prologue(
environment_id: &str,
executor_registration_id: &str,
stream_id: &str,
) -> Vec<u8> {
let mut prologue = Vec::new();
append_prologue_part(&mut prologue, PROLOGUE_DOMAIN);
append_prologue_part(&mut prologue, environment_id.as_bytes());
append_prologue_part(&mut prologue, executor_registration_id.as_bytes());
append_prologue_part(&mut prologue, stream_id.as_bytes());
prologue
}
fn append_prologue_part(prologue: &mut Vec<u8>, part: &[u8]) {
// Length prefixes make component boundaries unambiguous. Raw concatenation
// would allow different identifier tuples to produce the same prologue.
let len = part.len() as u64;
prologue.extend_from_slice(&len.to_be_bytes());
prologue.extend_from_slice(part);
}
fn ensure_noise_frame_len(
frame_len: usize,
message: &'static str,
) -> Result<(), NoiseChannelError> {
if frame_len > MAX_MESSAGE_LEN {
return Err(NoiseChannelError::InvalidMessage(message));
}
Ok(())
}
#[derive(Debug, thiserror::Error)]
pub enum NoiseChannelError {
#[error("Noise channel key generation failed: {0}")]
KeyGeneration(String),
#[error("invalid Noise channel public key: {0}")]
InvalidPublicKey(&'static str),
#[error("invalid Noise channel message: {0}")]
InvalidMessage(&'static str),
#[error("Noise channel handshake failed: {0}")]
Handshake(String),
#[error("Noise channel transport failed: {0}")]
Transport(String),
}
impl From<clatter::error::HandshakeError> for NoiseChannelError {
fn from(error: clatter::error::HandshakeError) -> Self {
Self::Handshake(error.to_string())
}
}
impl From<clatter::error::TransportError> for NoiseChannelError {
fn from(error: clatter::error::TransportError) -> Self {
Self::Transport(error.to_string())
}
}
#[cfg(test)]
#[path = "noise_channel_tests.rs"]
mod tests;
@@ -0,0 +1,51 @@
use pretty_assertions::assert_eq;
use super::InitiatorHandshake;
use super::MAX_MESSAGE_LEN;
use super::NOISE_CHANNEL_SUITE;
use super::NoiseChannelError;
use super::NoiseChannelIdentity;
use super::NoiseChannelPublicKey;
#[test]
fn public_key_validation_rejects_unknown_suite() {
let key = NoiseChannelIdentity::generate()
.expect("generate identity")
.public_key();
let json = serde_json::to_value(key).expect("serialize key");
let mut object = json.as_object().expect("key object").clone();
object.insert("suite".to_string(), serde_json::json!("unknown"));
let key: NoiseChannelPublicKey =
serde_json::from_value(serde_json::Value::Object(object)).expect("deserialize key");
let initiator = NoiseChannelIdentity::generate().expect("generate initiator identity");
assert!(InitiatorHandshake::start(&initiator, &key, b"prologue", b"").is_err());
}
#[test]
fn public_key_serializes_with_expected_suite() {
let key = NoiseChannelIdentity::generate()
.expect("generate identity")
.public_key();
let json = serde_json::to_value(key).expect("serialize key");
assert_eq!(json["suite"], NOISE_CHANNEL_SUITE);
}
#[test]
fn initiator_rejects_oversized_handshake_payload() {
let initiator = NoiseChannelIdentity::generate().expect("generate initiator identity");
let responder = NoiseChannelIdentity::generate().expect("generate responder identity");
let payload = vec![0; MAX_MESSAGE_LEN];
let result =
InitiatorHandshake::start(&initiator, &responder.public_key(), b"prologue", &payload);
assert!(matches!(
result,
Err(NoiseChannelError::InvalidMessage(
"handshake payload is too large"
))
));
}
@@ -0,0 +1,427 @@
//! Harness side of the Noise relay.
//!
//! The rendezvous service routes frames by `stream_id`, but does not authenticate
//! the executor or see JSON-RPC plaintext. We claim a stream, complete hybrid IK
//! against the registry-provided executor key, and then expose the result as a
//! normal `JsonRpcConnection`. Outbound JSON-RPC is framed and split into Noise
//! records; inbound records are reordered before decryption and reassembly.
use futures::Sink;
use futures::SinkExt;
use futures::Stream;
use futures::StreamExt;
use tokio::sync::mpsc;
use tokio::sync::watch;
use tokio_tungstenite::tungstenite::Message;
use tracing::debug;
use tracing::warn;
use uuid::Uuid;
use crate::ExecServerError;
use crate::connection::CHANNEL_CAPACITY;
use crate::connection::JsonRpcConnection;
use crate::connection::JsonRpcConnectionEvent;
use crate::connection::JsonRpcTransport;
use crate::noise_channel::InitiatorHandshake;
use crate::noise_channel::NoiseChannelIdentity;
use crate::noise_channel::NoiseChannelPublicKey;
use crate::noise_channel::NoiseTransport;
use crate::noise_channel::noise_channel_prologue;
use crate::noise_relay::message_framing::JsonRpcMessageDecoder;
use crate::noise_relay::message_framing::NOISE_RECORD_PLAINTEXT_LEN;
use crate::noise_relay::message_framing::frame_jsonrpc_message;
use crate::noise_relay::ordered_ciphertext::OrderedCiphertextFrames;
use crate::noise_relay::take_next_sequence;
use crate::relay::RelayFrameBodyKind;
use crate::relay::decode_relay_message_frame;
use crate::relay::encode_relay_message_frame;
use crate::relay_proto::RelayData;
use crate::relay_proto::RelayMessageFrame;
/// Values that bind one harness websocket to the intended executor registration.
///
/// These fields all come from the same registry response. Keeping them together
/// makes that relationship visible at the call site and avoids mixing up the
/// several string and key arguments used to start the handshake.
pub(crate) struct NoiseHarnessConnectionArgs {
pub(crate) connection_label: String,
pub(crate) environment_id: String,
pub(crate) executor_registration_id: String,
pub(crate) identity: NoiseChannelIdentity,
pub(crate) responder_public_key: NoiseChannelPublicKey,
pub(crate) harness_key_authorization: String,
}
/// Adapt one harness rendezvous websocket into an authenticated JSON-RPC connection.
///
/// The returned connection is not usable until the background task completes
/// hybrid IK against the registry-pinned exec-server key. Rendezvous can see
/// stream metadata and ciphertext, but never JSON-RPC plaintext or either
/// endpoint's private key. Failures close the connection rather than falling
/// back to plaintext.
pub(crate) fn noise_harness_connection_from_websocket<T, E>(
stream: T,
args: NoiseHarnessConnectionArgs,
) -> JsonRpcConnection
where
T: Sink<Message, Error = E> + Stream<Item = Result<Message, E>> + Unpin + Send + 'static,
E: std::fmt::Display + Send + 'static,
{
let NoiseHarnessConnectionArgs {
connection_label,
environment_id,
executor_registration_id,
identity,
responder_public_key,
harness_key_authorization,
} = args;
let stream_id = Uuid::new_v4().to_string();
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (incoming_tx, incoming_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (disconnected_tx, disconnected_rx) = watch::channel(false);
let websocket_task = tokio::spawn(async move {
let mut websocket = stream;
// Bind the Noise transcript to the exact environment registration and
// virtual relay stream before emitting any handshake bytes. A captured
// handshake cannot be spliced onto a different routed connection.
let prologue =
noise_channel_prologue(&environment_id, &executor_registration_id, &stream_id);
let (initiator_handshake, request) = match InitiatorHandshake::start(
&identity,
&responder_public_key,
&prologue,
harness_key_authorization.as_bytes(),
) {
Ok(handshake) => handshake,
Err(error) => {
send_disconnected(
&incoming_tx,
&disconnected_tx,
format!("failed to start Noise relay handshake: {error}"),
)
.await;
return;
}
};
// Resume claims the stream ID at rendezvous; Handshake carries the
// opaque first IK message. No JSON-RPC data is sent before the
// responder proves possession of the pinned static key.
let resume = RelayMessageFrame::resume(stream_id.clone());
let handshake = RelayMessageFrame::handshake(stream_id.clone(), request);
if websocket
.send(Message::Binary(encode_relay_message_frame(&resume).into()))
.await
.is_err()
|| websocket
.send(Message::Binary(
encode_relay_message_frame(&handshake).into(),
))
.await
.is_err()
{
let _ = disconnected_tx.send(true);
return;
}
// During the handshake, ignore unrelated routed streams and control
// frames, but reject data on our stream. Accepting early data would
// create a plaintext or unauthenticated application path.
let mut transport = loop {
let Some(incoming_message) = websocket.next().await else {
send_disconnected(
&incoming_tx,
&disconnected_tx,
"Noise relay websocket ended during handshake".to_string(),
)
.await;
return;
};
let message = match incoming_message {
Ok(Message::Binary(payload)) => payload,
Ok(Message::Close(_)) => {
send_disconnected(
&incoming_tx,
&disconnected_tx,
"Noise relay websocket received close frame during handshake".to_string(),
)
.await;
return;
}
Ok(Message::Ping(_) | Message::Pong(_) | Message::Frame(_)) => continue,
Ok(Message::Text(_)) => {
send_disconnected(
&incoming_tx,
&disconnected_tx,
"Noise relay transport expects binary protobuf frames".to_string(),
)
.await;
return;
}
Err(error) => {
send_disconnected(
&incoming_tx,
&disconnected_tx,
format!(
"failed to read Noise relay websocket from {connection_label}: {error}"
),
)
.await;
return;
}
};
let frame = match decode_relay_message_frame(message.as_ref()) {
Ok(frame) => frame,
Err(error) => {
send_disconnected(
&incoming_tx,
&disconnected_tx,
format!("failed to parse Noise relay frame: {error}"),
)
.await;
return;
}
};
if frame.stream_id != stream_id {
debug!("Noise relay ignored frame for unrelated stream during handshake");
continue;
}
match frame.validate() {
Ok(RelayFrameBodyKind::Handshake) => {
let response = match frame.into_handshake_payload() {
Ok(response) => response,
Err(error) => {
send_disconnected(
&incoming_tx,
&disconnected_tx,
format!("invalid Noise relay handshake response: {error}"),
)
.await;
return;
}
};
match initiator_handshake.finish(&response) {
Ok(transport) => break transport,
Err(error) => {
send_disconnected(
&incoming_tx,
&disconnected_tx,
format!("Noise relay handshake failed: {error}"),
)
.await;
return;
}
}
}
Ok(RelayFrameBodyKind::Reset) => {
send_disconnected(
&incoming_tx,
&disconnected_tx,
frame
.into_reset_reason()
.unwrap_or_else(|| "Noise relay reset during handshake".to_string()),
)
.await;
return;
}
Ok(
RelayFrameBodyKind::Ack
| RelayFrameBodyKind::Resume
| RelayFrameBodyKind::Heartbeat,
) => {}
Ok(RelayFrameBodyKind::Data) | Err(_) => {
send_disconnected(
&incoming_tx,
&disconnected_tx,
"Noise relay received data before handshake completion".to_string(),
)
.await;
return;
}
}
};
// After the handshake, each relay sequence maps to exactly one Noise
// transport record. Outbound records are encrypted once; inbound
// records are reordered and deduplicated before decryption.
let mut next_outbound_seq = 0u32;
let mut inbound_ciphertexts = OrderedCiphertextFrames::default();
let mut inbound_decoder = JsonRpcMessageDecoder::default();
'relay: loop {
tokio::select! {
maybe_message = outgoing_rx.recv() => {
let Some(message) = maybe_message else {
break;
};
let framed = match frame_jsonrpc_message(&message) {
Ok(framed) => framed,
Err(error) => {
warn!("failed to frame JSON-RPC payload for Noise relay: {error}");
break;
}
};
for plaintext_record in framed.chunks(NOISE_RECORD_PLAINTEXT_LEN) {
let seq = match take_next_sequence(&mut next_outbound_seq) {
Ok(seq) => seq,
Err(error) => {
warn!("Noise relay sequence exhausted: {error}");
break 'relay;
}
};
let ciphertext = match transport.encrypt(plaintext_record) {
Ok(ciphertext) => ciphertext,
Err(error) => {
warn!("failed to encrypt JSON-RPC payload for Noise relay: {error}");
break 'relay;
}
};
let frame = RelayMessageFrame::data(stream_id.clone(), seq, ciphertext);
if let Err(error) = websocket
.send(Message::Binary(encode_relay_message_frame(&frame).into()))
.await
{
warn!("failed to write Noise relay websocket: {error}");
break 'relay;
}
}
}
incoming_message = websocket.next() => {
let Some(incoming_message) = incoming_message else {
break;
};
match incoming_message {
Ok(Message::Binary(payload)) => {
let frame = match decode_relay_message_frame(payload.as_ref()) {
Ok(frame) => frame,
Err(error) => {
send_malformed(&incoming_tx, error.to_string()).await;
break;
}
};
if frame.stream_id != stream_id {
continue;
}
match frame.validate() {
Ok(RelayFrameBodyKind::Data) => {
let data = match frame.into_data() {
Ok(data) => data,
Err(error) => {
send_malformed(&incoming_tx, error.to_string()).await;
break;
}
};
if let Err(error) = receive_data(
&mut inbound_ciphertexts,
&mut transport,
&mut inbound_decoder,
data,
&incoming_tx,
)
.await
{
send_malformed(&incoming_tx, error.to_string()).await;
break;
}
}
Ok(RelayFrameBodyKind::Reset) => {
let reason = frame.into_reset_reason();
let _ = incoming_tx
.send(JsonRpcConnectionEvent::Disconnected { reason })
.await;
break;
}
Ok(
RelayFrameBodyKind::Ack
| RelayFrameBodyKind::Resume
| RelayFrameBodyKind::Heartbeat,
) => {}
Ok(RelayFrameBodyKind::Handshake) | Err(_) => {
send_malformed(
&incoming_tx,
"Noise relay received invalid post-handshake frame".to_string(),
)
.await;
break;
}
}
}
Ok(Message::Close(_)) => break,
Ok(Message::Ping(_) | Message::Pong(_) | Message::Frame(_)) => {}
Ok(Message::Text(_)) => {
send_malformed(
&incoming_tx,
"Noise relay transport expects binary protobuf frames".to_string(),
)
.await;
break;
}
Err(error) => {
debug!("Noise relay websocket read failed: {error}");
break;
}
}
}
}
}
let _ = disconnected_tx.send(true);
});
JsonRpcConnection {
outgoing_tx,
incoming_rx,
disconnected_rx,
task_handles: vec![websocket_task],
transport: JsonRpcTransport::Plain,
}
}
/// Order and decrypt one relay frame, then emit any complete JSON-RPC messages.
/// Relay records and JSON-RPC messages do not share boundaries, so reassembly
/// happens after decryption.
async fn receive_data(
inbound_ciphertexts: &mut OrderedCiphertextFrames,
transport: &mut NoiseTransport,
decoder: &mut JsonRpcMessageDecoder,
data: RelayData,
incoming_tx: &mpsc::Sender<JsonRpcConnectionEvent>,
) -> Result<(), ExecServerError> {
// Ordering must happen before decryption because Noise transport nonces are
// implicit. A future or duplicate ciphertext passed directly to Clatter
// would desynchronize the channel.
for ciphertext in inbound_ciphertexts.push(data.seq, data.payload)? {
let plaintext = transport.decrypt(&ciphertext).map_err(|error| {
ExecServerError::Protocol(format!("Noise relay decryption failed: {error}"))
})?;
// The authenticated byte stream can carry partial or multiple JSON-RPC
// messages; emit only complete, successfully parsed messages.
for message in decoder.push(&plaintext)? {
incoming_tx
.send(JsonRpcConnectionEvent::Message(message))
.await
.map_err(|_| ExecServerError::Closed)?;
}
}
Ok(())
}
async fn send_malformed(incoming_tx: &mpsc::Sender<JsonRpcConnectionEvent>, reason: String) {
let _ = incoming_tx
.send(JsonRpcConnectionEvent::MalformedMessage { reason })
.await;
}
async fn send_disconnected(
incoming_tx: &mpsc::Sender<JsonRpcConnectionEvent>,
disconnected_tx: &watch::Sender<bool>,
reason: String,
) {
let _ = disconnected_tx.send(true);
let _ = incoming_tx
.send(JsonRpcConnectionEvent::Disconnected {
reason: Some(reason),
})
.await;
}
@@ -0,0 +1,85 @@
use codex_app_server_protocol::JSONRPCMessage;
use crate::ExecServerError;
const LENGTH_PREFIX_BYTES: usize = size_of::<u32>();
const MAX_NOISE_JSONRPC_MESSAGE_LEN: usize = 64 * 1024 * 1024;
pub(crate) const NOISE_RECORD_PLAINTEXT_LEN: usize = 60 * 1024;
/// Serialize one JSON-RPC message into the encrypted record byte stream.
///
/// Clatter limits an individual Noise message to 65,535 bytes, while valid
/// exec-server responses can be much larger. A four-byte authenticated length
/// prefix lets the caller split this byte stream into bounded Noise records and
/// lets the receiver reconstruct exact JSON-RPC message boundaries.
pub(crate) fn frame_jsonrpc_message(message: &JSONRPCMessage) -> Result<Vec<u8>, ExecServerError> {
let mut framed = vec![0; LENGTH_PREFIX_BYTES];
serde_json::to_writer(&mut framed, message)?;
let message_len = framed.len() - LENGTH_PREFIX_BYTES;
if message_len > MAX_NOISE_JSONRPC_MESSAGE_LEN {
return Err(ExecServerError::Protocol(
"Noise relay JSON-RPC message exceeds maximum length".to_string(),
));
}
framed[..LENGTH_PREFIX_BYTES].copy_from_slice(&(message_len as u32).to_be_bytes());
Ok(framed)
}
/// Incrementally reconstructs authenticated JSON-RPC messages from Noise records.
///
/// The length prefix is encrypted along with the message. It is still bounded
/// here so a bad authenticated peer cannot grow the reassembly buffer forever.
#[derive(Default)]
pub(crate) struct JsonRpcMessageDecoder {
buffered: Vec<u8>,
}
impl JsonRpcMessageDecoder {
/// Append one decrypted record and return all complete framed messages.
pub(crate) fn push(
&mut self,
plaintext_record: &[u8],
) -> Result<Vec<JSONRPCMessage>, ExecServerError> {
if plaintext_record.len() > NOISE_RECORD_PLAINTEXT_LEN {
return Err(ExecServerError::Protocol(
"Noise relay plaintext record exceeds maximum length".to_string(),
));
}
self.buffered.extend_from_slice(plaintext_record);
// One record can finish multiple messages, and one message can span
// multiple records. Parse only after the authenticated length prefix
// and the full declared payload are present.
let mut messages = Vec::new();
while let Some(prefix) = self.buffered.get(..LENGTH_PREFIX_BYTES) {
let message_len =
u32::from_be_bytes([prefix[0], prefix[1], prefix[2], prefix[3]]) as usize;
// Reject the authenticated length before waiting for its payload.
if message_len == 0 || message_len > MAX_NOISE_JSONRPC_MESSAGE_LEN {
return Err(ExecServerError::Protocol(
"Noise relay JSON-RPC message has invalid length".to_string(),
));
}
let framed_len = LENGTH_PREFIX_BYTES + message_len;
if self.buffered.len() < framed_len {
break;
}
messages.push(serde_json::from_slice(
&self.buffered[LENGTH_PREFIX_BYTES..framed_len],
)?);
self.buffered.drain(..framed_len);
}
// Even before a message is complete, keep reassembly memory bounded.
if self.buffered.len() > LENGTH_PREFIX_BYTES + MAX_NOISE_JSONRPC_MESSAGE_LEN {
return Err(ExecServerError::Protocol(
"Noise relay JSON-RPC reassembly buffer exceeds maximum length".to_string(),
));
}
Ok(messages)
}
}
#[cfg(test)]
#[path = "message_framing_tests.rs"]
mod tests;
@@ -0,0 +1,52 @@
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use pretty_assertions::assert_eq;
use super::JsonRpcMessageDecoder;
use super::MAX_NOISE_JSONRPC_MESSAGE_LEN;
use super::NOISE_RECORD_PLAINTEXT_LEN;
use super::frame_jsonrpc_message;
use crate::ExecServerError;
#[test]
fn fragments_and_reassembles_large_jsonrpc_message() {
let message = JSONRPCMessage::Notification(JSONRPCNotification {
method: "large/test".to_string(),
params: Some(serde_json::json!({
"data": "x".repeat(128 * 1024),
})),
});
let framed = frame_jsonrpc_message(&message).unwrap();
assert!(framed.len() > 128 * 1024);
let mut decoder = JsonRpcMessageDecoder::default();
let mut decoded = Vec::new();
for record in framed.chunks(NOISE_RECORD_PLAINTEXT_LEN) {
decoded.extend(decoder.push(record).unwrap());
}
assert_eq!(decoded, vec![message]);
}
#[test]
fn rejects_declared_message_length_above_limit_without_payload() {
let mut decoder = JsonRpcMessageDecoder::default();
let declared_len = (MAX_NOISE_JSONRPC_MESSAGE_LEN as u32 + 1).to_be_bytes();
assert!(matches!(
decoder.push(&declared_len),
Err(ExecServerError::Protocol(message))
if message == "Noise relay JSON-RPC message has invalid length"
));
}
#[test]
fn rejects_oversized_plaintext_record() {
let mut decoder = JsonRpcMessageDecoder::default();
assert!(matches!(
decoder.push(&vec![0; NOISE_RECORD_PLAINTEXT_LEN + 1]),
Err(ExecServerError::Protocol(message))
if message == "Noise relay plaintext record exceeds maximum length"
));
}
@@ -0,0 +1,31 @@
mod harness;
mod message_framing;
mod ordered_ciphertext;
use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
use crate::ExecServerError;
pub(crate) use harness::NoiseHarnessConnectionArgs;
pub(crate) use harness::noise_harness_connection_from_websocket;
// This bounds allocation in tungstenite before protobuf and Noise record
// validation run. It comfortably fits one maximum Noise record plus metadata.
const MAX_NOISE_RELAY_WEBSOCKET_MESSAGE_SIZE: usize = 256 * 1024;
/// Return the websocket limits required by every Noise relay endpoint.
pub(crate) fn noise_relay_websocket_config() -> WebSocketConfig {
WebSocketConfig::default()
.max_frame_size(Some(MAX_NOISE_RELAY_WEBSOCKET_MESSAGE_SIZE))
.max_message_size(Some(MAX_NOISE_RELAY_WEBSOCKET_MESSAGE_SIZE))
}
fn take_next_sequence(next_seq: &mut u32) -> Result<u32, ExecServerError> {
// Never wrap: relay sequence is the explicit ordering key for an implicit
// Noise nonce. Reusing zero after u32::MAX would be ambiguous and unsafe.
let seq = *next_seq;
*next_seq = next_seq.checked_add(1).ok_or_else(|| {
ExecServerError::Protocol("Noise relay sequence number exhausted".to_string())
})?;
Ok(seq)
}
@@ -0,0 +1,70 @@
use std::collections::BTreeMap;
use crate::ExecServerError;
const MAX_REORDER_DISTANCE: u32 = 64;
const MAX_PENDING_BYTES: usize = 1024 * 1024;
/// Reorders relay records before they reach Noise's implicit receive nonce.
/// The window is bounded, and each sequence number is released at most once.
#[derive(Default)]
pub(crate) struct OrderedCiphertextFrames {
next_seq: u32,
pending: BTreeMap<u32, Vec<u8>>,
pending_bytes: usize,
}
impl OrderedCiphertextFrames {
/// Accept one relay record and return the newly contiguous ciphertext run.
///
/// Returns nothing for duplicates or while a gap remains. Closing a gap also
/// releases any buffered records that now follow it contiguously.
pub(crate) fn push(
&mut self,
seq: u32,
payload: Vec<u8>,
) -> Result<Vec<Vec<u8>>, ExecServerError> {
// Keep the first ciphertext for a sequence. Later copies are duplicates.
if seq < self.next_seq || self.pending.contains_key(&seq) {
return Ok(Vec::new());
}
if seq > self.next_seq {
// Bound both the sequence gap and buffered bytes.
if seq - self.next_seq > MAX_REORDER_DISTANCE {
return Err(ExecServerError::Protocol(
"Noise relay ciphertext exceeds reorder window".to_string(),
));
}
let pending_bytes = self.pending_bytes + payload.len();
if pending_bytes > MAX_PENDING_BYTES {
return Err(ExecServerError::Protocol(
"Noise relay pending ciphertext buffer is full".to_string(),
));
}
self.pending.insert(seq, payload);
self.pending_bytes = pending_bytes;
return Ok(Vec::new());
}
// Release the expected record and anything now contiguous behind it.
let mut ready = vec![payload];
self.advance()?;
while let Some(payload) = self.pending.remove(&self.next_seq) {
self.pending_bytes -= payload.len();
ready.push(payload);
self.advance()?;
}
Ok(ready)
}
fn advance(&mut self) -> Result<(), ExecServerError> {
self.next_seq = self.next_seq.checked_add(1).ok_or_else(|| {
ExecServerError::Protocol("Noise relay sequence number exhausted".to_string())
})?;
Ok(())
}
}
#[cfg(test)]
#[path = "ordered_ciphertext_tests.rs"]
mod tests;
@@ -0,0 +1,52 @@
use pretty_assertions::assert_eq;
use super::MAX_PENDING_BYTES;
use super::OrderedCiphertextFrames;
#[test]
fn releases_ciphertexts_only_in_nonce_order() {
let mut frames = OrderedCiphertextFrames::default();
assert_eq!(
frames.push(/*seq*/ 1, b"second".to_vec()).unwrap(),
Vec::<Vec<u8>>::new()
);
assert_eq!(
frames.push(/*seq*/ 0, b"first".to_vec()).unwrap(),
vec![b"first".to_vec(), b"second".to_vec()]
);
}
#[test]
fn ignores_duplicate_ciphertexts_without_replacing_buffered_record() {
let mut frames = OrderedCiphertextFrames::default();
assert_eq!(
frames.push(/*seq*/ 1, b"first copy".to_vec()).unwrap(),
Vec::<Vec<u8>>::new()
);
assert_eq!(
frames.push(/*seq*/ 1, b"replacement".to_vec()).unwrap(),
Vec::<Vec<u8>>::new()
);
assert_eq!(
frames.push(/*seq*/ 0, b"zero".to_vec()).unwrap(),
vec![b"zero".to_vec(), b"first copy".to_vec()]
);
assert_eq!(
frames.push(/*seq*/ 0, b"duplicate".to_vec()).unwrap(),
Vec::<Vec<u8>>::new()
);
}
#[test]
fn rejects_unbounded_reordering() {
let mut frames = OrderedCiphertextFrames::default();
assert!(frames.push(/*seq*/ 65, Vec::new()).is_err());
assert!(
frames
.push(/*seq*/ 1, vec![0; MAX_PENDING_BYTES + 1])
.is_err()
);
}
@@ -14,6 +14,7 @@ message RelayMessageFrame {
RelayResume resume = 7;
RelayReset reset = 8;
RelayHeartbeat heartbeat = 9;
RelayHandshake handshake = 10;
}
}
@@ -35,3 +36,7 @@ message RelayReset {
}
message RelayHeartbeat {}
message RelayHandshake {
bytes payload = 1;
}
@@ -9,7 +9,7 @@ pub struct RelayMessageFrame {
pub ack: u32,
#[prost(uint32, tag = "4")]
pub ack_bits: u32,
#[prost(oneof = "relay_message_frame::Body", tags = "5, 6, 7, 8, 9")]
#[prost(oneof = "relay_message_frame::Body", tags = "5, 6, 7, 8, 9, 10")]
pub body: ::core::option::Option<relay_message_frame::Body>,
}
pub mod relay_message_frame {
@@ -25,6 +25,8 @@ pub mod relay_message_frame {
Reset(super::RelayReset),
#[prost(message, tag = "9")]
Heartbeat(super::RelayHeartbeat),
#[prost(message, tag = "10")]
Handshake(super::RelayHandshake),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -52,3 +54,8 @@ pub struct RelayReset {
}
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct RelayHeartbeat {}
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct RelayHandshake {
#[prost(bytes = "vec", tag = "1")]
pub payload: ::prost::alloc::vec::Vec<u8>,
}
+64 -16
View File
@@ -23,24 +23,25 @@ use crate::connection::JsonRpcConnectionEvent;
use crate::connection::JsonRpcTransport;
use crate::connection::WEBSOCKET_KEEPALIVE_INTERVAL;
use crate::relay_proto::RelayData;
use crate::relay_proto::RelayHandshake;
use crate::relay_proto::RelayMessageFrame;
use crate::relay_proto::RelayResume;
use crate::relay_proto::relay_message_frame;
use crate::server::ConnectionProcessor;
const RELAY_MESSAGE_FRAME_VERSION: u32 = 1;
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum RelayFrameBodyKind {
pub(crate) enum RelayFrameBodyKind {
Data,
Ack,
Resume,
Reset,
Heartbeat,
Handshake,
}
impl RelayMessageFrame {
fn data(stream_id: String, seq: u32, payload: Vec<u8>) -> Self {
pub(crate) fn data(stream_id: String, seq: u32, payload: Vec<u8>) -> Self {
Self {
version: RELAY_MESSAGE_FRAME_VERSION,
stream_id,
@@ -55,7 +56,7 @@ impl RelayMessageFrame {
}
}
fn resume(stream_id: String) -> Self {
pub(crate) fn resume(stream_id: String) -> Self {
Self {
version: RELAY_MESSAGE_FRAME_VERSION,
stream_id,
@@ -67,7 +68,19 @@ impl RelayMessageFrame {
}
}
fn validate(&self) -> Result<RelayFrameBodyKind, ExecServerError> {
pub(crate) fn handshake(stream_id: String, payload: Vec<u8>) -> Self {
Self {
version: RELAY_MESSAGE_FRAME_VERSION,
stream_id,
ack: 0,
ack_bits: 0,
body: Some(relay_message_frame::Body::Handshake(RelayHandshake {
payload,
})),
}
}
pub(crate) fn validate(&self) -> Result<RelayFrameBodyKind, ExecServerError> {
if self.version != RELAY_MESSAGE_FRAME_VERSION {
return Err(ExecServerError::Protocol(format!(
"unsupported relay message frame version {}",
@@ -99,27 +112,56 @@ impl RelayMessageFrame {
Ok(RelayFrameBodyKind::Reset)
}
Some(relay_message_frame::Body::Heartbeat(_)) => Ok(RelayFrameBodyKind::Heartbeat),
Some(relay_message_frame::Body::Handshake(handshake)) => {
if handshake.payload.is_empty() {
return Err(ExecServerError::Protocol(
"relay handshake message frame is missing payload".to_string(),
));
}
Ok(RelayFrameBodyKind::Handshake)
}
None => Err(ExecServerError::Protocol(
"relay message frame is missing body".to_string(),
)),
}
}
fn into_jsonrpc_message(self) -> Result<JSONRPCMessage, ExecServerError> {
pub(crate) fn into_data(self) -> Result<RelayData, ExecServerError> {
let kind = self.validate()?;
if kind != RelayFrameBodyKind::Data {
return Err(ExecServerError::Protocol(
"expected relay data message frame".to_string(),
));
}
let payload = match self.body {
Some(relay_message_frame::Body::Data(data)) => data.payload,
_ => Vec::new(),
};
match self.body {
Some(relay_message_frame::Body::Data(data)) => Ok(data),
_ => Err(ExecServerError::Protocol(
"expected relay data message frame".to_string(),
)),
}
}
fn into_jsonrpc_message(self) -> Result<JSONRPCMessage, ExecServerError> {
let payload = self.into_data()?.payload;
serde_json::from_slice(&payload).map_err(ExecServerError::Json)
}
fn into_reset_reason(self) -> Option<String> {
pub(crate) fn into_handshake_payload(self) -> Result<Vec<u8>, ExecServerError> {
let kind = self.validate()?;
if kind != RelayFrameBodyKind::Handshake {
return Err(ExecServerError::Protocol(
"expected relay handshake message frame".to_string(),
));
}
match self.body {
Some(relay_message_frame::Body::Handshake(handshake)) => Ok(handshake.payload),
_ => Err(ExecServerError::Protocol(
"expected relay handshake message frame".to_string(),
)),
}
}
pub(crate) fn into_reset_reason(self) -> Option<String> {
match self.body {
Some(relay_message_frame::Body::Reset(reset)) if !reset.reason.is_empty() => {
Some(reset.reason)
@@ -129,16 +171,18 @@ impl RelayMessageFrame {
}
}
fn encode_relay_message_frame(frame: &RelayMessageFrame) -> Vec<u8> {
pub(crate) fn encode_relay_message_frame(frame: &RelayMessageFrame) -> Vec<u8> {
frame.encode_to_vec()
}
fn decode_relay_message_frame(payload: &[u8]) -> Result<RelayMessageFrame, ExecServerError> {
pub(crate) fn decode_relay_message_frame(
payload: &[u8],
) -> Result<RelayMessageFrame, ExecServerError> {
RelayMessageFrame::decode(payload)
.map_err(|err| ExecServerError::Protocol(format!("invalid relay message frame: {err}")))
}
fn jsonrpc_payload(message: &JSONRPCMessage) -> Result<Vec<u8>, ExecServerError> {
pub(crate) fn jsonrpc_payload(message: &JSONRPCMessage) -> Result<Vec<u8>, ExecServerError> {
serde_json::to_vec(message).map_err(ExecServerError::Json)
}
@@ -304,7 +348,8 @@ where
}
RelayFrameBodyKind::Ack
| RelayFrameBodyKind::Resume
| RelayFrameBodyKind::Heartbeat => {}
| RelayFrameBodyKind::Heartbeat
| RelayFrameBodyKind::Handshake => {}
}
}
Some(Ok(Message::Close(_))) | None => {
@@ -423,6 +468,7 @@ pub(crate) async fn run_multiplexed_environment<S>(
continue;
}
};
let stream = streams.entry(stream_id.clone()).or_insert_with(|| {
spawn_virtual_stream(
stream_id.clone(),
@@ -453,7 +499,8 @@ pub(crate) async fn run_multiplexed_environment<S>(
}
RelayFrameBodyKind::Ack
| RelayFrameBodyKind::Resume
| RelayFrameBodyKind::Heartbeat => {}
| RelayFrameBodyKind::Heartbeat
| RelayFrameBodyKind::Handshake => {}
}
}
@@ -547,6 +594,7 @@ mod tests {
use pretty_assertions::assert_eq;
use tokio::net::TcpListener;
use tokio::time::timeout;
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::accept_async;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
+1
View File
@@ -2,6 +2,7 @@
mod generated;
pub(crate) use generated::RelayData;
pub(crate) use generated::RelayHandshake;
pub(crate) use generated::RelayMessageFrame;
pub(crate) use generated::RelayResume;
pub(crate) use generated::relay_message_frame;