Python: Update sample validation scripts (#4870)

* Update sample validation scripts

* Adjust prompt

* Update autogen-migration samples

* Add fix suggestion

* Split jobs

* Add .env

* Create trend report

* Add timestamp

* Add more env vars

* Comments

* force node24

* force node24

* force node22
This commit is contained in:
Tao Chen
2026-03-24 18:21:32 -07:00
committed by GitHub
Unverified
parent 2c000b032d
commit 4b533608b6
19 changed files with 928 additions and 202 deletions
+6 -7
View File
@@ -165,18 +165,17 @@ Produces:
## Report Status Codes
| Status | Label | Description |
| ------- | --------- | ----------------------------------------- |
| SUCCESS | [PASS] | Sample ran to completion with exit code 0 |
| FAILURE | [FAIL] | Sample exited with non-zero code |
| TIMEOUT | [TIMEOUT] | Sample exceeded timeout limit |
| ERROR | [ERROR] | Exception during execution |
| Status | Label | Description |
| ------------- | --------------- | ----------------------------------------- |
| SUCCESS | [PASS] | Sample ran to completion with exit code 0 |
| FAILURE | [FAIL] | Sample did not complete successfully (non-zero exit code) |
| MISSING_SETUP | [MISSING_SETUP] | Sample skipped due to missing setup |
## Troubleshooting
### Agent output parsing errors
If an agent returns non-JSON content, that sample is marked as `ERROR` with parser details in the report.
If an agent returns non-JSON content, that sample is marked as `FAILURE` with parser details in the report.
### GitHub Copilot authentication or CLI issues
+9 -1
View File
@@ -75,6 +75,13 @@ Examples:
help="Custom name for the report files (without extension). If not provided, uses timestamp.",
)
parser.add_argument(
"--exclude",
nargs="+",
type=str,
help="Subdirectory paths to exclude (relative to the search directory set by --subdir)",
)
return parser.parse_args()
@@ -104,6 +111,7 @@ async def main() -> int:
samples_dir=samples_dir,
python_root=python_root,
subdir=args.subdir,
exclude=args.exclude,
max_parallel_workers=max(1, args.max_parallel_workers),
)
@@ -138,7 +146,7 @@ async def main() -> int:
print(f" JSON: {json_path}")
# Return appropriate exit code
failed = report.failure_count + report.timeout_count + report.error_count
failed = report.failure_count + report.missing_setup_count
return 1 if failed > 0 else 0
@@ -0,0 +1,224 @@
# Copyright (c) Microsoft. All rights reserved.
"""Aggregate validation reports across runs and produce a trend report.
Reads JSON reports from individual validation jobs, combines them with
cached history from previous runs, and produces a markdown trend report
showing per-sample status over the last 5 runs.
Usage:
python aggregate.py <reports-dir> <history-file> <output-file>
"""
import json
import sys
from datetime import datetime
from pathlib import Path
from typing import Any
# Number of most-recent runs kept in the history file and shown in the report.
MAX_HISTORY = 5

# Table-cell marker for each sample status value read from the JSON reports.
# The symbols must agree with the legend line emitted by generate_trend_report;
# distinct, non-empty markers keep success and failure cells distinguishable.
STATUS_EMOJI = {
    "success": "✅",
    "failure": "❌",
    "missing_setup": "⚠️",
}
def _format_run_label(timestamp: str) -> str:
    """Format a run timestamp as a compact column label (e.g. '03-24 18:05').

    Args:
        timestamp: ISO-8601 timestamp string as stored in a run entry.

    Returns:
        The timestamp rendered as ``MM-DD HH:MM``. If it cannot be parsed, the
        first 16 characters of its text are returned instead; ``"N/A"`` is
        returned when the value is ``None``.
    """
    try:
        return datetime.fromisoformat(timestamp).strftime("%m-%d %H:%M")
    except (ValueError, TypeError):
        # TypeError means the value is not a string at all, so it has to be
        # stringified before slicing or the fallback itself would raise.
        if timestamp is None:
            return "N/A"
        return str(timestamp)[:16]
