agent-framework/python/packages/azurefunctions/tests/integration_tests/conftest.py
Eduard van Valkenburg 3dc59c83b5 Python: [BREAKING] Moved to a single get_response and run API (#3379)
* WIP

* big update to new ResponseStream model

* fixed tests and typing

* fixed tests and typing

* fixed tools typevar import

* fix

* mypy fix

* mypy fixes and some cleanup

* fix missing quoted names

* and client

* fix  imports agui

* fix anthropic override

* fix agui

* fix ag ui

* fix import

* fix anthropic types

* fix mypy

* refactoring

* updated typing

* fix 3.11

* fixes

* redid layering of chat clients and agents

* redid layering of chat clients and agents

* Fix lint, type, and test issues after rebase

- Add @overload decorators to AgentProtocol.run() for type compatibility
- Add missing docstring params (middleware, function_invocation_configuration)
- Fix TODO format (TD002) by adding author tags
- Fix broken observability tests from upstream:
  - Replace non-existent use_instrumentation with direct instantiation
  - Replace non-existent use_agent_instrumentation with AgentTelemetryLayer mixin
  - Fix get_streaming_response to use get_response(stream=True)
  - Add AgentInitializationError import
  - Update streaming exception tests to match actual behavior

* Fix AgentExecutionException import error in test_agents.py

- Replace non-existent AgentExecutionException with AgentRunException

* Fix test import and asyncio deprecation issues

- Add 'tests' to pythonpath in ag-ui pyproject.toml for utils_test_ag_ui import
- Replace deprecated asyncio.get_event_loop().run_until_complete with asyncio.run

* Fix azure-ai test failures

- Update _prepare_options patching to use correct class path
- Fix test_to_azure_ai_agent_tools_web_search_missing_connection to clear env vars

* Convert ag-ui utils_test_ag_ui.py to conftest.py

- Move test utilities to conftest.py for proper pytest discovery
- Update all test imports to use conftest instead of utils_test_ag_ui
- Remove old utils_test_ag_ui.py file
- Revert pythonpath change in pyproject.toml

* fix: use relative imports for ag-ui test utilities

* fix agui

* Rename Bare*Client to Raw*Client and BaseChatClient

- Renamed BareChatClient to BaseChatClient (abstract base class)
- Renamed BareOpenAIChatClient to RawOpenAIChatClient
- Renamed BareOpenAIResponsesClient to RawOpenAIResponsesClient
- Renamed BareAzureAIClient to RawAzureAIClient
- Added warning docstrings to Raw* classes about layer ordering
- Updated README in samples/getting_started/agents/custom with layer docs
- Added test for span ordering with function calling

* Fix layer ordering: FunctionInvocationLayer before ChatTelemetryLayer

This ensures each inner LLM call gets its own telemetry span, resulting in
the correct span sequence: chat -> execute_tool -> chat

Updated all production clients and test mocks to use correct ordering:
- ChatMiddlewareLayer (first)
- FunctionInvocationLayer (second)
- ChatTelemetryLayer (third)
- BaseChatClient/Raw...Client (fourth)
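
The effect of this ordering can be sketched with toy mixins. The class names below are simplified stand-ins invented for illustration, not the framework's actual classes; the sketch only assumes that each layer delegates to the next one through `super()`.

```python
# Toy illustration of why the function-invocation layer must sit above the
# telemetry layer. All names here are hypothetical stand-ins.
events: list[str] = []


class ToyRawClient:
    """Innermost layer: pretends to call the model."""

    def __init__(self) -> None:
        self.calls = 0

    def get_response(self, messages: list[str]) -> str:
        self.calls += 1
        # The first call asks for a tool, the second returns text.
        return "tool_call" if self.calls == 1 else "text"


class ToyTelemetryLayer:
    def get_response(self, messages: list[str]) -> str:
        events.append("chat")  # one span per inner model call
        return super().get_response(messages)


class ToyFunctionInvocationLayer:
    def get_response(self, messages: list[str]) -> str:
        response = super().get_response(messages)
        while response == "tool_call":
            events.append("execute_tool")
            response = super().get_response(messages + ["tool_result"])
        return response


class ToyMiddlewareLayer:
    def get_response(self, messages: list[str]) -> str:
        return super().get_response(messages)


class ToyClient(ToyMiddlewareLayer, ToyFunctionInvocationLayer, ToyTelemetryLayer, ToyRawClient):
    """Layers listed outermost first, matching the order above."""


final = ToyClient().get_response(["hi"])
print(events)
```

Because the telemetry layer is below the function-invocation layer, every model call made inside the tool loop passes through it, which yields the chat -> execute_tool -> chat sequence.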

* Remove run_stream usage

* Fix conversation_id propagation

* Python: Add BaseAgent implementation for Claude Agent SDK (#3509)

* Added ClaudeAgent implementation

* Updated streaming logic

* Small updates

* Small update

* Fixes

* Small fix

* Naming improvements

* Updated imports

* Addressed comments

* Updated package versions

* Update Claude agent connector layering

* fix test and plugin

* Store function middleware in invocation layer

* Fix telemetry streaming and ag-ui tests

* Remove legacy ag-ui tests folder

* updates

* Remove terminate flag from FunctionInvocationContext, use MiddlewareTermination instead

- Remove terminate attribute from FunctionInvocationContext
- Add result attribute to MiddlewareTermination to carry function results
- FunctionMiddlewarePipeline.execute() now lets MiddlewareTermination propagate
- _auto_invoke_function captures context.result in exception before re-raising
- _try_execute_function_calls catches MiddlewareTermination and sets should_terminate
- Fix handoff middleware to append to chat_client.function_middleware directly
- Update tests to use raise MiddlewareTermination instead of context.terminate
- Add middleware flow documentation in samples/concepts/tools/README.md
- Fix ag-ui to use FunctionMiddlewarePipeline instead of removed create_function_middleware_pipeline
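
The exception-based termination flow described above can be sketched roughly as follows. This is a simplified illustration under assumed names (`ToyMiddlewareTermination`, `execute_function_call`), not the framework's implementation.

```python
# Hypothetical sketch: a termination exception that carries a result,
# replacing a boolean "terminate" flag on the context.
from collections.abc import Callable
from typing import Any


class ToyMiddlewareTermination(Exception):
    """Raised by middleware to stop processing; carries the result to return."""

    def __init__(self, result: Any = None) -> None:
        super().__init__("middleware requested termination")
        self.result = result


def short_circuit_middleware(context: dict[str, Any], call_next: Callable[[], None]) -> None:
    # Skip the real function entirely and hand back a result via the exception.
    raise ToyMiddlewareTermination(result="cached result")


def passthrough_middleware(context: dict[str, Any], call_next: Callable[[], None]) -> None:
    call_next()


def execute_function_call(middleware: Callable[..., None], function: Callable[[], Any]) -> tuple[Any, bool]:
    """Return (result, should_terminate)."""
    context: dict[str, Any] = {"result": None}

    def call_next() -> None:
        context["result"] = function()

    try:
        middleware(context, call_next)
    except ToyMiddlewareTermination as termination:
        # The caller decides what termination means: here, stop the loop.
        return termination.result, True
    return context["result"], False


terminated = execute_function_call(short_circuit_middleware, lambda: "real result")
completed = execute_function_call(passthrough_middleware, lambda: "real result")
```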

* fix: remove references to removed terminate flag in purview tests, add type ignore

* fix: move _test_utils.py from package to test folder

* fix: call get_final_response() to trigger context provider notification in streaming test

* fix: correct broken links in tools README

* docs: clarify default middleware behavior in summary table

* fix: ensure inner stream result hooks are called when using map()/from_awaitable()

* Fix mypy type errors

* Address PR review comments on observability.py

- Remove TODO comment about unconsumed streams, add explanatory note instead
- Remove redundant _close_span cleanup hook (already called in _finalize_stream)
- Clarify behavior: cleanup hooks run after stream iteration, if stream is not
  consumed the span remains open until garbage collected

* Remove gen_ai.client.operation.duration from span attributes

Duration is a metrics-only attribute per OpenTelemetry semantic conventions.
It should be recorded to the histogram but not set as a span attribute.

* Remove duration from _get_response_attributes, pass directly to _capture_response

Duration is a metrics-only attribute. It's now passed directly to _capture_response
instead of being included in the attributes dict that gets set on the span.

* Remove redundant _close_span cleanup hook in AgentTelemetryLayer

_finalize_stream already calls _close_span() in its finally block,
so adding it as a separate cleanup hook is redundant.

* Use weakref.finalize to close span when stream is garbage collected

If a user creates a streaming response but never consumes it, the cleanup
hooks won't run. Now we register a weak reference finalizer that will close
the span when the stream object is garbage collected, ensuring spans don't
leak in this scenario.
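
A minimal sketch of the technique, using hypothetical `Span` and `Stream` stand-ins rather than the framework's classes. It relies only on the standard library's `weakref.finalize`.

```python
# Sketch: close a span when an unconsumed stream object is garbage collected.
import gc
import weakref


class Span:
    """Hypothetical stand-in for a telemetry span."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class Stream:
    """Hypothetical stand-in for a streaming response that is never iterated."""


def open_stream_with_span(span: Span) -> Stream:
    stream = Stream()
    # The callback must not reference `stream` itself, otherwise the stream
    # would be kept alive and the finalizer would never run.
    weakref.finalize(stream, span.close)
    return stream


span = Span()
stream = open_stream_with_span(span)
del stream    # drop the only reference without consuming the stream
gc.collect()  # make collection explicit for interpreters without refcounting
print(span.closed)
```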

* Fix _get_finalizers_from_stream to use _result_hooks attribute

Renamed function to _get_result_hooks_from_stream and fixed it to
look for the _result_hooks attribute which is the correct name in
ResponseStream class.

* Add missing asyncio import in test_request_info_mixin.py

* Fix leftover merge conflict marker in image_generation sample

* Update integration tests

* Fix integration tests: increase max_iterations from 1 to 2

Tests with tool_choice options require at least 2 iterations:
1. First iteration to get function call and execute the tool
2. Second iteration to get the final text response

With max_iterations=1, streaming tests would return early with only
the function call/result but no final text content.

* Fix duplicate function call error in conversation-based APIs

When using conversation_id (for Responses/Assistants APIs), the server
already has the function call message from the previous response. We
should only send the new function result message, not all messages
including the function call which would cause a duplicate ID error.

Fix: When conversation_id is set, only send the last message (the tool
result) instead of all response.messages.

* Add regression test for conversation_id propagation between tool iterations

Port test from PR #3664 with updates for new streaming API pattern.
Tests that conversation_id is properly updated in options dict during
function invocation loop iterations.

* Fix tool_choice=required to return after tool execution

When tool_choice is 'required', the user's intent is to force exactly one
tool call. After the tool executes, return immediately with the function
call and result - don't continue to call the model again.

This fixes integration tests that were failing with empty text responses
because with tool_choice=required, the model would keep returning function
calls instead of text.

Also adds regression tests for:
- conversation_id propagation between tool iterations (from PR #3664)
- tool_choice=required returns after tool execution

* Document tool_choice behavior in tools README

- Add table explaining tool_choice values (auto, none, required)
- Explain why tool_choice=required returns immediately after tool execution
- Add code example showing the difference between required and auto
- Update flow diagram to show the early return path for tool_choice=required

* Fix tool_choice=None behavior - don't default to 'auto'

Remove the hardcoded default of 'auto' for tool_choice in ChatAgent init.
When tool_choice is not specified (None), it will now not be sent to the
API, allowing the API's default behavior to be used.

Users who want tool_choice='auto' can still explicitly set it either in
default_options or at runtime.

Fixes #3585

* Fix tool_choice=none should not remove tools

In OpenAI Assistants client, tools were not being sent when
tool_choice='none'. This was incorrect - tool_choice='none' means
the model won't call tools, but tools should still be available
in the request (they may be used later in the conversation).

Fixes #3585

* Add test for tool_choice=none preserving tools

Adds a regression test to ensure that when tool_choice='none' is set but
tools are provided, the tools are still sent to the API. This verifies
the fix for #3585.

* Fix tool_choice=none should not remove tools in all clients

Apply the same fix to OpenAI Responses client and Azure AI client:
- OpenAI Responses: Remove else block that popped tool_choice/parallel_tool_calls
- Azure AI: Remove tool_choice != 'none' check when adding tools

When tool_choice='none', the model won't call tools, but tools should
still be sent to the API so they're available for future turns.

Also update README to clarify tool_choice=required supports multiple tools.

Fixes #3585

* Keep tool_choice even when tools is None

Move tool_choice processing outside of the 'if tools' block in OpenAI
Responses client so tool_choice is sent to the API even when no tools
are provided.

* Update test to match new parallel_tool_calls behavior

Changed test_prepare_options_removes_parallel_tool_calls_when_no_tools to
test_prepare_options_preserves_parallel_tool_calls_when_no_tools to reflect
that parallel_tool_calls is now preserved even when no tools are present,
consistent with the tool_choice behavior.

* Fix ChatMessage API and Role enum usage after rebase

- Update ChatMessage instantiation to use keyword args (role=, text=, contents=)
- Fix Role enum comparisons to use .value for string comparison
- Add created_at to AgentResponse in error handling
- Fix AgentResponse.from_updates -> from_agent_run_response_updates
- Fix DurableAgentStateMessage.from_chat_message to convert Role enum to string
- Add Role import where needed

* Fix additional ChatMessage API and method name changes

- Fix ChatMessage usage in workflow files (use text= instead of contents= for strings)
- Fix AgentResponse.from_updates -> from_agent_run_response_updates in workflow files
- Fix test files for ChatMessage and Role enum usage

* Fix remaining ChatMessage API usage in test files

* Fix more ChatMessage and Role API changes in source and test files

- Fix ChatMessage in _magentic.py replan method
- Fix Role enum comparison in test assertions
- Fix remaining test files with old ChatMessage syntax

* Fix ChatMessage and Role API changes across packages

- Add Role import where missing
- Fix ChatMessage signature: positional args to keyword args (role=, text=, contents=)
- Fix Role enum comparisons: .role.value instead of .role string
- Fix FinishReason enum usage in ag-ui event converters
- Rename AgentResponse.from_updates to from_agent_run_response_updates in ag-ui

Fixes API compatibility after Types API Review improvements merge

* Fix ChatMessage and Role API changes in github_copilot tests

* Fix ChatMessage and Role API changes in redis and github_copilot packages

- Fix redis provider: Role enum comparison using .value
- Fix redis tests: ChatMessage signature and Role comparisons
- Fix github_copilot tests: ChatMessage signature and Role comparisons
- Update docstring examples in redis chat message store

* Fix ChatMessage and Role API changes in devui package

- Fix executor: ChatMessage signature change
- Fix conversations: Role enum to string conversion in two places
- Fix tests: ChatMessage signatures and Role comparisons

* Fix ChatMessage and Role API changes in a2a and lab packages

- Fix a2a tests: Role comparisons and ChatMessage signatures
- Fix lab tau2 source: Role enum comparison in flip_messages, log_messages, sliding_window
- Fix lab tau2 tests: ChatMessage signatures and Role comparisons

* Remove duplicate test files from ag-ui/tests (tests are in ag_ui_tests)

* Fix ChatMessage and Role API changes across packages

After rebasing on upstream/main which merged PR #3647 (Types API Review
improvements), fix all packages to use the new API:

- ChatMessage: Use keyword args (role=, text=, contents=) instead of
  positional args
- Role: Compare using .value attribute since it's now an enum

Packages fixed:
- ag-ui: Fixed Role value extraction bugs in _message_adapters.py
- anthropic: Fixed ChatMessage and Role comparisons in tests
- azure-ai: Fixed Role comparison in _client.py
- azure-ai-search: Fixed ChatMessage and Role in source/tests
- bedrock: Fixed ChatMessage signatures in tests
- chatkit: Fixed ChatMessage and Role in source/tests
- copilotstudio: Fixed ChatMessage and Role in tests
- declarative: Fixed ChatMessage in _executors_agents.py
- mem0: Fixed ChatMessage and Role in source/tests
- purview: Fixed ChatMessage in source/tests

* Fix mypy errors for ChatMessage and Role API changes

- durabletask: Use str() fallback in role value extraction
- core: Fix ChatMessage in _orchestrator_helpers.py to use keyword args
- core: Add type ignore for _conversation_state.py contents deserialization
- ag-ui: Fix type ignore comments (call-overload instead of arg-type)
- azure-ai-search: Fix get_role_value type hint to accept Any
- lab: Move get_role_value to module level with Any type hint

* Improve CI test timeout configuration

- Increase job timeout from 10 to 15 minutes
- Reduce per-test timeout to 60s (was 900s/300s)
- Add --timeout_method thread for better timeout handling
- Add --timeout-verbose to see which tests are slow
- Reduce retries from 3 to 2 and delay from 10s to 5s

This ensures individual test timeouts are shorter than the job
timeout, providing better visibility when tests hang.

With 60s timeout and 2 retries, worst case per test is ~180s.
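
The arithmetic behind that figure, as a quick check. It assumes one initial attempt plus the configured retries, and that the retry delay is applied once per retry.

```python
# Worst-case time one hanging test can consume under the new CI settings.
per_test_timeout_seconds = 60
retries = 2
retry_delay_seconds = 5
job_timeout_seconds = 15 * 60

attempts = 1 + retries  # initial run plus retries
worst_case_run_seconds = attempts * per_test_timeout_seconds
worst_case_with_delays_seconds = worst_case_run_seconds + retries * retry_delay_seconds

print(worst_case_run_seconds, worst_case_with_delays_seconds, job_timeout_seconds)
```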

* Fix ChatMessage API usage in docstrings and source

- Fix ChatMessage positional args in docstrings: _serialization.py, _threads.py, _middleware.py
- Fix ChatMessage in tau2 runner.py
- Fix role comparison in _orchestrator_helpers.py to use .value
- Fix role comparison in _group_chat.py docstring example
- Fix role assertions in test_durable_entities.py to use .value

* Revert tool_choice/parallel_tool_calls changes - must be removed when no tools

OpenAI API requires tool_choice and parallel_tool_calls to only be
present when tools are specified. Restored the logic that removes
these options when there are no tools.

- Restored check in _chat_client.py to remove tool_choice and
  parallel_tool_calls when no tools present
- Restored same logic in _responses_client.py
- Reverted test to expect the correct behavior

* fixed issue in tests

* fix: resolve merge conflict markers in ag-ui tests

* fix: restructure ag-ui tests and fix Role/FinishReason to use string types

* fix: streaming function invocation and middleware termination

- Refactor streaming function invocation to use get_final_response() on inner streams
- Fix MiddlewareTermination to accept result parameter for passing results
- Fix _AutoHandoffMiddleware to use MiddlewareTermination instead of context.terminate
- Fix AgentMiddlewareLayer.run() to properly forward function/chat middleware
- Remove duplicate middleware registration in AgentMiddlewareLayer.__init__
- Fix exception handling in _auto_invoke_function to properly capture termination
- Fix mypy errors in core package
- Update tests to use stream=True parameter for unified run API

* fix all tests command

* Refactor integration tests to use pytest fixtures

- Merge testutils.py into conftest.py for azurefunctions integration tests
- Merge dt_testutils.py into conftest.py for durabletask integration tests
- Convert all integration tests to use fixtures instead of direct imports
  (fixes ModuleNotFoundError with --import-mode=importlib)
- Add sample_helper fixture for azurefunctions tests
- Add agent_client_factory and orchestration_helper fixtures for durabletask
- Integration tests now skip with descriptive messages when services unavailable
- Restructure devui tests into tests/devui/ with proper conftest.py
- Add test organization guidelines to CODING_STANDARD.md
- Remove __init__.py from test directories per pytest best practices

* Fix pytest_collection_modifyitems to only skip integration tests

The hook was skipping all tests in the test session, not just
integration tests. Now it only skips items in the integration_tests
directory.

* Fix mem0 tests failing on Python 3.13

Use patch.object on the imported module instead of @patch with string
path to ensure the mock takes effect regardless of import timing.
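
A minimal illustration of the pattern, using the standard library's `json` module as a stand-in for the module under test. The `encode` helper is hypothetical.

```python
# Sketch: patch an attribute on an already-imported module object.
import json
from unittest.mock import patch


def encode(value: dict) -> str:
    """Hypothetical code under test that looks up json.dumps at call time."""
    return json.dumps(value)


# patch.object targets the module object this file already holds, so the
# mock does not depend on resolving a dotted string path at patch time.
with patch.object(json, "dumps", return_value="mocked") as mock_dumps:
    patched_result = encode({"a": 1})

restored_result = encode({"a": 1})
```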

* fix mem0

* another attempt for mem0

* fix for mem0

* fix mem0

* Increase worker initialization wait time in durabletask tests

Increase from 2 to 8 seconds to allow time for:
- Python startup and module imports
- Azure OpenAI client creation
- Agent registration with DTS worker
- Worker connection to DTS

This helps prevent test failures in CI where the first tests may run
before the worker is fully ready to process requests.

* Fix streaming test to use ResponseStream with finalizer

The _consume_stream method now expects a ResponseStream that can provide
a final AgentResponse via get_final_response(). Update the test to use
ResponseStream with AgentResponse.from_updates as the finalizer.

* Fix MockToolCallingAgent to use new ResponseStream API and update samples

* small updates to run_stream to run

* fix sub workflow

* temp fix for az func test

---------

Co-authored-by: Dmytro Struk <13853051+dmytrostruk@users.noreply.github.com>
2026-02-05 20:09:58 +00:00


# Copyright (c) Microsoft. All rights reserved.
"""
Pytest configuration for Azure Functions integration tests.

This module provides fixtures, configuration, and test utilities for pytest.
"""

import os
import shutil
import socket
import subprocess
import sys
import time
import uuid
from collections.abc import Iterator, Mapping
from contextlib import suppress
from pathlib import Path
from typing import Any

import pytest
import requests

# =============================================================================
# Configuration Constants
# =============================================================================
TIMEOUT = 30  # seconds
ORCHESTRATION_TIMEOUT = 180  # seconds for orchestrations
_DEFAULT_HOST = "localhost"

# Emulator ports (match CI workflow configuration)
_AZURITE_BLOB_PORT = 10000
_DTS_EMULATOR_PORT = 8080


# =============================================================================
# Exceptions
# =============================================================================
class FunctionAppStartupError(RuntimeError):
    """Raised when the Azure Functions host fails to start reliably."""

    pass


# =============================================================================
# Environment and Service Checks
# =============================================================================
def _load_env_file_if_present() -> None:
    """Load environment variables from the local .env file when available."""
    env_file = Path(__file__).parent / ".env"
    if not env_file.exists():
        return
    try:
        from dotenv import load_dotenv

        load_dotenv(env_file)
    except ImportError:
        # python-dotenv not available; rely on existing environment
        pass


def _check_func_cli_available() -> bool:
    """Check if Azure Functions Core Tools (func) is installed and available."""
    return shutil.which("func") is not None


def _check_port_listening(port: int, host: str = _DEFAULT_HOST) -> bool:
    """Check if a service is listening on the given port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1)
        return sock.connect_ex((host, port)) == 0


def _check_azurite_available() -> bool:
    """Check if Azurite (Azure Storage emulator) is available on the expected port."""
    return _check_port_listening(_AZURITE_BLOB_PORT)


def _check_dts_emulator_available() -> bool:
    """Check if Durable Task Scheduler emulator is available on the expected port."""
    return _check_port_listening(_DTS_EMULATOR_PORT)


def _should_skip_azure_functions_integration_tests() -> tuple[bool, str]:
    """Determine whether Azure Functions integration tests should be skipped."""
    _load_env_file_if_present()
    run_integration_tests = os.getenv("RUN_INTEGRATION_TESTS", "false").lower() == "true"
    if not run_integration_tests:
        return (
            True,
            "Integration tests are disabled. Set RUN_INTEGRATION_TESTS=true to enable Azure Functions sample tests.",
        )

    # Check for Azure Functions Core Tools
    if not _check_func_cli_available():
        return (
            True,
            "Azure Functions Core Tools (func) not installed. Install with: npm install -g azure-functions-core-tools@4",  # noqa: E501
        )

    # Check for Azurite (Azure Storage emulator)
    if not _check_azurite_available():
        return (
            True,
            f"Azurite not running on port {_AZURITE_BLOB_PORT}. Start with: docker run -d -p 10000:10000 -p 10001:10001 -p 10002:10002 mcr.microsoft.com/azure-storage/azurite",  # noqa: E501
        )

    # Check for Durable Task Scheduler emulator
    if not _check_dts_emulator_available():
        return (
            True,
            f"Durable Task Scheduler emulator not running on port {_DTS_EMULATOR_PORT}. Start with: docker run -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest",  # noqa: E501
        )

    endpoint = os.getenv("AZURE_OPENAI_ENDPOINT", "").strip()
    if not endpoint or endpoint == "https://your-resource.openai.azure.com/":
        return True, "No real AZURE_OPENAI_ENDPOINT provided; skipping integration tests."

    deployment_name = os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT_NAME", "").strip()
    if not deployment_name or deployment_name == "your-deployment-name":
        return True, "No real AZURE_OPENAI_CHAT_DEPLOYMENT_NAME provided; skipping integration tests."

    return False, "Integration tests enabled."


_SKIP_AZURE_FUNCTIONS_INTEGRATION_TESTS, _AZURE_FUNCTIONS_SKIP_REASON = _should_skip_azure_functions_integration_tests()

skip_if_azure_functions_integration_tests_disabled = pytest.mark.skipif(
    _SKIP_AZURE_FUNCTIONS_INTEGRATION_TESTS,
    reason=_AZURE_FUNCTIONS_SKIP_REASON,
)


# =============================================================================
# Test Helper Class
# =============================================================================
class SampleTestHelper:
    """Helper class for testing samples."""

    @staticmethod
    def post_json(url: str, data: dict[str, Any], timeout: int = TIMEOUT) -> requests.Response:
        """POST JSON data to a URL."""
        return requests.post(url, json=data, headers={"Content-Type": "application/json"}, timeout=timeout)

    @staticmethod
    def post_text(url: str, text: str, timeout: int = TIMEOUT) -> requests.Response:
        """POST plain text to a URL."""
        return requests.post(url, data=text, headers={"Content-Type": "text/plain"}, timeout=timeout)

    @staticmethod
    def get(url: str, timeout: int = TIMEOUT) -> requests.Response:
        """GET request to a URL."""
        return requests.get(url, timeout=timeout)

    @staticmethod
    def wait_for_orchestration(
        status_url: str, max_wait: int = ORCHESTRATION_TIMEOUT, poll_interval: int = 2
    ) -> dict[str, Any]:
        """Wait for an orchestration to complete.

        Args:
            status_url: URL to poll for orchestration status
            max_wait: Maximum seconds to wait
            poll_interval: Seconds between polls

        Returns:
            Final orchestration status

        Raises:
            TimeoutError: If orchestration doesn't complete in time
        """
        start_time = time.time()
        while time.time() - start_time < max_wait:
            response = requests.get(status_url, timeout=TIMEOUT)
            response.raise_for_status()
            status = response.json()
            runtime_status = status.get("runtimeStatus", "")
            if runtime_status in ["Completed", "Failed", "Terminated"]:
                return status
            time.sleep(poll_interval)
        raise TimeoutError(f"Orchestration did not complete within {max_wait} seconds")

    @staticmethod
    def wait_for_orchestration_with_output(
        status_url: str, max_wait: int = ORCHESTRATION_TIMEOUT, poll_interval: int = 2
    ) -> dict[str, Any]:
        """Wait for an orchestration to complete and have output available.

        This is a specialized version of wait_for_orchestration that also
        ensures the output field is present, handling timing race conditions.

        Args:
            status_url: URL to poll for orchestration status
            max_wait: Maximum seconds to wait
            poll_interval: Seconds between polls

        Returns:
            Final orchestration status with output

        Raises:
            TimeoutError: If orchestration doesn't complete with output in time
        """
        start_time = time.time()
        while time.time() - start_time < max_wait:
            response = requests.get(status_url, timeout=TIMEOUT)
            response.raise_for_status()
            status = response.json()
            runtime_status = status.get("runtimeStatus", "")
            if runtime_status in ["Failed", "Terminated"]:
                return status
            if runtime_status == "Completed" and status.get("output"):
                return status
            # If completed but no output, continue polling for a bit more to
            # handle the race condition where output has not been persisted yet.
            time.sleep(poll_interval)

        # Provide detailed error message based on final status
        final_response = requests.get(status_url, timeout=TIMEOUT)
        final_response.raise_for_status()
        final_status = final_response.json()
        final_runtime_status = final_status.get("runtimeStatus", "Unknown")
        if final_runtime_status == "Completed":
            if "output" not in final_status:
                raise TimeoutError(
                    "Orchestration completed but 'output' field is missing after "
                    f"{max_wait} seconds. Final status: {final_status}"
                )
            if not final_status["output"]:
                raise TimeoutError(
                    "Orchestration completed but output is empty after "
                    f"{max_wait} seconds. Final status: {final_status}"
                )
            raise TimeoutError(
                "Orchestration completed with output but validation failed after "
                f"{max_wait} seconds. Final status: {final_status}"
            )
        raise TimeoutError(
            "Orchestration did not complete within "
            f"{max_wait} seconds. Final status: {final_runtime_status}, "
            f"Full status: {final_status}"
        )


# =============================================================================
# Function App Lifecycle Management
# =============================================================================
def _resolve_repo_root() -> Path:
    """Resolve the repository root, preferring GITHUB_WORKSPACE when available."""
    workspace = os.getenv("GITHUB_WORKSPACE")
    if workspace:
        candidate = Path(workspace).expanduser()
        if not (candidate / "samples").exists() and (candidate / "python" / "samples").exists():
            return (candidate / "python").resolve()
        return candidate.resolve()
    # If `GITHUB_WORKSPACE` is not set,
    # go up from conftest.py -> integration_tests -> tests -> azurefunctions -> packages -> python
    return Path(__file__).resolve().parents[4]


def _get_sample_path_from_marker(request: pytest.FixtureRequest) -> tuple[Path | None, str | None]:
    """Get sample path from @pytest.mark.sample() marker.

    Returns a tuple of (sample_path, error_message).
    If successful, error_message is None.
    If failed, sample_path is None and error_message contains the reason.
    """
    marker = request.node.get_closest_marker("sample")
    if not marker:
        return (
            None,
            (
                "No @pytest.mark.sample() marker found on test. Add pytestmark with "
                "@pytest.mark.sample('sample_name') to the test module."
            ),
        )
    if not marker.args:
        return (
            None,
            "@pytest.mark.sample() marker found but no sample name provided. Use @pytest.mark.sample('sample_name').",
        )
    sample_name = marker.args[0]
    repo_root = _resolve_repo_root()
    sample_path = repo_root / "samples" / "getting_started" / "azure_functions" / sample_name
    if not sample_path.exists():
        return None, f"Sample directory does not exist: {sample_path}"
    return sample_path, None


def _find_available_port(host: str = _DEFAULT_HOST) -> int:
    """Find an available TCP port on the given host."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        return sock.getsockname()[1]


def _build_base_url(port: int, host: str = _DEFAULT_HOST) -> str:
    """Construct a base URL for the Azure Functions host."""
    return f"http://{host}:{port}"


def _is_port_in_use(port: int, host: str = _DEFAULT_HOST) -> bool:
    """Check if a port is already in use.

    Returns True if the port is in use, False otherwise.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        return sock.connect_ex((host, port)) == 0


def _load_and_validate_env() -> None:
    """Load .env file from current directory if it exists, then validate required environment variables.

    Raises pytest.fail if required environment variables are missing.
    """
    _load_env_file_if_present()

    # Required environment variables for Azure Functions samples
    # These match the variables defined in .env.example
    required_env_vars = [
        "AZURE_OPENAI_ENDPOINT",
        "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME",
        "AzureWebJobsStorage",
        "DURABLE_TASK_SCHEDULER_CONNECTION_STRING",
        "FUNCTIONS_WORKER_RUNTIME",
    ]

    # Check if required env vars are set
    missing_vars = [var for var in required_env_vars if not os.environ.get(var)]
    if missing_vars:
        pytest.fail(
            f"Missing required environment variables: {', '.join(missing_vars)}. "
            "Please create a .env file in tests/integration_tests/ based on .env.example or "
            "set these variables in your environment."
        )


def _start_function_app(sample_path: Path, port: int) -> subprocess.Popen[Any]:
    """Start a function app in the specified sample directory.

    Returns the subprocess.Popen object for the running process.
    """
    env = os.environ.copy()
    # Use a unique TASKHUB_NAME for each test run to ensure test isolation.
    # This prevents conflicts between parallel or repeated test runs, as Durable Functions
    # use the task hub name to separate orchestration state.
    env["TASKHUB_NAME"] = f"test{uuid.uuid4().hex[:8]}"

    # On Windows, use CREATE_NEW_PROCESS_GROUP to allow proper termination
    # shell=True only on Windows to handle PATH resolution
    if sys.platform == "win32":
        return subprocess.Popen(
            ["func", "start", "--port", str(port)],
            cwd=str(sample_path),
            creationflags=subprocess.CREATE_NEW_PROCESS_GROUP,
            shell=True,
            env=env,
        )
    # On Unix, don't use shell=True to avoid shell wrapper issues
    return subprocess.Popen(["func", "start", "--port", str(port)], cwd=str(sample_path), env=env)


def _wait_for_function_app_ready(func_process: subprocess.Popen[Any], port: int, max_wait: int = 60) -> None:
    """Block until the Azure Functions host responds healthy or fail fast."""
    start_time = time.time()
    health_url = f"{_build_base_url(port)}/api/health"
    last_error: Exception | None = None
    while time.time() - start_time < max_wait:
        # If the process exited early, capture any previously seen error and fail fast.
        if func_process.poll() is not None:
            raise FunctionAppStartupError(
                f"Function app process exited with code {func_process.returncode} before becoming healthy"
            ) from last_error
        if _is_port_in_use(port):
            try:
                response = requests.get(health_url, timeout=5)
                if response.status_code == 200:
                    return
                last_error = RuntimeError(f"Health check returned {response.status_code}")
            except requests.RequestException as exc:
                last_error = exc
        time.sleep(1)
    raise FunctionAppStartupError(
        f"Function app did not become healthy on port {port} within {max_wait} seconds"
    ) from last_error


def _cleanup_function_app(func_process: subprocess.Popen[Any]) -> None:
    """Clean up the function app process and all its children.

    Uses psutil if available for more thorough cleanup, falls back to basic termination.
    """
    try:
        import psutil

        if func_process.poll() is None:  # Process still running
            # Get parent process
            parent = psutil.Process(func_process.pid)
            # Get all child processes recursively
            children = parent.children(recursive=True)
            # Kill children first
            for child in children:
                with suppress(psutil.NoSuchProcess, psutil.AccessDenied):
                    child.kill()
            # Kill parent
            with suppress(psutil.NoSuchProcess, psutil.AccessDenied):
                parent.kill()
            # Wait for all to terminate
            _gone, alive = psutil.wait_procs(children + [parent], timeout=3)
            # Force kill any remaining
            for proc in alive:
                with suppress(psutil.NoSuchProcess, psutil.AccessDenied):
                    proc.kill()
    except ImportError:
        # Fallback if psutil not available
        try:
            if func_process.poll() is None:
                func_process.kill()
                func_process.wait()
        except Exception:
            # Ignore all exceptions during fallback cleanup; best effort to terminate process.
            pass
    except Exception:
        pass  # Best effort cleanup

    # Give the port time to be released
    time.sleep(2)



# =============================================================================
# Pytest Configuration
# =============================================================================


def pytest_configure(config: pytest.Config) -> None:
    """Register custom markers."""
    config.addinivalue_line("markers", "orchestration: marks tests that use orchestrations (require Azurite)")
    config.addinivalue_line(
        "markers",
        "sample(path): specify the sample directory path for the test (e.g., @pytest.mark.sample('01_single_agent'))",
    )


def pytest_collection_modifyitems(config: pytest.Config, items: list[pytest.Item]) -> None:
    """Skip integration tests in this directory if prerequisites are not met."""
    should_skip, reason = _should_skip_azure_functions_integration_tests()
    if should_skip:
        skip_marker = pytest.mark.skip(reason=reason)
        for item in items:
            # Only skip items that are in this integration_tests directory
            if "integration_tests" in str(item.fspath):
                item.add_marker(skip_marker)


# =============================================================================
# Pytest Fixtures
# =============================================================================


@pytest.fixture(scope="session")
def function_app_running() -> bool:
    """Check if the function app is running on localhost:7071.

    This fixture can be used to skip tests if the function app is not available.
    """
    try:
        response = requests.get("http://localhost:7071/api/health", timeout=2)
        return response.status_code == 200
    except requests.exceptions.RequestException:
        return False


@pytest.fixture(scope="session")
def skip_if_no_function_app(function_app_running: bool) -> None:
    """Skip test if function app is not running."""
    if not function_app_running:
        pytest.skip("Function app is not running on http://localhost:7071")


@pytest.fixture(scope="module")
def function_app_for_test(request: pytest.FixtureRequest) -> Iterator[dict[str, int | str]]:
    """Start the function app for the corresponding sample based on marker.

    This fixture:
    1. Determines which sample to run from @pytest.mark.sample()
    2. Validates environment variables
    3. Starts the function app using 'func start'
    4. Waits for the app to be ready
    5. Tears down the app after tests complete

    Usage:
        @pytest.mark.sample("01_single_agent")
        @pytest.mark.usefixtures("function_app_for_test")
        class TestSample01SingleAgent:
            ...
    """
    # Get sample path from marker
    sample_path, error_message = _get_sample_path_from_marker(request)
    if error_message:
        pytest.fail(error_message)

    assert sample_path is not None, "Sample path must be resolved before starting the function app"

    # Load .env file if it exists and validate required env vars
    _load_and_validate_env()

    max_attempts = 3
    last_error: Exception | None = None
    func_process: subprocess.Popen[Any] | None = None
    base_url = ""
    port = 0

    for _ in range(max_attempts):
        port = _find_available_port()
        base_url = _build_base_url(port)
        func_process = _start_function_app(sample_path, port)

        try:
            _wait_for_function_app_ready(func_process, port)
            last_error = None
            break
        except FunctionAppStartupError as exc:
            last_error = exc
            _cleanup_function_app(func_process)
            func_process = None

    if func_process is None:
        error_message = f"Function app failed to start after {max_attempts} attempt(s)."
        if last_error is not None:
            error_message += f" Last error: {last_error}"
        pytest.fail(error_message)

    try:
        yield {"base_url": base_url, "port": port}
    finally:
        if func_process is not None:
            _cleanup_function_app(func_process)


@pytest.fixture(scope="module")
def base_url(function_app_for_test: Mapping[str, int | str]) -> str:
    """Expose the function app's base URL to tests."""
    return str(function_app_for_test["base_url"])


@pytest.fixture(scope="session")
def sample_helper() -> type[SampleTestHelper]:
    """Provide the SampleTestHelper class for tests."""
    return SampleTestHelper