Parallel Orchestration

SmartTools executes steps sequentially within a tool, but you can run multiple tools in parallel using Python's ThreadPoolExecutor. This pattern is ideal for multi-agent workflows, parallel analysis, or any task where you need responses from multiple AI providers simultaneously.

Why Parallel Execution?

Consider a code review workflow that needs input from multiple perspectives, where each review takes roughly 15 seconds:

  • Sequential: Security → Performance → Style = ~45 seconds
  • Parallel: All three at once = ~15 seconds

Wall-clock time drops to roughly the duration of the slowest tool.

Basic Pattern

Use Python's concurrent.futures to run multiple SmartTools in parallel:

import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_tool(tool_name: str, input_text: str) -> dict:
    """Run a SmartTool and return its output."""
    result = subprocess.run(
        [tool_name],
        input=input_text,
        capture_output=True,
        text=True
    )
    return {
        "tool": tool_name,
        "output": result.stdout,
        "success": result.returncode == 0
    }

def run_parallel(tools: list[str], input_text: str) -> list[dict]:
    """Run multiple tools in parallel on the same input."""
    results = []

    with ThreadPoolExecutor(max_workers=len(tools)) as executor:
        # Submit all tools
        futures = {
            executor.submit(run_tool, tool, input_text): tool
            for tool in tools
        }

        # Collect results as they complete
        for future in as_completed(futures):
            results.append(future.result())

    return results

# Example usage
tools = ["security-review", "performance-review", "style-review"]
with open("main.py") as f:
    code = f.read()

reviews = run_parallel(tools, code)
for review in reviews:
    print(f"=== {review['tool']} ===")
    print(review['output'])
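
Note that as_completed yields results in the order tools finish, not the order they were submitted, so the printed reviews can appear in any order. Sort the results by tool name afterward if you need deterministic output.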

Real-World Example: Multi-Perspective Analysis

Here's a complete script that gets multiple AI perspectives on a topic:

#!/usr/bin/env python3
"""Get multiple AI perspectives on a topic in parallel."""

import subprocess
import json
from concurrent.futures import ThreadPoolExecutor, as_completed

# Define your perspective tools (each is a SmartTool)
PERSPECTIVES = [
    "perspective-optimist",    # Focuses on opportunities
    "perspective-critic",      # Identifies problems
    "perspective-pragmatist",  # Focuses on actionability
]

def get_perspective(tool: str, topic: str) -> dict:
    """Get one perspective on a topic."""
    result = subprocess.run(
        [tool],
        input=topic,
        capture_output=True,
        text=True,
        timeout=60  # Timeout after 60 seconds
    )

    return {
        "perspective": tool.replace("perspective-", ""),
        "response": result.stdout.strip(),
        "success": result.returncode == 0
    }

def analyze_topic(topic: str) -> list[dict]:
    """Get all perspectives in parallel."""
    with ThreadPoolExecutor(max_workers=len(PERSPECTIVES)) as executor:
        futures = {
            executor.submit(get_perspective, tool, topic): tool
            for tool in PERSPECTIVES
        }

        results = []
        for future in as_completed(futures):
            try:
                results.append(future.result())
            except Exception as e:
                tool = futures[future]
                results.append({
                    "perspective": tool,
                    "response": f"Error: {e}",
                    "success": False
                })

        return results

if __name__ == "__main__":
    import sys
    topic = sys.stdin.read() if not sys.stdin.isatty() else input("Topic: ")

    print("Gathering perspectives...\n")
    perspectives = analyze_topic(topic)

    for p in perspectives:
        status = "✓" if p["success"] else "✗"
        print(f"[{status}] {p['perspective'].upper()}")
        print("-" * 40)
        print(p["response"])
        print()
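
Save the script as, say, perspectives.py, make it executable with chmod +x, and pipe a topic in: echo "Should we adopt a monorepo?" | ./perspectives.py (the filename and topic here are just examples).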

Adding Progress Feedback

For long-running parallel tasks, show progress as tools complete:

import sys
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_with_progress(tools: list[str], input_text: str):
    """Run tools in parallel with progress updates."""
    total = len(tools)
    completed = 0

    with ThreadPoolExecutor(max_workers=total) as executor:
        futures = {
            executor.submit(run_tool, tool, input_text): tool
            for tool in tools
        }

        results = []
        for future in as_completed(futures):
            completed += 1
            tool = futures[future]
            result = future.result()  # propagates exceptions (e.g. a missing tool); see Error Handling below
            results.append(result)

            # Progress update
            status = "✓" if result["success"] else "✗"
            print(f"[{completed}/{total}] {status} {tool}", file=sys.stderr)

        return results

Error Handling

Handle failures gracefully so one tool doesn't break the entire workflow:

def run_tool_safe(tool_name: str, input_text: str, timeout: int = 120) -> dict:
    """Run a tool with timeout and error handling."""
    try:
        result = subprocess.run(
            [tool_name],
            input=input_text,
            capture_output=True,
            text=True,
            timeout=timeout
        )
        return {
            "tool": tool_name,
            "output": result.stdout,
            "error": result.stderr if result.returncode != 0 else None,
            "success": result.returncode == 0
        }
    except subprocess.TimeoutExpired:
        return {
            "tool": tool_name,
            "output": "",
            "error": f"Timeout after {timeout}s",
            "success": False
        }
    except FileNotFoundError:
        return {
            "tool": tool_name,
            "output": "",
            "error": f"Tool '{tool_name}' not found",
            "success": False
        }
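
Because run_tool_safe returns an error dict instead of raising, it drops straight into the earlier parallel runner. A minimal sketch, assuming the imports and the tools and code variables from the Basic Pattern:

with ThreadPoolExecutor(max_workers=len(tools)) as executor:
    futures = {
        executor.submit(run_tool_safe, tool, code): tool
        for tool in tools
    }
    for future in as_completed(futures):
        review = future.result()  # common failures come back as dicts, not exceptions
        if not review["success"]:
            print(f"{review['tool']}: {review['error']}", file=sys.stderr)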

Best Practices

  • Set timeouts - Prevent hanging on slow providers
  • Handle errors per-tool - Don't let one failure break everything
  • Limit concurrency - Match max_workers to your use case
  • Use stderr for progress - Keep stdout clean for piping
  • Consider rate limits - Some providers limit concurrent requests; capping max_workers is the simplest guard (see the sketch below)
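
If a provider only allows a few requests in flight, cap max_workers at that limit instead of sizing it to the tool count; queued submissions start automatically as workers free up. A minimal sketch, reusing run_tool_safe from above (the limit of 3 is an assumption; tune it to your provider's documented cap):

from concurrent.futures import ThreadPoolExecutor, as_completed

MAX_CONCURRENT = 3  # assumed provider limit; adjust to yours

def run_parallel_limited(tools: list[str], input_text: str) -> list[dict]:
    """Run tools in parallel, but never more than MAX_CONCURRENT at once."""
    results = []
    with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as executor:
        futures = {
            executor.submit(run_tool_safe, tool, input_text): tool
            for tool in tools
        }
        # Extra submissions wait in the executor's queue and start
        # as soon as a worker becomes available
        for future in as_completed(futures):
            results.append(future.result())
    return results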

Full Example: orchestrated-discussions

For a complete implementation of parallel SmartTools orchestration, see the orchestrated-discussions project. It implements:

  • Multiple AI "participants" as SmartTools
  • Parallel execution with live progress logging
  • Shared log files for real-time monitoring
  • Discussion workflows with voting and consensus
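
The shared-log idea is easy to reproduce in your own scripts. A minimal sketch of one way it could work (illustrative only, not the project's actual code; the discussion.log path and append_log helper are hypothetical):

import threading

LOG_PATH = "discussion.log"  # hypothetical shared log file
_log_lock = threading.Lock()

def append_log(participant: str, message: str) -> None:
    """Append one line to the shared log; the lock keeps concurrent writes intact."""
    with _log_lock:
        with open(LOG_PATH, "a") as f:
            f.write(f"[{participant}] {message}\n")

# Watch the discussion live from another terminal: tail -f discussion.log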