mirror of
https://github.com/SuperClaude-Org/SuperClaude_Framework.git
synced 2025-12-29 16:16:08 +00:00
Initial commit: SuperClaude v3 Beta clean architecture
Complete foundational restructure with: - Simplified project architecture - Comprehensive documentation system - Modern installation framework - Clean configuration management - Updated profiles and settings 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
15
setup/core/__init__.py
Normal file
15
setup/core/__init__.py
Normal file
@@ -0,0 +1,15 @@
|
||||
"""Core modules for SuperClaude installation system"""
|
||||
|
||||
from .config_manager import ConfigManager
|
||||
from .settings_manager import SettingsManager
|
||||
from .file_manager import FileManager
|
||||
from .validator import Validator
|
||||
from .registry import ComponentRegistry
|
||||
|
||||
__all__ = [
|
||||
'ConfigManager',
|
||||
'SettingsManager',
|
||||
'FileManager',
|
||||
'Validator',
|
||||
'ComponentRegistry'
|
||||
]
|
||||
399
setup/core/config_manager.py
Normal file
399
setup/core/config_manager.py
Normal file
@@ -0,0 +1,399 @@
|
||||
"""
|
||||
Configuration management for SuperClaude installation system
|
||||
"""
|
||||
|
||||
import json
|
||||
from typing import Dict, Any, List, Optional
|
||||
from pathlib import Path
|
||||
|
||||
# Handle jsonschema import - if not available, use basic validation
|
||||
try:
|
||||
import jsonschema
|
||||
from jsonschema import validate, ValidationError
|
||||
JSONSCHEMA_AVAILABLE = True
|
||||
except ImportError:
|
||||
JSONSCHEMA_AVAILABLE = False
|
||||
|
||||
class ValidationError(Exception):
|
||||
"""Simple validation error for when jsonschema is not available"""
|
||||
def __init__(self, message):
|
||||
self.message = message
|
||||
super().__init__(message)
|
||||
|
||||
def validate(instance, schema):
|
||||
"""Dummy validation function"""
|
||||
# Basic type checking only
|
||||
if "type" in schema:
|
||||
expected_type = schema["type"]
|
||||
if expected_type == "object" and not isinstance(instance, dict):
|
||||
raise ValidationError(f"Expected object, got {type(instance).__name__}")
|
||||
elif expected_type == "array" and not isinstance(instance, list):
|
||||
raise ValidationError(f"Expected array, got {type(instance).__name__}")
|
||||
elif expected_type == "string" and not isinstance(instance, str):
|
||||
raise ValidationError(f"Expected string, got {type(instance).__name__}")
|
||||
elif expected_type == "integer" and not isinstance(instance, int):
|
||||
raise ValidationError(f"Expected integer, got {type(instance).__name__}")
|
||||
# Skip detailed validation if jsonschema not available
|
||||
|
||||
|
||||
class ConfigManager:
|
||||
"""Manages configuration files and validation"""
|
||||
|
||||
def __init__(self, config_dir: Path):
|
||||
"""
|
||||
Initialize config manager
|
||||
|
||||
Args:
|
||||
config_dir: Directory containing configuration files
|
||||
"""
|
||||
self.config_dir = config_dir
|
||||
self.features_file = config_dir / "features.json"
|
||||
self.requirements_file = config_dir / "requirements.json"
|
||||
self._features_cache = None
|
||||
self._requirements_cache = None
|
||||
|
||||
# Schema for features.json
|
||||
self.features_schema = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"components": {
|
||||
"type": "object",
|
||||
"patternProperties": {
|
||||
"^[a-zA-Z_][a-zA-Z0-9_]*$": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {"type": "string"},
|
||||
"version": {"type": "string"},
|
||||
"description": {"type": "string"},
|
||||
"category": {"type": "string"},
|
||||
"dependencies": {
|
||||
"type": "array",
|
||||
"items": {"type": "string"}
|
||||
},
|
||||
"enabled": {"type": "boolean"},
|
||||
"required_tools": {
|
||||
"type": "array",
|
||||
"items": {"type": "string"}
|
||||
}
|
||||
},
|
||||
"required": ["name", "version", "description", "category"],
|
||||
"additionalProperties": False
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": ["components"],
|
||||
"additionalProperties": False
|
||||
}
|
||||
|
||||
# Schema for requirements.json
|
||||
self.requirements_schema = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"python": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"min_version": {"type": "string"},
|
||||
"max_version": {"type": "string"}
|
||||
},
|
||||
"required": ["min_version"]
|
||||
},
|
||||
"node": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"min_version": {"type": "string"},
|
||||
"max_version": {"type": "string"},
|
||||
"required_for": {
|
||||
"type": "array",
|
||||
"items": {"type": "string"}
|
||||
}
|
||||
},
|
||||
"required": ["min_version"]
|
||||
},
|
||||
"disk_space_mb": {"type": "integer"},
|
||||
"external_tools": {
|
||||
"type": "object",
|
||||
"patternProperties": {
|
||||
"^[a-zA-Z_][a-zA-Z0-9_-]*$": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"command": {"type": "string"},
|
||||
"min_version": {"type": "string"},
|
||||
"required_for": {
|
||||
"type": "array",
|
||||
"items": {"type": "string"}
|
||||
},
|
||||
"optional": {"type": "boolean"}
|
||||
},
|
||||
"required": ["command"],
|
||||
"additionalProperties": False
|
||||
}
|
||||
}
|
||||
},
|
||||
"installation_commands": {
|
||||
"type": "object",
|
||||
"patternProperties": {
|
||||
"^[a-zA-Z_][a-zA-Z0-9_-]*$": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"linux": {"type": "string"},
|
||||
"darwin": {"type": "string"},
|
||||
"win32": {"type": "string"},
|
||||
"all": {"type": "string"},
|
||||
"description": {"type": "string"}
|
||||
},
|
||||
"additionalProperties": False
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": ["python", "disk_space_mb"],
|
||||
"additionalProperties": False
|
||||
}
|
||||
|
||||
def load_features(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Load and validate features configuration
|
||||
|
||||
Returns:
|
||||
Features configuration dict
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If features.json not found
|
||||
ValidationError: If features.json is invalid
|
||||
"""
|
||||
if self._features_cache is not None:
|
||||
return self._features_cache
|
||||
|
||||
if not self.features_file.exists():
|
||||
raise FileNotFoundError(f"Features config not found: {self.features_file}")
|
||||
|
||||
try:
|
||||
with open(self.features_file, 'r') as f:
|
||||
features = json.load(f)
|
||||
|
||||
# Validate schema
|
||||
validate(instance=features, schema=self.features_schema)
|
||||
|
||||
self._features_cache = features
|
||||
return features
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
raise ValidationError(f"Invalid JSON in {self.features_file}: {e}")
|
||||
except ValidationError as e:
|
||||
raise ValidationError(f"Invalid features schema: {e.message}")
|
||||
|
||||
def load_requirements(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Load and validate requirements configuration
|
||||
|
||||
Returns:
|
||||
Requirements configuration dict
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If requirements.json not found
|
||||
ValidationError: If requirements.json is invalid
|
||||
"""
|
||||
if self._requirements_cache is not None:
|
||||
return self._requirements_cache
|
||||
|
||||
if not self.requirements_file.exists():
|
||||
raise FileNotFoundError(f"Requirements config not found: {self.requirements_file}")
|
||||
|
||||
try:
|
||||
with open(self.requirements_file, 'r') as f:
|
||||
requirements = json.load(f)
|
||||
|
||||
# Validate schema
|
||||
validate(instance=requirements, schema=self.requirements_schema)
|
||||
|
||||
self._requirements_cache = requirements
|
||||
return requirements
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
raise ValidationError(f"Invalid JSON in {self.requirements_file}: {e}")
|
||||
except ValidationError as e:
|
||||
raise ValidationError(f"Invalid requirements schema: {e.message}")
|
||||
|
||||
def get_component_info(self, component_name: str) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Get information about a specific component
|
||||
|
||||
Args:
|
||||
component_name: Name of component
|
||||
|
||||
Returns:
|
||||
Component info dict or None if not found
|
||||
"""
|
||||
features = self.load_features()
|
||||
return features.get("components", {}).get(component_name)
|
||||
|
||||
def get_enabled_components(self) -> List[str]:
|
||||
"""
|
||||
Get list of enabled component names
|
||||
|
||||
Returns:
|
||||
List of enabled component names
|
||||
"""
|
||||
features = self.load_features()
|
||||
enabled = []
|
||||
|
||||
for name, info in features.get("components", {}).items():
|
||||
if info.get("enabled", True): # Default to enabled
|
||||
enabled.append(name)
|
||||
|
||||
return enabled
|
||||
|
||||
def get_components_by_category(self, category: str) -> List[str]:
|
||||
"""
|
||||
Get component names by category
|
||||
|
||||
Args:
|
||||
category: Component category
|
||||
|
||||
Returns:
|
||||
List of component names in category
|
||||
"""
|
||||
features = self.load_features()
|
||||
components = []
|
||||
|
||||
for name, info in features.get("components", {}).items():
|
||||
if info.get("category") == category:
|
||||
components.append(name)
|
||||
|
||||
return components
|
||||
|
||||
def get_component_dependencies(self, component_name: str) -> List[str]:
|
||||
"""
|
||||
Get dependencies for a component
|
||||
|
||||
Args:
|
||||
component_name: Name of component
|
||||
|
||||
Returns:
|
||||
List of dependency component names
|
||||
"""
|
||||
component_info = self.get_component_info(component_name)
|
||||
if component_info:
|
||||
return component_info.get("dependencies", [])
|
||||
return []
|
||||
|
||||
def load_profile(self, profile_path: Path) -> Dict[str, Any]:
|
||||
"""
|
||||
Load installation profile
|
||||
|
||||
Args:
|
||||
profile_path: Path to profile JSON file
|
||||
|
||||
Returns:
|
||||
Profile configuration dict
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If profile not found
|
||||
ValidationError: If profile is invalid
|
||||
"""
|
||||
if not profile_path.exists():
|
||||
raise FileNotFoundError(f"Profile not found: {profile_path}")
|
||||
|
||||
try:
|
||||
with open(profile_path, 'r') as f:
|
||||
profile = json.load(f)
|
||||
|
||||
# Basic validation
|
||||
if "components" not in profile:
|
||||
raise ValidationError("Profile must contain 'components' field")
|
||||
|
||||
if not isinstance(profile["components"], list):
|
||||
raise ValidationError("Profile 'components' must be a list")
|
||||
|
||||
# Validate that all components exist
|
||||
features = self.load_features()
|
||||
available_components = set(features.get("components", {}).keys())
|
||||
|
||||
for component in profile["components"]:
|
||||
if component not in available_components:
|
||||
raise ValidationError(f"Unknown component in profile: {component}")
|
||||
|
||||
return profile
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
raise ValidationError(f"Invalid JSON in {profile_path}: {e}")
|
||||
|
||||
def get_system_requirements(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Get system requirements
|
||||
|
||||
Returns:
|
||||
System requirements dict
|
||||
"""
|
||||
return self.load_requirements()
|
||||
|
||||
def get_requirements_for_components(self, component_names: List[str]) -> Dict[str, Any]:
|
||||
"""
|
||||
Get consolidated requirements for specific components
|
||||
|
||||
Args:
|
||||
component_names: List of component names
|
||||
|
||||
Returns:
|
||||
Consolidated requirements dict
|
||||
"""
|
||||
requirements = self.load_requirements()
|
||||
features = self.load_features()
|
||||
|
||||
# Start with base requirements
|
||||
result = {
|
||||
"python": requirements["python"],
|
||||
"disk_space_mb": requirements["disk_space_mb"],
|
||||
"external_tools": {}
|
||||
}
|
||||
|
||||
# Add Node.js requirements if needed
|
||||
node_required = False
|
||||
for component_name in component_names:
|
||||
component_info = features.get("components", {}).get(component_name, {})
|
||||
required_tools = component_info.get("required_tools", [])
|
||||
|
||||
if "node" in required_tools:
|
||||
node_required = True
|
||||
break
|
||||
|
||||
if node_required and "node" in requirements:
|
||||
result["node"] = requirements["node"]
|
||||
|
||||
# Add external tool requirements
|
||||
for component_name in component_names:
|
||||
component_info = features.get("components", {}).get(component_name, {})
|
||||
required_tools = component_info.get("required_tools", [])
|
||||
|
||||
for tool in required_tools:
|
||||
if tool in requirements.get("external_tools", {}):
|
||||
result["external_tools"][tool] = requirements["external_tools"][tool]
|
||||
|
||||
return result
|
||||
|
||||
def validate_config_files(self) -> List[str]:
|
||||
"""
|
||||
Validate all configuration files
|
||||
|
||||
Returns:
|
||||
List of validation errors (empty if all valid)
|
||||
"""
|
||||
errors = []
|
||||
|
||||
try:
|
||||
self.load_features()
|
||||
except Exception as e:
|
||||
errors.append(f"Features config error: {e}")
|
||||
|
||||
try:
|
||||
self.load_requirements()
|
||||
except Exception as e:
|
||||
errors.append(f"Requirements config error: {e}")
|
||||
|
||||
return errors
|
||||
|
||||
def clear_cache(self) -> None:
|
||||
"""Clear cached configuration data"""
|
||||
self._features_cache = None
|
||||
self._requirements_cache = None
|
||||
428
setup/core/file_manager.py
Normal file
428
setup/core/file_manager.py
Normal file
@@ -0,0 +1,428 @@
|
||||
"""
|
||||
Cross-platform file management for SuperClaude installation system
|
||||
"""
|
||||
|
||||
import shutil
|
||||
import stat
|
||||
from typing import List, Optional, Callable, Dict, Any
|
||||
from pathlib import Path
|
||||
import fnmatch
|
||||
import hashlib
|
||||
|
||||
|
||||
class FileManager:
|
||||
"""Cross-platform file operations manager"""
|
||||
|
||||
def __init__(self, dry_run: bool = False):
|
||||
"""
|
||||
Initialize file manager
|
||||
|
||||
Args:
|
||||
dry_run: If True, only simulate file operations
|
||||
"""
|
||||
self.dry_run = dry_run
|
||||
self.copied_files: List[Path] = []
|
||||
self.created_dirs: List[Path] = []
|
||||
|
||||
def copy_file(self, source: Path, target: Path, preserve_permissions: bool = True) -> bool:
|
||||
"""
|
||||
Copy single file with permission preservation
|
||||
|
||||
Args:
|
||||
source: Source file path
|
||||
target: Target file path
|
||||
preserve_permissions: Whether to preserve file permissions
|
||||
|
||||
Returns:
|
||||
True if successful, False otherwise
|
||||
"""
|
||||
if not source.exists():
|
||||
raise FileNotFoundError(f"Source file not found: {source}")
|
||||
|
||||
if not source.is_file():
|
||||
raise ValueError(f"Source is not a file: {source}")
|
||||
|
||||
if self.dry_run:
|
||||
print(f"[DRY RUN] Would copy {source} -> {target}")
|
||||
return True
|
||||
|
||||
try:
|
||||
# Ensure target directory exists
|
||||
target.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Copy file
|
||||
if preserve_permissions:
|
||||
shutil.copy2(source, target)
|
||||
else:
|
||||
shutil.copy(source, target)
|
||||
|
||||
self.copied_files.append(target)
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error copying {source} to {target}: {e}")
|
||||
return False
|
||||
|
||||
def copy_directory(self, source: Path, target: Path, ignore_patterns: Optional[List[str]] = None) -> bool:
|
||||
"""
|
||||
Recursively copy directory with gitignore-style patterns
|
||||
|
||||
Args:
|
||||
source: Source directory path
|
||||
target: Target directory path
|
||||
ignore_patterns: List of patterns to ignore (gitignore style)
|
||||
|
||||
Returns:
|
||||
True if successful, False otherwise
|
||||
"""
|
||||
if not source.exists():
|
||||
raise FileNotFoundError(f"Source directory not found: {source}")
|
||||
|
||||
if not source.is_dir():
|
||||
raise ValueError(f"Source is not a directory: {source}")
|
||||
|
||||
ignore_patterns = ignore_patterns or []
|
||||
default_ignores = ['.git', '.gitignore', '__pycache__', '*.pyc', '.DS_Store']
|
||||
all_ignores = ignore_patterns + default_ignores
|
||||
|
||||
if self.dry_run:
|
||||
print(f"[DRY RUN] Would copy directory {source} -> {target}")
|
||||
return True
|
||||
|
||||
try:
|
||||
# Create ignore function
|
||||
def ignore_func(directory: str, contents: List[str]) -> List[str]:
|
||||
ignored = []
|
||||
for item in contents:
|
||||
item_path = Path(directory) / item
|
||||
rel_path = item_path.relative_to(source)
|
||||
|
||||
# Check against ignore patterns
|
||||
for pattern in all_ignores:
|
||||
if fnmatch.fnmatch(item, pattern) or fnmatch.fnmatch(str(rel_path), pattern):
|
||||
ignored.append(item)
|
||||
break
|
||||
|
||||
return ignored
|
||||
|
||||
# Copy tree
|
||||
shutil.copytree(source, target, ignore=ignore_func, dirs_exist_ok=True)
|
||||
|
||||
# Track created directories and files
|
||||
for item in target.rglob('*'):
|
||||
if item.is_dir():
|
||||
self.created_dirs.append(item)
|
||||
else:
|
||||
self.copied_files.append(item)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error copying directory {source} to {target}: {e}")
|
||||
return False
|
||||
|
||||
def ensure_directory(self, directory: Path, mode: int = 0o755) -> bool:
|
||||
"""
|
||||
Create directory and parents if they don't exist
|
||||
|
||||
Args:
|
||||
directory: Directory path to create
|
||||
mode: Directory permissions (Unix only)
|
||||
|
||||
Returns:
|
||||
True if successful, False otherwise
|
||||
"""
|
||||
if self.dry_run:
|
||||
print(f"[DRY RUN] Would create directory {directory}")
|
||||
return True
|
||||
|
||||
try:
|
||||
directory.mkdir(parents=True, exist_ok=True, mode=mode)
|
||||
|
||||
if directory not in self.created_dirs:
|
||||
self.created_dirs.append(directory)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error creating directory {directory}: {e}")
|
||||
return False
|
||||
|
||||
def remove_file(self, file_path: Path) -> bool:
|
||||
"""
|
||||
Remove single file
|
||||
|
||||
Args:
|
||||
file_path: Path to file to remove
|
||||
|
||||
Returns:
|
||||
True if successful, False otherwise
|
||||
"""
|
||||
if not file_path.exists():
|
||||
return True # Already gone
|
||||
|
||||
if self.dry_run:
|
||||
print(f"[DRY RUN] Would remove file {file_path}")
|
||||
return True
|
||||
|
||||
try:
|
||||
if file_path.is_file():
|
||||
file_path.unlink()
|
||||
else:
|
||||
print(f"Warning: {file_path} is not a file, skipping")
|
||||
return False
|
||||
|
||||
# Remove from tracking
|
||||
if file_path in self.copied_files:
|
||||
self.copied_files.remove(file_path)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error removing file {file_path}: {e}")
|
||||
return False
|
||||
|
||||
def remove_directory(self, directory: Path, recursive: bool = False) -> bool:
|
||||
"""
|
||||
Remove directory
|
||||
|
||||
Args:
|
||||
directory: Directory path to remove
|
||||
recursive: Whether to remove recursively
|
||||
|
||||
Returns:
|
||||
True if successful, False otherwise
|
||||
"""
|
||||
if not directory.exists():
|
||||
return True # Already gone
|
||||
|
||||
if self.dry_run:
|
||||
action = "recursively remove" if recursive else "remove"
|
||||
print(f"[DRY RUN] Would {action} directory {directory}")
|
||||
return True
|
||||
|
||||
try:
|
||||
if recursive:
|
||||
shutil.rmtree(directory)
|
||||
else:
|
||||
directory.rmdir() # Only works if empty
|
||||
|
||||
# Remove from tracking
|
||||
if directory in self.created_dirs:
|
||||
self.created_dirs.remove(directory)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error removing directory {directory}: {e}")
|
||||
return False
|
||||
|
||||
def resolve_home_path(self, path: str) -> Path:
|
||||
"""
|
||||
Convert path with ~ to actual home path on any OS
|
||||
|
||||
Args:
|
||||
path: Path string potentially containing ~
|
||||
|
||||
Returns:
|
||||
Resolved Path object
|
||||
"""
|
||||
return Path(path).expanduser().resolve()
|
||||
|
||||
def make_executable(self, file_path: Path) -> bool:
|
||||
"""
|
||||
Make file executable (Unix/Linux/macOS)
|
||||
|
||||
Args:
|
||||
file_path: Path to file to make executable
|
||||
|
||||
Returns:
|
||||
True if successful, False otherwise
|
||||
"""
|
||||
if not file_path.exists():
|
||||
return False
|
||||
|
||||
if self.dry_run:
|
||||
print(f"[DRY RUN] Would make {file_path} executable")
|
||||
return True
|
||||
|
||||
try:
|
||||
# Get current permissions
|
||||
current_mode = file_path.stat().st_mode
|
||||
|
||||
# Add execute permissions for owner, group, and others
|
||||
new_mode = current_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
|
||||
|
||||
file_path.chmod(new_mode)
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error making {file_path} executable: {e}")
|
||||
return False
|
||||
|
||||
def get_file_hash(self, file_path: Path, algorithm: str = 'sha256') -> Optional[str]:
|
||||
"""
|
||||
Calculate file hash
|
||||
|
||||
Args:
|
||||
file_path: Path to file
|
||||
algorithm: Hash algorithm (md5, sha1, sha256, etc.)
|
||||
|
||||
Returns:
|
||||
Hex hash string or None if error
|
||||
"""
|
||||
if not file_path.exists() or not file_path.is_file():
|
||||
return None
|
||||
|
||||
try:
|
||||
hasher = hashlib.new(algorithm)
|
||||
|
||||
with open(file_path, 'rb') as f:
|
||||
# Read in chunks for large files
|
||||
for chunk in iter(lambda: f.read(8192), b""):
|
||||
hasher.update(chunk)
|
||||
|
||||
return hasher.hexdigest()
|
||||
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def verify_file_integrity(self, file_path: Path, expected_hash: str, algorithm: str = 'sha256') -> bool:
|
||||
"""
|
||||
Verify file integrity using hash
|
||||
|
||||
Args:
|
||||
file_path: Path to file to verify
|
||||
expected_hash: Expected hash value
|
||||
algorithm: Hash algorithm used
|
||||
|
||||
Returns:
|
||||
True if file matches expected hash, False otherwise
|
||||
"""
|
||||
actual_hash = self.get_file_hash(file_path, algorithm)
|
||||
return actual_hash is not None and actual_hash.lower() == expected_hash.lower()
|
||||
|
||||
def get_directory_size(self, directory: Path) -> int:
|
||||
"""
|
||||
Calculate total size of directory in bytes
|
||||
|
||||
Args:
|
||||
directory: Directory path
|
||||
|
||||
Returns:
|
||||
Total size in bytes
|
||||
"""
|
||||
if not directory.exists() or not directory.is_dir():
|
||||
return 0
|
||||
|
||||
total_size = 0
|
||||
try:
|
||||
for file_path in directory.rglob('*'):
|
||||
if file_path.is_file():
|
||||
total_size += file_path.stat().st_size
|
||||
except Exception:
|
||||
pass # Skip files we can't access
|
||||
|
||||
return total_size
|
||||
|
||||
def find_files(self, directory: Path, pattern: str = '*', recursive: bool = True) -> List[Path]:
|
||||
"""
|
||||
Find files matching pattern
|
||||
|
||||
Args:
|
||||
directory: Directory to search
|
||||
pattern: Glob pattern to match
|
||||
recursive: Whether to search recursively
|
||||
|
||||
Returns:
|
||||
List of matching file paths
|
||||
"""
|
||||
if not directory.exists() or not directory.is_dir():
|
||||
return []
|
||||
|
||||
try:
|
||||
if recursive:
|
||||
return list(directory.rglob(pattern))
|
||||
else:
|
||||
return list(directory.glob(pattern))
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
def backup_file(self, file_path: Path, backup_suffix: str = '.backup') -> Optional[Path]:
|
||||
"""
|
||||
Create backup copy of file
|
||||
|
||||
Args:
|
||||
file_path: Path to file to backup
|
||||
backup_suffix: Suffix to add to backup file
|
||||
|
||||
Returns:
|
||||
Path to backup file or None if failed
|
||||
"""
|
||||
if not file_path.exists() or not file_path.is_file():
|
||||
return None
|
||||
|
||||
backup_path = file_path.with_suffix(file_path.suffix + backup_suffix)
|
||||
|
||||
if self.copy_file(file_path, backup_path):
|
||||
return backup_path
|
||||
return None
|
||||
|
||||
def get_free_space(self, path: Path) -> int:
|
||||
"""
|
||||
Get free disk space at path in bytes
|
||||
|
||||
Args:
|
||||
path: Path to check (can be file or directory)
|
||||
|
||||
Returns:
|
||||
Free space in bytes
|
||||
"""
|
||||
try:
|
||||
if path.is_file():
|
||||
path = path.parent
|
||||
|
||||
stat_result = shutil.disk_usage(path)
|
||||
return stat_result.free
|
||||
except Exception:
|
||||
return 0
|
||||
|
||||
def cleanup_tracked_files(self) -> None:
|
||||
"""Remove all files and directories created during this session"""
|
||||
if self.dry_run:
|
||||
print("[DRY RUN] Would cleanup tracked files")
|
||||
return
|
||||
|
||||
# Remove files first
|
||||
for file_path in reversed(self.copied_files):
|
||||
try:
|
||||
if file_path.exists():
|
||||
file_path.unlink()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Remove directories (in reverse order of creation)
|
||||
for directory in reversed(self.created_dirs):
|
||||
try:
|
||||
if directory.exists() and not any(directory.iterdir()):
|
||||
directory.rmdir()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
self.copied_files.clear()
|
||||
self.created_dirs.clear()
|
||||
|
||||
def get_operation_summary(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Get summary of file operations performed
|
||||
|
||||
Returns:
|
||||
Dict with operation statistics
|
||||
"""
|
||||
return {
|
||||
'files_copied': len(self.copied_files),
|
||||
'directories_created': len(self.created_dirs),
|
||||
'dry_run': self.dry_run,
|
||||
'copied_files': [str(f) for f in self.copied_files],
|
||||
'created_directories': [str(d) for d in self.created_dirs]
|
||||
}
|
||||
395
setup/core/registry.py
Normal file
395
setup/core/registry.py
Normal file
@@ -0,0 +1,395 @@
|
||||
"""
|
||||
Component registry for auto-discovery and dependency resolution
|
||||
"""
|
||||
|
||||
import importlib
|
||||
import inspect
|
||||
from typing import Dict, List, Set, Optional, Type
|
||||
from pathlib import Path
|
||||
from ..base.component import Component
|
||||
|
||||
|
||||
class ComponentRegistry:
|
||||
"""Auto-discovery and management of installable components"""
|
||||
|
||||
def __init__(self, components_dir: Path):
|
||||
"""
|
||||
Initialize component registry
|
||||
|
||||
Args:
|
||||
components_dir: Directory containing component modules
|
||||
"""
|
||||
self.components_dir = components_dir
|
||||
self.component_classes: Dict[str, Type[Component]] = {}
|
||||
self.component_instances: Dict[str, Component] = {}
|
||||
self.dependency_graph: Dict[str, Set[str]] = {}
|
||||
self._discovered = False
|
||||
|
||||
def discover_components(self, force_reload: bool = False) -> None:
|
||||
"""
|
||||
Auto-discover all component classes in components directory
|
||||
|
||||
Args:
|
||||
force_reload: Force rediscovery even if already done
|
||||
"""
|
||||
if self._discovered and not force_reload:
|
||||
return
|
||||
|
||||
self.component_classes.clear()
|
||||
self.component_instances.clear()
|
||||
self.dependency_graph.clear()
|
||||
|
||||
if not self.components_dir.exists():
|
||||
return
|
||||
|
||||
# Add components directory to Python path temporarily
|
||||
import sys
|
||||
original_path = sys.path.copy()
|
||||
|
||||
try:
|
||||
# Add parent directory to path so we can import setup.components
|
||||
setup_dir = self.components_dir.parent
|
||||
if str(setup_dir) not in sys.path:
|
||||
sys.path.insert(0, str(setup_dir))
|
||||
|
||||
# Discover all Python files in components directory
|
||||
for py_file in self.components_dir.glob("*.py"):
|
||||
if py_file.name.startswith("__"):
|
||||
continue
|
||||
|
||||
module_name = py_file.stem
|
||||
self._load_component_module(module_name)
|
||||
|
||||
finally:
|
||||
# Restore original Python path
|
||||
sys.path = original_path
|
||||
|
||||
# Build dependency graph
|
||||
self._build_dependency_graph()
|
||||
self._discovered = True
|
||||
|
||||
def _load_component_module(self, module_name: str) -> None:
|
||||
"""
|
||||
Load component classes from a module
|
||||
|
||||
Args:
|
||||
module_name: Name of module to load
|
||||
"""
|
||||
try:
|
||||
# Import the module
|
||||
full_module_name = f"setup.components.{module_name}"
|
||||
module = importlib.import_module(full_module_name)
|
||||
|
||||
# Find all Component subclasses in the module
|
||||
for name, obj in inspect.getmembers(module):
|
||||
if (inspect.isclass(obj) and
|
||||
issubclass(obj, Component) and
|
||||
obj is not Component):
|
||||
|
||||
# Create instance to get metadata
|
||||
try:
|
||||
instance = obj()
|
||||
metadata = instance.get_metadata()
|
||||
component_name = metadata["name"]
|
||||
|
||||
self.component_classes[component_name] = obj
|
||||
self.component_instances[component_name] = instance
|
||||
|
||||
except Exception as e:
|
||||
print(f"Warning: Could not instantiate component {name}: {e}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Warning: Could not load component module {module_name}: {e}")
|
||||
|
||||
def _build_dependency_graph(self) -> None:
|
||||
"""Build dependency graph for all discovered components"""
|
||||
for name, instance in self.component_instances.items():
|
||||
try:
|
||||
dependencies = instance.get_dependencies()
|
||||
self.dependency_graph[name] = set(dependencies)
|
||||
except Exception as e:
|
||||
print(f"Warning: Could not get dependencies for {name}: {e}")
|
||||
self.dependency_graph[name] = set()
|
||||
|
||||
def get_component_class(self, component_name: str) -> Optional[Type[Component]]:
|
||||
"""
|
||||
Get component class by name
|
||||
|
||||
Args:
|
||||
component_name: Name of component
|
||||
|
||||
Returns:
|
||||
Component class or None if not found
|
||||
"""
|
||||
self.discover_components()
|
||||
return self.component_classes.get(component_name)
|
||||
|
||||
def get_component_instance(self, component_name: str, install_dir: Optional[Path] = None) -> Optional[Component]:
|
||||
"""
|
||||
Get component instance by name
|
||||
|
||||
Args:
|
||||
component_name: Name of component
|
||||
install_dir: Installation directory (creates new instance with this dir)
|
||||
|
||||
Returns:
|
||||
Component instance or None if not found
|
||||
"""
|
||||
self.discover_components()
|
||||
|
||||
if install_dir is not None:
|
||||
# Create new instance with specified install directory
|
||||
component_class = self.component_classes.get(component_name)
|
||||
if component_class:
|
||||
try:
|
||||
return component_class(install_dir)
|
||||
except Exception as e:
|
||||
print(f"Error creating component instance {component_name}: {e}")
|
||||
return None
|
||||
|
||||
return self.component_instances.get(component_name)
|
||||
|
||||
def list_components(self) -> List[str]:
|
||||
"""
|
||||
Get list of all discovered component names
|
||||
|
||||
Returns:
|
||||
List of component names
|
||||
"""
|
||||
self.discover_components()
|
||||
return list(self.component_classes.keys())
|
||||
|
||||
def get_component_metadata(self, component_name: str) -> Optional[Dict[str, str]]:
|
||||
"""
|
||||
Get metadata for a component
|
||||
|
||||
Args:
|
||||
component_name: Name of component
|
||||
|
||||
Returns:
|
||||
Component metadata dict or None if not found
|
||||
"""
|
||||
self.discover_components()
|
||||
instance = self.component_instances.get(component_name)
|
||||
if instance:
|
||||
try:
|
||||
return instance.get_metadata()
|
||||
except Exception:
|
||||
return None
|
||||
return None
|
||||
|
||||
def resolve_dependencies(self, component_names: List[str]) -> List[str]:
|
||||
"""
|
||||
Resolve component dependencies in correct installation order
|
||||
|
||||
Args:
|
||||
component_names: List of component names to install
|
||||
|
||||
Returns:
|
||||
Ordered list of component names including dependencies
|
||||
|
||||
Raises:
|
||||
ValueError: If circular dependencies detected or unknown component
|
||||
"""
|
||||
self.discover_components()
|
||||
|
||||
resolved = []
|
||||
resolving = set()
|
||||
|
||||
def resolve(name: str):
|
||||
if name in resolved:
|
||||
return
|
||||
|
||||
if name in resolving:
|
||||
raise ValueError(f"Circular dependency detected involving {name}")
|
||||
|
||||
if name not in self.dependency_graph:
|
||||
raise ValueError(f"Unknown component: {name}")
|
||||
|
||||
resolving.add(name)
|
||||
|
||||
# Resolve dependencies first
|
||||
for dep in self.dependency_graph[name]:
|
||||
resolve(dep)
|
||||
|
||||
resolving.remove(name)
|
||||
resolved.append(name)
|
||||
|
||||
# Resolve each requested component
|
||||
for name in component_names:
|
||||
resolve(name)
|
||||
|
||||
return resolved
|
||||
|
||||
def get_dependencies(self, component_name: str) -> Set[str]:
|
||||
"""
|
||||
Get direct dependencies for a component
|
||||
|
||||
Args:
|
||||
component_name: Name of component
|
||||
|
||||
Returns:
|
||||
Set of dependency component names
|
||||
"""
|
||||
self.discover_components()
|
||||
return self.dependency_graph.get(component_name, set())
|
||||
|
||||
def get_dependents(self, component_name: str) -> Set[str]:
|
||||
"""
|
||||
Get components that depend on the given component
|
||||
|
||||
Args:
|
||||
component_name: Name of component
|
||||
|
||||
Returns:
|
||||
Set of component names that depend on this component
|
||||
"""
|
||||
self.discover_components()
|
||||
dependents = set()
|
||||
|
||||
for name, deps in self.dependency_graph.items():
|
||||
if component_name in deps:
|
||||
dependents.add(name)
|
||||
|
||||
return dependents
|
||||
|
||||
def validate_dependency_graph(self) -> List[str]:
|
||||
"""
|
||||
Validate dependency graph for cycles and missing dependencies
|
||||
|
||||
Returns:
|
||||
List of validation errors (empty if valid)
|
||||
"""
|
||||
self.discover_components()
|
||||
errors = []
|
||||
|
||||
# Check for missing dependencies
|
||||
all_components = set(self.dependency_graph.keys())
|
||||
for name, deps in self.dependency_graph.items():
|
||||
missing_deps = deps - all_components
|
||||
if missing_deps:
|
||||
errors.append(f"Component {name} has missing dependencies: {missing_deps}")
|
||||
|
||||
# Check for circular dependencies
|
||||
for name in all_components:
|
||||
try:
|
||||
self.resolve_dependencies([name])
|
||||
except ValueError as e:
|
||||
errors.append(str(e))
|
||||
|
||||
return errors
|
||||
|
||||
def get_components_by_category(self, category: str) -> List[str]:
|
||||
"""
|
||||
Get components filtered by category
|
||||
|
||||
Args:
|
||||
category: Component category to filter by
|
||||
|
||||
Returns:
|
||||
List of component names in the category
|
||||
"""
|
||||
self.discover_components()
|
||||
components = []
|
||||
|
||||
for name, instance in self.component_instances.items():
|
||||
try:
|
||||
metadata = instance.get_metadata()
|
||||
if metadata.get("category") == category:
|
||||
components.append(name)
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
return components
|
||||
|
||||
def get_installation_order(self, component_names: List[str]) -> List[List[str]]:
|
||||
"""
|
||||
Get installation order grouped by dependency levels
|
||||
|
||||
Args:
|
||||
component_names: List of component names to install
|
||||
|
||||
Returns:
|
||||
List of lists, where each inner list contains components
|
||||
that can be installed in parallel at that dependency level
|
||||
"""
|
||||
self.discover_components()
|
||||
|
||||
# Get all components including dependencies
|
||||
all_components = set(self.resolve_dependencies(component_names))
|
||||
|
||||
# Group by dependency level
|
||||
levels = []
|
||||
remaining = all_components.copy()
|
||||
|
||||
while remaining:
|
||||
# Find components with no unresolved dependencies
|
||||
current_level = []
|
||||
for name in list(remaining):
|
||||
deps = self.dependency_graph.get(name, set())
|
||||
unresolved_deps = deps & remaining
|
||||
|
||||
if not unresolved_deps:
|
||||
current_level.append(name)
|
||||
|
||||
if not current_level:
|
||||
# This shouldn't happen if dependency graph is valid
|
||||
raise ValueError("Circular dependency detected in installation order calculation")
|
||||
|
||||
levels.append(current_level)
|
||||
remaining -= set(current_level)
|
||||
|
||||
return levels
|
||||
|
||||
def create_component_instances(self, component_names: List[str], install_dir: Optional[Path] = None) -> Dict[str, Component]:
|
||||
"""
|
||||
Create instances for multiple components
|
||||
|
||||
Args:
|
||||
component_names: List of component names
|
||||
install_dir: Installation directory for instances
|
||||
|
||||
Returns:
|
||||
Dict mapping component names to instances
|
||||
"""
|
||||
self.discover_components()
|
||||
instances = {}
|
||||
|
||||
for name in component_names:
|
||||
instance = self.get_component_instance(name, install_dir)
|
||||
if instance:
|
||||
instances[name] = instance
|
||||
else:
|
||||
print(f"Warning: Could not create instance for component {name}")
|
||||
|
||||
return instances
|
||||
|
||||
def get_registry_info(self) -> Dict[str, any]:
|
||||
"""
|
||||
Get comprehensive registry information
|
||||
|
||||
Returns:
|
||||
Dict with registry statistics and component info
|
||||
"""
|
||||
self.discover_components()
|
||||
|
||||
# Group components by category
|
||||
categories = {}
|
||||
for name, instance in self.component_instances.items():
|
||||
try:
|
||||
metadata = instance.get_metadata()
|
||||
category = metadata.get("category", "unknown")
|
||||
if category not in categories:
|
||||
categories[category] = []
|
||||
categories[category].append(name)
|
||||
except Exception:
|
||||
if "unknown" not in categories:
|
||||
categories["unknown"] = []
|
||||
categories["unknown"].append(name)
|
||||
|
||||
return {
|
||||
"total_components": len(self.component_classes),
|
||||
"categories": categories,
|
||||
"dependency_graph": {name: list(deps) for name, deps in self.dependency_graph.items()},
|
||||
"validation_errors": self.validate_dependency_graph()
|
||||
}
|
||||
380
setup/core/settings_manager.py
Normal file
380
setup/core/settings_manager.py
Normal file
@@ -0,0 +1,380 @@
|
||||
"""
|
||||
Settings management for SuperClaude installation system
|
||||
Handles settings.json manipulation with deep merge and backup
|
||||
"""
|
||||
|
||||
import json
|
||||
import shutil
|
||||
from typing import Dict, Any, Optional, List
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
import copy
|
||||
|
||||
|
||||
class SettingsManager:
|
||||
"""Manages settings.json file operations"""
|
||||
|
||||
def __init__(self, install_dir: Path):
|
||||
"""
|
||||
Initialize settings manager
|
||||
|
||||
Args:
|
||||
install_dir: Installation directory containing settings.json
|
||||
"""
|
||||
self.install_dir = install_dir
|
||||
self.settings_file = install_dir / "settings.json"
|
||||
self.backup_dir = install_dir / "backups" / "settings"
|
||||
|
||||
def load_settings(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Load settings from settings.json
|
||||
|
||||
Returns:
|
||||
Settings dict (empty if file doesn't exist)
|
||||
"""
|
||||
if not self.settings_file.exists():
|
||||
return {}
|
||||
|
||||
try:
|
||||
with open(self.settings_file, 'r', encoding='utf-8') as f:
|
||||
return json.load(f)
|
||||
except (json.JSONDecodeError, IOError) as e:
|
||||
raise ValueError(f"Could not load settings from {self.settings_file}: {e}")
|
||||
|
||||
def save_settings(self, settings: Dict[str, Any], create_backup: bool = True) -> None:
|
||||
"""
|
||||
Save settings to settings.json with optional backup
|
||||
|
||||
Args:
|
||||
settings: Settings dict to save
|
||||
create_backup: Whether to create backup before saving
|
||||
"""
|
||||
# Create backup if requested and file exists
|
||||
if create_backup and self.settings_file.exists():
|
||||
self._create_settings_backup()
|
||||
|
||||
# Ensure directory exists
|
||||
self.settings_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Save with pretty formatting
|
||||
try:
|
||||
with open(self.settings_file, 'w', encoding='utf-8') as f:
|
||||
json.dump(settings, f, indent=2, ensure_ascii=False, sort_keys=True)
|
||||
except IOError as e:
|
||||
raise ValueError(f"Could not save settings to {self.settings_file}: {e}")
|
||||
|
||||
def merge_settings(self, modifications: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""
|
||||
Deep merge modifications into existing settings
|
||||
|
||||
Args:
|
||||
modifications: Settings modifications to merge
|
||||
|
||||
Returns:
|
||||
Merged settings dict
|
||||
"""
|
||||
existing = self.load_settings()
|
||||
return self._deep_merge(existing, modifications)
|
||||
|
||||
def update_settings(self, modifications: Dict[str, Any], create_backup: bool = True) -> None:
|
||||
"""
|
||||
Update settings with modifications
|
||||
|
||||
Args:
|
||||
modifications: Settings modifications to apply
|
||||
create_backup: Whether to create backup before updating
|
||||
"""
|
||||
merged = self.merge_settings(modifications)
|
||||
self.save_settings(merged, create_backup)
|
||||
|
||||
def get_setting(self, key_path: str, default: Any = None) -> Any:
|
||||
"""
|
||||
Get setting value using dot-notation path
|
||||
|
||||
Args:
|
||||
key_path: Dot-separated path (e.g., "hooks.enabled")
|
||||
default: Default value if key not found
|
||||
|
||||
Returns:
|
||||
Setting value or default
|
||||
"""
|
||||
settings = self.load_settings()
|
||||
|
||||
try:
|
||||
value = settings
|
||||
for key in key_path.split('.'):
|
||||
value = value[key]
|
||||
return value
|
||||
except (KeyError, TypeError):
|
||||
return default
|
||||
|
||||
def set_setting(self, key_path: str, value: Any, create_backup: bool = True) -> None:
|
||||
"""
|
||||
Set setting value using dot-notation path
|
||||
|
||||
Args:
|
||||
key_path: Dot-separated path (e.g., "hooks.enabled")
|
||||
value: Value to set
|
||||
create_backup: Whether to create backup before updating
|
||||
"""
|
||||
# Build nested dict structure
|
||||
keys = key_path.split('.')
|
||||
modification = {}
|
||||
current = modification
|
||||
|
||||
for key in keys[:-1]:
|
||||
current[key] = {}
|
||||
current = current[key]
|
||||
|
||||
current[keys[-1]] = value
|
||||
|
||||
self.update_settings(modification, create_backup)
|
||||
|
||||
def remove_setting(self, key_path: str, create_backup: bool = True) -> bool:
|
||||
"""
|
||||
Remove setting using dot-notation path
|
||||
|
||||
Args:
|
||||
key_path: Dot-separated path to remove
|
||||
create_backup: Whether to create backup before updating
|
||||
|
||||
Returns:
|
||||
True if setting was removed, False if not found
|
||||
"""
|
||||
settings = self.load_settings()
|
||||
keys = key_path.split('.')
|
||||
|
||||
# Navigate to parent of target key
|
||||
current = settings
|
||||
try:
|
||||
for key in keys[:-1]:
|
||||
current = current[key]
|
||||
|
||||
# Remove the target key
|
||||
if keys[-1] in current:
|
||||
del current[keys[-1]]
|
||||
self.save_settings(settings, create_backup)
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
except (KeyError, TypeError):
|
||||
return False
|
||||
|
||||
def add_component_registration(self, component_name: str, component_info: Dict[str, Any]) -> None:
|
||||
"""
|
||||
Add component to registry in settings
|
||||
|
||||
Args:
|
||||
component_name: Name of component
|
||||
component_info: Component metadata dict
|
||||
"""
|
||||
modification = {
|
||||
"components": {
|
||||
component_name: {
|
||||
**component_info,
|
||||
"installed_at": datetime.now().isoformat()
|
||||
}
|
||||
}
|
||||
}
|
||||
self.update_settings(modification)
|
||||
|
||||
def remove_component_registration(self, component_name: str) -> bool:
|
||||
"""
|
||||
Remove component from registry in settings
|
||||
|
||||
Args:
|
||||
component_name: Name of component to remove
|
||||
|
||||
Returns:
|
||||
True if component was removed, False if not found
|
||||
"""
|
||||
return self.remove_setting(f"components.{component_name}")
|
||||
|
||||
def get_installed_components(self) -> Dict[str, Dict[str, Any]]:
|
||||
"""
|
||||
Get all installed components from registry
|
||||
|
||||
Returns:
|
||||
Dict of component_name -> component_info
|
||||
"""
|
||||
return self.get_setting("components", {})
|
||||
|
||||
def is_component_installed(self, component_name: str) -> bool:
|
||||
"""
|
||||
Check if component is registered as installed
|
||||
|
||||
Args:
|
||||
component_name: Name of component to check
|
||||
|
||||
Returns:
|
||||
True if component is installed, False otherwise
|
||||
"""
|
||||
components = self.get_installed_components()
|
||||
return component_name in components
|
||||
|
||||
def get_component_version(self, component_name: str) -> Optional[str]:
|
||||
"""
|
||||
Get installed version of component
|
||||
|
||||
Args:
|
||||
component_name: Name of component
|
||||
|
||||
Returns:
|
||||
Version string or None if not installed
|
||||
"""
|
||||
components = self.get_installed_components()
|
||||
component_info = components.get(component_name, {})
|
||||
return component_info.get("version")
|
||||
|
||||
def update_framework_version(self, version: str) -> None:
|
||||
"""
|
||||
Update SuperClaude framework version in settings
|
||||
|
||||
Args:
|
||||
version: Framework version string
|
||||
"""
|
||||
modification = {
|
||||
"framework": {
|
||||
"version": version,
|
||||
"updated_at": datetime.now().isoformat()
|
||||
}
|
||||
}
|
||||
self.update_settings(modification)
|
||||
|
||||
def get_framework_version(self) -> Optional[str]:
|
||||
"""
|
||||
Get SuperClaude framework version from settings
|
||||
|
||||
Returns:
|
||||
Version string or None if not set
|
||||
"""
|
||||
return self.get_setting("framework.version")
|
||||
|
||||
def _deep_merge(self, base: Dict[str, Any], overlay: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""
|
||||
Deep merge two dictionaries
|
||||
|
||||
Args:
|
||||
base: Base dictionary
|
||||
overlay: Dictionary to merge on top
|
||||
|
||||
Returns:
|
||||
Merged dictionary
|
||||
"""
|
||||
result = copy.deepcopy(base)
|
||||
|
||||
for key, value in overlay.items():
|
||||
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
|
||||
result[key] = self._deep_merge(result[key], value)
|
||||
else:
|
||||
result[key] = copy.deepcopy(value)
|
||||
|
||||
return result
|
||||
|
||||
def _create_settings_backup(self) -> Path:
|
||||
"""
|
||||
Create timestamped backup of settings.json
|
||||
|
||||
Returns:
|
||||
Path to backup file
|
||||
"""
|
||||
if not self.settings_file.exists():
|
||||
raise ValueError("Cannot backup non-existent settings file")
|
||||
|
||||
# Create backup directory
|
||||
self.backup_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Create timestamped backup
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
backup_file = self.backup_dir / f"settings_{timestamp}.json"
|
||||
|
||||
shutil.copy2(self.settings_file, backup_file)
|
||||
|
||||
# Keep only last 10 backups
|
||||
self._cleanup_old_backups()
|
||||
|
||||
return backup_file
|
||||
|
||||
def _cleanup_old_backups(self, keep_count: int = 10) -> None:
|
||||
"""
|
||||
Remove old backup files, keeping only the most recent
|
||||
|
||||
Args:
|
||||
keep_count: Number of backups to keep
|
||||
"""
|
||||
if not self.backup_dir.exists():
|
||||
return
|
||||
|
||||
# Get all backup files sorted by modification time
|
||||
backup_files = []
|
||||
for file in self.backup_dir.glob("settings_*.json"):
|
||||
backup_files.append((file.stat().st_mtime, file))
|
||||
|
||||
backup_files.sort(reverse=True) # Most recent first
|
||||
|
||||
# Remove old backups
|
||||
for _, file in backup_files[keep_count:]:
|
||||
try:
|
||||
file.unlink()
|
||||
except OSError:
|
||||
pass # Ignore errors when cleaning up
|
||||
|
||||
def list_backups(self) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
List available settings backups
|
||||
|
||||
Returns:
|
||||
List of backup info dicts with name, path, and timestamp
|
||||
"""
|
||||
if not self.backup_dir.exists():
|
||||
return []
|
||||
|
||||
backups = []
|
||||
for file in self.backup_dir.glob("settings_*.json"):
|
||||
try:
|
||||
stat = file.stat()
|
||||
backups.append({
|
||||
"name": file.name,
|
||||
"path": str(file),
|
||||
"size": stat.st_size,
|
||||
"created": datetime.fromtimestamp(stat.st_ctime).isoformat(),
|
||||
"modified": datetime.fromtimestamp(stat.st_mtime).isoformat()
|
||||
})
|
||||
except OSError:
|
||||
continue
|
||||
|
||||
# Sort by creation time, most recent first
|
||||
backups.sort(key=lambda x: x["created"], reverse=True)
|
||||
return backups
|
||||
|
||||
def restore_backup(self, backup_name: str) -> bool:
|
||||
"""
|
||||
Restore settings from backup
|
||||
|
||||
Args:
|
||||
backup_name: Name of backup file to restore
|
||||
|
||||
Returns:
|
||||
True if successful, False otherwise
|
||||
"""
|
||||
backup_file = self.backup_dir / backup_name
|
||||
|
||||
if not backup_file.exists():
|
||||
return False
|
||||
|
||||
try:
|
||||
# Validate backup file first
|
||||
with open(backup_file, 'r', encoding='utf-8') as f:
|
||||
json.load(f) # Will raise exception if invalid
|
||||
|
||||
# Create backup of current settings
|
||||
if self.settings_file.exists():
|
||||
self._create_settings_backup()
|
||||
|
||||
# Restore backup
|
||||
shutil.copy2(backup_file, self.settings_file)
|
||||
return True
|
||||
|
||||
except (json.JSONDecodeError, IOError):
|
||||
return False
|
||||
681
setup/core/validator.py
Normal file
681
setup/core/validator.py
Normal file
@@ -0,0 +1,681 @@
|
||||
"""
|
||||
System validation for SuperClaude installation requirements
|
||||
"""
|
||||
|
||||
import subprocess
|
||||
import sys
|
||||
import shutil
|
||||
from typing import Tuple, List, Dict, Any, Optional
|
||||
from pathlib import Path
|
||||
import re
|
||||
|
||||
# Handle packaging import - if not available, use a simple version comparison
|
||||
try:
|
||||
from packaging import version
|
||||
PACKAGING_AVAILABLE = True
|
||||
except ImportError:
|
||||
PACKAGING_AVAILABLE = False
|
||||
|
||||
class SimpleVersion:
|
||||
def __init__(self, version_str: str):
|
||||
self.version_str = version_str
|
||||
# Simple version parsing: split by dots and convert to integers
|
||||
try:
|
||||
self.parts = [int(x) for x in version_str.split('.')]
|
||||
except ValueError:
|
||||
self.parts = [0, 0, 0]
|
||||
|
||||
def __lt__(self, other):
|
||||
if isinstance(other, str):
|
||||
other = SimpleVersion(other)
|
||||
# Pad with zeros to same length
|
||||
max_len = max(len(self.parts), len(other.parts))
|
||||
self_parts = self.parts + [0] * (max_len - len(self.parts))
|
||||
other_parts = other.parts + [0] * (max_len - len(other.parts))
|
||||
return self_parts < other_parts
|
||||
|
||||
def __gt__(self, other):
|
||||
if isinstance(other, str):
|
||||
other = SimpleVersion(other)
|
||||
return not (self < other) and not (self == other)
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, str):
|
||||
other = SimpleVersion(other)
|
||||
return self.parts == other.parts
|
||||
|
||||
class version:
|
||||
@staticmethod
|
||||
def parse(version_str: str):
|
||||
return SimpleVersion(version_str)
|
||||
|
||||
|
||||
class Validator:
|
||||
"""System requirements validator"""
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize validator"""
|
||||
self.validation_cache: Dict[str, Any] = {}
|
||||
|
||||
def check_python(self, min_version: str = "3.8", max_version: Optional[str] = None) -> Tuple[bool, str]:
|
||||
"""
|
||||
Check Python version requirements
|
||||
|
||||
Args:
|
||||
min_version: Minimum required Python version
|
||||
max_version: Maximum supported Python version (optional)
|
||||
|
||||
Returns:
|
||||
Tuple of (success: bool, message: str)
|
||||
"""
|
||||
cache_key = f"python_{min_version}_{max_version}"
|
||||
if cache_key in self.validation_cache:
|
||||
return self.validation_cache[cache_key]
|
||||
|
||||
try:
|
||||
# Get current Python version
|
||||
current_version = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
|
||||
|
||||
# Check minimum version
|
||||
if version.parse(current_version) < version.parse(min_version):
|
||||
help_msg = self.get_installation_help("python")
|
||||
result = (False, f"Python {min_version}+ required, found {current_version}{help_msg}")
|
||||
self.validation_cache[cache_key] = result
|
||||
return result
|
||||
|
||||
# Check maximum version if specified
|
||||
if max_version and version.parse(current_version) > version.parse(max_version):
|
||||
result = (False, f"Python version {current_version} exceeds maximum supported {max_version}")
|
||||
self.validation_cache[cache_key] = result
|
||||
return result
|
||||
|
||||
result = (True, f"Python {current_version} meets requirements")
|
||||
self.validation_cache[cache_key] = result
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
result = (False, f"Could not check Python version: {e}")
|
||||
self.validation_cache[cache_key] = result
|
||||
return result
|
||||
|
||||
def check_node(self, min_version: str = "16.0", max_version: Optional[str] = None) -> Tuple[bool, str]:
|
||||
"""
|
||||
Check Node.js version requirements
|
||||
|
||||
Args:
|
||||
min_version: Minimum required Node.js version
|
||||
max_version: Maximum supported Node.js version (optional)
|
||||
|
||||
Returns:
|
||||
Tuple of (success: bool, message: str)
|
||||
"""
|
||||
cache_key = f"node_{min_version}_{max_version}"
|
||||
if cache_key in self.validation_cache:
|
||||
return self.validation_cache[cache_key]
|
||||
|
||||
try:
|
||||
# Check if node is installed
|
||||
result = subprocess.run(
|
||||
['node', '--version'],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10
|
||||
)
|
||||
|
||||
if result.returncode != 0:
|
||||
help_msg = self.get_installation_help("node")
|
||||
result_tuple = (False, f"Node.js not found in PATH{help_msg}")
|
||||
self.validation_cache[cache_key] = result_tuple
|
||||
return result_tuple
|
||||
|
||||
# Parse version (format: v18.17.0)
|
||||
version_output = result.stdout.strip()
|
||||
if version_output.startswith('v'):
|
||||
current_version = version_output[1:]
|
||||
else:
|
||||
current_version = version_output
|
||||
|
||||
# Check minimum version
|
||||
if version.parse(current_version) < version.parse(min_version):
|
||||
help_msg = self.get_installation_help("node")
|
||||
result_tuple = (False, f"Node.js {min_version}+ required, found {current_version}{help_msg}")
|
||||
self.validation_cache[cache_key] = result_tuple
|
||||
return result_tuple
|
||||
|
||||
# Check maximum version if specified
|
||||
if max_version and version.parse(current_version) > version.parse(max_version):
|
||||
result_tuple = (False, f"Node.js version {current_version} exceeds maximum supported {max_version}")
|
||||
self.validation_cache[cache_key] = result_tuple
|
||||
return result_tuple
|
||||
|
||||
result_tuple = (True, f"Node.js {current_version} meets requirements")
|
||||
self.validation_cache[cache_key] = result_tuple
|
||||
return result_tuple
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
result_tuple = (False, "Node.js version check timed out")
|
||||
self.validation_cache[cache_key] = result_tuple
|
||||
return result_tuple
|
||||
except FileNotFoundError:
|
||||
help_msg = self.get_installation_help("node")
|
||||
result_tuple = (False, f"Node.js not found in PATH{help_msg}")
|
||||
self.validation_cache[cache_key] = result_tuple
|
||||
return result_tuple
|
||||
except Exception as e:
|
||||
result_tuple = (False, f"Could not check Node.js version: {e}")
|
||||
self.validation_cache[cache_key] = result_tuple
|
||||
return result_tuple
|
||||
|
||||
def check_claude_cli(self, min_version: Optional[str] = None) -> Tuple[bool, str]:
|
||||
"""
|
||||
Check Claude CLI installation and version
|
||||
|
||||
Args:
|
||||
min_version: Minimum required Claude CLI version (optional)
|
||||
|
||||
Returns:
|
||||
Tuple of (success: bool, message: str)
|
||||
"""
|
||||
cache_key = f"claude_cli_{min_version}"
|
||||
if cache_key in self.validation_cache:
|
||||
return self.validation_cache[cache_key]
|
||||
|
||||
try:
|
||||
# Check if claude is installed
|
||||
result = subprocess.run(
|
||||
['claude', '--version'],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10
|
||||
)
|
||||
|
||||
if result.returncode != 0:
|
||||
help_msg = self.get_installation_help("claude_cli")
|
||||
result_tuple = (False, f"Claude CLI not found in PATH{help_msg}")
|
||||
self.validation_cache[cache_key] = result_tuple
|
||||
return result_tuple
|
||||
|
||||
# Parse version from output
|
||||
version_output = result.stdout.strip()
|
||||
version_match = re.search(r'(\d+\.\d+\.\d+)', version_output)
|
||||
|
||||
if not version_match:
|
||||
result_tuple = (True, "Claude CLI found (version format unknown)")
|
||||
self.validation_cache[cache_key] = result_tuple
|
||||
return result_tuple
|
||||
|
||||
current_version = version_match.group(1)
|
||||
|
||||
# Check minimum version if specified
|
||||
if min_version and version.parse(current_version) < version.parse(min_version):
|
||||
result_tuple = (False, f"Claude CLI {min_version}+ required, found {current_version}")
|
||||
self.validation_cache[cache_key] = result_tuple
|
||||
return result_tuple
|
||||
|
||||
result_tuple = (True, f"Claude CLI {current_version} found")
|
||||
self.validation_cache[cache_key] = result_tuple
|
||||
return result_tuple
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
result_tuple = (False, "Claude CLI version check timed out")
|
||||
self.validation_cache[cache_key] = result_tuple
|
||||
return result_tuple
|
||||
except FileNotFoundError:
|
||||
help_msg = self.get_installation_help("claude_cli")
|
||||
result_tuple = (False, f"Claude CLI not found in PATH{help_msg}")
|
||||
self.validation_cache[cache_key] = result_tuple
|
||||
return result_tuple
|
||||
except Exception as e:
|
||||
result_tuple = (False, f"Could not check Claude CLI: {e}")
|
||||
self.validation_cache[cache_key] = result_tuple
|
||||
return result_tuple
|
||||
|
||||
def check_external_tool(self, tool_name: str, command: str, min_version: Optional[str] = None) -> Tuple[bool, str]:
|
||||
"""
|
||||
Check external tool availability and version
|
||||
|
||||
Args:
|
||||
tool_name: Display name of tool
|
||||
command: Command to check version
|
||||
min_version: Minimum required version (optional)
|
||||
|
||||
Returns:
|
||||
Tuple of (success: bool, message: str)
|
||||
"""
|
||||
cache_key = f"tool_{tool_name}_{command}_{min_version}"
|
||||
if cache_key in self.validation_cache:
|
||||
return self.validation_cache[cache_key]
|
||||
|
||||
try:
|
||||
# Split command into parts
|
||||
cmd_parts = command.split()
|
||||
|
||||
result = subprocess.run(
|
||||
cmd_parts,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10
|
||||
)
|
||||
|
||||
if result.returncode != 0:
|
||||
result_tuple = (False, f"{tool_name} not found or command failed")
|
||||
self.validation_cache[cache_key] = result_tuple
|
||||
return result_tuple
|
||||
|
||||
# Extract version if min_version specified
|
||||
if min_version:
|
||||
version_output = result.stdout + result.stderr
|
||||
version_match = re.search(r'(\d+\.\d+(?:\.\d+)?)', version_output)
|
||||
|
||||
if version_match:
|
||||
current_version = version_match.group(1)
|
||||
|
||||
if version.parse(current_version) < version.parse(min_version):
|
||||
result_tuple = (False, f"{tool_name} {min_version}+ required, found {current_version}")
|
||||
self.validation_cache[cache_key] = result_tuple
|
||||
return result_tuple
|
||||
|
||||
result_tuple = (True, f"{tool_name} {current_version} found")
|
||||
self.validation_cache[cache_key] = result_tuple
|
||||
return result_tuple
|
||||
else:
|
||||
result_tuple = (True, f"{tool_name} found (version unknown)")
|
||||
self.validation_cache[cache_key] = result_tuple
|
||||
return result_tuple
|
||||
else:
|
||||
result_tuple = (True, f"{tool_name} found")
|
||||
self.validation_cache[cache_key] = result_tuple
|
||||
return result_tuple
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
result_tuple = (False, f"{tool_name} check timed out")
|
||||
self.validation_cache[cache_key] = result_tuple
|
||||
return result_tuple
|
||||
except FileNotFoundError:
|
||||
result_tuple = (False, f"{tool_name} not found in PATH")
|
||||
self.validation_cache[cache_key] = result_tuple
|
||||
return result_tuple
|
||||
except Exception as e:
|
||||
result_tuple = (False, f"Could not check {tool_name}: {e}")
|
||||
self.validation_cache[cache_key] = result_tuple
|
||||
return result_tuple
|
||||
|
||||
def check_disk_space(self, path: Path, required_mb: int = 500) -> Tuple[bool, str]:
|
||||
"""
|
||||
Check available disk space
|
||||
|
||||
Args:
|
||||
path: Path to check (file or directory)
|
||||
required_mb: Required free space in MB
|
||||
|
||||
Returns:
|
||||
Tuple of (success: bool, message: str)
|
||||
"""
|
||||
cache_key = f"disk_{path}_{required_mb}"
|
||||
if cache_key in self.validation_cache:
|
||||
return self.validation_cache[cache_key]
|
||||
|
||||
try:
|
||||
# Get parent directory if path is a file
|
||||
check_path = path.parent if path.is_file() else path
|
||||
|
||||
# Get disk usage
|
||||
stat_result = shutil.disk_usage(check_path)
|
||||
free_mb = stat_result.free / (1024 * 1024)
|
||||
|
||||
if free_mb < required_mb:
|
||||
result = (False, f"Insufficient disk space: {free_mb:.1f}MB free, {required_mb}MB required")
|
||||
else:
|
||||
result = (True, f"Sufficient disk space: {free_mb:.1f}MB free")
|
||||
|
||||
self.validation_cache[cache_key] = result
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
result = (False, f"Could not check disk space: {e}")
|
||||
self.validation_cache[cache_key] = result
|
||||
return result
|
||||
|
||||
def check_write_permissions(self, path: Path) -> Tuple[bool, str]:
|
||||
"""
|
||||
Check write permissions for path
|
||||
|
||||
Args:
|
||||
path: Path to check
|
||||
|
||||
Returns:
|
||||
Tuple of (success: bool, message: str)
|
||||
"""
|
||||
cache_key = f"write_{path}"
|
||||
if cache_key in self.validation_cache:
|
||||
return self.validation_cache[cache_key]
|
||||
|
||||
try:
|
||||
# Create parent directories if needed
|
||||
if not path.exists():
|
||||
path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Test write access
|
||||
test_file = path / ".write_test"
|
||||
test_file.touch()
|
||||
test_file.unlink()
|
||||
|
||||
result = (True, f"Write access confirmed for {path}")
|
||||
self.validation_cache[cache_key] = result
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
result = (False, f"No write access to {path}: {e}")
|
||||
self.validation_cache[cache_key] = result
|
||||
return result
|
||||
|
||||
def validate_requirements(self, requirements: Dict[str, Any]) -> Tuple[bool, List[str]]:
|
||||
"""
|
||||
Validate all system requirements
|
||||
|
||||
Args:
|
||||
requirements: Requirements configuration dict
|
||||
|
||||
Returns:
|
||||
Tuple of (all_passed: bool, error_messages: List[str])
|
||||
"""
|
||||
errors = []
|
||||
|
||||
# Check Python requirements
|
||||
if "python" in requirements:
|
||||
python_req = requirements["python"]
|
||||
success, message = self.check_python(
|
||||
python_req["min_version"],
|
||||
python_req.get("max_version")
|
||||
)
|
||||
if not success:
|
||||
errors.append(f"Python: {message}")
|
||||
|
||||
# Check Node.js requirements
|
||||
if "node" in requirements:
|
||||
node_req = requirements["node"]
|
||||
success, message = self.check_node(
|
||||
node_req["min_version"],
|
||||
node_req.get("max_version")
|
||||
)
|
||||
if not success:
|
||||
errors.append(f"Node.js: {message}")
|
||||
|
||||
# Check disk space
|
||||
if "disk_space_mb" in requirements:
|
||||
success, message = self.check_disk_space(
|
||||
Path.home(),
|
||||
requirements["disk_space_mb"]
|
||||
)
|
||||
if not success:
|
||||
errors.append(f"Disk space: {message}")
|
||||
|
||||
# Check external tools
|
||||
if "external_tools" in requirements:
|
||||
for tool_name, tool_req in requirements["external_tools"].items():
|
||||
# Skip optional tools that fail
|
||||
is_optional = tool_req.get("optional", False)
|
||||
|
||||
success, message = self.check_external_tool(
|
||||
tool_name,
|
||||
tool_req["command"],
|
||||
tool_req.get("min_version")
|
||||
)
|
||||
|
||||
if not success and not is_optional:
|
||||
errors.append(f"{tool_name}: {message}")
|
||||
|
||||
return len(errors) == 0, errors
|
||||
|
||||
def validate_component_requirements(self, component_names: List[str], all_requirements: Dict[str, Any]) -> Tuple[bool, List[str]]:
|
||||
"""
|
||||
Validate requirements for specific components
|
||||
|
||||
Args:
|
||||
component_names: List of component names to validate
|
||||
all_requirements: Full requirements configuration
|
||||
|
||||
Returns:
|
||||
Tuple of (all_passed: bool, error_messages: List[str])
|
||||
"""
|
||||
errors = []
|
||||
|
||||
# Start with base requirements
|
||||
base_requirements = {
|
||||
"python": all_requirements.get("python", {}),
|
||||
"disk_space_mb": all_requirements.get("disk_space_mb", 500)
|
||||
}
|
||||
|
||||
# Add conditional requirements based on components
|
||||
external_tools = {}
|
||||
|
||||
# Check if any component needs Node.js
|
||||
node_components = []
|
||||
for component in component_names:
|
||||
# This would be enhanced with actual component metadata
|
||||
if component in ["mcp"]: # MCP component needs Node.js
|
||||
node_components.append(component)
|
||||
|
||||
if node_components and "node" in all_requirements:
|
||||
base_requirements["node"] = all_requirements["node"]
|
||||
|
||||
# Add external tools needed by components
|
||||
if "external_tools" in all_requirements:
|
||||
for tool_name, tool_req in all_requirements["external_tools"].items():
|
||||
required_for = tool_req.get("required_for", [])
|
||||
|
||||
# Check if any of our components need this tool
|
||||
if any(comp in required_for for comp in component_names):
|
||||
external_tools[tool_name] = tool_req
|
||||
|
||||
if external_tools:
|
||||
base_requirements["external_tools"] = external_tools
|
||||
|
||||
# Validate consolidated requirements
|
||||
return self.validate_requirements(base_requirements)
|
||||
|
||||
def get_system_info(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Get comprehensive system information
|
||||
|
||||
Returns:
|
||||
Dict with system information
|
||||
"""
|
||||
info = {
|
||||
"platform": sys.platform,
|
||||
"python_version": f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
|
||||
"python_executable": sys.executable
|
||||
}
|
||||
|
||||
# Add Node.js info if available
|
||||
node_success, node_msg = self.check_node()
|
||||
info["node_available"] = node_success
|
||||
if node_success:
|
||||
info["node_message"] = node_msg
|
||||
|
||||
# Add Claude CLI info if available
|
||||
claude_success, claude_msg = self.check_claude_cli()
|
||||
info["claude_cli_available"] = claude_success
|
||||
if claude_success:
|
||||
info["claude_cli_message"] = claude_msg
|
||||
|
||||
# Add disk space info
|
||||
try:
|
||||
home_path = Path.home()
|
||||
stat_result = shutil.disk_usage(home_path)
|
||||
info["disk_space"] = {
|
||||
"total_gb": stat_result.total / (1024**3),
|
||||
"free_gb": stat_result.free / (1024**3),
|
||||
"used_gb": (stat_result.total - stat_result.free) / (1024**3)
|
||||
}
|
||||
except Exception:
|
||||
info["disk_space"] = {"error": "Could not determine disk space"}
|
||||
|
||||
return info
|
||||
|
||||
def get_platform(self) -> str:
|
||||
"""
|
||||
Get current platform for installation commands
|
||||
|
||||
Returns:
|
||||
Platform string (linux, darwin, win32)
|
||||
"""
|
||||
return sys.platform
|
||||
|
||||
def load_installation_commands(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Load installation commands from requirements configuration
|
||||
|
||||
Returns:
|
||||
Installation commands dict
|
||||
"""
|
||||
try:
|
||||
from .config_manager import ConfigManager
|
||||
from .. import PROJECT_ROOT
|
||||
|
||||
config_manager = ConfigManager(PROJECT_ROOT / "config")
|
||||
requirements = config_manager.load_requirements()
|
||||
return requirements.get("installation_commands", {})
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
def get_installation_help(self, tool_name: str, platform: Optional[str] = None) -> str:
|
||||
"""
|
||||
Get installation help for a specific tool
|
||||
|
||||
Args:
|
||||
tool_name: Name of tool to get help for
|
||||
platform: Target platform (auto-detected if None)
|
||||
|
||||
Returns:
|
||||
Installation help string
|
||||
"""
|
||||
if platform is None:
|
||||
platform = self.get_platform()
|
||||
|
||||
commands = self.load_installation_commands()
|
||||
tool_commands = commands.get(tool_name, {})
|
||||
|
||||
if not tool_commands:
|
||||
return f"No installation instructions available for {tool_name}"
|
||||
|
||||
# Get platform-specific command or fallback to 'all'
|
||||
install_cmd = tool_commands.get(platform, tool_commands.get("all", ""))
|
||||
description = tool_commands.get("description", "")
|
||||
|
||||
if install_cmd:
|
||||
help_text = f"\n💡 Installation Help for {tool_name}:\n"
|
||||
if description:
|
||||
help_text += f" {description}\n"
|
||||
help_text += f" Command: {install_cmd}\n"
|
||||
return help_text
|
||||
|
||||
return f"No installation instructions available for {tool_name} on {platform}"
|
||||
|
||||
def diagnose_system(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Perform comprehensive system diagnostics
|
||||
|
||||
Returns:
|
||||
Diagnostic information dict
|
||||
"""
|
||||
diagnostics = {
|
||||
"platform": self.get_platform(),
|
||||
"checks": {},
|
||||
"issues": [],
|
||||
"recommendations": []
|
||||
}
|
||||
|
||||
# Check Python
|
||||
python_success, python_msg = self.check_python()
|
||||
diagnostics["checks"]["python"] = {
|
||||
"status": "pass" if python_success else "fail",
|
||||
"message": python_msg
|
||||
}
|
||||
if not python_success:
|
||||
diagnostics["issues"].append("Python version issue")
|
||||
diagnostics["recommendations"].append(self.get_installation_help("python"))
|
||||
|
||||
# Check Node.js
|
||||
node_success, node_msg = self.check_node()
|
||||
diagnostics["checks"]["node"] = {
|
||||
"status": "pass" if node_success else "fail",
|
||||
"message": node_msg
|
||||
}
|
||||
if not node_success:
|
||||
diagnostics["issues"].append("Node.js not found or version issue")
|
||||
diagnostics["recommendations"].append(self.get_installation_help("node"))
|
||||
|
||||
# Check Claude CLI
|
||||
claude_success, claude_msg = self.check_claude_cli()
|
||||
diagnostics["checks"]["claude_cli"] = {
|
||||
"status": "pass" if claude_success else "fail",
|
||||
"message": claude_msg
|
||||
}
|
||||
if not claude_success:
|
||||
diagnostics["issues"].append("Claude CLI not found")
|
||||
diagnostics["recommendations"].append(self.get_installation_help("claude_cli"))
|
||||
|
||||
# Check disk space
|
||||
disk_success, disk_msg = self.check_disk_space(Path.home())
|
||||
diagnostics["checks"]["disk_space"] = {
|
||||
"status": "pass" if disk_success else "fail",
|
||||
"message": disk_msg
|
||||
}
|
||||
if not disk_success:
|
||||
diagnostics["issues"].append("Insufficient disk space")
|
||||
|
||||
# Check common PATH issues
|
||||
self._diagnose_path_issues(diagnostics)
|
||||
|
||||
return diagnostics
|
||||
|
||||
def _diagnose_path_issues(self, diagnostics: Dict[str, Any]) -> None:
|
||||
"""Add PATH-related diagnostics"""
|
||||
path_issues = []
|
||||
|
||||
# Check if tools are in PATH, with alternatives for some tools
|
||||
tool_checks = [
|
||||
# For Python, check if either python3 OR python is available
|
||||
(["python3", "python"], "Python (python3 or python)"),
|
||||
(["node"], "Node.js"),
|
||||
(["npm"], "npm"),
|
||||
(["claude"], "Claude CLI")
|
||||
]
|
||||
|
||||
for tool_alternatives, display_name in tool_checks:
|
||||
tool_found = False
|
||||
for tool in tool_alternatives:
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["which" if sys.platform != "win32" else "where", tool],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5
|
||||
)
|
||||
if result.returncode == 0:
|
||||
tool_found = True
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if not tool_found:
|
||||
# Only report as missing if none of the alternatives were found
|
||||
if len(tool_alternatives) > 1:
|
||||
path_issues.append(f"{display_name} not found in PATH")
|
||||
else:
|
||||
path_issues.append(f"{tool_alternatives[0]} not found in PATH")
|
||||
|
||||
if path_issues:
|
||||
diagnostics["issues"].extend(path_issues)
|
||||
diagnostics["recommendations"].append(
|
||||
"\n💡 PATH Issue Help:\n"
|
||||
" Some tools may not be in your PATH. Try:\n"
|
||||
" - Restart your terminal after installation\n"
|
||||
" - Check your shell configuration (.bashrc, .zshrc)\n"
|
||||
" - Use full paths to tools if needed\n"
|
||||
)
|
||||
|
||||
def clear_cache(self) -> None:
|
||||
"""Clear validation cache"""
|
||||
self.validation_cache.clear()
|
||||
Reference in New Issue
Block a user