Move sample validation script from samples/ to scripts/ (#4400)

This commit is contained in:
Tao Chen
2026-03-02 15:36:18 -08:00
committed by GitHub
Unverified
parent 6de5e57b20
commit d7abfcd444
11 changed files with 152 additions and 86 deletions
+183
View File
@@ -0,0 +1,183 @@
# Sample Validation System
An AI-powered workflow system for validating Python samples by discovering them, creating a nested batched workflow, and producing a report.
## Architecture
```
┌─────────────────────────────────────────────────────────────────────┐
│ Sample Validation Workflow │
│ (Sequential - 4 Executors) │
└─────────────────────────────────────────────────────────────────────┘
┌──────────────────────────┼──────────────────────────┐
▼ ▼ ▼
┌───────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Discover │ ──► │ Create Dynamic │ ──► │ Run Nested │
│ Samples │ │ Batched Flow │ │ Workflow │
└───────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
List[SampleInfo] WorkflowCreationResult ExecutionResult
(workers + coordinator) │
┌─────────────────┐
│ Generate Report │
└─────────────────┘
Report
```
### Nested Workflow Strategy
```
┌─────────────────────────────────────────────────────────────────────┐
│ Nested Batched Workflow (coordinator + workers) │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ WorkflowBuilder + fan-out/fan-in edges │ │
│ │ - Coordinator dispatches tasks in bounded batches │ │
│ │ - Worker executors run GitHub Copilot agents │ │
│ │ - Collector aggregates per-sample RunResult messages │ │
│ │ - Max in-flight workers set by --max-parallel-workers │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
```
## File Structure
```
scripts/
├── sample_validation/
│ ├── __init__.py # Package exports
│ ├── README.md # This file
│ ├── models.py # Data classes
│ │ ├── SampleInfo # Discovered sample metadata
│ │ ├── RunResult # Execution result
│ │ └── Report # Final validation report
│ ├── discovery.py # Sample discovery
│ │ ├── discover_samples() # Finds all .py files
│ │ └── DiscoverSamplesExecutor
│ ├── report.py # Report generation
│ │ ├── generate_report() # Create Report from results
│ │ ├── save_report() # Write to markdown/JSON
│ │ ├── print_summary() # Console output
│ │ └── GenerateReportExecutor
│ ├── create_dynamic_workflow_executor.py # Coordinator, workers, collector, CreateConcurrentValidationWorkflowExecutor
│ ├── run_dynamic_validation_workflow_executor.py # RunDynamicValidationWorkflowExecutor
│ └── workflow.py # Workflow assembly entrypoint
├── __main__.py # CLI entry point
```
## Dependencies
### Required
- **agent-framework** - Core workflow and agent functionality
- **agent-framework-github-copilot** - GitHub Copilot agent integration
### Optional
- `GITHUB_COPILOT_MODEL` to override default Copilot model selection.
## Environment Variables
No required environment variables. Optional:
| Variable | Description | Required |
| ------------------------ | --------------------------------- | -------- |
| `GITHUB_COPILOT_MODEL` | Copilot model override | No |
| `GITHUB_COPILOT_TIMEOUT` | Copilot request timeout (seconds) | No |
## Usage
### Basic Usage
```bash
# Validate all samples
uv run python -m sample_validation
# Validate specific subdirectory
uv run python -m sample_validation --subdir 03-workflows
# Save reports to files
uv run python -m sample_validation --save-report --output-dir ./reports
```
### Configuration Options
```bash
uv run python -m sample_validation [OPTIONS]
Options:
--subdir TEXT Subdirectory to validate (relative to samples/)
--output-dir TEXT Report output directory (default: ./_sample_validation/reports)
--max-parallel-workers INT Max in-flight workers per batch (default: 10)
--save-report Save reports to files
```
### Examples
```bash
# Quick validation of a small directory
uv run python -m sample_validation --subdir 03-workflows/_start-here
# Limit parallel workers for large sample sets
uv run python -m sample_validation --subdir 02-agents --max-parallel-workers 8
# Save report artifacts
uv run python -m sample_validation --save-report
```
## How It Works
### 1. Discovery
Walks the samples directory and finds all `.py` files that:
- Don't start with `_` (excludes private files)
- Aren't in `__pycache__` directories
- Aren't in directories starting with `_` (excludes `_sample_validation`)
### 2. Dynamic Workflow Creation
Creates a nested workflow with:
- A coordinator executor
- One worker executor per discovered sample
- A collector executor
### 3. Nested Workflow Execution
The coordinator sends initial work to the first `max_parallel_workers` workers. As each worker finishes, it notifies
the coordinator, which dispatches the next queued sample. Workers also send result items to the collector, which emits
the final `ExecutionResult` once all samples are processed.
### 4. Report Generation
Produces:
- **Console summary** - Pass/fail counts with emoji indicators
- **Markdown report** - Detailed results grouped by status
- **JSON report** - Machine-readable for CI integration
## Report Status Codes
| Status | Label | Description |
| ------- | --------- | ----------------------------------------- |
| SUCCESS | [PASS] | Sample ran to completion with exit code 0 |
| FAILURE | [FAIL] | Sample exited with non-zero code |
| TIMEOUT | [TIMEOUT] | Sample exceeded timeout limit |
| ERROR | [ERROR] | Exception during execution |
## Troubleshooting
### Agent output parsing errors
If an agent returns non-JSON content, that sample is marked as `ERROR` with parser details in the report.
### GitHub Copilot authentication or CLI issues
Ensure GitHub Copilot is authenticated in your environment and the Copilot CLI is available.
@@ -0,0 +1,25 @@
# Copyright (c) Microsoft. All rights reserved.
"""
Sample Validation System
A workflow-based system for validating Python samples by:
1. Discovering all sample files
2. Creating a dynamic nested concurrent workflow (one GitHub agent per sample)
3. Running the nested workflow
4. Generating a validation report
Usage:
uv run python -m sample_validation
uv run python -m sample_validation --subdir 01-get-started
"""
from sample_validation.models import Report, RunResult, SampleInfo
from sample_validation.workflow import create_validation_workflow
__all__ = [
"SampleInfo",
"RunResult",
"Report",
"create_validation_workflow",
]
@@ -0,0 +1,147 @@
# Copyright (c) Microsoft. All rights reserved.
"""
Sample Validation Script
Validates all Python samples in the samples directory using a workflow that:
1. Discovers all sample files
2. Builds a nested concurrent workflow with one GitHub agent per sample
3. Runs the nested workflow
4. Generates a validation report
Usage:
uv run python -m sample_validation
uv run python -m sample_validation --subdir 03-workflows
uv run python -m sample_validation --output-dir ./reports
"""
import argparse
import asyncio
import os
import sys
import time
from pathlib import Path
# Add the samples directory to the path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from sample_validation.models import Report
from sample_validation.report import save_report
from sample_validation.workflow import ValidationConfig, create_validation_workflow
def parse_arguments() -> argparse.Namespace:
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
description="Validate Python samples using a dynamic nested concurrent workflow",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
uv run python -m sample_validation # Validate all samples
uv run python -m sample_validation --subdir 03-workflows # Validate only workflows
uv run python -m sample_validation --output-dir ./reports # Save reports to custom dir
""",
)
parser.add_argument(
"--subdir",
type=str,
help="Validate samples only in the specified subdirectory (relative to samples/)",
)
parser.add_argument(
"--output-dir",
type=str,
default="./sample_validation/reports",
help="Directory to save validation reports (default: ./sample_validation/reports)",
)
parser.add_argument(
"--save-report",
action="store_true",
help="Save the validation report to files",
)
parser.add_argument(
"--max-parallel-workers",
type=int,
default=10,
help="Maximum number of samples to run in parallel per batch (default: 10)",
)
parser.add_argument(
"--report-name",
type=str,
help="Custom name for the report files (without extension). If not provided, uses timestamp.",
)
return parser.parse_args()
async def main() -> int:
"""Main entry point."""
args = parse_arguments()
# Determine paths
# Script is at python/scripts/sample_validation/__main__.py
# python_root is python/, samples_dir is python/samples/
python_root = Path(__file__).parent.parent.parent
samples_dir = python_root / "samples"
print("=" * 80)
print("SAMPLE VALIDATION WORKFLOW")
print("=" * 80)
print(f"Samples directory: {samples_dir}")
print(f"Python root: {python_root}")
if os.environ.get("GITHUB_COPILOT_MODEL"):
print(
f"Using GitHub Copilot model override: {os.environ['GITHUB_COPILOT_MODEL']}"
)
# Create validation config
config = ValidationConfig(
samples_dir=samples_dir,
python_root=python_root,
subdir=args.subdir,
max_parallel_workers=max(1, args.max_parallel_workers),
)
# Create and run the workflow
workflow = create_validation_workflow(config)
print("\nStarting validation workflow...")
print("-" * 80)
# Run the workflow
run_start = time.perf_counter()
try:
events = await workflow.run("start")
finally:
run_duration = time.perf_counter() - run_start
print(f"\nWorkflow run completed in {run_duration:.2f}s")
outputs = events.get_outputs()
if not outputs:
print("\n[ERROR] Workflow did not produce any output")
return 1
report: Report = outputs[0]
# Save report if requested
if args.save_report:
output_dir = samples_dir / args.output_dir
md_path, json_path = save_report(report, output_dir, name=args.report_name)
print("\nReports saved:")
print(f" Markdown: {md_path}")
print(f" JSON: {json_path}")
# Return appropriate exit code
failed = report.failure_count + report.timeout_count + report.error_count
return 1 if failed > 0 else 0
if __name__ == "__main__":
exit_code = asyncio.run(main())
sys.exit(exit_code)
@@ -0,0 +1,3 @@
# Copyright (c) Microsoft. All rights reserved.
WORKER_COMPLETED = "worker_completed"
@@ -0,0 +1,284 @@
# Copyright (c) Microsoft. All rights reserved.
import logging
from collections import deque
from dataclasses import dataclass
from agent_framework import (
Executor,
Message,
Workflow,
WorkflowBuilder,
WorkflowContext,
WorkflowEvent,
handler,
)
from agent_framework.github import GitHubCopilotAgent
from copilot.types import PermissionRequest, PermissionRequestResult
from pydantic import BaseModel
from typing_extensions import Never
from sample_validation.const import WORKER_COMPLETED
from sample_validation.discovery import DiscoveryResult
from sample_validation.models import (
ExecutionResult,
RunResult,
RunStatus,
SampleInfo,
ValidationConfig,
WorkflowCreationResult,
)
logger = logging.getLogger(__name__)
class AgentResponseFormat(BaseModel):
status: str
output: str
error: str
@dataclass
class CoordinatorStart:
samples: list[SampleInfo]
@dataclass
class WorkerFreed:
worker_id: str
class BatchCompletion:
pass
AgentInstruction = (
"You are validating exactly one Python sample.\n"
"Analyze the sample code and execute it. Based on the execution result, determine if it "
"runs successfully, fails, or times out. Feel free to install any required dependencies.\n"
"The sample can be interactive. If it is interactive, respond to the sample when prompted "
"based on your analysis of the code. You do not need to consult human on what to respond.\n"
"Return ONLY valid JSON with this schema:\n"
"{\n"
' "status": "success|failure|timeout|error",\n'
' "output": "short summary of the result and what you did if the sample was interactive",\n'
' "error": "error details or empty string"\n'
"}\n\n"
)
def parse_agent_json(text: str) -> AgentResponseFormat:
"""Parse JSON object from an agent response."""
stripped = text.strip()
if stripped.startswith("{") and stripped.endswith("}"):
return AgentResponseFormat.model_validate_json(stripped)
start = stripped.find("{")
end = stripped.rfind("}")
if start == -1 or end == -1 or end <= start:
raise ValueError("No JSON object found in response")
return AgentResponseFormat.model_validate_json(stripped[start : end + 1])
def status_from_text(value: str) -> RunStatus:
"""Convert a string value to RunStatus with safe fallback."""
normalized = value.strip().lower()
for status in RunStatus:
if status.value == normalized:
return status
return RunStatus.ERROR
def prompt_permission(
request: PermissionRequest, context: dict[str, str]
) -> PermissionRequestResult:
"""Permission handler that always approves."""
kind = request.get("kind", "unknown")
logger.debug(
f"[Permission Request: {kind}] ({context})Automatically approved for sample validation."
)
return PermissionRequestResult(kind="approved")
class CustomAgentExecutor(Executor):
"""Executor that runs a GitHub Copilot agent and returns its response.
We need the custom executor to wrap the agent call in a try/except to ensure that any exceptions are caught and
returned as error responses, otherwise an exception in one agent could crash the entire workflow.
"""
def __init__(self, agent: GitHubCopilotAgent):
super().__init__(id=agent.id)
self.agent = agent
@handler
async def handle_task(
self, sample: SampleInfo, ctx: WorkflowContext[WorkerFreed | RunResult]
) -> None:
"""Execute one sample task and notify collector + coordinator."""
try:
response = await self.agent.run(
[
Message(
role="user",
text=f"Validate the following sample:\n\n{sample.relative_path}",
)
]
)
result_payload = parse_agent_json(response.text)
result = RunResult(
sample=sample,
status=status_from_text(result_payload.status),
output=result_payload.output,
error=result_payload.error,
)
except Exception as ex:
logger.error(f"Error executing agent {self.agent.id}: {ex}")
result = RunResult(
sample=sample,
status=RunStatus.ERROR,
output="",
error=str(ex),
)
await ctx.send_message(result, target_id="collector")
await ctx.send_message(WorkerFreed(worker_id=self.id), target_id="coordinator")
await ctx.add_event(WorkflowEvent(WORKER_COMPLETED, sample)) # type: ignore
class BatchCoordinatorExecutor(Executor):
"""Dispatch sample tasks to worker executors in bounded batches."""
def __init__(self, worker_ids: list[str], max_parallel_workers: int) -> None:
super().__init__(id="coordinator")
self._worker_ids = worker_ids
self._max_parallel_workers = max(1, max_parallel_workers)
self._pending: deque[SampleInfo] = deque()
self._inflight: set[str] = set()
async def _assign_next(
self, worker_id: str, ctx: WorkflowContext[SampleInfo | BatchCompletion]
) -> None:
if not self._pending:
# No more samples to assign
if not self._inflight:
# All tasks are completed, notify collector and exit
await ctx.send_message(BatchCompletion(), target_id="collector")
return
sample = self._pending.popleft()
self._inflight.add(worker_id)
# Messages will get queued in the runner until the next superstep when all workers are freed,
# thus achieving automatic batching without needing complex synchronization logic
await ctx.send_message(sample, target_id=worker_id)
@handler
async def on_start(
self,
start: CoordinatorStart,
ctx: WorkflowContext[SampleInfo | BatchCompletion],
) -> None:
"""Initialize queue and dispatch first wave of tasks."""
self._pending = deque(start.samples)
self._inflight.clear()
for worker_id in self._worker_ids[: self._max_parallel_workers]:
await self._assign_next(worker_id, ctx)
@handler
async def on_worker_freed(
self, freed: WorkerFreed, ctx: WorkflowContext[SampleInfo | BatchCompletion]
) -> None:
"""Dispatch next queued sample when a worker finishes."""
self._inflight.discard(freed.worker_id)
await self._assign_next(freed.worker_id, ctx)
class CollectorExecutor(Executor):
"""Collect per-sample results and emit the final execution result."""
def __init__(self) -> None:
super().__init__(id="collector")
self._results: list[RunResult] = []
@handler
async def on_all(
self,
batch_completion: BatchCompletion,
ctx: WorkflowContext[Never, ExecutionResult],
) -> None:
"""Receive all results at once and emit final output."""
await ctx.yield_output(ExecutionResult(results=self._results))
@handler
async def on_item(self, item: RunResult, ctx: WorkflowContext) -> None:
"""Record a result and emit output when all expected results arrive."""
self._results.append(item)
class CreateConcurrentValidationWorkflowExecutor(Executor):
"""Executor that builds a nested concurrent workflow with one agent per sample."""
def __init__(self, config: ValidationConfig):
super().__init__(id="create_dynamic_workflow")
self.config = config
@handler
async def create(
self,
discovery: DiscoveryResult,
ctx: WorkflowContext[WorkflowCreationResult],
) -> None:
"""Create a nested workflow with a coordinator + worker fan-out/fan-in."""
sample_count = len(discovery.samples)
print(f"\nCreating nested batched workflow for {sample_count} samples...")
if sample_count == 0:
await ctx.send_message(
WorkflowCreationResult(samples=[], workflow=None, agents=[])
)
return
agents: list[GitHubCopilotAgent] = []
workers: list[CustomAgentExecutor] = []
for index, sample in enumerate(discovery.samples, start=1):
agent_id = f"sample_validator_{index}({sample.relative_path})"
agent = GitHubCopilotAgent(
id=agent_id,
name=agent_id,
instructions=AgentInstruction,
default_options={
"on_permission_request": prompt_permission,
"timeout": 180,
}, # type: ignore
)
agents.append(agent)
workers.append(CustomAgentExecutor(agent))
coordinator = BatchCoordinatorExecutor(
worker_ids=[worker.id for worker in workers],
max_parallel_workers=self.config.max_parallel_workers,
)
collector = CollectorExecutor()
nested_builder = WorkflowBuilder(
start_executor=coordinator, output_executors=[collector]
)
nested_builder.add_edge(coordinator, collector)
for worker in workers:
nested_builder.add_edge(coordinator, worker)
nested_builder.add_edge(worker, coordinator)
nested_builder.add_edge(worker, collector)
nested_workflow: Workflow = nested_builder.build()
await ctx.send_message(
WorkflowCreationResult(
samples=discovery.samples,
workflow=nested_workflow,
agents=agents,
)
)
@@ -0,0 +1,120 @@
# Copyright (c) Microsoft. All rights reserved.
"""Sample discovery module."""
import ast
import os
from pathlib import Path
from agent_framework import Executor, WorkflowContext, handler
from sample_validation.models import DiscoveryResult, SampleInfo, ValidationConfig
def _is_main_entrypoint_guard(test: ast.expr) -> bool:
"""Check whether an expression is ``__name__ == '__main__'``."""
if not isinstance(test, ast.Compare):
return False
if len(test.ops) != 1 or not isinstance(test.ops[0], ast.Eq):
return False
if len(test.comparators) != 1:
return False
left = test.left
right = test.comparators[0]
return (
isinstance(left, ast.Name)
and left.id == "__name__"
and isinstance(right, ast.Constant)
and right.value == "__main__"
) or (
isinstance(right, ast.Name)
and right.id == "__name__"
and isinstance(left, ast.Constant)
and left.value == "__main__"
)
def _has_main_entrypoint_guard(path: Path) -> bool:
"""Check whether a Python file defines a top-level main entrypoint guard."""
try:
source = path.read_text(encoding="utf-8")
tree = ast.parse(source)
except Exception:
return False
return any(
isinstance(node, ast.If) and _is_main_entrypoint_guard(node.test)
for node in tree.body
)
def discover_samples(samples_dir: Path, subdir: str | None = None) -> list[SampleInfo]:
"""
Find all Python sample files in the samples directory.
Args:
samples_dir: Root samples directory
subdir: Optional subdirectory to filter to
Returns:
List of SampleInfo objects for each discovered sample
"""
# Determine the search directory
if subdir:
search_dir = samples_dir / subdir
if not search_dir.exists():
print(f"Warning: Subdirectory '{subdir}' does not exist in {samples_dir}")
return []
else:
search_dir = samples_dir
python_files: list[Path] = []
# Walk through all subdirectories and find .py files
for root, dirs, files in os.walk(search_dir):
# Skip directories that start with _ (like _sample_validation)
dirs[:] = [d for d in dirs if not d.startswith("_") and d != "__pycache__"]
for file in files:
# Skip files that start with _ and include only scripts with a main entrypoint guard
if file.endswith(".py") and not file.startswith("_"):
file_path = Path(root) / file
if _has_main_entrypoint_guard(file_path):
python_files.append(file_path)
# Sort files for consistent execution order
python_files = sorted(python_files)
# Convert to SampleInfo objects
samples: list[SampleInfo] = []
for path in python_files:
try:
samples.append(SampleInfo.from_path(path, samples_dir))
except Exception as e:
print(f"Warning: Could not read {path}: {e}")
return samples
class DiscoverSamplesExecutor(Executor):
"""Executor that discovers all samples in the samples directory."""
def __init__(self, config: ValidationConfig):
super().__init__(id="discover_samples")
self.config = config
@handler
async def discover(self, _: str, ctx: WorkflowContext[DiscoveryResult]) -> None:
"""Discover all Python samples."""
print(f"🔍 Discovering samples in {self.config.samples_dir}")
if self.config.subdir:
print(f" Filtering to subdirectory: {self.config.subdir}")
samples = discover_samples(self.config.samples_dir, self.config.subdir)
print(f" Found {len(samples)} samples")
await ctx.send_message(DiscoveryResult(samples=samples))
+163
View File
@@ -0,0 +1,163 @@
# Copyright (c) Microsoft. All rights reserved.
"""Data models for the sample validation system."""
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from agent_framework import Workflow
from agent_framework.github import GitHubCopilotAgent
@dataclass
class ValidationConfig:
"""Configuration for the validation workflow."""
samples_dir: Path
python_root: Path
subdir: str | None = None
max_parallel_workers: int = 10
@dataclass
class SampleInfo:
"""Information about a discovered sample file."""
path: Path
relative_path: str
code: str
@classmethod
def from_path(cls, path: Path, samples_dir: Path) -> "SampleInfo":
"""Create SampleInfo from a file path."""
return cls(
path=path,
relative_path=str(path.relative_to(samples_dir)),
code=path.read_text(encoding="utf-8"),
)
@dataclass
class DiscoveryResult:
"""Result of sample discovery."""
samples: list[SampleInfo]
@dataclass
class WorkflowCreationResult:
"""Result of creating a nested per-sample concurrent workflow."""
samples: list[SampleInfo]
workflow: Workflow | None
agents: list[GitHubCopilotAgent]
class RunStatus(Enum):
"""Status of a sample run."""
SUCCESS = "success"
FAILURE = "failure"
TIMEOUT = "timeout"
ERROR = "error"
@dataclass
class RunResult:
"""Result of running a single sample."""
sample: SampleInfo
status: RunStatus
output: str
error: str
@dataclass
class ExecutionResult:
"""Result of sample execution."""
results: list[RunResult]
@dataclass
class Report:
"""Final validation report."""
timestamp: datetime
total_samples: int
success_count: int
failure_count: int
timeout_count: int
error_count: int
results: list[RunResult] = field(default_factory=list) # type: ignore
def to_markdown(self) -> str:
"""Generate a markdown report."""
lines = [
"# Sample Validation Report",
"",
f"**Generated:** {self.timestamp.isoformat()}",
"",
"## Summary",
"",
"| Metric | Count |",
"|--------|-------|",
f"| Total Samples | {self.total_samples} |",
f"| [PASS] Success | {self.success_count} |",
f"| [FAIL] Failure | {self.failure_count} |",
f"| [TIMEOUT] Timeout | {self.timeout_count} |",
f"| [ERROR] Error | {self.error_count} |",
"",
"## Detailed Results",
"",
]
# Group by status
for status in [RunStatus.FAILURE, RunStatus.TIMEOUT, RunStatus.ERROR, RunStatus.SUCCESS]:
status_results = [r for r in self.results if r.status == status]
if not status_results:
continue
status_label = {
RunStatus.SUCCESS: "[PASS]",
RunStatus.FAILURE: "[FAIL]",
RunStatus.TIMEOUT: "[TIMEOUT]",
RunStatus.ERROR: "[ERROR]",
}
lines.append(f"### {status_label[status]} {status.value.title()} ({len(status_results)})")
lines.append("")
for result in status_results:
lines.append(f"- **{result.sample.relative_path}**")
if result.error:
# Truncate long errors
error_preview = result.error[:200] + "..." if len(result.error) > 200 else result.error
lines.append(f" - Error: `{error_preview}`")
lines.append("")
return "\n".join(lines)
def to_dict(self) -> dict[str, object]:
"""Convert report to dictionary for JSON serialization."""
return {
"timestamp": self.timestamp.isoformat(),
"summary": {
"total_samples": self.total_samples,
"success_count": self.success_count,
"failure_count": self.failure_count,
"timeout_count": self.timeout_count,
"error_count": self.error_count,
},
"results": [
{
"path": r.sample.relative_path,
"status": r.status.value,
"output": r.output,
"error": r.error,
}
for r in self.results
],
}
+126
View File
@@ -0,0 +1,126 @@
# Copyright (c) Microsoft. All rights reserved.
"""Report generation for sample validation results."""
import json
from datetime import datetime
from pathlib import Path
from agent_framework import Executor, WorkflowContext, handler
from typing_extensions import Never
from sample_validation.models import ExecutionResult, Report, RunResult, RunStatus
def generate_report(results: list[RunResult]) -> Report:
"""
Generate a validation report from run results.
Args:
results: List of RunResult objects from sample execution
Returns:
Report object with aggregated statistics
"""
# Sort results: failures, timeouts, errors first, then successes
status_priority = {
RunStatus.FAILURE: 0,
RunStatus.TIMEOUT: 1,
RunStatus.ERROR: 2,
RunStatus.SUCCESS: 3,
}
sorted_results = sorted(results, key=lambda r: status_priority[r.status])
return Report(
timestamp=datetime.now(),
total_samples=len(results),
success_count=sum(1 for r in results if r.status == RunStatus.SUCCESS),
failure_count=sum(1 for r in results if r.status == RunStatus.FAILURE),
timeout_count=sum(1 for r in results if r.status == RunStatus.TIMEOUT),
error_count=sum(1 for r in results if r.status == RunStatus.ERROR),
results=sorted_results,
)
def save_report(
report: Report, output_dir: Path, name: str | None = None
) -> tuple[Path, Path]:
"""
Save the report to markdown and JSON files.
Args:
report: The report to save
output_dir: Directory to save the report files
name: Optional custom name for the report files (without extension)
Returns:
Tuple of (markdown_path, json_path)
"""
output_dir.mkdir(parents=True, exist_ok=True)
if name:
base_name = name
else:
timestamp_str = report.timestamp.strftime("%Y%m%d_%H%M%S")
base_name = f"validation_report_{timestamp_str}"
# Save markdown
md_path = output_dir / f"{base_name}.md"
md_path.write_text(report.to_markdown(), encoding="utf-8")
# Save JSON
json_path = output_dir / f"{base_name}.json"
json_path.write_text(
json.dumps(report.to_dict(), indent=2),
encoding="utf-8",
)
return md_path, json_path
def print_summary(report: Report) -> None:
"""Print a summary of the validation report to console."""
print("\n" + "=" * 80)
print("SAMPLE VALIDATION SUMMARY")
print("=" * 80)
if (
report.failure_count == 0
and report.timeout_count == 0
and report.error_count == 0
):
print("[PASS] ALL SAMPLES PASSED!")
else:
print("[FAIL] SOME SAMPLES FAILED")
print(f"\nTotal samples: {report.total_samples}")
print()
print("Results:")
print(f" [PASS] Success: {report.success_count}")
print(f" [FAIL] Failure: {report.failure_count}")
print(f" [TIMEOUT] Timeout: {report.timeout_count}")
print(f" [ERR] Errors: {report.error_count}")
print("=" * 80)
# Print JSON output for GitHub Actions visibility
print("\nJSON Report:")
print(json.dumps(report.to_dict(), indent=2))
class GenerateReportExecutor(Executor):
"""Executor that generates the final validation report."""
def __init__(self) -> None:
super().__init__(id="generate_report")
@handler
async def generate(
self, execution: ExecutionResult, ctx: WorkflowContext[Never, Report]
) -> None:
"""Generate the validation report from fan-in results."""
print("\nGenerating report...")
report = generate_report(execution.results)
print_summary(report)
await ctx.yield_output(report)
@@ -0,0 +1,77 @@
# Copyright (c) Microsoft. All rights reserved.
from collections.abc import Sequence
from agent_framework import Executor, WorkflowContext, handler
from agent_framework.github import GitHubCopilotAgent
from sample_validation.const import WORKER_COMPLETED
from sample_validation.create_dynamic_workflow_executor import CoordinatorStart
from sample_validation.models import (
ExecutionResult,
RunResult,
RunStatus,
SampleInfo,
WorkflowCreationResult,
)
async def stop_agents(agents: Sequence[GitHubCopilotAgent]) -> None:
"""Stop all GitHub Copilot agents used by the nested workflow."""
for agent in agents:
try:
await agent.stop()
except Exception:
continue
class RunDynamicValidationWorkflowExecutor(Executor):
"""Executor that runs the nested workflow created in the previous step."""
def __init__(self) -> None:
super().__init__(id="run_dynamic_workflow")
@handler
async def run(
self, creation: WorkflowCreationResult, ctx: WorkflowContext[ExecutionResult]
) -> None:
"""Run the nested workflow and emit execution results."""
if creation.workflow is None:
await ctx.send_message(ExecutionResult(results=[]))
return
print("\nRunning nested batched workflow...")
print("-" * 80)
try:
remaining_sample_counts = len(creation.samples)
result: ExecutionResult | None = None
async for event in creation.workflow.run(
CoordinatorStart(samples=creation.samples), stream=True
):
if event.type == "output" and isinstance(event.data, ExecutionResult):
result = event.data # type: ignore
elif event.type == WORKER_COMPLETED and isinstance(
event.data, SampleInfo
): # type: ignore
remaining_sample_counts -= 1
print(
f"Completed validation for sample: {event.data.relative_path:<80} | "
f"Remaining: {remaining_sample_counts:>4}"
)
if result is not None:
await ctx.send_message(result)
else:
fallback_results = [
RunResult(
sample=sample,
status=RunStatus.ERROR,
output="",
error="Nested workflow did not return an ExecutionResult.",
)
for sample in creation.samples
]
await ctx.send_message(ExecutionResult(results=fallback_results))
finally:
await stop_agents(creation.agents)
@@ -0,0 +1,47 @@
# Copyright (c) Microsoft. All rights reserved.
"""
Sample Validation Workflow using Microsoft Agent Framework.
Workflow composition for sample validation.
"""
from agent_framework import Workflow, WorkflowBuilder
from sample_validation.create_dynamic_workflow_executor import (
CreateConcurrentValidationWorkflowExecutor,
)
from sample_validation.discovery import DiscoverSamplesExecutor, ValidationConfig
from sample_validation.report import GenerateReportExecutor
from sample_validation.run_dynamic_validation_workflow_executor import (
RunDynamicValidationWorkflowExecutor,
)
def create_validation_workflow(
config: ValidationConfig,
) -> Workflow:
"""
Create the sample validation workflow.
Args:
config: Validation configuration
Returns:
Configured Workflow instance
"""
discover = DiscoverSamplesExecutor(config)
create_dynamic_workflow = CreateConcurrentValidationWorkflowExecutor(config)
run_dynamic_workflow = RunDynamicValidationWorkflowExecutor()
generate = GenerateReportExecutor()
return (
WorkflowBuilder(start_executor=discover)
.add_edge(discover, create_dynamic_workflow)
.add_edge(create_dynamic_workflow, run_dynamic_workflow)
.add_edge(run_dynamic_workflow, generate)
.build()
)
__all__ = ["ValidationConfig", "create_validation_workflow"]