mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
Python: Fix workflow samples for bugbash: part 1 (#4055)
* Fix workflow samples for bugbash: part 1 * Fix mypy * Fix tests
This commit is contained in:
committed by
GitHub
Unverified
parent
2dfe90306b
commit
7cee839982
@@ -180,7 +180,7 @@ async def main() -> None:
|
||||
)
|
||||
|
||||
# Load workflow from YAML
|
||||
samples_root = Path(__file__).parent.parent.parent.parent.parent.parent.parent
|
||||
samples_root = Path(__file__).parent.parent.parent.parent.parent.parent
|
||||
workflow_path = samples_root / "workflow-samples" / "DeepResearch.yaml"
|
||||
if not workflow_path.exists():
|
||||
# Fall back to local copy if workflow-samples doesn't exist
|
||||
|
||||
@@ -14,6 +14,7 @@ In a production scenario, you would integrate with a real UI or chat interface.
|
||||
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
from typing import cast
|
||||
|
||||
from agent_framework import Workflow
|
||||
from agent_framework.declarative import ExternalInputRequest, WorkflowFactory
|
||||
@@ -31,27 +32,18 @@ async def run_with_streaming(workflow: Workflow) -> None:
|
||||
data = event.data
|
||||
if isinstance(data, TextOutputEvent):
|
||||
print(f"[Bot]: {data.text}")
|
||||
elif isinstance(data, ExternalInputRequest):
|
||||
# In a real scenario, you would:
|
||||
# 1. Display the prompt to the user
|
||||
# 2. Wait for their response
|
||||
# 3. Use the response to continue the workflow
|
||||
output_property = data.metadata.get("output_property", "unknown")
|
||||
print(f"[System] Input requested for: {output_property}")
|
||||
if data.message:
|
||||
print(f"[System] Prompt: {data.message}")
|
||||
else:
|
||||
print(f"[Output]: {data}")
|
||||
|
||||
|
||||
async def run_with_result(workflow: Workflow) -> None:
|
||||
"""Demonstrate batch workflow execution with run()."""
|
||||
print("\n=== Batch Execution (run) ===")
|
||||
print("-" * 40)
|
||||
|
||||
result = await workflow.run({})
|
||||
for output in result.get_outputs():
|
||||
print(f" Output: {output}")
|
||||
elif event.type == "request_info":
|
||||
request = cast(ExternalInputRequest, event.data)
|
||||
# In a real scenario, you would:
|
||||
# 1. Display the prompt to the user
|
||||
# 2. Wait for their response
|
||||
# 3. Use the response to continue the workflow
|
||||
output_property = request.metadata.get("output_property", "unknown")
|
||||
print(f"[System] Input requested for: {output_property}")
|
||||
if request.message:
|
||||
print(f"[System] Prompt: {request.message}")
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
@@ -70,9 +62,6 @@ async def main() -> None:
|
||||
# Demonstrate streaming execution
|
||||
await run_with_streaming(workflow)
|
||||
|
||||
# Demonstrate batch execution
|
||||
# await run_with_result(workflow)
|
||||
|
||||
print("\n" + "-" * 40)
|
||||
print("=== Workflow Complete ===")
|
||||
print()
|
||||
|
||||
Reference in New Issue
Block a user