mirror of
https://github.com/pchuan98/codex.git
synced 2026-07-01 00:31:56 +08:00
app-server: refresh live threads from latest config snapshot (#21187)
## Why App-server config writes were leaving existing threads partially stale. After a config mutation, the app-server told each live thread to run `Op::ReloadUserConfig`, but that path only re-read the user `config.toml` layer. Settings that came from the app-server's materialized config snapshot did not propagate to existing threads until restart. This change prevent a FS access from `core` for CCA. ## What changed - add `CodexThread::refresh_runtime_config()` and `Session::refresh_runtime_config()` so the app-server can push a freshly rebuilt config snapshot into a live thread - rebuild the latest config with each thread's `cwd` after config mutations, then refresh the thread from that snapshot instead of asking it to reload only `config.toml` - keep session-static settings unchanged during refresh, while updating runtime-refreshable state such as the config layer stack, `tool_suggest`, and derived hook/plugin/skill state - keep `reload_user_config_layer()` as the file-backed fallback for legacy local reload flows, but route the shared refresh logic through the new runtime refresh path ## Testing - add a session test that verifies `refresh_runtime_config()` rebuilds hooks from refreshed config - add a session test that verifies runtime-refreshable fields update while session-static settings like `model` and `notify` stay unchanged --------- Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
committed by
GitHub
Unverified
parent
129401df43
commit
eb0462f2af
@@ -46,7 +46,6 @@ use codex_login::AuthManager;
|
||||
use codex_model_provider::create_model_provider;
|
||||
use codex_plugin::PluginId;
|
||||
use codex_protocol::config_types::WebSearchMode;
|
||||
use codex_protocol::protocol::Op;
|
||||
use serde_json::json;
|
||||
use std::path::PathBuf;
|
||||
|
||||
@@ -378,14 +377,22 @@ impl ConfigRequestProcessor {
|
||||
}
|
||||
|
||||
async fn reload_user_config(&self) {
|
||||
let next_config = match self.load_latest_config(/*fallback_cwd*/ None).await {
|
||||
Ok(config) => config,
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
"failed to rebuild user config for runtime refresh: {}",
|
||||
err.message
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let thread_ids = self.thread_manager.list_thread_ids().await;
|
||||
for thread_id in thread_ids {
|
||||
let Ok(thread) = self.thread_manager.get_thread(thread_id).await else {
|
||||
continue;
|
||||
};
|
||||
if let Err(err) = thread.submit(Op::ReloadUserConfig).await {
|
||||
tracing::warn!("failed to request user config reload: {err}");
|
||||
}
|
||||
thread.refresh_runtime_config(next_config.clone()).await;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -237,28 +237,32 @@ impl ConfigLayerStack {
|
||||
/// replaced; otherwise, it is inserted into the stack at the appropriate
|
||||
/// position based on precedence rules.
|
||||
pub fn with_user_config(&self, config_toml: &AbsolutePathBuf, user_config: TomlValue) -> Self {
|
||||
let user_layer = ConfigLayerEntry::new(
|
||||
self.with_user_layer(Some(ConfigLayerEntry::new(
|
||||
ConfigLayerSource::User {
|
||||
file: config_toml.clone(),
|
||||
},
|
||||
user_config,
|
||||
);
|
||||
)))
|
||||
}
|
||||
|
||||
/// Returns a new stack with the user layer copied from `other`, preserving
|
||||
/// every non-user layer already present in this stack.
|
||||
pub fn with_user_layer_from(&self, other: &Self) -> Self {
|
||||
self.with_user_layer(other.get_user_layer().cloned())
|
||||
}
|
||||
|
||||
fn with_user_layer(&self, user_layer: Option<ConfigLayerEntry>) -> Self {
|
||||
let mut layers = self.layers.clone();
|
||||
match self.user_layer_index {
|
||||
Some(index) => {
|
||||
let user_layer_index = match (self.user_layer_index, user_layer) {
|
||||
(Some(index), Some(user_layer)) => {
|
||||
layers[index] = user_layer;
|
||||
Self {
|
||||
layers,
|
||||
user_layer_index: self.user_layer_index,
|
||||
requirements: self.requirements.clone(),
|
||||
requirements_toml: self.requirements_toml.clone(),
|
||||
ignore_user_and_project_exec_policy_rules: self
|
||||
.ignore_user_and_project_exec_policy_rules,
|
||||
startup_warnings: self.startup_warnings.clone(),
|
||||
}
|
||||
Some(index)
|
||||
}
|
||||
None => {
|
||||
(Some(index), None) => {
|
||||
layers.remove(index);
|
||||
None
|
||||
}
|
||||
(None, Some(user_layer)) => {
|
||||
let user_layer_index = match layers
|
||||
.iter()
|
||||
.position(|layer| layer.name.precedence() > user_layer.name.precedence())
|
||||
@@ -272,16 +276,18 @@ impl ConfigLayerStack {
|
||||
layers.len() - 1
|
||||
}
|
||||
};
|
||||
Self {
|
||||
layers,
|
||||
user_layer_index: Some(user_layer_index),
|
||||
requirements: self.requirements.clone(),
|
||||
requirements_toml: self.requirements_toml.clone(),
|
||||
ignore_user_and_project_exec_policy_rules: self
|
||||
.ignore_user_and_project_exec_policy_rules,
|
||||
startup_warnings: self.startup_warnings.clone(),
|
||||
}
|
||||
Some(user_layer_index)
|
||||
}
|
||||
(None, None) => None,
|
||||
};
|
||||
Self {
|
||||
layers,
|
||||
user_layer_index,
|
||||
requirements: self.requirements.clone(),
|
||||
requirements_toml: self.requirements_toml.clone(),
|
||||
ignore_user_and_project_exec_policy_rules: self
|
||||
.ignore_user_and_project_exec_policy_rules,
|
||||
startup_warnings: self.startup_warnings.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -464,6 +464,13 @@ impl CodexThread {
|
||||
self.codex.session.get_config().await
|
||||
}
|
||||
|
||||
/// Refresh the thread's layer-backed user config state from a caller-supplied
|
||||
/// config snapshot. Thread-scoped layers and session-static settings remain
|
||||
/// unchanged.
|
||||
pub async fn refresh_runtime_config(&self, next_config: crate::config::Config) {
|
||||
self.codex.session.refresh_runtime_config(next_config).await;
|
||||
}
|
||||
|
||||
pub async fn read_mcp_resource(
|
||||
&self,
|
||||
server: &str,
|
||||
|
||||
@@ -1399,10 +1399,49 @@ impl Session {
|
||||
state.session_configuration.provider.clone()
|
||||
}
|
||||
|
||||
pub(crate) async fn refresh_runtime_config(&self, next_config: Config) {
|
||||
// Refresh only the user layer from the incoming snapshot. Preserve thread-local
|
||||
// layers such as request/session overrides that were present when this session
|
||||
// was created.
|
||||
let config = {
|
||||
let mut state = self.state.lock().await;
|
||||
let mut config = (*state.session_configuration.original_config_do_not_use).clone();
|
||||
config.config_layer_stack = config
|
||||
.config_layer_stack
|
||||
.with_user_layer_from(&next_config.config_layer_stack);
|
||||
config.tool_suggest =
|
||||
resolve_tool_suggest_config_from_layer_stack(&config.config_layer_stack);
|
||||
let config = Arc::new(config);
|
||||
state.session_configuration.original_config_do_not_use = Arc::clone(&config);
|
||||
config
|
||||
};
|
||||
self.services.skills_manager.clear_cache();
|
||||
self.services.plugins_manager.clear_cache();
|
||||
let hooks = build_hooks_for_config(
|
||||
config.as_ref(),
|
||||
self.services.plugins_manager.as_ref(),
|
||||
self.services.user_shell.as_ref(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let state = self.state.lock().await;
|
||||
// A newer refresh may have updated the config while this hook build was in flight.
|
||||
// Only publish hooks derived from the current config snapshot.
|
||||
if Arc::ptr_eq(
|
||||
&state.session_configuration.original_config_do_not_use,
|
||||
&config,
|
||||
) {
|
||||
self.services.hooks.store(Arc::new(hooks));
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn reload_user_config_layer(&self) {
|
||||
// Refresh layer-backed runtime state for an existing session, including enabled plugin,
|
||||
// skill, and hook state. Derived config fields such as feature gates and legacy notify
|
||||
// settings remain session-static.
|
||||
//
|
||||
// Prefer `refresh_runtime_config()` when the host can already provide a materialized
|
||||
// config snapshot. This file-based path exists for legacy local reload flows.
|
||||
let config_toml_path = {
|
||||
let state = self.state.lock().await;
|
||||
state
|
||||
@@ -1428,36 +1467,17 @@ impl Session {
|
||||
}
|
||||
};
|
||||
|
||||
let config = {
|
||||
let mut state = self.state.lock().await;
|
||||
let next_config = {
|
||||
let state = self.state.lock().await;
|
||||
let mut config = (*state.session_configuration.original_config_do_not_use).clone();
|
||||
config.config_layer_stack = config
|
||||
.config_layer_stack
|
||||
.with_user_config(&config_toml_path, user_config);
|
||||
config.tool_suggest =
|
||||
resolve_tool_suggest_config_from_layer_stack(&config.config_layer_stack);
|
||||
let config = Arc::new(config);
|
||||
state.session_configuration.original_config_do_not_use = Arc::clone(&config);
|
||||
config
|
||||
};
|
||||
self.services.skills_manager.clear_cache();
|
||||
self.services.plugins_manager.clear_cache();
|
||||
let hooks = build_hooks_for_config(
|
||||
config.as_ref(),
|
||||
self.services.plugins_manager.as_ref(),
|
||||
self.services.user_shell.as_ref(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let state = self.state.lock().await;
|
||||
// A newer reload may have updated the config while this hook build was in flight.
|
||||
// Only publish hooks derived from the current config snapshot.
|
||||
if Arc::ptr_eq(
|
||||
&state.session_configuration.original_config_do_not_use,
|
||||
&config,
|
||||
) {
|
||||
self.services.hooks.store(Arc::new(hooks));
|
||||
}
|
||||
self.refresh_runtime_config(next_config).await;
|
||||
}
|
||||
|
||||
async fn build_settings_update_items(
|
||||
|
||||
@@ -1283,6 +1283,78 @@ async fn reload_user_config_layer_refreshes_hooks() -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn refresh_runtime_config_refreshes_hooks() -> anyhow::Result<()> {
|
||||
let (session, _turn_context) = make_session_and_context().await;
|
||||
{
|
||||
let mut state = session.state.lock().await;
|
||||
let mut config = (*state.session_configuration.original_config_do_not_use).clone();
|
||||
config
|
||||
.features
|
||||
.enable(Feature::CodexHooks)
|
||||
.expect("enable Codex hooks");
|
||||
state.session_configuration.original_config_do_not_use = Arc::new(config);
|
||||
}
|
||||
let codex_home = session.codex_home().await;
|
||||
std::fs::create_dir_all(&codex_home)?;
|
||||
let config_toml_path = codex_home.join(CONFIG_TOML_FILE);
|
||||
#[derive(serde::Serialize)]
|
||||
struct NormalizedHookIdentity {
|
||||
event_name: &'static str,
|
||||
#[serde(flatten)]
|
||||
group: codex_config::MatcherGroup,
|
||||
}
|
||||
let trusted_hash = {
|
||||
let identity = NormalizedHookIdentity {
|
||||
event_name: "session_start",
|
||||
group: codex_config::MatcherGroup {
|
||||
matcher: None,
|
||||
hooks: vec![codex_config::HookHandlerConfig::Command {
|
||||
command: "python3 /tmp/user.py".to_string(),
|
||||
timeout_sec: Some(600),
|
||||
r#async: false,
|
||||
status_message: None,
|
||||
}],
|
||||
},
|
||||
};
|
||||
let identity = codex_config::TomlValue::try_from(identity)?;
|
||||
codex_config::version_for_toml(&identity)
|
||||
};
|
||||
let hook_key = format!("{}:session_start:0:0", config_toml_path.display());
|
||||
let trusted_user_config: codex_config::TomlValue = serde_json::from_value(serde_json::json!({
|
||||
"hooks": {
|
||||
"SessionStart": [{
|
||||
"hooks": [{
|
||||
"type": "command",
|
||||
"command": "python3 /tmp/user.py",
|
||||
}],
|
||||
}],
|
||||
"state": {
|
||||
hook_key: {
|
||||
"trusted_hash": trusted_hash,
|
||||
},
|
||||
},
|
||||
},
|
||||
}))?;
|
||||
std::fs::write(&config_toml_path, toml::to_string(&trusted_user_config)?)?;
|
||||
|
||||
let request = codex_hooks::SessionStartRequest {
|
||||
session_id: session.conversation_id,
|
||||
cwd: session.get_config().await.cwd.clone(),
|
||||
transcript_path: None,
|
||||
model: "gpt-5.2".to_string(),
|
||||
permission_mode: "default".to_string(),
|
||||
source: codex_hooks::SessionStartSource::Startup,
|
||||
};
|
||||
assert!(session.hooks().preview_session_start(&request).is_empty());
|
||||
|
||||
let next_config = load_latest_config_for_session(&session).await;
|
||||
session.refresh_runtime_config(next_config).await;
|
||||
|
||||
assert_eq!(session.hooks().preview_session_start(&request).len(), 1);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reload_user_config_layer_updates_effective_tool_suggest_config() {
|
||||
let (session, _turn_context) = make_session_and_context().await;
|
||||
@@ -1312,6 +1384,62 @@ disabled_tools = [
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn refresh_runtime_config_updates_runtime_refreshable_fields_and_keeps_session_static_settings()
|
||||
{
|
||||
let (session, _turn_context) = make_session_and_context().await;
|
||||
let codex_home = session.codex_home().await;
|
||||
std::fs::create_dir_all(&codex_home).expect("create codex home");
|
||||
std::fs::write(
|
||||
codex_home.join(CONFIG_TOML_FILE),
|
||||
r#"[apps.calendar]
|
||||
enabled = false
|
||||
destructive_enabled = false
|
||||
|
||||
[tool_suggest]
|
||||
disabled_tools = [
|
||||
{ type = "connector", id = " calendar " },
|
||||
{ type = "plugin", id = "slack@openai-curated" },
|
||||
]
|
||||
"#,
|
||||
)
|
||||
.expect("write user config");
|
||||
|
||||
let original = session.get_config().await;
|
||||
let mut next_config = load_latest_config_for_session(&session).await;
|
||||
next_config.model = Some("gpt-5.4".to_string());
|
||||
next_config.notify = Some(vec!["echo".to_string()]);
|
||||
|
||||
session.refresh_runtime_config(next_config).await;
|
||||
|
||||
let config = session.get_config().await;
|
||||
let apps_toml = config
|
||||
.config_layer_stack
|
||||
.effective_config()
|
||||
.as_table()
|
||||
.and_then(|table| table.get("apps"))
|
||||
.cloned()
|
||||
.expect("apps table");
|
||||
let apps = codex_config::types::AppsConfigToml::deserialize(apps_toml)
|
||||
.expect("deserialize apps config");
|
||||
let app = apps
|
||||
.apps
|
||||
.get("calendar")
|
||||
.expect("calendar app config exists");
|
||||
|
||||
assert!(!app.enabled);
|
||||
assert_eq!(app.destructive_enabled, Some(false));
|
||||
assert_eq!(config.model, original.model);
|
||||
assert_eq!(config.notify, original.notify);
|
||||
assert_eq!(
|
||||
config.tool_suggest.disabled_tools,
|
||||
vec![
|
||||
ToolSuggestDisabledTool::connector("calendar"),
|
||||
ToolSuggestDisabledTool::plugin("slack@openai-curated"),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn filter_connectors_for_input_skips_duplicate_slug_mentions() {
|
||||
let connectors = vec![
|
||||
@@ -3837,6 +3965,16 @@ async fn make_session_with_config(
|
||||
Ok(session)
|
||||
}
|
||||
|
||||
async fn load_latest_config_for_session(session: &Session) -> Config {
|
||||
let config = session.get_config().await;
|
||||
ConfigBuilder::default()
|
||||
.codex_home(config.codex_home.to_path_buf())
|
||||
.fallback_cwd(Some(config.cwd.to_path_buf()))
|
||||
.build()
|
||||
.await
|
||||
.expect("load latest config for session")
|
||||
}
|
||||
|
||||
async fn make_session_with_config_and_rx(
|
||||
mutator: impl FnOnce(&mut Config),
|
||||
) -> anyhow::Result<(Arc<Session>, async_channel::Receiver<Event>)> {
|
||||
|
||||
Reference in New Issue
Block a user