def load_current_run(reports_dir: Path) -> dict[str, Any]:
    """Merge every ``*.json`` report in *reports_dir* into a single run entry.

    Args:
        reports_dir: Directory holding the JSON reports of the current run.

    Returns:
        A dict with a ``timestamp`` (ISO string taken once merging is done), a
        ``summary`` whose counts are summed over all reports, and ``results``
        mapping each sample path to its status. When no report is found a
        warning is printed, the counts are zero and ``results`` is empty.
    """
    count_keys = ("total_samples", "success_count", "failure_count", "missing_setup_count")
    totals = dict.fromkeys(count_keys, 0)
    status_by_path: dict[str, str] = {}

    report_files = sorted(reports_dir.glob("*.json"))
    if not report_files:
        print(f"Warning: No JSON report files found in {reports_dir}")

    for report_file in report_files:
        print(f" Loading report: {report_file.name}")
        with open(report_file, encoding="utf-8") as handle:
            payload = json.load(handle)

        # Files are visited in sorted order; a later report overrides an
        # earlier one when the same sample path appears in both.
        status_by_path.update((entry["path"], entry["status"]) for entry in payload["results"])

        report_summary = payload["summary"]
        for key in count_keys:
            totals[key] += report_summary[key]

    return {
        "timestamp": datetime.now().isoformat(),
        "summary": totals,
        "results": status_by_path,
    }
