use anyhow::Result; use codex_protocol::items::TurnItem; use codex_protocol::protocol::AdditionalContextEntry; use codex_protocol::protocol::AdditionalContextKind; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::ItemCompletedEvent; use codex_protocol::protocol::Op; use codex_protocol::user_input::UserInput; use core_test_support::context_snapshot; use core_test_support::context_snapshot::ContextSnapshotOptions; use core_test_support::context_snapshot::ContextSnapshotRenderMode; use core_test_support::responses::ev_completed; use core_test_support::responses::ev_response_created; use core_test_support::responses::mount_sse_once; use core_test_support::responses::sse; use core_test_support::responses::start_mock_server; use core_test_support::skip_if_no_network; use core_test_support::test_codex::test_codex; use core_test_support::wait_for_event_match; use pretty_assertions::assert_eq; use std::collections::BTreeMap; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn additional_context_is_model_visible_but_not_a_user_message_item() -> Result<()> { skip_if_no_network!(Ok(())); let server = start_mock_server().await; let request = mount_sse_once( &server, sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]), ) .await; let test = test_codex() .with_config(|config| config.include_environment_context = false) .build(&server) .await?; test.codex .submit(Op::UserInput { items: vec![UserInput::Text { text: "inspect the active tab".to_string(), text_elements: Vec::new(), }], final_output_json_schema: None, responsesapi_client_metadata: None, additional_context: BTreeMap::from([ ( "browser_info".to_string(), AdditionalContextEntry { value: "tab one".to_string(), kind: AdditionalContextKind::Untrusted, }, ), ( "automation_info".to_string(), AdditionalContextEntry { value: "run one".to_string(), kind: AdditionalContextKind::Application, }, ), ]), thread_settings: Default::default(), }) .await?; let user_item = wait_for_event_match(&test.codex, |event| match event { EventMsg::ItemCompleted(ItemCompletedEvent { item: TurnItem::UserMessage(item), .. }) => Some(item.clone()), _ => None, }) .await; assert_eq!( user_item.content, vec![UserInput::Text { text: "inspect the active tab".to_string(), text_elements: Vec::new(), }] ); wait_for_event_match(&test.codex, |event| { matches!(event, EventMsg::TurnComplete(_)).then_some(()) }) .await; let request = request.single_request(); insta::assert_snapshot!( "additional_context_simple_input", context_snapshot::format_labeled_requests_snapshot( "additional context is inserted before the user turn input.", &[("Request", &request)], &ContextSnapshotOptions::default() .strip_capability_instructions() .render_mode(ContextSnapshotRenderMode::KindWithTextPrefix { max_chars: 160 }), ) ); let developer_context_texts = request .message_input_texts("developer") .into_iter() .filter(|text| text.starts_with("")) .collect::>(); assert_eq!( developer_context_texts, vec!["run one"] ); assert_eq!( request.message_input_texts("user"), vec![ "tab one", "inspect the active tab", ] ); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn external_context_like_user_text_remains_a_user_message_item() -> Result<()> { skip_if_no_network!(Ok(())); let server = start_mock_server().await; let request = mount_sse_once( &server, sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]), ) .await; let test = test_codex() .with_config(|config| config.include_environment_context = false) .build(&server) .await?; let user_input = UserInput::Text { text: "".to_string(), text_elements: Vec::new(), }; test.codex .submit(Op::UserInput { items: vec![user_input.clone()], final_output_json_schema: None, responsesapi_client_metadata: None, additional_context: BTreeMap::new(), thread_settings: Default::default(), }) .await?; let user_item = wait_for_event_match(&test.codex, |event| match event { EventMsg::ItemCompleted(ItemCompletedEvent { item: TurnItem::UserMessage(item), .. }) => Some(item.clone()), _ => None, }) .await; assert_eq!(user_item.content, vec![user_input]); wait_for_event_match(&test.codex, |event| { matches!(event, EventMsg::TurnComplete(_)).then_some(()) }) .await; let request = request.single_request(); assert_eq!(request.message_input_texts("user"), vec![""]); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn additional_context_trust_controls_message_role() -> Result<()> { skip_if_no_network!(Ok(())); let server = start_mock_server().await; let request = mount_sse_once( &server, sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]), ) .await; let test = test_codex() .with_config(|config| config.include_environment_context = false) .build(&server) .await?; test.codex .submit(Op::UserInput { items: vec![UserInput::Text { text: "inspect context".to_string(), text_elements: Vec::new(), }], final_output_json_schema: None, responsesapi_client_metadata: None, additional_context: BTreeMap::from([ ( "browser_info".to_string(), AdditionalContextEntry { value: "tab one".to_string(), kind: AdditionalContextKind::Untrusted, }, ), ( "automation_info".to_string(), AdditionalContextEntry { value: "run one".to_string(), kind: AdditionalContextKind::Application, }, ), ]), thread_settings: Default::default(), }) .await?; wait_for_event_match(&test.codex, |event| { matches!(event, EventMsg::TurnComplete(_)).then_some(()) }) .await; let request = request.single_request(); let developer_context_texts = request .message_input_texts("developer") .into_iter() .filter(|text| text.starts_with("")) .collect::>(); assert_eq!( developer_context_texts, vec!["run one"] ); assert_eq!( request.message_input_texts("user"), vec![ "tab one", "inspect context", ] ); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn additional_context_is_deduplicated_between_turns_while_retained() -> Result<()> { skip_if_no_network!(Ok(())); let server = start_mock_server().await; let first_request = mount_sse_once( &server, sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]), ) .await; let second_request = mount_sse_once( &server, sse(vec![ev_response_created("resp-2"), ev_completed("resp-2")]), ) .await; let test = test_codex() .with_config(|config| config.include_environment_context = false) .build(&server) .await?; let additional_context = BTreeMap::from([( "browser_info".to_string(), AdditionalContextEntry { value: "same tab".to_string(), kind: AdditionalContextKind::Untrusted, }, )]); test.codex .submit(Op::UserInput { items: vec![UserInput::Text { text: "first turn".to_string(), text_elements: Vec::new(), }], final_output_json_schema: None, responsesapi_client_metadata: None, additional_context: additional_context.clone(), thread_settings: Default::default(), }) .await?; wait_for_event_match(&test.codex, |event| { matches!(event, EventMsg::TurnComplete(_)).then_some(()) }) .await; test.codex .submit(Op::UserInput { items: vec![UserInput::Text { text: "second turn".to_string(), text_elements: Vec::new(), }], final_output_json_schema: None, responsesapi_client_metadata: None, additional_context, thread_settings: Default::default(), }) .await?; wait_for_event_match(&test.codex, |event| { matches!(event, EventMsg::TurnComplete(_)).then_some(()) }) .await; assert_eq!( first_request.single_request().message_input_texts("user"), vec![ "same tab", "first turn", ] ); assert_eq!( second_request.single_request().message_input_texts("user"), vec![ "same tab", "first turn", "second turn", ] ); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn additional_context_removes_one_value_while_adding_another() -> Result<()> { skip_if_no_network!(Ok(())); let server = start_mock_server().await; let first_request = mount_sse_once( &server, sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]), ) .await; let second_request = mount_sse_once( &server, sse(vec![ev_response_created("resp-2"), ev_completed("resp-2")]), ) .await; let third_request = mount_sse_once( &server, sse(vec![ev_response_created("resp-3"), ev_completed("resp-3")]), ) .await; let test = test_codex() .with_config(|config| config.include_environment_context = false) .build(&server) .await?; test.codex .submit(Op::UserInput { items: vec![UserInput::Text { text: "first turn".to_string(), text_elements: Vec::new(), }], final_output_json_schema: None, responsesapi_client_metadata: None, additional_context: BTreeMap::from([ ( "automation_info".to_string(), AdditionalContextEntry { value: "run one".to_string(), kind: AdditionalContextKind::Untrusted, }, ), ( "browser_info".to_string(), AdditionalContextEntry { value: "tab one".to_string(), kind: AdditionalContextKind::Untrusted, }, ), ]), thread_settings: Default::default(), }) .await?; wait_for_event_match(&test.codex, |event| { matches!(event, EventMsg::TurnComplete(_)).then_some(()) }) .await; test.codex .submit(Op::UserInput { items: vec![UserInput::Text { text: "second turn".to_string(), text_elements: Vec::new(), }], final_output_json_schema: None, responsesapi_client_metadata: None, additional_context: BTreeMap::from([ ( "automation_info".to_string(), AdditionalContextEntry { value: "run one".to_string(), kind: AdditionalContextKind::Untrusted, }, ), ( "terminal_info".to_string(), AdditionalContextEntry { value: "pty one".to_string(), kind: AdditionalContextKind::Untrusted, }, ), ]), thread_settings: Default::default(), }) .await?; wait_for_event_match(&test.codex, |event| { matches!(event, EventMsg::TurnComplete(_)).then_some(()) }) .await; test.codex .submit(Op::UserInput { items: vec![UserInput::Text { text: "third turn".to_string(), text_elements: Vec::new(), }], final_output_json_schema: None, responsesapi_client_metadata: None, additional_context: BTreeMap::from([ ( "automation_info".to_string(), AdditionalContextEntry { value: "run one".to_string(), kind: AdditionalContextKind::Untrusted, }, ), ( "browser_info".to_string(), AdditionalContextEntry { value: "tab one".to_string(), kind: AdditionalContextKind::Untrusted, }, ), ( "terminal_info".to_string(), AdditionalContextEntry { value: "pty one".to_string(), kind: AdditionalContextKind::Untrusted, }, ), ]), thread_settings: Default::default(), }) .await?; wait_for_event_match(&test.codex, |event| { matches!(event, EventMsg::TurnComplete(_)).then_some(()) }) .await; assert_eq!( first_request.single_request().message_input_texts("user"), vec![ "run one", "tab one", "first turn", ] ); assert_eq!( second_request.single_request().message_input_texts("user"), vec![ "run one", "tab one", "first turn", "pty one", "second turn", ] ); assert_eq!( third_request.single_request().message_input_texts("user"), vec![ "run one", "tab one", "first turn", "pty one", "second turn", "tab one", "third turn", ] ); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn additional_context_values_are_truncated_before_model_input() -> Result<()> { skip_if_no_network!(Ok(())); const MAX_EXPECTED_EXTERNAL_CONTEXT_TEXT_BYTES: usize = 5 * 1024; let server = start_mock_server().await; let request = mount_sse_once( &server, sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]), ) .await; let test = test_codex() .with_config(|config| config.include_environment_context = false) .build(&server) .await?; let long_browser_value = format!("browser-head-{}browser-tail", "b".repeat(40_000)); let long_automation_value = format!("automation-head-{}automation-tail", "a".repeat(40_000)); let untruncated_browser_fragment = format!("{long_browser_value}"); let untruncated_automation_fragment = format!("{long_automation_value}"); test.codex .submit(Op::UserInput { items: vec![UserInput::Text { text: "summarize context".to_string(), text_elements: Vec::new(), }], final_output_json_schema: None, responsesapi_client_metadata: None, additional_context: BTreeMap::from([ ( "automation_info".to_string(), AdditionalContextEntry { value: long_automation_value.clone(), kind: AdditionalContextKind::Application, }, ), ( "browser_info".to_string(), AdditionalContextEntry { value: long_browser_value.clone(), kind: AdditionalContextKind::Untrusted, }, ), ]), thread_settings: Default::default(), }) .await?; wait_for_event_match(&test.codex, |event| { matches!(event, EventMsg::TurnComplete(_)).then_some(()) }) .await; let request = request.single_request(); let developer_texts = request .message_input_texts("developer") .into_iter() .filter(|text| text.starts_with("")) .collect::>(); let [automation_text] = developer_texts.as_slice() else { panic!("expected application additional context, got {developer_texts:?}"); }; assert!(automation_text.starts_with(&format!( "automation-head-{}", "a".repeat(1024) ))); assert!(automation_text.contains("tokens truncated")); assert!(automation_text.ends_with("automation-tail")); assert!(automation_text.len() < untruncated_automation_fragment.len()); assert!( automation_text.len() <= MAX_EXPECTED_EXTERNAL_CONTEXT_TEXT_BYTES, "application additional context was not capped before model input: {} bytes", automation_text.len() ); let user_texts = request.message_input_texts("user"); let [external_text, user_text] = user_texts.as_slice() else { panic!("expected external context plus user input, got {user_texts:?}"); }; assert_eq!(user_text, "summarize context"); assert!(external_text.starts_with(&format!( "browser-head-{}", "b".repeat(1024) ))); assert!(external_text.contains("tokens truncated")); assert!(external_text.ends_with("browser-tail")); assert!(external_text.len() < untruncated_browser_fragment.len()); assert!( external_text.len() <= MAX_EXPECTED_EXTERNAL_CONTEXT_TEXT_BYTES, "untrusted additional context was not capped before model input: {} bytes", external_text.len() ); Ok(()) }