Python: Clean up and formatting (#487)

* Clean up and formatting

* Fix mypy

* Bug fix
This commit is contained in:
Tao Chen
2025-08-26 13:21:32 -07:00
committed by GitHub
Unverified
parent 5df0bd6cb0
commit beb5218838
6 changed files with 196 additions and 187 deletions
@@ -79,9 +79,7 @@ class TextProcessor(Executor):
super().__init__(id="text_processor")
@handler
async def process_text(
self, request: TextProcessingRequest, ctx: WorkflowContext[TextProcessingResult]
) -> None:
async def process_text(self, request: TextProcessingRequest, ctx: WorkflowContext[TextProcessingResult]) -> None:
"""Process a text string and return statistics."""
text_preview = f"'{request.text[:50]}{'...' if len(request.text) > 50 else ''}'"
print(f"πŸ” Sub-workflow processing text (Task {request.task_id}): {text_preview}")
@@ -108,7 +106,7 @@ class TextProcessor(Executor):
# Parent workflow
class TextProcessingOrchestrator(Executor):
"""Orchestrates multiple text processing tasks using sub-workflows."""
results: list[TextProcessingResult] = []
expected_count: int = 0
@@ -116,9 +114,7 @@ class TextProcessingOrchestrator(Executor):
super().__init__(id="text_orchestrator")
@handler
async def start_processing(
self, texts: list[str], ctx: WorkflowContext[TextProcessingRequest]
) -> None:
async def start_processing(self, texts: list[str], ctx: WorkflowContext[TextProcessingRequest]) -> None:
"""Start processing multiple text strings."""
print(f"πŸ“„ Starting processing of {len(texts)} text strings")
print("=" * 60)
@@ -127,15 +123,13 @@ class TextProcessingOrchestrator(Executor):
# Send each text to a sub-workflow
for i, text in enumerate(texts):
task_id = f"task_{i+1}"
task_id = f"task_{i + 1}"
request = TextProcessingRequest(text=text, task_id=task_id)
print(f"πŸ“€ Dispatching {task_id} to sub-workflow")
await ctx.send_message(request, target_id="text_processor_workflow")
@handler
async def collect_result(
self, result: TextProcessingResult, ctx: WorkflowContext[None]
) -> None:
async def collect_result(self, result: TextProcessingResult, ctx: WorkflowContext[None]) -> None:
"""Collect results from sub-workflows."""
print(f"πŸ“₯ Collected result from {result.task_id}")
self.results.append(result)
@@ -168,11 +162,7 @@ async def main():
# Step 1: Create the text processing sub-workflow
text_processor = TextProcessor()
processing_workflow = (
WorkflowBuilder()
.set_start_executor(text_processor)
.build()
)
processing_workflow = WorkflowBuilder().set_start_executor(text_processor).build()
print("πŸ”§ Setting up parent workflow...")
@@ -205,7 +195,7 @@ async def main():
result = await main_workflow.run(test_texts)
# Step 5: Display results
print(f"\nπŸ“Š Processing Results:")
print("\nπŸ“Š Processing Results:")
print("=" * 60)
# Sort results by task_id for consistent display
@@ -213,12 +203,12 @@ async def main():
for result in sorted_results:
preview = result.text[:30] + "..." if len(result.text) > 30 else result.text
preview = preview.replace('\n', ' ').strip() or '(empty)'
preview = preview.replace("\n", " ").strip() or "(empty)"
print(f"βœ… {result.task_id}: '{preview}' -> {result.word_count} words, {result.char_count} chars")
# Step 6: Display summary
summary = orchestrator.get_summary()
print(f"\nπŸ“ˆ Summary:")
print("\nπŸ“ˆ Summary:")
print("=" * 60)
print(f"πŸ“„ Total texts processed: {summary['total_texts']}")
print(f"πŸ“ Total words: {summary['total_words']}")
@@ -226,7 +216,7 @@ async def main():
print(f"πŸ“Š Average words per text: {summary['average_words_per_text']}")
print(f"πŸ“ Average characters per text: {summary['average_characters_per_text']}")
print(f"\n🏁 Processing complete!")
print("\n🏁 Processing complete!")
if __name__ == "__main__":
@@ -127,11 +127,19 @@ class EmailValidator(Executor):
) -> None:
"""Handle domain check response from RequestInfo with correlation."""
approved = bool(response.data)
domain = response.original_request.domain if (hasattr(response, 'original_request') and response.original_request) else "unknown"
domain = (
response.original_request.domain
if (hasattr(response, "original_request") and response.original_request)
else "unknown"
)
print(f"πŸ“¬ Sub-workflow received domain response for '{domain}': {approved}")
# Find the corresponding email using the request_id
request_id = response.original_request.request_id if (hasattr(response, 'original_request') and response.original_request) else None
request_id = (
response.original_request.request_id
if (hasattr(response, "original_request") and response.original_request)
else None
)
if request_id and request_id in self._pending_emails:
email = self._pending_emails.pop(request_id) # Remove from pending
result = ValidationResult(
@@ -146,6 +154,7 @@ class EmailValidator(Executor):
# 3. Implement the parent workflow with request interception
class SmartEmailOrchestrator(Executor):
"""Parent orchestrator that can intercept domain checks."""
approved_domains: set[str] = set()
def __init__(self, approved_domains: set[str] | None = None):
@@ -177,7 +186,7 @@ class SmartEmailOrchestrator(Executor):
print(f"βœ… Domain '{request.domain}' is pre-approved locally!")
return RequestResponse[DomainCheckRequest, bool].handled(True)
print(f"❓ Domain '{request.domain}' unknown, forwarding to external service...")
return RequestResponse.forward()
return RequestResponse[DomainCheckRequest, bool].forward()
@handler
async def collect_result(self, result: ValidationResult, ctx: WorkflowContext[None]) -> None:
@@ -196,7 +205,7 @@ async def run_example() -> None:
"""Run the sub-workflow example."""
print("πŸš€ Setting up sub-workflow with request interception...")
print()
# 4. Build the sub-workflow
email_validator = EmailValidator()
# Match the target_id used in EmailValidator ("email_request_info")
@@ -262,12 +271,12 @@ async def run_example() -> None:
print("\n🎯 All requests were intercepted and handled locally!")
# 10. Display final summary
print(f"\nπŸ“Š Final Results Summary:")
print("\nπŸ“Š Final Results Summary:")
print("=" * 60)
for result in orchestrator.results:
status = "βœ… VALID" if result.is_valid else "❌ INVALID"
print(f"{status} {result.email}: {result.reason}")
print(f"\n🏁 Processed {len(orchestrator.results)} emails total")
@@ -143,7 +143,10 @@ class ResourceRequester(Executor):
# Send to parent workflow for interception - not to target_id
await ctx.send_message(request)
elif req_type == "policy":
print(f" πŸ›‘οΈ Checking policy: {req_data.get('type', 'cpu')} x{req_data.get('amount', 1)} ({req_data.get('policy_type', 'quota')})")
print(
f" πŸ›‘οΈ Checking policy: {req_data.get('type', 'cpu')} x{req_data.get('amount', 1)} "
f"({req_data.get('policy_type', 'quota')})"
)
request = PolicyCheckRequest(
resource_type=req_data.get("type", "cpu"),
amount=req_data.get("amount", 1),
@@ -162,7 +165,8 @@ class ResourceRequester(Executor):
if response.data:
source_icon = "πŸͺ" if response.data.source == "cache" else "🌐"
print(
f"πŸ“¦ {source_icon} Sub-workflow received: {response.data.allocated} {response.data.resource_type} from {response.data.source}"
f"πŸ“¦ {source_icon} Sub-workflow received: {response.data.allocated} {response.data.resource_type} "
f"from {response.data.source}"
)
if self._collect_results():
# Emit completion event and send RequestFinished to the parent workflow.
@@ -175,7 +179,10 @@ class ResourceRequester(Executor):
"""Handle policy check response."""
if response.data:
status_icon = "βœ…" if response.data.approved else "❌"
print(f"πŸ›‘οΈ {status_icon} Sub-workflow received policy response: {response.data.approved} - {response.data.reason}")
print(
f"πŸ›‘οΈ {status_icon} Sub-workflow received policy response: "
f"{response.data.approved} - {response.data.reason}"
)
if self._collect_results():
# Emit completion event and send RequestFinished to the parent workflow.
await ctx.add_event(WorkflowCompletedEvent(RequestFinished()))
@@ -218,7 +225,7 @@ class ResourceCache(Executor):
# Cache miss - forward to external
print(f" ❌ Cache miss: need {request.amount}, have {available} {request.resource_type}")
return RequestResponse.forward()
return RequestResponse[ResourceRequest, ResourceResponse].forward()
@handler
async def collect_result(
@@ -228,7 +235,8 @@ class ResourceCache(Executor):
if response.data and response.data.source != "cache": # Don't double-count our own results
self.results.append(response.data)
print(
f"πŸͺ 🌐 Cache received external response: {response.data.allocated} {response.data.resource_type} from {response.data.source}"
f"πŸͺ 🌐 Cache received external response: {response.data.allocated} {response.data.resource_type} "
f"from {response.data.source}"
)
@@ -265,11 +273,11 @@ class PolicyEngine(Executor):
return RequestResponse[PolicyCheckRequest, PolicyResponse].handled(response)
# Exceeds quota - forward to external for review
print(f" ❌ Policy exceeds quota: {request.amount} > {quota_limit}, forwarding to external")
return RequestResponse.forward()
return RequestResponse[PolicyCheckRequest, PolicyResponse].forward()
# Unknown policy type - forward to external
print(f" ❓ Unknown policy type: {request.policy_type}, forwarding")
return RequestResponse.forward()
return RequestResponse[PolicyCheckRequest, PolicyResponse].forward()
@handler
async def collect_policy_result(
@@ -292,19 +300,22 @@ class Coordinator(Executor):
@handler
async def handle_completion(self, completion: RequestFinished, ctx: WorkflowContext[None]) -> None:
"""Handle sub-workflow completion. It comes from the sub-workflow emitted WorkflowCompletionEvent's data field."""
print(f"🎯 Main workflow received completion.")
"""Handle sub-workflow completion.
It comes from the sub-workflow emitted WorkflowCompletionEvent's data field.
"""
print("🎯 Main workflow received completion.")
async def main() -> None:
"""Demonstrate parallel request interception patterns."""
print("πŸš€ Starting Sub-Workflow Parallel Request Interception Demo...")
print("=" * 60)
# 5. Create the sub-workflow
# 5. Create the sub-workflow
resource_requester = ResourceRequester()
sub_request_info = RequestInfoExecutor(id="sub_request_info")
sub_workflow = (
WorkflowBuilder()
.set_start_executor(resource_requester)
@@ -353,7 +364,10 @@ async def main() -> None:
print(f"πŸ§ͺ Testing with {len(test_requests)} mixed requests:")
for i, req in enumerate(test_requests, 1):
req_icon = "πŸ“¦" if req["request_type"] == "resource" else "πŸ›‘οΈ"
print(f" {i}. {req_icon} {req['type']} x{req['amount']} ({req.get('priority', req.get('policy_type', 'default'))})")
print(
f" {i}. {req_icon} {req['type']} x{req['amount']} "
f"({req.get('priority', req.get('policy_type', 'default'))})"
)
print("=" * 70)
# 8. Run the workflow
@@ -398,11 +412,11 @@ async def main() -> None:
status_icon = "βœ…" if result.approved else "❌"
print(f" {status_icon} Approved: {result.approved} - {result.reason}")
print(f"\nπŸ’Ύ Final Cache State:")
print("\nπŸ’Ύ Final Cache State:")
for resource, amount in cache.cache.items():
print(f" πŸ“¦ {resource}: {amount} remaining")
print(f"\nπŸ“ˆ Summary:")
print("\nπŸ“ˆ Summary:")
print(f" 🎯 Total requests: {len(test_requests)}")
print(f" πŸͺ Resource requests handled: {len(cache.results)}")
print(f" πŸ›‘οΈ Policy requests handled: {len(policy.results)}")
@@ -410,5 +424,6 @@ async def main() -> None:
print("\n" + "=" * 70)
if __name__ == "__main__":
asyncio.run(main())