From bc428746909966ae8dbe08d8ffa9f2cf1132d771 Mon Sep 17 00:00:00 2001 From: Peter Ibekwe <109177538+peibekwe@users.noreply.github.com> Date: Fri, 1 May 2026 16:04:07 -0700 Subject: [PATCH] Python: Add Python parity for HttpRequestAction in declarative workflow (#5599) * Add Python parity for HttpRequestAction in declarative workflow * Ran pyupgrade and pright to fix CI issues * Fix conversation ID dot parsing for http executor * Removed unnecessary export command --- .../agent_framework/declarative/__init__.py | 5 + .../agent_framework/declarative/__init__.pyi | 10 + python/packages/declarative/AGENTS.md | 3 +- .../agent_framework_declarative/__init__.py | 10 + .../_workflows/__init__.py | 20 +- .../_workflows/_declarative_builder.py | 23 + .../_workflows/_errors.py | 38 ++ .../_workflows/_executors_http.py | 417 +++++++++++ .../_workflows/_factory.py | 17 +- .../_workflows/_http_handler.py | 237 +++++++ python/packages/declarative/pyproject.toml | 1 + .../test_default_http_request_handler.py | 329 +++++++++ .../tests/test_http_request_executor.py | 645 ++++++++++++++++++ .../test_http_request_yaml_integration.py | 111 +++ .../tests/test_workflow_factory.py | 6 +- .../tests/workflows/http_request.yaml | 29 + .../declarative/invoke_http_request/main.py | 97 +++ .../invoke_http_request/workflow.yaml | 57 ++ python/uv.lock | 2 + 19 files changed, 2046 insertions(+), 11 deletions(-) create mode 100644 python/packages/declarative/agent_framework_declarative/_workflows/_errors.py create mode 100644 python/packages/declarative/agent_framework_declarative/_workflows/_executors_http.py create mode 100644 python/packages/declarative/agent_framework_declarative/_workflows/_http_handler.py create mode 100644 python/packages/declarative/tests/test_default_http_request_handler.py create mode 100644 python/packages/declarative/tests/test_http_request_executor.py create mode 100644 python/packages/declarative/tests/test_http_request_yaml_integration.py create mode 100644 python/packages/declarative/tests/workflows/http_request.yaml create mode 100644 python/samples/03-workflows/declarative/invoke_http_request/main.py create mode 100644 python/samples/03-workflows/declarative/invoke_http_request/workflow.yaml diff --git a/python/packages/core/agent_framework/declarative/__init__.py b/python/packages/core/agent_framework/declarative/__init__.py index 7b7737dd47..ba88e6a0a9 100644 --- a/python/packages/core/agent_framework/declarative/__init__.py +++ b/python/packages/core/agent_framework/declarative/__init__.py @@ -21,10 +21,15 @@ _IMPORTS = [ "AgentFactory", "AgentExternalInputRequest", "AgentExternalInputResponse", + "DeclarativeActionError", "DeclarativeLoaderError", "DeclarativeWorkflowError", + "DefaultHttpRequestHandler", "ExternalInputRequest", "ExternalInputResponse", + "HttpRequestHandler", + "HttpRequestInfo", + "HttpRequestResult", "ProviderLookupError", "ProviderTypeMapping", "WorkflowFactory", diff --git a/python/packages/core/agent_framework/declarative/__init__.pyi b/python/packages/core/agent_framework/declarative/__init__.pyi index 92da0da682..f18be22f50 100644 --- a/python/packages/core/agent_framework/declarative/__init__.pyi +++ b/python/packages/core/agent_framework/declarative/__init__.pyi @@ -4,10 +4,15 @@ from agent_framework_declarative import ( AgentExternalInputRequest, AgentExternalInputResponse, AgentFactory, + DeclarativeActionError, DeclarativeLoaderError, DeclarativeWorkflowError, + DefaultHttpRequestHandler, ExternalInputRequest, ExternalInputResponse, + HttpRequestHandler, + HttpRequestInfo, + HttpRequestResult, ProviderLookupError, ProviderTypeMapping, WorkflowFactory, @@ -18,10 +23,15 @@ __all__ = [ "AgentExternalInputRequest", "AgentExternalInputResponse", "AgentFactory", + "DeclarativeActionError", "DeclarativeLoaderError", "DeclarativeWorkflowError", + "DefaultHttpRequestHandler", "ExternalInputRequest", "ExternalInputResponse", + "HttpRequestHandler", + "HttpRequestInfo", + "HttpRequestResult", "ProviderLookupError", "ProviderTypeMapping", "WorkflowFactory", diff --git a/python/packages/declarative/AGENTS.md b/python/packages/declarative/AGENTS.md index 72ac14860c..1add614601 100644 --- a/python/packages/declarative/AGENTS.md +++ b/python/packages/declarative/AGENTS.md @@ -8,7 +8,8 @@ YAML/JSON-based declarative agent and workflow definitions. - **`WorkflowFactory`** - Creates workflows from declarative definitions - **`WorkflowState`** - State management for declarative workflows - **`ProviderTypeMapping`** - Maps provider types to implementations -- **`DeclarativeLoaderError`** / **`ProviderLookupError`** - Error types +- **`HttpRequestHandler`** / **`DefaultHttpRequestHandler`** - Pluggable HTTP transport for the `HttpRequestAction` declarative action (configured via `WorkflowFactory(http_request_handler=...)`) +- **`DeclarativeLoaderError`** / **`ProviderLookupError`** / **`DeclarativeWorkflowError`** / **`DeclarativeActionError`** - Error types ## External Input Handling diff --git a/python/packages/declarative/agent_framework_declarative/__init__.py b/python/packages/declarative/agent_framework_declarative/__init__.py index 8200dd42e7..6afcb3c791 100644 --- a/python/packages/declarative/agent_framework_declarative/__init__.py +++ b/python/packages/declarative/agent_framework_declarative/__init__.py @@ -6,9 +6,14 @@ from ._loader import AgentFactory, DeclarativeLoaderError, ProviderLookupError, from ._workflows import ( AgentExternalInputRequest, AgentExternalInputResponse, + DeclarativeActionError, DeclarativeWorkflowError, + DefaultHttpRequestHandler, ExternalInputRequest, ExternalInputResponse, + HttpRequestHandler, + HttpRequestInfo, + HttpRequestResult, WorkflowFactory, WorkflowState, ) @@ -22,10 +27,15 @@ __all__ = [ "AgentExternalInputRequest", "AgentExternalInputResponse", "AgentFactory", + "DeclarativeActionError", "DeclarativeLoaderError", "DeclarativeWorkflowError", + "DefaultHttpRequestHandler", "ExternalInputRequest", "ExternalInputResponse", + "HttpRequestHandler", + "HttpRequestInfo", + "HttpRequestResult", "ProviderLookupError", "ProviderTypeMapping", "WorkflowFactory", diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/__init__.py b/python/packages/declarative/agent_framework_declarative/_workflows/__init__.py index 2968f2b3f9..c199e4551b 100644 --- a/python/packages/declarative/agent_framework_declarative/_workflows/__init__.py +++ b/python/packages/declarative/agent_framework_declarative/_workflows/__init__.py @@ -25,6 +25,7 @@ from ._declarative_base import ( LoopIterationResult, ) from ._declarative_builder import ALL_ACTION_EXECUTORS, DeclarativeWorkflowBuilder +from ._errors import DeclarativeActionError, DeclarativeWorkflowError from ._executors_agents import ( AGENT_ACTION_EXECUTORS, AGENT_REGISTRY_KEY, @@ -67,6 +68,10 @@ from ._executors_external_input import ( RequestExternalInputExecutor, WaitForInputExecutor, ) +from ._executors_http import ( + HTTP_ACTION_EXECUTORS, + HttpRequestActionExecutor, +) from ._executors_tools import ( FUNCTION_TOOL_REGISTRY_KEY, TOOL_ACTION_EXECUTORS, @@ -78,7 +83,13 @@ from ._executors_tools import ( ToolApprovalState, ToolInvocationResult, ) -from ._factory import DeclarativeWorkflowError, WorkflowFactory +from ._factory import WorkflowFactory +from ._http_handler import ( + DefaultHttpRequestHandler, + HttpRequestHandler, + HttpRequestInfo, + HttpRequestResult, +) from ._state import WorkflowState __all__ = [ @@ -90,6 +101,7 @@ __all__ = [ "DECLARATIVE_STATE_KEY", "EXTERNAL_INPUT_EXECUTORS", "FUNCTION_TOOL_REGISTRY_KEY", + "HTTP_ACTION_EXECUTORS", "TOOL_ACTION_EXECUTORS", "TOOL_APPROVAL_STATE_KEY", "TOOL_REGISTRY_KEY", @@ -106,12 +118,14 @@ __all__ = [ "ContinueLoopExecutor", "ConversationData", "CreateConversationExecutor", + "DeclarativeActionError", "DeclarativeActionExecutor", "DeclarativeMessage", "DeclarativeStateData", "DeclarativeWorkflowBuilder", "DeclarativeWorkflowError", "DeclarativeWorkflowState", + "DefaultHttpRequestHandler", "EmitEventExecutor", "EndConversationExecutor", "EndWorkflowExecutor", @@ -120,6 +134,10 @@ __all__ = [ "ExternalLoopState", "ForeachInitExecutor", "ForeachNextExecutor", + "HttpRequestActionExecutor", + "HttpRequestHandler", + "HttpRequestInfo", + "HttpRequestResult", "InvokeAzureAgentExecutor", "InvokeFunctionToolExecutor", "JoinExecutor", diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_builder.py b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_builder.py index 6843c5bd92..fb5dcb88f8 100644 --- a/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_builder.py +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_declarative_builder.py @@ -26,6 +26,7 @@ from ._declarative_base import ( DeclarativeActionExecutor, LoopIterationResult, ) +from ._errors import DeclarativeWorkflowError from ._executors_agents import AGENT_ACTION_EXECUTORS, InvokeAzureAgentExecutor from ._executors_basic import BASIC_ACTION_EXECUTORS from ._executors_control_flow import ( @@ -39,7 +40,9 @@ from ._executors_control_flow import ( SwitchEvaluatorExecutor, ) from ._executors_external_input import EXTERNAL_INPUT_EXECUTORS +from ._executors_http import HTTP_ACTION_EXECUTORS, HttpRequestActionExecutor from ._executors_tools import TOOL_ACTION_EXECUTORS, InvokeFunctionToolExecutor +from ._http_handler import HttpRequestHandler logger = logging.getLogger(__name__) @@ -51,6 +54,7 @@ ALL_ACTION_EXECUTORS = { **AGENT_ACTION_EXECUTORS, **EXTERNAL_INPUT_EXECUTORS, **TOOL_ACTION_EXECUTORS, + **HTTP_ACTION_EXECUTORS, } # Action kinds that terminate control flow (no fall-through to successor) @@ -85,6 +89,7 @@ ACTION_REQUIRED_FIELDS: dict[str, list[str]] = { "WaitForHumanInput": ["variable"], "EmitEvent": ["event"], "InvokeFunctionTool": ["functionName"], + "HttpRequestAction": ["url"], } # Alternate field names that satisfy required field requirements @@ -129,6 +134,7 @@ class DeclarativeWorkflowBuilder: checkpoint_storage: Any | None = None, validate: bool = True, max_iterations: int | None = None, + http_request_handler: HttpRequestHandler | None = None, ): """Initialize the builder. @@ -141,6 +147,9 @@ class DeclarativeWorkflowBuilder: validate: Whether to validate the workflow definition before building (default: True) max_iterations: Maximum runner supersteps. Falls back to the YAML ``maxTurns`` field, then to the core default (100). + http_request_handler: Handler used to dispatch HttpRequestAction requests. + Must be supplied when the workflow contains any HttpRequestAction; + otherwise build raises ``DeclarativeWorkflowError``. """ self._yaml_def = yaml_definition self._workflow_id = workflow_id or yaml_definition.get("name", "declarative_workflow") @@ -152,6 +161,7 @@ class DeclarativeWorkflowBuilder: self._pending_gotos: list[tuple[Any, str]] = [] # (goto_executor, target_id) self._validate = validate self._seen_explicit_ids: set[str] = set() # Track explicit IDs for duplicate detection + self._http_request_handler = http_request_handler # Resolve max_iterations: explicit arg > YAML maxTurns > core default resolved = max_iterations if max_iterations is not None else yaml_definition.get("maxTurns") if resolved is not None and (not isinstance(resolved, int) or resolved <= 0): @@ -458,6 +468,19 @@ class DeclarativeWorkflowBuilder: executor = InvokeAzureAgentExecutor(action_def, id=action_id, agents=self._agents) elif kind == "InvokeFunctionTool": executor = InvokeFunctionToolExecutor(action_def, id=action_id, tools=self._tools) + elif kind == "HttpRequestAction": + if self._http_request_handler is None: + raise DeclarativeWorkflowError( + f"Workflow defines HttpRequestAction '{action_id}' but no " + "http_request_handler was supplied to WorkflowFactory. Pass " + "http_request_handler=DefaultHttpRequestHandler() (or a custom " + "implementation) to enable HTTP requests." + ) + executor = HttpRequestActionExecutor( + action_def, + id=action_id, + http_request_handler=self._http_request_handler, + ) else: executor = executor_class(action_def, id=action_id) self._executors[action_id] = executor diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_errors.py b/python/packages/declarative/agent_framework_declarative/_workflows/_errors.py new file mode 100644 index 0000000000..e3372ebf06 --- /dev/null +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_errors.py @@ -0,0 +1,38 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Error types for declarative workflow executor modules. + +This module exists so that executor modules and the builder (e.g. +``_executors_http``, ``_declarative_builder``) can raise declarative-specific +exceptions without importing from ``_factory``. ``_factory`` imports +``_declarative_builder`` which imports the executor modules; pulling +:class:`DeclarativeWorkflowError` from ``_factory`` into an executor or +builder module would therefore introduce a circular import. +""" + +from __future__ import annotations + +from agent_framework.exceptions import WorkflowException + + +class DeclarativeWorkflowError(WorkflowException): + """Raised for build-time / factory-level declarative workflow errors. + + Used for YAML parsing/validation issues, missing configuration (e.g. an + HTTP request handler not supplied for a workflow that contains an + ``HttpRequestAction``), and other errors detected before workflow + execution begins. + """ + + pass + + +class DeclarativeActionError(WorkflowException): + """Raised when a declarative action fails at run time. + + Used by executor modules for runtime failures (e.g. transport errors, + non-2xx responses from :class:`HttpRequestActionExecutor`). Build-time and + factory-level errors continue to use :class:`DeclarativeWorkflowError`. + """ + + pass diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_executors_http.py b/python/packages/declarative/agent_framework_declarative/_workflows/_executors_http.py new file mode 100644 index 0000000000..0a91748c6b --- /dev/null +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_executors_http.py @@ -0,0 +1,417 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Executor for the ``HttpRequestAction`` declarative action. + +Mirrors the .NET ``HttpRequestExecutor``: dispatches an HTTP request through the +configured :class:`HttpRequestHandler`, parses the response body, and assigns +the parsed body and response headers to the declared state paths. + +Security note: response bodies can echo secrets and may be very large. Diagnostic +messages produced for non-2xx responses truncate the body to 256 characters and +collapse CR/LF/TAB to spaces (parity with .NET ``FormatBodyForDiagnostics``). +""" + +from __future__ import annotations + +import json +import logging +from collections.abc import Mapping +from typing import Any + +import httpx +from agent_framework import ( + Message, + WorkflowContext, + handler, +) + +from ._declarative_base import ( + ActionComplete, + DeclarativeActionExecutor, + DeclarativeWorkflowState, +) +from ._errors import DeclarativeActionError +from ._http_handler import HttpRequestHandler, HttpRequestInfo, HttpRequestResult + +__all__ = [ + "HTTP_ACTION_EXECUTORS", + "HttpRequestActionExecutor", +] + +logger = logging.getLogger(__name__) + +_MAX_BODY_DIAGNOSTIC_LENGTH = 256 +_BODY_TRUNCATION_SUFFIX = " \u2026 [truncated]" + + +# Body discriminator aliases. Long forms match the .NET object-model type +# names so YAML produced by .NET round-trips. Short forms are the .NET YAML +# convention used in test fixtures. +_BODY_KIND_JSON = {"json", "JsonRequestContent"} +_BODY_KIND_RAW = {"raw", "RawRequestContent"} +_BODY_KIND_NONE = {"none", "NoRequestContent"} + + +def _get_path(action_def: Mapping[str, Any], key: str) -> str | None: + """Extract a state path from ``response``/``responseHeaders`` field. + + Supports two YAML shapes (matches .NET serialization round-trips): + + - ``response: Local.MyVar`` (plain string). + - ``response: { path: Local.MyVar }`` (object form). + """ + value = action_def.get(key) + if isinstance(value, str): + return value or None + if isinstance(value, Mapping): + path = value.get("path") # type: ignore[reportUnknownMemberType, reportUnknownVariableType] + return path if isinstance(path, str) and path else None + return None + + +def _format_body_for_diagnostics(body: str | None) -> str: + """Truncate and sanitise a response body for inclusion in error messages. + + Mirrors the .NET ``FormatBodyForDiagnostics`` helper: + + - Empty/None -> empty string. + - Replaces CR/LF/TAB with spaces. + - Truncates to 256 chars with a unicode-ellipsis ``[truncated]`` suffix. + """ + if not body: + return "" + + truncated = len(body) > _MAX_BODY_DIAGNOSTIC_LENGTH + head = body[:_MAX_BODY_DIAGNOSTIC_LENGTH] if truncated else body + sanitized = head.replace("\r", " ").replace("\n", " ").replace("\t", " ") + return sanitized + _BODY_TRUNCATION_SUFFIX if truncated else sanitized + + +def _parse_response_body(body: str | None) -> Any: + """Parse an HTTP response body the same way the .NET executor does. + + JSON-first: if the body parses as JSON, the parsed value is returned. Other + bodies are returned as the raw string. Empty/None bodies return ``None``. + """ + if body is None or body == "": + return None + try: + return json.loads(body) + except json.JSONDecodeError: + return body + + +def _format_query_value(value: Any) -> str | None: + """Format a query-parameter value for URL inclusion. + + Mirrors .NET ``FormatQueryValue``: ``None`` is dropped, ``bool`` becomes + lower-case ``"true"``/``"false"``, numerics use invariant ``str()``, and + other values fall through to ``str()``. + """ + if value is None: + return None + if isinstance(value, bool): + return "true" if value else "false" + if isinstance(value, str): + return value + return str(value) + + +def _get_messages_path(state: DeclarativeWorkflowState, conversation_id_expr: str | None) -> str | None: + """Return the configured conversation messages path, if any. + + Returns ``System.conversations.{evaluated_id}.messages`` when a + ``conversation_id_expr`` is configured and evaluates to a non-empty value. + Returns ``None`` when no conversation id expression is configured or when + the expression evaluates to ``None`` or an empty string (matches .NET + ``GetConversationId`` behaviour where empty becomes ``null`` and the + response is not appended). + """ + if not conversation_id_expr: + return None + evaluated = state.eval_if_expression(conversation_id_expr) + if evaluated is None or (isinstance(evaluated, str) and not evaluated): + return None + return f"System.conversations.{evaluated}.messages" + + +class HttpRequestActionExecutor(DeclarativeActionExecutor): + """Executor for the ``HttpRequestAction`` declarative action. + + Dispatches through the supplied :class:`HttpRequestHandler` and: + + - Parses the response body (JSON-first, raw string fall-back). + - Assigns the parsed body to ``response`` path (if configured). + - Folds multi-value response headers (comma-joined) and assigns them to + ``responseHeaders`` path (if configured). + - On 2xx with non-empty body and a configured ``conversationId``, appends + an Assistant :class:`agent_framework.Message` to + ``System.conversations.{id}.messages``. + - On non-2xx, still publishes ``responseHeaders`` (diagnostic) and raises + :class:`DeclarativeActionError` with a status-coded message containing a + truncated/sanitised body preview. + + Transport errors (``httpx.TimeoutException``, ``TimeoutError``, + ``httpx.HTTPError``) become :class:`DeclarativeActionError`. ``CancelledError`` + is intentionally NOT caught so that workflow cancellation propagates. + """ + + def __init__( + self, + action_def: dict[str, Any], + *, + id: str | None = None, + http_request_handler: HttpRequestHandler, + ) -> None: + """Create an HTTP request action executor. + + Args: + action_def: Parsed ``HttpRequestAction`` YAML dict. + id: Optional executor id (defaults to action id or generated). + http_request_handler: Handler used to dispatch HTTP requests. + Required: the builder enforces presence at workflow-build time. + """ + super().__init__(action_def, id=id) + self._http_request_handler = http_request_handler + + @handler + async def handle_action( + self, + trigger: Any, + ctx: WorkflowContext[ActionComplete], + ) -> None: + """Execute the HTTP request action.""" + state = await self._ensure_state_initialized(ctx, trigger) + + method = self._get_method(state) + url = self._get_url(state) + headers = self._get_headers(state) + query_parameters = self._get_query_parameters(state) + body, body_content_type = self._get_body(state) + timeout_ms = self._get_timeout_ms(state) + conversation_id_expr = self._action_def.get("conversationId") + connection_name = self._get_connection_name(state) + + info = HttpRequestInfo( + method=method, + url=url, + headers=headers or {}, + query_parameters=query_parameters or {}, + body=body, + body_content_type=body_content_type, + timeout_ms=timeout_ms, + connection_name=connection_name, + ) + + try: + result = await self._http_request_handler.send(info) + except (httpx.TimeoutException, TimeoutError) as exc: + raise DeclarativeActionError(f"HTTP request to '{url}' timed out.") from exc + except DeclarativeActionError: + raise + except httpx.HTTPError as exc: + raise DeclarativeActionError(f"HTTP request to '{url}' failed: {type(exc).__name__}") from exc + except Exception as exc: + # Custom HttpRequestHandler implementations may raise arbitrary + # exception types. Wrap them in DeclarativeActionError so workflow + # error handling stays uniform regardless of transport. Note that + # ``asyncio.CancelledError`` is a ``BaseException`` (not + # ``Exception``) and so still propagates unmodified, preserving + # workflow-cancellation semantics. + raise DeclarativeActionError(f"HTTP request to '{url}' failed: {type(exc).__name__}") from exc + + if result.is_success_status_code: + self._assign_response(state, result) + self._assign_response_headers(state, result) + self._append_response_to_conversation(state, conversation_id_expr, result.body) + await ctx.send_message(ActionComplete()) + return + + # Non-success path: still publish headers diagnostically, then raise. + self._assign_response_headers(state, result) + body_preview = _format_body_for_diagnostics(result.body) + if body_preview: + message = f"HTTP request to '{url}' failed with status code {result.status_code}. Body: '{body_preview}'" + else: + message = f"HTTP request to '{url}' failed with status code {result.status_code}." + raise DeclarativeActionError(message) + + # ----- Field resolution ---------------------------------------------------- + + def _get_method(self, state: DeclarativeWorkflowState) -> str: + method = self._action_def.get("method") + evaluated = state.eval_if_expression(method) if method is not None else None + if not evaluated: + return "GET" + return str(evaluated).upper() + + def _get_url(self, state: DeclarativeWorkflowState) -> str: + raw = self._action_def.get("url") + if raw is None: + raise ValueError("HttpRequestAction requires a 'url' field.") + evaluated = state.eval_if_expression(raw) + if not isinstance(evaluated, str) or not evaluated: + raise ValueError("HttpRequestAction 'url' evaluated to an empty value.") + return evaluated + + def _get_headers(self, state: DeclarativeWorkflowState) -> dict[str, str] | None: + raw_headers = self._action_def.get("headers") + if not isinstance(raw_headers, Mapping) or not raw_headers: + return None + result: dict[str, str] = {} + for key, value in raw_headers.items(): # type: ignore[reportUnknownVariableType] + if not isinstance(key, str) or not key: + continue + evaluated = state.eval_if_expression(value) + if evaluated is None: + continue + text = str(evaluated) + if not text: + continue + result[key] = text + return result or None + + def _get_query_parameters(self, state: DeclarativeWorkflowState) -> dict[str, str] | None: + raw_params = self._action_def.get("queryParameters") + if not isinstance(raw_params, Mapping) or not raw_params: + return None + result: dict[str, str] = {} + for key, value in raw_params.items(): # type: ignore[reportUnknownVariableType] + if not isinstance(key, str) or not key or value is None: + continue + evaluated = state.eval_if_expression(value) + formatted = _format_query_value(evaluated) + if formatted is not None: + result[key] = formatted + return result or None + + def _get_body(self, state: DeclarativeWorkflowState) -> tuple[str | None, str | None]: + raw_body = self._action_def.get("body") + if raw_body is None: + return None, None + if not isinstance(raw_body, Mapping): + raise ValueError( + "HttpRequestAction 'body' must be a mapping with a 'kind' field (json, raw) or omitted entirely." + ) + + kind_value: Any = raw_body.get("kind") or raw_body.get("$kind") # type: ignore[reportUnknownMemberType] + if kind_value is None: + raise ValueError( + "HttpRequestAction 'body' is missing 'kind'. Use 'json', 'raw', or omit 'body' for no request body." + ) + if not isinstance(kind_value, str): + raise ValueError(f"HttpRequestAction 'body.kind' must be a string, got {kind_value!r}.") + + if kind_value in _BODY_KIND_NONE: + return None, None + + if kind_value in _BODY_KIND_JSON: + content_expr: Any = raw_body.get("content") # type: ignore[reportUnknownMemberType] + if content_expr is None: + return None, None + evaluated = state.eval_if_expression(content_expr) + try: + body_text = json.dumps(evaluated, default=str) + except (TypeError, ValueError) as exc: + raise ValueError(f"HttpRequestAction 'body.content' could not be serialised as JSON: {exc}") from exc + return body_text, "application/json" + + if kind_value in _BODY_KIND_RAW: + content_expr = raw_body.get("content") # type: ignore[reportUnknownMemberType] + content_type_expr: Any = raw_body.get("contentType") # type: ignore[reportUnknownMemberType] + content: str | None = None + if content_expr is not None: + evaluated = state.eval_if_expression(content_expr) + content = None if evaluated is None else str(evaluated) + content_type: str | None = None + if content_type_expr is not None: + ct_eval = state.eval_if_expression(content_type_expr) + ct_text = None if ct_eval is None else str(ct_eval) + content_type = ct_text or None + # Match .NET RawRequestContent semantics: when a raw body is sent + # without an explicit content type, default to text/plain so the + # request is interpretable by servers. + if content is not None and not content_type: + content_type = "text/plain" + return content, content_type + + raise ValueError( + f"HttpRequestAction 'body.kind' has unsupported value '{kind_value}'. " + "Expected one of: json, raw, JsonRequestContent, RawRequestContent, " + "NoRequestContent." + ) + + def _get_timeout_ms(self, state: DeclarativeWorkflowState) -> int | None: + raw = self._action_def.get("requestTimeoutInMilliseconds") + if raw is None: + return None + evaluated = state.eval_if_expression(raw) + if evaluated is None: + return None + try: + value = int(evaluated) + except (TypeError, ValueError): + logger.debug( + "HttpRequestAction: ignoring non-numeric requestTimeoutInMilliseconds=%r", + evaluated, + ) + return None + return value if value > 0 else None + + def _get_connection_name(self, state: DeclarativeWorkflowState) -> str | None: + connection = self._action_def.get("connection") + if not isinstance(connection, Mapping): + return None + name_expr: Any = connection.get("name") # type: ignore[reportUnknownMemberType] + if name_expr is None: + return None + evaluated = state.eval_if_expression(name_expr) + if evaluated is None: + return None + text = str(evaluated) + return text or None + + # ----- Result handling ----------------------------------------------------- + + def _assign_response(self, state: DeclarativeWorkflowState, result: HttpRequestResult) -> None: + path = _get_path(self._action_def, "response") + if path is None: + return + state.set(path, _parse_response_body(result.body)) + + def _assign_response_headers(self, state: DeclarativeWorkflowState, result: HttpRequestResult) -> None: + path = _get_path(self._action_def, "responseHeaders") + if path is None: + return + if not result.headers: + state.set(path, None) + return + # Fold multi-value headers with commas (standard HTTP folding) only at + # assignment time. The raw multi-value dict on HttpRequestResult.headers + # is left untouched so callers/tests can inspect duplicates. + flattened: dict[str, str] = {} + for key, values in result.headers.items(): + flattened[key] = ",".join(values) + state.set(path, flattened) + + def _append_response_to_conversation( + self, + state: DeclarativeWorkflowState, + conversation_id_expr: str | None, + body: str, + ) -> None: + if not body: + return + messages_path = _get_messages_path(state, conversation_id_expr) + if messages_path is None: + return + # Mirrors InvokeAzureAgentExecutor: rely on state.append to lazily + # create the conversation entry. Avoids re-parsing the id back out + # of the dotted path string. + message = Message(role="assistant", contents=[body]) + state.append(messages_path, message) + + +HTTP_ACTION_EXECUTORS: dict[str, type[DeclarativeActionExecutor]] = { + "HttpRequestAction": HttpRequestActionExecutor, +} diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_factory.py b/python/packages/declarative/agent_framework_declarative/_workflows/_factory.py index 9ba4cb84de..d1e21d76e9 100644 --- a/python/packages/declarative/agent_framework_declarative/_workflows/_factory.py +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_factory.py @@ -24,18 +24,16 @@ from agent_framework import ( SupportsAgentRun, Workflow, ) -from agent_framework.exceptions import WorkflowException from .._loader import AgentFactory from ._declarative_builder import DeclarativeWorkflowBuilder +from ._errors import DeclarativeWorkflowError +from ._http_handler import HttpRequestHandler logger = logging.getLogger("agent_framework.declarative") -class DeclarativeWorkflowError(WorkflowException): - """Exception raised for errors in declarative workflow processing.""" - - pass +__all__ = ["WorkflowFactory"] class WorkflowFactory: @@ -92,6 +90,7 @@ class WorkflowFactory: env_file: str | None = None, checkpoint_storage: CheckpointStorage | None = None, max_iterations: int | None = None, + http_request_handler: HttpRequestHandler | None = None, ) -> None: """Initialize the workflow factory. @@ -105,6 +104,12 @@ class WorkflowFactory: max_iterations: Optional maximum runner supersteps. Overrides the YAML ``maxTurns`` field and the core default (100). Workflows with ``GotoAction`` loops (e.g. DeepResearch) typically need a higher value. + http_request_handler: Optional handler used to dispatch HTTP requests for + ``HttpRequestAction``. Required if the workflow contains any + ``HttpRequestAction``; build will fail with :class:`DeclarativeWorkflowError` + otherwise. Use :class:`agent_framework.declarative.DefaultHttpRequestHandler` + for a no-policy ``httpx``-based default, or supply your own implementation + to enforce SSRF guards, allowlisting, or auth resolution. Examples: .. code-block:: python @@ -144,6 +149,7 @@ class WorkflowFactory: self._tools: dict[str, Any] = {} # Tool registry for InvokeFunctionTool actions self._checkpoint_storage = checkpoint_storage self._max_iterations = max_iterations + self._http_request_handler = http_request_handler def create_workflow_from_yaml_path( self, @@ -387,6 +393,7 @@ class WorkflowFactory: tools=self._tools, checkpoint_storage=self._checkpoint_storage, max_iterations=self._max_iterations, + http_request_handler=self._http_request_handler, ) workflow = graph_builder.build() except ValueError as e: diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_http_handler.py b/python/packages/declarative/agent_framework_declarative/_workflows/_http_handler.py new file mode 100644 index 0000000000..90ff5b87b4 --- /dev/null +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_http_handler.py @@ -0,0 +1,237 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""HTTP request handler abstraction for declarative workflows. + +Mirrors the .NET ``IHttpRequestHandler`` / ``DefaultHttpRequestHandler`` pair from +``Microsoft.Agents.AI.Workflows.Declarative``. Provides: + +- :class:`HttpRequestInfo` — request input data passed from the executor. +- :class:`HttpRequestResult` — response data returned to the executor. +- :class:`HttpRequestHandler` — :class:`typing.Protocol` callers implement to plug + in custom transports (e.g. with allowlisting, mTLS, retries, etc.). +- :class:`DefaultHttpRequestHandler` — production-grade default backed by + ``httpx.AsyncClient``. + +Security note: :class:`DefaultHttpRequestHandler` performs **no** URL filtering +or SSRF protection. Production deployments should supply a custom handler that +enforces an allowlist or DNS-rebinding-resistant policy. This split mirrors the +.NET design. +""" + +from __future__ import annotations + +import asyncio +from collections.abc import Awaitable, Callable, Mapping +from dataclasses import dataclass, field +from typing import Any, Protocol, runtime_checkable + +import httpx + +__all__ = [ + "DefaultHttpRequestHandler", + "HttpRequestHandler", + "HttpRequestInfo", + "HttpRequestResult", +] + + +@dataclass +class HttpRequestInfo: + """Description of an HTTP request to be dispatched by a :class:`HttpRequestHandler`. + + Mirrors the .NET ``HttpRequestInfo`` record. Field semantics: + + - ``method``: HTTP method (``GET``, ``POST``, etc.). Already upper-cased by the executor. + - ``url``: Absolute URL. Already evaluated from the YAML expression. + - ``headers``: Single-value header map (case-insensitive keys per HTTP semantics + but stored as authored). Empty values are skipped by the executor. + - ``query_parameters``: String key/value pairs appended to the URL. + - ``body``: Request body bytes/text, or ``None`` for no body. + - ``body_content_type``: Content type to send (e.g. ``application/json``). + Ignored when ``body`` is ``None``. + - ``timeout_ms``: Per-request timeout in milliseconds. ``None`` => use the + handler's default. + - ``connection_name``: Optional Foundry connection name for handlers that + resolve auth/credentials by connection. + """ + + method: str + url: str + headers: dict[str, str] = field(default_factory=dict) # type: ignore[reportUnknownVariableType] + query_parameters: dict[str, str] = field(default_factory=dict) # type: ignore[reportUnknownVariableType] + body: str | None = None + body_content_type: str | None = None + timeout_ms: int | None = None + connection_name: str | None = None + + +@dataclass +class HttpRequestResult: + """Response returned by a :class:`HttpRequestHandler`. + + Mirrors the .NET ``HttpRequestResult`` record. ``headers`` preserves + multi-value response headers (e.g. multiple ``Set-Cookie`` headers) as a + ``dict[str, list[str]]``. The executor folds duplicates into a single + comma-joined string only at the point it assigns ``responseHeaders`` to + workflow state. + + Header keys are normalized to lowercase so that lookups are consistent + regardless of the server's transmitted casing (HTTP headers are + case-insensitive per RFC 7230 §3.2). Custom :class:`HttpRequestHandler` + implementations should follow the same convention. + """ + + status_code: int + is_success_status_code: bool + body: str + headers: dict[str, list[str]] = field(default_factory=dict) # type: ignore[reportUnknownVariableType] + + +@runtime_checkable +class HttpRequestHandler(Protocol): + """Protocol for HTTP request handlers used by ``HttpRequestAction``. + + Implementations must be safe to call concurrently from multiple workflow + runs. Implementations are responsible for any URL allowlisting, SSRF + guards, retry policies, auth resolution, and other policies that the + workflow author wants applied. + """ + + async def send(self, info: HttpRequestInfo) -> HttpRequestResult: + """Dispatch ``info`` and return the response result. + + Args: + info: Description of the request to send. + + Returns: + The response. Implementations should NOT raise on non-2xx status + codes; instead, set ``is_success_status_code`` accordingly. They + SHOULD raise on transport-level failures (connection refused, + DNS errors, timeouts). + """ + ... + + +ClientProvider = Callable[[HttpRequestInfo], Awaitable["httpx.AsyncClient | None"]] + + +class DefaultHttpRequestHandler: + """Default :class:`HttpRequestHandler` backed by :class:`httpx.AsyncClient`. + + Construction modes: + + 1. ``DefaultHttpRequestHandler()`` — owns an internal client created lazily + on first ``send()``. Closed by :meth:`aclose`. + 2. ``DefaultHttpRequestHandler(client=existing)`` — caller-owned client. + Not closed by :meth:`aclose`. + 3. ``DefaultHttpRequestHandler(client_provider=cb)`` — per-request client + lookup (parity with .NET's ``httpClientProvider`` callback). The + provider may return ``None`` to fall back to the owned/default client. + + .. warning:: + + This handler performs **no** URL filtering or SSRF protection. Wrap or + replace it with a custom handler in production. + """ + + def __init__( + self, + *, + client: httpx.AsyncClient | None = None, + client_provider: ClientProvider | None = None, + ) -> None: + self._owned_client: httpx.AsyncClient | None = None + self._caller_client = client + self._client_provider = client_provider + # Guards lazy creation of ``_owned_client`` against concurrent first + # ``send()`` calls leaking duplicate clients. + self._owned_client_lock = asyncio.Lock() + + async def send(self, info: HttpRequestInfo) -> HttpRequestResult: + """Dispatch the request and return the parsed result.""" + if not info.url: + raise ValueError("HttpRequestInfo.url must be a non-empty string.") + if not info.method: + raise ValueError("HttpRequestInfo.method must be a non-empty string.") + + client = await self._resolve_client(info) + + timeout: httpx.Timeout | object + if info.timeout_ms is not None and info.timeout_ms > 0: + timeout = httpx.Timeout(info.timeout_ms / 1000.0) + else: + timeout = httpx.USE_CLIENT_DEFAULT + + headers = dict(info.headers) + content: bytes | str | None = None + if info.body is not None: + content = info.body + if not _has_header(headers, "content-type"): + # Match .NET DefaultHttpRequestHandler: when a body is sent + # without an explicit content type, default to ``text/plain`` + # so the request is interpretable by servers and direct + # callers (not just the YAML executor) get sensible defaults. + headers["Content-Type"] = info.body_content_type or "text/plain" + + params: Mapping[str, str] | None = info.query_parameters or None + + response = await client.request( + method=info.method, + url=info.url, + params=params, + headers=headers or None, + content=content, + timeout=timeout, # type: ignore[arg-type] + ) + + # Preserve multi-value headers (e.g. multiple Set-Cookie) as list[str]. + # Normalize names to lowercase so lookups are consistent and case + # variations from the transport do not create duplicate logical keys + # (HTTP headers are case-insensitive per RFC 7230 §3.2). + result_headers: dict[str, list[str]] = {} + for key, value in response.headers.multi_items(): + result_headers.setdefault(key.lower(), []).append(value) + + body_text = response.text + + return HttpRequestResult( + status_code=response.status_code, + is_success_status_code=200 <= response.status_code < 300, + body=body_text, + headers=result_headers, + ) + + async def aclose(self) -> None: + """Release the owned client, if any. Caller-owned clients are NOT closed.""" + if self._owned_client is not None: + await self._owned_client.aclose() + self._owned_client = None + + async def _resolve_client(self, info: HttpRequestInfo) -> httpx.AsyncClient: + """Pick a client for this request: provider → caller → lazily-owned.""" + if self._client_provider is not None: + provided = await self._client_provider(info) + if provided is not None: + return provided + if self._caller_client is not None: + return self._caller_client + if self._owned_client is None: + # Double-checked locking under asyncio.Lock so concurrent first + # callers don't each create a fresh httpx.AsyncClient and orphan + # one of them. + async with self._owned_client_lock: + if self._owned_client is None: + self._owned_client = httpx.AsyncClient() + return self._owned_client + + async def __aenter__(self) -> DefaultHttpRequestHandler: + return self + + async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> None: + await self.aclose() + + +def _has_header(headers: Mapping[str, str], name: str) -> bool: + """Case-insensitive header presence check.""" + needle = name.lower() + return any(key.lower() == needle for key in headers) diff --git a/python/packages/declarative/pyproject.toml b/python/packages/declarative/pyproject.toml index a26981505c..a03928ac82 100644 --- a/python/packages/declarative/pyproject.toml +++ b/python/packages/declarative/pyproject.toml @@ -23,6 +23,7 @@ classifiers = [ ] dependencies = [ "agent-framework-core>=1.2.2,<2", + "httpx>=0.27,<1", "powerfx>=0.0.32,<0.0.35; python_version < '3.14'", "pyyaml>=6.0,<7.0", ] diff --git a/python/packages/declarative/tests/test_default_http_request_handler.py b/python/packages/declarative/tests/test_default_http_request_handler.py new file mode 100644 index 0000000000..ecdce3d7ff --- /dev/null +++ b/python/packages/declarative/tests/test_default_http_request_handler.py @@ -0,0 +1,329 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Tests for ``DefaultHttpRequestHandler``. + +These tests exercise the real handler against ``httpx.MockTransport`` (no real +network) to cover the parts of the handler not exercisable through the executor +stub: query-param URL composition, content-type forwarding, per-request +timeout overrides, multi-value response header preservation, and client +ownership semantics. +""" + +from __future__ import annotations + +import sys + +import httpx +import pytest + +try: + import powerfx # noqa: F401 + + _powerfx_available = True +except (ImportError, RuntimeError): + _powerfx_available = False + +# These tests don't actually need PowerFx, but the rest of the suite gates on +# Python versions and we keep behaviour consistent. +pytestmark = pytest.mark.skipif( + sys.version_info >= (3, 14), + reason="Skipped on Python 3.14+ to keep parity with rest of declarative suite", +) + +from agent_framework_declarative._workflows._http_handler import ( # noqa: E402 + DefaultHttpRequestHandler, + HttpRequestInfo, +) + + +def _make_handler(transport: httpx.MockTransport) -> DefaultHttpRequestHandler: + """Return a handler with a MockTransport-backed caller-owned client.""" + client = httpx.AsyncClient(transport=transport) + return DefaultHttpRequestHandler(client=client) + + +class TestRequestComposition: + @pytest.mark.asyncio + async def test_query_parameters_merged_into_url(self) -> None: + captured: dict[str, httpx.Request] = {} + + def respond(request: httpx.Request) -> httpx.Response: + captured["req"] = request + return httpx.Response(200, text="ok") + + handler = _make_handler(httpx.MockTransport(respond)) + try: + await handler.send( + HttpRequestInfo( + method="GET", + url="https://api.example.test/items", + query_parameters={"q": "alpha", "limit": "5"}, + ) + ) + finally: + await handler.aclose() + + req = captured["req"] + # httpx exposes the merged URL with QS appended + assert req.url.params.get("q") == "alpha" + assert req.url.params.get("limit") == "5" + + @pytest.mark.asyncio + async def test_body_content_type_forwarded(self) -> None: + captured: dict[str, httpx.Request] = {} + + def respond(request: httpx.Request) -> httpx.Response: + captured["req"] = request + return httpx.Response(204) + + handler = _make_handler(httpx.MockTransport(respond)) + try: + await handler.send( + HttpRequestInfo( + method="POST", + url="https://api.example.test/items", + body='{"k":"v"}', + body_content_type="application/json", + ) + ) + finally: + await handler.aclose() + + req = captured["req"] + assert req.headers.get("content-type") == "application/json" + assert req.content == b'{"k":"v"}' + + @pytest.mark.asyncio + async def test_existing_content_type_header_not_overwritten(self) -> None: + captured: dict[str, httpx.Request] = {} + + def respond(request: httpx.Request) -> httpx.Response: + captured["req"] = request + return httpx.Response(200, text="ok") + + handler = _make_handler(httpx.MockTransport(respond)) + try: + await handler.send( + HttpRequestInfo( + method="POST", + url="https://api.example.test/items", + headers={"Content-Type": "application/xml"}, # caller wins + body="", + body_content_type="application/json", + ) + ) + finally: + await handler.aclose() + + req = captured["req"] + assert req.headers.get("content-type") == "application/xml" + + @pytest.mark.asyncio + async def test_body_without_content_type_defaults_to_text_plain(self) -> None: + """Match .NET DefaultHttpRequestHandler: body without explicit content type → ``text/plain``.""" + captured: dict[str, httpx.Request] = {} + + def respond(request: httpx.Request) -> httpx.Response: + captured["req"] = request + return httpx.Response(204) + + handler = _make_handler(httpx.MockTransport(respond)) + try: + await handler.send( + HttpRequestInfo( + method="POST", + url="https://api.example.test/items", + body="hello", + # No body_content_type and no Content-Type header. + ) + ) + finally: + await handler.aclose() + + req = captured["req"] + assert req.headers.get("content-type") == "text/plain" + assert req.content == b"hello" + + +class TestTimeout: + @pytest.mark.asyncio + async def test_per_request_timeout_surfaces_as_timeout_exception(self) -> None: + def respond(request: httpx.Request) -> httpx.Response: + raise httpx.TimeoutException("simulated timeout", request=request) + + handler = _make_handler(httpx.MockTransport(respond)) + try: + with pytest.raises(httpx.TimeoutException): + await handler.send( + HttpRequestInfo( + method="GET", + url="https://api.example.test/slow", + timeout_ms=50, + ) + ) + finally: + await handler.aclose() + + +class TestResponseHeaders: + @pytest.mark.asyncio + async def test_multi_value_headers_preserved(self) -> None: + def respond(request: httpx.Request) -> httpx.Response: + return httpx.Response( + 200, + text="ok", + headers=[ + ("Content-Type", "application/json"), + ("Set-Cookie", "a=1"), + ("Set-Cookie", "b=2"), + ], + ) + + handler = _make_handler(httpx.MockTransport(respond)) + try: + result = await handler.send(HttpRequestInfo(method="GET", url="https://api.example.test/x")) + finally: + await handler.aclose() + + assert result.is_success_status_code + # The handler keeps multi-value headers as list[str]. + assert result.headers.get("set-cookie") == ["a=1", "b=2"] + assert result.headers.get("content-type") == ["application/json"] + + +class TestClientOwnership: + @pytest.mark.asyncio + async def test_owned_client_is_closed_on_aclose(self) -> None: + handler = DefaultHttpRequestHandler() + # Inject a MockTransport-backed client into the owned slot and verify + # aclose() releases it. Avoids real network access. + owned = httpx.AsyncClient(transport=httpx.MockTransport(lambda r: httpx.Response(200, text="ok"))) + handler._owned_client = owned + assert not owned.is_closed + await handler.aclose() + assert owned.is_closed + + @pytest.mark.asyncio + async def test_caller_owned_client_is_not_closed(self) -> None: + client = httpx.AsyncClient(transport=httpx.MockTransport(lambda r: httpx.Response(200, text="ok"))) + handler = DefaultHttpRequestHandler(client=client) + await handler.send(HttpRequestInfo(method="GET", url="https://api.example.test/x")) + await handler.aclose() + assert not client.is_closed + await client.aclose() # cleanup + + @pytest.mark.asyncio + async def test_concurrent_first_send_creates_single_owned_client(self) -> None: + """Concurrent first-send calls must not race-leak duplicate clients. + + Without the lock, two concurrent calls on a fresh handler would each + observe ``_owned_client is None`` and create their own + ``httpx.AsyncClient``, orphaning one. Verify that lazy initialization + is serialized: all concurrent sends end up using the same client and + ``aclose()`` cleanly closes it. + """ + import asyncio + + # Patch httpx.AsyncClient to count constructions, but only when called + # from inside _resolve_client (no transport=) so we don't break the + # MockTransport-backed clients used elsewhere. + original_ctor = httpx.AsyncClient + construction_count = 0 + + def counting_ctor(*args, **kwargs): # type: ignore[no-untyped-def] + nonlocal construction_count + if not args and not kwargs: + construction_count += 1 + return original_ctor(transport=httpx.MockTransport(lambda r: httpx.Response(200, text="ok"))) + return original_ctor(*args, **kwargs) + + import agent_framework_declarative._workflows._http_handler as hh + + hh.httpx.AsyncClient = counting_ctor # type: ignore[assignment] + try: + handler = DefaultHttpRequestHandler() + try: + await asyncio.gather(*[ + handler.send(HttpRequestInfo(method="GET", url="https://api.example.test/x")) for _ in range(8) + ]) + finally: + await handler.aclose() + finally: + hh.httpx.AsyncClient = original_ctor # type: ignore[assignment] + + assert construction_count == 1, ( + f"Expected exactly 1 owned client to be lazily created but got {construction_count}" + ) + + +class TestClientProvider: + @pytest.mark.asyncio + async def test_client_provider_overrides_default(self) -> None: + captured: dict[str, str] = {} + + def primary(request: httpx.Request) -> httpx.Response: + captured["transport"] = "primary" + return httpx.Response(200, text="primary") + + def provided(request: httpx.Request) -> httpx.Response: + captured["transport"] = "provided" + return httpx.Response(200, text="provided") + + primary_client = httpx.AsyncClient(transport=httpx.MockTransport(primary)) + provided_client = httpx.AsyncClient(transport=httpx.MockTransport(provided)) + + async def provider(info: HttpRequestInfo) -> httpx.AsyncClient: + return provided_client + + handler = DefaultHttpRequestHandler(client=primary_client, client_provider=provider) + try: + result = await handler.send(HttpRequestInfo(method="GET", url="https://api.example.test/x")) + assert result.body == "provided" + assert captured["transport"] == "provided" + finally: + await handler.aclose() + await primary_client.aclose() + await provided_client.aclose() + + @pytest.mark.asyncio + async def test_client_provider_returning_none_falls_back(self) -> None: + captured: dict[str, str] = {} + + def primary(request: httpx.Request) -> httpx.Response: + captured["transport"] = "primary" + return httpx.Response(200, text="primary") + + async def provider(info: HttpRequestInfo) -> httpx.AsyncClient | None: + return None + + primary_client = httpx.AsyncClient(transport=httpx.MockTransport(primary)) + handler = DefaultHttpRequestHandler(client=primary_client, client_provider=provider) + try: + result = await handler.send(HttpRequestInfo(method="GET", url="https://api.example.test/x")) + assert result.body == "primary" + finally: + await handler.aclose() + await primary_client.aclose() + + +class TestValidation: + @pytest.mark.asyncio + async def test_empty_url_raises(self) -> None: + handler = DefaultHttpRequestHandler() + with pytest.raises(ValueError): + await handler.send(HttpRequestInfo(method="GET", url="")) + + @pytest.mark.asyncio + async def test_empty_method_raises(self) -> None: + handler = DefaultHttpRequestHandler() + with pytest.raises(ValueError): + await handler.send(HttpRequestInfo(method="", url="https://x.test/")) + + +class TestAsyncContextManager: + @pytest.mark.asyncio + async def test_context_manager_closes_owned_client(self) -> None: + async with DefaultHttpRequestHandler() as handler: + owned = httpx.AsyncClient(transport=httpx.MockTransport(lambda r: httpx.Response(200, text="ok"))) + handler._owned_client = owned + assert owned.is_closed diff --git a/python/packages/declarative/tests/test_http_request_executor.py b/python/packages/declarative/tests/test_http_request_executor.py new file mode 100644 index 0000000000..4030cf4294 --- /dev/null +++ b/python/packages/declarative/tests/test_http_request_executor.py @@ -0,0 +1,645 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Tests for HttpRequestActionExecutor. + +These tests use a stub HttpRequestHandler that returns canned HttpRequestResults. +No real network or httpx transports are exercised. See +test_default_http_request_handler.py for tests that exercise the real +DefaultHttpRequestHandler against httpx.MockTransport. +""" + +from __future__ import annotations + +import asyncio +import sys +from typing import Any + +import httpx +import pytest + +try: + import powerfx # noqa: F401 + + _powerfx_available = True +except (ImportError, RuntimeError): + _powerfx_available = False + +pytestmark = pytest.mark.skipif( + not _powerfx_available or sys.version_info >= (3, 14), + reason="PowerFx engine not available (requires dotnet runtime)", +) + +from agent_framework_declarative._workflows import ( # noqa: E402 + DECLARATIVE_STATE_KEY, + DeclarativeActionError, + DeclarativeWorkflowError, + HttpRequestHandler, + HttpRequestInfo, + HttpRequestResult, + WorkflowFactory, +) + + +class StubHandler: + """Test stub that records the last call and returns a canned result.""" + + def __init__( + self, + result: HttpRequestResult | None = None, + *, + raise_exc: BaseException | None = None, + ) -> None: + self.result = result + self.raise_exc = raise_exc + self.last_info: HttpRequestInfo | None = None + self.call_count = 0 + + async def send(self, info: HttpRequestInfo) -> HttpRequestResult: + self.call_count += 1 + self.last_info = info + if self.raise_exc is not None: + raise self.raise_exc + assert self.result is not None + return self.result + + +def _ok(body: str = "", headers: dict[str, list[str]] | None = None) -> HttpRequestResult: + return HttpRequestResult( + status_code=200, + is_success_status_code=True, + body=body, + headers=headers or {}, + ) + + +def _err(status: int = 500, body: str = "", headers: dict[str, list[str]] | None = None) -> HttpRequestResult: + return HttpRequestResult( + status_code=status, + is_success_status_code=False, + body=body, + headers=headers or {}, + ) + + +async def _run(yaml_def: dict[str, Any], handler: HttpRequestHandler) -> Any: + """Build & run a workflow, returning final WorkflowState.""" + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(yaml_def) + return await workflow.run({}) + + +def _state(workflow: Any, events: Any) -> dict[str, Any]: + """Read declarative state out of the workflow after run completes.""" + return workflow._state.get(DECLARATIVE_STATE_KEY) or {} + + +# Helper used by parametrised path tests +_TEST_URL = "https://api.example.test/items" + + +def _action( + *, + method: str | None = None, + url: str = _TEST_URL, + headers: dict[str, Any] | None = None, + query_parameters: dict[str, Any] | None = None, + body: dict[str, Any] | None = None, + response: Any = None, + response_headers: Any = None, + conversation_id: str | None = None, + request_timeout_ms: int | None = None, + connection: dict[str, Any] | None = None, +) -> dict[str, Any]: + action: dict[str, Any] = { + "kind": "HttpRequestAction", + "id": "http_action", + "url": url, + } + if method is not None: + action["method"] = method + if headers is not None: + action["headers"] = headers + if query_parameters is not None: + action["queryParameters"] = query_parameters + if body is not None: + action["body"] = body + if response is not None: + action["response"] = response + if response_headers is not None: + action["responseHeaders"] = response_headers + if conversation_id is not None: + action["conversationId"] = conversation_id + if request_timeout_ms is not None: + action["requestTimeoutInMilliseconds"] = request_timeout_ms + if connection is not None: + action["connection"] = connection + return action + + +def _yaml(action: dict[str, Any]) -> dict[str, Any]: + return {"name": "http_test", "actions": [action]} + + +# ---------- Success path: response parsing ---------------------------------- + + +class TestSuccessPath: + @pytest.mark.asyncio + async def test_get_parses_json_object(self) -> None: + handler = StubHandler(_ok('{"key":"value","number":42}')) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action(method="GET", response="Local.Result"))) + await workflow.run({}) + + decl = workflow._state.get(DECLARATIVE_STATE_KEY) + assert decl["Local"]["Result"] == {"key": "value", "number": 42} + assert handler.last_info is not None + assert handler.last_info.method == "GET" + assert handler.last_info.url == _TEST_URL + + @pytest.mark.asyncio + async def test_get_parses_plain_string(self) -> None: + handler = StubHandler(_ok("not-json content")) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action(response="Local.Result"))) + await workflow.run({}) + + decl = workflow._state.get(DECLARATIVE_STATE_KEY) + assert decl["Local"]["Result"] == "not-json content" + + @pytest.mark.asyncio + async def test_get_empty_body_yields_none(self) -> None: + handler = StubHandler(_ok("")) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action(response="Local.Result"))) + await workflow.run({}) + + decl = workflow._state.get(DECLARATIVE_STATE_KEY) + assert decl["Local"]["Result"] is None + + @pytest.mark.asyncio + async def test_response_object_form_path(self) -> None: + handler = StubHandler(_ok('{"x":1}')) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action(response={"path": "Local.Result"}))) + await workflow.run({}) + + decl = workflow._state.get(DECLARATIVE_STATE_KEY) + assert decl["Local"]["Result"] == {"x": 1} + + @pytest.mark.asyncio + async def test_no_response_path_does_not_assign(self) -> None: + handler = StubHandler(_ok('{"x":1}')) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action())) + # Should complete without error and without writing anything + await workflow.run({}) + + +# ---------- Method / headers / query params -------------------------------- + + +class TestRequestComposition: + @pytest.mark.asyncio + async def test_default_method_is_get(self) -> None: + handler = StubHandler(_ok()) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action())) + await workflow.run({}) + + assert handler.last_info is not None + assert handler.last_info.method == "GET" + + @pytest.mark.asyncio + async def test_method_uppercased(self) -> None: + handler = StubHandler(_ok()) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action(method="post"))) + await workflow.run({}) + + assert handler.last_info is not None + assert handler.last_info.method == "POST" + + @pytest.mark.asyncio + async def test_headers_are_forwarded_and_empty_skipped(self) -> None: + handler = StubHandler(_ok()) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition( + _yaml( + _action( + headers={ + "Accept": "application/json", + "X-Empty": "", + "Authorization": "Bearer token", + } + ) + ) + ) + await workflow.run({}) + + assert handler.last_info is not None + assert handler.last_info.headers == { + "Accept": "application/json", + "Authorization": "Bearer token", + } + + @pytest.mark.asyncio + async def test_query_parameters_stringified(self) -> None: + handler = StubHandler(_ok()) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition( + _yaml( + _action( + query_parameters={ + "name": "alpha", + "limit": 10, + "active": True, + "ratio": 0.5, + "missing": None, # dropped + } + ) + ) + ) + await workflow.run({}) + + assert handler.last_info is not None + assert handler.last_info.query_parameters == { + "name": "alpha", + "limit": "10", + "active": "true", + "ratio": "0.5", + } + + +# ---------- Body composition ------------------------------------------------ + + +class TestBody: + @pytest.mark.asyncio + async def test_post_json_body_sets_content_type_and_serialises(self) -> None: + handler = StubHandler(_ok()) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition( + _yaml( + _action( + method="POST", + body={"kind": "json", "content": {"k": "v", "n": 1}}, + ) + ) + ) + await workflow.run({}) + + info = handler.last_info + assert info is not None + assert info.body_content_type == "application/json" + assert info.body is not None + # JSON serialized, key order may vary + import json + + assert json.loads(info.body) == {"k": "v", "n": 1} + + @pytest.mark.asyncio + async def test_post_raw_body_uses_declared_content_type(self) -> None: + handler = StubHandler(_ok()) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition( + _yaml( + _action( + method="POST", + body={ + "kind": "raw", + "content": "raw body text", + "contentType": "text/plain", + }, + ) + ) + ) + await workflow.run({}) + + info = handler.last_info + assert info is not None + assert info.body == "raw body text" + assert info.body_content_type == "text/plain" + + @pytest.mark.asyncio + async def test_post_raw_body_without_content_type_defaults_to_text_plain(self) -> None: + """Match .NET RawRequestContent: no contentType => default text/plain. + + Otherwise the request is sent without a Content-Type header which most + servers will treat as application/octet-stream and fail to parse. + """ + handler = StubHandler(_ok()) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition( + _yaml( + _action( + method="POST", + body={"kind": "raw", "content": "plain body"}, + ) + ) + ) + await workflow.run({}) + + info = handler.last_info + assert info is not None + assert info.body == "plain body" + assert info.body_content_type == "text/plain" + + @pytest.mark.asyncio + async def test_long_form_body_kinds_accepted(self) -> None: + handler = StubHandler(_ok()) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition( + _yaml( + _action( + method="POST", + body={"kind": "JsonRequestContent", "content": {"k": 1}}, + ) + ) + ) + await workflow.run({}) + info = handler.last_info + assert info is not None + assert info.body_content_type == "application/json" + + @pytest.mark.asyncio + async def test_unknown_body_kind_raises(self) -> None: + handler = StubHandler(_ok()) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action(body={"kind": "weirdform", "content": "x"}))) + with pytest.raises(Exception) as excinfo: + await workflow.run({}) + # Should surface as ValueError (potentially wrapped by runner) + msg = str(excinfo.value) + assert "weirdform" in msg or "unsupported value" in msg + + @pytest.mark.asyncio + async def test_no_body_omitted(self) -> None: + handler = StubHandler(_ok()) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action())) + await workflow.run({}) + info = handler.last_info + assert info is not None + assert info.body is None + assert info.body_content_type is None + + +# ---------- Non-2xx and error handling ------------------------------------- + + +class TestErrorHandling: + @pytest.mark.asyncio + async def test_non_2xx_raises_declarative_action_error(self) -> None: + handler = StubHandler(_err(status=500, body="server exploded")) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action())) + with pytest.raises(DeclarativeActionError) as excinfo: + await workflow.run({}) + msg = str(excinfo.value) + assert "500" in msg + assert "server exploded" in msg + + @pytest.mark.asyncio + async def test_non_2xx_long_body_truncated(self) -> None: + big_body = "A" * 1000 + handler = StubHandler(_err(status=500, body=big_body)) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action())) + with pytest.raises(DeclarativeActionError) as excinfo: + await workflow.run({}) + msg = str(excinfo.value) + assert "[truncated]" in msg + assert len(msg) < 512 + # Should NOT contain the full 1000-char body + assert big_body not in msg + + @pytest.mark.asyncio + async def test_non_2xx_empty_body_omits_body_section(self) -> None: + handler = StubHandler(_err(status=404, body="")) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action())) + with pytest.raises(DeclarativeActionError) as excinfo: + await workflow.run({}) + msg = str(excinfo.value) + assert "404" in msg + assert "Body:" not in msg + + @pytest.mark.asyncio + async def test_non_2xx_control_chars_collapsed(self) -> None: + handler = StubHandler(_err(status=500, body="line1\r\nline2\tlong")) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action())) + with pytest.raises(DeclarativeActionError) as excinfo: + await workflow.run({}) + msg = str(excinfo.value) + assert "\r" not in msg + assert "\n" not in msg + assert "\t" not in msg + assert "line1 line2 long" in msg + + @pytest.mark.asyncio + async def test_timeout_exception_becomes_declarative_action_error(self) -> None: + handler = StubHandler(raise_exc=httpx.TimeoutException("timeout")) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action())) + with pytest.raises(DeclarativeActionError) as excinfo: + await workflow.run({}) + assert "timed out" in str(excinfo.value) + + @pytest.mark.asyncio + async def test_stdlib_timeout_error_becomes_declarative_action_error(self) -> None: + handler = StubHandler(raise_exc=TimeoutError("clock")) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action())) + with pytest.raises(DeclarativeActionError) as excinfo: + await workflow.run({}) + assert "timed out" in str(excinfo.value) + + @pytest.mark.asyncio + async def test_transport_error_becomes_declarative_action_error(self) -> None: + handler = StubHandler(raise_exc=httpx.ConnectError("dns failure")) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action())) + with pytest.raises(DeclarativeActionError) as excinfo: + await workflow.run({}) + msg = str(excinfo.value) + assert "failed" in msg + assert _TEST_URL in msg + + @pytest.mark.asyncio + async def test_cancelled_error_propagates_unchanged(self) -> None: + """CancelledError from the handler must propagate so cancellation works.""" + handler = StubHandler(raise_exc=asyncio.CancelledError()) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action())) + # CancelledError is allowed to surface as either CancelledError or as + # the runner's wrapped form, but it MUST NOT be DeclarativeActionError. + with pytest.raises(BaseException) as excinfo: + await workflow.run({}) + assert not isinstance(excinfo.value, DeclarativeActionError) + + @pytest.mark.asyncio + async def test_generic_exception_from_custom_handler_wrapped(self) -> None: + """A custom handler raising a non-httpx Exception must be wrapped. + + Authors can plug in custom HttpRequestHandler implementations that use + any transport (requests-like clients, gRPC bridges, mock test doubles, + etc.). The executor must wrap arbitrary Exception subclasses uniformly + so that workflow error handling stays consistent across transports. + """ + handler = StubHandler(raise_exc=RuntimeError("custom transport blew up")) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action())) + with pytest.raises(DeclarativeActionError) as excinfo: + await workflow.run({}) + msg = str(excinfo.value) + assert "failed" in msg + assert "RuntimeError" in msg + assert _TEST_URL in msg + + +# ---------- Response headers ------------------------------------------------ + + +class TestResponseHeaders: + @pytest.mark.asyncio + async def test_response_headers_folded_with_commas(self) -> None: + handler = StubHandler( + _ok( + "ok", + headers={ + "Content-Type": ["application/json"], + "Set-Cookie": ["a=1", "b=2"], + }, + ) + ) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action(response_headers="Local.H"))) + await workflow.run({}) + decl = workflow._state.get(DECLARATIVE_STATE_KEY) + h = decl["Local"]["H"] + assert h["Content-Type"] == "application/json" + assert h["Set-Cookie"] == "a=1,b=2" + + @pytest.mark.asyncio + async def test_response_headers_empty_assigned_none(self) -> None: + handler = StubHandler(_ok("ok", headers={})) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action(response_headers="Local.H"))) + await workflow.run({}) + decl = workflow._state.get(DECLARATIVE_STATE_KEY) + assert decl["Local"]["H"] is None + + @pytest.mark.asyncio + async def test_non_2xx_still_publishes_headers(self) -> None: + handler = StubHandler(_err(status=500, body="boom", headers={"X-Trace": ["abc"]})) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action(response_headers="Local.H"))) + with pytest.raises(DeclarativeActionError): + await workflow.run({}) + decl = workflow._state.get(DECLARATIVE_STATE_KEY) + assert decl["Local"]["H"] == {"X-Trace": "abc"} + + +# ---------- ConversationId append ------------------------------------------- + + +class TestConversationAppend: + @pytest.mark.asyncio + async def test_conversation_id_appends_message(self) -> None: + handler = StubHandler(_ok('{"answer":"hello"}')) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition( + _yaml( + _action( + response="Local.Result", + conversation_id="conv-test-1", + ) + ) + ) + await workflow.run({}) + decl = workflow._state.get(DECLARATIVE_STATE_KEY) + conv = decl["System"]["conversations"].get("conv-test-1") + assert conv is not None + assert len(conv["messages"]) == 1 + + @pytest.mark.asyncio + async def test_empty_conversation_id_does_not_append(self) -> None: + handler = StubHandler(_ok('{"answer":"hello"}')) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action(response="Local.Result", conversation_id=""))) + await workflow.run({}) + decl = workflow._state.get(DECLARATIVE_STATE_KEY) + # Auto-init creates an entry for the System.ConversationId conversation, + # but it should NOT have HTTP-appended messages from us. + for _cid, conv in decl["System"]["conversations"].items(): + assert conv["messages"] == [] + + @pytest.mark.asyncio + async def test_empty_body_skips_conversation_append(self) -> None: + handler = StubHandler(_ok("")) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action(conversation_id="conv-test-1"))) + await workflow.run({}) + decl = workflow._state.get(DECLARATIVE_STATE_KEY) + # No conversation entry should have been created either. + assert "conv-test-1" not in decl["System"]["conversations"] + + +# ---------- Connection name ------------------------------------------------- + + +class TestConnection: + @pytest.mark.asyncio + async def test_connection_name_forwarded(self) -> None: + handler = StubHandler(_ok()) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action(connection={"name": "my-connection"}))) + await workflow.run({}) + assert handler.last_info is not None + assert handler.last_info.connection_name == "my-connection" + + +# ---------- Build-time validation ------------------------------------------- + + +class TestBuildTimeValidation: + def test_missing_url_fails_validation(self) -> None: + handler = StubHandler(_ok()) + factory = WorkflowFactory(http_request_handler=handler) + bad = { + "name": "no_url", + "actions": [{"kind": "HttpRequestAction", "id": "x"}], + } + with pytest.raises(DeclarativeWorkflowError): + factory.create_workflow_from_definition(bad) + + def test_missing_handler_fails_at_build(self) -> None: + factory = WorkflowFactory() # no handler + with pytest.raises(DeclarativeWorkflowError) as excinfo: + factory.create_workflow_from_definition(_yaml(_action())) + assert "http_request_handler" in str(excinfo.value) + + +# ---------- Timeout forwarding ---------------------------------------------- + + +class TestTimeout: + @pytest.mark.asyncio + async def test_timeout_ms_forwarded(self) -> None: + handler = StubHandler(_ok()) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action(request_timeout_ms=2500))) + await workflow.run({}) + assert handler.last_info is not None + assert handler.last_info.timeout_ms == 2500 + + @pytest.mark.asyncio + async def test_timeout_ms_zero_treated_as_unset(self) -> None: + handler = StubHandler(_ok()) + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_definition(_yaml(_action(request_timeout_ms=0))) + await workflow.run({}) + assert handler.last_info is not None + assert handler.last_info.timeout_ms is None diff --git a/python/packages/declarative/tests/test_http_request_yaml_integration.py b/python/packages/declarative/tests/test_http_request_yaml_integration.py new file mode 100644 index 0000000000..49cd0d15e8 --- /dev/null +++ b/python/packages/declarative/tests/test_http_request_yaml_integration.py @@ -0,0 +1,111 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""End-to-end YAML integration test for ``HttpRequestAction``. + +Loads the ``tests/workflows/http_request.yaml`` fixture (parity with the .NET +integration fixture) through ``WorkflowFactory.create_workflow_from_yaml_path`` +with a stub :class:`HttpRequestHandler` and asserts state is populated. +""" + +from __future__ import annotations + +import sys +from pathlib import Path +from typing import Any + +import pytest + +try: + import powerfx # noqa: F401 + + _powerfx_available = True +except (ImportError, RuntimeError): + _powerfx_available = False + +pytestmark = [ + pytest.mark.skipif( + not _powerfx_available, + reason="powerfx not available — declarative workflows require it.", + ), + pytest.mark.skipif( + sys.version_info >= (3, 14), + reason="Skipped on Python 3.14+ to keep parity with declarative suite.", + ), +] + +from agent_framework_declarative import WorkflowFactory # noqa: E402 +from agent_framework_declarative._workflows import DECLARATIVE_STATE_KEY # noqa: E402 +from agent_framework_declarative._workflows._http_handler import ( # noqa: E402 + HttpRequestInfo, + HttpRequestResult, +) + +FIXTURE_PATH = Path(__file__).parent / "workflows" / "http_request.yaml" + + +class _StubHandler: + """Test double that records requests and returns a canned response.""" + + def __init__(self, result: HttpRequestResult) -> None: + self._result = result + self.received: list[HttpRequestInfo] = [] + + async def send(self, info: HttpRequestInfo) -> HttpRequestResult: + self.received.append(info) + return self._result + + +@pytest.mark.asyncio +async def test_http_request_yaml_roundtrip() -> None: + handler = _StubHandler( + HttpRequestResult( + status_code=200, + is_success_status_code=True, + body='{"name": "runtime", "visibility": "public", "stars": 12345}', + headers={ + "content-type": ["application/json"], + "x-ratelimit-remaining": ["59"], + }, + ) + ) + + factory = WorkflowFactory(http_request_handler=handler) + workflow = factory.create_workflow_from_yaml_path(FIXTURE_PATH) + await workflow.run({}) + + decl: dict[str, Any] = workflow._state.get(DECLARATIVE_STATE_KEY) or {} + local = decl.get("Local") or {} + + assert local.get("RepoOwner") == "dotnet" + repo_info = local.get("RepoInfo") + assert isinstance(repo_info, dict), f"Expected dict body, got {type(repo_info)!r}" + assert repo_info["name"] == "runtime" + assert repo_info["visibility"] == "public" + assert repo_info["stars"] == 12345 + + repo_headers = local.get("RepoHeaders") + assert isinstance(repo_headers, dict) + # Single-value header surfaces as plain string. + assert repo_headers.get("content-type") == "application/json" + assert repo_headers.get("x-ratelimit-remaining") == "59" + + # Stub got the right call. + assert len(handler.received) == 1 + sent = handler.received[0] + assert sent.method == "GET" + assert sent.url == "https://api.github.com/repos/dotnet/runtime" + assert sent.headers["Accept"] == "application/vnd.github+json" + assert sent.headers["User-Agent"] == "agent-framework-integration-test" + + +@pytest.mark.asyncio +async def test_http_request_yaml_missing_handler_fails_at_build_time() -> None: + """Without an http_request_handler, building the workflow must raise.""" + from agent_framework_declarative._workflows._errors import DeclarativeWorkflowError + + factory = WorkflowFactory() # no handler configured + with pytest.raises(DeclarativeWorkflowError) as excinfo: + factory.create_workflow_from_yaml_path(FIXTURE_PATH) + msg = str(excinfo.value) + assert "HttpRequestAction" in msg + assert "http_request_handler" in msg diff --git a/python/packages/declarative/tests/test_workflow_factory.py b/python/packages/declarative/tests/test_workflow_factory.py index e313f78799..720bca3498 100644 --- a/python/packages/declarative/tests/test_workflow_factory.py +++ b/python/packages/declarative/tests/test_workflow_factory.py @@ -4,10 +4,8 @@ import pytest -from agent_framework_declarative._workflows._factory import ( - DeclarativeWorkflowError, - WorkflowFactory, -) +from agent_framework_declarative._workflows._errors import DeclarativeWorkflowError +from agent_framework_declarative._workflows._factory import WorkflowFactory try: import powerfx # noqa: F401 diff --git a/python/packages/declarative/tests/workflows/http_request.yaml b/python/packages/declarative/tests/workflows/http_request.yaml new file mode 100644 index 0000000000..382d3fafe2 --- /dev/null +++ b/python/packages/declarative/tests/workflows/http_request.yaml @@ -0,0 +1,29 @@ +# +# Integration fixture: end-to-end HttpRequestAction round-trip using a +# stub HttpRequestHandler. Mirrors the .NET integration fixture in +# dotnet/tests/.../Workflows/HttpRequest.yaml. +# +kind: Workflow +trigger: + + kind: OnConversationStart + id: workflow_http_request_test + actions: + + # Set the repo owner used to form the request URL. + - kind: SetVariable + id: set_repo_owner + variable: Local.RepoOwner + value: dotnet + + # Invoke the (stubbed) GitHub repo API. + - kind: HttpRequestAction + id: fetch_repo_info + conversationId: =System.ConversationId + method: GET + url: =Concatenate("https://api.github.com/repos/", Local.RepoOwner, "/runtime") + headers: + Accept: application/vnd.github+json + User-Agent: agent-framework-integration-test + response: Local.RepoInfo + responseHeaders: Local.RepoHeaders diff --git a/python/samples/03-workflows/declarative/invoke_http_request/main.py b/python/samples/03-workflows/declarative/invoke_http_request/main.py new file mode 100644 index 0000000000..ebcbcc0a16 --- /dev/null +++ b/python/samples/03-workflows/declarative/invoke_http_request/main.py @@ -0,0 +1,97 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Invoke HTTP Request sample - demonstrates the HttpRequestAction declarative action. + +This sample shows how to: + 1. Configure a ``WorkflowFactory`` with a ``HttpRequestHandler`` so the YAML + ``HttpRequestAction`` can dispatch real HTTP calls. + 2. Fetch JSON from a public REST endpoint (the GitHub repository API) and + bind the parsed response to a workflow variable. + 3. Mirror the response body into the conversation via ``conversationId`` so + a downstream Foundry agent can answer questions about it using only that + conversation context. + +Security note: + ``DefaultHttpRequestHandler`` issues HTTP calls to whatever URL the + workflow author specifies and performs **no** allowlisting or SSRF + guards. For production use, replace it with a custom handler that + enforces an allowlist or DNS-rebinding-resistant policy and adds any + required authentication headers per call. + +Run with: + python -m samples.03-workflows.declarative.invoke_http_request.main +""" + +import asyncio +import os +from pathlib import Path + +from agent_framework import Agent +from agent_framework.declarative import ( + DefaultHttpRequestHandler, + WorkflowFactory, +) +from agent_framework.foundry import FoundryChatClient +from azure.identity import AzureCliCredential + +GITHUB_REPO_INFO_AGENT_INSTRUCTIONS = """\ +You answer the user's question about a GitHub repository using ONLY the JSON +data already present in the conversation history. If the answer is not +contained in the conversation, say so plainly rather than guessing. Be concise +and helpful. +""" + + +async def main() -> None: + """Run the invoke HTTP request workflow.""" + chat_client = FoundryChatClient( + project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"], + model=os.environ["FOUNDRY_MODEL"], + credential=AzureCliCredential(), + ) + + # The agent has no tools — it answers the question about the GitHub + # repository using only the JSON data that ``HttpRequestAction`` adds to + # the conversation. + github_repo_info_agent = Agent( + client=chat_client, + name="GitHubRepoInfoAgent", + instructions=GITHUB_REPO_INFO_AGENT_INSTRUCTIONS, + ) + + agents = {"GitHubRepoInfoAgent": github_repo_info_agent} + + # The default HttpRequestHandler is sufficient for this sample because + # the GitHub REST endpoint used here does not require authentication. + # For authenticated endpoints, supply a custom client_provider callback + # to DefaultHttpRequestHandler so each request can be routed through a + # pre-configured httpx.AsyncClient with the appropriate credentials. + async with DefaultHttpRequestHandler() as http_handler: + factory = WorkflowFactory( + agents=agents, + http_request_handler=http_handler, + ) + + workflow_path = Path(__file__).parent / "workflow.yaml" + workflow = factory.create_workflow_from_yaml_path(workflow_path) + + print("=" * 60) + print("Invoke HTTP Request Workflow Demo") + print("=" * 60) + print() + print("Ask one question about the microsoft/agent-framework repo.") + print() + + user_input = input("You: ").strip() # noqa: ASYNC250 + if not user_input: + user_input = "Please summarize the repository." + + print("\nAgent: ", end="", flush=True) + async for event in workflow.run(user_input, stream=True): + if event.type == "output" and isinstance(event.data, str): + print(event.data, end="", flush=True) + print() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/samples/03-workflows/declarative/invoke_http_request/workflow.yaml b/python/samples/03-workflows/declarative/invoke_http_request/workflow.yaml new file mode 100644 index 0000000000..f1bfd223bb --- /dev/null +++ b/python/samples/03-workflows/declarative/invoke_http_request/workflow.yaml @@ -0,0 +1,57 @@ +# +# This workflow demonstrates the HttpRequestAction declarative action. +# +# HttpRequestAction lets a workflow author issue an HTTP call directly from +# YAML without writing any Python glue. It can: +# +# - fetch data from external REST endpoints, +# - store the parsed response in a workflow variable, and +# - add the response body to the conversation so a downstream agent can +# answer questions based on it. +# +# This sample fetches public metadata for the microsoft/agent-framework +# repository from the GitHub REST API (no authentication required) and uses +# a Foundry agent to answer a single question about it. +# +# Example input: +# How many open issues does the repository have? +# +kind: Workflow +trigger: + + kind: OnConversationStart + id: workflow_invoke_http_request_demo + actions: + + # Set the repository org/name used to form the request URL. + - kind: SetVariable + id: set_repo_name + variable: Local.RepoName + value: microsoft/agent-framework + + # Invoke the GitHub repo API. The response body is parsed into + # Local.RepoInfo and also added to the conversation (via conversationId) + # so the agent below can answer questions based on it. + - kind: HttpRequestAction + id: fetch_repo_info + conversationId: =System.ConversationId + method: GET + url: =Concatenate("https://api.github.com/repos/", Local.RepoName) + headers: + Accept: application/vnd.github+json + User-Agent: agent-framework-sample + response: Local.RepoInfo + + # Use the agent to answer the user's question using the conversation + # context (which now contains the GitHub JSON response). The user's + # original message is already in the conversation as System.LastMessage, + # and the executor's input fallback chain extracts its ``Text`` field + # automatically when ``input.messages`` is omitted. + - kind: InvokeAzureAgent + id: answer_question + conversationId: =System.ConversationId + agent: + name: GitHubRepoInfoAgent + output: + autoSend: true + messages: Local.AgentResponse diff --git a/python/uv.lock b/python/uv.lock index 12c5f39b27..85a968174a 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -428,6 +428,7 @@ version = "1.0.0b260429" source = { editable = "packages/declarative" } dependencies = [ { name = "agent-framework-core", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, + { name = "httpx", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, { name = "powerfx", marker = "(python_full_version < '3.14' and sys_platform == 'darwin') or (python_full_version < '3.14' and sys_platform == 'linux') or (python_full_version < '3.14' and sys_platform == 'win32')" }, { name = "pyyaml", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, ] @@ -440,6 +441,7 @@ dev = [ [package.metadata] requires-dist = [ { name = "agent-framework-core", editable = "packages/core" }, + { name = "httpx", specifier = ">=0.27,<1" }, { name = "powerfx", marker = "python_full_version < '3.14'", specifier = ">=0.0.32,<0.0.35" }, { name = "pyyaml", specifier = ">=6.0,<7.0" }, ]