Close thread persistence when submission channel closes (#30173)

### Summary

Release live thread persistence when a session ends because its
submission channel closes. This prevents a later same-process resume
from failing with `thread ... already has a live local writer`.

### Details

The issue is in the `codex-core` session teardown path used by Codex
hosts, rather than in Managed Agents API or exec-server itself.

Explicit shutdown already closes the `LiveThread`, which releases the
process-scoped writer held by `LocalThreadStore`. The
submission-channel-close fallback ran runtime and extension teardown but
skipped that persistence shutdown, leaving the thread ID registered as
having a live writer.

This change:

- closes the `LiveThread` on the channel-close fallback path;
- preserves the existing teardown order used by explicit shutdowns;
- extends the lifecycle regression test to assert that the thread store
receives `shutdown_thread`.

Context: [original
report](https://openai.slack.com/archives/C0B4NBHQGTV/p1782136364948039),
[recent occurrence
1](https://openai.slack.com/archives/C0B4NBHQGTV/p1782434817895839?thread_ts=1782136364.948039&cid=C0B4NBHQGTV),
[recent occurrence
2](https://openai.slack.com/archives/C0B4NBHQGTV/p1782335107474429?thread_ts=1782136364.948039&cid=C0B4NBHQGTV)

### Testing

- `just test -p codex-core
submission_loop_channel_close_runs_full_thread_teardown`
- `just test -p codex-core --lib` (1,989 passed; 3 skipped)
- `just fix -p codex-core`
- `just fmt`
- Native code review: no findings

I also attempted `just test -p codex-core`. The new regression passed;
79 unrelated integration tests failed in the local harness, primarily
because helper binaries such as `test_stdio_server` were unavailable,
plus local proxy/shell timing failures.
This commit is contained in:
Abdulrahman Alfozan
2026-06-26 13:56:17 -07:00
committed by GitHub
Unverified
parent 69596f0e42
commit c55ce3b51b
2 changed files with 48 additions and 1 deletions
+5
View File
@@ -850,6 +850,11 @@ pub(super) async fn submission_loop(
if !shutdown_received {
shutdown_session_runtime(&sess).await;
emit_thread_stop_lifecycle(sess.as_ref()).await;
if let Some(live_thread) = sess.live_thread()
&& let Err(err) = live_thread.shutdown().await
{
warn!("failed to shutdown thread persistence after submission channel closed: {err}");
}
}
debug!("Agent loop exited");
}
+43 -1
View File
@@ -6971,7 +6971,7 @@ async fn shutdown_complete_does_not_append_to_thread_store_after_shutdown() {
}
#[tokio::test]
async fn submission_loop_channel_close_emits_thread_stop_lifecycle() {
async fn submission_loop_channel_close_runs_full_thread_teardown() {
struct SessionStopMarker;
struct ThreadStopMarker;
@@ -6998,6 +6998,40 @@ async fn submission_loop_channel_close_emits_thread_stop_lifecycle() {
}
let (mut session, turn_context) = make_session_and_context().await;
let store = Arc::new(codex_thread_store::InMemoryThreadStore::default());
let thread_store: Arc<dyn codex_thread_store::ThreadStore> = store.clone();
let config = session.get_config().await;
let live_thread = LiveThread::create(
Arc::clone(&thread_store),
CreateThreadParams {
session_id: session.session_id(),
thread_id: session.thread_id,
extra_config: None,
forked_from_id: None,
parent_thread_id: None,
source: SessionSource::Exec,
thread_source: None,
originator: "test_originator".to_string(),
base_instructions: BaseInstructions::default(),
dynamic_tools: Vec::new(),
selected_capability_roots: Vec::new(),
multi_agent_version: None,
initial_window_id: Uuid::now_v7().to_string(),
metadata: ThreadPersistenceMetadata {
cwd: Some(config.cwd.to_path_buf()),
model_provider: config.model_provider_id.clone(),
memory_mode: if config.memories.generate_memories {
ThreadMemoryMode::Enabled
} else {
ThreadMemoryMode::Disabled
},
},
},
)
.await
.expect("create thread persistence");
session.services.thread_store = thread_store;
session.services.live_thread = Some(live_thread);
let calls = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let mut builder = codex_extension_api::ExtensionRegistryBuilder::<crate::config::Config>::new();
builder.thread_lifecycle_contributor(Arc::new(ThreadStopRecorder {
@@ -7020,6 +7054,14 @@ async fn submission_loop_channel_close_emits_thread_stop_lifecycle() {
submission_loop(session, Arc::clone(&turn_context.config), rx_sub).await;
assert_eq!(1, calls.load(std::sync::atomic::Ordering::SeqCst));
assert_eq!(
codex_thread_store::InMemoryThreadStoreCalls {
create_thread: 1,
shutdown_thread: 1,
..Default::default()
},
store.calls().await
);
}
#[tokio::test]