diff --git a/codex-rs/core/src/session/handlers.rs b/codex-rs/core/src/session/handlers.rs index 75fbca4c8..a483ce5e0 100644 --- a/codex-rs/core/src/session/handlers.rs +++ b/codex-rs/core/src/session/handlers.rs @@ -850,6 +850,11 @@ pub(super) async fn submission_loop( if !shutdown_received { shutdown_session_runtime(&sess).await; emit_thread_stop_lifecycle(sess.as_ref()).await; + if let Some(live_thread) = sess.live_thread() + && let Err(err) = live_thread.shutdown().await + { + warn!("failed to shutdown thread persistence after submission channel closed: {err}"); + } } debug!("Agent loop exited"); } diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 20f32ddfc..46f7ca6e6 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -6971,7 +6971,7 @@ async fn shutdown_complete_does_not_append_to_thread_store_after_shutdown() { } #[tokio::test] -async fn submission_loop_channel_close_emits_thread_stop_lifecycle() { +async fn submission_loop_channel_close_runs_full_thread_teardown() { struct SessionStopMarker; struct ThreadStopMarker; @@ -6998,6 +6998,40 @@ async fn submission_loop_channel_close_emits_thread_stop_lifecycle() { } let (mut session, turn_context) = make_session_and_context().await; + let store = Arc::new(codex_thread_store::InMemoryThreadStore::default()); + let thread_store: Arc = store.clone(); + let config = session.get_config().await; + let live_thread = LiveThread::create( + Arc::clone(&thread_store), + CreateThreadParams { + session_id: session.session_id(), + thread_id: session.thread_id, + extra_config: None, + forked_from_id: None, + parent_thread_id: None, + source: SessionSource::Exec, + thread_source: None, + originator: "test_originator".to_string(), + base_instructions: BaseInstructions::default(), + dynamic_tools: Vec::new(), + selected_capability_roots: Vec::new(), + multi_agent_version: None, + initial_window_id: Uuid::now_v7().to_string(), + metadata: ThreadPersistenceMetadata { + cwd: Some(config.cwd.to_path_buf()), + model_provider: config.model_provider_id.clone(), + memory_mode: if config.memories.generate_memories { + ThreadMemoryMode::Enabled + } else { + ThreadMemoryMode::Disabled + }, + }, + }, + ) + .await + .expect("create thread persistence"); + session.services.thread_store = thread_store; + session.services.live_thread = Some(live_thread); let calls = Arc::new(std::sync::atomic::AtomicUsize::new(0)); let mut builder = codex_extension_api::ExtensionRegistryBuilder::::new(); builder.thread_lifecycle_contributor(Arc::new(ThreadStopRecorder { @@ -7020,6 +7054,14 @@ async fn submission_loop_channel_close_emits_thread_stop_lifecycle() { submission_loop(session, Arc::clone(&turn_context.config), rx_sub).await; assert_eq!(1, calls.load(std::sync::atomic::Ordering::SeqCst)); + assert_eq!( + codex_thread_store::InMemoryThreadStoreCalls { + create_thread: 1, + shutdown_thread: 1, + ..Default::default() + }, + store.calls().await + ); } #[tokio::test]