def load_history(history_path: Path) -> list[dict[str, Any]]:
    """Load previous run history from cache.

    Args:
        history_path: JSON history file holding an object with a ``"runs"``
            list; it may not exist yet.

    Returns:
        The list stored under ``"runs"``, or an empty list when the file is
        missing, unreadable, not valid JSON, or not shaped as expected.
    """
    if not history_path.exists():
        print(" No previous history found")
        return []

    try:
        with open(history_path, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError. The history is only a cache, so a damaged file should
        # not abort the aggregation; an empty history is used instead.
        print(f" Warning: Could not read history ({exc}); starting fresh")
        return []

    runs = data.get("runs", []) if isinstance(data, dict) else None
    if not isinstance(runs, list):
        print(" Warning: History file has an unexpected structure; starting fresh")
        return []

    print(f" Loaded {len(runs)} previous run(s) from history")
    return runs
def save_history(history_path: Path, runs: list[dict[str, Any]]) -> None:
    """Write the newest MAX_HISTORY runs to *history_path* as JSON.

    Missing parent directories are created first. The file holds a single
    object with the retained runs under the ``"runs"`` key.

    Args:
        history_path: Destination file; overwritten if it already exists.
        runs: Run entries; only the last MAX_HISTORY of them are kept.
    """
    history_path.parent.mkdir(parents=True, exist_ok=True)

    kept = runs[-MAX_HISTORY:]
    payload = {"runs": kept}
    with open(history_path, "w", encoding="utf-8") as out:
        json.dump(payload, out, indent=2)

    print(f" Saved {len(kept)} run(s) to history")
def generate_trend_report(runs: list[dict[str, Any]]) -> str:
    """Generate a markdown trend report from run history.

    Args:
        runs: Run entries ordered oldest to newest. Each entry carries a
            ``timestamp``, a ``summary`` of counts and a ``results`` mapping
            of sample path to status. Only the newest MAX_HISTORY entries
            are rendered.

    Returns:
        The report as one markdown string: an overall-status table followed
        by a per-sample table. Both list the most recent run first and are
        padded with ``N/A`` up to MAX_HISTORY runs.
    """
    # Function-scope import: the module itself only imports ``datetime``.
    from datetime import timezone

    # Clamp so the tables never grow wider than the heading promises.
    runs = runs[-MAX_HISTORY:]
    padding = MAX_HISTORY - len(runs)

    # The line is labelled UTC, so take the time in UTC rather than local time.
    generated = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    lines = [
        "# Sample Validation Trend Report",
        "",
        f"*Generated: {generated}*",
        "",
    ]

    # --- Overall status table (most recent first) ---
    lines.append(f"## Overall Status (Last {MAX_HISTORY} Runs)")
    lines.append("")
    lines.append("| Run | Success | Failure | Missing Setup | Total |")
    lines.append("|-----|---------|---------|---------------|-------|")

    for run in reversed(runs):
        s = run["summary"]
        label = _format_run_label(run["timestamp"])
        lines.append(
            f"| {label} | {s['success_count']}/{s['total_samples']} "
            f"| {s['failure_count']}/{s['total_samples']} "
            f"| {s['missing_setup_count']}/{s['total_samples']} "
            f"| {s['total_samples']} |"
        )

    # Pad with N/A rows if fewer than MAX_HISTORY runs are available
    lines.extend(["| N/A | N/A | N/A | N/A | N/A |"] * padding)
    lines.append("")

    # --- Per-sample results table ---
    lines.append("## Per-Sample Results")
    lines.append("")

    # Collect all sample paths across all runs
    all_paths: set[str] = set()
    for run in runs:
        all_paths.update(run["results"].keys())

    if not all_paths:
        lines.append("*No sample results available.*")
        return "\n".join(lines)

    # Build header (most recent run first)
    header = "| Sample |"
    separator = "|--------|"
    for run in reversed(runs):
        label = _format_run_label(run["timestamp"])
        header += f" {label} |"
        separator += "------------|"
    header += " N/A |" * padding
    separator += "-----|" * padding

    lines.append(header)
    lines.append(separator)

    for path in sorted(all_paths):
        row = f"| `{path}` |"
        for run in reversed(runs):
            # A sample absent from a run, or carrying a status with no
            # marker, is shown as N/A.
            status = run["results"].get(path, "N/A")
            row += f" {STATUS_EMOJI.get(status, 'N/A')} |"
        row += " N/A |" * padding
        lines.append(row)

    lines.append("")
    lines.append("**Legend:** ✅ Success · ❌ Failure · ⚠️ Missing Setup · N/A Not available")
    lines.append("")

    return "\n".join(lines)
def main() -> int:
    """Run the aggregation from the command line.

    Expects exactly three arguments: the reports directory, the history file
    and the output file. The current reports are merged, appended to the
    stored history, the trimmed history is saved, and the trend report is
    written to the output file and echoed to stdout.

    Returns:
        0 on success, 1 when the number of arguments is wrong.
    """
    if len(sys.argv) != 4:
        print("Usage: python aggregate.py <reports-dir> <history-file> <output-file>")
        return 1

    reports_dir, history_path, output_path = (Path(arg) for arg in sys.argv[1:4])

    print("Aggregating validation results...")

    # Merge the reports produced by this run
    print(f"\nLoading reports from {reports_dir}:")
    current_run = load_current_run(reports_dir)
    summary = current_run["summary"]
    print(
        f" Current run: {summary['success_count']} success, "
        f"{summary['failure_count']} failure, "
        f"{summary['missing_setup_count']} missing setup "
        f"(total: {summary['total_samples']})"
    )

    # Put the current run after the stored ones, keeping only the newest
    print(f"\nLoading history from {history_path}:")
    previous_runs = load_history(history_path)
    runs = [*previous_runs, current_run][-MAX_HISTORY:]

    # Persist the updated history
    print(f"\nSaving history to {history_path}:")
    save_history(history_path, runs)

    # Render the trend report and write it out
    print("\nGenerating trend report...")
    report = generate_trend_report(runs)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(report, encoding="utf-8")
    print(f"Trend report written to {output_path}")

    # Echo the report to stdout as well
    print(f"\n{'=' * 80}")
    print(report)

    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
@@ -14,7 +14,8 @@ from agent_framework import (
handler,
)
from agent_framework.github import GitHubCopilotAgent
from copilot.types import PermissionRequest, PermissionRequestResult
from copilot.generated.session_events import PermissionRequest
from copilot.types import PermissionRequestResult
from pydantic import BaseModel
from typing_extensions import Never
@@ -36,6 +37,7 @@ class AgentResponseFormat(BaseModel):
status: str
output: str
error: str
fix: str
@dataclass
@@ -54,15 +56,20 @@ class BatchCompletion:
AgentInstruction = (
"You are validating exactly one Python sample.\n"
"Analyze the sample code and execute it. Based on the execution result, determine if it "
"runs successfully, fails, or times out. Feel free to install any required dependencies.\n"
"Analyze the sample code and execute it as it is. Based on the execution result, determine "
"if it runs successfully, fails, or is missing_setup. Use `missing_setup` if the sample reports "
"missing required environment variables. The environment you're given should contain the necessary "
"variables. Don't create new environment variables nor modify the sample code.\n"
"Feel free to install any required dependencies if needed.\n"
"The sample can be interactive. If it is interactive, respond to the sample when prompted "
"based on your analysis of the code. You do not need to consult human on what to respond.\n"
"If the sample fails, investigate the error and suggest a fix.\n"
"Return ONLY valid JSON with this schema:\n"
"{\n"
' "status": "success|failure|timeout|error",\n'
' "status": "success|failure|missing_setup",\n'
' "output": "short summary of the result and what you did if the sample was interactive",\n'
' "error": "error details or empty string"\n'
' "error": "error details or empty string",\n'
' "fix": "suggested code fix if the sample failed, otherwise empty string"\n'
"}\n\n"
)
@@ -87,16 +94,15 @@ def status_from_text(value: str) -> RunStatus:
for status in RunStatus:
if status.value == normalized:
return status
return RunStatus.ERROR
return RunStatus.FAILURE
def prompt_permission(
request: PermissionRequest, context: dict[str, str]
) -> PermissionRequestResult:
"""Permission handler that always approves."""
kind = request.get("kind", "unknown")
logger.debug(
f"[Permission Request: {kind}] ({context})Automatically approved for sample validation."
f"[Permission Request: {request.kind}] ({context})Automatically approved for sample validation."
)
return PermissionRequestResult(kind="approved")
@@ -108,39 +114,73 @@ class CustomAgentExecutor(Executor):
returned as error responses, otherwise an exception in one agent could crash the entire workflow.
"""
# Retry in case GitHub Copilot agent encounters transient errors unrelated to the sample execution.
RETRY_COUNT = 1
def __init__(self, agent: GitHubCopilotAgent):
super().__init__(id=agent.id)
self.agent = agent
self._session = agent.create_session()
@handler
async def handle_task(
self, sample: SampleInfo, ctx: WorkflowContext[WorkerFreed | RunResult]
) -> None:
"""Execute one sample task and notify collector + coordinator."""
try:
response = await self.agent.run(
[
Message(
role="user",
text=f"Validate the following sample:\n\n{sample.relative_path}",
current_retry = 0
while True:
try:
response = await self.agent.run(
[
Message(
role="user",
text=f"Validate the following sample:\n\n{sample.relative_path}",
)
],
session=self._session,
)
result_payload = parse_agent_json(response.text)
result = RunResult(
sample=sample,
status=status_from_text(result_payload.status),
output=result_payload.output,
error=result_payload.error,
fix=result_payload.fix,
)
break
except Exception as ex:
if current_retry < self.RETRY_COUNT:
logger.warning(
f"Error executing agent {self.agent.id} (attempt {current_retry + 1}/{self.RETRY_COUNT}): {ex}. Retrying..."
)
]
)
result_payload = parse_agent_json(response.text)
result = RunResult(
sample=sample,
status=status_from_text(result_payload.status),
output=result_payload.output,
error=result_payload.error,
)
except Exception as ex:
logger.error(f"Error executing agent {self.agent.id}: {ex}")
result = RunResult(
sample=sample,
status=RunStatus.ERROR,
output="",
error=str(ex),
)
try:
current_retry += 1
await self.agent.stop()
await self.agent.start()
self._session = self.agent.create_session() # Reset session for retry
continue
except Exception as restart_ex:
logger.error(
f"Error restarting agent {self.agent.id}: {restart_ex}. No more retries."
)
result = RunResult(
sample=sample,
status=RunStatus.FAILURE,
output="",
error=f"Original error: {ex}. Restart error: {restart_ex}",
fix="",
)
break
logger.error(f"Error executing agent {self.agent.id}: {ex}")
result = RunResult(
sample=sample,
status=RunStatus.FAILURE,
output="",
error=str(ex),
fix="",
)
break
await ctx.send_message(result, target_id="collector")
await ctx.send_message(WorkerFreed(worker_id=self.id), target_id="coordinator")
@@ -252,7 +292,7 @@ class CreateConcurrentValidationWorkflowExecutor(Executor):
instructions=AgentInstruction,
default_options={
"on_permission_request": prompt_permission,
"timeout": 180,
"timeout": 60,
}, # type: ignore
)
agents.append(agent)
+20 -4
View File
@@ -52,13 +52,18 @@ def _has_main_entrypoint_guard(path: Path) -> bool:
)
def discover_samples(samples_dir: Path, subdir: str | None = None) -> list[SampleInfo]:
def discover_samples(
samples_dir: Path,
subdir: str | None = None,
exclude: list[str] | None = None,
) -> list[SampleInfo]:
"""
Find all Python sample files in the samples directory.
Args:
samples_dir: Root samples directory
subdir: Optional subdirectory to filter to
exclude: Optional list of subdirectory paths (relative to the search directory) to exclude
Returns:
List of SampleInfo objects for each discovered sample
@@ -72,12 +77,21 @@ def discover_samples(samples_dir: Path, subdir: str | None = None) -> list[Sampl
else:
search_dir = samples_dir
# Resolve excluded paths to absolute for reliable comparison
exclude_paths = {(search_dir / exc).resolve() for exc in (exclude or [])}
python_files: list[Path] = []
# Walk through all subdirectories and find .py files
for root, dirs, files in os.walk(search_dir):
# Skip directories that start with _ (like _sample_validation)
dirs[:] = [d for d in dirs if not d.startswith("_") and d != "__pycache__"]
# Skip directories that start with _, __pycache__, or excluded paths
dirs[:] = [
d
for d in dirs
if not d.startswith("_")
and d != "__pycache__"
and (Path(root) / d).resolve() not in exclude_paths
]
for file in files:
# Skip files that start with _ and include only scripts with a main entrypoint guard
@@ -113,8 +127,10 @@ class DiscoverSamplesExecutor(Executor):
print(f"🔍 Discovering samples in {self.config.samples_dir}")
if self.config.subdir:
print(f" Filtering to subdirectory: {self.config.subdir}")
if self.config.exclude:
print(f" Excluding: {', '.join(self.config.exclude)}")
samples = discover_samples(self.config.samples_dir, self.config.subdir)
samples = discover_samples(self.config.samples_dir, self.config.subdir, self.config.exclude)
print(f" Found {len(samples)} samples")
await ctx.send_message(DiscoveryResult(samples=samples))
+9 -11
View File
@@ -18,6 +18,7 @@ class ValidationConfig:
samples_dir: Path
python_root: Path
subdir: str | None = None
exclude: list[str] | None = None
max_parallel_workers: int = 10
@@ -60,8 +61,7 @@ class RunStatus(Enum):
SUCCESS = "success"
FAILURE = "failure"
TIMEOUT = "timeout"
ERROR = "error"
MISSING_SETUP = "missing_setup"
@dataclass
@@ -72,6 +72,7 @@ class RunResult:
status: RunStatus
output: str
error: str
fix: str
@dataclass
@@ -89,8 +90,7 @@ class Report:
total_samples: int
success_count: int
failure_count: int
timeout_count: int
error_count: int
missing_setup_count: int
results: list[RunResult] = field(default_factory=list) # type: ignore
def to_markdown(self) -> str:
@@ -107,15 +107,14 @@ class Report:
f"| Total Samples | {self.total_samples} |",
f"| [PASS] Success | {self.success_count} |",
f"| [FAIL] Failure | {self.failure_count} |",
f"| [TIMEOUT] Timeout | {self.timeout_count} |",
f"| [ERROR] Error | {self.error_count} |",
f"| [MISSING_SETUP] Missing Setup | {self.missing_setup_count} |",
"",
"## Detailed Results",
"",
]
# Group by status
for status in [RunStatus.FAILURE, RunStatus.TIMEOUT, RunStatus.ERROR, RunStatus.SUCCESS]:
for status in [RunStatus.FAILURE, RunStatus.MISSING_SETUP, RunStatus.SUCCESS]:
status_results = [r for r in self.results if r.status == status]
if not status_results:
continue
@@ -123,8 +122,7 @@ class Report:
status_label = {
RunStatus.SUCCESS: "[PASS]",
RunStatus.FAILURE: "[FAIL]",
RunStatus.TIMEOUT: "[TIMEOUT]",
RunStatus.ERROR: "[ERROR]",
RunStatus.MISSING_SETUP: "[MISSING_SETUP]",
}
lines.append(f"### {status_label[status]} {status.value.title()} ({len(status_results)})")
@@ -148,8 +146,7 @@ class Report:
"total_samples": self.total_samples,
"success_count": self.success_count,
"failure_count": self.failure_count,
"timeout_count": self.timeout_count,
"error_count": self.error_count,
"missing_setup_count": self.missing_setup_count,
},
"results": [
{
@@ -157,6 +154,7 @@ class Report:
"status": r.status.value,
"output": r.output,
"error": r.error,
"fix": r.fix,
}
for r in self.results
],
+6 -10
View File
@@ -22,12 +22,11 @@ def generate_report(results: list[RunResult]) -> Report:
Returns:
Report object with aggregated statistics
"""
# Sort results: failures, timeouts, errors first, then successes
# Sort results: failures, missing setup first, then successes
status_priority = {
RunStatus.FAILURE: 0,
RunStatus.TIMEOUT: 1,
RunStatus.ERROR: 2,
RunStatus.SUCCESS: 3,
RunStatus.MISSING_SETUP: 1,
RunStatus.SUCCESS: 2,
}
sorted_results = sorted(results, key=lambda r: status_priority[r.status])
@@ -36,8 +35,7 @@ def generate_report(results: list[RunResult]) -> Report:
total_samples=len(results),
success_count=sum(1 for r in results if r.status == RunStatus.SUCCESS),
failure_count=sum(1 for r in results if r.status == RunStatus.FAILURE),
timeout_count=sum(1 for r in results if r.status == RunStatus.TIMEOUT),
error_count=sum(1 for r in results if r.status == RunStatus.ERROR),
missing_setup_count=sum(1 for r in results if r.status == RunStatus.MISSING_SETUP),
results=sorted_results,
)
@@ -86,8 +84,7 @@ def print_summary(report: Report) -> None:
if (
report.failure_count == 0
and report.timeout_count == 0
and report.error_count == 0
and report.missing_setup_count == 0
):
print("[PASS] ALL SAMPLES PASSED!")
else:
@@ -98,8 +95,7 @@ def print_summary(report: Report) -> None:
print("Results:")
print(f" [PASS] Success: {report.success_count}")
print(f" [FAIL] Failure: {report.failure_count}")
print(f" [TIMEOUT] Timeout: {report.timeout_count}")
print(f" [ERR] Errors: {report.error_count}")
print(f" [MISSING_SETUP] Missing Setup: {report.missing_setup_count}")
print("=" * 80)
# Print JSON output for GitHub Actions visibility
@@ -66,9 +66,10 @@ class RunDynamicValidationWorkflowExecutor(Executor):
fallback_results = [
RunResult(
sample=sample,
status=RunStatus.ERROR,
status=RunStatus.FAILURE,
output="",
error="Nested workflow did not return an ExecutionResult.",
fix="",
)
for sample in creation.samples
]