SuperClaude V4 Beta: Major framework restructuring

- Restructured core framework components
- Added new Agents, MCP servers, and Modes documentation
- Introduced SuperClaude-Lite minimal implementation
- Enhanced Commands with session management capabilities
- Added comprehensive Hooks system with Python integration
- Removed legacy setup and profile components
- Updated .gitignore to exclude Tests/, ClaudeDocs/, and .serena/
- Consolidated configuration into SuperClaude/Config/
- Added Templates for consistent component creation

This is the initial commit for the V4 Beta branch containing all recent framework improvements and architectural changes.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
NomenAK
2025-08-05 13:59:17 +02:00
parent ebeee4e6cb
commit 1d03832f2d
177 changed files with 35002 additions and 10971 deletions

View File

@@ -0,0 +1,25 @@
"""
SuperClaude-Lite Shared Infrastructure
Core components for the executable SuperClaude intelligence framework.
Provides shared functionality across all 7 Claude Code hooks.
"""
__version__ = "1.0.0"
__author__ = "SuperClaude Framework"
from .yaml_loader import UnifiedConfigLoader
from .framework_logic import FrameworkLogic
from .pattern_detection import PatternDetector
from .mcp_intelligence import MCPIntelligence
from .compression_engine import CompressionEngine
from .learning_engine import LearningEngine
__all__ = [
'UnifiedConfigLoader',
'FrameworkLogic',
'PatternDetector',
'MCPIntelligence',
'CompressionEngine',
'LearningEngine'
]

View File

