From fbd0c566c28f54d9472011e44ce2de421669134f Mon Sep 17 00:00:00 2001 From: Evan Mattson <35585003+moonbox3@users.noreply.github.com> Date: Fri, 12 Sep 2025 11:20:04 +0900 Subject: [PATCH 01/11] Python: Add Concurrent orchestration builder support. Samples. Tests. (#683) * Add Concurrent orchestration builder support. Samples. Tests. * Add sample output * Add better docstrings * PR feedback * PR feedback --- .../main/agent_framework/workflow/__init__.py | 1 + .../agent_framework/workflow/__init__.pyi | 2 + .../agent_framework_workflow/__init__.py | 2 + .../agent_framework_workflow/_concurrent.py | 304 ++++++++++++++++++ .../workflow/tests/test_concurrent.py | 126 ++++++++ .../getting_started/workflow/README.md | 3 + .../orchestration/concurrent_agents.py | 131 ++++++++ .../concurrent_custom_agent_executors.py | 175 ++++++++++ .../concurrent_custom_aggregator.py | 125 +++++++ 9 files changed, 869 insertions(+) create mode 100644 python/packages/workflow/agent_framework_workflow/_concurrent.py create mode 100644 python/packages/workflow/tests/test_concurrent.py create mode 100644 python/samples/getting_started/workflow/orchestration/concurrent_agents.py create mode 100644 python/samples/getting_started/workflow/orchestration/concurrent_custom_agent_executors.py create mode 100644 python/samples/getting_started/workflow/orchestration/concurrent_custom_aggregator.py diff --git a/python/packages/main/agent_framework/workflow/__init__.py b/python/packages/main/agent_framework/workflow/__init__.py index db8bccbcb2..cb8cd56fcf 100644 --- a/python/packages/main/agent_framework/workflow/__init__.py +++ b/python/packages/main/agent_framework/workflow/__init__.py @@ -56,6 +56,7 @@ _IMPORTS = [ "PlanReviewRequest", "RequestInfoEvent", "StandardMagenticManager", + "ConcurrentBuilder", ] diff --git a/python/packages/main/agent_framework/workflow/__init__.pyi b/python/packages/main/agent_framework/workflow/__init__.pyi index 63f301775f..ea88728525 100644 --- a/python/packages/main/agent_framework/workflow/__init__.pyi +++ b/python/packages/main/agent_framework/workflow/__init__.pyi @@ -8,6 +8,7 @@ from agent_framework_workflow import ( AgentRunUpdateEvent, Case, CheckpointStorage, + ConcurrentBuilder, Default, Executor, ExecutorCompletedEvent, @@ -56,6 +57,7 @@ __all__ = [ "AgentRunUpdateEvent", "Case", "CheckpointStorage", + "ConcurrentBuilder", "Default", "Executor", "ExecutorCompletedEvent", diff --git a/python/packages/workflow/agent_framework_workflow/__init__.py b/python/packages/workflow/agent_framework_workflow/__init__.py index 25fe4591de..5d3e1352fb 100644 --- a/python/packages/workflow/agent_framework_workflow/__init__.py +++ b/python/packages/workflow/agent_framework_workflow/__init__.py @@ -9,6 +9,7 @@ from ._checkpoint import ( InMemoryCheckpointStorage, WorkflowCheckpoint, ) +from ._concurrent import ConcurrentBuilder from ._const import ( DEFAULT_MAX_ITERATIONS, ) @@ -93,6 +94,7 @@ __all__ = [ "AgentRunUpdateEvent", "Case", "CheckpointStorage", + "ConcurrentBuilder", "Default", "EdgeDuplicationError", "Executor", diff --git a/python/packages/workflow/agent_framework_workflow/_concurrent.py b/python/packages/workflow/agent_framework_workflow/_concurrent.py new file mode 100644 index 0000000000..14feb6bc8e --- /dev/null +++ b/python/packages/workflow/agent_framework_workflow/_concurrent.py @@ -0,0 +1,304 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +import inspect +import logging +from collections.abc import Callable, Sequence +from typing import Any + +from agent_framework import AgentProtocol, ChatMessage, Role + +from ._events import WorkflowCompletedEvent +from ._executor import AgentExecutorRequest, AgentExecutorResponse, Executor, handler +from ._workflow import Workflow, WorkflowBuilder +from ._workflow_context import WorkflowContext + +logger = logging.getLogger(__name__) + +"""Concurrent builder for agent-only fan-out/fan-in workflows. + +This module provides a high-level, agent-focused API to quickly assemble a +parallel workflow with: +- a default dispatcher that broadcasts the input to all agent participants +- a default aggregator that combines all agent conversations and completes the workflow + +Notes: +- Participants should be AgentProtocol instances or Executors. +- A custom aggregator can be provided as: + - an Executor instance (it should handle list[AgentExecutorResponse] and add a WorkflowCompletedEvent), or + - a callback function with signature: + def cb(results: list[AgentExecutorResponse]) -> Any | None + def cb(results: list[AgentExecutorResponse], ctx: WorkflowContext[Any]) -> Any | None + If the callback returns a non-None value, it is sent as the data of a WorkflowCompletedEvent. + If it returns None, the callback may have already emitted a completion event via ctx. +""" + + +class _DispatchToAllParticipants(Executor): + """Broadcasts input to all downstream participants (via fan-out edges).""" + + @handler + async def from_request(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorRequest]) -> None: + # No explicit target: edge routing delivers to all connected participants. + await ctx.send_message(request) + + @handler + async def from_str(self, prompt: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None: + request = AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=prompt)], should_respond=True) + await ctx.send_message(request) + + @handler + async def from_message(self, message: ChatMessage, ctx: WorkflowContext[AgentExecutorRequest]) -> None: # type: ignore[name-defined] + request = AgentExecutorRequest(messages=[message], should_respond=True) + await ctx.send_message(request) + + @handler + async def from_messages(self, messages: list[ChatMessage], ctx: WorkflowContext[AgentExecutorRequest]) -> None: # type: ignore[name-defined] + request = AgentExecutorRequest(messages=list(messages), should_respond=True) + await ctx.send_message(request) + + +class _AggregateAgentConversations(Executor): + """Aggregates agent responses and completes with combined ChatMessages. + + Emits a list[ChatMessage] shaped as: + [ single_user_prompt?, agent1_final_assistant, agent2_final_assistant, ... ] + + - Extracts a single user prompt (first user message seen across results). + - For each result, selects the final assistant message (prefers agent_run_response.messages). + - Avoids duplicating the same user message per agent. + """ + + @handler + async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Any]) -> None: + if not results: + logger.error("Concurrent aggregator received empty results list") + raise ValueError("Aggregation failed: no results provided") + + def _is_role(msg: Any, role: Role) -> bool: + r = getattr(msg, "role", None) + if r is None: + return False + # Normalize both r and role to lowercase strings for comparison + r_str = str(r).lower() if isinstance(r, str) or hasattr(r, "__str__") else r + role_str = getattr(role, "value", None) + if role_str is None: + role_str = str(role) + role_str = role_str.lower() + return r_str == role_str + + prompt_message: ChatMessage | None = None + assistant_replies: list[ChatMessage] = [] + + for r in results: + resp_messages = list(getattr(r.agent_run_response, "messages", []) or []) + conv = r.full_conversation if r.full_conversation is not None else resp_messages + + logger.debug( + f"Aggregating executor {getattr(r, 'executor_id', '')}: " + f"{len(resp_messages)} response msgs, {len(conv)} conversation msgs" + ) + + # Capture a single user prompt (first encountered across any conversation) + if prompt_message is None: + found_user = next((m for m in conv if _is_role(m, Role.USER)), None) + if found_user is not None: + prompt_message = found_user + + # Pick the final assistant message from the response; fallback to conversation search + final_assistant = next((m for m in reversed(resp_messages) if _is_role(m, Role.ASSISTANT)), None) + if final_assistant is None: + final_assistant = next((m for m in reversed(conv) if _is_role(m, Role.ASSISTANT)), None) + + if final_assistant is not None: + assistant_replies.append(final_assistant) + else: + logger.warning( + f"No assistant reply found for executor {getattr(r, 'executor_id', '')}; skipping" + ) + + if not assistant_replies: + logger.error(f"Aggregation failed: no assistant replies found across {len(results)} results") + raise RuntimeError("Aggregation failed: no assistant replies found") + + output: list[ChatMessage] = [] + if prompt_message is not None: + output.append(prompt_message) + else: + logger.warning("No user prompt found in any conversation; emitting assistants only") + output.extend(assistant_replies) + + await ctx.add_event(WorkflowCompletedEvent(data=output)) + + +class _CallbackAggregator(Executor): + """Wraps a Python callback as an aggregator. + + Accepts either an async or sync callback with one of the signatures: + - (results: list[AgentExecutorResponse]) -> Any | None + - (results: list[AgentExecutorResponse], ctx: WorkflowContext[Any]) -> Any | None + + Notes: + - Async callbacks are awaited directly. + - Sync callbacks are executed via asyncio.to_thread to avoid blocking the event loop. + - If the callback returns a non-None value, it is wrapped in a WorkflowCompletedEvent. + """ + + def __init__(self, callback: Callable[..., Any], id: str | None = None) -> None: + super().__init__(id) + self._callback = callback + self._param_count = len(inspect.signature(callback).parameters) + + @handler + async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Any]) -> None: + # Call according to provided signature, always non-blocking for sync callbacks + if self._param_count >= 2: + if inspect.iscoroutinefunction(self._callback): + ret = await self._callback(results, ctx) # type: ignore[misc] + else: + ret = await asyncio.to_thread(self._callback, results, ctx) + else: + if inspect.iscoroutinefunction(self._callback): + ret = await self._callback(results) # type: ignore[misc] + else: + ret = await asyncio.to_thread(self._callback, results) + + # If the callback returned a value, finalize the workflow with it + if ret is not None: + await ctx.add_event(WorkflowCompletedEvent(ret)) + + +class ConcurrentBuilder: + r"""High-level builder for concurrent agent workflows. + + - `participants([...])` accepts a list of AgentProtocol (recommended) or Executor. + - `build()` wires: dispatcher -> fan-out -> participants -> fan-in -> aggregator. + - `with_custom_aggregator(...)` overrides the default aggregator with an Executor or callback. + + Usage: + ```python + from agent_framework.workflow import ConcurrentBuilder + + # Minimal: use default aggregator (returns list[ChatMessage]) + workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).build() + + + # Custom aggregator via callback (sync or async). The callback receives + # list[AgentExecutorResponse] and its return value becomes + # WorkflowCompletedEvent.data + def summarize(results): + return " | ".join(r.agent_run_response.messages[-1].text for r in results) + + + workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_custom_aggregator(summarize).build() + ``` + """ + + def __init__(self) -> None: + self._participants: list[AgentProtocol | Executor] = [] + self._aggregator: Executor | None = None + + def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "ConcurrentBuilder": + r"""Define the parallel participants for this concurrent workflow. + + Accepts AgentProtocol instances (e.g., created by a chat client) or Executor + instances. Each participant is wired as a parallel branch using fan-out edges + from an internal dispatcher. + + Raises: + ValueError: if `participants` is empty or contains duplicates + TypeError: if any entry is not AgentProtocol or Executor + + Example: + ```python + wf = ConcurrentBuilder().participants([researcher_agent, marketer_agent, legal_agent]).build() + + # Mixing agent(s) and executor(s) is supported + wf2 = ConcurrentBuilder().participants([researcher_agent, my_custom_executor]).build() + ``` + """ + if not participants: + raise ValueError("participants cannot be empty") + + # Defensive duplicate detection + seen_agent_ids: set[int] = set() + seen_executor_ids: set[str] = set() + for p in participants: + if isinstance(p, Executor): + if p.id in seen_executor_ids: + raise ValueError(f"Duplicate executor participant detected: id '{p.id}'") + seen_executor_ids.add(p.id) + elif isinstance(p, AgentProtocol): + pid = id(p) + if pid in seen_agent_ids: + raise ValueError("Duplicate agent participant detected (same agent instance provided twice)") + seen_agent_ids.add(pid) + else: + raise TypeError(f"participants must be AgentProtocol or Executor instances; got {type(p).__name__}") + + self._participants = list(participants) + return self + + def with_aggregator(self, aggregator: Executor | Callable[..., Any]) -> "ConcurrentBuilder": + r"""Override the default aggregator with an Executor or a callback. + + - Executor: must handle `list[AgentExecutorResponse]` and add a + `WorkflowCompletedEvent` to the context. + - Callback: sync or async callable with one of the signatures: + `(results: list[AgentExecutorResponse]) -> Any | None` or + `(results: list[AgentExecutorResponse], ctx: WorkflowContext[Any]) -> Any | None`. + If the callback returns a non-None value, it becomes the + `WorkflowCompletedEvent.data`. + + Example: + ```python + # Callback-based aggregator (string result) + async def summarize(results): + return " | ".join(r.agent_run_response.messages[-1].text for r in results) + + + wf = ConcurrentBuilder().participants([a1, a2, a3]).with_custom_aggregator(summarize).build() + ``` + """ + if isinstance(aggregator, Executor): + self._aggregator = aggregator + elif callable(aggregator): + self._aggregator = _CallbackAggregator(aggregator) + else: + raise TypeError("aggregator must be an Executor or a callable") + return self + + def build(self) -> Workflow: + r"""Build and validate the concurrent workflow. + + Wiring pattern: + - Dispatcher (internal) fans out the input to all `participants` + - Fan-in aggregator collects `AgentExecutorResponse` objects + - Aggregator emits a `WorkflowCompletedEvent` with either: + - list[ChatMessage] (default aggregator: one user + one assistant per agent) + - custom payload from the provided callback/executor + + Returns: + Workflow: a ready-to-run workflow instance + + Raises: + ValueError: if no participants were defined + + Example: + ```python + workflow = ConcurrentBuilder().participants([agent1, agent2]).build() + ``` + """ + if not self._participants: + raise ValueError("No participants provided. Call .participants([...]) first.") + + dispatcher = _DispatchToAllParticipants(id="dispatcher") + aggregator = self._aggregator or _AggregateAgentConversations(id="aggregator") + + builder = WorkflowBuilder() + return ( + builder.set_start_executor(dispatcher) + .add_fan_out_edges(dispatcher, list(self._participants)) + .add_fan_in_edges(list(self._participants), aggregator) + .build() + ) diff --git a/python/packages/workflow/tests/test_concurrent.py b/python/packages/workflow/tests/test_concurrent.py new file mode 100644 index 0000000000..25becca7cd --- /dev/null +++ b/python/packages/workflow/tests/test_concurrent.py @@ -0,0 +1,126 @@ +# Copyright (c) Microsoft. All rights reserved. + +from typing import Any, cast + +import pytest +from agent_framework import AgentRunResponse, ChatMessage, Role + +from agent_framework_workflow import ( + AgentExecutorRequest, + AgentExecutorResponse, + ConcurrentBuilder, + Executor, + WorkflowCompletedEvent, + WorkflowContext, + handler, +) + + +class _FakeAgentExec(Executor): + """Test executor that mimics an agent by emitting an AgentExecutorResponse. + + It takes the incoming AgentExecutorRequest, produces a single assistant message + with the configured reply text, and sends an AgentExecutorResponse that includes + full_conversation (the original user prompt followed by the assistant message). + """ + + def __init__(self, id: str, reply_text: str) -> None: + super().__init__(id) + self._reply_text = reply_text + + @handler + async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None: + response = AgentRunResponse(messages=ChatMessage(Role.ASSISTANT, text=self._reply_text)) + full_conversation = list(request.messages) + list(response.messages) + await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation)) + + +def test_concurrent_builder_rejects_empty_participants() -> None: + with pytest.raises(ValueError): + ConcurrentBuilder().participants([]) + + +def test_concurrent_builder_rejects_duplicate_executors() -> None: + a = _FakeAgentExec("dup", "A") + b = _FakeAgentExec("dup", "B") # same executor id + with pytest.raises(ValueError): + ConcurrentBuilder().participants([a, b]) + + +async def test_concurrent_default_aggregator_emits_single_user_and_assistants() -> None: + # Three synthetic agent executors + e1 = _FakeAgentExec("agentA", "Alpha") + e2 = _FakeAgentExec("agentB", "Beta") + e3 = _FakeAgentExec("agentC", "Gamma") + + wf = ConcurrentBuilder().participants([e1, e2, e3]).build() + + completed: WorkflowCompletedEvent | None = None + async for ev in wf.run_stream("prompt: hello world"): + if isinstance(ev, WorkflowCompletedEvent): + completed = ev + break + + assert completed is not None + assert isinstance(completed.data, list) + messages: list[ChatMessage] = cast(list[ChatMessage], completed.data) # type: ignore + + # Expect one user message + one assistant message per participant + assert len(messages) == 1 + 3 + assert messages[0].role == Role.USER + assert "hello world" in messages[0].text + + assistant_texts = {m.text for m in messages[1:]} + assert assistant_texts == {"Alpha", "Beta", "Gamma"} + assert all(m.role == Role.ASSISTANT for m in messages[1:]) + + +async def test_concurrent_custom_aggregator_callback_is_used() -> None: + # Two synthetic agent executors for brevity + e1 = _FakeAgentExec("agentA", "One") + e2 = _FakeAgentExec("agentB", "Two") + + async def summarize(results: list[AgentExecutorResponse]) -> str: + texts: list[str] = [] + for r in results: + msgs: list[ChatMessage] = r.agent_run_response.messages + texts.append(msgs[-1].text if msgs else "") + return " | ".join(sorted(texts)) + + wf = ConcurrentBuilder().participants([e1, e2]).with_aggregator(summarize).build() + + completed: WorkflowCompletedEvent | None = None + async for ev in wf.run_stream("prompt: custom"): + if isinstance(ev, WorkflowCompletedEvent): + completed = ev + break + + assert completed is not None + # Custom aggregator returns a string payload + assert isinstance(completed.data, str) + assert completed.data == "One | Two" + + +async def test_concurrent_custom_aggregator_sync_callback_is_used() -> None: + e1 = _FakeAgentExec("agentA", "One") + e2 = _FakeAgentExec("agentB", "Two") + + # Sync callback with ctx parameter (should run via asyncio.to_thread) + def summarize_sync(results: list[AgentExecutorResponse], ctx: WorkflowContext[Any]) -> str: # type: ignore[unused-argument] + texts: list[str] = [] + for r in results: + msgs: list[ChatMessage] = r.agent_run_response.messages + texts.append(msgs[-1].text if msgs else "") + return " | ".join(sorted(texts)) + + wf = ConcurrentBuilder().participants([e1, e2]).with_aggregator(summarize_sync).build() + + completed: WorkflowCompletedEvent | None = None + async for ev in wf.run_stream("prompt: custom sync"): + if isinstance(ev, WorkflowCompletedEvent): + completed = ev + break + + assert completed is not None + assert isinstance(completed.data, str) + assert completed.data == "One | Two" diff --git a/python/samples/getting_started/workflow/README.md b/python/samples/getting_started/workflow/README.md index 744ad3c9c0..9fa42bc209 100644 --- a/python/samples/getting_started/workflow/README.md +++ b/python/samples/getting_started/workflow/README.md @@ -78,6 +78,9 @@ Once comfortable with these, explore the rest of the samples below. ### orchestration | Sample | File | Concepts | |---|---|---| +| Concurrent Orchestration (Default Aggregator) | [orchestration/concurrent_agents.py](./orchestration/concurrent_agents.py) | Fan-out to multiple agents; fan-in with default aggregator returning combined ChatMessages | +| Concurrent Orchestration (Custom Aggregator) | [orchestration/concurrent_custom_aggregator.py](./orchestration/concurrent_custom_aggregator.py) | Override aggregator via callback; summarize results with an LLM | +| Concurrent Orchestration (Custom Agent Executors) | [orchestration/concurrent_custom_agent_executors.py](./orchestration/concurrent_custom_agent_executors.py) | Child executors own ChatAgents; concurrent fan-out/fan-in via ConcurrentBuilder | | Magentic Workflow (Multi-Agent) | [orchestration/magentic.py](./orchestration/magentic.py) | Orchestrate multiple agents with Magentic manager and streaming | | Magentic + Human Plan Review | [orchestration/magentic_human_plan_update.py](./orchestration/magentic_human_plan_update.py) | Human reviews/updates the plan before execution | diff --git a/python/samples/getting_started/workflow/orchestration/concurrent_agents.py b/python/samples/getting_started/workflow/orchestration/concurrent_agents.py new file mode 100644 index 0000000000..b8e165c5b4 --- /dev/null +++ b/python/samples/getting_started/workflow/orchestration/concurrent_agents.py @@ -0,0 +1,131 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +from typing import Any + +from agent_framework import ChatMessage +from agent_framework.azure import AzureChatClient +from agent_framework.workflow import ConcurrentBuilder, WorkflowCompletedEvent +from azure.identity import AzureCliCredential + +""" +Sample: Concurrent fan-out/fan-in (agent-only API) with default aggregator + +Build a high-level concurrent workflow using ConcurrentBuilder and three domain agents. +The default dispatcher fans out the same user prompt to all agents in parallel. +The default aggregator fans in their results and emits a WorkflowCompletedEvent whose +data is a list[ChatMessage] representing the concatenated conversations from all agents. + +Demonstrates: +- Minimal wiring with ConcurrentBuilder().participants([...]).build() +- Fan-out to multiple agents, fan-in aggregation of final ChatMessages +- Streaming of AgentRunEvent for simple progress visibility + +Prerequisites: +- Azure OpenAI access configured for AzureChatClient (use az login + env vars) +- Familiarity with Workflow events (AgentRunEvent, WorkflowCompletedEvent) +""" + + +async def main() -> None: + # 1) Create three domain agents using AzureChatClient + chat_client = AzureChatClient(credential=AzureCliCredential()) + + researcher = chat_client.create_agent( + instructions=( + "You're an expert market and product researcher. Given a prompt, provide concise, factual insights," + " opportunities, and risks." + ), + name="researcher", + ) + + marketer = chat_client.create_agent( + instructions=( + "You're a creative marketing strategist. Craft compelling value propositions and target messaging" + " aligned to the prompt." + ), + name="marketer", + ) + + legal = chat_client.create_agent( + instructions=( + "You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns" + " based on the prompt." + ), + name="legal", + ) + + # 2) Build a concurrent workflow + # Participants are either Agents (type of AgentProtocol) or Executors + workflow = ConcurrentBuilder().participants([researcher, marketer, legal]).build() + + # 3) Run with a single prompt, stream progress, and pretty-print the final combined messages + completion: WorkflowCompletedEvent | None = None + async for event in workflow.run_stream("We are launching a new budget-friendly electric bike for urban commuters."): + if isinstance(event, WorkflowCompletedEvent): + completion = event + + if completion: + print("===== Final Aggregated Conversation (messages) =====") + messages: list[ChatMessage] | Any = completion.data + for i, msg in enumerate(messages, start=1): + name = msg.author_name if msg.author_name else "user" + print(f"{'-' * 60}\n\n{i:02d} [{name}]:\n{msg.text}") + + """ + Sample Output: + + ===== Final Aggregated Conversation (messages) ===== + ------------------------------------------------------------ + + 01 [user]: + We are launching a new budget-friendly electric bike for urban commuters. + ------------------------------------------------------------ + + 02 [researcher]: + **Insights:** + + - **Target Demographic:** Urban commuters seeking affordable, eco-friendly transport; + likely to include students, young professionals, and price-sensitive urban residents. + - **Market Trends:** E-bike sales are growing globally, with increasing urbanization, + higher fuel costs, and sustainability concerns driving adoption. + - **Competitive Landscape:** Key competitors include brands like Rad Power Bikes, Aventon, + Lectric, and domestic budget-focused manufacturers in North America, Europe, and Asia. + - **Feature Expectations:** Customers expect reliability, ease-of-use, theft protection, + lightweight design, sufficient battery range for daily city commutes (typically 25-40 miles), + and low-maintenance components. + + **Opportunities:** + + - **First-time Buyers:** Capture newcomers to e-biking by emphasizing affordability, ease of + operation, and cost savings vs. public transit/car ownership. + ... + ------------------------------------------------------------ + + 03 [marketer]: + **Value Proposition:** + "Empowering your city commute: Our new electric bike combines affordability, reliability, and + sustainable design—helping you conquer urban journeys without breaking the bank." + + **Target Messaging:** + + *For Young Professionals:* + ... + ------------------------------------------------------------ + + 04 [legal]: + **Constraints, Disclaimers, & Policy Concerns for Launching a Budget-Friendly Electric Bike for Urban Commuters:** + + **1. Regulatory Compliance** + - Verify that the electric bike meets all applicable federal, state, and local regulations + regarding e-bike classification, speed limits, power output, and safety features. + - Ensure necessary certifications (e.g., UL certification for batteries, CE markings if sold internationally) are obtained. + + **2. Product Safety** + - Include consumer safety warnings regarding use, battery handling, charging protocols, and age restrictions. + ... + """ # noqa: E501 + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/samples/getting_started/workflow/orchestration/concurrent_custom_agent_executors.py b/python/samples/getting_started/workflow/orchestration/concurrent_custom_agent_executors.py new file mode 100644 index 0000000000..067489dee2 --- /dev/null +++ b/python/samples/getting_started/workflow/orchestration/concurrent_custom_agent_executors.py @@ -0,0 +1,175 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +from typing import Any + +from agent_framework import ChatAgent, ChatMessage +from agent_framework.azure import AzureChatClient +from agent_framework.workflow import ( + AgentExecutorRequest, + AgentExecutorResponse, + ConcurrentBuilder, + Executor, + WorkflowCompletedEvent, + WorkflowContext, + handler, +) +from azure.identity import AzureCliCredential + +""" +Sample: Concurrent Orchestration with Custom Agent Executors + +This sample shows a concurrent fan-out/fan-in pattern using child Executor classes +that each own their ChatAgent. The executors accept AgentExecutorRequest inputs +and emit AgentExecutorResponse outputs, which allows reuse of the high-level +ConcurrentBuilder API and the default aggregator. + +Demonstrates: +- Executors that create their ChatAgent in __init__ (via AzureChatClient) +- A @handler that converts AgentExecutorRequest -> AgentExecutorResponse +- ConcurrentBuilder().participants([...]) to build fan-out/fan-in +- Default aggregator returning list[ChatMessage] (one user + one assistant per agent) + +Prerequisites: +- Azure OpenAI configured for AzureChatClient (az login + required env vars) +""" + + +class ResearcherExec(Executor): + agent: ChatAgent + + def __init__(self, chat_client: AzureChatClient, id: str = "researcher"): + agent = chat_client.create_agent( + instructions=( + "You're an expert market and product researcher. Given a prompt, provide concise, factual insights," + " opportunities, and risks." + ), + name=id, + ) + super().__init__(agent=agent, id=id) + + @handler + async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None: + response = await self.agent.run(request.messages) + full_conversation = list(request.messages) + list(response.messages) + await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation)) + + +class MarketerExec(Executor): + agent: ChatAgent + + def __init__(self, chat_client: AzureChatClient, id: str = "marketer"): + agent = chat_client.create_agent( + instructions=( + "You're a creative marketing strategist. Craft compelling value propositions and target messaging" + " aligned to the prompt." + ), + name=id, + ) + super().__init__(agent=agent, id=id) + + @handler + async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None: + response = await self.agent.run(request.messages) + full_conversation = list(request.messages) + list(response.messages) + await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation)) + + +class LegalExec(Executor): + agent: ChatAgent + + def __init__(self, chat_client: AzureChatClient, id: str = "legal"): + agent = chat_client.create_agent( + instructions=( + "You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns" + " based on the prompt." + ), + name=id, + ) + super().__init__(agent=agent, id=id) + + @handler + async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None: + response = await self.agent.run(request.messages) + full_conversation = list(request.messages) + list(response.messages) + await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation)) + + +async def main() -> None: + chat_client = AzureChatClient(credential=AzureCliCredential()) + + researcher = ResearcherExec(chat_client) + marketer = MarketerExec(chat_client) + legal = LegalExec(chat_client) + + workflow = ConcurrentBuilder().participants([researcher, marketer, legal]).build() + + completion: WorkflowCompletedEvent | None = None + async for event in workflow.run_stream("We are launching a new budget-friendly electric bike for urban commuters."): + if isinstance(event, WorkflowCompletedEvent): + completion = event + + if completion: + print("===== Final Aggregated Conversation (messages) =====") + messages: list[ChatMessage] | Any = completion.data + for i, msg in enumerate(messages, start=1): + name = msg.author_name if msg.author_name else "user" + print(f"{'-' * 60}\n\n{i:02d} [{name}]:\n{msg.text}") + + """ + Sample Output: + + ===== Final Aggregated Conversation (messages) ===== + ------------------------------------------------------------ + + 01 [user]: + We are launching a new budget-friendly electric bike for urban commuters. + ------------------------------------------------------------ + + 02 [researcher]: + **Insights:** + + - **Target Demographic:** Urban commuters seeking affordable, eco-friendly transport; + likely to include students, young professionals, and price-sensitive urban residents. + - **Market Trends:** E-bike sales are growing globally, with increasing urbanization, + higher fuel costs, and sustainability concerns driving adoption. + - **Competitive Landscape:** Key competitors include brands like Rad Power Bikes, Aventon, + Lectric, and domestic budget-focused manufacturers in North America, Europe, and Asia. + - **Feature Expectations:** Customers expect reliability, ease-of-use, theft protection, + lightweight design, sufficient battery range for daily city commutes (typically 25-40 miles), + and low-maintenance components. + + **Opportunities:** + + - **First-time Buyers:** Capture newcomers to e-biking by emphasizing affordability, ease of + operation, and cost savings vs. public transit/car ownership. + ... + ------------------------------------------------------------ + + 03 [marketer]: + **Value Proposition:** + "Empowering your city commute: Our new electric bike combines affordability, reliability, and + sustainable design—helping you conquer urban journeys without breaking the bank." + + **Target Messaging:** + + *For Young Professionals:* + ... + ------------------------------------------------------------ + + 04 [legal]: + **Constraints, Disclaimers, & Policy Concerns for Launching a Budget-Friendly Electric Bike for Urban Commuters:** + + **1. Regulatory Compliance** + - Verify that the electric bike meets all applicable federal, state, and local regulations + regarding e-bike classification, speed limits, power output, and safety features. + - Ensure necessary certifications (e.g., UL certification for batteries, CE markings if sold internationally) are obtained. + + **2. Product Safety** + - Include consumer safety warnings regarding use, battery handling, charging protocols, and age restrictions. + ... + """ # noqa: E501 + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/samples/getting_started/workflow/orchestration/concurrent_custom_aggregator.py b/python/samples/getting_started/workflow/orchestration/concurrent_custom_aggregator.py new file mode 100644 index 0000000000..73f95388c6 --- /dev/null +++ b/python/samples/getting_started/workflow/orchestration/concurrent_custom_aggregator.py @@ -0,0 +1,125 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +from typing import Any + +from agent_framework import ChatMessage, Role +from agent_framework.azure import AzureChatClient +from agent_framework.workflow import ConcurrentBuilder, WorkflowCompletedEvent +from azure.identity import AzureCliCredential + +""" +Sample: Concurrent Orchestration with Custom Aggregator + +Build a concurrent workflow with ConcurrentBuilder that fans out one prompt to +multiple domain agents and fans in their responses. Override the default +aggregator with a custom async callback that uses AzureChatClient.get_response() +to synthesize a concise, consolidated summary from the experts' outputs. + +Demonstrates: +- ConcurrentBuilder().participants([...]).with_custom_aggregator(callback) +- Fan-out to agents and fan-in at an aggregator +- Aggregation implemented via an LLM call (chat_client.get_response) +- WorkflowCompletedEvent carrying the synthesized summary string + +Prerequisites: +- Azure OpenAI configured for AzureChatClient (az login + required env vars) +""" + + +async def main() -> None: + chat_client = AzureChatClient(credential=AzureCliCredential()) + + researcher = chat_client.create_agent( + instructions=( + "You're an expert market and product researcher. Given a prompt, provide concise, factual insights," + " opportunities, and risks." + ), + name="researcher", + ) + marketer = chat_client.create_agent( + instructions=( + "You're a creative marketing strategist. Craft compelling value propositions and target messaging" + " aligned to the prompt." + ), + name="marketer", + ) + legal = chat_client.create_agent( + instructions=( + "You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns" + " based on the prompt." + ), + name="legal", + ) + + # Define a custom aggregator callback that uses the chat client to summarize + async def summarize_results(results: list[Any]) -> str: + # Extract one final assistant message per agent + expert_sections: list[str] = [] + for r in results: + try: + messages = getattr(r.agent_run_response, "messages", []) + final_text = messages[-1].text if messages and hasattr(messages[-1], "text") else "(no content)" + expert_sections.append(f"{getattr(r, 'executor_id', 'expert')}:\n{final_text}") + except Exception as e: + expert_sections.append(f"{getattr(r, 'executor_id', 'expert')}: (error: {type(e).__name__}: {e})") + + # Ask the model to synthesize a concise summary of the experts' outputs + system_msg = ChatMessage( + Role.SYSTEM, + text=( + "You are a helpful assistant that consolidates multiple domain expert outputs " + "into one cohesive, concise summary with clear takeaways. Keep it under 200 words." + ), + ) + user_msg = ChatMessage(Role.USER, text="\n\n".join(expert_sections)) + + response = await chat_client.get_response([system_msg, user_msg]) + # Return the model's final assistant text as the completion result + return response.messages[-1].text if response.messages else "" + + # Build with a custom aggregator callback function + # - participants([...]) accepts AgentProtocol (agents) or Executor instances. + # Each participant becomes a parallel branch (fan-out) from an internal dispatcher. + # - with_aggregator(...) overrides the default aggregator: + # • Default aggregator -> returns list[ChatMessage] (one user + one assistant per agent) + # • Custom callback -> return value becomes WorkflowCompletedEvent.data (string here) + # The callback can be sync or async; it receives list[AgentExecutorResponse]. + workflow = ( + ConcurrentBuilder().participants([researcher, marketer, legal]).with_aggregator(summarize_results).build() + ) + + completion: WorkflowCompletedEvent | None = None + async for event in workflow.run_stream("We are launching a new budget-friendly electric bike for urban commuters."): + if isinstance(event, WorkflowCompletedEvent): + completion = event + + if completion: + print("===== Final Consolidated Output =====") + print(completion.data) + + """ + Sample Output: + + ===== Final Consolidated Output ===== + Urban e-bike demand is rising rapidly due to eco-awareness, urban congestion, and high fuel costs, + with market growth projected at a ~10% CAGR through 2030. Key customer concerns are affordability, + easy maintenance, convenient charging, compact design, and theft protection. Differentiation opportunities + include integrating smart features (GPS, app connectivity), offering subscription or leasing options, and + developing portable, space-saving designs. Partnering with local governments and bike shops can boost visibility. + + Risks include price wars eroding margins, regulatory hurdles, battery quality concerns, and heightened expectations + for after-sales support. Accurate, substantiated product claims and transparent marketing (with range disclaimers) + are essential. All e-bikes must comply with local and federal regulations on speed, wattage, safety certification, + and labeling. Clear warranty, safety instructions (especially regarding batteries), and inclusive, accessible + marketing are required. For connected features, data privacy policies and user consents are mandatory. + + Effective messaging should target young professionals, students, eco-conscious commuters, and first-time buyers, + emphasizing affordability, convenience, and sustainability. Slogan suggestion: “Charge Ahead—City Commutes Made + Affordable.” Legal review in each target market, compliance vetting, and robust customer support policies are + critical before launch. + """ + + +if __name__ == "__main__": + asyncio.run(main()) From d54b0f0849dbf7f724fab0a0667d3f95d2bc7219 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 12 Sep 2025 12:24:56 +0100 Subject: [PATCH 02/11] Bump Microsoft.ML.OnnxRuntimeGenAI from 0.9.0 to 0.9.1 (#712) --- updated-dependencies: - dependency-name: Microsoft.ML.OnnxRuntimeGenAI dependency-version: 0.9.1 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- dotnet/Directory.Packages.props | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dotnet/Directory.Packages.props b/dotnet/Directory.Packages.props index f0e6d3fe0d..74862bb0e8 100644 --- a/dotnet/Directory.Packages.props +++ b/dotnet/Directory.Packages.props @@ -81,7 +81,7 @@ - + From f79fbfa92e6bd71aed0998f6707056c0b5a441f2 Mon Sep 17 00:00:00 2001 From: Christian Glessner Date: Fri, 12 Sep 2025 19:07:49 +0700 Subject: [PATCH 03/11] Python: Add OpenAI reasoning events support to Responses client (#698) * feat: add OpenAI reasoning events support with comprehensive test coverage - Implement reasoning event handling in OpenAI Responses client * Add support for ResponseReasoningTextDeltaEvent * Add support for ResponseReasoningTextDoneEvent * Add support for ResponseReasoningSummaryTextDeltaEvent * Add support for ResponseReasoningSummaryTextDoneEvent * Map all reasoning events to TextReasoningContent objects * Preserve metadata across reasoning events - Add comprehensive test coverage (5 focused test functions) * test_streaming_reasoning_text_delta_event * test_streaming_reasoning_text_done_event * test_streaming_reasoning_summary_text_delta_event * test_streaming_reasoning_summary_text_done_event * test_streaming_reasoning_events_preserve_metadata - Add sample demonstrating reasoning functionality * Shows how to enable reasoning in chat options * Demonstrates accessing reasoning content from responses - Code quality improvements * Follow existing code patterns and style guidelines * Organize imports properly * Maintain backwards compatibility * All tests pass and quality checks succeed * fix: resolve type errors and cleanup unused imports after rebase - Add proper hasattr checks for optional attributes in union types - Remove unused OpenAI event type imports - Fix line length formatting issues - Ensure type safety when accessing content attributes --- .../openai/_responses_client.py | 40 +++-- .../openai/test_openai_responses_client.py | 145 ++++++++++++++++++ .../openai_responses_client_reasoning.py | 8 +- 3 files changed, 177 insertions(+), 16 deletions(-) diff --git a/python/packages/main/agent_framework/openai/_responses_client.py b/python/packages/main/agent_framework/openai/_responses_client.py index 9bc646c5f5..77cf54edf2 100644 --- a/python/packages/main/agent_framework/openai/_responses_client.py +++ b/python/packages/main/agent_framework/openai/_responses_client.py @@ -762,10 +762,10 @@ class OpenAIBaseResponsesClient(OpenAIBase, BaseChatClient): TextContent(text=message_content.refusal, raw_representation=message_content) ) case "reasoning": # ResponseOutputReasoning - if item.content: + if hasattr(item, "content") and item.content: for index, reasoning_content in enumerate(item.content): additional_properties = None - if item.summary and index < len(item.summary): + if hasattr(item, "summary") and item.summary and index < len(item.summary): additional_properties = {"summary": item.summary[index]} contents.append( TextReasoningContent( @@ -775,7 +775,7 @@ class OpenAIBaseResponsesClient(OpenAIBase, BaseChatClient): ) ) case "code_interpreter_call": # ResponseOutputCodeInterpreterCall - if item.outputs: + if hasattr(item, "outputs") and item.outputs: for code_output in item.outputs: if code_output.type == "logs": contents.append(TextContent(text=code_output.logs, raw_representation=item)) @@ -788,16 +788,16 @@ class OpenAIBaseResponsesClient(OpenAIBase, BaseChatClient): media_type="image", ) ) - elif item.code: + elif hasattr(item, "code") and item.code: # fallback if no output was returned is the code: contents.append(TextContent(text=item.code, raw_representation=item)) case "function_call": # ResponseOutputFunctionCall contents.append( FunctionCallContent( - call_id=item.call_id if item.call_id else "", - name=item.name, - arguments=item.arguments, - additional_properties={"fc_id": item.id}, + call_id=item.call_id if hasattr(item, "call_id") and item.call_id else "", + name=item.name if hasattr(item, "name") else "", + arguments=item.arguments if hasattr(item, "arguments") else "", + additional_properties={"fc_id": item.id} if hasattr(item, "id") else {}, raw_representation=item, ) ) @@ -922,6 +922,18 @@ class OpenAIBaseResponsesClient(OpenAIBase, BaseChatClient): case "response.output_text.delta": contents.append(TextContent(text=event.delta, raw_representation=event)) metadata.update(self._get_metadata_from_response(event)) + case "response.reasoning_text.delta": + contents.append(TextReasoningContent(text=event.delta, raw_representation=event)) + metadata.update(self._get_metadata_from_response(event)) + case "response.reasoning_text.done": + contents.append(TextReasoningContent(text=event.text, raw_representation=event)) + metadata.update(self._get_metadata_from_response(event)) + case "response.reasoning_summary_text.delta": + contents.append(TextReasoningContent(text=event.delta, raw_representation=event)) + metadata.update(self._get_metadata_from_response(event)) + case "response.reasoning_summary_text.done": + contents.append(TextReasoningContent(text=event.text, raw_representation=event)) + metadata.update(self._get_metadata_from_response(event)) case "response.completed": conversation_id = event.response.id if chat_options.store is True else None model = event.response.model @@ -962,7 +974,7 @@ class OpenAIBaseResponsesClient(OpenAIBase, BaseChatClient): ) ) case "code_interpreter_call": # ResponseOutputCodeInterpreterCall - if event_item.outputs: + if hasattr(event_item, "outputs") and event_item.outputs: for code_output in event_item.outputs: if code_output.type == "logs": contents.append(TextContent(text=code_output.logs, raw_representation=event_item)) @@ -975,14 +987,18 @@ class OpenAIBaseResponsesClient(OpenAIBase, BaseChatClient): media_type="image", ) ) - elif event_item.code: + elif hasattr(event_item, "code") and event_item.code: # fallback if no output was returned is the code: contents.append(TextContent(text=event_item.code, raw_representation=event_item)) case "reasoning": # ResponseOutputReasoning - if event_item.content: + if hasattr(event_item, "content") and event_item.content: for index, reasoning_content in enumerate(event_item.content): additional_properties = None - if event_item.summary and index < len(event_item.summary): + if ( + hasattr(event_item, "summary") + and event_item.summary + and index < len(event_item.summary) + ): additional_properties = {"summary": event_item.summary[index]} contents.append( TextReasoningContent( diff --git a/python/packages/main/tests/openai/test_openai_responses_client.py b/python/packages/main/tests/openai/test_openai_responses_client.py index 4f2846a981..2c7bd668c7 100644 --- a/python/packages/main/tests/openai/test_openai_responses_client.py +++ b/python/packages/main/tests/openai/test_openai_responses_client.py @@ -7,6 +7,11 @@ from unittest.mock import MagicMock, patch import pytest from openai import BadRequestError +from openai.types.responses.response_reasoning_summary_text_delta_event import ResponseReasoningSummaryTextDeltaEvent +from openai.types.responses.response_reasoning_summary_text_done_event import ResponseReasoningSummaryTextDoneEvent +from openai.types.responses.response_reasoning_text_delta_event import ResponseReasoningTextDeltaEvent +from openai.types.responses.response_reasoning_text_done_event import ResponseReasoningTextDoneEvent +from openai.types.responses.response_text_delta_event import ResponseTextDeltaEvent from pydantic import BaseModel from agent_framework import ( @@ -1445,3 +1450,143 @@ def test_service_response_exception_includes_original_error_details() -> None: exception_message = str(exc_info.value) assert "service failed to complete the prompt:" in exception_message assert original_error_message in exception_message + + +def test_streaming_reasoning_text_delta_event() -> None: + """Test reasoning text delta event creates TextReasoningContent.""" + client = OpenAIResponsesClient(ai_model_id="test-model", api_key="test-key") + chat_options = ChatOptions() + function_call_ids: dict[int, tuple[str, str]] = {} + + event = ResponseReasoningTextDeltaEvent( + type="response.reasoning_text.delta", + content_index=0, + item_id="reasoning_123", + output_index=0, + sequence_number=1, + delta="reasoning delta", + ) + + with patch.object(client, "_get_metadata_from_response", return_value={}) as mock_metadata: + response = client._create_streaming_response_content(event, chat_options, function_call_ids) # type: ignore + + assert len(response.contents) == 1 + assert isinstance(response.contents[0], TextReasoningContent) + assert response.contents[0].text == "reasoning delta" + assert response.contents[0].raw_representation == event + mock_metadata.assert_called_once_with(event) + + +def test_streaming_reasoning_text_done_event() -> None: + """Test reasoning text done event creates TextReasoningContent with complete text.""" + client = OpenAIResponsesClient(ai_model_id="test-model", api_key="test-key") + chat_options = ChatOptions() + function_call_ids: dict[int, tuple[str, str]] = {} + + event = ResponseReasoningTextDoneEvent( + type="response.reasoning_text.done", + content_index=0, + item_id="reasoning_456", + output_index=0, + sequence_number=2, + text="complete reasoning", + ) + + with patch.object(client, "_get_metadata_from_response", return_value={"test": "data"}) as mock_metadata: + response = client._create_streaming_response_content(event, chat_options, function_call_ids) # type: ignore + + assert len(response.contents) == 1 + assert isinstance(response.contents[0], TextReasoningContent) + assert response.contents[0].text == "complete reasoning" + assert response.contents[0].raw_representation == event + mock_metadata.assert_called_once_with(event) + assert response.additional_properties == {"test": "data"} + + +def test_streaming_reasoning_summary_text_delta_event() -> None: + """Test reasoning summary text delta event creates TextReasoningContent.""" + client = OpenAIResponsesClient(ai_model_id="test-model", api_key="test-key") + chat_options = ChatOptions() + function_call_ids: dict[int, tuple[str, str]] = {} + + event = ResponseReasoningSummaryTextDeltaEvent( + type="response.reasoning_summary_text.delta", + item_id="summary_789", + output_index=0, + sequence_number=3, + summary_index=0, + delta="summary delta", + ) + + with patch.object(client, "_get_metadata_from_response", return_value={}) as mock_metadata: + response = client._create_streaming_response_content(event, chat_options, function_call_ids) # type: ignore + + assert len(response.contents) == 1 + assert isinstance(response.contents[0], TextReasoningContent) + assert response.contents[0].text == "summary delta" + assert response.contents[0].raw_representation == event + mock_metadata.assert_called_once_with(event) + + +def test_streaming_reasoning_summary_text_done_event() -> None: + """Test reasoning summary text done event creates TextReasoningContent with complete text.""" + client = OpenAIResponsesClient(ai_model_id="test-model", api_key="test-key") + chat_options = ChatOptions() + function_call_ids: dict[int, tuple[str, str]] = {} + + event = ResponseReasoningSummaryTextDoneEvent( + type="response.reasoning_summary_text.done", + item_id="summary_012", + output_index=0, + sequence_number=4, + summary_index=0, + text="complete summary", + ) + + with patch.object(client, "_get_metadata_from_response", return_value={"custom": "meta"}) as mock_metadata: + response = client._create_streaming_response_content(event, chat_options, function_call_ids) # type: ignore + + assert len(response.contents) == 1 + assert isinstance(response.contents[0], TextReasoningContent) + assert response.contents[0].text == "complete summary" + assert response.contents[0].raw_representation == event + mock_metadata.assert_called_once_with(event) + assert response.additional_properties == {"custom": "meta"} + + +def test_streaming_reasoning_events_preserve_metadata() -> None: + """Test that reasoning events preserve metadata like regular text events.""" + client = OpenAIResponsesClient(ai_model_id="test-model", api_key="test-key") + chat_options = ChatOptions() + function_call_ids: dict[int, tuple[str, str]] = {} + + text_event = ResponseTextDeltaEvent( + type="response.output_text.delta", + content_index=0, + item_id="text_item", + output_index=0, + sequence_number=1, + logprobs=[], + delta="text", + ) + + reasoning_event = ResponseReasoningTextDeltaEvent( + type="response.reasoning_text.delta", + content_index=0, + item_id="reasoning_item", + output_index=0, + sequence_number=2, + delta="reasoning", + ) + + with patch.object(client, "_get_metadata_from_response", return_value={"test": "metadata"}): + text_response = client._create_streaming_response_content(text_event, chat_options, function_call_ids) # type: ignore + reasoning_response = client._create_streaming_response_content(reasoning_event, chat_options, function_call_ids) # type: ignore + + # Both should preserve metadata + assert text_response.additional_properties == {"test": "metadata"} + assert reasoning_response.additional_properties == {"test": "metadata"} + + # Content types should be different + assert isinstance(text_response.contents[0], TextContent) + assert isinstance(reasoning_response.contents[0], TextReasoningContent) diff --git a/python/samples/getting_started/agents/openai_responses_client/openai_responses_client_reasoning.py b/python/samples/getting_started/agents/openai_responses_client/openai_responses_client_reasoning.py index fcc07c6efa..f9dfb34f16 100644 --- a/python/samples/getting_started/agents/openai_responses_client/openai_responses_client_reasoning.py +++ b/python/samples/getting_started/agents/openai_responses_client/openai_responses_client_reasoning.py @@ -10,12 +10,12 @@ async def reasoning_example() -> None: """Example of reasoning response (get results as they are generated).""" print("=== Reasoning Example ===") - agent = OpenAIResponsesClient(ai_model_id="o4-mini").create_agent( + agent = OpenAIResponsesClient(ai_model_id="gpt-5").create_agent( name="MathHelper", instructions="You are a personal math tutor. When asked a math question, " "write and run code using the python tool to answer the question.", tools=HostedCodeInterpreterTool(), - reasoning={"effort": "medium"}, + reasoning={"effort": "high", "summary": "detailed"}, ) query = "I need to solve the equation 3x + 11 = 14. Can you help me?" @@ -27,9 +27,9 @@ async def reasoning_example() -> None: for content in chunk.contents: if isinstance(content, TextReasoningContent): print(f"\033[97m{content.text}\033[0m", end="", flush=True) - if isinstance(content, TextContent): + elif isinstance(content, TextContent): print(content.text, end="", flush=True) - if isinstance(content, UsageContent): + elif isinstance(content, UsageContent): usage = content print("\n") if usage: From d4cc254b0bef5b181e30dd72c9064e5e4aaee1d7 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 12 Sep 2025 12:11:08 +0000 Subject: [PATCH 04/11] Bump Microsoft.Agents.CopilotStudio.Client from 1.1.151 to 1.2.41 (#710) --- updated-dependencies: - dependency-name: Microsoft.Agents.CopilotStudio.Client dependency-version: 1.2.41 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- dotnet/Directory.Packages.props | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dotnet/Directory.Packages.props b/dotnet/Directory.Packages.props index 74862bb0e8..b02578ecc3 100644 --- a/dotnet/Directory.Packages.props +++ b/dotnet/Directory.Packages.props @@ -74,7 +74,7 @@ - + From 44a248addbc8729f175cccc7f0659f8368d11131 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 12 Sep 2025 13:48:16 +0100 Subject: [PATCH 05/11] Bump FluentAssertions from 8.5.0 to 8.6.0 (#709) --- updated-dependencies: - dependency-name: FluentAssertions dependency-version: 8.6.0 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- dotnet/Directory.Packages.props | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dotnet/Directory.Packages.props b/dotnet/Directory.Packages.props index b02578ecc3..907a58ece8 100644 --- a/dotnet/Directory.Packages.props +++ b/dotnet/Directory.Packages.props @@ -91,7 +91,7 @@ - + From fc78e65c43a7826340400bc558fe2c02bd1ece28 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 12 Sep 2025 13:48:30 +0100 Subject: [PATCH 06/11] Bump MishaKav/pytest-coverage-comment from 1.1.53 to 1.1.56 (#436) Bumps [MishaKav/pytest-coverage-comment](https://github.com/mishakav/pytest-coverage-comment) from 1.1.53 to 1.1.56. - [Release notes](https://github.com/mishakav/pytest-coverage-comment/releases) - [Changelog](https://github.com/MishaKav/pytest-coverage-comment/blob/main/CHANGELOG.md) - [Commits](https://github.com/mishakav/pytest-coverage-comment/compare/v1.1.53...v1.1.56) --- updated-dependencies: - dependency-name: MishaKav/pytest-coverage-comment dependency-version: 1.1.56 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Chris <66376200+crickman@users.noreply.github.com> --- .github/workflows/python-test-coverage-report.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python-test-coverage-report.yml b/.github/workflows/python-test-coverage-report.yml index 3f3256db89..955ea4399a 100644 --- a/.github/workflows/python-test-coverage-report.yml +++ b/.github/workflows/python-test-coverage-report.yml @@ -39,7 +39,7 @@ jobs: echo "PR_NUMBER=$PR_NUMBER" >> $GITHUB_ENV - name: Pytest coverage comment id: coverageComment - uses: MishaKav/pytest-coverage-comment@v1.1.53 + uses: MishaKav/pytest-coverage-comment@v1.1.56 with: github-token: ${{ secrets.GH_ACTIONS_PR_WRITE }} issue-number: ${{ env.PR_NUMBER }} From 6e4cb5e1837efbfc65e6eae3cb1172e3b4c7c5f7 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 12 Sep 2025 12:48:53 +0000 Subject: [PATCH 07/11] Bump actions/checkout from 4 to 5 (#435) Bumps [actions/checkout](https://github.com/actions/checkout) from 4 to 5. - [Release notes](https://github.com/actions/checkout/releases) - [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md) - [Commits](https://github.com/actions/checkout/compare/v4...v5) --- updated-dependencies: - dependency-name: actions/checkout dependency-version: '5' dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Chris <66376200+crickman@users.noreply.github.com> --- .github/workflows/codeql-analysis.yml | 2 +- .github/workflows/dotnet-build-and-test.yml | 2 +- .github/workflows/dotnet-format.yml | 2 +- .github/workflows/python-code-quality.yml | 2 +- .github/workflows/python-merge-tests.yml | 8 ++++---- .github/workflows/python-release.yml | 2 +- .github/workflows/python-test-coverage-report.yml | 2 +- .github/workflows/python-test-coverage.yml | 2 +- .github/workflows/python-tests.yml | 2 +- 9 files changed, 12 insertions(+), 12 deletions(-) diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index 06ee8d8d4c..e2649178c0 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -32,7 +32,7 @@ jobs: steps: - name: Checkout repository - uses: actions/checkout@v4 + uses: actions/checkout@v5 with: persist-credentials: false diff --git a/.github/workflows/dotnet-build-and-test.yml b/.github/workflows/dotnet-build-and-test.yml index 82248ff921..237c52cf46 100644 --- a/.github/workflows/dotnet-build-and-test.yml +++ b/.github/workflows/dotnet-build-and-test.yml @@ -47,7 +47,7 @@ jobs: runs-on: ${{ matrix.os }} environment: ${{ matrix.environment }} steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 with: persist-credentials: false sparse-checkout: | diff --git a/.github/workflows/dotnet-format.yml b/.github/workflows/dotnet-format.yml index 893bd969b8..581318c538 100644 --- a/.github/workflows/dotnet-format.yml +++ b/.github/workflows/dotnet-format.yml @@ -30,7 +30,7 @@ jobs: steps: - name: Check out code - uses: actions/checkout@v4 + uses: actions/checkout@v5 with: fetch-depth: 0 persist-credentials: false diff --git a/.github/workflows/python-code-quality.yml b/.github/workflows/python-code-quality.yml index d940513986..e4bd5155c1 100644 --- a/.github/workflows/python-code-quality.yml +++ b/.github/workflows/python-code-quality.yml @@ -27,7 +27,7 @@ jobs: env: UV_PYTHON: ${{ matrix.python-version }} steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - name: Set up uv uses: astral-sh/setup-uv@v6 with: diff --git a/.github/workflows/python-merge-tests.yml b/.github/workflows/python-merge-tests.yml index 9868dc6c80..950394bae6 100644 --- a/.github/workflows/python-merge-tests.yml +++ b/.github/workflows/python-merge-tests.yml @@ -28,7 +28,7 @@ jobs: outputs: pythonChanges: ${{ steps.filter.outputs.python}} steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - uses: dorny/paths-filter@v3 id: filter with: @@ -66,7 +66,7 @@ jobs: run: working-directory: python steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - name: Set up uv uses: astral-sh/setup-uv@v6 with: @@ -127,7 +127,7 @@ jobs: run: working-directory: python steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - name: Set up uv uses: astral-sh/setup-uv@v6 with: @@ -194,7 +194,7 @@ jobs: run: working-directory: python steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - name: Set up uv uses: astral-sh/setup-uv@v6 with: diff --git a/.github/workflows/python-release.yml b/.github/workflows/python-release.yml index 571af6c148..39935650ef 100644 --- a/.github/workflows/python-release.yml +++ b/.github/workflows/python-release.yml @@ -23,7 +23,7 @@ jobs: run: working-directory: python steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - name: Set up uv uses: astral-sh/setup-uv@v6 with: diff --git a/.github/workflows/python-test-coverage-report.yml b/.github/workflows/python-test-coverage-report.yml index 955ea4399a..26e31eb35d 100644 --- a/.github/workflows/python-test-coverage-report.yml +++ b/.github/workflows/python-test-coverage-report.yml @@ -19,7 +19,7 @@ jobs: run: working-directory: python steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - name: Download coverage report uses: actions/download-artifact@v4 with: diff --git a/.github/workflows/python-test-coverage.yml b/.github/workflows/python-test-coverage.yml index 08ffe47158..c3f014235f 100644 --- a/.github/workflows/python-test-coverage.yml +++ b/.github/workflows/python-test-coverage.yml @@ -20,7 +20,7 @@ jobs: env: UV_PYTHON: "3.10" steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 # Save the PR number to a file since the workflow_run event # in the coverage report workflow does not have access to it - name: Save PR number diff --git a/.github/workflows/python-tests.yml b/.github/workflows/python-tests.yml index 5981f27757..20413071e7 100644 --- a/.github/workflows/python-tests.yml +++ b/.github/workflows/python-tests.yml @@ -26,7 +26,7 @@ jobs: run: working-directory: python steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - name: Set up uv uses: astral-sh/setup-uv@v6 with: From 7c802f326589dd78da12a6a71629bfeb64a38bf0 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 12 Sep 2025 12:49:14 +0000 Subject: [PATCH 08/11] Bump actions/download-artifact from 4 to 5 (#434) Bumps [actions/download-artifact](https://github.com/actions/download-artifact) from 4 to 5. - [Release notes](https://github.com/actions/download-artifact/releases) - [Commits](https://github.com/actions/download-artifact/compare/v4...v5) --- updated-dependencies: - dependency-name: actions/download-artifact dependency-version: '5' dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Chris <66376200+crickman@users.noreply.github.com> --- .github/workflows/python-test-coverage-report.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python-test-coverage-report.yml b/.github/workflows/python-test-coverage-report.yml index 26e31eb35d..811e65098c 100644 --- a/.github/workflows/python-test-coverage-report.yml +++ b/.github/workflows/python-test-coverage-report.yml @@ -21,7 +21,7 @@ jobs: steps: - uses: actions/checkout@v5 - name: Download coverage report - uses: actions/download-artifact@v4 + uses: actions/download-artifact@v5 with: github-token: ${{ secrets.GH_ACTIONS_PR_WRITE }} run-id: ${{ github.event.workflow_run.id }} From dd025b368b19916c6ee31ce5c2dc6c1a0185bcb3 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 12 Sep 2025 12:49:40 +0000 Subject: [PATCH 09/11] Bump pydata-sphinx-theme from 0.16.0 to 0.16.1 in /python (#314) Bumps [pydata-sphinx-theme](https://github.com/pydata/pydata-sphinx-theme) from 0.16.0 to 0.16.1. - [Release notes](https://github.com/pydata/pydata-sphinx-theme/releases) - [Changelog](https://github.com/pydata/pydata-sphinx-theme/blob/main/RELEASE.md) - [Commits](https://github.com/pydata/pydata-sphinx-theme/compare/v0.16.0...v0.16.1) --- updated-dependencies: - dependency-name: pydata-sphinx-theme dependency-version: 0.16.1 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Chris <66376200+crickman@users.noreply.github.com> --- python/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyproject.toml b/python/pyproject.toml index a8b1d7f979..a891bbe713 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -31,7 +31,7 @@ dev = [ "markdownify", # Documentation "myst-nb==1.1.2", - "pydata-sphinx-theme==0.16.0", + "pydata-sphinx-theme==0.16.1", "sphinx-copybutton", "sphinx-design", "sphinx", From ace7b406fa7dabeae0e6b1332d12375627f02a06 Mon Sep 17 00:00:00 2001 From: Alex Lavaee <57336517+lavaman131@users.noreply.github.com> Date: Fri, 12 Sep 2025 07:32:23 -0700 Subject: [PATCH 10/11] fixed opentelemetry-semantic-conventions-ai module not found error (#706) Co-authored-by: Alex Lavaee --- python/packages/main/pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/python/packages/main/pyproject.toml b/python/packages/main/pyproject.toml index 64b34bd0ef..51c658b03e 100644 --- a/python/packages/main/pyproject.toml +++ b/python/packages/main/pyproject.toml @@ -33,6 +33,7 @@ dependencies = [ "azure-monitor-opentelemetry>=1.7.0", "azure-monitor-opentelemetry-exporter>=1.0.0b41", "opentelemetry-exporter-otlp-proto-grpc>=1.36.0", + "opentelemetry-semantic-conventions-ai>=0.4.13" ] [project.optional-dependencies] From 8af4918f56d1fe642f067bf40af76f92013ebd26 Mon Sep 17 00:00:00 2001 From: westey <164392973+westey-m@users.noreply.github.com> Date: Fri, 12 Sep 2025 15:58:46 +0100 Subject: [PATCH 11/11] .NET: BREAKING Update AIAgent.Run to take IEnumerable instead of IReadonlyCollection (#729) * Update AIAgent,Run to take IEnumerable instead of IReadonlyCollection * Address PR comment. * Cast to IReadonlyCollection since this is typically all that is required to avoid unecessary allocation. * Update OrchestratingAgent protected methods as well. --- .../Program.cs | 6 ++--- .../Program.cs | 2 +- .../ConcurrentOrchestration.cs | 6 ++--- .../GroupChat/GroupChatOrchestration.cs | 4 ++-- .../Handoffs/HandoffOrchestration.cs | 4 ++-- .../OrchestratingAgent.cs | 18 +++++++------- .../SequentialOrchestration.cs | 7 +++--- .../WorkflowHostAgent.cs | 6 ++--- .../WorkflowMessageStore.cs | 2 +- .../A2AAgent.cs | 6 ++--- .../Extensions/ChatMessageExtensions.cs | 2 +- .../AIAgent.cs | 15 +++++------- .../AgentThread.cs | 2 +- .../DelegatingAIAgent.cs | 4 ++-- .../IChatMessageStore.cs | 2 +- .../InMemoryChatMessageStore.cs | 9 ++----- .../CopilotStudioAgent.cs | 4 ++-- .../AgentProxy.cs | 10 ++++---- .../OpenAIChatClientAgent.cs | 4 ++-- .../ChatCompletion/ChatClientAgent.cs | 17 ++++++------- .../OpenTelemetryAgent.cs | 24 +++++++++---------- .../MockAgent.cs | 4 ++-- .../OrchestrationResultTests.cs | 8 +++---- .../RepresentationTests.cs | 4 ++-- .../Sample/06_GroupChat_Workflow.cs | 14 ++++++----- .../SpecializedExecutorSmokeTests.cs | 4 ++-- .../AIAgentTests.cs | 6 ++--- .../AgentActorTests.cs | 4 ++-- .../AgentAIFunctionFactoryTests.cs | 4 ++-- 29 files changed, 99 insertions(+), 103 deletions(-) diff --git a/dotnet/samples/GettingStarted/AgentProviders/Agent_With_CustomImplementation/Program.cs b/dotnet/samples/GettingStarted/AgentProviders/Agent_With_CustomImplementation/Program.cs index df315dbfaa..a2713c80d2 100644 --- a/dotnet/samples/GettingStarted/AgentProviders/Agent_With_CustomImplementation/Program.cs +++ b/dotnet/samples/GettingStarted/AgentProviders/Agent_With_CustomImplementation/Program.cs @@ -30,7 +30,7 @@ namespace SampleApp // Custom agent that parrot's the user input back in upper case. internal sealed class UpperCaseParrotAgent : AIAgent { - public override async Task RunAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) + public override async Task RunAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) { // Create a thread if the user didn't supply one. thread ??= this.GetNewThread(); @@ -50,7 +50,7 @@ namespace SampleApp }; } - public override async IAsyncEnumerable RunStreamingAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) + public override async IAsyncEnumerable RunStreamingAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) { // Create a thread if the user didn't supply one. thread ??= this.GetNewThread(); @@ -76,7 +76,7 @@ namespace SampleApp } } - private static IEnumerable CloneAndToUpperCase(IReadOnlyCollection messages, string agentName) => messages.Select(x => + private static IEnumerable CloneAndToUpperCase(IEnumerable messages, string agentName) => messages.Select(x => { // Clone the message and update its author to be the agent. var messageClone = x.Clone(); diff --git a/dotnet/samples/GettingStarted/Agents/Agent_Step07_3rdPartyThreadStorage/Program.cs b/dotnet/samples/GettingStarted/Agents/Agent_Step07_3rdPartyThreadStorage/Program.cs index 11cbd7670f..8db527a2af 100644 --- a/dotnet/samples/GettingStarted/Agents/Agent_Step07_3rdPartyThreadStorage/Program.cs +++ b/dotnet/samples/GettingStarted/Agents/Agent_Step07_3rdPartyThreadStorage/Program.cs @@ -82,7 +82,7 @@ namespace SampleApp public string? ThreadId => this._threadId; - public async Task AddMessagesAsync(IReadOnlyCollection messages, CancellationToken cancellationToken) + public async Task AddMessagesAsync(IEnumerable messages, CancellationToken cancellationToken) { this._threadId ??= Guid.NewGuid().ToString(); diff --git a/dotnet/src/Microsoft.Agents.Orchestration/ConcurrentOrchestration.cs b/dotnet/src/Microsoft.Agents.Orchestration/ConcurrentOrchestration.cs index 3953b7fd0f..009c327e0e 100644 --- a/dotnet/src/Microsoft.Agents.Orchestration/ConcurrentOrchestration.cs +++ b/dotnet/src/Microsoft.Agents.Orchestration/ConcurrentOrchestration.cs @@ -57,11 +57,11 @@ public partial class ConcurrentOrchestration : OrchestratingAgent } /// - protected override Task RunCoreAsync(IReadOnlyCollection messages, OrchestratingAgentContext context, CancellationToken cancellationToken) => - this.ResumeAsync(messages, new AgentRunResponse?[this.Agents.Count], context, cancellationToken); + protected override Task RunCoreAsync(IEnumerable messages, OrchestratingAgentContext context, CancellationToken cancellationToken) => + this.ResumeAsync(messages as IReadOnlyCollection ?? messages.ToList(), new AgentRunResponse?[this.Agents.Count], context, cancellationToken); /// - protected override Task ResumeCoreAsync(JsonElement checkpointState, IReadOnlyCollection newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken) + protected override Task ResumeCoreAsync(JsonElement checkpointState, IEnumerable newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken) { var state = checkpointState.Deserialize(OrchestrationJsonContext.Default.ConcurrentState) ?? throw new InvalidOperationException("The checkpoint state is invalid."); diff --git a/dotnet/src/Microsoft.Agents.Orchestration/GroupChat/GroupChatOrchestration.cs b/dotnet/src/Microsoft.Agents.Orchestration/GroupChat/GroupChatOrchestration.cs index 0f0973e768..5afd420a27 100644 --- a/dotnet/src/Microsoft.Agents.Orchestration/GroupChat/GroupChatOrchestration.cs +++ b/dotnet/src/Microsoft.Agents.Orchestration/GroupChat/GroupChatOrchestration.cs @@ -43,7 +43,7 @@ public sealed partial class GroupChatOrchestration : OrchestratingAgent public Func>? InteractiveCallback { get; set; } /// - protected override Task RunCoreAsync(IReadOnlyCollection messages, OrchestratingAgentContext context, CancellationToken cancellationToken) + protected override Task RunCoreAsync(IEnumerable messages, OrchestratingAgentContext context, CancellationToken cancellationToken) { List allMessages = [.. messages]; int originalMessageCount = allMessages.Count; @@ -51,7 +51,7 @@ public sealed partial class GroupChatOrchestration : OrchestratingAgent } /// - protected override Task ResumeCoreAsync(JsonElement checkpointState, IReadOnlyCollection newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken) + protected override Task ResumeCoreAsync(JsonElement checkpointState, IEnumerable newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken) { var state = checkpointState.Deserialize(OrchestrationJsonContext.Default.GroupChatState) ?? throw new InvalidOperationException("The checkpoint state is invalid."); diff --git a/dotnet/src/Microsoft.Agents.Orchestration/Handoffs/HandoffOrchestration.cs b/dotnet/src/Microsoft.Agents.Orchestration/Handoffs/HandoffOrchestration.cs index 8ef64910e7..23d29f9f69 100644 --- a/dotnet/src/Microsoft.Agents.Orchestration/Handoffs/HandoffOrchestration.cs +++ b/dotnet/src/Microsoft.Agents.Orchestration/Handoffs/HandoffOrchestration.cs @@ -44,7 +44,7 @@ public sealed partial class HandoffOrchestration : OrchestratingAgent public Func>? InteractiveCallback { get; set; } /// - protected override Task RunCoreAsync(IReadOnlyCollection messages, OrchestratingAgentContext context, CancellationToken cancellationToken) + protected override Task RunCoreAsync(IEnumerable messages, OrchestratingAgentContext context, CancellationToken cancellationToken) { List allMessages = [.. messages]; int originalMessageCount = allMessages.Count; @@ -52,7 +52,7 @@ public sealed partial class HandoffOrchestration : OrchestratingAgent } /// - protected override Task ResumeCoreAsync(JsonElement checkpointState, IReadOnlyCollection newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken) + protected override Task ResumeCoreAsync(JsonElement checkpointState, IEnumerable newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken) { var state = checkpointState.Deserialize(OrchestrationJsonContext.Default.HandoffState) ?? throw new InvalidOperationException("The checkpoint state is invalid."); diff --git a/dotnet/src/Microsoft.Agents.Orchestration/OrchestratingAgent.cs b/dotnet/src/Microsoft.Agents.Orchestration/OrchestratingAgent.cs index 8da88c20bc..18c70cab3a 100644 --- a/dotnet/src/Microsoft.Agents.Orchestration/OrchestratingAgent.cs +++ b/dotnet/src/Microsoft.Agents.Orchestration/OrchestratingAgent.cs @@ -65,7 +65,7 @@ public abstract partial class OrchestratingAgent : AIAgent /// public sealed override async Task RunAsync( - IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) + IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) { _ = Throw.IfNull(messages); @@ -87,7 +87,7 @@ public abstract partial class OrchestratingAgent : AIAgent /// public sealed override async IAsyncEnumerable RunStreamingAsync( - IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) + IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) { // TODO: There should be a RunAsync overload that returns an OrchestratingAgentStreamingResponse, which this then delegates to. @@ -106,12 +106,12 @@ public abstract partial class OrchestratingAgent : AIAgent /// The runtime associated with the orchestration. /// The to monitor for cancellation requests. The default is . public async ValueTask RunAsync( - IReadOnlyCollection messages, + IEnumerable messages, AgentRunOptions? options = null, IActorRuntimeContext? runtime = null, CancellationToken cancellationToken = default) { - Throw.IfNull(messages, nameof(messages)); + var readonlyCollectionMessages = Throw.IfNull(messages) as IReadOnlyCollection ?? messages.ToList(); cancellationToken.ThrowIfCancellationRequested(); ILogger logger = this.LoggerFactory.CreateLogger(this.GetType().Name); @@ -131,8 +131,8 @@ public abstract partial class OrchestratingAgent : AIAgent JsonElement? checkpoint = await this.ReadCheckpointAsync(context, cancellationToken).ConfigureAwait(false); Task completion = checkpoint is null ? - this.RunCoreAsync(messages, context, cancellationToken) : - this.ResumeCoreAsync(checkpoint.Value, messages, context, cancellationToken); + this.RunCoreAsync(readonlyCollectionMessages, context, cancellationToken) : + this.ResumeCoreAsync(checkpoint.Value, readonlyCollectionMessages, context, cancellationToken); if (logger.IsEnabled(LogLevel.Trace)) { @@ -148,7 +148,7 @@ public abstract partial class OrchestratingAgent : AIAgent /// The input message. /// The context for this operation. /// A cancellation token that can be used to cancel the operation. - protected abstract Task RunCoreAsync(IReadOnlyCollection messages, OrchestratingAgentContext context, CancellationToken cancellationToken); + protected abstract Task RunCoreAsync(IEnumerable messages, OrchestratingAgentContext context, CancellationToken cancellationToken); /// /// Resumes processing of the orchestration. @@ -157,7 +157,7 @@ public abstract partial class OrchestratingAgent : AIAgent /// The new messages to be processed in addition to the checkpoint state. /// The context for this operation. /// A cancellation token that can be used to cancel the operation. - protected abstract Task ResumeCoreAsync(JsonElement checkpointState, IReadOnlyCollection newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken); + protected abstract Task ResumeCoreAsync(JsonElement checkpointState, IEnumerable newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken); /// /// Runs the agent with input messages and respond with both streamed and regular messages. @@ -168,7 +168,7 @@ public abstract partial class OrchestratingAgent : AIAgent /// Options to use when invoking the agent. /// A cancellation token that can be used to cancel the operation. /// A task that returns the response . - protected static async ValueTask RunAsync(AIAgent agent, OrchestratingAgentContext context, IReadOnlyCollection input, AgentRunOptions? options = null, CancellationToken cancellationToken = default) + protected static async ValueTask RunAsync(AIAgent agent, OrchestratingAgentContext context, IEnumerable input, AgentRunOptions? options = null, CancellationToken cancellationToken = default) { // Utilize streaming iff a streaming callback is provided; otherwise, use the non-streaming API. AgentRunResponse response; diff --git a/dotnet/src/Microsoft.Agents.Orchestration/SequentialOrchestration.cs b/dotnet/src/Microsoft.Agents.Orchestration/SequentialOrchestration.cs index 1c113cfbde..14a4bb1cdd 100644 --- a/dotnet/src/Microsoft.Agents.Orchestration/SequentialOrchestration.cs +++ b/dotnet/src/Microsoft.Agents.Orchestration/SequentialOrchestration.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Linq; using System.Text.Json; using System.Threading; using System.Threading.Tasks; @@ -28,11 +29,11 @@ public sealed partial class SequentialOrchestration : OrchestratingAgent } /// - protected override Task RunCoreAsync(IReadOnlyCollection messages, OrchestratingAgentContext context, CancellationToken cancellationToken) => - this.ResumeAsync(0, messages, context, cancellationToken); + protected override Task RunCoreAsync(IEnumerable messages, OrchestratingAgentContext context, CancellationToken cancellationToken) => + this.ResumeAsync(0, messages as IReadOnlyCollection ?? messages.ToList(), context, cancellationToken); /// - protected override Task ResumeCoreAsync(JsonElement checkpointState, IReadOnlyCollection newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken) + protected override Task ResumeCoreAsync(JsonElement checkpointState, IEnumerable newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken) { var state = checkpointState.Deserialize(OrchestrationJsonContext.Default.SequentialState) ?? throw new InvalidOperationException("The checkpoint state is invalid."); diff --git a/dotnet/src/Microsoft.Agents.Workflows/WorkflowHostAgent.cs b/dotnet/src/Microsoft.Agents.Workflows/WorkflowHostAgent.cs index 3751cd2c9f..d0a48288d5 100644 --- a/dotnet/src/Microsoft.Agents.Workflows/WorkflowHostAgent.cs +++ b/dotnet/src/Microsoft.Agents.Workflows/WorkflowHostAgent.cs @@ -96,7 +96,7 @@ internal class WorkflowHostAgent : AIAgent } } - private async ValueTask UpdateThreadAsync(IReadOnlyCollection messages, AgentThread? thread = null, CancellationToken cancellation = default) + private async ValueTask UpdateThreadAsync(IEnumerable messages, AgentThread? thread = null, CancellationToken cancellation = default) { if (thread is null) { @@ -114,7 +114,7 @@ internal class WorkflowHostAgent : AIAgent public override async Task RunAsync( - IReadOnlyCollection messages, + IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) @@ -134,7 +134,7 @@ internal class WorkflowHostAgent : AIAgent public override async IAsyncEnumerable RunStreamingAsync( - IReadOnlyCollection messages, + IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) diff --git a/dotnet/src/Microsoft.Agents.Workflows/WorkflowMessageStore.cs b/dotnet/src/Microsoft.Agents.Workflows/WorkflowMessageStore.cs index 35de73ce0b..dd2aed2750 100644 --- a/dotnet/src/Microsoft.Agents.Workflows/WorkflowMessageStore.cs +++ b/dotnet/src/Microsoft.Agents.Workflows/WorkflowMessageStore.cs @@ -25,7 +25,7 @@ internal class WorkflowMessageStore : IChatMessageStore this._chatMessages.AddRange(messages); } - public Task AddMessagesAsync(IReadOnlyCollection messages, CancellationToken cancellationToken) + public Task AddMessagesAsync(IEnumerable messages, CancellationToken cancellationToken) { this._chatMessages.AddRange(messages); diff --git a/dotnet/src/Microsoft.Extensions.AI.Agents.A2A/A2AAgent.cs b/dotnet/src/Microsoft.Extensions.AI.Agents.A2A/A2AAgent.cs index 061b988137..802744bf9a 100644 --- a/dotnet/src/Microsoft.Extensions.AI.Agents.A2A/A2AAgent.cs +++ b/dotnet/src/Microsoft.Extensions.AI.Agents.A2A/A2AAgent.cs @@ -52,7 +52,7 @@ internal sealed class A2AAgent : AIAgent } /// - public override async Task RunAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) + public override async Task RunAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) { ValidateInputMessages(messages); @@ -98,7 +98,7 @@ internal sealed class A2AAgent : AIAgent } /// - public override async IAsyncEnumerable RunStreamingAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) + public override async IAsyncEnumerable RunStreamingAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) { ValidateInputMessages(messages); @@ -147,7 +147,7 @@ internal sealed class A2AAgent : AIAgent /// public override string? Description => this._description ?? base.Description; - private static void ValidateInputMessages(IReadOnlyCollection messages) + private static void ValidateInputMessages(IEnumerable messages) { _ = Throw.IfNull(messages); diff --git a/dotnet/src/Microsoft.Extensions.AI.Agents.A2A/Extensions/ChatMessageExtensions.cs b/dotnet/src/Microsoft.Extensions.AI.Agents.A2A/Extensions/ChatMessageExtensions.cs index f030750dcd..f7a2b09013 100644 --- a/dotnet/src/Microsoft.Extensions.AI.Agents.A2A/Extensions/ChatMessageExtensions.cs +++ b/dotnet/src/Microsoft.Extensions.AI.Agents.A2A/Extensions/ChatMessageExtensions.cs @@ -11,7 +11,7 @@ namespace Microsoft.Extensions.AI.Agents.A2A; /// internal static class ChatMessageExtensions { - internal static Message ToA2AMessage(this IReadOnlyCollection messages) + internal static Message ToA2AMessage(this IEnumerable messages) { List allParts = []; diff --git a/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/AIAgent.cs b/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/AIAgent.cs index 05bdf49039..395f075029 100644 --- a/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/AIAgent.cs +++ b/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/AIAgent.cs @@ -112,7 +112,7 @@ public abstract class AIAgent AgentRunOptions? options = null, CancellationToken cancellationToken = default) { - return this.RunAsync((IReadOnlyCollection)[], thread, options, cancellationToken); + return this.RunAsync((IEnumerable)[], thread, options, cancellationToken); } /// @@ -174,7 +174,7 @@ public abstract class AIAgent /// The to monitor for cancellation requests. The default is . /// A containing the list of items. public abstract Task RunAsync( - IReadOnlyCollection messages, + IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default); @@ -194,7 +194,7 @@ public abstract class AIAgent AgentRunOptions? options = null, CancellationToken cancellationToken = default) { - return this.RunStreamingAsync((IReadOnlyCollection)[], thread, options, cancellationToken); + return this.RunStreamingAsync((IEnumerable)[], thread, options, cancellationToken); } /// @@ -256,7 +256,7 @@ public abstract class AIAgent /// The to monitor for cancellation requests. The default is . /// An async list of response items that each contain a . public abstract IAsyncEnumerable RunStreamingAsync( - IReadOnlyCollection messages, + IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default); @@ -285,14 +285,11 @@ public abstract class AIAgent /// The messages to pass to the thread. /// The to monitor for cancellation requests. The default is . /// An async task that completes once the notification is complete. - protected static async Task NotifyThreadOfNewMessagesAsync(AgentThread thread, IReadOnlyCollection messages, CancellationToken cancellationToken) + protected static async Task NotifyThreadOfNewMessagesAsync(AgentThread thread, IEnumerable messages, CancellationToken cancellationToken) { _ = Throw.IfNull(thread); _ = Throw.IfNull(messages); - if (messages.Count > 0) - { - await thread.OnNewMessagesAsync(messages, cancellationToken).ConfigureAwait(false); - } + await thread.OnNewMessagesAsync(messages, cancellationToken).ConfigureAwait(false); } } diff --git a/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/AgentThread.cs b/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/AgentThread.cs index afc92bc60d..74a5c8108e 100644 --- a/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/AgentThread.cs +++ b/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/AgentThread.cs @@ -139,7 +139,7 @@ public class AgentThread /// The to monitor for cancellation requests. The default is . /// A task that completes when the context has been updated. /// The thread has been deleted. - protected internal virtual async Task OnNewMessagesAsync(IReadOnlyCollection newMessages, CancellationToken cancellationToken = default) + protected internal virtual async Task OnNewMessagesAsync(IEnumerable newMessages, CancellationToken cancellationToken = default) { switch (this) { diff --git a/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/DelegatingAIAgent.cs b/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/DelegatingAIAgent.cs index e2e0fd7f2d..81cc93628c 100644 --- a/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/DelegatingAIAgent.cs +++ b/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/DelegatingAIAgent.cs @@ -54,7 +54,7 @@ public class DelegatingAIAgent : AIAgent /// public override Task RunAsync( - IReadOnlyCollection messages, + IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) @@ -62,7 +62,7 @@ public class DelegatingAIAgent : AIAgent /// public override IAsyncEnumerable RunStreamingAsync( - IReadOnlyCollection messages, + IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) diff --git a/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/IChatMessageStore.cs b/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/IChatMessageStore.cs index ad6303f36e..e9c3bf374a 100644 --- a/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/IChatMessageStore.cs +++ b/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/IChatMessageStore.cs @@ -42,7 +42,7 @@ public interface IChatMessageStore /// The messages to add. /// The to monitor for cancellation requests. The default is . /// An async task. - Task AddMessagesAsync(IReadOnlyCollection messages, CancellationToken cancellationToken = default); + Task AddMessagesAsync(IEnumerable messages, CancellationToken cancellationToken = default); /// /// Deserializes the state contained in the provided into the properties on this store. diff --git a/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/InMemoryChatMessageStore.cs b/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/InMemoryChatMessageStore.cs index 61e56c5f18..6f59fd696c 100644 --- a/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/InMemoryChatMessageStore.cs +++ b/dotnet/src/Microsoft.Extensions.AI.Agents.Abstractions/InMemoryChatMessageStore.cs @@ -61,15 +61,10 @@ public sealed class InMemoryChatMessageStore : IList, IChatMessageS } /// - public async Task AddMessagesAsync(IReadOnlyCollection messages, CancellationToken cancellationToken) + public async Task AddMessagesAsync(IEnumerable messages, CancellationToken cancellationToken) { _ = Throw.IfNull(messages); - if (messages.Count == 0) - { - return; - } - this._messages.AddRange(messages); if (this._reducerTriggerEvent == ChatReducerTriggerEvent.AfterMessageAdded && this._chatReducer is not null) @@ -172,7 +167,7 @@ public sealed class InMemoryChatMessageStore : IList, IChatMessageS { /// /// Trigger the reducer when a new message is added. - /// will only complete when reducer processing is done. + /// will only complete when reducer processing is done. /// AfterMessageAdded, diff --git a/dotnet/src/Microsoft.Extensions.AI.Agents.CopilotStudio/CopilotStudioAgent.cs b/dotnet/src/Microsoft.Extensions.AI.Agents.CopilotStudio/CopilotStudioAgent.cs index 30aa346d3c..dbd15c2b6e 100644 --- a/dotnet/src/Microsoft.Extensions.AI.Agents.CopilotStudio/CopilotStudioAgent.cs +++ b/dotnet/src/Microsoft.Extensions.AI.Agents.CopilotStudio/CopilotStudioAgent.cs @@ -41,7 +41,7 @@ public class CopilotStudioAgent : AIAgent /// public override async Task RunAsync( - IReadOnlyCollection messages, + IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) @@ -74,7 +74,7 @@ public class CopilotStudioAgent : AIAgent /// public override async IAsyncEnumerable RunStreamingAsync( - IReadOnlyCollection messages, + IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) diff --git a/dotnet/src/Microsoft.Extensions.AI.Agents.Hosting/AgentProxy.cs b/dotnet/src/Microsoft.Extensions.AI.Agents.Hosting/AgentProxy.cs index 731f54da72..5a0b41bb8a 100644 --- a/dotnet/src/Microsoft.Extensions.AI.Agents.Hosting/AgentProxy.cs +++ b/dotnet/src/Microsoft.Extensions.AI.Agents.Hosting/AgentProxy.cs @@ -45,7 +45,7 @@ public sealed class AgentProxy : AIAgent /// public override async Task RunAsync( - IReadOnlyCollection messages, + IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) @@ -57,7 +57,7 @@ public sealed class AgentProxy : AIAgent /// public override async IAsyncEnumerable RunStreamingAsync( - IReadOnlyCollection messages, + IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) @@ -70,7 +70,7 @@ public sealed class AgentProxy : AIAgent } } - private async Task RunAsync(IReadOnlyCollection messages, string threadId, CancellationToken cancellationToken) + private async Task RunAsync(IEnumerable messages, string threadId, CancellationToken cancellationToken) { var handle = await this.RunCoreAsync(messages, threadId, cancellationToken).ConfigureAwait(false); var response = await handle.GetResponseAsync(cancellationToken).ConfigureAwait(false); @@ -85,7 +85,7 @@ public sealed class AgentProxy : AIAgent } private async IAsyncEnumerable RunStreamingAsync( - IReadOnlyCollection messages, + IEnumerable messages, string threadId, [EnumeratorCancellation] CancellationToken cancellationToken) { @@ -123,7 +123,7 @@ public sealed class AgentProxy : AIAgent return agentProxyThread.ConversationId!; } - private async Task RunCoreAsync(IReadOnlyCollection messages, string threadId, CancellationToken cancellationToken) + private async Task RunCoreAsync(IEnumerable messages, string threadId, CancellationToken cancellationToken) { List newMessages = [.. messages]; diff --git a/dotnet/src/Microsoft.Extensions.AI.Agents.OpenAI/OpenAIChatClientAgent.cs b/dotnet/src/Microsoft.Extensions.AI.Agents.OpenAI/OpenAIChatClientAgent.cs index 2c9dbdcee3..e583d5d4e5 100644 --- a/dotnet/src/Microsoft.Extensions.AI.Agents.OpenAI/OpenAIChatClientAgent.cs +++ b/dotnet/src/Microsoft.Extensions.AI.Agents.OpenAI/OpenAIChatClientAgent.cs @@ -85,7 +85,7 @@ public class OpenAIChatClientAgent : AIAgent /// public sealed override Task RunAsync( - IReadOnlyCollection messages, + IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) @@ -93,7 +93,7 @@ public class OpenAIChatClientAgent : AIAgent /// public sealed override IAsyncEnumerable RunStreamingAsync( - IReadOnlyCollection messages, + IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) diff --git a/dotnet/src/Microsoft.Extensions.AI.Agents/ChatCompletion/ChatClientAgent.cs b/dotnet/src/Microsoft.Extensions.AI.Agents/ChatCompletion/ChatClientAgent.cs index f100fee198..3deafcdc6f 100644 --- a/dotnet/src/Microsoft.Extensions.AI.Agents/ChatCompletion/ChatClientAgent.cs +++ b/dotnet/src/Microsoft.Extensions.AI.Agents/ChatCompletion/ChatClientAgent.cs @@ -2,6 +2,7 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -99,15 +100,15 @@ public sealed class ChatClientAgent : AIAgent /// public override async Task RunAsync( - IReadOnlyCollection messages, + IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) { - _ = Throw.IfNull(messages); + var inputMessages = Throw.IfNull(messages) as IReadOnlyCollection ?? messages.ToList(); (AgentThread safeThread, ChatOptions? chatOptions, List threadMessages) = - await this.PrepareThreadAndMessagesAsync(thread, messages, options, cancellationToken).ConfigureAwait(false); + await this.PrepareThreadAndMessagesAsync(thread, inputMessages, options, cancellationToken).ConfigureAwait(false); var agentName = this.GetLoggingAgentName(); @@ -115,14 +116,14 @@ public sealed class ChatClientAgent : AIAgent ChatResponse chatResponse = await this.ChatClient.GetResponseAsync(threadMessages, chatOptions, cancellationToken).ConfigureAwait(false); - this._logger.LogAgentChatClientInvokedAgent(nameof(RunAsync), this.Id, agentName, this._chatClientType, messages.Count); + this._logger.LogAgentChatClientInvokedAgent(nameof(RunAsync), this.Id, agentName, this._chatClientType, inputMessages.Count); // We can derive the type of supported thread from whether we have a conversation id, // so let's update it and set the conversation id for the service thread case. this.UpdateThreadWithTypeAndConversationId(safeThread, chatResponse.ConversationId); // Only notify the thread of new messages if the chatResponse was successful to avoid inconsistent messages state in the thread. - await NotifyThreadOfNewMessagesAsync(safeThread, messages, cancellationToken).ConfigureAwait(false); + await NotifyThreadOfNewMessagesAsync(safeThread, inputMessages, cancellationToken).ConfigureAwait(false); // Ensure that the author name is set for each message in the response. foreach (ChatMessage chatResponseMessage in chatResponse.Messages) @@ -140,12 +141,12 @@ public sealed class ChatClientAgent : AIAgent /// public override async IAsyncEnumerable RunStreamingAsync( - IReadOnlyCollection messages, + IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) { - var inputMessages = Throw.IfNull(messages); + var inputMessages = Throw.IfNull(messages) as IReadOnlyCollection ?? messages.ToList(); (AgentThread safeThread, ChatOptions? chatOptions, List threadMessages) = await this.PrepareThreadAndMessagesAsync(thread, inputMessages, options, cancellationToken).ConfigureAwait(false); @@ -334,7 +335,7 @@ public sealed class ChatClientAgent : AIAgent /// A tuple containing the thread, chat options, and thread messages. private async Task<(AgentThread AgentThread, ChatOptions? ChatOptions, List ThreadMessages)> PrepareThreadAndMessagesAsync( AgentThread? thread, - IReadOnlyCollection inputMessages, + IEnumerable inputMessages, AgentRunOptions? runOptions, CancellationToken cancellationToken) { diff --git a/dotnet/src/Microsoft.Extensions.AI.Agents/OpenTelemetryAgent.cs b/dotnet/src/Microsoft.Extensions.AI.Agents/OpenTelemetryAgent.cs index 7f8b46c82b..90f547a1c2 100644 --- a/dotnet/src/Microsoft.Extensions.AI.Agents/OpenTelemetryAgent.cs +++ b/dotnet/src/Microsoft.Extensions.AI.Agents/OpenTelemetryAgent.cs @@ -123,23 +123,23 @@ public sealed partial class OpenTelemetryAgent : DelegatingAIAgent, IDisposable /// public override async Task RunAsync( - IReadOnlyCollection messages, + IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) { - _ = Throw.IfNull(messages); + var inputMessages = Throw.IfNull(messages) as IReadOnlyCollection ?? messages.ToList(); - using Activity? activity = this.CreateAndConfigureActivity(OpenTelemetryConsts.GenAI.Operation.NameValues.InvokeAgent, messages, thread); + using Activity? activity = this.CreateAndConfigureActivity(OpenTelemetryConsts.GenAI.Operation.NameValues.InvokeAgent, inputMessages, thread); Stopwatch? stopwatch = this._operationDurationHistogram.Enabled ? Stopwatch.StartNew() : null; - this.LogChatMessages(messages); + this.LogChatMessages(inputMessages); AgentRunResponse? response = null; Exception? error = null; try { - response = await base.RunAsync(messages, thread, options, cancellationToken).ConfigureAwait(false); + response = await base.RunAsync(inputMessages, thread, options, cancellationToken).ConfigureAwait(false); return response; } catch (Exception ex) @@ -149,30 +149,30 @@ public sealed partial class OpenTelemetryAgent : DelegatingAIAgent, IDisposable } finally { - this.TraceResponse(activity, response, error, stopwatch, messages.Count, isStreaming: false); + this.TraceResponse(activity, response, error, stopwatch, inputMessages.Count, isStreaming: false); } } /// public override async IAsyncEnumerable RunStreamingAsync( - IReadOnlyCollection messages, + IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) { - _ = Throw.IfNull(messages); + var inputMessages = Throw.IfNull(messages) as IReadOnlyCollection ?? messages.ToList(); - using Activity? activity = this.CreateAndConfigureActivity(OpenTelemetryConsts.GenAI.Operation.NameValues.InvokeAgent, messages, thread); + using Activity? activity = this.CreateAndConfigureActivity(OpenTelemetryConsts.GenAI.Operation.NameValues.InvokeAgent, inputMessages, thread); Stopwatch? stopwatch = this._operationDurationHistogram.Enabled ? Stopwatch.StartNew() : null; IAsyncEnumerable updates; try { - updates = base.RunStreamingAsync(messages, thread, options, cancellationToken); + updates = base.RunStreamingAsync(inputMessages, thread, options, cancellationToken); } catch (Exception ex) { - this.TraceResponse(activity, response: null, ex, stopwatch, messages.Count, isStreaming: true); + this.TraceResponse(activity, response: null, ex, stopwatch, inputMessages.Count, isStreaming: true); throw; } @@ -206,7 +206,7 @@ public sealed partial class OpenTelemetryAgent : DelegatingAIAgent, IDisposable } finally { - this.TraceResponse(activity, trackedUpdates.ToAgentRunResponse(), error, stopwatch, messages.Count, isStreaming: true); + this.TraceResponse(activity, trackedUpdates.ToAgentRunResponse(), error, stopwatch, inputMessages.Count, isStreaming: true); await responseEnumerator.DisposeAsync().ConfigureAwait(false); } } diff --git a/dotnet/tests/Microsoft.Agents.Orchestration.UnitTests/MockAgent.cs b/dotnet/tests/Microsoft.Agents.Orchestration.UnitTests/MockAgent.cs index bf0a428aa3..09ddd5c0cb 100644 --- a/dotnet/tests/Microsoft.Agents.Orchestration.UnitTests/MockAgent.cs +++ b/dotnet/tests/Microsoft.Agents.Orchestration.UnitTests/MockAgent.cs @@ -36,7 +36,7 @@ internal sealed class MockAgent(int index) : AIAgent return new AgentThread() { ConversationId = Guid.NewGuid().ToString() }; } - public override Task RunAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) + public override Task RunAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) { this.InvokeCount++; if (thread == null) @@ -48,7 +48,7 @@ internal sealed class MockAgent(int index) : AIAgent return Task.FromResult(new AgentRunResponse(messages: [.. this.Response])); } - public override IAsyncEnumerable RunStreamingAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) + public override IAsyncEnumerable RunStreamingAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) { this.InvokeCount++; diff --git a/dotnet/tests/Microsoft.Agents.Orchestration.UnitTests/OrchestrationResultTests.cs b/dotnet/tests/Microsoft.Agents.Orchestration.UnitTests/OrchestrationResultTests.cs index 9d9f238cca..70c1601425 100644 --- a/dotnet/tests/Microsoft.Agents.Orchestration.UnitTests/OrchestrationResultTests.cs +++ b/dotnet/tests/Microsoft.Agents.Orchestration.UnitTests/OrchestrationResultTests.cs @@ -80,18 +80,18 @@ public class OrchestrationResultTests private sealed class MockOrchestratingAgent() : OrchestratingAgent([new MockAgent()]) { - protected override Task RunCoreAsync(IReadOnlyCollection messages, OrchestratingAgentContext context, CancellationToken cancellationToken) => + protected override Task RunCoreAsync(IEnumerable messages, OrchestratingAgentContext context, CancellationToken cancellationToken) => throw new NotSupportedException(); - protected override Task ResumeCoreAsync(JsonElement checkpointState, IReadOnlyCollection newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken) => + protected override Task ResumeCoreAsync(JsonElement checkpointState, IEnumerable newMessages, OrchestratingAgentContext context, CancellationToken cancellationToken) => throw new NotSupportedException(); } private sealed class MockAgent : AIAgent { - public override Task RunAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) => + public override Task RunAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) => throw new NotSupportedException(); - public override IAsyncEnumerable RunStreamingAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) => + public override IAsyncEnumerable RunStreamingAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) => throw new NotSupportedException(); } } diff --git a/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/RepresentationTests.cs b/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/RepresentationTests.cs index 294b58a4de..ef0978db75 100644 --- a/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/RepresentationTests.cs +++ b/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/RepresentationTests.cs @@ -22,12 +22,12 @@ public class RepresentationTests private sealed class TestAgent : AIAgent { - public override Task RunAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) + public override Task RunAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) { throw new NotImplementedException(); } - public override IAsyncEnumerable RunStreamingAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) + public override IAsyncEnumerable RunStreamingAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) { throw new NotImplementedException(); } diff --git a/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/Sample/06_GroupChat_Workflow.cs b/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/Sample/06_GroupChat_Workflow.cs index 47e5128cd9..18a855c803 100644 --- a/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/Sample/06_GroupChat_Workflow.cs +++ b/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/Sample/06_GroupChat_Workflow.cs @@ -95,7 +95,7 @@ internal sealed class HelloAgent(string id = nameof(HelloAgent)) : AIAgent public override string Id => id; public override string? Name => id; - public override async Task RunAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) + public override async Task RunAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) { IEnumerable update = [ await this.RunStreamingAsync(messages, thread, options, cancellationToken) @@ -105,7 +105,7 @@ internal sealed class HelloAgent(string id = nameof(HelloAgent)) : AIAgent return update.ToAgentRunResponse(); } - public override async IAsyncEnumerable RunStreamingAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) + public override async IAsyncEnumerable RunStreamingAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) { AgentRunResponseUpdate response = new(ChatRole.Assistant, "Hello World!") { @@ -126,7 +126,7 @@ internal sealed class EchoAgent(string id = nameof(EchoAgent)) : AIAgent public override string Id => id; public override string? Name => id; - public override async Task RunAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) + public override async Task RunAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) { IEnumerable update = [ await this.RunStreamingAsync(messages, thread, options, cancellationToken) @@ -136,15 +136,17 @@ internal sealed class EchoAgent(string id = nameof(EchoAgent)) : AIAgent return update.ToAgentRunResponse(); } - public override async IAsyncEnumerable RunStreamingAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) + public override async IAsyncEnumerable RunStreamingAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) { - if (messages.Count == 0) + var messagesList = messages as IReadOnlyCollection ?? messages.ToList(); + + if (messagesList.Count == 0) { throw new ArgumentException("No messages provided to echo.", nameof(messages)); } StringBuilder collectedText = new(Prefix); - foreach (string messageText in messages.Select(message => message.Text) + foreach (string messageText in messagesList.Select(message => message.Text) .Where(text => !string.IsNullOrEmpty(text))) { collectedText.AppendLine(messageText); diff --git a/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/SpecializedExecutorSmokeTests.cs b/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/SpecializedExecutorSmokeTests.cs index 959ec2c39c..5e67966711 100644 --- a/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/SpecializedExecutorSmokeTests.cs +++ b/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/SpecializedExecutorSmokeTests.cs @@ -56,7 +56,7 @@ public class SpecializedExecutorSmokeTests public List Messages { get; } = Validate(messages) ?? []; - public override Task RunAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) + public override Task RunAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) { return Task.FromResult(new AgentRunResponse(this.Messages) { @@ -65,7 +65,7 @@ public class SpecializedExecutorSmokeTests }); } - public override async IAsyncEnumerable RunStreamingAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) + public override async IAsyncEnumerable RunStreamingAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) { string responseId = Guid.NewGuid().ToString("N"); foreach (ChatMessage message in this.Messages) diff --git a/dotnet/tests/Microsoft.Extensions.AI.Agents.Abstractions.UnitTests/AIAgentTests.cs b/dotnet/tests/Microsoft.Extensions.AI.Agents.Abstractions.UnitTests/AIAgentTests.cs index 411e0bfb7e..04cd8584c8 100644 --- a/dotnet/tests/Microsoft.Extensions.AI.Agents.Abstractions.UnitTests/AIAgentTests.cs +++ b/dotnet/tests/Microsoft.Extensions.AI.Agents.Abstractions.UnitTests/AIAgentTests.cs @@ -361,13 +361,13 @@ public class AIAgentTests private sealed class MockAgent : AIAgent { - public static new Task NotifyThreadOfNewMessagesAsync(AgentThread thread, IReadOnlyCollection messages, CancellationToken cancellationToken) + public static new Task NotifyThreadOfNewMessagesAsync(AgentThread thread, IEnumerable messages, CancellationToken cancellationToken) { return AIAgent.NotifyThreadOfNewMessagesAsync(thread, messages, cancellationToken); } public override Task RunAsync( - IReadOnlyCollection messages, + IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) @@ -376,7 +376,7 @@ public class AIAgentTests } public override IAsyncEnumerable RunStreamingAsync( - IReadOnlyCollection messages, + IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) diff --git a/dotnet/tests/Microsoft.Extensions.AI.Agents.Hosting.UnitTests/AgentActorTests.cs b/dotnet/tests/Microsoft.Extensions.AI.Agents.Hosting.UnitTests/AgentActorTests.cs index 1584960e9b..d6eeb63b7a 100644 --- a/dotnet/tests/Microsoft.Extensions.AI.Agents.Hosting.UnitTests/AgentActorTests.cs +++ b/dotnet/tests/Microsoft.Extensions.AI.Agents.Hosting.UnitTests/AgentActorTests.cs @@ -175,7 +175,7 @@ public class AgentActorTests public bool RunStreamingAsyncCalled { get; private set; } public AgentThread? ThreadUsedInRunStreamingAsync { get; private set; } - public override Task RunAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) + public override Task RunAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) { this.ThreadUsedInRunStreamingAsync = thread; return Task.FromResult(new AgentRunResponse @@ -184,7 +184,7 @@ public class AgentActorTests }); } - public override async IAsyncEnumerable RunStreamingAsync(IReadOnlyCollection messages, AgentThread? thread = null, AgentRunOptions? options = null, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default) + public override async IAsyncEnumerable RunStreamingAsync(IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default) { this.RunStreamingAsyncCalled = true; this.ThreadUsedInRunStreamingAsync = thread; diff --git a/dotnet/tests/Microsoft.Extensions.AI.Agents.UnitTests/AgentAIFunctionFactoryTests.cs b/dotnet/tests/Microsoft.Extensions.AI.Agents.UnitTests/AgentAIFunctionFactoryTests.cs index d1c10c5f40..014c9b6759 100644 --- a/dotnet/tests/Microsoft.Extensions.AI.Agents.UnitTests/AgentAIFunctionFactoryTests.cs +++ b/dotnet/tests/Microsoft.Extensions.AI.Agents.UnitTests/AgentAIFunctionFactoryTests.cs @@ -306,7 +306,7 @@ public class AgentAIFunctionFactoryTests public int RunAsyncCallCount { get; private set; } public override Task RunAsync( - IReadOnlyCollection messages, + IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default) @@ -324,7 +324,7 @@ public class AgentAIFunctionFactoryTests } public override async IAsyncEnumerable RunStreamingAsync( - IReadOnlyCollection messages, + IEnumerable messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)