From c55ce3b51bc3b8bb2805cbee0afc1a8c6e1e7ee2 Mon Sep 17 00:00:00 2001 From: Abdulrahman Alfozan Date: Fri, 26 Jun 2026 13:56:17 -0700 Subject: [PATCH] Close thread persistence when submission channel closes (#30173) ### Summary Release live thread persistence when a session ends because its submission channel closes. This prevents a later same-process resume from failing with `thread ... already has a live local writer`. ### Details The issue is in the `codex-core` session teardown path used by Codex hosts, rather than in the Managed Agents API or exec-server itself. Explicit shutdown already closes the `LiveThread`, which releases the process-scoped writer held by `LocalThreadStore`. The submission-channel-close fallback ran runtime and extension teardown but skipped that persistence shutdown, leaving the thread ID registered as having a live writer. This change: - closes the `LiveThread` on the channel-close fallback path; - preserves the existing teardown order used by explicit shutdowns; - extends the lifecycle regression test to assert that the thread store receives `shutdown_thread`. Context: [original report](https://openai.slack.com/archives/C0B4NBHQGTV/p1782136364948039), [recent occurrence 1](https://openai.slack.com/archives/C0B4NBHQGTV/p1782434817895839?thread_ts=1782136364.948039&cid=C0B4NBHQGTV), [recent occurrence 2](https://openai.slack.com/archives/C0B4NBHQGTV/p1782335107474429?thread_ts=1782136364.948039&cid=C0B4NBHQGTV) ### Testing - `just test -p codex-core submission_loop_channel_close_runs_full_thread_teardown` - `just test -p codex-core --lib` (1,989 passed; 3 skipped) - `just fix -p codex-core` - `just fmt` - Native code review: no findings I also attempted `just test -p codex-core`. The new regression test passed; 79 unrelated integration tests failed in the local harness, primarily because helper binaries such as `test_stdio_server` were unavailable, plus local proxy/shell timing failures. 
--- codex-rs/core/src/session/handlers.rs | 5 +++ codex-rs/core/src/session/tests.rs | 44 ++++++++++++++++++++++++++- 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/codex-rs/core/src/session/handlers.rs b/codex-rs/core/src/session/handlers.rs index 75fbca4c8..a483ce5e0 100644 --- a/codex-rs/core/src/session/handlers.rs +++ b/codex-rs/core/src/session/handlers.rs @@ -850,6 +850,11 @@ pub(super) async fn submission_loop( if !shutdown_received { shutdown_session_runtime(&sess).await; emit_thread_stop_lifecycle(sess.as_ref()).await; + if let Some(live_thread) = sess.live_thread() + && let Err(err) = live_thread.shutdown().await + { + warn!("failed to shutdown thread persistence after submission channel closed: {err}"); + } } debug!("Agent loop exited"); } diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 20f32ddfc..46f7ca6e6 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -6971,7 +6971,7 @@ async fn shutdown_complete_does_not_append_to_thread_store_after_shutdown() { } #[tokio::test] -async fn submission_loop_channel_close_emits_thread_stop_lifecycle() { +async fn submission_loop_channel_close_runs_full_thread_teardown() { struct SessionStopMarker; struct ThreadStopMarker; @@ -6998,6 +6998,40 @@ async fn submission_loop_channel_close_emits_thread_stop_lifecycle() { } let (mut session, turn_context) = make_session_and_context().await; + let store = Arc::new(codex_thread_store::InMemoryThreadStore::default()); + let thread_store: Arc = store.clone(); + let config = session.get_config().await; + let live_thread = LiveThread::create( + Arc::clone(&thread_store), + CreateThreadParams { + session_id: session.session_id(), + thread_id: session.thread_id, + extra_config: None, + forked_from_id: None, + parent_thread_id: None, + source: SessionSource::Exec, + thread_source: None, + originator: "test_originator".to_string(), + base_instructions: BaseInstructions::default(), + 
dynamic_tools: Vec::new(), + selected_capability_roots: Vec::new(), + multi_agent_version: None, + initial_window_id: Uuid::now_v7().to_string(), + metadata: ThreadPersistenceMetadata { + cwd: Some(config.cwd.to_path_buf()), + model_provider: config.model_provider_id.clone(), + memory_mode: if config.memories.generate_memories { + ThreadMemoryMode::Enabled + } else { + ThreadMemoryMode::Disabled + }, + }, + }, + ) + .await + .expect("create thread persistence"); + session.services.thread_store = thread_store; + session.services.live_thread = Some(live_thread); let calls = Arc::new(std::sync::atomic::AtomicUsize::new(0)); let mut builder = codex_extension_api::ExtensionRegistryBuilder::::new(); builder.thread_lifecycle_contributor(Arc::new(ThreadStopRecorder { @@ -7020,6 +7054,14 @@ async fn submission_loop_channel_close_emits_thread_stop_lifecycle() { submission_loop(session, Arc::clone(&turn_context.config), rx_sub).await; assert_eq!(1, calls.load(std::sync::atomic::Ordering::SeqCst)); + assert_eq!( + codex_thread_store::InMemoryThreadStoreCalls { + create_thread: 1, + shutdown_thread: 1, + ..Default::default() + }, + store.calls().await + ); } #[tokio::test]