@@ -0,0 +1,567 @@
"""
Compression Engine for SuperClaude-Lite
Intelligent token optimization implementing MODE_Token_Efficiency.md algorithms
with adaptive compression, symbol systems, and quality-gated validation.
"""
import re
import json
import hashlib
from typing import Dict, Any, List, Optional, Tuple, Set
from dataclasses import dataclass
from enum import Enum
from yaml_loader import config_loader
class CompressionLevel(Enum):
"""Compression levels from MODE_Token_Efficiency.md."""
MINIMAL = "minimal" # 0-40% compression
EFFICIENT = "efficient" # 40-70% compression
COMPRESSED = "compressed" # 70-85% compression
CRITICAL = "critical" # 85-95% compression
EMERGENCY = "emergency" # 95%+ compression
class ContentType(Enum):
"""Types of content for selective compression."""
FRAMEWORK_CONTENT = "framework" # SuperClaude framework - EXCLUDE
SESSION_DATA = "session" # Session metadata - COMPRESS
USER_CONTENT = "user" # User project files - PRESERVE
WORKING_ARTIFACTS = "artifacts" # Analysis results - COMPRESS
@dataclass
class CompressionResult:
"""Result of compression operation."""
original_length: int
compressed_length: int
compression_ratio: float
quality_score: float # 0.0 to 1.0
techniques_used: List[str]
preservation_score: float # Information preservation
processing_time_ms: float
@dataclass
class CompressionStrategy:
"""Strategy configuration for compression."""
level: CompressionLevel
symbol_systems_enabled: bool
abbreviation_systems_enabled: bool
structural_optimization: bool
selective_preservation: Dict[str, bool]
quality_threshold: float
class CompressionEngine:
"""
Intelligent token optimization engine implementing MODE_Token_Efficiency.md.
Features:
- 5-level adaptive compression (minimal to emergency)
- Symbol systems for mathematical and logical relationships
- Abbreviation systems for technical domains
- Selective compression with framework/user content protection
- Quality-gated validation with ≥95% information preservation
- Real-time compression effectiveness monitoring
"""
def __init__(self):
self.config = config_loader.load_config('compression')
self.symbol_mappings = self._load_symbol_mappings()
self.abbreviation_mappings = self._load_abbreviation_mappings()
self.compression_cache = {}
self.performance_metrics = {}
def _load_symbol_mappings(self) -> Dict[str, str]:
"""Load symbol system mappings from configuration."""
return {
# Core Logic & Flow
'leads to': '',
'implies': '',
'transforms to': '',
'converts to': '',
'rollback': '',
'reverse': '',
'bidirectional': '',
'sync': '',
'and': '&',
'combine': '&',
'separator': '|',
'or': '|',
'define': ':',
'specify': ':',
'sequence': '»',
'then': '»',
'therefore': '',
'because': '',
'equivalent': '',
'approximately': '',
'not equal': '',
# Status & Progress
'completed': '',
'passed': '',
'failed': '',
'error': '',
'warning': '⚠️',
'information': '',
'in progress': '🔄',
'processing': '🔄',
'waiting': '',
'pending': '',
'critical': '🚨',
'urgent': '🚨',
'target': '🎯',
'goal': '🎯',
'metrics': '📊',
'data': '📊',
'insight': '💡',
'learning': '💡',
# Technical Domains
'performance': '',
'optimization': '',
'analysis': '🔍',
'investigation': '🔍',
'configuration': '🔧',
'setup': '🔧',
'security': '🛡️',
'protection': '🛡️',
'deployment': '📦',
'package': '📦',
'design': '🎨',
'frontend': '🎨',
'network': '🌐',
'connectivity': '🌐',
'mobile': '📱',
'responsive': '📱',
'architecture': '🏗️',
'system structure': '🏗️',
'components': '🧩',
'modular': '🧩'
}
def _load_abbreviation_mappings(self) -> Dict[str, str]:
"""Load abbreviation system mappings from configuration."""
return {
# System & Architecture
'configuration': 'cfg',
'settings': 'cfg',
'implementation': 'impl',
'code structure': 'impl',
'architecture': 'arch',
'system design': 'arch',
'performance': 'perf',
'optimization': 'perf',
'operations': 'ops',
'deployment': 'ops',
'environment': 'env',
'runtime context': 'env',
# Development Process
'requirements': 'req',
'dependencies': 'deps',
'packages': 'deps',
'validation': 'val',
'verification': 'val',
'testing': 'test',
'quality assurance': 'test',
'documentation': 'docs',
'guides': 'docs',
'standards': 'std',
'conventions': 'std',
# Quality & Analysis
'quality': 'qual',
'maintainability': 'qual',
'security': 'sec',
'safety measures': 'sec',
'error': 'err',
'exception handling': 'err',
'recovery': 'rec',
'resilience': 'rec',
'severity': 'sev',
'priority level': 'sev',
'optimization': 'opt',
'improvement': 'opt'
}
def determine_compression_level(self, context: Dict[str, Any]) -> CompressionLevel:
"""
Determine appropriate compression level based on context.
Args:
context: Session context including resource usage, conversation length, etc.
Returns:
Appropriate CompressionLevel for the situation
"""
resource_usage = context.get('resource_usage_percent', 0)
conversation_length = context.get('conversation_length', 0)
user_requests_brevity = context.get('user_requests_brevity', False)
complexity_score = context.get('complexity_score', 0.0)
# Emergency compression for critical resource constraints
if resource_usage >= 95:
return CompressionLevel.EMERGENCY
# Critical compression for high resource usage
if resource_usage >= 85 or conversation_length > 200:
return CompressionLevel.CRITICAL
# Compressed level for moderate constraints
if resource_usage >= 70 or conversation_length > 100 or user_requests_brevity:
return CompressionLevel.COMPRESSED
# Efficient level for mild constraints or complex operations
if resource_usage >= 40 or complexity_score > 0.6:
return CompressionLevel.EFFICIENT
# Minimal compression for normal operations
return CompressionLevel.MINIMAL
def classify_content(self, content: str, metadata: Dict[str, Any]) -> ContentType:
"""
Classify content type for selective compression.
Args:
content: Content to classify
metadata: Metadata about the content (file paths, context, etc.)
Returns:
ContentType for compression decision making
"""
file_path = metadata.get('file_path', '')
context_type = metadata.get('context_type', '')
# Framework content - complete exclusion
framework_patterns = [
'/SuperClaude/SuperClaude/',
'~/.claude/',
'.claude/',
'SuperClaude/',
'CLAUDE.md',
'FLAGS.md',
'PRINCIPLES.md',
'ORCHESTRATOR.md',
'MCP_',
'MODE_',
'SESSION_LIFECYCLE.md'
]
for pattern in framework_patterns:
if pattern in file_path or pattern in content:
return ContentType.FRAMEWORK_CONTENT
# Session data - apply compression
if context_type in ['session_metadata', 'checkpoint_data', 'cache_content']:
return ContentType.SESSION_DATA
# Working artifacts - apply compression
if context_type in ['analysis_results', 'processing_data', 'working_artifacts']:
return ContentType.WORKING_ARTIFACTS
# User content - preserve with minimal compression only
user_patterns = [
'project_files',
'user_documentation',
'source_code',
'configuration_files',
'custom_content'
]
for pattern in user_patterns:
if pattern in context_type or pattern in file_path:
return ContentType.USER_CONTENT
# Default to user content preservation
return ContentType.USER_CONTENT
def compress_content(self,
content: str,
context: Dict[str, Any],
metadata: Dict[str, Any] = None) -> CompressionResult:
"""
Compress content with intelligent optimization.
Args:
content: Content to compress
context: Session context for compression level determination
metadata: Content metadata for selective compression
Returns:
CompressionResult with metrics and compressed content
"""
import time
start_time = time.time()
if metadata is None:
metadata = {}
# Classify content type
content_type = self.classify_content(content, metadata)
# Framework content - no compression
if content_type == ContentType.FRAMEWORK_CONTENT:
return CompressionResult(
original_length=len(content),
compressed_length=len(content),
compression_ratio=0.0,
quality_score=1.0,
techniques_used=['framework_exclusion'],
preservation_score=1.0,
processing_time_ms=(time.time() - start_time) * 1000
)
# User content - minimal compression only
if content_type == ContentType.USER_CONTENT:
compression_level = CompressionLevel.MINIMAL
else:
compression_level = self.determine_compression_level(context)
# Create compression strategy
strategy = self._create_compression_strategy(compression_level, content_type)
# Apply compression techniques
compressed_content = content
techniques_used = []
if strategy.symbol_systems_enabled:
compressed_content, symbol_techniques = self._apply_symbol_systems(compressed_content)
techniques_used.extend(symbol_techniques)
if strategy.abbreviation_systems_enabled:
compressed_content, abbrev_techniques = self._apply_abbreviation_systems(compressed_content)
techniques_used.extend(abbrev_techniques)
if strategy.structural_optimization:
compressed_content, struct_techniques = self._apply_structural_optimization(
compressed_content, compression_level
)
techniques_used.extend(struct_techniques)
# Calculate metrics
original_length = len(content)
compressed_length = len(compressed_content)
compression_ratio = (original_length - compressed_length) / original_length if original_length > 0 else 0.0
# Quality validation
quality_score = self._validate_compression_quality(content, compressed_content, strategy)
preservation_score = self._calculate_information_preservation(content, compressed_content)
processing_time = (time.time() - start_time) * 1000
# Cache result for performance
cache_key = hashlib.md5(content.encode()).hexdigest()
self.compression_cache[cache_key] = compressed_content
return CompressionResult(
original_length=original_length,
compressed_length=compressed_length,
compression_ratio=compression_ratio,
quality_score=quality_score,
techniques_used=techniques_used,
preservation_score=preservation_score,
processing_time_ms=processing_time
)
def _create_compression_strategy(self, level: CompressionLevel, content_type: ContentType) -> CompressionStrategy:
"""Create compression strategy based on level and content type."""
level_configs = {
CompressionLevel.MINIMAL: {
'symbol_systems': False,
'abbreviations': False,
'structural': False,
'quality_threshold': 0.98
},
CompressionLevel.EFFICIENT: {
'symbol_systems': True,
'abbreviations': False,
'structural': True,
'quality_threshold': 0.95
},
CompressionLevel.COMPRESSED: {
'symbol_systems': True,
'abbreviations': True,
'structural': True,
'quality_threshold': 0.90
},
CompressionLevel.CRITICAL: {
'symbol_systems': True,
'abbreviations': True,
'structural': True,
'quality_threshold': 0.85
},
CompressionLevel.EMERGENCY: {
'symbol_systems': True,
'abbreviations': True,
'structural': True,
'quality_threshold': 0.80
}
}
config = level_configs[level]
# Adjust for content type
if content_type == ContentType.USER_CONTENT:
# More conservative for user content
config['quality_threshold'] = min(config['quality_threshold'] + 0.1, 1.0)
return CompressionStrategy(
level=level,
symbol_systems_enabled=config['symbol_systems'],
abbreviation_systems_enabled=config['abbreviations'],
structural_optimization=config['structural'],
selective_preservation={},
quality_threshold=config['quality_threshold']
)
def _apply_symbol_systems(self, content: str) -> Tuple[str, List[str]]:
"""Apply symbol system replacements."""
compressed = content
techniques = []
# Apply symbol mappings with word boundary protection
for phrase, symbol in self.symbol_mappings.items():
pattern = r'\b' + re.escape(phrase) + r'\b'
if re.search(pattern, compressed, re.IGNORECASE):
compressed = re.sub(pattern, symbol, compressed, flags=re.IGNORECASE)
techniques.append(f"symbol_{phrase.replace(' ', '_')}")
return compressed, techniques
def _apply_abbreviation_systems(self, content: str) -> Tuple[str, List[str]]:
"""Apply abbreviation system replacements."""
compressed = content
techniques = []
# Apply abbreviation mappings with context awareness
for phrase, abbrev in self.abbreviation_mappings.items():
pattern = r'\b' + re.escape(phrase) + r'\b'
if re.search(pattern, compressed, re.IGNORECASE):
compressed = re.sub(pattern, abbrev, compressed, flags=re.IGNORECASE)
techniques.append(f"abbrev_{phrase.replace(' ', '_')}")
return compressed, techniques
def _apply_structural_optimization(self, content: str, level: CompressionLevel) -> Tuple[str, List[str]]:
"""Apply structural optimizations for token efficiency."""
compressed = content
techniques = []
# Remove redundant whitespace
compressed = re.sub(r'\s+', ' ', compressed)
compressed = re.sub(r'\n\s*\n', '\n', compressed)
techniques.append('whitespace_optimization')
# Aggressive optimizations for higher compression levels
if level in [CompressionLevel.COMPRESSED, CompressionLevel.CRITICAL, CompressionLevel.EMERGENCY]:
# Remove redundant words
compressed = re.sub(r'\b(the|a|an)\s+', '', compressed, flags=re.IGNORECASE)
techniques.append('article_removal')
# Simplify common phrases
phrase_simplifications = {
r'in order to': 'to',
r'it is important to note that': 'note:',
r'please be aware that': 'note:',
r'it should be noted that': 'note:',
r'for the purpose of': 'for',
r'with regard to': 'regarding',
r'in relation to': 'regarding'
}
for pattern, replacement in phrase_simplifications.items():
if re.search(pattern, compressed, re.IGNORECASE):
compressed = re.sub(pattern, replacement, compressed, flags=re.IGNORECASE)
techniques.append(f'phrase_simplification_{replacement}')
return compressed, techniques
def _validate_compression_quality(self, original: str, compressed: str, strategy: CompressionStrategy) -> float:
"""Validate compression quality against thresholds."""
# Simple quality heuristics (real implementation would be more sophisticated)
# Check if key information is preserved
original_words = set(re.findall(r'\b\w+\b', original.lower()))
compressed_words = set(re.findall(r'\b\w+\b', compressed.lower()))
# Word preservation ratio
word_preservation = len(compressed_words & original_words) / len(original_words) if original_words else 1.0
# Length efficiency (not too aggressive)
length_ratio = len(compressed) / len(original) if original else 1.0
# Penalize over-compression
if length_ratio < 0.3:
word_preservation *= 0.8
quality_score = (word_preservation * 0.7) + (min(length_ratio * 2, 1.0) * 0.3)
return min(quality_score, 1.0)
def _calculate_information_preservation(self, original: str, compressed: str) -> float:
"""Calculate information preservation score."""
# Simple preservation metric based on key information retention
# Extract key concepts (capitalized words, technical terms)
original_concepts = set(re.findall(r'\b[A-Z][a-z]+\b|\b\w+\.(js|py|md|yaml|json)\b', original))
compressed_concepts = set(re.findall(r'\b[A-Z][a-z]+\b|\b\w+\.(js|py|md|yaml|json)\b', compressed))
if not original_concepts:
return 1.0
preservation_ratio = len(compressed_concepts & original_concepts) / len(original_concepts)
return preservation_ratio
def get_compression_recommendations(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""Get recommendations for optimizing compression."""
recommendations = []
current_level = self.determine_compression_level(context)
resource_usage = context.get('resource_usage_percent', 0)
# Resource-based recommendations
if resource_usage > 85:
recommendations.append("Enable emergency compression mode for critical resource constraints")
elif resource_usage > 70:
recommendations.append("Consider compressed mode for better resource efficiency")
elif resource_usage < 40:
recommendations.append("Resource usage low - minimal compression sufficient")
# Performance recommendations
if context.get('processing_time_ms', 0) > 500:
recommendations.append("Compression processing time high - consider caching strategies")
return {
'current_level': current_level.value,
'recommendations': recommendations,
'estimated_savings': self._estimate_compression_savings(current_level),
'quality_impact': self._estimate_quality_impact(current_level),
'performance_metrics': self.performance_metrics
}
def _estimate_compression_savings(self, level: CompressionLevel) -> Dict[str, float]:
"""Estimate compression savings for a given level."""
savings_map = {
CompressionLevel.MINIMAL: {'token_reduction': 0.15, 'time_savings': 0.05},
CompressionLevel.EFFICIENT: {'token_reduction': 0.40, 'time_savings': 0.15},
CompressionLevel.COMPRESSED: {'token_reduction': 0.60, 'time_savings': 0.25},
CompressionLevel.CRITICAL: {'token_reduction': 0.75, 'time_savings': 0.35},
CompressionLevel.EMERGENCY: {'token_reduction': 0.85, 'time_savings': 0.45}
}
return savings_map.get(level, {'token_reduction': 0.0, 'time_savings': 0.0})
def _estimate_quality_impact(self, level: CompressionLevel) -> float:
"""Estimate quality preservation for a given level."""
quality_map = {
CompressionLevel.MINIMAL: 0.98,
CompressionLevel.EFFICIENT: 0.95,
CompressionLevel.COMPRESSED: 0.90,
CompressionLevel.CRITICAL: 0.85,
CompressionLevel.EMERGENCY: 0.80
}
return quality_map.get(level, 0.95)

View File

@@ -0,0 +1,343 @@
"""
Core SuperClaude Framework Logic
Implements the core decision-making algorithms from the SuperClaude framework,
including RULES.md, PRINCIPLES.md, and ORCHESTRATOR.md patterns.
"""
import json
import time
from typing import Dict, Any, List, Optional, Tuple, Union
from dataclasses import dataclass
from enum import Enum
from yaml_loader import config_loader
class OperationType(Enum):
"""Types of operations SuperClaude can perform."""
READ = "read"
WRITE = "write"
EDIT = "edit"
ANALYZE = "analyze"
BUILD = "build"
TEST = "test"
DEPLOY = "deploy"
REFACTOR = "refactor"
class RiskLevel(Enum):
"""Risk levels for operations."""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class OperationContext:
"""Context information for an operation."""
operation_type: OperationType
file_count: int
directory_count: int
has_tests: bool
is_production: bool
user_expertise: str # beginner, intermediate, expert
project_type: str # web, api, cli, library, etc.
complexity_score: float # 0.0 to 1.0
risk_level: RiskLevel
@dataclass
class ValidationResult:
"""Result of validation checks."""
is_valid: bool
issues: List[str]
warnings: List[str]
suggestions: List[str]
quality_score: float # 0.0 to 1.0
class FrameworkLogic:
"""
Core SuperClaude framework logic implementation.
Encapsulates decision-making algorithms from:
- RULES.md: Operational rules and security patterns
- PRINCIPLES.md: Development principles and quality standards
- ORCHESTRATOR.md: Intelligent routing and coordination
"""
def __init__(self):
# Load performance targets from SuperClaude configuration
self.performance_targets = {}
# Get hook-specific performance targets
self.performance_targets['session_start_ms'] = config_loader.get_hook_config(
'session_start', 'performance_target_ms', 50
)
self.performance_targets['tool_routing_ms'] = config_loader.get_hook_config(
'pre_tool_use', 'performance_target_ms', 200
)
self.performance_targets['validation_ms'] = config_loader.get_hook_config(
'post_tool_use', 'performance_target_ms', 100
)
self.performance_targets['compression_ms'] = config_loader.get_hook_config(
'pre_compact', 'performance_target_ms', 150
)
# Load additional performance settings from global configuration
global_perf = config_loader.get_performance_targets()
if global_perf:
self.performance_targets.update(global_perf)
def should_use_read_before_write(self, context: OperationContext) -> bool:
"""
RULES.md: Always use Read tool before Write or Edit operations.
"""
return context.operation_type in [OperationType.WRITE, OperationType.EDIT]
def calculate_complexity_score(self, operation_data: Dict[str, Any]) -> float:
"""
Calculate operation complexity score (0.0 to 1.0).
Factors:
- File count and types
- Operation scope
- Dependencies
- Risk factors
"""
score = 0.0
# File count factor (0.0 to 0.3)
file_count = operation_data.get('file_count', 1)
if file_count <= 1:
score += 0.0
elif file_count <= 3:
score += 0.1
elif file_count <= 10:
score += 0.2
else:
score += 0.3
# Directory factor (0.0 to 0.2)
dir_count = operation_data.get('directory_count', 1)
if dir_count > 2:
score += 0.2
elif dir_count > 1:
score += 0.1
# Operation type factor (0.0 to 0.3)
op_type = operation_data.get('operation_type', '')
if op_type in ['refactor', 'architecture', 'system-wide']:
score += 0.3
elif op_type in ['build', 'implement', 'migrate']:
score += 0.2
elif op_type in ['fix', 'update', 'improve']:
score += 0.1
# Language/framework factor (0.0 to 0.2)
if operation_data.get('multi_language', False):
score += 0.2
elif operation_data.get('framework_changes', False):
score += 0.1
return min(score, 1.0)
def assess_risk_level(self, context: OperationContext) -> RiskLevel:
"""
Assess risk level based on operation context.
"""
if context.is_production:
return RiskLevel.HIGH
if context.complexity_score > 0.7:
return RiskLevel.HIGH
elif context.complexity_score > 0.4:
return RiskLevel.MEDIUM
elif context.file_count > 10:
return RiskLevel.MEDIUM
else:
return RiskLevel.LOW
def should_enable_validation(self, context: OperationContext) -> bool:
"""
ORCHESTRATOR.md: Enable validation for production code or high-risk operations.
"""
return (
context.is_production or
context.risk_level in [RiskLevel.HIGH, RiskLevel.CRITICAL] or
context.operation_type in [OperationType.DEPLOY, OperationType.REFACTOR]
)
def should_enable_delegation(self, context: OperationContext) -> Tuple[bool, str]:
"""
ORCHESTRATOR.md: Enable delegation for multi-file operations.
Returns:
(should_delegate, delegation_strategy)
"""
if context.file_count > 3:
return True, "files"
elif context.directory_count > 2:
return True, "folders"
elif context.complexity_score > 0.4:
return True, "auto"
else:
return False, "none"
def validate_operation(self, operation_data: Dict[str, Any]) -> ValidationResult:
"""
PRINCIPLES.md: Validate operation against core principles.
"""
issues = []
warnings = []
suggestions = []
quality_score = 1.0
# Check for evidence-based decision making
if 'evidence' not in operation_data:
warnings.append("No evidence provided for decision")
quality_score -= 0.1
# Check for proper error handling
if operation_data.get('operation_type') in ['write', 'edit', 'deploy']:
if not operation_data.get('has_error_handling', False):
issues.append("Error handling not implemented")
quality_score -= 0.2
# Check for test coverage
if operation_data.get('affects_logic', False):
if not operation_data.get('has_tests', False):
warnings.append("No tests found for logic changes")
quality_score -= 0.1
suggestions.append("Add unit tests for new logic")
# Check for documentation
if operation_data.get('is_public_api', False):
if not operation_data.get('has_documentation', False):
warnings.append("Public API lacks documentation")
quality_score -= 0.1
suggestions.append("Add API documentation")
# Security checks
if operation_data.get('handles_user_input', False):
if not operation_data.get('has_input_validation', False):
issues.append("User input handling without validation")
quality_score -= 0.3
is_valid = len(issues) == 0 and quality_score >= 0.7
return ValidationResult(
is_valid=is_valid,
issues=issues,
warnings=warnings,
suggestions=suggestions,
quality_score=max(quality_score, 0.0)
)
def determine_thinking_mode(self, context: OperationContext) -> Optional[str]:
"""
FLAGS.md: Determine appropriate thinking mode based on complexity.
"""
if context.complexity_score >= 0.8:
return "--ultrathink"
elif context.complexity_score >= 0.6:
return "--think-hard"
elif context.complexity_score >= 0.3:
return "--think"
else:
return None
def should_enable_efficiency_mode(self, session_data: Dict[str, Any]) -> bool:
"""
MODE_Token_Efficiency.md: Enable efficiency mode based on resource usage.
"""
resource_usage = session_data.get('resource_usage_percent', 0)
conversation_length = session_data.get('conversation_length', 0)
return (
resource_usage > 75 or
conversation_length > 100 or
session_data.get('user_requests_brevity', False)
)
def get_quality_gates(self, context: OperationContext) -> List[str]:
"""
ORCHESTRATOR.md: Get appropriate quality gates for operation.
"""
gates = ['syntax_validation']
if context.operation_type in [OperationType.WRITE, OperationType.EDIT]:
gates.extend(['type_analysis', 'code_quality'])
if self.should_enable_validation(context):
gates.extend(['security_assessment', 'performance_analysis'])
if context.has_tests:
gates.append('test_validation')
if context.operation_type == OperationType.DEPLOY:
gates.extend(['integration_testing', 'deployment_validation'])
return gates
def estimate_performance_impact(self, context: OperationContext) -> Dict[str, Any]:
"""
Estimate performance impact and suggested optimizations.
"""
base_time = 100 # ms
# Calculate estimated time based on complexity
estimated_time = base_time * (1 + context.complexity_score * 3)
# Factor in file count
if context.file_count > 5:
estimated_time *= 1.5
# Suggest optimizations
optimizations = []
if context.file_count > 3:
optimizations.append("Consider parallel processing")
if context.complexity_score > 0.6:
optimizations.append("Enable delegation mode")
if context.directory_count > 2:
optimizations.append("Use folder-based delegation")
return {
'estimated_time_ms': int(estimated_time),
'performance_risk': 'high' if estimated_time > 1000 else 'low',
'suggested_optimizations': optimizations,
'efficiency_gains_possible': len(optimizations) > 0
}
def apply_superclaude_principles(self, operation_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Apply SuperClaude core principles to operation planning.
Returns enhanced operation data with principle-based recommendations.
"""
enhanced_data = operation_data.copy()
# Evidence > assumptions
if 'assumptions' in enhanced_data and not enhanced_data.get('evidence'):
enhanced_data['recommendations'] = enhanced_data.get('recommendations', [])
enhanced_data['recommendations'].append(
"Gather evidence to validate assumptions"
)
# Code > documentation
if enhanced_data.get('operation_type') == 'document' and not enhanced_data.get('has_working_code'):
enhanced_data['warnings'] = enhanced_data.get('warnings', [])
enhanced_data['warnings'].append(
"Ensure working code exists before extensive documentation"
)
# Efficiency > verbosity
if enhanced_data.get('output_length', 0) > 1000 and not enhanced_data.get('justification_for_length'):
enhanced_data['efficiency_suggestions'] = enhanced_data.get('efficiency_suggestions', [])
enhanced_data['efficiency_suggestions'].append(
"Consider token efficiency techniques for long outputs"
)
return enhanced_data

View File

@@ -0,0 +1,615 @@
"""
Learning Engine for SuperClaude-Lite
Cross-hook adaptation system that learns from user patterns, operation effectiveness,
and system performance to continuously improve SuperClaude intelligence.
"""
import json
import time
import statistics
from typing import Dict, Any, List, Optional, Tuple, Set
from dataclasses import dataclass, asdict
from enum import Enum
from pathlib import Path
from yaml_loader import config_loader
class LearningType(Enum):
"""Types of learning patterns."""
USER_PREFERENCE = "user_preference"
OPERATION_PATTERN = "operation_pattern"
PERFORMANCE_OPTIMIZATION = "performance_optimization"
ERROR_RECOVERY = "error_recovery"
EFFECTIVENESS_FEEDBACK = "effectiveness_feedback"
class AdaptationScope(Enum):
"""Scope of learning adaptations."""
SESSION = "session" # Apply only to current session
PROJECT = "project" # Apply to current project
USER = "user" # Apply across all user sessions
GLOBAL = "global" # Apply to all users (anonymized)
@dataclass
class LearningRecord:
"""Record of a learning event."""
timestamp: float
learning_type: LearningType
scope: AdaptationScope
context: Dict[str, Any]
pattern: Dict[str, Any]
effectiveness_score: float # 0.0 to 1.0
confidence: float # 0.0 to 1.0
metadata: Dict[str, Any]
@dataclass
class Adaptation:
"""An adaptation learned from patterns."""
adaptation_id: str
pattern_signature: str
trigger_conditions: Dict[str, Any]
modifications: Dict[str, Any]
effectiveness_history: List[float]
usage_count: int
last_used: float
confidence_score: float
@dataclass
class LearningInsight:
"""Insight derived from learning patterns."""
insight_type: str
description: str
evidence: List[str]
recommendations: List[str]
confidence: float
impact_score: float
class LearningEngine:
"""
Cross-hook adaptation system for continuous improvement.
Features:
- User preference learning and adaptation
- Operation pattern recognition and optimization
- Performance feedback integration
- Cross-hook coordination and knowledge sharing
- Effectiveness measurement and validation
- Personalization and project-specific adaptations
"""
def __init__(self, cache_dir: Path):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(exist_ok=True)
self.learning_records: List[LearningRecord] = []
self.adaptations: Dict[str, Adaptation] = {}
self.user_preferences: Dict[str, Any] = {}
self.project_patterns: Dict[str, Dict[str, Any]] = {}
self._load_learning_data()
def _load_learning_data(self):
"""Load existing learning data from cache."""
try:
# Load learning records
records_file = self.cache_dir / "learning_records.json"
if records_file.exists():
with open(records_file, 'r') as f:
data = json.load(f)
self.learning_records = [
LearningRecord(**record) for record in data
]
# Load adaptations
adaptations_file = self.cache_dir / "adaptations.json"
if adaptations_file.exists():
with open(adaptations_file, 'r') as f:
data = json.load(f)
self.adaptations = {
k: Adaptation(**v) for k, v in data.items()
}
# Load user preferences
preferences_file = self.cache_dir / "user_preferences.json"
if preferences_file.exists():
with open(preferences_file, 'r') as f:
self.user_preferences = json.load(f)
# Load project patterns
patterns_file = self.cache_dir / "project_patterns.json"
if patterns_file.exists():
with open(patterns_file, 'r') as f:
self.project_patterns = json.load(f)
except Exception as e:
# Initialize empty data on error
self.learning_records = []
self.adaptations = {}
self.user_preferences = {}
self.project_patterns = {}
def record_learning_event(self,
learning_type: LearningType,
scope: AdaptationScope,
context: Dict[str, Any],
pattern: Dict[str, Any],
effectiveness_score: float,
confidence: float = 1.0,
metadata: Dict[str, Any] = None) -> str:
"""
Record a learning event for future adaptation.
Args:
learning_type: Type of learning event
scope: Scope of the learning (session, project, user, global)
context: Context in which the learning occurred
pattern: Pattern or behavior that was observed
effectiveness_score: How effective the pattern was (0.0 to 1.0)
confidence: Confidence in the learning (0.0 to 1.0)
metadata: Additional metadata about the learning event
Returns:
Learning record ID
"""
if metadata is None:
metadata = {}
record = LearningRecord(
timestamp=time.time(),
learning_type=learning_type,
scope=scope,
context=context,
pattern=pattern,
effectiveness_score=effectiveness_score,
confidence=confidence,
metadata=metadata
)
self.learning_records.append(record)
# Trigger adaptation creation if pattern is significant
if effectiveness_score > 0.7 and confidence > 0.6:
self._create_adaptation_from_record(record)
# Save to cache
self._save_learning_data()
return f"learning_{int(record.timestamp)}"
def _create_adaptation_from_record(self, record: LearningRecord):
"""Create an adaptation from a significant learning record."""
pattern_signature = self._generate_pattern_signature(record.pattern, record.context)
# Check if adaptation already exists
if pattern_signature in self.adaptations:
adaptation = self.adaptations[pattern_signature]
adaptation.effectiveness_history.append(record.effectiveness_score)
adaptation.usage_count += 1
adaptation.last_used = record.timestamp
# Update confidence based on consistency
if len(adaptation.effectiveness_history) > 1:
consistency = 1.0 - statistics.stdev(adaptation.effectiveness_history[-5:]) / max(statistics.mean(adaptation.effectiveness_history[-5:]), 0.1)
adaptation.confidence_score = min(consistency * record.confidence, 1.0)
else:
# Create new adaptation
adaptation_id = f"adapt_{int(record.timestamp)}_{len(self.adaptations)}"
adaptation = Adaptation(
adaptation_id=adaptation_id,
pattern_signature=pattern_signature,
trigger_conditions=self._extract_trigger_conditions(record.context),
modifications=self._extract_modifications(record.pattern),
effectiveness_history=[record.effectiveness_score],
usage_count=1,
last_used=record.timestamp,
confidence_score=record.confidence
)
self.adaptations[pattern_signature] = adaptation
def _generate_pattern_signature(self, pattern: Dict[str, Any], context: Dict[str, Any]) -> str:
"""Generate a unique signature for a pattern."""
# Create a simplified signature based on key pattern elements
key_elements = []
# Pattern type
if 'type' in pattern:
key_elements.append(f"type:{pattern['type']}")
# Context elements
if 'operation_type' in context:
key_elements.append(f"op:{context['operation_type']}")
if 'complexity_score' in context:
complexity_bucket = int(context['complexity_score'] * 10) / 10 # Round to 0.1
key_elements.append(f"complexity:{complexity_bucket}")
if 'file_count' in context:
file_bucket = min(context['file_count'], 10) # Cap at 10 for grouping
key_elements.append(f"files:{file_bucket}")
# Pattern-specific elements
for key in ['mcp_server', 'mode', 'compression_level', 'delegation_strategy']:
if key in pattern:
key_elements.append(f"{key}:{pattern[key]}")
return "_".join(sorted(key_elements))
def _extract_trigger_conditions(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""Extract trigger conditions from context."""
conditions = {}
# Operational conditions
for key in ['operation_type', 'complexity_score', 'file_count', 'directory_count']:
if key in context:
conditions[key] = context[key]
# Environmental conditions
for key in ['resource_usage_percent', 'conversation_length', 'user_expertise']:
if key in context:
conditions[key] = context[key]
# Project conditions
for key in ['project_type', 'has_tests', 'is_production']:
if key in context:
conditions[key] = context[key]
return conditions
def _extract_modifications(self, pattern: Dict[str, Any]) -> Dict[str, Any]:
"""Extract modifications to apply from pattern."""
modifications = {}
# MCP server preferences
if 'mcp_server' in pattern:
modifications['preferred_mcp_server'] = pattern['mcp_server']
# Mode preferences
if 'mode' in pattern:
modifications['preferred_mode'] = pattern['mode']
# Flag preferences
if 'flags' in pattern:
modifications['suggested_flags'] = pattern['flags']
# Performance optimizations
if 'optimization' in pattern:
modifications['optimization'] = pattern['optimization']
return modifications
def get_adaptations_for_context(self, context: Dict[str, Any]) -> List[Adaptation]:
"""Get relevant adaptations for the current context."""
relevant_adaptations = []
for adaptation in self.adaptations.values():
if self._matches_trigger_conditions(adaptation.trigger_conditions, context):
# Check effectiveness threshold
if adaptation.confidence_score > 0.5 and len(adaptation.effectiveness_history) > 0:
avg_effectiveness = statistics.mean(adaptation.effectiveness_history)
if avg_effectiveness > 0.6:
relevant_adaptations.append(adaptation)
# Sort by effectiveness and confidence
relevant_adaptations.sort(
key=lambda a: statistics.mean(a.effectiveness_history) * a.confidence_score,
reverse=True
)
return relevant_adaptations
def _matches_trigger_conditions(self, conditions: Dict[str, Any], context: Dict[str, Any]) -> bool:
"""Check if context matches adaptation trigger conditions."""
for key, expected_value in conditions.items():
if key not in context:
continue
context_value = context[key]
# Exact match for strings and booleans
if isinstance(expected_value, (str, bool)):
if context_value != expected_value:
return False
# Range match for numbers
elif isinstance(expected_value, (int, float)):
tolerance = 0.1 if isinstance(expected_value, float) else 1
if abs(context_value - expected_value) > tolerance:
return False
return True
def apply_adaptations(self,
context: Dict[str, Any],
base_recommendations: Dict[str, Any]) -> Dict[str, Any]:
"""
Apply learned adaptations to enhance recommendations.
Args:
context: Current operation context
base_recommendations: Base recommendations before adaptation
Returns:
Enhanced recommendations with learned adaptations applied
"""
relevant_adaptations = self.get_adaptations_for_context(context)
enhanced_recommendations = base_recommendations.copy()
for adaptation in relevant_adaptations:
# Apply modifications from adaptation
for modification_type, modification_value in adaptation.modifications.items():
if modification_type == 'preferred_mcp_server':
# Enhance MCP server selection
if 'recommended_mcp_servers' not in enhanced_recommendations:
enhanced_recommendations['recommended_mcp_servers'] = []
servers = enhanced_recommendations['recommended_mcp_servers']
if modification_value not in servers:
servers.insert(0, modification_value) # Prioritize learned preference
elif modification_type == 'preferred_mode':
# Enhance mode selection
if 'recommended_modes' not in enhanced_recommendations:
enhanced_recommendations['recommended_modes'] = []
modes = enhanced_recommendations['recommended_modes']
if modification_value not in modes:
modes.insert(0, modification_value)
elif modification_type == 'suggested_flags':
# Enhance flag suggestions
if 'suggested_flags' not in enhanced_recommendations:
enhanced_recommendations['suggested_flags'] = []
for flag in modification_value:
if flag not in enhanced_recommendations['suggested_flags']:
enhanced_recommendations['suggested_flags'].append(flag)
elif modification_type == 'optimization':
# Apply performance optimizations
if 'optimizations' not in enhanced_recommendations:
enhanced_recommendations['optimizations'] = []
enhanced_recommendations['optimizations'].append(modification_value)
# Update usage tracking
adaptation.usage_count += 1
adaptation.last_used = time.time()
# Add learning metadata
enhanced_recommendations['applied_adaptations'] = [
{
'id': adaptation.adaptation_id,
'confidence': adaptation.confidence_score,
'effectiveness': statistics.mean(adaptation.effectiveness_history)
}
for adaptation in relevant_adaptations
]
return enhanced_recommendations
def record_effectiveness_feedback(self,
adaptation_ids: List[str],
effectiveness_score: float,
context: Dict[str, Any]):
"""Record feedback on adaptation effectiveness."""
for adaptation_id in adaptation_ids:
# Find adaptation by ID
adaptation = None
for adapt in self.adaptations.values():
if adapt.adaptation_id == adaptation_id:
adaptation = adapt
break
if adaptation:
adaptation.effectiveness_history.append(effectiveness_score)
# Update confidence based on consistency
if len(adaptation.effectiveness_history) > 2:
recent_scores = adaptation.effectiveness_history[-5:]
consistency = 1.0 - statistics.stdev(recent_scores) / max(statistics.mean(recent_scores), 0.1)
adaptation.confidence_score = min(consistency, 1.0)
# Record learning event
self.record_learning_event(
LearningType.EFFECTIVENESS_FEEDBACK,
AdaptationScope.USER,
context,
{'adaptation_id': adaptation_id},
effectiveness_score,
adaptation.confidence_score
)
def generate_learning_insights(self) -> List[LearningInsight]:
"""Generate insights from learning patterns."""
insights = []
# User preference insights
insights.extend(self._analyze_user_preferences())
# Performance pattern insights
insights.extend(self._analyze_performance_patterns())
# Error pattern insights
insights.extend(self._analyze_error_patterns())
# Effectiveness insights
insights.extend(self._analyze_effectiveness_patterns())
return insights
def _analyze_user_preferences(self) -> List[LearningInsight]:
"""Analyze user preference patterns."""
insights = []
# Analyze MCP server preferences
mcp_usage = {}
for record in self.learning_records:
if record.learning_type == LearningType.USER_PREFERENCE:
server = record.pattern.get('mcp_server')
if server:
if server not in mcp_usage:
mcp_usage[server] = []
mcp_usage[server].append(record.effectiveness_score)
if mcp_usage:
# Find most effective server
server_effectiveness = {
server: statistics.mean(scores)
for server, scores in mcp_usage.items()
if len(scores) >= 3
}
if server_effectiveness:
best_server = max(server_effectiveness, key=server_effectiveness.get)
best_score = server_effectiveness[best_server]
if best_score > 0.8:
insights.append(LearningInsight(
insight_type="user_preference",
description=f"User consistently prefers {best_server} MCP server",
evidence=[f"Effectiveness score: {best_score:.2f}", f"Usage count: {len(mcp_usage[best_server])}"],
recommendations=[f"Auto-suggest {best_server} for similar operations"],
confidence=min(best_score, 1.0),
impact_score=0.7
))
return insights
def _analyze_performance_patterns(self) -> List[LearningInsight]:
"""Analyze performance optimization patterns."""
insights = []
# Analyze delegation effectiveness
delegation_records = [
r for r in self.learning_records
if r.learning_type == LearningType.PERFORMANCE_OPTIMIZATION
and 'delegation' in r.pattern
]
if len(delegation_records) >= 5:
avg_effectiveness = statistics.mean([r.effectiveness_score for r in delegation_records])
if avg_effectiveness > 0.75:
insights.append(LearningInsight(
insight_type="performance_optimization",
description="Delegation consistently improves performance",
evidence=[f"Average effectiveness: {avg_effectiveness:.2f}", f"Sample size: {len(delegation_records)}"],
recommendations=["Enable delegation for multi-file operations", "Lower delegation threshold"],
confidence=avg_effectiveness,
impact_score=0.8
))
return insights
def _analyze_error_patterns(self) -> List[LearningInsight]:
"""Analyze error recovery patterns."""
insights = []
error_records = [
r for r in self.learning_records
if r.learning_type == LearningType.ERROR_RECOVERY
]
if len(error_records) >= 3:
# Analyze common error contexts
error_contexts = {}
for record in error_records:
context_key = record.context.get('operation_type', 'unknown')
if context_key not in error_contexts:
error_contexts[context_key] = []
error_contexts[context_key].append(record)
for context, records in error_contexts.items():
if len(records) >= 2:
avg_recovery_effectiveness = statistics.mean([r.effectiveness_score for r in records])
insights.append(LearningInsight(
insight_type="error_recovery",
description=f"Error patterns identified for {context} operations",
evidence=[f"Occurrence count: {len(records)}", f"Recovery effectiveness: {avg_recovery_effectiveness:.2f}"],
recommendations=[f"Add proactive validation for {context} operations"],
confidence=min(len(records) / 5, 1.0),
impact_score=0.6
))
return insights
def _analyze_effectiveness_patterns(self) -> List[LearningInsight]:
"""Analyze overall effectiveness patterns."""
insights = []
if len(self.learning_records) >= 10:
recent_records = sorted(self.learning_records, key=lambda r: r.timestamp)[-10:]
avg_effectiveness = statistics.mean([r.effectiveness_score for r in recent_records])
if avg_effectiveness > 0.8:
insights.append(LearningInsight(
insight_type="effectiveness_trend",
description="SuperClaude effectiveness is high and improving",
evidence=[f"Recent average effectiveness: {avg_effectiveness:.2f}"],
recommendations=["Continue current learning patterns", "Consider expanding adaptation scope"],
confidence=avg_effectiveness,
impact_score=0.9
))
elif avg_effectiveness < 0.6:
insights.append(LearningInsight(
insight_type="effectiveness_concern",
description="SuperClaude effectiveness below optimal",
evidence=[f"Recent average effectiveness: {avg_effectiveness:.2f}"],
recommendations=["Review recent adaptations", "Gather more user feedback", "Adjust learning thresholds"],
confidence=1.0 - avg_effectiveness,
impact_score=0.8
))
return insights
def _save_learning_data(self):
"""Save learning data to cache files."""
try:
# Save learning records
records_file = self.cache_dir / "learning_records.json"
with open(records_file, 'w') as f:
json.dump([asdict(record) for record in self.learning_records], f, indent=2)
# Save adaptations
adaptations_file = self.cache_dir / "adaptations.json"
with open(adaptations_file, 'w') as f:
json.dump({k: asdict(v) for k, v in self.adaptations.items()}, f, indent=2)
# Save user preferences
preferences_file = self.cache_dir / "user_preferences.json"
with open(preferences_file, 'w') as f:
json.dump(self.user_preferences, f, indent=2)
# Save project patterns
patterns_file = self.cache_dir / "project_patterns.json"
with open(patterns_file, 'w') as f:
json.dump(self.project_patterns, f, indent=2)
except Exception as e:
pass # Silent fail for cache operations
def cleanup_old_data(self, max_age_days: int = 30):
"""Clean up old learning data to prevent cache bloat."""
cutoff_time = time.time() - (max_age_days * 24 * 60 * 60)
# Remove old learning records
self.learning_records = [
record for record in self.learning_records
if record.timestamp > cutoff_time
]
# Remove unused adaptations
self.adaptations = {
k: v for k, v in self.adaptations.items()
if v.last_used > cutoff_time or v.usage_count > 5
}
self._save_learning_data()

View File

@@ -0,0 +1,275 @@
"""
Simple logger for SuperClaude-Lite hooks.
Provides structured logging of hook events for later analysis.
Focuses on capturing hook lifecycle, decisions, and errors in a
structured format without any analysis or complex features.
"""
import json
import logging
import os
import time
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Optional, Dict, Any
import uuid
import glob
# Import configuration loader
try:
from .yaml_loader import UnifiedConfigLoader
except ImportError:
# Fallback if yaml_loader is not available
UnifiedConfigLoader = None
class HookLogger:
"""Simple logger for SuperClaude-Lite hooks."""
def __init__(self, log_dir: str = None, retention_days: int = None):
"""
Initialize the logger.
Args:
log_dir: Directory to store log files. Defaults to cache/logs/
retention_days: Number of days to keep log files. Defaults to 30.
"""
# Load configuration
self.config = self._load_config()
# Check if logging is enabled
if not self.config.get('logging', {}).get('enabled', True):
self.enabled = False
return
self.enabled = True
# Set up log directory
if log_dir is None:
# Get SuperClaude-Lite root directory (2 levels up from shared/)
root_dir = Path(__file__).parent.parent.parent
log_dir_config = self.config.get('logging', {}).get('file_settings', {}).get('log_directory', 'cache/logs')
log_dir = root_dir / log_dir_config
self.log_dir = Path(log_dir)
self.log_dir.mkdir(parents=True, exist_ok=True)
# Log retention settings
if retention_days is None:
retention_days = self.config.get('logging', {}).get('file_settings', {}).get('retention_days', 30)
self.retention_days = retention_days
# Session ID for correlating events
self.session_id = str(uuid.uuid4())[:8]
# Set up Python logger
self._setup_logger()
# Clean up old logs on initialization
self._cleanup_old_logs()
def _load_config(self) -> Dict[str, Any]:
"""Load logging configuration from YAML file."""
if UnifiedConfigLoader is None:
# Return default configuration if loader not available
return {
'logging': {
'enabled': True,
'level': 'INFO',
'file_settings': {
'log_directory': 'cache/logs',
'retention_days': 30
}
}
}
try:
# Get project root
root_dir = Path(__file__).parent.parent.parent
loader = UnifiedConfigLoader(root_dir)
# Load logging configuration
config = loader.load_yaml('logging')
return config or {}
except Exception:
# Return default configuration on error
return {
'logging': {
'enabled': True,
'level': 'INFO',
'file_settings': {
'log_directory': 'cache/logs',
'retention_days': 30
}
}
}
def _setup_logger(self):
"""Set up the Python logger with JSON formatting."""
self.logger = logging.getLogger("superclaude_lite_hooks")
# Set log level from configuration
log_level_str = self.config.get('logging', {}).get('level', 'INFO').upper()
log_level = getattr(logging, log_level_str, logging.INFO)
self.logger.setLevel(log_level)
# Remove existing handlers to avoid duplicates
self.logger.handlers.clear()
# Create daily log file
today = datetime.now().strftime("%Y-%m-%d")
log_file = self.log_dir / f"superclaude-lite-{today}.log"
# File handler
handler = logging.FileHandler(log_file, mode='a', encoding='utf-8')
handler.setLevel(logging.INFO)
# Simple formatter - just output the message (which is already JSON)
formatter = logging.Formatter('%(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def _create_event(self, event_type: str, hook_name: str, data: Dict[str, Any] = None) -> Dict[str, Any]:
"""Create a structured event."""
event = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"session": self.session_id,
"hook": hook_name,
"event": event_type
}
if data:
event["data"] = data
return event
def _should_log_event(self, hook_name: str, event_type: str) -> bool:
"""Check if this event should be logged based on configuration."""
if not self.enabled:
return False
# Check hook-specific configuration
hook_config = self.config.get('hook_configuration', {}).get(hook_name, {})
if not hook_config.get('enabled', True):
return False
# Check event type configuration
hook_logging = self.config.get('logging', {}).get('hook_logging', {})
event_mapping = {
'start': 'log_lifecycle',
'end': 'log_lifecycle',
'decision': 'log_decisions',
'error': 'log_errors'
}
config_key = event_mapping.get(event_type, 'log_lifecycle')
return hook_logging.get(config_key, True)
def log_hook_start(self, hook_name: str, context: Optional[Dict[str, Any]] = None):
"""Log the start of a hook execution."""
if not self._should_log_event(hook_name, 'start'):
return
event = self._create_event("start", hook_name, context)
self.logger.info(json.dumps(event))
def log_hook_end(self, hook_name: str, duration_ms: int, success: bool, result: Optional[Dict[str, Any]] = None):
"""Log the end of a hook execution."""
if not self._should_log_event(hook_name, 'end'):
return
data = {
"duration_ms": duration_ms,
"success": success
}
if result:
data["result"] = result
event = self._create_event("end", hook_name, data)
self.logger.info(json.dumps(event))
def log_decision(self, hook_name: str, decision_type: str, choice: str, reason: str):
"""Log a decision made by a hook."""
if not self._should_log_event(hook_name, 'decision'):
return
data = {
"type": decision_type,
"choice": choice,
"reason": reason
}
event = self._create_event("decision", hook_name, data)
self.logger.info(json.dumps(event))
def log_error(self, hook_name: str, error: str, context: Optional[Dict[str, Any]] = None):
"""Log an error that occurred in a hook."""
if not self._should_log_event(hook_name, 'error'):
return
data = {
"error": error
}
if context:
data["context"] = context
event = self._create_event("error", hook_name, data)
self.logger.info(json.dumps(event))
def _cleanup_old_logs(self):
"""Remove log files older than retention_days."""
if self.retention_days <= 0:
return
cutoff_date = datetime.now() - timedelta(days=self.retention_days)
# Find all log files
log_pattern = self.log_dir / "superclaude-lite-*.log"
for log_file in glob.glob(str(log_pattern)):
try:
# Extract date from filename
filename = os.path.basename(log_file)
date_str = filename.replace("superclaude-lite-", "").replace(".log", "")
file_date = datetime.strptime(date_str, "%Y-%m-%d")
# Remove if older than cutoff
if file_date < cutoff_date:
os.remove(log_file)
except (ValueError, OSError):
# Skip files that don't match expected format or can't be removed
continue
# Global logger instance
_logger = None
def get_logger() -> HookLogger:
"""Get the global logger instance."""
global _logger
if _logger is None:
_logger = HookLogger()
return _logger
# Convenience functions for easy hook integration
def log_hook_start(hook_name: str, context: Optional[Dict[str, Any]] = None):
"""Log the start of a hook execution."""
get_logger().log_hook_start(hook_name, context)
def log_hook_end(hook_name: str, duration_ms: int, success: bool, result: Optional[Dict[str, Any]] = None):
"""Log the end of a hook execution."""
get_logger().log_hook_end(hook_name, duration_ms, success, result)
def log_decision(hook_name: str, decision_type: str, choice: str, reason: str):
"""Log a decision made by a hook."""
get_logger().log_decision(hook_name, decision_type, choice, reason)
def log_error(hook_name: str, error: str, context: Optional[Dict[str, Any]] = None):
"""Log an error that occurred in a hook."""
get_logger().log_error(hook_name, error, context)

View File

@@ -0,0 +1,478 @@
"""
MCP Intelligence Engine for SuperClaude-Lite
Intelligent MCP server activation, coordination, and optimization based on
ORCHESTRATOR.md patterns and real-time context analysis.
"""
import json
import time
from typing import Dict, Any, List, Optional, Set, Tuple
from dataclasses import dataclass
from enum import Enum
from yaml_loader import config_loader
from pattern_detection import PatternDetector, PatternMatch
class MCPServerState(Enum):
"""States of MCP server availability."""
AVAILABLE = "available"
UNAVAILABLE = "unavailable"
LOADING = "loading"
ERROR = "error"
@dataclass
class MCPServerCapability:
"""Capability definition for an MCP server."""
server_name: str
primary_functions: List[str]
performance_profile: str # lightweight, standard, intensive
activation_cost_ms: int
token_efficiency: float # 0.0 to 1.0
quality_impact: float # 0.0 to 1.0
@dataclass
class MCPActivationPlan:
"""Plan for MCP server activation."""
servers_to_activate: List[str]
activation_order: List[str]
estimated_cost_ms: int
efficiency_gains: Dict[str, float]
fallback_strategy: Dict[str, str]
coordination_strategy: str
class MCPIntelligence:
"""
Intelligent MCP server management and coordination.
Implements ORCHESTRATOR.md patterns for:
- Smart server selection based on context
- Performance-optimized activation sequences
- Fallback strategies for server failures
- Cross-server coordination and caching
- Real-time adaptation based on effectiveness
"""
def __init__(self):
self.pattern_detector = PatternDetector()
self.server_capabilities = self._load_server_capabilities()
self.server_states = self._initialize_server_states()
self.activation_history = []
self.performance_metrics = {}
def _load_server_capabilities(self) -> Dict[str, MCPServerCapability]:
"""Load MCP server capabilities from configuration."""
config = config_loader.load_config('orchestrator')
capabilities = {}
servers_config = config.get('mcp_servers', {})
capabilities['context7'] = MCPServerCapability(
server_name='context7',
primary_functions=['library_docs', 'framework_patterns', 'best_practices'],
performance_profile='standard',
activation_cost_ms=150,
token_efficiency=0.8,
quality_impact=0.9
)
capabilities['sequential'] = MCPServerCapability(
server_name='sequential',
primary_functions=['complex_analysis', 'multi_step_reasoning', 'debugging'],
performance_profile='intensive',
activation_cost_ms=200,
token_efficiency=0.6,
quality_impact=0.95
)
capabilities['magic'] = MCPServerCapability(
server_name='magic',
primary_functions=['ui_components', 'design_systems', 'frontend_generation'],
performance_profile='standard',
activation_cost_ms=120,
token_efficiency=0.85,
quality_impact=0.9
)
capabilities['playwright'] = MCPServerCapability(
server_name='playwright',
primary_functions=['e2e_testing', 'browser_automation', 'performance_testing'],
performance_profile='intensive',
activation_cost_ms=300,
token_efficiency=0.7,
quality_impact=0.85
)
capabilities['morphllm'] = MCPServerCapability(
server_name='morphllm',
primary_functions=['intelligent_editing', 'pattern_application', 'fast_apply'],
performance_profile='lightweight',
activation_cost_ms=80,
token_efficiency=0.9,
quality_impact=0.8
)
capabilities['serena'] = MCPServerCapability(
server_name='serena',
primary_functions=['semantic_analysis', 'project_context', 'memory_management'],
performance_profile='standard',
activation_cost_ms=100,
token_efficiency=0.75,
quality_impact=0.95
)
return capabilities
def _initialize_server_states(self) -> Dict[str, MCPServerState]:
"""Initialize server state tracking."""
return {
server: MCPServerState.AVAILABLE
for server in self.server_capabilities.keys()
}
def create_activation_plan(self,
user_input: str,
context: Dict[str, Any],
operation_data: Dict[str, Any]) -> MCPActivationPlan:
"""
Create intelligent MCP server activation plan.
Args:
user_input: User's request or command
context: Session and environment context
operation_data: Information about the planned operation
Returns:
MCPActivationPlan with optimized server selection and coordination
"""
# Detect patterns to determine server needs
detection_result = self.pattern_detector.detect_patterns(
user_input, context, operation_data
)
# Extract recommended servers from pattern detection
recommended_servers = detection_result.recommended_mcp_servers
# Apply intelligent selection based on context
optimized_servers = self._optimize_server_selection(
recommended_servers, context, operation_data
)
# Determine activation order for optimal performance
activation_order = self._calculate_activation_order(optimized_servers, context)
# Calculate estimated costs and gains
estimated_cost = self._calculate_activation_cost(optimized_servers)
efficiency_gains = self._calculate_efficiency_gains(optimized_servers, operation_data)
# Create fallback strategy
fallback_strategy = self._create_fallback_strategy(optimized_servers)
# Determine coordination strategy
coordination_strategy = self._determine_coordination_strategy(
optimized_servers, operation_data
)
return MCPActivationPlan(
servers_to_activate=optimized_servers,
activation_order=activation_order,
estimated_cost_ms=estimated_cost,
efficiency_gains=efficiency_gains,
fallback_strategy=fallback_strategy,
coordination_strategy=coordination_strategy
)
def _optimize_server_selection(self,
recommended_servers: List[str],
context: Dict[str, Any],
operation_data: Dict[str, Any]) -> List[str]:
"""Apply intelligent optimization to server selection."""
optimized = set(recommended_servers)
# Morphllm vs Serena intelligence selection
file_count = operation_data.get('file_count', 1)
complexity_score = operation_data.get('complexity_score', 0.0)
if 'morphllm' in optimized and 'serena' in optimized:
# Choose the more appropriate server based on complexity
if file_count > 10 or complexity_score > 0.6:
optimized.remove('morphllm') # Use Serena for complex operations
else:
optimized.remove('serena') # Use Morphllm for efficient operations
elif file_count > 10 or complexity_score > 0.6:
# Auto-add Serena for complex operations
optimized.add('serena')
optimized.discard('morphllm')
elif file_count <= 10 and complexity_score <= 0.6:
# Auto-add Morphllm for simple operations
optimized.add('morphllm')
optimized.discard('serena')
# Resource constraint optimization
resource_usage = context.get('resource_usage_percent', 0)
if resource_usage > 85:
# Remove intensive servers under resource constraints
intensive_servers = {
name for name, cap in self.server_capabilities.items()
if cap.performance_profile == 'intensive'
}
optimized -= intensive_servers
# Performance optimization based on operation type
operation_type = operation_data.get('operation_type', '')
if operation_type in ['read', 'analyze'] and 'sequential' not in optimized:
# Add Sequential for analysis operations
optimized.add('sequential')
# Auto-add Context7 if external libraries detected
if operation_data.get('has_external_dependencies', False):
optimized.add('context7')
return list(optimized)
def _calculate_activation_order(self, servers: List[str], context: Dict[str, Any]) -> List[str]:
"""Calculate optimal activation order for performance."""
if not servers:
return []
# Sort by activation cost (lightweight first)
server_costs = [
(server, self.server_capabilities[server].activation_cost_ms)
for server in servers
]
server_costs.sort(key=lambda x: x[1])
# Special ordering rules
ordered = []
# 1. Serena first if present (provides context for others)
if 'serena' in servers:
ordered.append('serena')
servers = [s for s in servers if s != 'serena']
# 2. Context7 early for documentation context
if 'context7' in servers:
ordered.append('context7')
servers = [s for s in servers if s != 'context7']
# 3. Remaining servers by cost
remaining_costs = [
(server, self.server_capabilities[server].activation_cost_ms)
for server in servers
]
remaining_costs.sort(key=lambda x: x[1])
ordered.extend([server for server, _ in remaining_costs])
return ordered
def _calculate_activation_cost(self, servers: List[str]) -> int:
"""Calculate total activation cost in milliseconds."""
return sum(
self.server_capabilities[server].activation_cost_ms
for server in servers
if server in self.server_capabilities
)
def _calculate_efficiency_gains(self, servers: List[str], operation_data: Dict[str, Any]) -> Dict[str, float]:
"""Calculate expected efficiency gains from server activation."""
gains = {}
for server in servers:
if server not in self.server_capabilities:
continue
capability = self.server_capabilities[server]
# Base efficiency gain
base_gain = capability.token_efficiency * capability.quality_impact
# Context-specific adjustments
if server == 'morphllm' and operation_data.get('file_count', 1) <= 5:
gains[server] = base_gain * 1.2 # Extra efficient for small operations
elif server == 'serena' and operation_data.get('complexity_score', 0) > 0.6:
gains[server] = base_gain * 1.3 # Extra valuable for complex operations
elif server == 'sequential' and 'debug' in operation_data.get('operation_type', ''):
gains[server] = base_gain * 1.4 # Extra valuable for debugging
else:
gains[server] = base_gain
return gains
def _create_fallback_strategy(self, servers: List[str]) -> Dict[str, str]:
"""Create fallback strategy for server failures."""
fallbacks = {}
# Define fallback mappings
fallback_map = {
'morphllm': 'serena', # Serena can handle editing
'serena': 'morphllm', # Morphllm can handle simple edits
'sequential': 'context7', # Context7 for documentation-based analysis
'context7': 'sequential', # Sequential for complex analysis
'magic': 'morphllm', # Morphllm for component generation
'playwright': 'sequential' # Sequential for test planning
}
for server in servers:
fallback = fallback_map.get(server)
if fallback and fallback not in servers:
fallbacks[server] = fallback
else:
fallbacks[server] = 'native_tools' # Fall back to native Claude tools
return fallbacks
def _determine_coordination_strategy(self, servers: List[str], operation_data: Dict[str, Any]) -> str:
"""Determine how servers should coordinate."""
if len(servers) <= 1:
return 'single_server'
# Sequential coordination for complex analysis
if 'sequential' in servers and operation_data.get('complexity_score', 0) > 0.6:
return 'sequential_lead'
# Serena coordination for multi-file operations
if 'serena' in servers and operation_data.get('file_count', 1) > 5:
return 'serena_lead'
# Parallel coordination for independent operations
if len(servers) >= 3:
return 'parallel_with_sync'
return 'collaborative'
def execute_activation_plan(self, plan: MCPActivationPlan, context: Dict[str, Any]) -> Dict[str, Any]:
"""
Execute MCP server activation plan with error handling and performance tracking.
Args:
plan: MCPActivationPlan to execute
context: Current session context
Returns:
Execution results with performance metrics and activated servers
"""
start_time = time.time()
activated_servers = []
failed_servers = []
fallback_activations = []
for server in plan.activation_order:
try:
# Check server availability
if self.server_states.get(server) == MCPServerState.UNAVAILABLE:
failed_servers.append(server)
self._handle_server_fallback(server, plan, fallback_activations)
continue
# Activate server (simulated - real implementation would call MCP)
self.server_states[server] = MCPServerState.LOADING
activation_start = time.time()
# Simulate activation time
expected_cost = self.server_capabilities[server].activation_cost_ms
actual_cost = expected_cost * (0.8 + 0.4 * hash(server) % 1000 / 1000) # Simulated variance
self.server_states[server] = MCPServerState.AVAILABLE
activated_servers.append(server)
# Track performance
activation_time = (time.time() - activation_start) * 1000
self.performance_metrics[server] = {
'last_activation_ms': activation_time,
'expected_ms': expected_cost,
'efficiency_ratio': expected_cost / max(activation_time, 1)
}
except Exception as e:
failed_servers.append(server)
self.server_states[server] = MCPServerState.ERROR
self._handle_server_fallback(server, plan, fallback_activations)
total_time = (time.time() - start_time) * 1000
# Update activation history
self.activation_history.append({
'timestamp': time.time(),
'plan': plan,
'activated': activated_servers,
'failed': failed_servers,
'fallbacks': fallback_activations,
'total_time_ms': total_time
})
return {
'activated_servers': activated_servers,
'failed_servers': failed_servers,
'fallback_activations': fallback_activations,
'total_activation_time_ms': total_time,
'coordination_strategy': plan.coordination_strategy,
'performance_metrics': self.performance_metrics
}
def _handle_server_fallback(self, failed_server: str, plan: MCPActivationPlan, fallback_activations: List[str]):
"""Handle server activation failure with fallback strategy."""
fallback = plan.fallback_strategy.get(failed_server)
if fallback and fallback != 'native_tools' and fallback not in plan.servers_to_activate:
# Try to activate fallback server
if self.server_states.get(fallback) == MCPServerState.AVAILABLE:
fallback_activations.append(f"{failed_server}->{fallback}")
# In real implementation, would activate fallback server
def get_optimization_recommendations(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""Get recommendations for optimizing MCP server usage."""
recommendations = []
# Analyze activation history for patterns
if len(self.activation_history) >= 5:
recent_activations = self.activation_history[-5:]
# Check for frequently failing servers
failed_counts = {}
for activation in recent_activations:
for failed in activation['failed']:
failed_counts[failed] = failed_counts.get(failed, 0) + 1
for server, count in failed_counts.items():
if count >= 3:
recommendations.append(f"Server {server} failing frequently - consider fallback strategy")
# Check for performance issues
avg_times = {}
for activation in recent_activations:
total_time = activation['total_time_ms']
server_count = len(activation['activated'])
if server_count > 0:
avg_time_per_server = total_time / server_count
avg_times[len(activation['activated'])] = avg_time_per_server
if avg_times and max(avg_times.values()) > 500:
recommendations.append("Consider reducing concurrent server activations for better performance")
# Resource usage recommendations
resource_usage = context.get('resource_usage_percent', 0)
if resource_usage > 80:
recommendations.append("High resource usage - consider lightweight servers only")
return {
'recommendations': recommendations,
'performance_metrics': self.performance_metrics,
'server_states': {k: v.value for k, v in self.server_states.items()},
'efficiency_score': self._calculate_overall_efficiency()
}
def _calculate_overall_efficiency(self) -> float:
"""Calculate overall MCP system efficiency."""
if not self.performance_metrics:
return 1.0
efficiency_scores = []
for server, metrics in self.performance_metrics.items():
efficiency_ratio = metrics.get('efficiency_ratio', 1.0)
efficiency_scores.append(min(efficiency_ratio, 2.0)) # Cap at 200% efficiency
return sum(efficiency_scores) / len(efficiency_scores) if efficiency_scores else 1.0

View File

@@ -0,0 +1,459 @@
"""
Pattern Detection Engine for SuperClaude-Lite
Intelligent pattern detection for automatic mode activation,
MCP server selection, and operational optimization.
"""
import re
import json
from typing import Dict, Any, List, Set, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
from yaml_loader import config_loader
class PatternType(Enum):
"""Types of patterns we can detect."""
MODE_TRIGGER = "mode_trigger"
MCP_SERVER = "mcp_server"
OPERATION_TYPE = "operation_type"
COMPLEXITY_INDICATOR = "complexity_indicator"
PERSONA_HINT = "persona_hint"
PERFORMANCE_HINT = "performance_hint"
@dataclass
class PatternMatch:
"""A detected pattern match."""
pattern_type: PatternType
pattern_name: str
confidence: float # 0.0 to 1.0
matched_text: str
suggestions: List[str]
metadata: Dict[str, Any]
@dataclass
class DetectionResult:
"""Result of pattern detection analysis."""
matches: List[PatternMatch]
recommended_modes: List[str]
recommended_mcp_servers: List[str]
suggested_flags: List[str]
complexity_score: float
confidence_score: float
class PatternDetector:
"""
Intelligent pattern detection system.
Analyzes user input, context, and operation patterns to determine:
- Which SuperClaude modes should be activated
- Which MCP servers are needed
- What optimization flags to apply
- Complexity and performance considerations
"""
def __init__(self):
self.patterns = config_loader.load_config('modes')
self.mcp_patterns = config_loader.load_config('orchestrator')
self._compile_patterns()
def _compile_patterns(self):
"""Compile regex patterns for efficient matching."""
self.compiled_patterns = {}
# Mode detection patterns
for mode_name, mode_config in self.patterns.get('mode_detection', {}).items():
patterns = mode_config.get('trigger_patterns', [])
self.compiled_patterns[f"mode_{mode_name}"] = [
re.compile(pattern, re.IGNORECASE) for pattern in patterns
]
# MCP server patterns
for server_name, server_config in self.mcp_patterns.get('routing_patterns', {}).items():
triggers = server_config.get('triggers', [])
self.compiled_patterns[f"mcp_{server_name}"] = [
re.compile(trigger, re.IGNORECASE) for trigger in triggers
]
def detect_patterns(self,
user_input: str,
context: Dict[str, Any],
operation_data: Dict[str, Any]) -> DetectionResult:
"""
Perform comprehensive pattern detection.
Args:
user_input: User's request or command
context: Session and environment context
operation_data: Information about the planned operation
Returns:
DetectionResult with all detected patterns and recommendations
"""
matches = []
# Detect mode triggers
mode_matches = self._detect_mode_patterns(user_input, context)
matches.extend(mode_matches)
# Detect MCP server needs
mcp_matches = self._detect_mcp_patterns(user_input, context, operation_data)
matches.extend(mcp_matches)
# Detect complexity indicators
complexity_matches = self._detect_complexity_patterns(user_input, operation_data)
matches.extend(complexity_matches)
# Detect persona hints
persona_matches = self._detect_persona_patterns(user_input, context)
matches.extend(persona_matches)
# Calculate overall scores
complexity_score = self._calculate_complexity_score(matches, operation_data)
confidence_score = self._calculate_confidence_score(matches)
# Generate recommendations
recommended_modes = self._get_recommended_modes(matches, complexity_score)
recommended_mcp_servers = self._get_recommended_mcp_servers(matches, context)
suggested_flags = self._get_suggested_flags(matches, complexity_score, context)
return DetectionResult(
matches=matches,
recommended_modes=recommended_modes,
recommended_mcp_servers=recommended_mcp_servers,
suggested_flags=suggested_flags,
complexity_score=complexity_score,
confidence_score=confidence_score
)
def _detect_mode_patterns(self, user_input: str, context: Dict[str, Any]) -> List[PatternMatch]:
"""Detect which SuperClaude modes should be activated."""
matches = []
# Brainstorming mode detection
brainstorm_indicators = [
r"(?:i want to|thinking about|not sure|maybe|could we)\s+(?:build|create|make)",
r"(?:brainstorm|explore|figure out|discuss)",
r"(?:new project|startup idea|feature concept)",
r"(?:ambiguous|uncertain|unclear)\s+(?:requirements|needs)"
]
for pattern in brainstorm_indicators:
if re.search(pattern, user_input, re.IGNORECASE):
matches.append(PatternMatch(
pattern_type=PatternType.MODE_TRIGGER,
pattern_name="brainstorming",
confidence=0.8,
matched_text=re.search(pattern, user_input, re.IGNORECASE).group(),
suggestions=["Enable brainstorming mode for requirements discovery"],
metadata={"mode": "brainstorming", "auto_activate": True}
))
break
# Task management mode detection
task_management_indicators = [
r"(?:multiple|many|several)\s+(?:tasks|files|components)",
r"(?:build|implement|create)\s+(?:system|feature|application)",
r"(?:complex|comprehensive|large-scale)",
r"(?:manage|coordinate|orchestrate)\s+(?:work|tasks|operations)"
]
for pattern in task_management_indicators:
if re.search(pattern, user_input, re.IGNORECASE):
matches.append(PatternMatch(
pattern_type=PatternType.MODE_TRIGGER,
pattern_name="task_management",
confidence=0.7,
matched_text=re.search(pattern, user_input, re.IGNORECASE).group(),
suggestions=["Enable task management for complex operations"],
metadata={"mode": "task_management", "delegation_likely": True}
))
break
# Token efficiency mode detection
efficiency_indicators = [
r"(?:brief|concise|compressed|short)",
r"(?:token|resource|memory)\s+(?:limit|constraint|optimization)",
r"(?:efficient|optimized|minimal)\s+(?:output|response)"
]
for pattern in efficiency_indicators:
if re.search(pattern, user_input, re.IGNORECASE):
matches.append(PatternMatch(
pattern_type=PatternType.MODE_TRIGGER,
pattern_name="token_efficiency",
confidence=0.9,
matched_text=re.search(pattern, user_input, re.IGNORECASE).group(),
suggestions=["Enable token efficiency mode"],
metadata={"mode": "token_efficiency", "compression_needed": True}
))
break
# Check resource usage for automatic efficiency mode
resource_usage = context.get('resource_usage_percent', 0)
if resource_usage > 75:
matches.append(PatternMatch(
pattern_type=PatternType.MODE_TRIGGER,
pattern_name="token_efficiency",
confidence=0.85,
matched_text="high_resource_usage",
suggestions=["Auto-enable token efficiency due to resource constraints"],
metadata={"mode": "token_efficiency", "trigger": "resource_constraint"}
))
return matches
def _detect_mcp_patterns(self, user_input: str, context: Dict[str, Any], operation_data: Dict[str, Any]) -> List[PatternMatch]:
"""Detect which MCP servers should be activated."""
matches = []
# Context7 (library documentation)
context7_patterns = [
r"(?:library|framework|package)\s+(?:documentation|docs|patterns)",
r"(?:react|vue|angular|express|django|flask)",
r"(?:import|require|install|dependency)",
r"(?:official|standard|best practice)\s+(?:way|pattern|approach)"
]
for pattern in context7_patterns:
if re.search(pattern, user_input, re.IGNORECASE):
matches.append(PatternMatch(
pattern_type=PatternType.MCP_SERVER,
pattern_name="context7",
confidence=0.8,
matched_text=re.search(pattern, user_input, re.IGNORECASE).group(),
suggestions=["Enable Context7 for library documentation"],
metadata={"mcp_server": "context7", "focus": "documentation"}
))
break
# Sequential (complex analysis)
sequential_patterns = [
r"(?:analyze|debug|troubleshoot|investigate)",
r"(?:complex|complicated|multi-step|systematic)",
r"(?:architecture|system|design)\s+(?:review|analysis)",
r"(?:root cause|performance|bottleneck)"
]
for pattern in sequential_patterns:
if re.search(pattern, user_input, re.IGNORECASE):
matches.append(PatternMatch(
pattern_type=PatternType.MCP_SERVER,
pattern_name="sequential",
confidence=0.75,
matched_text=re.search(pattern, user_input, re.IGNORECASE).group(),
suggestions=["Enable Sequential for multi-step analysis"],
metadata={"mcp_server": "sequential", "analysis_type": "complex"}
))
break
# Magic (UI components)
magic_patterns = [
r"(?:component|button|form|modal|dialog)",
r"(?:ui|frontend|interface|design)",
r"(?:react|vue|angular)\s+(?:component|element)",
r"(?:responsive|mobile|accessibility)"
]
for pattern in magic_patterns:
if re.search(pattern, user_input, re.IGNORECASE):
matches.append(PatternMatch(
pattern_type=PatternType.MCP_SERVER,
pattern_name="magic",
confidence=0.85,
matched_text=re.search(pattern, user_input, re.IGNORECASE).group(),
suggestions=["Enable Magic for UI component generation"],
metadata={"mcp_server": "magic", "component_type": "ui"}
))
break
# Playwright (testing)
playwright_patterns = [
r"(?:test|testing|e2e|end-to-end)",
r"(?:browser|cross-browser|automation)",
r"(?:performance|visual|regression)\s+(?:test|testing)",
r"(?:validate|verify|check)\s+(?:functionality|behavior)"
]
for pattern in playwright_patterns:
if re.search(pattern, user_input, re.IGNORECASE):
matches.append(PatternMatch(
pattern_type=PatternType.MCP_SERVER,
pattern_name="playwright",
confidence=0.8,
matched_text=re.search(pattern, user_input, re.IGNORECASE).group(),
suggestions=["Enable Playwright for testing operations"],
metadata={"mcp_server": "playwright", "test_type": "e2e"}
))
break
# Morphllm vs Serena intelligence selection
file_count = operation_data.get('file_count', 1)
complexity = operation_data.get('complexity_score', 0.0)
if file_count > 10 or complexity > 0.6:
matches.append(PatternMatch(
pattern_type=PatternType.MCP_SERVER,
pattern_name="serena",
confidence=0.9,
matched_text="high_complexity_operation",
suggestions=["Use Serena for complex multi-file operations"],
metadata={"mcp_server": "serena", "reason": "complexity_threshold"}
))
elif file_count <= 10 and complexity <= 0.6:
matches.append(PatternMatch(
pattern_type=PatternType.MCP_SERVER,
pattern_name="morphllm",
confidence=0.8,
matched_text="moderate_complexity_operation",
suggestions=["Use Morphllm for efficient editing operations"],
metadata={"mcp_server": "morphllm", "reason": "efficiency_optimized"}
))
return matches
def _detect_complexity_patterns(self, user_input: str, operation_data: Dict[str, Any]) -> List[PatternMatch]:
"""Detect complexity indicators in the request."""
matches = []
# High complexity indicators
high_complexity_patterns = [
r"(?:entire|whole|complete)\s+(?:codebase|system|application)",
r"(?:refactor|migrate|restructure)\s+(?:all|everything|entire)",
r"(?:architecture|system-wide|comprehensive)\s+(?:change|update|redesign)",
r"(?:complex|complicated|sophisticated)\s+(?:logic|algorithm|system)"
]
for pattern in high_complexity_patterns:
if re.search(pattern, user_input, re.IGNORECASE):
matches.append(PatternMatch(
pattern_type=PatternType.COMPLEXITY_INDICATOR,
pattern_name="high_complexity",
confidence=0.8,
matched_text=re.search(pattern, user_input, re.IGNORECASE).group(),
suggestions=["Consider delegation and thinking modes"],
metadata={"complexity_level": "high", "score_boost": 0.3}
))
break
# File count indicators
file_count = operation_data.get('file_count', 1)
if file_count > 5:
matches.append(PatternMatch(
pattern_type=PatternType.COMPLEXITY_INDICATOR,
pattern_name="multi_file_operation",
confidence=0.9,
matched_text=f"{file_count}_files",
suggestions=["Enable delegation for multi-file operations"],
metadata={"file_count": file_count, "delegation_recommended": True}
))
return matches
def _detect_persona_patterns(self, user_input: str, context: Dict[str, Any]) -> List[PatternMatch]:
"""Detect hints about which persona should be active."""
matches = []
persona_patterns = {
"architect": [r"(?:architecture|design|structure|system)\s+(?:review|analysis|planning)"],
"performance": [r"(?:performance|optimization|speed|efficiency|bottleneck)"],
"security": [r"(?:security|vulnerability|audit|secure|safety)"],
"frontend": [r"(?:ui|frontend|interface|component|design|responsive)"],
"backend": [r"(?:api|server|database|backend|service)"],
"devops": [r"(?:deploy|deployment|ci|cd|infrastructure|docker|kubernetes)"],
"testing": [r"(?:test|testing|qa|quality|coverage|validation)"]
}
for persona, patterns in persona_patterns.items():
for pattern in patterns:
if re.search(pattern, user_input, re.IGNORECASE):
matches.append(PatternMatch(
pattern_type=PatternType.PERSONA_HINT,
pattern_name=persona,
confidence=0.7,
matched_text=re.search(pattern, user_input, re.IGNORECASE).group(),
suggestions=[f"Consider {persona} persona for specialized expertise"],
metadata={"persona": persona, "domain_specific": True}
))
break
return matches
def _calculate_complexity_score(self, matches: List[PatternMatch], operation_data: Dict[str, Any]) -> float:
"""Calculate overall complexity score from detected patterns."""
base_score = operation_data.get('complexity_score', 0.0)
# Add complexity from pattern matches
for match in matches:
if match.pattern_type == PatternType.COMPLEXITY_INDICATOR:
score_boost = match.metadata.get('score_boost', 0.1)
base_score += score_boost
return min(base_score, 1.0)
def _calculate_confidence_score(self, matches: List[PatternMatch]) -> float:
"""Calculate overall confidence in pattern detection."""
if not matches:
return 0.0
total_confidence = sum(match.confidence for match in matches)
return min(total_confidence / len(matches), 1.0)
def _get_recommended_modes(self, matches: List[PatternMatch], complexity_score: float) -> List[str]:
"""Get recommended modes based on detected patterns."""
modes = set()
for match in matches:
if match.pattern_type == PatternType.MODE_TRIGGER:
modes.add(match.pattern_name)
# Auto-activate based on complexity
if complexity_score > 0.6:
modes.add("task_management")
return list(modes)
def _get_recommended_mcp_servers(self, matches: List[PatternMatch], context: Dict[str, Any]) -> List[str]:
"""Get recommended MCP servers based on detected patterns."""
servers = set()
for match in matches:
if match.pattern_type == PatternType.MCP_SERVER:
servers.add(match.pattern_name)
return list(servers)
def _get_suggested_flags(self, matches: List[PatternMatch], complexity_score: float, context: Dict[str, Any]) -> List[str]:
"""Get suggested flags based on patterns and complexity."""
flags = []
# Thinking flags based on complexity
if complexity_score >= 0.8:
flags.append("--ultrathink")
elif complexity_score >= 0.6:
flags.append("--think-hard")
elif complexity_score >= 0.3:
flags.append("--think")
# Delegation flags
for match in matches:
if match.metadata.get("delegation_recommended"):
flags.append("--delegate auto")
break
# Efficiency flags
for match in matches:
if match.metadata.get("compression_needed") or context.get('resource_usage_percent', 0) > 75:
flags.append("--uc")
break
# Validation flags for high-risk operations
if complexity_score > 0.7 or context.get('is_production', False):
flags.append("--validate")
return flags

View File

@@ -0,0 +1,295 @@
"""
Unified Configuration Loader for SuperClaude-Lite
High-performance configuration loading with support for both JSON and YAML formats,
caching, hot-reload capabilities, and comprehensive error handling.
Supports:
- Claude Code settings.json (JSON format)
- SuperClaude superclaude-config.json (JSON format)
- YAML configuration files
- Unified configuration interface for hooks
"""
import os
import json
import yaml
import time
import hashlib
from typing import Dict, Any, Optional, Union
from pathlib import Path
class UnifiedConfigLoader:
"""
Intelligent configuration loader with support for JSON and YAML formats.
Features:
- Dual-configuration support (Claude Code + SuperClaude)
- File modification detection for hot-reload
- In-memory caching for performance (<10ms access)
- Comprehensive error handling and validation
- Environment variable interpolation
- Include/merge support for modular configs
- Unified configuration interface
"""
def __init__(self, project_root: Union[str, Path]):
self.project_root = Path(project_root)
self.config_dir = self.project_root / "config"
# Configuration file paths
self.claude_settings_path = self.project_root / "settings.json"
self.superclaude_config_path = self.project_root / "superclaude-config.json"
# Cache for all configuration sources
self._cache: Dict[str, Dict[str, Any]] = {}
self._file_hashes: Dict[str, str] = {}
self._last_check: Dict[str, float] = {}
self.check_interval = 1.0 # Check files every 1 second max
# Configuration source registry
self._config_sources = {
'claude_settings': self.claude_settings_path,
'superclaude_config': self.superclaude_config_path
}
def load_config(self, config_name: str, force_reload: bool = False) -> Dict[str, Any]:
"""
Load configuration with intelligent caching (supports JSON and YAML).
Args:
config_name: Name of config file or special config identifier
- For YAML: config file name without .yaml extension
- For JSON: 'claude_settings' or 'superclaude_config'
force_reload: Force reload even if cached
Returns:
Parsed configuration dictionary
Raises:
FileNotFoundError: If config file doesn't exist
ValueError: If config parsing fails
"""
# Handle special configuration sources
if config_name in self._config_sources:
return self._load_json_config(config_name, force_reload)
# Handle YAML configuration files
config_path = self.config_dir / f"{config_name}.yaml"
if not config_path.exists():
raise FileNotFoundError(f"Configuration file not found: {config_path}")
# Check if we need to reload
if not force_reload and self._should_use_cache(config_name, config_path):
return self._cache[config_name]
# Load and parse the YAML configuration
try:
with open(config_path, 'r', encoding='utf-8') as f:
content = f.read()
# Environment variable interpolation
content = self._interpolate_env_vars(content)
# Parse YAML
config = yaml.safe_load(content)
# Handle includes/merges
config = self._process_includes(config, config_path.parent)
# Update cache
self._cache[config_name] = config
self._file_hashes[config_name] = self._compute_hash(config_path)
self._last_check[config_name] = time.time()
return config
except yaml.YAMLError as e:
raise ValueError(f"YAML parsing error in {config_path}: {e}")
except Exception as e:
raise RuntimeError(f"Error loading config {config_name}: {e}")
def _load_json_config(self, config_name: str, force_reload: bool = False) -> Dict[str, Any]:
"""Load JSON configuration file."""
config_path = self._config_sources[config_name]
if not config_path.exists():
raise FileNotFoundError(f"Configuration file not found: {config_path}")
# Check if we need to reload
if not force_reload and self._should_use_cache(config_name, config_path):
return self._cache[config_name]
# Load and parse the JSON configuration
try:
with open(config_path, 'r', encoding='utf-8') as f:
content = f.read()
# Environment variable interpolation
content = self._interpolate_env_vars(content)
# Parse JSON
config = json.loads(content)
# Update cache
self._cache[config_name] = config
self._file_hashes[config_name] = self._compute_hash(config_path)
self._last_check[config_name] = time.time()
return config
except json.JSONDecodeError as e:
raise ValueError(f"JSON parsing error in {config_path}: {e}")
except Exception as e:
raise RuntimeError(f"Error loading JSON config {config_name}: {e}")
def get_section(self, config_name: str, section_path: str, default: Any = None) -> Any:
"""
Get specific section from configuration using dot notation.
Args:
config_name: Configuration file name or identifier
section_path: Dot-separated path (e.g., 'routing.ui_components')
default: Default value if section not found
Returns:
Configuration section value or default
"""
config = self.load_config(config_name)
try:
result = config
for key in section_path.split('.'):
result = result[key]
return result
except (KeyError, TypeError):
return default
def get_hook_config(self, hook_name: str, section_path: str = None, default: Any = None) -> Any:
"""
Get hook-specific configuration from SuperClaude config.
Args:
hook_name: Hook name (e.g., 'session_start', 'pre_tool_use')
section_path: Optional dot-separated path within hook config
default: Default value if not found
Returns:
Hook configuration or specific section
"""
base_path = f"hook_configurations.{hook_name}"
if section_path:
full_path = f"{base_path}.{section_path}"
else:
full_path = base_path
return self.get_section('superclaude_config', full_path, default)
def get_claude_hooks(self) -> Dict[str, Any]:
"""Get Claude Code hook definitions from settings.json."""
return self.get_section('claude_settings', 'hooks', {})
def get_superclaude_config(self, section_path: str = None, default: Any = None) -> Any:
"""
Get SuperClaude framework configuration.
Args:
section_path: Optional dot-separated path (e.g., 'global_configuration.performance_monitoring')
default: Default value if not found
Returns:
Configuration section or full config if no path specified
"""
if section_path:
return self.get_section('superclaude_config', section_path, default)
else:
return self.load_config('superclaude_config')
def get_mcp_server_config(self, server_name: str = None) -> Dict[str, Any]:
"""
Get MCP server configuration.
Args:
server_name: Optional specific server name
Returns:
MCP server configuration
"""
if server_name:
return self.get_section('superclaude_config', f'mcp_server_integration.servers.{server_name}', {})
else:
return self.get_section('superclaude_config', 'mcp_server_integration', {})
def get_performance_targets(self) -> Dict[str, Any]:
"""Get performance targets for all components."""
return self.get_section('superclaude_config', 'global_configuration.performance_monitoring', {})
def is_hook_enabled(self, hook_name: str) -> bool:
"""Check if a specific hook is enabled."""
return self.get_hook_config(hook_name, 'enabled', False)
def reload_all(self) -> None:
"""Force reload of all cached configurations."""
for config_name in list(self._cache.keys()):
self.load_config(config_name, force_reload=True)
def _should_use_cache(self, config_name: str, config_path: Path) -> bool:
"""Check if cached version is still valid."""
if config_name not in self._cache:
return False
# Rate limit file checks
now = time.time()
if now - self._last_check.get(config_name, 0) < self.check_interval:
return True
# Check if file changed
current_hash = self._compute_hash(config_path)
return current_hash == self._file_hashes.get(config_name)
def _compute_hash(self, file_path: Path) -> str:
"""Compute file hash for change detection."""
stat = file_path.stat()
return hashlib.md5(f"{stat.st_mtime}:{stat.st_size}".encode()).hexdigest()
def _interpolate_env_vars(self, content: str) -> str:
"""Replace environment variables in YAML content."""
import re
def replace_env_var(match):
var_name = match.group(1)
default_value = match.group(2) if match.group(2) else ""
return os.getenv(var_name, default_value)
# Support ${VAR} and ${VAR:default} syntax
pattern = r'\$\{([^}:]+)(?::([^}]*))?\}'
return re.sub(pattern, replace_env_var, content)
def _process_includes(self, config: Dict[str, Any], base_dir: Path) -> Dict[str, Any]:
"""Process include directives in configuration."""
if not isinstance(config, dict):
return config
# Handle special include key
if '__include__' in config:
includes = config.pop('__include__')
if isinstance(includes, str):
includes = [includes]
for include_file in includes:
include_path = base_dir / include_file
if include_path.exists():
with open(include_path, 'r', encoding='utf-8') as f:
included_config = yaml.safe_load(f.read())
if isinstance(included_config, dict):
# Merge included config (current config takes precedence)
included_config.update(config)
config = included_config
return config
# Global instance for shared use across hooks
config_loader = UnifiedConfigLoader(".")