Files
Giles Odigwe 540193ccef Python: Reduce flaky integration tests and improve CI signal quality (#5454)
* Enable Ollama integration tests in CI and rename report to Integration Test Report

- Install Ollama, cache models (qwen2.5:0.5b + nomic-embed-text), and start
  server in the Misc integration job for both workflow files
- Set OLLAMA_MODEL and OLLAMA_EMBEDDING_MODEL env vars so the 5 Ollama tests
  are no longer skipped
- Rename Flaky Test Report to Integration Test Report throughout (job names,
  artifact names, cache keys, file names, script titles/docstrings)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Bump Ollama model to qwen2.5:1.5b for better instruction following

The 0.5b model was too small to reliably follow simple prompts like
'Say Hello World', causing test assertion failures. The 1.5b model
follows instructions more reliably while still being small enough
for fast CI pulls (~1GB).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Re-enable reliable streaming integration tests

Remove the hard skip on test_03_reliable_streaming tests that was
temporarily disabled for instability investigation. CI infrastructure
(Azurite, DTS emulator, Redis, func CLI) is already in place.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Re-enable skipped Functions/DurableTask tests and bump timeout to 480s

- Remove hard skips from 4 tests in test_11_workflow_parallel.py
- Remove hard skip from test_conditional_branching in test_06_dt_multi_agent_orchestration_conditionals.py
- Increase pytest --timeout from 360 to 480 for Functions+DurableTask CI job
- Updated in both python-merge-tests.yml and python-integration-tests.yml

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Re-skip failing Functions/DurableTask tests with specific root causes

- test_11_workflow_parallel (4 tests): xdist worker crashes during execution
- test_conditional_branching: orchestration fails with RuntimeError, not a timeout
- Keep 480s timeout bump for remaining Functions tests

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix auth routing in samples 06/11: api_key -> credential for Azure OpenAI

Both samples passed a bearer token provider via api_key= which caused the
client to route to api.openai.com instead of Azure OpenAI, resulting in
401 Unauthorized. Changed to credential= which correctly triggers Azure
routing and picks up AZURE_OPENAI_ENDPOINT from the environment.

- samples/azure_functions/11_workflow_parallel/function_app.py: 1 fix
- samples/durabletask/06_multi_agent_orchestration_conditionals/worker.py: 2 fixes
- Re-enable 4 parallel workflow tests and 1 conditional branching test

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Re-skip parallel workflow tests: xdist worker distribution issue

The 4 parallel workflow tests crash because xdist worksteal distributes
them across separate workers, each spawning its own func process against
shared emulators. Auth fix (api_key->credential) was valid and stays.
test_conditional_branching now passes with the auth fix.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix E501 line-too-long in azurefunctions parallel test skip reasons

Wrap skip reason strings to stay within 120 char line limit.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Add retry logic and port-conflict fix for Ollama CI setup

- Kill any auto-started Ollama before launching serve (fixes port
  conflict: 'address already in use')
- Retry ollama pull up to 3 times with 15s backoff (fixes 429 rate
  limit failures)
- Applied to both python-merge-tests.yml and python-integration-tests.yml

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix flaky integration tests and re-enable skipped tests

- Foundry agent: add allow_preview=True to custom client test
- Foundry hosting: raise max_output_tokens 50->200, add temperature,
  relax assertion in test_temperature_and_max_tokens
- Foundry embedding: update skip reason with root cause (endpoint mismatch)
- OpenAI file search: fix vector store indexing race condition by polling
  file_counts before querying; fix get_streaming_response -> get_response(stream=True)
- Azure OpenAI file search: remove skip (transient 500 resolved)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Remove temperature from foundry hosting test (unsupported by CI model)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Stabilize Ollama tool call integration tests with no-arg function

Use a no-argument greet() function instead of hello_world(arg1) for
integration tests. The 1.5B model in CI is unreliable at generating
correct tool call arguments, causing 'Argument parsing failed' errors.
A no-arg function eliminates this flakiness entirely.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Increase reliable streaming test timeouts from 30s to 60s

The LLM call through Azure OpenAI + Redis streaming pipeline can exceed
30s in CI due to cold starts or throttling. Raise to 60s to reduce
flaky timeouts while still bounded by pytest's 120s per-test limit.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Re-enable workflow parallel tests with xdist_group marker

The tests were skipped because xdist distributes module tests across
workers, each spawning their own func process (port conflicts). Adding
xdist_group forces all tests in this module onto a single worker so
the module-scoped function_app_for_test fixture works correctly.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Revert "Re-enable workflow parallel tests with xdist_group marker"

This reverts commit 455c28da62.

* Rename flaky_report to integration_test_report and add try/finally cleanup

- Rename scripts/flaky_report/ to scripts/integration_test_report/ to
  reflect expanded scope beyond flaky-test detection
- Update workflow references in both CI files
- Wrap file search integration tests in try/finally to ensure vector
  store cleanup runs even on test failure or timeout

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix Ollama pull failure propagation and Azure OpenAI vector store readiness

- Ollama CI: fail the step immediately if model pull fails after 3
  retries instead of silently proceeding to tests
- Azure OpenAI file search: add the same vector-store readiness polling
  that was applied to the non-Azure OpenAI tests, preventing eventual
  consistency race conditions

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* remove load_dotenv from test file

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-01 00:41:39 +00:00

580 lines
21 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
"""Integration tests for ResponsesHostServer with a real Foundry endpoint.
These tests exercise the full HTTP pipeline using httpx.AsyncClient with
ASGITransport — no real server process is started. The agent talks to a real
Foundry project endpoint so every test requires valid credentials.
Required environment variables:
FOUNDRY_PROJECT_ENDPOINT - The Azure AI Foundry project endpoint URL.
FOUNDRY_MODEL - The model deployment name (e.g. gpt-4o).
"""
from __future__ import annotations
import base64
import json
import os
from pathlib import Path
from typing import Annotated, Any
import httpx
import pytest
from agent_framework import Agent, tool
from agent_framework.foundry import FoundryChatClient
from azure.ai.agentserver.responses import InMemoryResponseProvider
from azure.identity import AzureCliCredential
from agent_framework_foundry_hosting import ResponsesHostServer
# ---------------------------------------------------------------------------
# Skip / marker helpers
# ---------------------------------------------------------------------------
skip_if_foundry_hosting_integration_tests_disabled = pytest.mark.skipif(
os.getenv("FOUNDRY_PROJECT_ENDPOINT", "") in ("", "https://test-project.services.ai.azure.com/")
or os.getenv("FOUNDRY_MODEL", "") == "",
reason="No real FOUNDRY_PROJECT_ENDPOINT or FOUNDRY_MODEL provided; skipping integration tests.",
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def server() -> ResponsesHostServer:
"""Create a ResponsesHostServer backed by a real Foundry agent."""
client = FoundryChatClient(credential=AzureCliCredential())
agent = Agent(
client=client,
instructions="You are a concise assistant. Keep answers very short (one or two sentences).",
default_options={"store": False},
)
return ResponsesHostServer(agent, store=InMemoryResponseProvider())
@tool
async def get_weather(location: Annotated[str, "The city name"]) -> str:
"""Get the current weather in a given location."""
return f"The weather in {location} is 72°F and sunny."
@pytest.fixture
def server_with_tools() -> ResponsesHostServer:
"""Create a ResponsesHostServer whose agent has a tool."""
client = FoundryChatClient(credential=AzureCliCredential())
agent = Agent(
client=client,
instructions="You are a concise assistant. Use the provided tools when appropriate. Keep answers very short.",
tools=[get_weather],
default_options={"store": False},
)
return ResponsesHostServer(agent, store=InMemoryResponseProvider())
# ---------------------------------------------------------------------------
# HTTP helpers
# ---------------------------------------------------------------------------
async def _post_json(
server: ResponsesHostServer,
payload: dict[str, Any],
) -> httpx.Response:
"""Send a POST /responses request with a raw JSON payload."""
transport = httpx.ASGITransport(app=server)
async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
return await client.post("/responses", json=payload, timeout=120)
def _parse_sse_events(body: str) -> list[dict[str, Any]]:
"""Parse SSE text into a list of event dicts with 'event' and 'data' keys."""
events: list[dict[str, Any]] = []
current_event: str | None = None
current_data_lines: list[str] = []
for line in body.split("\n"):
if line.startswith("event: "):
current_event = line[len("event: ") :]
elif line.startswith("data: "):
current_data_lines.append(line[len("data: ") :])
elif line.strip() == "" and current_event is not None:
data_str = "\n".join(current_data_lines)
try:
data = json.loads(data_str)
except json.JSONDecodeError:
data = data_str
events.append({"event": current_event, "data": data})
current_event = None
current_data_lines = []
return events
def _sse_event_types(events: list[dict[str, Any]]) -> list[str]:
"""Extract event type strings from parsed SSE events."""
return [e["event"] for e in events]
# ---------------------------------------------------------------------------
# Tests — basic text input
# ---------------------------------------------------------------------------
class TestBasicText:
"""Simple text-in / text-out round trips."""
@pytest.mark.flaky
@pytest.mark.integration
@skip_if_foundry_hosting_integration_tests_disabled
async def test_simple_text_non_streaming(self, server: ResponsesHostServer) -> None:
"""Non-streaming: send a text prompt and get a completed response."""
resp = await _post_json(
server,
{
"input": "Say hello in exactly three words.",
"stream": False,
},
)
assert resp.status_code == 200
body = resp.json()
assert body["status"] == "completed"
# There should be exactly one output item with text
output_messages = [o for o in body["output"] if o["type"] == "message"]
assert len(output_messages) == 1
text_parts = [c for c in output_messages[0]["content"] if c["type"] == "output_text"]
assert len(text_parts) >= 1
assert len(text_parts[0]["text"]) > 0
@pytest.mark.flaky
@pytest.mark.integration
@skip_if_foundry_hosting_integration_tests_disabled
async def test_simple_text_streaming(self, server: ResponsesHostServer) -> None:
"""Streaming: send a text prompt and verify SSE lifecycle events."""
resp = await _post_json(
server,
{
"input": "Say hello in exactly three words.",
"stream": True,
},
)
assert resp.status_code == 200
assert "text/event-stream" in resp.headers["content-type"]
events = _parse_sse_events(resp.text)
types = _sse_event_types(events)
assert types[0] == "response.created"
assert types[1] == "response.in_progress"
assert types[-1] == "response.completed"
assert "response.output_text.delta" in types
assert "response.output_text.done" in types
# The done event should have accumulated text
done_events = [e for e in events if e["event"] == "response.output_text.done"]
assert len(done_events) >= 1
assert len(done_events[0]["data"]["text"]) > 0
# ---------------------------------------------------------------------------
# Tests — structured content input
# ---------------------------------------------------------------------------
class TestStructuredContentInput:
"""Structured content arrays: text + images, text + files."""
@pytest.mark.flaky
@pytest.mark.integration
@skip_if_foundry_hosting_integration_tests_disabled
async def test_text_array_input(self, server: ResponsesHostServer) -> None:
"""Multiple input_text parts in one message."""
resp = await _post_json(
server,
{
"input": [
{
"type": "message",
"role": "user",
"content": [
{"type": "input_text", "text": "My name is Alice."},
{"type": "input_text", "text": "What is my name?"},
],
}
],
"stream": False,
},
)
assert resp.status_code == 200
body = resp.json()
assert body["status"] == "completed"
# The response should mention Alice
output_messages = [o for o in body["output"] if o["type"] == "message"]
assert len(output_messages) == 1
output_text = output_messages[0]["content"][0]["text"]
assert "alice" in output_text.lower()
@pytest.mark.flaky
@pytest.mark.integration
@skip_if_foundry_hosting_integration_tests_disabled
async def test_input_image_url(self, server: ResponsesHostServer) -> None:
"""Send an image via URL and ask the model about it."""
resp = await _post_json(
server,
{
"input": [
{
"type": "message",
"role": "user",
"content": [
{"type": "input_text", "text": "What animal is in this image? Reply in one word."},
{
"type": "input_image",
"image_url": "https://cdn.pixabay.com/photo/2024/02/28/07/42/european-shorthair-8601492_640.jpg",
},
],
}
],
"stream": False,
},
)
assert resp.status_code == 200
body = resp.json()
assert body["status"] == "completed"
output_messages = [o for o in body["output"] if o["type"] == "message"]
assert len(output_messages) == 1
output_text = output_messages[0]["content"][0]["text"].lower()
assert "cat" in output_text
@pytest.mark.flaky
@pytest.mark.integration
@skip_if_foundry_hosting_integration_tests_disabled
async def test_input_image_file_data(self, server: ResponsesHostServer) -> None:
"""Send a local image file as inline base64 data URI."""
image_path = Path(__file__).resolve().parent / "test_assets" / "sample_image.jpg" # noqa: ASYNC240
image_bytes = image_path.read_bytes()
b64 = base64.b64encode(image_bytes).decode()
data_uri = f"data:image/jpeg;base64,{b64}"
resp = await _post_json(
server,
{
"input": [
{
"type": "message",
"role": "user",
"content": [
{"type": "input_text", "text": "What animal is in this image? Reply in one word."},
{"type": "input_image", "image_url": data_uri},
],
}
],
"stream": False,
},
)
assert resp.status_code == 200
body = resp.json()
assert body["status"] == "completed"
output_messages = [o for o in body["output"] if o["type"] == "message"]
assert len(output_messages) == 1
output_text = output_messages[0]["content"][0]["text"].lower()
assert "cat" in output_text
@pytest.mark.flaky
@pytest.mark.integration
@skip_if_foundry_hosting_integration_tests_disabled
async def test_input_file_data(self, server: ResponsesHostServer) -> None:
"""Send a small text file as inline file_data (base64 data URI)."""
text_content = "The capital of France is Paris."
b64 = base64.b64encode(text_content.encode()).decode()
data_uri = f"data:text/plain;base64,{b64}"
resp = await _post_json(
server,
{
"input": [
{
"type": "message",
"role": "user",
"content": [
{"type": "input_text", "text": "What is the capital mentioned in the attached file?"},
{"type": "input_file", "file_data": data_uri, "filename": "info.txt"},
],
}
],
"stream": False,
},
)
assert resp.status_code == 200
body = resp.json()
assert body["status"] == "completed"
output_messages = [o for o in body["output"] if o["type"] == "message"]
assert len(output_messages) == 1
output_text = output_messages[0]["content"][0]["text"].lower()
assert "paris" in output_text
@pytest.mark.flaky
@pytest.mark.integration
@skip_if_foundry_hosting_integration_tests_disabled
async def test_input_pdf_file_data(self, server: ResponsesHostServer) -> None:
"""Send a real PDF file as inline file_data (base64 data URI)."""
pdf_path = Path(__file__).resolve().parent / "test_assets" / "sample.pdf" # noqa: ASYNC240
pdf_bytes = pdf_path.read_bytes()
b64 = base64.b64encode(pdf_bytes).decode()
data_uri = f"data:application/pdf;base64,{b64}"
resp = await _post_json(
server,
{
"input": [
{
"type": "message",
"role": "user",
"content": [
{"type": "input_text", "text": "Summarize this PDF in one sentence."},
{"type": "input_file", "file_data": data_uri, "filename": "sample.pdf"},
],
}
],
"stream": False,
},
)
assert resp.status_code == 200
body = resp.json()
assert body["status"] == "completed"
output_messages = [o for o in body["output"] if o["type"] == "message"]
assert len(output_messages) == 1
output_text = output_messages[0]["content"][0]["text"]
assert "microsoft" in output_text.lower()
# ---------------------------------------------------------------------------
# Tests — multi-turn conversations
# ---------------------------------------------------------------------------
class TestMultiTurn:
"""Multi-round conversations using previous_response_id."""
@pytest.mark.flaky
@pytest.mark.integration
@skip_if_foundry_hosting_integration_tests_disabled
async def test_two_turn_conversation(self, server: ResponsesHostServer) -> None:
"""Turn 1: introduce context. Turn 2: ask about it using previous_response_id."""
# Turn 1
resp1 = await _post_json(
server,
{
"input": "My favorite color is blue. Remember that.",
"stream": False,
},
)
assert resp1.status_code == 200
body1 = resp1.json()
assert body1["status"] == "completed"
response_id_1 = body1["id"]
# Turn 2 — references turn 1
resp2 = await _post_json(
server,
{
"input": "What is my favorite color?",
"stream": False,
"previous_response_id": response_id_1,
},
)
assert resp2.status_code == 200
body2 = resp2.json()
assert body2["status"] == "completed"
output_messages = [o for o in body2["output"] if o["type"] == "message"]
assert len(output_messages) == 1
output_text = output_messages[0]["content"][0]["text"].lower()
assert "blue" in output_text
@pytest.mark.flaky
@pytest.mark.integration
@skip_if_foundry_hosting_integration_tests_disabled
async def test_three_turn_conversation(self, server: ResponsesHostServer) -> None:
"""Three sequential turns to verify history accumulates correctly."""
# Turn 1
resp1 = await _post_json(
server,
{
"input": "I have a pet dog named Max.",
"stream": False,
},
)
assert resp1.status_code == 200
id1 = resp1.json()["id"]
# Turn 2
resp2 = await _post_json(
server,
{
"input": "I also have a cat named Luna.",
"stream": False,
"previous_response_id": id1,
},
)
assert resp2.status_code == 200
id2 = resp2.json()["id"]
# Turn 3 — should remember both pets
resp3 = await _post_json(
server,
{
"input": "What are my pets' names?",
"stream": False,
"previous_response_id": id2,
},
)
assert resp3.status_code == 200
body3 = resp3.json()
output_messages = [o for o in body3["output"] if o["type"] == "message"]
assert len(output_messages) == 1
output_text = output_messages[0]["content"][0]["text"].lower()
assert "max" in output_text
assert "luna" in output_text
@pytest.mark.flaky
@pytest.mark.integration
@skip_if_foundry_hosting_integration_tests_disabled
async def test_multi_turn_streaming(self, server: ResponsesHostServer) -> None:
"""Multi-turn conversation with streaming on the second turn."""
# Turn 1 — non-streaming
resp1 = await _post_json(
server,
{
"input": "My favorite number is 42.",
"stream": False,
},
)
assert resp1.status_code == 200
id1 = resp1.json()["id"]
# Turn 2 — streaming
resp2 = await _post_json(
server,
{
"input": "What is my favorite number?",
"stream": True,
"previous_response_id": id1,
},
)
assert resp2.status_code == 200
assert "text/event-stream" in resp2.headers["content-type"]
events = _parse_sse_events(resp2.text)
types = _sse_event_types(events)
assert types[0] == "response.created"
assert types[-1] == "response.completed"
assert "response.output_text.done" in types
done_events = [e for e in events if e["event"] == "response.output_text.done"]
assert "42" in done_events[0]["data"]["text"]
# ---------------------------------------------------------------------------
# Tests — tool calling
# ---------------------------------------------------------------------------
class TestToolCalling:
"""Tests that verify function-tool round trips through the hosting layer."""
@pytest.mark.flaky
@pytest.mark.integration
@skip_if_foundry_hosting_integration_tests_disabled
async def test_tool_call_non_streaming(self, server_with_tools: ResponsesHostServer) -> None:
"""Agent invokes a tool and returns a final answer (non-streaming)."""
resp = await _post_json(
server_with_tools,
{
"input": "What is the weather in Seattle?",
"stream": False,
},
)
assert resp.status_code == 200
body = resp.json()
assert body["status"] == "completed"
# The output should contain the final text referencing the weather
output_messages = [o for o in body["output"] if o["type"] == "message"]
assert len(output_messages) == 1
final_text = output_messages[0]["content"][0]["text"].lower()
assert "72" in final_text or "sunny" in final_text or "seattle" in final_text
@pytest.mark.flaky
@pytest.mark.integration
@skip_if_foundry_hosting_integration_tests_disabled
async def test_tool_call_streaming(self, server_with_tools: ResponsesHostServer) -> None:
"""Agent invokes a tool and returns a final answer (streaming)."""
resp = await _post_json(
server_with_tools,
{
"input": "What is the weather in Seattle?",
"stream": True,
},
)
assert resp.status_code == 200
assert "text/event-stream" in resp.headers["content-type"]
events = _parse_sse_events(resp.text)
types = _sse_event_types(events)
assert types[0] == "response.created"
assert types[-1] == "response.completed"
# Should have text output with the weather info
done_events = [e for e in events if e["event"] == "response.output_text.done"]
assert len(done_events) >= 1
final_text = done_events[-1]["data"]["text"].lower()
assert "72" in final_text or "sunny" in final_text or "seattle" in final_text
# ---------------------------------------------------------------------------
# Tests — options passthrough
# ---------------------------------------------------------------------------
class TestOptions:
"""Verify chat options are passed through to the model."""
@pytest.mark.flaky
@pytest.mark.integration
@skip_if_foundry_hosting_integration_tests_disabled
async def test_temperature_and_max_tokens(self, server: ResponsesHostServer) -> None:
"""Set max_output_tokens and verify the response succeeds."""
resp = await _post_json(
server,
{
"input": "Say hello briefly.",
"stream": False,
"max_output_tokens": 200,
},
)
assert resp.status_code == 200
body = resp.json()
assert body["status"] == "completed"
assert len(body["output"]) > 0