Parallel Orchestration
SmartTools executes steps sequentially within a tool, but you can run multiple tools in parallel using Python's ThreadPoolExecutor. This pattern is ideal for multi-agent workflows, parallel analysis, or any task where you need responses from multiple AI providers simultaneously.
Why Parallel Execution?
Consider a code review workflow that needs input from multiple perspectives:
- Sequential: Security → Performance → Style = 45 seconds
- Parallel: All three at once = 15 seconds
Basic Pattern
Use Python's concurrent.futures to run multiple SmartTools in parallel:
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
def run_tool(tool_name: str, input_text: str) -> dict:
"""Run a SmartTool and return its output."""
result = subprocess.run(
[tool_name],
input=input_text,
capture_output=True,
text=True
)
return {
"tool": tool_name,
"output": result.stdout,
"success": result.returncode == 0
}
def run_parallel(tools: list[str], input_text: str) -> list[dict]:
"""Run multiple tools in parallel on the same input."""
results = []
with ThreadPoolExecutor(max_workers=len(tools)) as executor:
# Submit all tools
futures = {
executor.submit(run_tool, tool, input_text): tool
for tool in tools
}
# Collect results as they complete
for future in as_completed(futures):
results.append(future.result())
return results
# Example usage
tools = ["security-review", "performance-review", "style-review"]
code = open("main.py").read()
reviews = run_parallel(tools, code)
for review in reviews:
print(f"=== {review['tool']} ===")
print(review['output'])
Real-World Example: Multi-Perspective Analysis
Here's a complete script that gets multiple AI perspectives on a topic:
#!/usr/bin/env python3
"""Get multiple AI perspectives on a topic in parallel."""
import subprocess
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
# Define your perspective tools (each is a SmartTool)
PERSPECTIVES = [
"perspective-optimist", # Focuses on opportunities
"perspective-critic", # Identifies problems
"perspective-pragmatist", # Focuses on actionability
]
def get_perspective(tool: str, topic: str) -> dict:
"""Get one perspective on a topic."""
result = subprocess.run(
[tool],
input=topic,
capture_output=True,
text=True,
timeout=60 # Timeout after 60 seconds
)
return {
"perspective": tool.replace("perspective-", ""),
"response": result.stdout.strip(),
"success": result.returncode == 0
}
def analyze_topic(topic: str) -> list[dict]:
"""Get all perspectives in parallel."""
with ThreadPoolExecutor(max_workers=len(PERSPECTIVES)) as executor:
futures = {
executor.submit(get_perspective, tool, topic): tool
for tool in PERSPECTIVES
}
results = []
for future in as_completed(futures):
try:
results.append(future.result())
except Exception as e:
tool = futures[future]
results.append({
"perspective": tool,
"response": f"Error: {e}",
"success": False
})
return results
if __name__ == "__main__":
import sys
topic = sys.stdin.read() if not sys.stdin.isatty() else input("Topic: ")
print("Gathering perspectives...\n")
perspectives = analyze_topic(topic)
for p in perspectives:
status = "✓" if p["success"] else "✗"
print(f"[{status}] {p['perspective'].upper()}")
print("-" * 40)
print(p["response"])
print()
Adding Progress Feedback
For long-running parallel tasks, show progress as tools complete:
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
def run_with_progress(tools: list[str], input_text: str):
"""Run tools in parallel with progress updates."""
total = len(tools)
completed = 0
with ThreadPoolExecutor(max_workers=total) as executor:
futures = {
executor.submit(run_tool, tool, input_text): tool
for tool in tools
}
results = []
for future in as_completed(futures):
completed += 1
tool = futures[future]
result = future.result()
results.append(result)
# Progress update
status = "✓" if result["success"] else "✗"
print(f"[{completed}/{total}] {status} {tool}", file=sys.stderr)
return results
Error Handling
Handle failures gracefully so one tool doesn't break the entire workflow:
def run_tool_safe(tool_name: str, input_text: str, timeout: int = 120) -> dict:
"""Run a tool with timeout and error handling."""
try:
result = subprocess.run(
[tool_name],
input=input_text,
capture_output=True,
text=True,
timeout=timeout
)
return {
"tool": tool_name,
"output": result.stdout,
"error": result.stderr if result.returncode != 0 else None,
"success": result.returncode == 0
}
except subprocess.TimeoutExpired:
return {
"tool": tool_name,
"output": "",
"error": f"Timeout after {timeout}s",
"success": False
}
except FileNotFoundError:
return {
"tool": tool_name,
"output": "",
"error": f"Tool '{tool_name}' not found",
"success": False
}
Best Practices
- Set timeouts - Prevent hanging on slow providers
- Handle errors per-tool - Don't let one failure break everything
- Limit concurrency - Match
max_workersto your use case - Use stderr for progress - Keep stdout clean for piping
- Consider rate limits - Some providers limit concurrent requests
Full Example: orchestrated-discussions
For a complete implementation of parallel SmartTools orchestration, see the orchestrated-discussions project. It implements:
- Multiple AI "participants" as SmartTools
- Parallel execution with live progress logging
- Shared log files for real-time monitoring
- Discussion workflows with voting and consensus