app-server: add fuzzy search sessions for streaming file search (#10268)

This commit is contained in:
Jeremy Rose
2026-02-12 10:49:44 -08:00
committed by GitHub
Unverified
parent 545b266839
commit 66e0c3aaa3
13 changed files with 951 additions and 27 deletions
@@ -0,0 +1,63 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"FuzzyFileSearchResult": {
"description": "Superset of [`codex_file_search::FileMatch`]",
"properties": {
"file_name": {
"type": "string"
},
"indices": {
"items": {
"format": "uint32",
"minimum": 0.0,
"type": "integer"
},
"type": [
"array",
"null"
]
},
"path": {
"type": "string"
},
"root": {
"type": "string"
},
"score": {
"format": "uint32",
"minimum": 0.0,
"type": "integer"
}
},
"required": [
"file_name",
"path",
"root",
"score"
],
"type": "object"
}
},
"properties": {
"files": {
"items": {
"$ref": "#/definitions/FuzzyFileSearchResult"
},
"type": "array"
},
"query": {
"type": "string"
},
"sessionId": {
"type": "string"
}
},
"required": [
"files",
"query",
"sessionId"
],
"title": "FuzzyFileSearchSessionUpdatedNotification",
"type": "object"
}
@@ -3684,6 +3684,65 @@
],
"type": "object"
},
"FuzzyFileSearchResult": {
"description": "Superset of [`codex_file_search::FileMatch`]",
"properties": {
"file_name": {
"type": "string"
},
"indices": {
"items": {
"format": "uint32",
"minimum": 0.0,
"type": "integer"
},
"type": [
"array",
"null"
]
},
"path": {
"type": "string"
},
"root": {
"type": "string"
},
"score": {
"format": "uint32",
"minimum": 0.0,
"type": "integer"
}
},
"required": [
"file_name",
"path",
"root",
"score"
],
"type": "object"
},
"FuzzyFileSearchSessionUpdatedNotification": {
"properties": {
"files": {
"items": {
"$ref": "#/definitions/FuzzyFileSearchResult"
},
"type": "array"
},
"query": {
"type": "string"
},
"sessionId": {
"type": "string"
}
},
"required": [
"files",
"query",
"sessionId"
],
"type": "object"
},
"GhostCommit": {
"description": "Details of a ghost commit created from a repository state.",
"properties": {
@@ -8171,6 +8230,26 @@
"title": "ConfigWarningNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"fuzzyFileSearch/sessionUpdated"
],
"title": "FuzzyFileSearch/sessionUpdatedNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/FuzzyFileSearchSessionUpdatedNotification"
}
},
"required": [
"method",
"params"
],
"title": "FuzzyFileSearch/sessionUpdatedNotification",
"type": "object"
},
{
"description": "Notifies the user of world-writable directories on Windows, which cannot be protected by the sandbox.",
"properties": {
@@ -5386,6 +5386,30 @@
],
"type": "object"
},
"FuzzyFileSearchSessionUpdatedNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"files": {
"items": {
"$ref": "#/definitions/FuzzyFileSearchResult"
},
"type": "array"
},
"query": {
"type": "string"
},
"sessionId": {
"type": "string"
}
},
"required": [
"files",
"query",
"sessionId"
],
"title": "FuzzyFileSearchSessionUpdatedNotification",
"type": "object"
},
"GetAuthStatusParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
@@ -8425,6 +8449,26 @@
"title": "ConfigWarningNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"fuzzyFileSearch/sessionUpdated"
],
"title": "FuzzyFileSearch/sessionUpdatedNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/FuzzyFileSearchSessionUpdatedNotification"
}
},
"required": [
"method",
"params"
],
"title": "FuzzyFileSearch/sessionUpdatedNotification",
"type": "object"
},
{
"description": "Notifies the user of world-writable directories on Windows, which cannot be protected by the sandbox.",
"properties": {
@@ -0,0 +1,6 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { FuzzyFileSearchResult } from "./FuzzyFileSearchResult";
export type FuzzyFileSearchSessionUpdatedNotification = { sessionId: string, query: string, files: Array<FuzzyFileSearchResult>, };
@@ -2,6 +2,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { AuthStatusChangeNotification } from "./AuthStatusChangeNotification";
import type { FuzzyFileSearchSessionUpdatedNotification } from "./FuzzyFileSearchSessionUpdatedNotification";
import type { LoginChatGptCompleteNotification } from "./LoginChatGptCompleteNotification";
import type { SessionConfiguredNotification } from "./SessionConfiguredNotification";
import type { AccountLoginCompletedNotification } from "./v2/AccountLoginCompletedNotification";
@@ -37,4 +38,4 @@ import type { WindowsWorldWritableWarningNotification } from "./v2/WindowsWorldW
/**
* Notification sent from the server to the client.
*/
export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification };
export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "fuzzyFileSearch/sessionUpdated", "params": FuzzyFileSearchSessionUpdatedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification };
@@ -77,6 +77,7 @@ export type { FunctionCallOutputPayload } from "./FunctionCallOutputPayload";
export type { FuzzyFileSearchParams } from "./FuzzyFileSearchParams";
export type { FuzzyFileSearchResponse } from "./FuzzyFileSearchResponse";
export type { FuzzyFileSearchResult } from "./FuzzyFileSearchResult";
export type { FuzzyFileSearchSessionUpdatedNotification } from "./FuzzyFileSearchSessionUpdatedNotification";
export type { GetAuthStatusParams } from "./GetAuthStatusParams";
export type { GetAuthStatusResponse } from "./GetAuthStatusResponse";
export type { GetConversationSummaryParams } from "./GetConversationSummaryParams";
@@ -458,6 +458,21 @@ client_request_definitions! {
params: FuzzyFileSearchParams,
response: FuzzyFileSearchResponse,
},
#[experimental("fuzzyFileSearch/sessionStart")]
FuzzyFileSearchSessionStart => "fuzzyFileSearch/sessionStart" {
params: FuzzyFileSearchSessionStartParams,
response: FuzzyFileSearchSessionStartResponse,
},
#[experimental("fuzzyFileSearch/sessionUpdate")]
FuzzyFileSearchSessionUpdate => "fuzzyFileSearch/sessionUpdate" {
params: FuzzyFileSearchSessionUpdateParams,
response: FuzzyFileSearchSessionUpdateResponse,
},
#[experimental("fuzzyFileSearch/sessionStop")]
FuzzyFileSearchSessionStop => "fuzzyFileSearch/sessionStop" {
params: FuzzyFileSearchSessionStopParams,
response: FuzzyFileSearchSessionStopResponse,
},
/// Execute a command (argv vector) under the server's sandbox.
ExecOneOffCommand {
params: v1::ExecOneOffCommandParams,
@@ -702,6 +717,47 @@ pub struct FuzzyFileSearchResponse {
pub files: Vec<FuzzyFileSearchResult>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
pub struct FuzzyFileSearchSessionStartParams {
pub session_id: String,
pub roots: Vec<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS, Default)]
pub struct FuzzyFileSearchSessionStartResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
pub struct FuzzyFileSearchSessionUpdateParams {
pub session_id: String,
pub query: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS, Default)]
pub struct FuzzyFileSearchSessionUpdateResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
pub struct FuzzyFileSearchSessionStopParams {
pub session_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS, Default)]
pub struct FuzzyFileSearchSessionStopResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
pub struct FuzzyFileSearchSessionUpdatedNotification {
pub session_id: String,
pub query: String,
pub files: Vec<FuzzyFileSearchResult>,
}
server_notification_definitions! {
/// NEW NOTIFICATIONS
Error => "error" (v2::ErrorNotification),
@@ -734,6 +790,7 @@ server_notification_definitions! {
ContextCompacted => "thread/compacted" (v2::ContextCompactedNotification),
DeprecationNotice => "deprecationNotice" (v2::DeprecationNoticeNotification),
ConfigWarning => "configWarning" (v2::ConfigWarningNotification),
FuzzyFileSearchSessionUpdated => "fuzzyFileSearch/sessionUpdated" (FuzzyFileSearchSessionUpdatedNotification),
/// Notifies the user of world-writable directories on Windows, which cannot be protected by the sandbox.
WindowsWorldWritableWarning => "windows/worldWritableWarning" (v2::WindowsWorldWritableWarningNotification),
@@ -1,7 +1,9 @@
use crate::bespoke_event_handling::apply_bespoke_event_handling;
use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::fuzzy_file_search::FuzzyFileSearchSession;
use crate::fuzzy_file_search::run_fuzzy_file_search;
use crate::fuzzy_file_search::start_fuzzy_file_search_session;
use crate::models::supported_models;
use crate::outgoing_message::ConnectionId;
use crate::outgoing_message::ConnectionRequestId;
@@ -47,6 +49,12 @@ use codex_app_server_protocol::ForkConversationParams;
use codex_app_server_protocol::ForkConversationResponse;
use codex_app_server_protocol::FuzzyFileSearchParams;
use codex_app_server_protocol::FuzzyFileSearchResponse;
use codex_app_server_protocol::FuzzyFileSearchSessionStartParams;
use codex_app_server_protocol::FuzzyFileSearchSessionStartResponse;
use codex_app_server_protocol::FuzzyFileSearchSessionStopParams;
use codex_app_server_protocol::FuzzyFileSearchSessionStopResponse;
use codex_app_server_protocol::FuzzyFileSearchSessionUpdateParams;
use codex_app_server_protocol::FuzzyFileSearchSessionUpdateResponse;
use codex_app_server_protocol::GetAccountParams;
use codex_app_server_protocol::GetAccountRateLimitsResponse;
use codex_app_server_protocol::GetAccountResponse;
@@ -294,6 +302,7 @@ pub(crate) struct CodexMessageProcessor {
active_login: Arc<Mutex<Option<ActiveLogin>>>,
thread_state_manager: ThreadStateManager,
pending_fuzzy_searches: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>>,
fuzzy_search_sessions: Arc<Mutex<HashMap<String, FuzzyFileSearchSession>>>,
feedback: CodexFeedback,
}
@@ -361,6 +370,7 @@ impl CodexMessageProcessor {
active_login: Arc::new(Mutex::new(None)),
thread_state_manager: ThreadStateManager::new(),
pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())),
fuzzy_search_sessions: Arc::new(Mutex::new(HashMap::new())),
feedback,
}
}
@@ -727,6 +737,18 @@ impl CodexMessageProcessor {
self.fuzzy_file_search(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::FuzzyFileSearchSessionStart { request_id, params } => {
self.fuzzy_file_search_session_start(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::FuzzyFileSearchSessionUpdate { request_id, params } => {
self.fuzzy_file_search_session_update(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::FuzzyFileSearchSessionStop { request_id, params } => {
self.fuzzy_file_search_session_stop(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::OneOffCommandExec { request_id, params } => {
self.exec_one_off_command(to_connection_request_id(request_id), params)
.await;
@@ -5556,6 +5578,89 @@ impl CodexMessageProcessor {
self.outgoing.send_response(request_id, response).await;
}
async fn fuzzy_file_search_session_start(
&mut self,
request_id: ConnectionRequestId,
params: FuzzyFileSearchSessionStartParams,
) {
let FuzzyFileSearchSessionStartParams { session_id, roots } = params;
if session_id.is_empty() {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "sessionId must not be empty".to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
let session =
start_fuzzy_file_search_session(session_id.clone(), roots, self.outgoing.clone());
match session {
Ok(session) => {
let mut sessions = self.fuzzy_search_sessions.lock().await;
sessions.insert(session_id, session);
self.outgoing
.send_response(request_id, FuzzyFileSearchSessionStartResponse {})
.await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to start fuzzy file search session: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
}
}
async fn fuzzy_file_search_session_update(
&mut self,
request_id: ConnectionRequestId,
params: FuzzyFileSearchSessionUpdateParams,
) {
let FuzzyFileSearchSessionUpdateParams { session_id, query } = params;
let found = {
let sessions = self.fuzzy_search_sessions.lock().await;
if let Some(session) = sessions.get(&session_id) {
session.update_query(query);
true
} else {
false
}
};
if !found {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("fuzzy file search session not found: {session_id}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
self.outgoing
.send_response(request_id, FuzzyFileSearchSessionUpdateResponse {})
.await;
}
async fn fuzzy_file_search_session_stop(
&mut self,
request_id: ConnectionRequestId,
params: FuzzyFileSearchSessionStopParams,
) {
let FuzzyFileSearchSessionStopParams { session_id } = params;
{
let mut sessions = self.fuzzy_search_sessions.lock().await;
sessions.remove(&session_id);
}
self.outgoing
.send_response(request_id, FuzzyFileSearchSessionStopResponse {})
.await;
}
async fn upload_feedback(&self, request_id: ConnectionRequestId, params: FeedbackUploadParams) {
if !self.config.feedback_enabled {
let error = JSONRPCErrorError {
@@ -1,12 +1,18 @@
use std::num::NonZero;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use codex_app_server_protocol::FuzzyFileSearchResult;
use codex_app_server_protocol::FuzzyFileSearchSessionUpdatedNotification;
use codex_app_server_protocol::ServerNotification;
use codex_file_search as file_search;
use tracing::warn;
use crate::outgoing_message::OutgoingMessageSender;
const MATCH_LIMIT: usize = 50;
const MAX_THREADS: usize = 12;
@@ -77,3 +83,148 @@ pub(crate) async fn run_fuzzy_file_search(
files
}
pub(crate) struct FuzzyFileSearchSession {
session: file_search::FileSearchSession,
shared: Arc<SessionShared>,
}
impl FuzzyFileSearchSession {
pub(crate) fn update_query(&self, query: String) {
if self.shared.canceled.load(Ordering::Relaxed) {
return;
}
{
#[expect(clippy::unwrap_used)]
let mut latest_query = self.shared.latest_query.lock().unwrap();
*latest_query = query.clone();
}
self.session.update_query(&query);
}
}
impl Drop for FuzzyFileSearchSession {
fn drop(&mut self) {
self.shared.canceled.store(true, Ordering::Relaxed);
}
}
pub(crate) fn start_fuzzy_file_search_session(
session_id: String,
roots: Vec<String>,
outgoing: Arc<OutgoingMessageSender>,
) -> anyhow::Result<FuzzyFileSearchSession> {
#[expect(clippy::expect_used)]
let limit = NonZero::new(MATCH_LIMIT).expect("MATCH_LIMIT should be a valid non-zero usize");
let cores = std::thread::available_parallelism()
.map(std::num::NonZero::get)
.unwrap_or(1);
let threads = cores.min(MAX_THREADS);
#[expect(clippy::expect_used)]
let threads = NonZero::new(threads.max(1)).expect("threads should be non-zero");
let search_dirs: Vec<PathBuf> = roots.iter().map(PathBuf::from).collect();
let canceled = Arc::new(AtomicBool::new(false));
let shared = Arc::new(SessionShared {
session_id,
latest_query: Mutex::new(String::new()),
outgoing,
runtime: tokio::runtime::Handle::current(),
canceled: canceled.clone(),
});
let reporter = Arc::new(SessionReporterImpl {
shared: shared.clone(),
});
let session = file_search::create_session(
search_dirs,
file_search::FileSearchOptions {
limit,
threads,
compute_indices: true,
..Default::default()
},
reporter,
Some(canceled),
)?;
Ok(FuzzyFileSearchSession { session, shared })
}
struct SessionShared {
session_id: String,
latest_query: Mutex<String>,
outgoing: Arc<OutgoingMessageSender>,
runtime: tokio::runtime::Handle,
canceled: Arc<AtomicBool>,
}
struct SessionReporterImpl {
shared: Arc<SessionShared>,
}
impl SessionReporterImpl {
fn send_snapshot(&self, snapshot: &file_search::FileSearchSnapshot) {
if self.shared.canceled.load(Ordering::Relaxed) {
return;
}
let query = {
#[expect(clippy::unwrap_used)]
self.shared.latest_query.lock().unwrap().clone()
};
if snapshot.query != query {
return;
}
let files = if query.is_empty() {
Vec::new()
} else {
collect_files(snapshot)
};
let notification = ServerNotification::FuzzyFileSearchSessionUpdated(
FuzzyFileSearchSessionUpdatedNotification {
session_id: self.shared.session_id.clone(),
query,
files,
},
);
let outgoing = self.shared.outgoing.clone();
self.shared.runtime.spawn(async move {
outgoing.send_server_notification(notification).await;
});
}
}
impl file_search::SessionReporter for SessionReporterImpl {
fn on_update(&self, snapshot: &file_search::FileSearchSnapshot) {
self.send_snapshot(snapshot);
}
fn on_complete(&self) {}
}
fn collect_files(snapshot: &file_search::FileSearchSnapshot) -> Vec<FuzzyFileSearchResult> {
let mut files = snapshot
.matches
.iter()
.map(|m| {
let file_name = m.path.file_name().unwrap_or_default();
FuzzyFileSearchResult {
root: m.root.to_string_lossy().to_string(),
path: m.path.to_string_lossy().to_string(),
file_name: file_name.to_string_lossy().to_string(),
score: m.score,
indices: m.indices.clone(),
}
})
.collect::<Vec<_>>();
files.sort_by(file_search::cmp_by_score_desc_then_path_asc::<
FuzzyFileSearchResult,
_,
_,
>(|f| f.score, |f| f.path.as_str()));
files
}
@@ -678,6 +678,78 @@ impl McpProcess {
self.send_request("fuzzyFileSearch", Some(params)).await
}
pub async fn send_fuzzy_file_search_session_start_request(
&mut self,
session_id: &str,
roots: Vec<String>,
) -> anyhow::Result<i64> {
let params = serde_json::json!({
"sessionId": session_id,
"roots": roots,
});
self.send_request("fuzzyFileSearch/sessionStart", Some(params))
.await
}
pub async fn start_fuzzy_file_search_session(
&mut self,
session_id: &str,
roots: Vec<String>,
) -> anyhow::Result<JSONRPCResponse> {
let request_id = self
.send_fuzzy_file_search_session_start_request(session_id, roots)
.await?;
self.read_stream_until_response_message(RequestId::Integer(request_id))
.await
}
pub async fn send_fuzzy_file_search_session_update_request(
&mut self,
session_id: &str,
query: &str,
) -> anyhow::Result<i64> {
let params = serde_json::json!({
"sessionId": session_id,
"query": query,
});
self.send_request("fuzzyFileSearch/sessionUpdate", Some(params))
.await
}
pub async fn update_fuzzy_file_search_session(
&mut self,
session_id: &str,
query: &str,
) -> anyhow::Result<JSONRPCResponse> {
let request_id = self
.send_fuzzy_file_search_session_update_request(session_id, query)
.await?;
self.read_stream_until_response_message(RequestId::Integer(request_id))
.await
}
pub async fn send_fuzzy_file_search_session_stop_request(
&mut self,
session_id: &str,
) -> anyhow::Result<i64> {
let params = serde_json::json!({
"sessionId": session_id,
});
self.send_request("fuzzyFileSearch/sessionStop", Some(params))
.await
}
pub async fn stop_fuzzy_file_search_session(
&mut self,
session_id: &str,
) -> anyhow::Result<JSONRPCResponse> {
let request_id = self
.send_fuzzy_file_search_session_stop_request(session_id)
.await?;
self.read_stream_until_response_message(RequestId::Integer(request_id))
.await
}
async fn send_request(
&mut self,
method: &str,
@@ -1,6 +1,7 @@
use anyhow::Result;
use anyhow::anyhow;
use app_test_support::McpProcess;
use codex_app_server_protocol::FuzzyFileSearchSessionUpdatedNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use pretty_assertions::assert_eq;
@@ -9,6 +10,130 @@ use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const SHORT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(500);
const STOP_GRACE_PERIOD: std::time::Duration = std::time::Duration::from_millis(250);
const SESSION_UPDATED_METHOD: &str = "fuzzyFileSearch/sessionUpdated";
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum FileExpectation {
Any,
Empty,
NonEmpty,
}
async fn initialized_mcp(codex_home: &TempDir) -> Result<McpProcess> {
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
Ok(mcp)
}
async fn wait_for_session_updated(
mcp: &mut McpProcess,
session_id: &str,
query: &str,
file_expectation: FileExpectation,
) -> Result<FuzzyFileSearchSessionUpdatedNotification> {
for _ in 0..20 {
let notification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message(SESSION_UPDATED_METHOD),
)
.await??;
let params = notification
.params
.ok_or_else(|| anyhow!("missing notification params"))?;
let payload = serde_json::from_value::<FuzzyFileSearchSessionUpdatedNotification>(params)?;
if payload.session_id != session_id || payload.query != query {
continue;
}
let files_match = match file_expectation {
FileExpectation::Any => true,
FileExpectation::Empty => payload.files.is_empty(),
FileExpectation::NonEmpty => !payload.files.is_empty(),
};
if files_match {
return Ok(payload);
}
}
anyhow::bail!(
"did not receive expected session update for sessionId={session_id}, query={query}"
);
}
async fn assert_update_request_fails_for_missing_session(
mcp: &mut McpProcess,
session_id: &str,
query: &str,
) -> Result<()> {
let request_id = mcp
.send_fuzzy_file_search_session_update_request(session_id, query)
.await?;
let err = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(err.error.code, -32600);
assert_eq!(
err.error.message,
format!("fuzzy file search session not found: {session_id}")
);
Ok(())
}
async fn assert_no_session_updates_for(
mcp: &mut McpProcess,
session_id: &str,
grace_period: std::time::Duration,
duration: std::time::Duration,
) -> Result<()> {
let grace_deadline = tokio::time::Instant::now() + grace_period;
loop {
let now = tokio::time::Instant::now();
if now >= grace_deadline {
break;
}
let remaining = grace_deadline - now;
match timeout(
remaining,
mcp.read_stream_until_notification_message(SESSION_UPDATED_METHOD),
)
.await
{
Err(_) => break,
Ok(Err(err)) => return Err(err),
Ok(Ok(_)) => {}
}
}
let deadline = tokio::time::Instant::now() + duration;
loop {
let now = tokio::time::Instant::now();
if now >= deadline {
return Ok(());
}
let remaining = deadline - now;
match timeout(
remaining,
mcp.read_stream_until_notification_message(SESSION_UPDATED_METHOD),
)
.await
{
Err(_) => return Ok(()),
Ok(Err(err)) => return Err(err),
Ok(Ok(notification)) => {
let params = notification
.params
.ok_or_else(|| anyhow!("missing notification params"))?;
let payload =
serde_json::from_value::<FuzzyFileSearchSessionUpdatedNotification>(params)?;
if payload.session_id == session_id {
anyhow::bail!("received unexpected session update after stop: {payload:?}");
}
}
}
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_sorts_and_includes_indices() -> Result<()> {
@@ -125,3 +250,215 @@ async fn test_fuzzy_file_search_accepts_cancellation_token() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_session_streams_updates() -> Result<()> {
let codex_home = TempDir::new()?;
let root = TempDir::new()?;
std::fs::write(root.path().join("alpha.txt"), "contents")?;
let mut mcp = initialized_mcp(&codex_home).await?;
let root_path = root.path().to_string_lossy().to_string();
let session_id = "session-1";
mcp.start_fuzzy_file_search_session(session_id, vec![root_path.clone()])
.await?;
mcp.update_fuzzy_file_search_session(session_id, "alp")
.await?;
let payload =
wait_for_session_updated(&mut mcp, session_id, "alp", FileExpectation::NonEmpty).await?;
assert_eq!(payload.files.len(), 1);
assert_eq!(payload.files[0].root, root_path);
assert_eq!(payload.files[0].path, "alpha.txt");
mcp.stop_fuzzy_file_search_session(session_id).await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_session_update_before_start_errors() -> Result<()> {
let codex_home = TempDir::new()?;
let mut mcp = initialized_mcp(&codex_home).await?;
assert_update_request_fails_for_missing_session(&mut mcp, "missing", "alp").await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_session_update_works_without_waiting_for_start_response()
-> Result<()> {
let codex_home = TempDir::new()?;
let root = TempDir::new()?;
std::fs::write(root.path().join("alpha.txt"), "contents")?;
let mut mcp = initialized_mcp(&codex_home).await?;
let root_path = root.path().to_string_lossy().to_string();
let session_id = "session-no-wait";
let start_request_id = mcp
.send_fuzzy_file_search_session_start_request(session_id, vec![root_path.clone()])
.await?;
let update_request_id = mcp
.send_fuzzy_file_search_session_update_request(session_id, "alp")
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(update_request_id)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_request_id)),
)
.await??;
let payload =
wait_for_session_updated(&mut mcp, session_id, "alp", FileExpectation::NonEmpty).await?;
assert_eq!(payload.files.len(), 1);
assert_eq!(payload.files[0].root, root_path);
assert_eq!(payload.files[0].path, "alpha.txt");
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_session_multiple_query_updates_work() -> Result<()> {
let codex_home = TempDir::new()?;
let root = TempDir::new()?;
std::fs::write(root.path().join("alpha.txt"), "contents")?;
std::fs::write(root.path().join("alphabet.txt"), "contents")?;
let mut mcp = initialized_mcp(&codex_home).await?;
let root_path = root.path().to_string_lossy().to_string();
let session_id = "session-multi-update";
mcp.start_fuzzy_file_search_session(session_id, vec![root_path.clone()])
.await?;
mcp.update_fuzzy_file_search_session(session_id, "alp")
.await?;
let alp_payload =
wait_for_session_updated(&mut mcp, session_id, "alp", FileExpectation::NonEmpty).await?;
assert_eq!(
alp_payload.files.iter().all(|file| file.root == root_path),
true
);
mcp.update_fuzzy_file_search_session(session_id, "zzzz")
.await?;
let zzzz_payload =
wait_for_session_updated(&mut mcp, session_id, "zzzz", FileExpectation::Any).await?;
assert_eq!(zzzz_payload.query, "zzzz");
assert_eq!(zzzz_payload.files.is_empty(), true);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_session_update_after_stop_fails() -> Result<()> {
let codex_home = TempDir::new()?;
let root = TempDir::new()?;
std::fs::write(root.path().join("alpha.txt"), "contents")?;
let mut mcp = initialized_mcp(&codex_home).await?;
let session_id = "session-stop-fail";
let root_path = root.path().to_string_lossy().to_string();
mcp.start_fuzzy_file_search_session(session_id, vec![root_path])
.await?;
mcp.stop_fuzzy_file_search_session(session_id).await?;
assert_update_request_fails_for_missing_session(&mut mcp, session_id, "alp").await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_session_stops_sending_updates_after_stop() -> Result<()> {
let codex_home = TempDir::new()?;
let root = TempDir::new()?;
for i in 0..10_000 {
let file_path = root.path().join(format!("file-{i:04}.txt"));
std::fs::write(file_path, "contents")?;
}
let mut mcp = initialized_mcp(&codex_home).await?;
let root_path = root.path().to_string_lossy().to_string();
let session_id = "session-stop-no-updates";
mcp.start_fuzzy_file_search_session(session_id, vec![root_path])
.await?;
mcp.update_fuzzy_file_search_session(session_id, "file-")
.await?;
wait_for_session_updated(&mut mcp, session_id, "file-", FileExpectation::NonEmpty).await?;
mcp.stop_fuzzy_file_search_session(session_id).await?;
assert_no_session_updates_for(&mut mcp, session_id, STOP_GRACE_PERIOD, SHORT_READ_TIMEOUT)
.await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_two_sessions_are_independent() -> Result<()> {
let codex_home = TempDir::new()?;
let root_a = TempDir::new()?;
let root_b = TempDir::new()?;
std::fs::write(root_a.path().join("alpha.txt"), "contents")?;
std::fs::write(root_b.path().join("beta.txt"), "contents")?;
let mut mcp = initialized_mcp(&codex_home).await?;
let root_a_path = root_a.path().to_string_lossy().to_string();
let root_b_path = root_b.path().to_string_lossy().to_string();
let session_a = "session-a";
let session_b = "session-b";
mcp.start_fuzzy_file_search_session(session_a, vec![root_a_path.clone()])
.await?;
mcp.start_fuzzy_file_search_session(session_b, vec![root_b_path.clone()])
.await?;
mcp.update_fuzzy_file_search_session(session_a, "alp")
.await?;
let session_a_update =
wait_for_session_updated(&mut mcp, session_a, "alp", FileExpectation::NonEmpty).await?;
assert_eq!(session_a_update.files.len(), 1);
assert_eq!(session_a_update.files[0].root, root_a_path);
assert_eq!(session_a_update.files[0].path, "alpha.txt");
mcp.update_fuzzy_file_search_session(session_b, "bet")
.await?;
let session_b_update =
wait_for_session_updated(&mut mcp, session_b, "bet", FileExpectation::NonEmpty).await?;
assert_eq!(session_b_update.files.len(), 1);
assert_eq!(session_b_update.files[0].root, root_b_path);
assert_eq!(session_b_update.files[0].path, "beta.txt");
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_fuzzy_file_search_query_cleared_sends_blank_snapshot() -> Result<()> {
let codex_home = TempDir::new()?;
let root = TempDir::new()?;
std::fs::write(root.path().join("alpha.txt"), "contents")?;
let mut mcp = initialized_mcp(&codex_home).await?;
let root_path = root.path().to_string_lossy().to_string();
let session_id = "session-clear-query";
mcp.start_fuzzy_file_search_session(session_id, vec![root_path])
.await?;
mcp.update_fuzzy_file_search_session(session_id, "alp")
.await?;
wait_for_session_updated(&mut mcp, session_id, "alp", FileExpectation::NonEmpty).await?;
mcp.update_fuzzy_file_search_session(session_id, "").await?;
let payload =
wait_for_session_updated(&mut mcp, session_id, "", FileExpectation::Empty).await?;
assert_eq!(payload.files.is_empty(), true);
Ok(())
}
+32 -25
View File
@@ -139,19 +139,6 @@ impl Drop for FileSearchSession {
}
pub fn create_session(
search_directory: &Path,
options: FileSearchOptions,
reporter: Arc<dyn SessionReporter>,
) -> anyhow::Result<FileSearchSession> {
create_session_inner(
vec![search_directory.to_path_buf()],
options,
reporter,
None,
)
}
fn create_session_inner(
search_directories: Vec<PathBuf>,
options: FileSearchOptions,
reporter: Arc<dyn SessionReporter>,
@@ -291,7 +278,7 @@ pub fn run(
cancel_flag: Option<Arc<AtomicBool>>,
) -> anyhow::Result<FileSearchResults> {
let reporter = Arc::new(RunReporter::default());
let session = create_session_inner(roots, options, reporter.clone(), cancel_flag)?;
let session = create_session(roots, options, reporter.clone(), cancel_flag)?;
session.update_query(pattern_text);
@@ -771,8 +758,13 @@ mod tests {
fn session_scanned_file_count_is_monotonic_across_queries() {
let dir = create_temp_tree(200);
let reporter = Arc::new(RecordingReporter::default());
let session = create_session(dir.path(), FileSearchOptions::default(), reporter.clone())
.expect("session");
let session = create_session(
vec![dir.path().to_path_buf()],
FileSearchOptions::default(),
reporter.clone(),
None,
)
.expect("session");
session.update_query("file-00");
thread::sleep(Duration::from_millis(20));
@@ -791,8 +783,13 @@ mod tests {
fn session_streams_updates_before_walk_complete() {
let dir = create_temp_tree(600);
let reporter = Arc::new(RecordingReporter::default());
let session = create_session(dir.path(), FileSearchOptions::default(), reporter.clone())
.expect("session");
let session = create_session(
vec![dir.path().to_path_buf()],
FileSearchOptions::default(),
reporter.clone(),
None,
)
.expect("session");
session.update_query("file-0");
let completed = reporter.wait_for_complete(Duration::from_secs(5));
@@ -808,8 +805,13 @@ mod tests {
fs::write(dir.path().join("alpha.txt"), "alpha").unwrap();
fs::write(dir.path().join("beta.txt"), "beta").unwrap();
let reporter = Arc::new(RecordingReporter::default());
let session = create_session(dir.path(), FileSearchOptions::default(), reporter.clone())
.expect("session");
let session = create_session(
vec![dir.path().to_path_buf()],
FileSearchOptions::default(),
reporter.clone(),
None,
)
.expect("session");
session.update_query("alpha");
assert!(reporter.wait_for_complete(Duration::from_secs(5)));
@@ -834,7 +836,7 @@ mod tests {
fs::write(dir.path().join("alpha.txt"), "alpha").unwrap();
fs::write(dir.path().join("beta.txt"), "beta").unwrap();
let reporter = Arc::new(RecordingReporter::default());
let session = create_session_inner(
let session = create_session(
vec![dir.path().to_path_buf()],
FileSearchOptions::default(),
reporter.clone(),
@@ -863,7 +865,7 @@ mod tests {
let cancel_flag = Arc::new(AtomicBool::new(false));
let reporter_a = Arc::new(RecordingReporter::default());
let session_a = create_session_inner(
let session_a = create_session(
vec![root_a.path().to_path_buf()],
FileSearchOptions::default(),
reporter_a,
@@ -872,7 +874,7 @@ mod tests {
.expect("session_a");
let reporter_b = Arc::new(RecordingReporter::default());
let session_b = create_session_inner(
let session_b = create_session(
vec![root_b.path().to_path_buf()],
FileSearchOptions::default(),
reporter_b.clone(),
@@ -894,8 +896,13 @@ mod tests {
fn session_emits_updates_when_query_changes() {
let dir = create_temp_tree(200);
let reporter = Arc::new(RecordingReporter::default());
let session = create_session(dir.path(), FileSearchOptions::default(), reporter.clone())
.expect("session");
let session = create_session(
vec![dir.path().to_path_buf()],
FileSearchOptions::default(),
reporter.clone(),
None,
)
.expect("session");
session.update_query("zzzzzzzz");
let completed = reporter.wait_for_complete(Duration::from_secs(5));
+2 -1
View File
@@ -81,12 +81,13 @@ impl FileSearchManager {
session_token,
});
let session = file_search::create_session(
&self.search_dir,
vec![self.search_dir.clone()],
file_search::FileSearchOptions {
compute_indices: true,
..Default::default()
},
reporter,
None,
);
match session {
Ok(session) => st.session = Some(session),