Files
agent-framework/python/packages/core
T
Ben Thomas 8b71f9459a Python: Feature/hosted dwf (#5531)
* Fix declarative Workflow.as_agent() by accepting list[Message] in start executor

The declarative start executor (JoinExecutor) only advertised dict and str
in its input_types, so WorkflowAgent.__init__ rejected it with
'Workflow's start executor cannot handle list[Message]'.

Add list[Message] to the JoinExecutor handler annotation and add a
matching branch in DeclarativeActionExecutor._ensure_state_initialized
that extracts the last user-message text and falls through to the
string-input initialization path, so =System.LastMessageText works
end-to-end via as_agent().

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Populate Conversation.messages from list[Message] trigger

When Workflow.as_agent() is invoked with a list[Message], the start executor now populates Conversation.messages / Conversation.history / System.conversations.{id}.messages with prior turns only (excluding the latest user message), and surfaces the latest user message via Inputs.input and System.LastMessage*. This matches InvokeAzureAgent's contract that the messages binding holds prior turns and the executor itself appends the new user input before invoking, avoiding double-append of the trailing user turn while preserving full history (incl. assistant/system/tool roles and multi-modal content) for downstream actions.

* Coerce Enum values when serializing PowerFx symbols

MessageRole and other str-subclass Enums passed isinstance(v, str) and were forwarded to pythonnet unchanged. pythonnet then raised 'MessageRole value cannot be converted to System.String' for every PowerFx primitive when ConditionGroup/Expr eval walked the symbol table containing Conversation.messages. Reduce Enum members to their underlying value before the primitive check so eval sees plain strings/ints.

* Foundry hosting: pass full conversation history to workflow agents

_handle_inner_workflow only forwarded the latest user turn to WorkflowAgent.run, even though _handle_inner_agent already prepends history fetched from Foundry storage to the messages it sends a regular agent. Declarative workflows reset Conversation.messages on every run (state.initialize), so checkpoint replay alone does not give them prior turns - the host has to pass them in, the same way it does for non-workflow agents. Mirror that contract: fetch context.get_history() and pass [*history, *input_messages] to the workflow agent.

* feat(workflows): support combined message + checkpoint_id for multi-turn continuation

Allow Workflow.run(message=..., checkpoint_id=...) so callers can restore
prior workflow state from a checkpoint AND deliver a new message to the
start executor in a single call. The existing reset_context logic
already preserves shared state when checkpoint_id is set, so this gives
us 'fresh start executor invocation with prior state intact' - exactly
what hosted multi-turn declarative workflows need.

- _workflow.py: drop the message+checkpoint_id mutual exclusion and
  update _execute_with_message_or_checkpoint to do both (restore then
  execute) when both are provided.
- _agent.py: in _run_core's checkpoint branch, also forward
  input_messages so WorkflowAgent.run(messages, checkpoint_id=...) works
  end-to-end. Falls back to the legacy 'restore only' behavior when
  messages are absent.
- _declarative_base.py: detect continuation in _ensure_state_initialized
  by checking whether DECLARATIVE_STATE_KEY already exists in shared
  state; if so, refresh inputs/LastMessage* and append non-user trigger
  messages instead of calling state.initialize() (which would wipe
  Conversation/Local/System).
- foundry_hosting/_responses.py: collapse the host's two-call pattern
  (restore-only, then fresh run) into a single combined call now that
  the underlying APIs support it.
- tests: drop the assertion that combined message+checkpoint_id raises.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Pivot: preserve workflow state across run() calls

Replace the prior 'combined message + checkpoint_id in one run()' approach
with a cleaner default: Workflow.run no longer wipes shared state or runner-
context messages between calls. Iteration counting and per-run kwargs still
reset on a fresh-message run; checkpoint and responses runs are continuations
that preserve everything.

This lets a WorkflowAgent be invoked repeatedly on the same instance and
maintain multi-turn context (e.g. accumulated Conversation.messages) without
asking developers to opt in. Hosted-agent multi-turn pattern becomes two
explicit calls: restore-from-checkpoint (drive to idle), then run-with-message.

Key changes:
- _workflow.py: drop _state.clear() and reset_for_new_run() from run().
  Reset iteration count and run kwargs on fresh-message runs only.
  Restore 'Cannot provide both message and checkpoint_id' validation.
  Add async guard: fresh-message run with un-drained pending executor
  messages from a prior run is invalid.
- _runner.py: clear _state before import_state in restore_from_checkpoint
  so restore is authoritative (import_state merges, not replaces).
- _agent.py: revert checkpoint branch to restore-only (no message forward).
- _responses.py (foundry_hosting): two-call host pattern - restore checkpoint
  silently, then run with new user input.
- tests: state-preservation is the new default; rebuild Workflow for clean slate.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix CI lint and mypy issues from prior pivot commit

- _workflow.py: collapse nested if (SIM102), drop redundant assignment (RET504)
- _declarative_base.py: remove unused last_user_msg = tail assignment
  whose Message | None type clashed with the prior Message-typed branch

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address PR review: fix Inputs.input update and checkpoint storage path

- _declarative_base.py: continuation branch was writing 'Inputs.input' via
  state.set, which routes to the Custom namespace and never updates the
  PowerFx-visible Workflow.Inputs.input. Update state_data['Inputs'] in
  place via get_state_data / set_state_data so =Workflow.Inputs.input and
  =inputs.input see the new turn's user text on continuation.
- _declarative_base.py: refresh docstring to clarify that on a list[Message]
  trigger, Conversation.messages excludes the current user message at the
  start of the turn (agent executors append it before invoking the inner
  agent).
- _responses.py: when previous_response_id is supplied (no conversation_id),
  the prior checkpoint lives under <storage>/<previous_response_id> but new
  checkpoints must land under <storage>/<current_response_id> for the next
  turn to find them. Hold onto restore_storage from the get_latest lookup
  and pass it to the restore-only run; pass write_storage (current id) to
  the message-delivery run and to checkpoint cleanup.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix pyright errors in _declarative_base.py for CI

- Replace state._state.get(...) protected access with new public
  is_initialized() method on DeclarativeWorkflowState (also clearer intent
  for the continuation detection use case).
- Add narrow pyright ignores for the Any-typed trigger paths that pyright
  cannot fully narrow (the list[Message] isinstance loop and the
  fallback-DefaultTransform branch).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address Copilot review batch: tests + Workflow.reset escape hatch

* Add Workflow.reset() public method as recovery escape hatch when an
  in-flight run aborted (e.g. WorkflowConvergenceException) and the
  workflow is not checkpointed. Update the in-flight messages guard's
  error message to point callers at it.

* Add test_workflow_run_inflight_messages_guard exercising both the
  guard (sync + streaming) and the reset() recovery path.
* Add test_workflow_reset_rejects_concurrent_runs to lock down the
  in-progress guard on reset.

* Add test_as_agent_continuation_preserves_prior_state covering the
  is_continuation branch in _ensure_state_initialized: stamps a marker
  between calls and asserts it survives, while Inputs.input and
  System.LastMessageText refresh to the new turn.

* Add test_powerfx_safe.py regression tests for the Enum branch in
  _make_powerfx_safe (str-subclass, int-subclass, plain Enum, and
  Enums nested in dict/list).

* Drop redundant @pytest.mark.asyncio on
  test_as_agent_round_trip_with_last_message_text (asyncio_mode='auto').

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Skip restore-only pre-pass when checkpoint has pending request_info

Address Copilot review on _responses.py: the restore-only checkpoint
replay populates self._agent.pending_requests for any request_info
events captured in the checkpoint. The follow-up run(input_messages)
call would then route through WorkflowAgent._process_pending_requests,
which expects function-response content and rejects plain text input
as 'unexpected content while awaiting request info responses'.

Workflows resumed from a checkpoint that was idle-with-pending-requests
would therefore fail every subsequent plain-text user turn. Inspect the
loaded checkpoint and skip the pre-pass when its
pending_request_info_events dict is non-empty. Workflows that don't use
request_info (the current sample set) are unaffected; workflows that do
will fall through to a fresh-message run rather than silently corrupting
the routing state.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Loosen azure-ai-agentserver-* pins to major version

The exact-version pins on azure-ai-agentserver-{core,responses,invocations}
forced foundry-hosting consumers to upgrade in lockstep with every beta
bump from upstream. Switch to '>=current,<next-major' so we pick up patch
and feature updates within the same major series without a coordinated
release.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Drop Workflow.reset(); checkpointing is the recovery path

The in-flight-messages guard prevented silent misbehavior, but the
companion Workflow.reset() escape hatch only cleared _messages while
leaving iteration count, executor-local state, and shared State
mutations in an indeterminate condition after a mid-run failure. That
gave a false sense of recovery.

Recovery from a mid-run failure is supported only via checkpoint
restoration. Keep the guard and reframe its error message accordingly;
remove reset() and its tests.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address Tao's review on PR 5531

- Rename Workflow._run_workflow_with_tracing parameter
  is_fresh_message_run -> is_continuation (default False, inverted).
  Fresh-message turns reset per-run accounting; continuations
  (checkpoint restores, responses replays) preserve it.
- Simplify the in-flight-messages guard: _validate_run_params already
  enforces that 'message' is mutually exclusive with 'checkpoint_id'
  and 'responses', so the additional checks were dead code.
- foundry_hosting _responses: move the restore-only pre-pass above
  emit_created/emit_in_progress; restore is preparation, not run
  progress. Drop the skip-restore gate (state preservation requires
  unconditional restore) and instead clear agent.pending_requests
  after the restore-only call. Collapse over-conditioned check.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Don't clear pending_requests after restore-only pre-pass

Pending requests in the restored checkpoint represent genuinely
outstanding HITL requests. The next user input may carry function
responses (Responses API `function_call_output` items become
FunctionResultContent / FunctionApprovalResponseContent), which
`WorkflowAgent._process_pending_requests` correctly extracts and
matches against the populated `pending_requests`. Clearing them
after restore would silently drop that state and force the next turn
to be treated as a fresh input even when the caller is responding to
the outstanding requests.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: alliscode <bentho@microsoft.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Evan Mattson <35585003+moonbox3@users.noreply.github.com>
8b71f9459a · 2026-04-29 00:51:49 +00:00
History
..
2026-04-29 00:51:49 +00:00
2025-09-30 07:18:36 +00:00

Get Started with Microsoft Agent Framework

Highlights

  • Flexible Agent Framework: build, orchestrate, and deploy AI agents and multi-agent systems
  • Multi-Agent Orchestration: Group chat, sequential, concurrent, and handoff patterns
  • Plugin Ecosystem: Extend with native functions, OpenAPI, Model Context Protocol (MCP), and more
  • LLM Support: OpenAI, Foundry, Anthropic, and more
  • Runtime Support: In-process and distributed agent execution
  • Multimodal: Text, vision, and function calling
  • Cross-Platform: .NET and Python implementations

Quick Install

pip install agent-framework-core
# Optional: Add Azure AI Foundry integration
pip install agent-framework-foundry
# Optional: Add OpenAI integration
pip install agent-framework-openai

Supported Platforms:

  • Python: 3.10+
  • OS: Windows, macOS, Linux

1. Setup API Keys

Depending on the client you want to use, there are various environment variables you can set to configure the chat clients. This can be done in the environment itself, or with a .env file in your project root, some examples of environment variables include:

FOUNDRY_PROJECT_ENDPOINT=...
FOUNDRY_MODEL=...
...
OPENAI_API_KEY=sk-...
OPENAI_CHAT_COMPLETION_MODEL=...
OPENAI_CHAT_MODEL=...
...
AZURE_OPENAI_API_KEY=...
AZURE_OPENAI_ENDPOINT=...
AZURE_OPENAI_MODEL=...

You can also override environment variables by explicitly passing configuration parameters to the chat client constructor:

from agent_framework.openai import OpenAIChatClient

client = OpenAIChatClient(
    api_key="",
    model="",
)

See the following getting started samples for more information.

2. Create a Simple Agent

Create agents and invoke them directly:

import asyncio
from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient

agent = Agent(
    client=OpenAIChatClient(),
    instructions="""
    1) A robot may not injure a human being...
    2) A robot must obey orders given it by human beings...
    3) A robot must protect its own existence...

    Give me the TLDR in exactly 5 words.
    """
)

result = asyncio.run(agent.run("Summarize the Three Laws of Robotics"))
print(result)
# Output: Protect humans, obey, self-preserve, prioritized.

3. Directly Use Chat Clients (No Agent Required)

You can use the chat client classes directly for advanced workflows:

import asyncio
from agent_framework.openai import OpenAIChatClient
from agent_framework import Message, Role

async def main():
    client = OpenAIChatClient()

    response = await client.get_response([
        Message("system", ["You are a helpful assistant."]),
        Message("user", ["Write a haiku about Agent Framework."])
    ])
    print(response.messages[0].text)

    """
    Output:

    Agents work in sync,
    Framework threads through each task—
    Code sparks collaboration.
    """

asyncio.run(main())

4. Build an Agent with Tools and Functions

Enhance your agent with custom tools and function calling:

import asyncio
from typing import Annotated
from random import randint
from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient


def get_weather(
    location: Annotated[str, "The location to get the weather for."],
) -> str:
    """Get the weather for a given location."""
    conditions = ["sunny", "cloudy", "rainy", "stormy"]
    return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."


def get_menu_specials() -> str:
    """Get today's menu specials."""
    return """
    Special Soup: Clam Chowder
    Special Salad: Cobb Salad
    Special Drink: Chai Tea
    """


async def main():
    agent = Agent(
        client=OpenAIChatClient(),
        instructions="You are a helpful assistant that can provide weather and restaurant information.",
        tools=[get_weather, get_menu_specials]
    )

    response = await agent.run("What's the weather in Amsterdam and what are today's specials?")
    print(response)

    # Output:
    # The weather in Amsterdam is sunny with a high of 22°C. Today's specials include
    # Clam Chowder soup, Cobb Salad, and Chai Tea as the special drink.

asyncio.run(main())

You can explore additional agent samples here.

5. Multi-Agent Orchestration

Coordinate multiple agents to collaborate on complex tasks using orchestration patterns:

import asyncio
from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient


async def main():
    # Create specialized agents
    writer = Agent(
        client=OpenAIChatClient(),
        name="Writer",
        instructions="You are a creative content writer. Generate and refine slogans based on feedback."
    )

    reviewer = Agent(
        client=OpenAIChatClient(),
        name="Reviewer",
        instructions="You are a critical reviewer. Provide detailed feedback on proposed slogans."
    )

    # Sequential workflow: Writer creates, Reviewer provides feedback
    task = "Create a slogan for a new electric SUV that is affordable and fun to drive."

    # Step 1: Writer creates initial slogan
    initial_result = await writer.run(task)
    print(f"Writer: {initial_result}")

    # Step 2: Reviewer provides feedback
    feedback_request = f"Please review this slogan: {initial_result}"
    feedback = await reviewer.run(feedback_request)
    print(f"Reviewer: {feedback}")

    # Step 3: Writer refines based on feedback
    refinement_request = f"Please refine this slogan based on the feedback: {initial_result}\nFeedback: {feedback}"
    final_result = await writer.run(refinement_request)
    print(f"Final Slogan: {final_result}")

    # Example Output:
    # Writer: "Charge Forward: Affordable Adventure Awaits!"
    # Reviewer: "Good energy, but 'Charge Forward' is overused in EV marketing..."
    # Final Slogan: "Power Up Your Adventure: Premium Feel, Smart Price!"

if __name__ == "__main__":
    asyncio.run(main())

Note: Sequential, Concurrent, Group Chat, Handoff, and Magentic orchestrations are available. See examples in orchestration samples.

More Examples & Samples

Agent Framework Documentation