SuperClaude/docs/Development/pm-agent-parallel-architecture.md
Mithun Gowda B faa53f27da
Revert "feat: comprehensive framework improvements (#447)" (#453)
This reverts commit 00706f0ea99aa04f54a3ddf5244738b0a9b37a91.
2025-10-21 09:48:41 +05:30

20 KiB

PM Agent Parallel Architecture Proposal

Date: 2025-10-17 Status: Proposed Enhancement Inspiration: Deep Research Agent parallel execution pattern

🎯 Vision

Transform PM Agent from sequential orchestrator to parallel meta-layer commander, enabling:

  • 10x faster execution for multi-domain tasks
  • Intelligent parallelization of independent sub-agent operations
  • Deep Research-style multi-hop parallel analysis
  • Zero-token baseline with on-demand MCP tool loading

🚨 Current Problem

Sequential Execution Bottleneck:

User Request: "Build real-time chat with video calling"

Current PM Agent Flow (Sequential):
  1. requirements-analyst: 10 minutes
  2. system-architect: 10 minutes
  3. backend-architect: 15 minutes
  4. frontend-architect: 15 minutes
  5. security-engineer: 10 minutes
  6. quality-engineer: 10 minutes
  Total: 70 minutes (all sequential)

Problem:
  - Steps 1-2 could run in parallel
  - Steps 3-4 could run in parallel after step 2
  - Steps 5-6 could run in parallel with 3-4
  - Actual dependency: Only ~30% of tasks are truly dependent
  - 70% of time wasted on unnecessary sequencing

Evidence from Deep Research Agent:

Deep Research Pattern:
  - Parallel search queries (3-5 simultaneous)
  - Parallel content extraction (multiple URLs)
  - Parallel analysis (multiple perspectives)
  - Sequential only when dependencies exist

Result:
  - 60-70% time reduction
  - Better resource utilization
  - Improved user experience

🎨 Proposed Architecture

Parallel Execution Engine

# Conceptual architecture (not implementation)

class PMAgentParallelOrchestrator:
    """
    PM Agent with Deep Research-style parallel execution

    Key Principles:
    1. Default to parallel execution
    2. Sequential only for true dependencies
    3. Intelligent dependency analysis
    4. Dynamic MCP tool loading per phase
    5. Self-correction with parallel retry
    """

    def __init__(self):
        self.dependency_analyzer = DependencyAnalyzer()
        self.mcp_gateway = MCPGatewayManager()  # Dynamic tool loading
        self.parallel_executor = ParallelExecutor()
        self.result_synthesizer = ResultSynthesizer()

    async def orchestrate(self, user_request: str):
        """Main orchestration flow"""

        # Phase 0: Request Analysis (Fast, Native Tools)
        analysis = await self.analyze_request(user_request)

        # Phase 1: Parallel Investigation
        if analysis.requires_multiple_agents:
            investigation_results = await self.execute_phase_parallel(
                phase="investigation",
                agents=analysis.required_agents,
                dependencies=analysis.dependencies
            )

        # Phase 2: Synthesis (Sequential, PM Agent)
        unified_plan = await self.synthesize_plan(investigation_results)

        # Phase 3: Parallel Implementation
        if unified_plan.has_parallelizable_tasks:
            implementation_results = await self.execute_phase_parallel(
                phase="implementation",
                agents=unified_plan.implementation_agents,
                dependencies=unified_plan.task_dependencies
            )

        # Phase 4: Parallel Validation
        validation_results = await self.execute_phase_parallel(
            phase="validation",
            agents=["quality-engineer", "security-engineer", "performance-engineer"],
            dependencies={}  # All independent
        )

        # Phase 5: Final Integration (Sequential, PM Agent)
        final_result = await self.integrate_results(
            implementation_results,
            validation_results
        )

        return final_result

    async def execute_phase_parallel(
        self,
        phase: str,
        agents: List[str],
        dependencies: Dict[str, List[str]]
    ):
        """
        Execute phase with parallel agent execution

        Args:
            phase: Phase name (investigation, implementation, validation)
            agents: List of agent names to execute
            dependencies: Dict mapping agent -> list of dependencies

        Returns:
            Synthesized results from all agents
        """

        # 1. Build dependency graph
        graph = self.dependency_analyzer.build_graph(agents, dependencies)

        # 2. Identify parallel execution waves
        waves = graph.topological_waves()

        # 3. Execute waves in sequence, agents within wave in parallel
        all_results = {}

        for wave_num, wave_agents in enumerate(waves):
            print(f"Phase {phase} - Wave {wave_num + 1}: {wave_agents}")

            # Load MCP tools needed for this wave
            required_tools = self.get_required_tools_for_agents(wave_agents)
            await self.mcp_gateway.load_tools(required_tools)

            # Execute all agents in wave simultaneously
            wave_tasks = [
                self.execute_agent(agent, all_results)
                for agent in wave_agents
            ]

            wave_results = await asyncio.gather(*wave_tasks)

            # Store results
            for agent, result in zip(wave_agents, wave_results):
                all_results[agent] = result

            # Unload MCP tools after wave (resource cleanup)
            await self.mcp_gateway.unload_tools(required_tools)

        # 4. Synthesize results across all agents
        return self.result_synthesizer.synthesize(all_results)

    async def execute_agent(self, agent_name: str, context: Dict):
        """Execute single sub-agent with context"""
        agent = self.get_agent_instance(agent_name)

        try:
            result = await agent.execute(context)
            return {
                "status": "success",
                "agent": agent_name,
                "result": result
            }
        except Exception as e:
            # Error: trigger self-correction flow
            return await self.self_correct_agent_execution(
                agent_name,
                error=e,
                context=context
            )

    async def self_correct_agent_execution(
        self,
        agent_name: str,
        error: Exception,
        context: Dict
    ):
        """
        Self-correction flow (from PM Agent design)

        Steps:
        1. STOP - never retry blindly
        2. Investigate root cause (WebSearch, past errors)
        3. Form hypothesis
        4. Design DIFFERENT approach
        5. Execute new approach
        6. Learn (store in mindbase + local files)
        """
        # Implementation matches PM Agent self-correction protocol
        # (Refer to superclaude/commands/pm.md:536-640)
        pass


class DependencyAnalyzer:
    """Analyze task dependencies for parallel execution"""

    def build_graph(self, agents: List[str], dependencies: Dict) -> DependencyGraph:
        """Build dependency graph from agent list and dependencies"""
        graph = DependencyGraph()

        for agent in agents:
            graph.add_node(agent)

        for agent, deps in dependencies.items():
            for dep in deps:
                graph.add_edge(dep, agent)  # dep must complete before agent

        return graph

    def infer_dependencies(self, agents: List[str], task_context: Dict) -> Dict:
        """
        Automatically infer dependencies based on domain knowledge

        Example:
            backend-architect + frontend-architect = parallel (independent)
            system-architect → backend-architect = sequential (dependent)
            security-engineer = parallel with implementation (independent)
        """
        dependencies = {}

        # Rule-based inference
        if "system-architect" in agents:
            # System architecture must complete before implementation
            for agent in ["backend-architect", "frontend-architect"]:
                if agent in agents:
                    dependencies.setdefault(agent, []).append("system-architect")

        if "requirements-analyst" in agents:
            # Requirements must complete before any design/implementation
            for agent in agents:
                if agent != "requirements-analyst":
                    dependencies.setdefault(agent, []).append("requirements-analyst")

        # Backend and frontend can run in parallel (no dependency)
        # Security and quality can run in parallel with implementation

        return dependencies


class DependencyGraph:
    """Graph representation of agent dependencies"""

    def topological_waves(self) -> List[List[str]]:
        """
        Compute topological ordering as waves

        Wave N can execute in parallel (all nodes with no remaining dependencies)

        Returns:
            List of waves, each wave is list of agents that can run in parallel
        """
        # Kahn's algorithm adapted for wave-based execution
        # ...
        pass


class MCPGatewayManager:
    """Manage MCP tool lifecycle (load/unload on demand)"""

    async def load_tools(self, tool_names: List[str]):
        """Dynamically load MCP tools via airis-mcp-gateway"""
        # Connect to Docker Gateway
        # Load specified tools
        # Return tool handles
        pass

    async def unload_tools(self, tool_names: List[str]):
        """Unload MCP tools to free resources"""
        # Disconnect from tools
        # Free memory
        pass


class ResultSynthesizer:
    """Synthesize results from multiple parallel agents"""

    def synthesize(self, results: Dict[str, Any]) -> Dict:
        """
        Combine results from multiple agents into coherent output

        Handles:
        - Conflict resolution (agents disagree)
        - Gap identification (missing information)
        - Integration (combine complementary insights)
        """
        pass

🔄 Execution Flow Examples

Example 1: Simple Feature (Minimal Parallelization)

User: "Fix login form validation bug in LoginForm.tsx:45"

PM Agent Analysis:
  - Single domain (frontend)
  - Simple fix
  - Minimal parallelization opportunity

Execution Plan:
  Wave 1 (Parallel):
    - refactoring-expert: Fix validation logic
    - quality-engineer: Write tests

  Wave 2 (Sequential):
    - Integration: Run tests, verify fix

Timeline:
  Traditional Sequential: 15 minutes
  PM Agent Parallel: 8 minutes (47% faster)

Example 2: Complex Feature (Maximum Parallelization)

User: "Build real-time chat feature with video calling"

PM Agent Analysis:
  - Multi-domain (backend, frontend, security, real-time, media)
  - Complex dependencies
  - High parallelization opportunity

Dependency Graph:
  requirements-analyst
    
  system-architect
    
  ├─→ backend-architect (Supabase Realtime)
  ├─→ backend-architect (WebRTC signaling)
  └─→ frontend-architect (Chat UI)
      
  ├─→ frontend-architect (Video UI)
  ├─→ security-engineer (Security review)
  └─→ quality-engineer (Testing)
      
  performance-engineer (Optimization)

Execution Waves:
  Wave 1: requirements-analyst (5 min)
  Wave 2: system-architect (10 min)
  Wave 3 (Parallel):
    - backend-architect: Realtime subscriptions (12 min)
    - backend-architect: WebRTC signaling (12 min)
    - frontend-architect: Chat UI (12 min)
  Wave 4 (Parallel):
    - frontend-architect: Video UI (10 min)
    - security-engineer: Security review (10 min)
    - quality-engineer: Testing (10 min)
  Wave 5: performance-engineer (8 min)

Timeline:
  Traditional Sequential:
    5 + 10 + 12 + 12 + 12 + 10 + 10 + 10 + 8 = 89 minutes

  PM Agent Parallel:
    5 + 10 + 12 (longest in wave 3) + 10 (longest in wave 4) + 8 = 45 minutes

  Speedup: 49% faster (nearly 2x)

Example 3: Investigation Task (Deep Research Pattern)

User: "Investigate authentication best practices for our stack"

PM Agent Analysis:
  - Research task
  - Multiple parallel searches possible
  - Deep Research pattern applicable

Execution Waves:
  Wave 1 (Parallel Searches):
    - WebSearch: "Supabase Auth best practices 2025"
    - WebSearch: "Next.js authentication patterns"
    - WebSearch: "JWT security considerations"
    - Context7: "Official Supabase Auth documentation"

  Wave 2 (Parallel Analysis):
    - Sequential: Analyze search results
    - Sequential: Compare patterns
    - Sequential: Identify gaps

  Wave 3 (Parallel Content Extraction):
    - WebFetch: Top 3 articles (parallel)
    - Context7: Framework-specific patterns

  Wave 4 (Sequential Synthesis):
    - PM Agent: Synthesize findings
    - PM Agent: Create recommendations

Timeline:
  Traditional Sequential: 25 minutes
  PM Agent Parallel: 10 minutes (60% faster)

📊 Expected Performance Gains

Benchmark Scenarios

Simple Tasks (1-2 agents):
  Current: 10-15 minutes
  Parallel: 8-12 minutes
  Improvement: 20-25%

Medium Tasks (3-5 agents):
  Current: 30-45 minutes
  Parallel: 15-25 minutes
  Improvement: 40-50%

Complex Tasks (6-10 agents):
  Current: 60-90 minutes
  Parallel: 25-45 minutes
  Improvement: 50-60%

Investigation Tasks:
  Current: 20-30 minutes
  Parallel: 8-15 minutes
  Improvement: 60-70% (Deep Research pattern)

Resource Utilization

CPU Usage:
  Current: 20-30% (one agent at a time)
  Parallel: 60-80% (multiple agents)
  Better utilization of available resources

Memory Usage:
  With MCP Gateway: Dynamic loading/unloading
  Peak memory similar to sequential (tool caching)

Token Usage:
  No increase (same total operations)
  Actually may decrease (smarter synthesis)

🔧 Implementation Plan

Phase 1: Dependency Analysis Engine

Tasks:
  - Implement DependencyGraph class
  - Implement topological wave computation
  - Create rule-based dependency inference
  - Test with simple scenarios

Deliverable:
  - Functional dependency analyzer
  - Unit tests for graph algorithms
  - Documentation

Phase 2: Parallel Executor

Tasks:
  - Implement ParallelExecutor with asyncio
  - Wave-based execution engine
  - Agent execution wrapper
  - Error handling and retry logic

Deliverable:
  - Working parallel execution engine
  - Integration tests
  - Performance benchmarks

Phase 3: MCP Gateway Integration

Tasks:
  - Integrate with airis-mcp-gateway
  - Dynamic tool loading/unloading
  - Resource management
  - Performance optimization

Deliverable:
  - Zero-token baseline with on-demand loading
  - Resource usage monitoring
  - Documentation

Phase 4: Result Synthesis

Tasks:
  - Implement ResultSynthesizer
  - Conflict resolution logic
  - Gap identification
  - Integration quality validation

Deliverable:
  - Coherent multi-agent result synthesis
  - Quality assurance tests
  - User feedback integration

Phase 5: Self-Correction Integration

Tasks:
  - Integrate PM Agent self-correction protocol
  - Parallel error recovery
  - Learning from failures
  - Documentation updates

Deliverable:
  - Robust error handling
  - Learning system integration
  - Performance validation

🧪 Testing Strategy

Unit Tests

# tests/test_pm_agent_parallel.py

def test_dependency_graph_simple():
    """Test simple linear dependency"""
    graph = DependencyGraph()
    graph.add_edge("A", "B")
    graph.add_edge("B", "C")

    waves = graph.topological_waves()
    assert waves == [["A"], ["B"], ["C"]]

def test_dependency_graph_parallel():
    """Test parallel execution detection"""
    graph = DependencyGraph()
    graph.add_edge("A", "B")
    graph.add_edge("A", "C")  # B and C can run in parallel

    waves = graph.topological_waves()
    assert waves == [["A"], ["B", "C"]]  # or ["C", "B"]

def test_dependency_inference():
    """Test automatic dependency inference"""
    analyzer = DependencyAnalyzer()
    agents = ["requirements-analyst", "backend-architect", "frontend-architect"]

    deps = analyzer.infer_dependencies(agents, context={})

    # Requirements must complete before implementation
    assert "requirements-analyst" in deps["backend-architect"]
    assert "requirements-analyst" in deps["frontend-architect"]

    # Backend and frontend can run in parallel
    assert "backend-architect" not in deps.get("frontend-architect", [])
    assert "frontend-architect" not in deps.get("backend-architect", [])

Integration Tests

# tests/integration/test_parallel_orchestration.py

async def test_parallel_feature_implementation():
    """Test full parallel orchestration flow"""
    pm_agent = PMAgentParallelOrchestrator()

    result = await pm_agent.orchestrate(
        "Build authentication system with JWT and OAuth"
    )

    assert result["status"] == "success"
    assert "implementation" in result
    assert "tests" in result
    assert "documentation" in result

async def test_performance_improvement():
    """Verify parallel execution is faster than sequential"""
    request = "Build complex feature requiring 5 agents"

    # Sequential execution
    start = time.perf_counter()
    await pm_agent_sequential.orchestrate(request)
    sequential_time = time.perf_counter() - start

    # Parallel execution
    start = time.perf_counter()
    await pm_agent_parallel.orchestrate(request)
    parallel_time = time.perf_counter() - start

    # Should be at least 30% faster
    assert parallel_time < sequential_time * 0.7

Performance Benchmarks

# Run comprehensive benchmarks
pytest tests/performance/test_pm_agent_parallel_performance.py -v

# Expected output:
# - Simple tasks: 20-25% improvement
# - Medium tasks: 40-50% improvement
# - Complex tasks: 50-60% improvement
# - Investigation: 60-70% improvement

🎯 Success Criteria

Performance Targets

Speedup (vs Sequential):
  Simple Tasks (1-2 agents): ≥ 20%
  Medium Tasks (3-5 agents): ≥ 40%
  Complex Tasks (6-10 agents): ≥ 50%
  Investigation Tasks: ≥ 60%

Resource Usage:
  Token Usage: ≤ 100% of sequential (no increase)
  Memory Usage: ≤ 120% of sequential (acceptable overhead)
  CPU Usage: 50-80% (better utilization)

Quality:
  Result Coherence: ≥ 95% (vs sequential)
  Error Rate: ≤ 5% (vs sequential)
  User Satisfaction: ≥ 90% (survey-based)

User Experience

Transparency:
  - Show parallel execution progress
  - Clear wave-based status updates
  - Visible agent coordination

Control:
  - Allow manual dependency specification
  - Override parallel execution if needed
  - Force sequential mode option

Reliability:
  - Robust error handling
  - Graceful degradation to sequential
  - Self-correction on failures

📋 Migration Path

Backward Compatibility

Phase 1 (Current):
  - Existing PM Agent works as-is
  - No breaking changes

Phase 2 (Parallel Available):
  - Add --parallel flag (opt-in)
  - Users can test parallel mode
  - Collect feedback

Phase 3 (Parallel Default):
  - Make parallel mode default
  - Add --sequential flag (opt-out)
  - Monitor performance

Phase 4 (Deprecate Sequential):
  - Remove sequential mode (if proven)
  - Full parallel orchestration

Feature Flags

Environment Variables:
  SC_PM_PARALLEL_ENABLED=true|false
  SC_PM_MAX_PARALLEL_AGENTS=10
  SC_PM_WAVE_TIMEOUT_SECONDS=300
  SC_PM_MCP_DYNAMIC_LOADING=true|false

Configuration:
  ~/.claude/pm_agent_config.json:
    {
      "parallel_execution": true,
      "max_parallel_agents": 10,
      "dependency_inference": true,
      "mcp_dynamic_loading": true
    }

🚀 Next Steps

  1. Document parallel architecture proposal (this file)
  2. Prototype DependencyGraph and wave computation
  3. Implement ParallelExecutor with asyncio
  4. Integrate with airis-mcp-gateway
  5. Run performance benchmarks (before/after)
  6. Gather user feedback on parallel mode
  7. Prepare Pull Request with evidence

📚 References

  • Deep Research Agent: Parallel search and analysis pattern
  • airis-mcp-gateway: Dynamic tool loading architecture
  • PM Agent Current Design: superclaude/commands/pm.md
  • Performance Benchmarks: tests/performance/test_installation_performance.py

Conclusion: Parallel orchestration will transform PM Agent from sequential coordinator to intelligent meta-layer commander, unlocking 50-60% performance improvements for complex multi-domain tasks while maintaining quality and reliability.

User Benefit: Faster feature development, better resource utilization, and improved developer experience with transparent parallel execution.