Python: Fix workflow not pausing when agent calls declaration-only tool (#3757)

* Fix workflow not pausing when agent calls declaration-only tool

* Remove comment
This commit is contained in:
Evan Mattson
2026-02-10 08:51:44 +09:00
committed by GitHub
Unverified
parent e3b4b6662b
commit 6eb251464b
5 changed files with 322 additions and 7 deletions
+16 -5
View File
@@ -1703,7 +1703,14 @@ async def _try_execute_function_calls(
)
if declaration_only_flag:
# return the declaration only tools to the user, since we cannot execute them.
return ([fcc for fcc in function_calls if fcc.type == "function_call"], False)
# Mark as user_input_request so AgentExecutor emits request_info events and pauses the workflow.
declaration_only_calls = []
for fcc in function_calls:
if fcc.type == "function_call":
fcc.user_input_request = True
fcc.id = fcc.call_id
declaration_only_calls.append(fcc)
return (declaration_only_calls, False)
# Run all function calls concurrently, handling MiddlewareTermination
from ._middleware import MiddlewareTermination
@@ -1944,10 +1951,14 @@ def _handle_function_call_results(
from ._types import ChatMessage
if any(fccr.type in {"function_approval_request", "function_call"} for fccr in function_call_results):
if response.messages and response.messages[0].role == "assistant":
response.messages[0].contents.extend(function_call_results)
else:
response.messages.append(ChatMessage(role="assistant", contents=function_call_results))
# Only add items that aren't already in the message (e.g. function_approval_request wrappers).
# Declaration-only function_call items are already present from the LLM response.
new_items = [fccr for fccr in function_call_results if fccr.type != "function_call"]
if new_items:
if response.messages and response.messages[0].role == "assistant":
response.messages[0].contents.extend(new_items)
else:
response.messages.append(ChatMessage(role="assistant", contents=new_items))
return {
"action": "return",
"errors_in_a_row": errors_in_a_row,
@@ -194,8 +194,11 @@ class AgentExecutor(Executor):
self._pending_agent_requests.pop(original_request.id, None) # type: ignore[arg-type]
if not self._pending_agent_requests:
# All pending requests have been resolved; resume agent execution
self._cache = normalize_messages_input(ChatMessage(role="user", contents=self._pending_responses_to_agent))
# All pending requests have been resolved; resume agent execution.
# Use role="tool" for function_result responses (from declaration-only tools)
# so the LLM receives proper tool results instead of orphaned tool_calls.
role = "tool" if all(r.type == "function_result" for r in self._pending_responses_to_agent) else "user"
self._cache = normalize_messages_input(ChatMessage(role=role, contents=self._pending_responses_to_agent))
self._pending_responses_to_agent.clear()
await self._run_agent_and_emit(ctx)
@@ -19,6 +19,7 @@ from agent_framework import (
ChatResponse,
ChatResponseUpdate,
Content,
FunctionTool,
ResponseStream,
WorkflowBuilder,
WorkflowContext,
@@ -384,3 +385,207 @@ async def test_agent_executor_parallel_tool_call_with_approval_streaming() -> No
# Assert
assert output is not None
assert output == "Tool executed successfully."
# --- Declaration-only tool tests ---
declaration_only_tool = FunctionTool(
name="client_side_tool",
func=None,
description="A client-side tool that the framework cannot execute.",
input_model={"type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"]},
)
class DeclarationOnlyMockChatClient(FunctionInvocationLayer[Any], BaseChatClient[Any]):
"""Mock chat client that calls a declaration-only tool on first iteration."""
def __init__(self, parallel_request: bool = False) -> None:
FunctionInvocationLayer.__init__(self)
BaseChatClient.__init__(self)
self._iteration: int = 0
self._parallel_request: bool = parallel_request
def _inner_get_response(
self,
*,
messages: Sequence[ChatMessage],
stream: bool,
options: Mapping[str, Any],
**kwargs: Any,
) -> Awaitable[ChatResponse] | ResponseStream[ChatResponseUpdate, ChatResponse]:
if stream:
return self._build_response_stream(self._stream_response())
async def _get_response() -> ChatResponse:
return self._create_response()
return _get_response()
def _create_response(self) -> ChatResponse:
if self._iteration == 0:
if self._parallel_request:
response = ChatResponse(
messages=ChatMessage(
"assistant",
[
Content.from_function_call(
call_id="1", name="client_side_tool", arguments='{"query": "test"}'
),
Content.from_function_call(
call_id="2", name="client_side_tool", arguments='{"query": "test2"}'
),
],
)
)
else:
response = ChatResponse(
messages=ChatMessage(
"assistant",
[
Content.from_function_call(
call_id="1", name="client_side_tool", arguments='{"query": "test"}'
)
],
)
)
else:
response = ChatResponse(messages=ChatMessage("assistant", ["Tool executed successfully."]))
self._iteration += 1
return response
async def _stream_response(self) -> AsyncIterable[ChatResponseUpdate]:
if self._iteration == 0:
if self._parallel_request:
yield ChatResponseUpdate(
contents=[
Content.from_function_call(call_id="1", name="client_side_tool", arguments='{"query": "test"}'),
Content.from_function_call(
call_id="2", name="client_side_tool", arguments='{"query": "test2"}'
),
],
role="assistant",
)
else:
yield ChatResponseUpdate(
contents=[
Content.from_function_call(call_id="1", name="client_side_tool", arguments='{"query": "test"}')
],
role="assistant",
)
else:
yield ChatResponseUpdate(contents=[Content.from_text(text="Tool executed ")], role="assistant")
yield ChatResponseUpdate(contents=[Content.from_text(text="successfully.")], role="assistant")
self._iteration += 1
async def test_agent_executor_declaration_only_tool_emits_request_info() -> None:
"""Test that AgentExecutor emits request_info when agent calls a declaration-only tool."""
agent = ChatAgent(
chat_client=DeclarationOnlyMockChatClient(),
name="DeclarationOnlyAgent",
tools=[declaration_only_tool],
)
workflow = (
WorkflowBuilder(start_executor=agent, output_executors=[test_executor]).add_edge(agent, test_executor).build()
)
# Act
events = await workflow.run("Use the client side tool")
# Assert - workflow should pause with a request_info event
request_info_events = events.get_request_info_events()
assert len(request_info_events) == 1
request = request_info_events[0]
assert request.data.type == "function_call"
assert request.data.name == "client_side_tool"
assert request.data.call_id == "1"
# Act - provide the function result to resume the workflow
events = await workflow.run(
responses={
request.request_id: Content.from_function_result(call_id=request.data.call_id, result="client result")
}
)
# Assert - workflow should complete
final_response = events.get_outputs()
assert len(final_response) == 1
assert final_response[0] == "Tool executed successfully."
async def test_agent_executor_declaration_only_tool_emits_request_info_streaming() -> None:
"""Test that AgentExecutor emits request_info for declaration-only tools in streaming mode."""
agent = ChatAgent(
chat_client=DeclarationOnlyMockChatClient(),
name="DeclarationOnlyAgent",
tools=[declaration_only_tool],
)
workflow = WorkflowBuilder(start_executor=agent).add_edge(agent, test_executor).build()
# Act
request_info_events: list[WorkflowEvent] = []
async for event in workflow.run("Use the client side tool", stream=True):
if event.type == "request_info":
request_info_events.append(event)
# Assert
assert len(request_info_events) == 1
request = request_info_events[0]
assert request.data.type == "function_call"
assert request.data.name == "client_side_tool"
assert request.data.call_id == "1"
# Act - provide the function result
output: str | None = None
async for event in workflow.run(
stream=True,
responses={
request.request_id: Content.from_function_result(call_id=request.data.call_id, result="client result")
},
):
if event.type == "output":
output = event.data
# Assert
assert output is not None
assert output == "Tool executed successfully."
async def test_agent_executor_parallel_declaration_only_tool_emits_request_info() -> None:
"""Test that AgentExecutor emits request_info for parallel declaration-only tool calls."""
agent = ChatAgent(
chat_client=DeclarationOnlyMockChatClient(parallel_request=True),
name="DeclarationOnlyAgent",
tools=[declaration_only_tool],
)
workflow = (
WorkflowBuilder(start_executor=agent, output_executors=[test_executor]).add_edge(agent, test_executor).build()
)
# Act
events = await workflow.run("Use the client side tool")
# Assert - should get 2 request_info events
request_info_events = events.get_request_info_events()
assert len(request_info_events) == 2
for req in request_info_events:
assert req.data.type == "function_call"
assert req.data.name == "client_side_tool"
# Act - provide both function results
responses = {
req.request_id: Content.from_function_result(call_id=req.data.call_id, result=f"result for {req.data.call_id}")
for req in request_info_events
}
events = await workflow.run(responses=responses)
# Assert - workflow should complete
final_response = events.get_outputs()
assert len(final_response) == 1
assert final_response[0] == "Tool executed successfully."
@@ -84,6 +84,7 @@ Once comfortable with these, explore the rest of the samples below.
| ------------------------------------------ | ------------------------------------------------------------------------------------------------------------ | ----------------------------------------------------------------------------------------------------- |
| Human-In-The-Loop (Guessing Game) | [human-in-the-loop/guessing_game_with_human_input.py](./human-in-the-loop/guessing_game_with_human_input.py) | Interactive request/response prompts with a human via `ctx.request_info()` |
| Agents with Approval Requests in Workflows | [human-in-the-loop/agents_with_approval_requests.py](./human-in-the-loop/agents_with_approval_requests.py) | Agents that create approval requests during workflow execution and wait for human approval to proceed |
| Agents with Declaration-Only Tools | [human-in-the-loop/agents_with_declaration_only_tools.py](./human-in-the-loop/agents_with_declaration_only_tools.py) | Workflow pauses when agent calls a client-side tool (`func=None`), caller supplies the result |
| SequentialBuilder Request Info | [human-in-the-loop/sequential_request_info.py](./human-in-the-loop/sequential_request_info.py) | Request info for agent responses mid-workflow using `.with_request_info()` on SequentialBuilder |
| ConcurrentBuilder Request Info | [human-in-the-loop/concurrent_request_info.py](./human-in-the-loop/concurrent_request_info.py) | Review concurrent agent outputs before aggregation using `.with_request_info()` on ConcurrentBuilder |
| GroupChatBuilder Request Info | [human-in-the-loop/group_chat_request_info.py](./human-in-the-loop/group_chat_request_info.py) | Steer group discussions with periodic guidance using `.with_request_info()` on GroupChatBuilder |
@@ -0,0 +1,95 @@
# Copyright (c) Microsoft. All rights reserved.
"""
Sample: Declaration-only tools in a workflow (issue #3425)
A declaration-only tool (func=None) represents a client-side tool that the
framework cannot execute — the LLM can call it, but the workflow must pause
so the caller can supply the result.
Flow:
1. The agent is given a declaration-only tool ("get_user_location").
2. When the LLM decides to call it, the workflow pauses and emits a
request_info event containing the FunctionCallContent.
3. The caller inspects the tool name/args, runs the tool however it wants,
and feeds the result back via workflow.run(responses={...}).
4. The workflow resumes — the agent sees the tool result and finishes.
Prerequisites:
- Azure OpenAI endpoint configured via environment variables.
- `az login` for AzureCliCredential.
"""
import asyncio
import json
from typing import Any
from agent_framework import Content, FunctionTool, WorkflowBuilder
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
# A declaration-only tool: the schema is sent to the LLM, but the framework
# has no implementation to execute. The caller must supply the result.
get_user_location = FunctionTool(
name="get_user_location",
func=None,
description="Get the user's current city. Only the client application can resolve this.",
input_model={
"type": "object",
"properties": {
"reason": {"type": "string", "description": "Why the location is needed"},
},
"required": ["reason"],
},
)
async def main() -> None:
agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
name="WeatherBot",
instructions=(
"You are a helpful weather assistant. "
"When the user asks about weather, call get_user_location first, "
"then make up a plausible forecast for that city."
),
tools=[get_user_location],
)
workflow = WorkflowBuilder(start_executor=agent).build()
# --- First run: the agent should call the declaration-only tool ---
print(">>> Sending: 'What's the weather like today?'")
result = await workflow.run("What's the weather like today?")
requests = result.get_request_info_events()
if not requests:
# The LLM chose not to call the tool — print whatever it said and exit
print(f"Agent replied without calling the tool: {result.get_outputs()}")
return
# --- Inspect what the agent wants ---
for req in requests:
data = req.data
args = json.loads(data.arguments) if isinstance(data.arguments, str) else data.arguments
print(f"Workflow paused — agent called: {data.name}({args})")
# --- "Execute" the tool on the client side and send results back ---
responses: dict[str, Any] = {}
for req in requests:
# In a real app this could be a GPS lookup, browser API, user prompt, etc.
client_result = "Seattle, WA"
print(f"Client provides result for {req.data.name}: {client_result!r}")
responses[req.request_id] = Content.from_function_result(
call_id=req.data.call_id,
result=client_result,
)
result = await workflow.run(responses=responses)
# --- Final answer ---
for output in result.get_outputs():
print(f"\nAgent: {output.text}")
if __name__ == "__main__":
asyncio.run(main())