mirror of
https://github.com/SuperClaude-Org/SuperClaude_Framework.git
synced 2025-12-29 16:16:08 +00:00
refactor: migrate to clean architecture with src/ layout
## Migration Summary
- Moved from flat `superclaude/` to `src/superclaude/` (PEP 517/518)
- Deleted old structure (119 files removed)
- Added new structure with clean architecture layers
## Project Structure Changes
- OLD: `superclaude/{agents,commands,modes,framework}/`
- NEW: `src/superclaude/{cli,execution,pm_agent}/`
## Build System Updates
- Switched: setuptools → hatchling (modern, PEP 517)
- Updated: pyproject.toml with proper entry points
- Added: pytest plugin auto-discovery
- Version: 4.1.6 → 0.4.0 (clean slate)
## Makefile Enhancements
- Removed: `superclaude install` calls (deprecated)
- Added: `make verify` - Phase 1 installation verification
- Added: `make test-plugin` - pytest plugin loading test
- Added: `make doctor` - health check command
## Documentation Added
- docs/architecture/ - 7 architecture docs
- docs/research/python_src_layout_research_20251021.md
- docs/PR_STRATEGY.md
## Migration Phases
- Phase 1: Core installation ✅ (this commit)
- Phase 2: Lazy loading + Skills system (next)
- Phase 3: PM Agent meta-layer (future)
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
23
src/superclaude/__init__.py
Normal file
23
src/superclaude/__init__.py
Normal file
@@ -0,0 +1,23 @@
|
||||
"""
|
||||
SuperClaude Framework
|
||||
|
||||
AI-enhanced development framework for Claude Code.
|
||||
Provides pytest plugin for enhanced testing and optional skills system.
|
||||
"""
|
||||
|
||||
__version__ = "0.4.0"
|
||||
__author__ = "Kazuki Nakai"
|
||||
|
||||
# Expose main components
|
||||
from .pm_agent.confidence import ConfidenceChecker
|
||||
from .pm_agent.self_check import SelfCheckProtocol
|
||||
from .pm_agent.reflexion import ReflexionPattern
|
||||
from .pm_agent.token_budget import TokenBudgetManager
|
||||
|
||||
__all__ = [
|
||||
"ConfidenceChecker",
|
||||
"SelfCheckProtocol",
|
||||
"ReflexionPattern",
|
||||
"TokenBudgetManager",
|
||||
"__version__",
|
||||
]
|
||||
3
src/superclaude/__version__.py
Normal file
3
src/superclaude/__version__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
"""Version information for SuperClaude"""
|
||||
|
||||
__version__ = "0.4.0"
|
||||
12
src/superclaude/cli/__init__.py
Normal file
12
src/superclaude/cli/__init__.py
Normal file
@@ -0,0 +1,12 @@
|
||||
"""
|
||||
SuperClaude CLI
|
||||
|
||||
Commands:
|
||||
- superclaude install-skill pm-agent # Install PM Agent skill
|
||||
- superclaude doctor # Check installation health
|
||||
- superclaude version # Show version
|
||||
"""
|
||||
|
||||
from .main import main
|
||||
|
||||
__all__ = ["main"]
|
||||
148
src/superclaude/cli/doctor.py
Normal file
148
src/superclaude/cli/doctor.py
Normal file
@@ -0,0 +1,148 @@
|
||||
"""
|
||||
SuperClaude Doctor Command
|
||||
|
||||
Health check for SuperClaude installation.
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Any
|
||||
import sys
|
||||
|
||||
|
||||
def run_doctor(verbose: bool = False) -> Dict[str, Any]:
    """
    Run all SuperClaude health checks.

    Args:
        verbose: Include detailed diagnostic information.
            NOTE(review): not consumed here — detail filtering happens in the
            CLI layer; parameter kept for interface stability.

    Returns:
        Dict with "checks" (list of individual check-result dicts) and
        "passed" (True only when every check passed).
    """
    # Probes run in a fixed order: plugin, skills, configuration.
    results = [
        _check_pytest_plugin(),
        _check_skills_installed(),
        _check_configuration(),
    ]

    return {
        "checks": results,
        "passed": all(item["passed"] for item in results),
    }
|
||||
|
||||
|
||||
def _check_pytest_plugin() -> Dict[str, Any]:
|
||||
"""
|
||||
Check if pytest plugin is loaded
|
||||
|
||||
Returns:
|
||||
Check result dict
|
||||
"""
|
||||
try:
|
||||
import pytest
|
||||
|
||||
# Try to get pytest config
|
||||
try:
|
||||
config = pytest.Config.fromdictargs({}, [])
|
||||
plugins = config.pluginmanager.list_plugin_distinfo()
|
||||
|
||||
# Check if superclaude plugin is loaded
|
||||
superclaude_loaded = any(
|
||||
"superclaude" in str(plugin[0]).lower()
|
||||
for plugin in plugins
|
||||
)
|
||||
|
||||
if superclaude_loaded:
|
||||
return {
|
||||
"name": "pytest plugin loaded",
|
||||
"passed": True,
|
||||
"details": ["SuperClaude pytest plugin is active"],
|
||||
}
|
||||
else:
|
||||
return {
|
||||
"name": "pytest plugin loaded",
|
||||
"passed": False,
|
||||
"details": ["SuperClaude plugin not found in pytest plugins"],
|
||||
}
|
||||
except Exception as e:
|
||||
return {
|
||||
"name": "pytest plugin loaded",
|
||||
"passed": False,
|
||||
"details": [f"Could not check pytest plugins: {e}"],
|
||||
}
|
||||
|
||||
except ImportError:
|
||||
return {
|
||||
"name": "pytest plugin loaded",
|
||||
"passed": False,
|
||||
"details": ["pytest not installed"],
|
||||
}
|
||||
|
||||
|
||||
def _check_skills_installed() -> Dict[str, Any]:
|
||||
"""
|
||||
Check if any skills are installed
|
||||
|
||||
Returns:
|
||||
Check result dict
|
||||
"""
|
||||
skills_dir = Path("~/.claude/skills").expanduser()
|
||||
|
||||
if not skills_dir.exists():
|
||||
return {
|
||||
"name": "Skills installed",
|
||||
"passed": True, # Optional, so pass
|
||||
"details": ["No skills installed (optional)"],
|
||||
}
|
||||
|
||||
# Find skills (directories with implementation.md)
|
||||
skills = []
|
||||
for item in skills_dir.iterdir():
|
||||
if item.is_dir() and (item / "implementation.md").exists():
|
||||
skills.append(item.name)
|
||||
|
||||
if skills:
|
||||
return {
|
||||
"name": "Skills installed",
|
||||
"passed": True,
|
||||
"details": [f"{len(skills)} skill(s) installed: {', '.join(skills)}"],
|
||||
}
|
||||
else:
|
||||
return {
|
||||
"name": "Skills installed",
|
||||
"passed": True, # Optional
|
||||
"details": ["No skills installed (optional)"],
|
||||
}
|
||||
|
||||
|
||||
def _check_configuration() -> Dict[str, Any]:
|
||||
"""
|
||||
Check SuperClaude configuration
|
||||
|
||||
Returns:
|
||||
Check result dict
|
||||
"""
|
||||
# Check if package is importable
|
||||
try:
|
||||
import superclaude
|
||||
version = superclaude.__version__
|
||||
|
||||
return {
|
||||
"name": "Configuration",
|
||||
"passed": True,
|
||||
"details": [f"SuperClaude {version} installed correctly"],
|
||||
}
|
||||
except ImportError as e:
|
||||
return {
|
||||
"name": "Configuration",
|
||||
"passed": False,
|
||||
"details": [f"Could not import superclaude: {e}"],
|
||||
}
|
||||
99
src/superclaude/cli/install_skill.py
Normal file
99
src/superclaude/cli/install_skill.py
Normal file
@@ -0,0 +1,99 @@
|
||||
"""
|
||||
Skill Installation Command
|
||||
|
||||
Installs SuperClaude skills to ~/.claude/skills/ directory.
|
||||
"""
|
||||
|
||||
import shutil
from pathlib import Path
from typing import Optional, Tuple
|
||||
|
||||
|
||||
def install_skill_command(
    skill_name: str,
    target_path: Path,
    force: bool = False
) -> Tuple[bool, str]:
    """
    Install a skill into the target directory.

    Args:
        skill_name: Name of skill to install (e.g., 'pm-agent')
        target_path: Target installation directory
        force: Force reinstall if skill exists

    Returns:
        Tuple of (success: bool, message: str)
    """
    source = _get_skill_source(skill_name)
    if not source:
        return False, f"Skill '{skill_name}' not found"
    if not source.exists():
        return False, f"Skill source directory not found: {source}"

    destination = target_path / skill_name
    # Ensure the install root exists before inspecting the destination.
    target_path.mkdir(parents=True, exist_ok=True)

    if destination.exists():
        if not force:
            return False, f"Skill '{skill_name}' already installed (use --force to reinstall)"
        # --force: wipe the previous install before copying fresh files.
        shutil.rmtree(destination)

    try:
        shutil.copytree(source, destination)
    except Exception as e:
        return False, f"Failed to install skill: {e}"
    return True, f"Skill '{skill_name}' installed successfully to {destination}"
|
||||
|
||||
|
||||
def _get_skill_source(skill_name: str) -> Path:
|
||||
"""
|
||||
Get source directory for skill
|
||||
|
||||
Skills are stored in:
|
||||
src/superclaude/skills/{skill_name}/
|
||||
|
||||
Args:
|
||||
skill_name: Name of skill
|
||||
|
||||
Returns:
|
||||
Path to skill source directory
|
||||
"""
|
||||
# Get package root
|
||||
package_root = Path(__file__).parent.parent
|
||||
|
||||
# Skill source directory
|
||||
skill_source = package_root / "skills" / skill_name
|
||||
|
||||
return skill_source if skill_source.exists() else None
|
||||
|
||||
|
||||
def list_available_skills() -> list[str]:
    """
    List all skills bundled with the package.

    Returns:
        List of skill names
    """
    skills_dir = Path(__file__).parent.parent / "skills"

    if not skills_dir.exists():
        return []

    # A valid skill is a non-private directory containing implementation.md.
    return [
        entry.name
        for entry in skills_dir.iterdir()
        if entry.is_dir()
        and not entry.name.startswith("_")
        and (entry / "implementation.md").exists()
    ]
|
||||
118
src/superclaude/cli/main.py
Normal file
118
src/superclaude/cli/main.py
Normal file
@@ -0,0 +1,118 @@
|
||||
"""
|
||||
SuperClaude CLI Main Entry Point
|
||||
|
||||
Provides command-line interface for SuperClaude operations.
|
||||
"""
|
||||
|
||||
import click
|
||||
from pathlib import Path
|
||||
import sys
|
||||
|
||||
# Add parent directory to path to import superclaude
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
||||
|
||||
from superclaude import __version__
|
||||
|
||||
|
||||
@click.group()
@click.version_option(version=__version__, prog_name="SuperClaude")
def main():
    """
    SuperClaude - AI-enhanced development framework for Claude Code

    A pytest plugin providing PM Agent capabilities and optional skills system.
    """
    # The group body is intentionally empty: subcommands do all the work.
    pass
|
||||
|
||||
|
||||
@main.command()
@click.argument("skill_name")
@click.option(
    "--target",
    default="~/.claude/skills",
    help="Installation directory (default: ~/.claude/skills)",
)
@click.option(
    "--force",
    is_flag=True,
    help="Force reinstall if skill already exists",
)
def install_skill(skill_name: str, target: str, force: bool):
    """
    Install a SuperClaude skill to Claude Code

    SKILL_NAME: Name of the skill to install (e.g., pm-agent)

    Example:
        superclaude install-skill pm-agent
        superclaude install-skill pm-agent --target ~/.claude/skills --force
    """
    # Imported lazily so plain `superclaude --help` stays fast.
    from .install_skill import install_skill_command

    # Expand ~ so the default target resolves under the user's home.
    target_path = Path(target).expanduser()

    click.echo(f"📦 Installing skill '{skill_name}' to {target_path}...")

    success, message = install_skill_command(
        skill_name=skill_name,
        target_path=target_path,
        force=force
    )

    if success:
        click.echo(f"✅ {message}")
    else:
        # Report failure on stderr and exit non-zero for scripting use.
        click.echo(f"❌ {message}", err=True)
        sys.exit(1)
|
||||
|
||||
|
||||
@main.command()
@click.option(
    "--verbose",
    is_flag=True,
    help="Show detailed diagnostic information",
)
def doctor(verbose: bool):
    """
    Check SuperClaude installation health

    Verifies:
    - pytest plugin loaded correctly
    - Skills installed (if any)
    - Configuration files present
    """
    # Imported lazily so CLI startup stays fast.
    from .doctor import run_doctor

    click.echo("🔍 SuperClaude Doctor\n")

    results = run_doctor(verbose=verbose)

    # Display one status line per check; details only in verbose mode.
    for check in results["checks"]:
        status_symbol = "✅" if check["passed"] else "❌"
        click.echo(f"{status_symbol} {check['name']}")

        if verbose and check.get("details"):
            for detail in check["details"]:
                click.echo(f"   {detail}")

    # Summary
    click.echo()
    total = len(results["checks"])
    passed = sum(1 for check in results["checks"] if check["passed"])

    if passed == total:
        click.echo("✅ SuperClaude is healthy")
    else:
        # Non-zero exit so scripts/CI can detect an unhealthy install.
        click.echo(f"⚠️ {total - passed}/{total} checks failed")
        sys.exit(1)
|
||||
|
||||
|
||||
@main.command()
def version():
    """Show SuperClaude version"""
    click.echo(f"SuperClaude version {__version__}")


# Allow running this module directly: `python -m superclaude.cli.main`.
if __name__ == "__main__":
    main()
|
||||
@@ -1,13 +1,13 @@
|
||||
"""
|
||||
SuperClaude Core - Intelligent Execution Engine
|
||||
SuperClaude Execution Engine
|
||||
|
||||
Integrates three core engines:
|
||||
Integrates three execution engines:
|
||||
1. Reflection Engine: Think × 3 before execution
|
||||
2. Parallel Engine: Execute at maximum speed
|
||||
3. Self-Correction Engine: Learn from mistakes
|
||||
|
||||
Usage:
|
||||
from superclaude.core import intelligent_execute
|
||||
from superclaude.execution import intelligent_execute
|
||||
|
||||
result = intelligent_execute(
|
||||
task="Create user authentication system",
|
||||
21
src/superclaude/pm_agent/__init__.py
Normal file
21
src/superclaude/pm_agent/__init__.py
Normal file
@@ -0,0 +1,21 @@
|
||||
"""
|
||||
PM Agent Core Module
|
||||
|
||||
Provides core functionality for PM Agent:
|
||||
- Pre-execution confidence checking
|
||||
- Post-implementation self-check protocol
|
||||
- Reflexion error learning pattern
|
||||
- Token budget management
|
||||
"""
|
||||
|
||||
from .confidence import ConfidenceChecker
|
||||
from .self_check import SelfCheckProtocol
|
||||
from .reflexion import ReflexionPattern
|
||||
from .token_budget import TokenBudgetManager
|
||||
|
||||
__all__ = [
|
||||
"ConfidenceChecker",
|
||||
"SelfCheckProtocol",
|
||||
"ReflexionPattern",
|
||||
"TokenBudgetManager",
|
||||
]
|
||||
169
src/superclaude/pm_agent/confidence.py
Normal file
169
src/superclaude/pm_agent/confidence.py
Normal file
@@ -0,0 +1,169 @@
|
||||
"""
|
||||
Pre-execution Confidence Check
|
||||
|
||||
Prevents wrong-direction execution by assessing confidence BEFORE starting.
|
||||
|
||||
Token Budget: 100-200 tokens
|
||||
ROI: 25-250x token savings when stopping wrong direction
|
||||
|
||||
Confidence Levels:
|
||||
- High (90-100%): Official docs verified, patterns identified, path clear
|
||||
- Medium (70-89%): Multiple approaches possible, trade-offs require consideration
|
||||
- Low (<70%): Requirements unclear, no patterns, domain knowledge insufficient
|
||||
"""
|
||||
|
||||
from typing import Dict, Any, Optional
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class ConfidenceChecker:
    """
    Pre-implementation confidence assessment.

    Scores how confident we are in an implementation direction *before*
    starting work, so low-confidence tasks can be stopped early.

    Usage:
        checker = ConfidenceChecker()
        confidence = checker.assess(context)

        if confidence >= 0.9:
            ...  # High confidence - proceed immediately
        elif confidence >= 0.7:
            ...  # Medium confidence - present options to user
        else:
            ...  # Low confidence - STOP and request clarification
    """

    def assess(self, context: Dict[str, Any]) -> float:
        """
        Assess confidence level (0.0 - 1.0).

        Weighted checks:
            1. Official documentation verified (40%)
            2. Existing patterns identified (30%)
            3. Implementation path clear (30%)

        Side effect: stores the per-check report list under
        ``context["confidence_checks"]``.

        Args:
            context: Context dict with test/implementation details

        Returns:
            float: Confidence score (0.0 = no confidence, 1.0 = absolute)
        """
        # (weight, predicate, pass message, fail message) — fixed order.
        criteria = (
            (0.4, self._has_official_docs,
             "✅ Official documentation", "❌ Missing documentation"),
            (0.3, self._has_existing_patterns,
             "✅ Existing patterns found", "❌ No existing patterns"),
            (0.3, self._has_clear_path,
             "✅ Implementation path clear", "❌ Implementation unclear"),
        )

        score = 0.0
        report = []
        for weight, predicate, ok_msg, fail_msg in criteria:
            if predicate(context):
                score += weight
                report.append(ok_msg)
            else:
                report.append(fail_msg)

        # Expose the individual check outcomes for reporting.
        context["confidence_checks"] = report

        return score

    def _has_official_docs(self, context: Dict[str, Any]) -> bool:
        """
        Check whether official documentation exists.

        Walks upward from the test file's directory looking for README.md,
        CLAUDE.md, or a docs/ directory.
        NOTE(review): the walk continues past any project boundary up to the
        filesystem root — confirm that is intended.
        """
        test_file = context.get("test_file")
        if not test_file:
            return False

        directory = Path(test_file).parent
        while directory != directory.parent:
            # Markers checked in the same order as before; any() short-circuits.
            if any(
                (directory / marker).exists()
                for marker in ("README.md", "CLAUDE.md", "docs")
            ):
                return True
            directory = directory.parent

        return False

    def _has_existing_patterns(self, context: Dict[str, Any]) -> bool:
        """
        Check whether existing test patterns can be followed.

        True when the test file's directory contains at least one *other*
        test_*.py file to use as a template.
        """
        test_file = context.get("test_file")
        if not test_file:
            return False

        suite_dir = Path(test_file).parent
        if not suite_dir.exists():
            return False

        # More than one test file means there are siblings to copy from.
        return len(list(suite_dir.glob("test_*.py"))) > 1

    def _has_clear_path(self, context: Dict[str, Any]) -> bool:
        """
        Check whether the implementation path is clear.

        Heuristics: a recognized pytest marker, or a reasonably descriptive
        test name (more than 10 characters, not the placeholder name).
        """
        test_name = context.get("test_name", "")
        if not test_name or test_name == "test_example":
            return False

        recognized = {
            "unit", "integration", "hallucination",
            "performance", "confidence_check", "self_check",
        }
        if set(context.get("markers", [])) & recognized:
            return True

        return len(test_name) > 10

    def get_recommendation(self, confidence: float) -> str:
        """
        Get the recommended action for a confidence level.

        Args:
            confidence: Confidence score (0.0 - 1.0)

        Returns:
            str: Recommended action
        """
        if confidence >= 0.9:
            return "✅ High confidence - Proceed immediately"
        if confidence >= 0.7:
            return "⚠️ Medium confidence - Present options to user"
        return "❌ Low confidence - STOP and request clarification"
||||
343
src/superclaude/pm_agent/reflexion.py
Normal file
343
src/superclaude/pm_agent/reflexion.py
Normal file
@@ -0,0 +1,343 @@
|
||||
"""
|
||||
Reflexion Error Learning Pattern
|
||||
|
||||
Learn from past errors to prevent recurrence.
|
||||
|
||||
Token Budget:
|
||||
- Cache hit: 0 tokens (known error → instant solution)
|
||||
- Cache miss: 1-2K tokens (new investigation)
|
||||
|
||||
Performance:
|
||||
- Error recurrence rate: <10%
|
||||
- Solution reuse rate: >90%
|
||||
|
||||
Storage Strategy:
|
||||
- Primary: docs/memory/solutions_learned.jsonl (local file)
|
||||
- Secondary: mindbase (if available, semantic search)
|
||||
- Fallback: grep-based text search
|
||||
|
||||
Process:
|
||||
1. Error detected → Check past errors (smart lookup)
|
||||
2. IF similar found → Apply known solution (0 tokens)
|
||||
3. ELSE → Investigate root cause → Document solution
|
||||
4. Store for future reference (dual storage)
|
||||
"""
|
||||
|
||||
from typing import Dict, List, Optional, Any
|
||||
from pathlib import Path
|
||||
import json
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class ReflexionPattern:
    """
    Error learning and prevention through reflexion

    Usage:
        reflexion = ReflexionPattern()

        # When error occurs
        error_info = {
            "error_type": "AssertionError",
            "error_message": "Expected 5, got 3",
            "test_name": "test_calculation",
        }

        # Check for known solution
        solution = reflexion.get_solution(error_info)

        if solution:
            print(f"✅ Known error - Solution: {solution}")
        else:
            # New error - investigate and record
            reflexion.record_error(error_info)
    """

    def __init__(self, memory_dir: Optional[Path] = None):
        """
        Initialize reflexion pattern

        Args:
            memory_dir: Directory for storing error solutions
                        (defaults to docs/memory/ in current project)
        """
        if memory_dir is None:
            # Default to docs/memory/ in current working directory
            memory_dir = Path.cwd() / "docs" / "memory"

        self.memory_dir = memory_dir
        self.solutions_file = memory_dir / "solutions_learned.jsonl"
        # Mistake write-ups live beside the memory dir: docs/mistakes/
        self.mistakes_dir = memory_dir.parent / "mistakes"

        # Ensure directories exist
        self.memory_dir.mkdir(parents=True, exist_ok=True)
        self.mistakes_dir.mkdir(parents=True, exist_ok=True)

    def get_solution(self, error_info: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Get known solution for similar error

        Lookup strategy:
        1. Try mindbase semantic search (if available)
        2. Fallback to grep-based text search
        3. Return None if no match found

        Args:
            error_info: Error information dict

        Returns:
            Solution dict if found, None otherwise
        """
        error_signature = self._create_error_signature(error_info)

        # Try mindbase first (semantic search)
        solution = self._search_mindbase(error_signature)
        if solution:
            return solution

        # Fallback to file-based search (local, no tokens)
        solution = self._search_local_files(error_signature)
        return solution

    def record_error(self, error_info: Dict[str, Any]) -> None:
        """
        Record error and solution for future learning

        Stores to:
        1. docs/memory/solutions_learned.jsonl (append-only log)
        2. docs/mistakes/[feature]-[date].md (detailed analysis)

        Note: mutates the passed dict by adding a "timestamp" key.

        Args:
            error_info: Error information dict containing:
                - test_name: Name of failing test
                - error_type: Type of error (e.g., AssertionError)
                - error_message: Error message
                - traceback: Stack trace
                - solution (optional): Solution applied
                - root_cause (optional): Root cause analysis
        """
        # Add timestamp
        error_info["timestamp"] = datetime.now().isoformat()

        # Append to solutions log (JSONL format).
        # encoding pinned to UTF-8: records may contain non-ASCII text
        # (e.g. Japanese analysis) and the locale default can fail on it.
        with self.solutions_file.open("a", encoding="utf-8") as f:
            f.write(json.dumps(error_info) + "\n")

        # If this is a significant error with analysis, create mistake doc
        if error_info.get("root_cause") or error_info.get("solution"):
            self._create_mistake_doc(error_info)

    def _create_error_signature(self, error_info: Dict[str, Any]) -> str:
        """
        Create error signature for matching

        Combines:
        - Error type
        - Key parts of error message (digits normalized to 'N')
        - Test context

        Args:
            error_info: Error information dict

        Returns:
            str: Error signature for matching
        """
        parts = []

        if "error_type" in error_info:
            parts.append(error_info["error_type"])

        if "error_message" in error_info:
            # Extract key words from error message
            message = error_info["error_message"]
            # Remove numbers (often varies between errors)
            import re
            message = re.sub(r'\d+', 'N', message)
            parts.append(message[:100])  # First 100 chars

        if "test_name" in error_info:
            parts.append(error_info["test_name"])

        return " | ".join(parts)

    def _search_mindbase(self, error_signature: str) -> Optional[Dict[str, Any]]:
        """
        Search for similar error in mindbase (semantic search)

        Args:
            error_signature: Error signature to search

        Returns:
            Solution dict if found, None if mindbase unavailable or no match
        """
        # TODO: Implement mindbase integration
        # For now, return None (fallback to file search)
        return None

    def _search_local_files(self, error_signature: str) -> Optional[Dict[str, Any]]:
        """
        Search for similar error in local JSONL file

        Uses simple text matching on error signatures.

        Args:
            error_signature: Error signature to search

        Returns:
            Solution dict if found, None otherwise
        """
        if not self.solutions_file.exists():
            return None

        # Read JSONL file and search.  UTF-8 pinned to match the writer.
        with self.solutions_file.open("r", encoding="utf-8") as f:
            for line in f:
                try:
                    record = json.loads(line)
                    stored_signature = self._create_error_signature(record)

                    # Simple similarity check
                    if self._signatures_match(error_signature, stored_signature):
                        return {
                            "solution": record.get("solution"),
                            "root_cause": record.get("root_cause"),
                            "prevention": record.get("prevention"),
                            "timestamp": record.get("timestamp"),
                        }
                except json.JSONDecodeError:
                    # Skip corrupt lines rather than aborting the search.
                    continue

        return None

    def _signatures_match(self, sig1: str, sig2: str, threshold: float = 0.7) -> bool:
        """
        Check if two error signatures match

        Jaccard similarity on lowercase word sets (good enough for most cases).

        Args:
            sig1: First signature
            sig2: Second signature
            threshold: Minimum word overlap ratio (default: 0.7)

        Returns:
            bool: Whether signatures are similar enough
        """
        words1 = set(sig1.lower().split())
        words2 = set(sig2.lower().split())

        if not words1 or not words2:
            return False

        overlap = len(words1 & words2)
        total = len(words1 | words2)

        return (overlap / total) >= threshold

    def _create_mistake_doc(self, error_info: Dict[str, Any]) -> None:
        """
        Create detailed mistake documentation

        Format: docs/mistakes/[feature]-YYYY-MM-DD.md

        Structure:
        - What Happened (現象)
        - Root Cause (根本原因)
        - Why Missed (なぜ見逃したか)
        - Fix Applied (修正内容)
        - Prevention Checklist (防止策)
        - Lesson Learned (教訓)

        Args:
            error_info: Error information with analysis
        """
        # Generate filename
        test_name = error_info.get("test_name", "unknown")
        date = datetime.now().strftime("%Y-%m-%d")
        filename = f"{test_name}-{date}.md"
        filepath = self.mistakes_dir / filename

        # Create mistake document
        content = f"""# Mistake Record: {test_name}

**Date**: {date}
**Error Type**: {error_info.get('error_type', 'Unknown')}

---

## ❌ What Happened (現象)

{error_info.get('error_message', 'No error message')}

```
{error_info.get('traceback', 'No traceback')}
```

---

## 🔍 Root Cause (根本原因)

{error_info.get('root_cause', 'Not analyzed')}

---

## 🤔 Why Missed (なぜ見逃したか)

{error_info.get('why_missed', 'Not analyzed')}

---

## ✅ Fix Applied (修正内容)

{error_info.get('solution', 'Not documented')}

---

## 🛡️ Prevention Checklist (防止策)

{error_info.get('prevention', 'Not documented')}

---

## 💡 Lesson Learned (教訓)

{error_info.get('lesson', 'Not documented')}
"""

        # UTF-8 pinned: content contains Japanese headings, which would
        # raise UnicodeEncodeError under non-UTF-8 locale defaults.
        filepath.write_text(content, encoding="utf-8")

    def get_statistics(self) -> Dict[str, Any]:
        """
        Get reflexion pattern statistics

        Returns:
            Dict with statistics:
            - total_errors: Total errors recorded
            - errors_with_solutions: Errors with documented solutions
            - solution_reuse_rate: Percentage of errors that have solutions
        """
        if not self.solutions_file.exists():
            return {
                "total_errors": 0,
                "errors_with_solutions": 0,
                "solution_reuse_rate": 0.0,
            }

        total = 0
        with_solutions = 0

        with self.solutions_file.open("r", encoding="utf-8") as f:
            for line in f:
                try:
                    record = json.loads(line)
                    total += 1
                    if record.get("solution"):
                        with_solutions += 1
                except json.JSONDecodeError:
                    continue

        return {
            "total_errors": total,
            "errors_with_solutions": with_solutions,
            "solution_reuse_rate": (with_solutions / total * 100) if total > 0 else 0.0,
        }
|
||||
249
src/superclaude/pm_agent/self_check.py
Normal file
249
src/superclaude/pm_agent/self_check.py
Normal file
@@ -0,0 +1,249 @@
|
||||
"""
|
||||
Post-implementation Self-Check Protocol
|
||||
|
||||
Hallucination prevention through evidence-based validation.
|
||||
|
||||
Token Budget: 200-2,500 tokens (complexity-dependent)
|
||||
Detection Rate: 94% (Reflexion benchmark)
|
||||
|
||||
The Four Questions:
|
||||
1. テストは全てpassしてる? (Are all tests passing?)
|
||||
2. 要件を全て満たしてる? (Are all requirements met?)
|
||||
3. 思い込みで実装してない? (No assumptions without verification?)
|
||||
4. 証拠はある? (Is there evidence?)
|
||||
"""
|
||||
|
||||
from typing import Dict, List, Tuple, Any, Optional
|
||||
|
||||
|
||||
class SelfCheckProtocol:
    """
    Post-implementation validation protocol.

    Asks the Four Questions of a finished implementation and refuses to
    call it "complete" without evidence:

        1. テストは全てpassしてる? (Are all tests passing?)
           → require ACTUAL test output, not just a claim
        2. 要件を全て満たしてる? (Are all requirements met?)
           → compare implementation against the requirement list
        3. 思い込みで実装してない? (No assumptions without verification?)
           → every recorded assumption must be verified
        4. 証拠はある? (Is there evidence?)
           → test results, code changes, validation output

    Usage:
        protocol = SelfCheckProtocol()
        passed, issues = protocol.validate(implementation)

        if passed:
            print("✅ Implementation complete with evidence")
        else:
            print("❌ Issues detected:")
            for issue in issues:
                print(f"  - {issue}")
    """

    # 7 Red Flags for Hallucination Detection.
    # NOTE: reference/documentation list only — _detect_hallucinations()
    # inspects structured fields of the implementation dict; it does not
    # scan for these phrases.
    HALLUCINATION_RED_FLAGS = [
        "tests pass",  # without showing output
        "everything works",  # without evidence
        "implementation complete",  # with failing tests
        # Skipping error messages
        # Ignoring warnings
        # Hiding failures
        # "probably works" statements
    ]

    def validate(self, implementation: Dict[str, Any]) -> Tuple[bool, List[str]]:
        """
        Run the full self-check validation.

        Args:
            implementation: Implementation details dict containing:
                - tests_passed (bool): Whether tests passed
                - test_output (str): Actual test output
                - requirements (List[str]): List of requirements
                - requirements_met (List[str]): List of met requirements
                - assumptions (List[str]): List of assumptions made
                - assumptions_verified (List[str]): Verified assumptions
                - evidence (Dict): test_results, code_changes, validation

        Returns:
            Tuple of (passed: bool, issues: List[str])
        """
        issues: List[str] = []

        # Question 1: Tests passing (with real output)?
        if not self._check_tests_passing(implementation):
            issues.append("❌ Tests not passing - implementation incomplete")

        # Question 2: Requirements met?
        unmet = self._check_requirements_met(implementation)
        if unmet:
            issues.append(f"❌ Requirements not fully met: {', '.join(unmet)}")

        # Question 3: Assumptions verified?
        unverified = self._check_assumptions_verified(implementation)
        if unverified:
            issues.append(f"❌ Unverified assumptions: {', '.join(unverified)}")

        # Question 4: Evidence provided?
        missing_evidence = self._check_evidence_exists(implementation)
        if missing_evidence:
            issues.append(f"❌ Missing evidence: {', '.join(missing_evidence)}")

        # Additional: hallucination red flags (claims without support).
        hallucinations = self._detect_hallucinations(implementation)
        issues.extend(f"🚨 Hallucination detected: {h}" for h in hallucinations)

        return len(issues) == 0, issues

    def _check_tests_passing(self, impl: Dict[str, Any]) -> bool:
        """
        Verify all tests pass WITH EVIDENCE.

        Requires both tests_passed=True and non-empty test_output whose
        text contains an actual passing indicator (anti-hallucination).
        """
        if not impl.get("tests_passed", False):
            return False

        test_output = impl.get("test_output", "")
        if not test_output:
            # A bare claim with no output is treated as unproven.
            return False

        # Check for passing indicators in the actual output.
        passing_indicators = ("passed", "OK", "✓", "✅")
        return any(indicator in test_output for indicator in passing_indicators)

    def _check_requirements_met(self, impl: Dict[str, Any]) -> List[str]:
        """
        Verify all requirements satisfied.

        Returns:
            List of unmet requirements (empty if all met), in input order.
        """
        met = set(impl.get("requirements_met", []))
        return [req for req in impl.get("requirements", []) if req not in met]

    def _check_assumptions_verified(self, impl: Dict[str, Any]) -> List[str]:
        """
        Verify assumptions were checked against official docs.

        Returns:
            List of unverified assumptions (empty if all verified).
        """
        verified = set(impl.get("assumptions_verified", []))
        return [a for a in impl.get("assumptions", []) if a not in verified]

    def _check_evidence_exists(self, impl: Dict[str, Any]) -> List[str]:
        """
        Verify evidence was provided.

        Required evidence keys: test_results, code_changes, and
        validation (lint / typecheck / build output).

        Returns:
            List of missing evidence types (empty if all present).
        """
        evidence = impl.get("evidence", {})
        required = ("test_results", "code_changes", "validation")
        return [key for key in required if not evidence.get(key)]

    def _detect_hallucinations(self, impl: Dict[str, Any]) -> List[str]:
        """
        Detect hallucination red flags.

        7 Red Flags:
            1. "Tests pass" without showing output
            2. "Everything works" without evidence
            3. "Implementation complete" with failing tests
            4. Skipping error messages
            5. Ignoring warnings
            6. Hiding failures
            7. "Probably works" statements

        Returns:
            List of detected hallucination patterns.
        """
        detected = []

        # Red Flag 1: claims tests pass but provides no output.
        if impl.get("tests_passed") and not impl.get("test_output"):
            detected.append("Claims tests pass without showing output")

        # Red Flag 2: claims completion without any evidence.
        if impl.get("status") == "complete" and not impl.get("evidence"):
            detected.append("Claims completion without evidence")

        # Red Flag 3: claims completion despite failing tests.
        if impl.get("status") == "complete" and not impl.get("tests_passed"):
            detected.append("Claims completion despite failing tests")

        # Red Flags 4-6: errors or warnings present, yet status is complete.
        errors = impl.get("errors", [])
        warnings = impl.get("warnings", [])
        if (errors or warnings) and impl.get("status") == "complete":
            detected.append("Ignored errors/warnings")

        # Red Flag 7: uncertainty language in the description.
        description = impl.get("description", "").lower()
        uncertainty_words = ("probably", "maybe", "should work", "might work")
        if any(word in description for word in uncertainty_words):
            detected.append(f"Uncertainty language detected: {description}")

        return detected

    def format_report(self, passed: bool, issues: List[str]) -> str:
        """
        Format the validation outcome as a human-readable report.

        Args:
            passed: Whether validation passed.
            issues: List of issues detected by validate().

        Returns:
            str: Formatted report.
        """
        if passed:
            return "✅ Self-Check PASSED - Implementation complete with evidence"

        report = ["❌ Self-Check FAILED - Issues detected:\n"]
        report.extend(f"  {issue}" for issue in issues)
        return "\n".join(report)
|
||||
260
src/superclaude/pm_agent/token_budget.py
Normal file
260
src/superclaude/pm_agent/token_budget.py
Normal file
@@ -0,0 +1,260 @@
|
||||
"""
|
||||
Token Budget Management
|
||||
|
||||
Budget-aware operations with complexity-based allocation.
|
||||
|
||||
Budget Levels:
|
||||
- Simple (typo fix): 200 tokens
|
||||
- Medium (bug fix): 1,000 tokens
|
||||
- Complex (feature): 2,500 tokens
|
||||
|
||||
Token Efficiency Strategy:
|
||||
- Compress trial-and-error history (keep only successful path)
|
||||
- Focus on actionable learnings (not full trajectory)
|
||||
- Example: "[Summary] 3 failures (details: failures.json) | Success: proper validation"
|
||||
|
||||
Expected Reduction:
|
||||
- Simple tasks: 80-95% reduction
|
||||
- Medium tasks: 60-80% reduction
|
||||
- Complex tasks: 40-60% reduction
|
||||
"""
|
||||
|
||||
from enum import Enum
from typing import Any, Dict, List, Literal, Optional
|
||||
|
||||
|
||||
class ComplexityLevel(str, Enum):
    """Task complexity levels used for budget allocation."""

    SIMPLE = "simple"    # typo fix, comment update
    MEDIUM = "medium"    # bug fix, refactoring
    COMPLEX = "complex"  # feature implementation


class TokenBudgetManager:
    """
    Token budget management for complexity-aware operations.

    Usage:
        # Simple task (typo fix)
        budget = TokenBudgetManager(complexity="simple")
        assert budget.limit == 200

        # Medium task (bug fix)
        budget = TokenBudgetManager(complexity="medium")
        assert budget.limit == 1000

        # Complex task (feature implementation)
        budget = TokenBudgetManager(complexity="complex")
        assert budget.limit == 2500

        # Check budget
        if budget.remaining < 100:
            print("⚠️ Low budget - compress output")
    """

    # Budget allocations by complexity level.
    BUDGETS = {
        ComplexityLevel.SIMPLE: 200,    # Typo fix, comment update
        ComplexityLevel.MEDIUM: 1000,   # Bug fix, refactoring
        ComplexityLevel.COMPLEX: 2500,  # Feature implementation
    }

    def __init__(
        self,
        complexity: Literal["simple", "medium", "complex"] = "medium",
        custom_limit: Optional[int] = None
    ):
        """
        Initialize token budget manager.

        Args:
            complexity: Task complexity level.
            custom_limit: Custom token limit (overrides complexity-based).
        """
        self.complexity = ComplexityLevel(complexity)

        # An explicit limit (including 0) wins over the complexity default.
        if custom_limit is not None:
            self.limit = custom_limit
        else:
            self.limit = self.BUDGETS[self.complexity]

        self.used = 0
        # One entry per successful use() call: tokens, operation, total_used.
        self.operations: List[Dict[str, Any]] = []

    def use(self, tokens: int, operation: str = "") -> bool:
        """
        Use tokens for an operation.

        Args:
            tokens: Number of tokens to use.
            operation: Description of operation.

        Returns:
            bool: Whether tokens were successfully allocated
                  (False when the allocation would exceed the limit;
                  usage is left unchanged in that case).
        """
        if self.used + tokens > self.limit:
            return False

        self.used += tokens
        self.operations.append({
            "tokens": tokens,
            "operation": operation,
            "total_used": self.used,
        })

        return True

    @property
    def remaining(self) -> int:
        """Get remaining token budget."""
        return self.limit - self.used

    @property
    def usage_percentage(self) -> float:
        """Get budget usage percentage (0.0 when the limit is 0)."""
        return (self.used / self.limit) * 100 if self.limit > 0 else 0.0

    @property
    def is_low(self) -> bool:
        """Check if budget is running low (<20% remaining)."""
        return self.remaining < (self.limit * 0.2)

    @property
    def is_critical(self) -> bool:
        """Check if budget is critical (<10% remaining)."""
        return self.remaining < (self.limit * 0.1)

    def get_status(self) -> Dict[str, Any]:
        """
        Get current budget status.

        Returns:
            Dict with limit/used/remaining figures, percentage, low and
            critical flags, and the number of recorded operations.
        """
        return {
            "complexity": self.complexity.value,
            "limit": self.limit,
            "used": self.used,
            "remaining": self.remaining,
            "usage_percentage": round(self.usage_percentage, 1),
            "is_low": self.is_low,
            "is_critical": self.is_critical,
            "operations_count": len(self.operations),
        }

    def get_recommendation(self) -> str:
        """
        Get recommendation based on current budget status.

        Returns:
            str: Recommendation message.
        """
        if self.is_critical:
            return "🚨 CRITICAL: <10% budget remaining - Use symbols only, compress heavily"
        elif self.is_low:
            return "⚠️ LOW: <20% budget remaining - Compress output, avoid verbose explanations"
        elif self.usage_percentage > 50:
            return "📊 MODERATE: >50% budget used - Start token-efficient communication"
        else:
            return "✅ HEALTHY: Budget sufficient for standard operations"

    def format_usage_report(self) -> str:
        """
        Format budget usage report.

        Returns:
            str: Formatted multi-line report (last 5 operations included).
        """
        status = self.get_status()

        report = [
            "🧠 Token Budget Report",
            "━━━━━━━━━━━━━━━━━━━━━━",
            f"Complexity: {status['complexity']}",
            f"Limit: {status['limit']} tokens",
            f"Used: {status['used']} tokens ({status['usage_percentage']}%)",
            f"Remaining: {status['remaining']} tokens",
            "",
            "Recommendation:",
            f"{self.get_recommendation()}",
        ]

        if self.operations:
            report.append("")
            report.append("Recent Operations:")
            for op in self.operations[-5:]:  # Last 5 operations
                operation_name = op['operation'] or "unnamed"
                report.append(
                    f"  • {operation_name}: {op['tokens']} tokens "
                    f"(total: {op['total_used']})"
                )

        return "\n".join(report)

    def reset(self) -> None:
        """Reset budget usage (keep limit)."""
        self.used = 0
        self.operations = []

    def set_complexity(self, complexity: Literal["simple", "medium", "complex"]) -> None:
        """
        Update complexity level and reset budget.

        Args:
            complexity: New complexity level.
        """
        self.complexity = ComplexityLevel(complexity)
        self.limit = self.BUDGETS[self.complexity]
        self.reset()

    @classmethod
    def estimate_complexity(cls, context: Dict[str, Any]) -> ComplexityLevel:
        """
        Estimate complexity level from context.

        Heuristics (checked in order; first match wins):
        - Complex: >200 lines changed, >3 files, or feature/implement/add task
        - Medium: 2-3 files modified, or fix/bug/refactor task
        - Simple: everything else

        Args:
            context: Context dict with task information
                     (lines_changed, files_modified, task_type).

        Returns:
            ComplexityLevel: Estimated complexity.
        """
        # Check lines changed.
        lines_changed = context.get("lines_changed", 0)
        if lines_changed > 200:
            return ComplexityLevel.COMPLEX

        # Check files modified.
        files_modified = context.get("files_modified", 0)
        if files_modified > 3:
            return ComplexityLevel.COMPLEX
        elif files_modified > 1:
            return ComplexityLevel.MEDIUM

        # Check task type keywords.
        task_type = context.get("task_type", "").lower()
        if any(keyword in task_type for keyword in ["feature", "implement", "add"]):
            return ComplexityLevel.COMPLEX
        elif any(keyword in task_type for keyword in ["fix", "bug", "refactor"]):
            return ComplexityLevel.MEDIUM
        else:
            return ComplexityLevel.SIMPLE

    def __str__(self) -> str:
        """String representation."""
        return (
            f"TokenBudget({self.complexity.value}: "
            f"{self.used}/{self.limit} tokens, "
            f"{self.usage_percentage:.1f}% used)"
        )

    def __repr__(self) -> str:
        """Developer representation."""
        return (
            f"TokenBudgetManager(complexity={self.complexity.value!r}, "
            f"limit={self.limit}, used={self.used})"
        )
|
||||
222
src/superclaude/pytest_plugin.py
Normal file
222
src/superclaude/pytest_plugin.py
Normal file
@@ -0,0 +1,222 @@
|
||||
"""
|
||||
SuperClaude pytest plugin
|
||||
|
||||
Auto-loaded when superclaude is installed.
|
||||
Provides PM Agent fixtures and hooks for enhanced testing.
|
||||
|
||||
Entry point registered in pyproject.toml:
|
||||
[project.entry-points.pytest11]
|
||||
superclaude = "superclaude.pytest_plugin"
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
from typing import Dict, Any, Optional
|
||||
|
||||
from .pm_agent.confidence import ConfidenceChecker
|
||||
from .pm_agent.self_check import SelfCheckProtocol
|
||||
from .pm_agent.reflexion import ReflexionPattern
|
||||
from .pm_agent.token_budget import TokenBudgetManager
|
||||
|
||||
|
||||
def pytest_configure(config):
    """
    Register SuperClaude custom markers with pytest.

    Markers:
        - confidence_check: Pre-execution confidence assessment
        - self_check: Post-implementation validation
        - reflexion: Error learning and prevention
        - complexity(level): Set test complexity (simple, medium, complex)
    """
    marker_specs = (
        "confidence_check: Pre-execution confidence assessment (min 70%)",
        "self_check: Post-implementation validation with evidence requirement",
        "reflexion: Error learning and prevention pattern",
        "complexity(level): Set test complexity (simple, medium, complex)",
    )
    for spec in marker_specs:
        config.addinivalue_line("markers", spec)
|
||||
|
||||
|
||||
@pytest.fixture
def confidence_checker():
    """
    Pre-execution confidence checking fixture.

    Usage:
        def test_example(confidence_checker):
            confidence = confidence_checker.assess(context)
            assert confidence >= 0.7
    """
    checker = ConfidenceChecker()
    return checker
|
||||
|
||||
|
||||
@pytest.fixture
def self_check_protocol():
    """
    Post-implementation self-check protocol fixture.

    Usage:
        def test_example(self_check_protocol):
            passed, issues = self_check_protocol.validate(implementation)
            assert passed
    """
    protocol = SelfCheckProtocol()
    return protocol
|
||||
|
||||
|
||||
@pytest.fixture
def reflexion_pattern():
    """
    Reflexion error-learning pattern fixture.

    Usage:
        def test_example(reflexion_pattern):
            reflexion_pattern.record_error(...)
            solution = reflexion_pattern.get_solution(error_signature)
    """
    pattern = ReflexionPattern()
    return pattern
|
||||
|
||||
|
||||
@pytest.fixture
def token_budget(request):
    """
    Token budget manager fixture, sized by the @pytest.mark.complexity marker.

    Complexity levels:
        - simple: 200 tokens (typo fix)
        - medium: 1,000 tokens (bug fix)
        - complex: 2,500 tokens (feature implementation)

    Falls back to "medium" when the marker is absent or given no argument.

    Usage:
        @pytest.mark.complexity("medium")
        def test_example(token_budget):
            assert token_budget.limit == 1000
    """
    # Get test complexity from marker. Guard against a bare
    # @pytest.mark.complexity (marker present but no args), which would
    # otherwise raise IndexError on marker.args[0].
    marker = request.node.get_closest_marker("complexity")
    complexity = marker.args[0] if marker and marker.args else "medium"
    return TokenBudgetManager(complexity=complexity)
|
||||
|
||||
|
||||
@pytest.fixture
def pm_context(tmp_path):
    """
    PM Agent context fixture rooted in a temporary directory.

    Creates the memory directory structure:
        - docs/memory/pm_context.md
        - docs/memory/last_session.md
        - docs/memory/next_actions.md

    Usage:
        def test_example(pm_context):
            assert pm_context["memory_dir"].exists()
            pm_context["pm_context"].write_text("# Context")
    """
    memory_dir = tmp_path / "docs" / "memory"
    memory_dir.mkdir(parents=True)

    # Create empty memory files and expose each one under its stem.
    filenames = ("pm_context.md", "last_session.md", "next_actions.md")
    context = {"memory_dir": memory_dir}
    for filename in filenames:
        file_path = memory_dir / filename
        file_path.touch()
        context[filename.rsplit(".", 1)[0]] = file_path

    return context
|
||||
|
||||
|
||||
def pytest_runtest_setup(item):
    """
    Pre-test hook for confidence checking.

    Tests marked @pytest.mark.confidence_check get a pre-execution
    confidence assessment; anything scoring below 70% is skipped.
    """
    if item.get_closest_marker("confidence_check") is None:
        return

    checker = ConfidenceChecker()

    # Build assessment context from the collected test item.
    context = {
        "test_name": item.name,
        "test_file": str(item.fspath),
        "markers": [m.name for m in item.iter_markers()],
    }

    confidence = checker.assess(context)
    if confidence < 0.7:
        pytest.skip(f"Confidence too low: {confidence:.0%} (minimum: 70%)")
|
||||
|
||||
|
||||
def pytest_runtest_makereport(item, call):
    """
    Post-test hook for self-check and reflexion.

    For tests marked @pytest.mark.reflexion, a failure during the "call"
    phase is recorded so the ReflexionPattern can match it in the future.
    """
    if call.when != "call":
        return
    if item.get_closest_marker("reflexion") is None or call.excinfo is None:
        return

    # Test failed - capture enough detail for future pattern matching.
    reflexion = ReflexionPattern()
    reflexion.record_error({
        "test_name": item.name,
        "test_file": str(item.fspath),
        "error_type": type(call.excinfo.value).__name__,
        "error_message": str(call.excinfo.value),
        "traceback": str(call.excinfo.traceback),
    })
|
||||
|
||||
|
||||
def pytest_report_header(config):
    """Add the SuperClaude version line to the pytest report header."""
    from . import __version__

    header = f"SuperClaude: {__version__}"
    return header
|
||||
|
||||
|
||||
def pytest_collection_modifyitems(config, items):
    """
    Modify test collection to add automatic markers.

    - Adds 'unit' marker to tests under a 'unit' directory
    - Adds 'integration' marker to tests under an 'integration' directory
    - Adds 'hallucination' marker to paths containing 'hallucination'
    - Adds 'performance' marker to paths containing 'performance'/'benchmark'
    """
    for item in items:
        test_path = str(item.fspath)

        # Auto-mark by directory. Compare path components instead of
        # "/unit/" substrings so the markers also apply on Windows,
        # where the separator is a backslash.
        parts = Path(test_path).parts
        if "unit" in parts:
            item.add_marker(pytest.mark.unit)
        elif "integration" in parts:
            item.add_marker(pytest.mark.integration)

        # Auto-mark by filename (substring match anywhere in the path,
        # matching the original *hallucination* / *performance* intent).
        if "hallucination" in test_path:
            item.add_marker(pytest.mark.hallucination)
        elif "performance" in test_path or "benchmark" in test_path:
            item.add_marker(pytest.mark.performance)
|
||||
Reference in New Issue
Block a user