refactor: PEP8 compliance - directory rename and code formatting (#425)

* fix(orchestration): add WebFetch auto-trigger for infrastructure configuration

Problem: Infrastructure configuration changes (e.g., Traefik port settings)
were being made based on assumptions without consulting official documentation,
violating the 'Evidence > assumptions' principle in PRINCIPLES.md.

Solution:
- Added Infrastructure Configuration Validation section to MODE_Orchestration.md
- Auto-triggers WebFetch for infrastructure tools (Traefik, nginx, Docker, etc.)
- Enforces MODE_DeepResearch activation for investigation
- BLOCKS assumption-based configuration changes

Testing: Verified WebFetch successfully retrieves Traefik official docs (port 80 default)

This prevents production outages from infrastructure misconfiguration by ensuring
all technical recommendations are backed by official documentation.

* feat: Add PM Agent (Project Manager Agent) for seamless orchestration

Introduces PM Agent as the default orchestration layer that coordinates
all sub-agents and manages workflows automatically.

Key Features:
- Default orchestration: All user interactions handled by PM Agent
- Auto-delegation: Intelligent sub-agent selection based on task analysis
- Docker Gateway integration: Zero-token baseline with dynamic MCP loading
- Self-improvement loop: Automatic documentation of patterns and mistakes
- Optional override: Users can specify sub-agents explicitly if desired

Architecture:
- Agent spec: SuperClaude/Agents/pm-agent.md
- Command: SuperClaude/Commands/pm.md
- Updated docs: README.md (15→16 agents), agents.md (new Orchestration category)

User Experience:
- Default: PM Agent handles everything (seamless, no manual routing)
- Optional: Explicit --agent flag for direct sub-agent access
- Both modes available simultaneously (no user downside)

Implementation Status:
-  Specification complete
-  Documentation complete
-  Prototype implementation needed
-  Docker Gateway integration needed
-  Testing and validation needed

Refs: kazukinakai/docker-mcp-gateway (IRIS MCP Gateway integration)

* feat: Add Agent Orchestration rules for PM Agent default activation

Implements PM Agent as the default orchestration layer in RULES.md.

Key Changes:
- New 'Agent Orchestration' section (CRITICAL priority)
- PM Agent receives ALL user requests by default
- Manual override with @agent-[name] bypasses PM Agent
- Agent Selection Priority clearly defined:
  1. Manual override → Direct routing
  2. Default → PM Agent → Auto-delegation
  3. Delegation based on keywords, file types, complexity, context

User Experience:
- Default: PM Agent handles everything (seamless)
- Override: @agent-[name] for direct specialist access
- Transparent: PM Agent reports delegation decisions

This establishes PM Agent as the orchestration layer while
respecting existing auto-activation patterns and manual overrides.

Next Steps:
- Local testing in agiletec project
- Iteration based on actual behavior
- Documentation updates as needed

* refactor(pm-agent): redesign as self-improvement meta-layer

Problem Resolution:
PM Agent's initial design competed with existing auto-activation for task routing,
creating confusion about orchestration responsibilities and adding unnecessary complexity.

Design Change:
Redefined PM Agent as a meta-layer agent that operates AFTER specialist agents
complete tasks, focusing on:
- Post-implementation documentation and pattern recording
- Immediate mistake analysis with prevention checklists
- Monthly documentation maintenance and noise reduction
- Pattern extraction and knowledge synthesis

Two-Layer Orchestration System:
1. Task Execution Layer: Existing auto-activation handles task routing (unchanged)
2. Self-Improvement Layer: PM Agent meta-layer handles documentation (new)

Files Modified:
- SuperClaude/Agents/pm-agent.md: Complete rewrite with meta-layer design
  - Category: orchestration → meta
  - Triggers: All user interactions → Post-implementation, mistakes, monthly
  - Behavioral Mindset: Continuous learning system
  - Self-Improvement Workflow: BEFORE/DURING/AFTER/MISTAKE RECOVERY/MAINTENANCE

- SuperClaude/Core/RULES.md: Agent Orchestration section updated
  - Split into Task Execution Layer + Self-Improvement Layer
  - Added orchestration flow diagram
  - Clarified PM Agent activates AFTER task completion

- README.md: Updated PM Agent description
  - "orchestrates all interactions" → "ensures continuous learning"

- Docs/User-Guide/agents.md: PM Agent section rewritten
  - Section: Orchestration Agent → Meta-Layer Agent
  - Expertise: Project orchestration → Self-improvement workflow executor
  - Examples: Task coordination → Post-implementation documentation

- PR_DOCUMENTATION.md: Comprehensive PR documentation added
  - Summary, motivation, changes, testing, breaking changes
  - Two-layer orchestration system diagram
  - Verification checklist

Integration Validated:
Tested with agiletec project's self-improvement-workflow.md:
 PM Agent aligns with existing BEFORE/DURING/AFTER/MISTAKE RECOVERY phases
 Complements (not competes with) existing workflow
 agiletec workflow defines WHAT, PM Agent defines WHO executes it

Breaking Changes: None
- Existing auto-activation continues unchanged
- Specialist agents unaffected
- User workflows remain the same
- New capability: Automatic documentation and knowledge maintenance

Value Proposition:
Transforms SuperClaude into a continuously learning system that accumulates
knowledge, prevents recurring mistakes, and maintains fresh documentation
without manual intervention.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* docs: add Claude Code conversation history management research

Research covering .jsonl file structure, performance impact, and retention policies.

Content:
- Claude Code .jsonl file format and message types
- Performance issues from GitHub (memory leaks, conversation compaction)
- Retention policies (consumer vs enterprise)
- Rotation recommendations based on actual data
- File history snapshot tracking mechanics

Source: Moved from agiletec project (research applicable to all Claude Code projects)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* feat: add Development documentation structure

Phase 1: Documentation Structure complete

- Add Docs/Development/ directory for development documentation
- Add ARCHITECTURE.md - System architecture with PM Agent meta-layer
- Add ROADMAP.md - 5-phase development plan with checkboxes
- Add TASKS.md - Daily task tracking with progress indicators
- Add PROJECT_STATUS.md - Current status dashboard and metrics
- Add pm-agent-integration.md - Implementation guide for PM Agent mode

This establishes comprehensive documentation foundation for:
- System architecture understanding
- Development planning and tracking
- Implementation guidance
- Progress visibility

Related: #pm-agent-mode #documentation #phase-1

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* feat: PM Agent session lifecycle and PDCA implementation

Phase 2: PM Agent Mode Integration (Design Phase)

Commands/pm.md updates:
- Add "Always-Active Foundation Layer" concept
- Add Session Lifecycle (Session Start/During Work/Session End)
- Add PDCA Cycle (Plan/Do/Check/Act) automation
- Add Serena MCP Memory Integration (list/read/write_memory)
- Document auto-activation triggers

Agents/pm-agent.md updates:
- Add Session Start Protocol (MANDATORY auto-activation)
- Add During Work PDCA Cycle with example workflows
- Add Session End Protocol with state preservation
- Add PDCA Self-Evaluation Pattern
- Add Documentation Strategy (temp → patterns/mistakes)
- Add Memory Operations Reference

Key Features:
- Session start auto-activation for context restoration
- 30-minute checkpoint saves during work
- Self-evaluation with think_about_* operations
- Systematic documentation lifecycle
- Knowledge evolution to CLAUDE.md

Implementation Status:
-  Design complete (Commands/pm.md, Agents/pm-agent.md)
-  Implementation pending (Core components)
-  Serena MCP integration pending

Salvaged from mistaken development in ~/.claude directory

Related: #pm-agent-mode #session-lifecycle #pdca-cycle #phase-2

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: disable Serena MCP auto-browser launch

Disable web dashboard and GUI log window auto-launch in Serena MCP server
to prevent intrusive browser popups on startup. Users can still manually
access the dashboard at http://localhost:24282/dashboard/ if needed.

Changes:
- Add CLI flags to Serena run command:
  - --enable-web-dashboard false
  - --enable-gui-log-window false
- Ensures Git-tracked configuration (no reliance on ~/.serena/serena_config.yml)
- Aligns with AIRIS MCP Gateway integration approach

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* refactor: rename directories to lowercase for PEP8 compliance

- Rename superclaude/Agents -> superclaude/agents
- Rename superclaude/Commands -> superclaude/commands
- Rename superclaude/Core -> superclaude/core
- Rename superclaude/Examples -> superclaude/examples
- Rename superclaude/MCP -> superclaude/mcp
- Rename superclaude/Modes -> superclaude/modes

This change follows Python PEP8 naming conventions for package directories.

* style: fix PEP8 violations and update package name to lowercase

Changes:
- Format all Python files with black (43 files reformatted)
- Update package name from 'SuperClaude' to 'superclaude' in pyproject.toml
- Fix import statements to use lowercase package name
- Add missing imports (timedelta, __version__)
- Remove old SuperClaude.egg-info directory

PEP8 violations reduced from 2672 to 701 (mostly E501 line length due to black's 88 char vs flake8's 79 char limit).

* docs: add PM Agent development documentation

Add comprehensive PM Agent development documentation:
- PM Agent ideal workflow (7-phase autonomous cycle)
- Project structure understanding (Git vs installed environment)
- Installation flow understanding (CommandsComponent behavior)
- Task management system (current-tasks.md)

Purpose: Eliminate repeated explanations and enable autonomous PDCA cycles

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* feat(pm-agent): add self-correcting execution and warning investigation culture

## Changes

### superclaude/commands/pm.md
- Add "Self-Correcting Execution" section with root cause analysis protocol
- Add "Warning/Error Investigation Culture" section enforcing zero-tolerance for dismissal
- Define error detection protocol: STOP → Investigate → Hypothesis → Different Solution → Execute
- Document anti-patterns (retry without understanding) and correct patterns (research-first)

### docs/Development/hypothesis-pm-autonomous-enhancement-2025-10-14.md
- Add PDCA workflow hypothesis document for PM Agent autonomous enhancement

## Rationale

PM Agent must never retry failed operations without understanding root causes.
All warnings and errors require investigation via context7/WebFetch/documentation
to ensure production-quality code and prevent technical debt accumulation.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* feat(installer): add airis-mcp-gateway MCP server option

## Changes

- Add airis-mcp-gateway to MCP server options in installer
- Configuration: GitHub-based installation via uvx
- Repository: https://github.com/oraios/airis-mcp-gateway
- Purpose: Dynamic MCP Gateway for zero-token baseline and on-demand tool loading

## Implementation

Added to setup/components/mcp.py self.mcp_servers dictionary with:
- install_method: github
- install_command: uvx test installation
- run_command: uvx runtime execution
- required: False (optional server)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: kazuki <kazuki@kazukinoMacBook-Air.local>
Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
kazuki nakai
2025-10-14 12:17:09 +09:00
committed by GitHub
parent 302c5851b1
commit 050d5ea2ab
194 changed files with 9698 additions and 3693 deletions

View File

@@ -4,11 +4,4 @@ from .ui import ProgressBar, Menu, confirm, Colors
from .logger import Logger
from .security import SecurityValidator
__all__ = [
'ProgressBar',
'Menu',
'confirm',
'Colors',
'Logger',
'SecurityValidator'
]
__all__ = ["ProgressBar", "Menu", "confirm", "Colors", "Logger", "SecurityValidator"]

View File

@@ -18,6 +18,7 @@ from .paths import get_home_directory
def _get_env_tracking_file() -> Path:
"""Get path to environment variable tracking file"""
from .. import DEFAULT_INSTALL_DIR
install_dir = get_home_directory() / ".claude"
install_dir.mkdir(exist_ok=True)
return install_dir / "superclaude_env_vars.json"
@@ -26,23 +27,23 @@ def _get_env_tracking_file() -> Path:
def _load_env_tracking() -> Dict[str, Dict[str, str]]:
"""Load environment variable tracking data"""
tracking_file = _get_env_tracking_file()
try:
if tracking_file.exists():
with open(tracking_file, 'r') as f:
with open(tracking_file, "r") as f:
return json.load(f)
except Exception as e:
get_logger().warning(f"Could not load environment tracking: {e}")
return {}
def _save_env_tracking(tracking_data: Dict[str, Dict[str, str]]) -> bool:
"""Save environment variable tracking data"""
tracking_file = _get_env_tracking_file()
try:
with open(tracking_file, 'w') as f:
with open(tracking_file, "w") as f:
json.dump(tracking_data, f, indent=2)
return True
except Exception as e:
@@ -54,17 +55,17 @@ def _add_env_tracking(env_vars: Dict[str, str]) -> None:
"""Add environment variables to tracking"""
if not env_vars:
return
tracking_data = _load_env_tracking()
timestamp = datetime.now().isoformat()
for env_var, value in env_vars.items():
tracking_data[env_var] = {
"set_by": "superclaude",
"timestamp": timestamp,
"value_hash": str(hash(value)) # Store hash, not actual value for security
"value_hash": str(hash(value)), # Store hash, not actual value for security
}
_save_env_tracking(tracking_data)
get_logger().info(f"Added {len(env_vars)} environment variables to tracking")
@@ -73,13 +74,13 @@ def _remove_env_tracking(env_vars: list) -> None:
"""Remove environment variables from tracking"""
if not env_vars:
return
tracking_data = _load_env_tracking()
for env_var in env_vars:
if env_var in tracking_data:
del tracking_data[env_var]
_save_env_tracking(tracking_data)
get_logger().info(f"Removed {len(env_vars)} environment variables from tracking")
@@ -87,24 +88,24 @@ def _remove_env_tracking(env_vars: list) -> None:
def detect_shell_config() -> Optional[Path]:
"""
Detect user's shell configuration file
Returns:
Path to the shell configuration file, or None if not found
"""
home = get_home_directory()
# Check in order of preference
configs = [
home / ".zshrc", # Zsh (Mac default)
home / ".bashrc", # Bash
home / ".profile", # Generic shell profile
home / ".bash_profile" # Mac Bash profile
home / ".zshrc", # Zsh (Mac default)
home / ".bashrc", # Bash
home / ".profile", # Generic shell profile
home / ".bash_profile", # Mac Bash profile
]
for config in configs:
if config.exists():
return config
# Default to .bashrc if none exist (will be created)
return home / ".bashrc"
@@ -112,103 +113,113 @@ def detect_shell_config() -> Optional[Path]:
def setup_environment_variables(api_keys: Dict[str, str]) -> bool:
"""
Set up environment variables across platforms
Args:
api_keys: Dictionary of environment variable names to values
Returns:
True if all variables were set successfully, False otherwise
"""
logger = get_logger()
success = True
if not api_keys:
return True
print(f"\n{Colors.BLUE}[INFO] Setting up environment variables...{Colors.RESET}")
for env_var, value in api_keys.items():
try:
# Set for current session
os.environ[env_var] = value
if os.name == 'nt': # Windows
if os.name == "nt": # Windows
# Use setx for persistent user variable
result = subprocess.run(
['setx', env_var, value],
capture_output=True,
text=True
["setx", env_var, value], capture_output=True, text=True
)
if result.returncode != 0:
display_warning(f"Could not set {env_var} persistently: {result.stderr.strip()}")
display_warning(
f"Could not set {env_var} persistently: {result.stderr.strip()}"
)
success = False
else:
logger.info(f"Windows environment variable {env_var} set persistently")
logger.info(
f"Windows environment variable {env_var} set persistently"
)
else: # Unix-like systems
shell_config = detect_shell_config()
# Check if the export already exists
export_line = f'export {env_var}="{value}"'
try:
with open(shell_config, 'r') as f:
with open(shell_config, "r") as f:
content = f.read()
# Check if this environment variable is already set
if f'export {env_var}=' in content:
if f"export {env_var}=" in content:
# Variable exists - don't duplicate
logger.info(f"Environment variable {env_var} already exists in {shell_config.name}")
logger.info(
f"Environment variable {env_var} already exists in {shell_config.name}"
)
else:
# Append export to shell config
with open(shell_config, 'a') as f:
f.write(f'\n# SuperClaude API Key\n{export_line}\n')
with open(shell_config, "a") as f:
f.write(f"\n# SuperClaude API Key\n{export_line}\n")
display_info(f"Added {env_var} to {shell_config.name}")
logger.info(f"Added {env_var} to {shell_config}")
except Exception as e:
display_warning(f"Could not update {shell_config.name}: {e}")
success = False
logger.info(f"Environment variable {env_var} configured for current session")
logger.info(
f"Environment variable {env_var} configured for current session"
)
except Exception as e:
logger.error(f"Failed to set {env_var}: {e}")
display_warning(f"Failed to set {env_var}: {e}")
success = False
if success:
# Add to tracking
_add_env_tracking(api_keys)
display_success("Environment variables configured successfully")
if os.name != 'nt':
display_info("Restart your terminal or run 'source ~/.bashrc' to apply changes")
if os.name != "nt":
display_info(
"Restart your terminal or run 'source ~/.bashrc' to apply changes"
)
else:
display_info("New environment variables will be available in new terminal sessions")
display_info(
"New environment variables will be available in new terminal sessions"
)
else:
display_warning("Some environment variables could not be set persistently")
display_info("You can set them manually or check the logs for details")
return success
def validate_environment_setup(env_vars: Dict[str, str]) -> bool:
"""
Validate that environment variables are properly set
Args:
env_vars: Dictionary of environment variable names to expected values
Returns:
True if all variables are set correctly, False otherwise
"""
logger = get_logger()
all_valid = True
for env_var, expected_value in env_vars.items():
current_value = os.environ.get(env_var)
if current_value is None:
logger.warning(f"Environment variable {env_var} is not set")
all_valid = False
@@ -217,73 +228,75 @@ def validate_environment_setup(env_vars: Dict[str, str]) -> bool:
all_valid = False
else:
logger.info(f"Environment variable {env_var} is set correctly")
return all_valid
def get_shell_name() -> str:
"""
Get the name of the current shell
Returns:
Name of the shell (e.g., 'bash', 'zsh', 'fish')
"""
shell_path = os.environ.get('SHELL', '')
shell_path = os.environ.get("SHELL", "")
if shell_path:
return Path(shell_path).name
return 'unknown'
return "unknown"
def get_superclaude_environment_variables() -> Dict[str, str]:
"""
Get environment variables that were set by SuperClaude
Returns:
Dictionary of environment variable names to their current values
"""
# Load tracking data to get SuperClaude-managed variables
tracking_data = _load_env_tracking()
found_vars = {}
for env_var, metadata in tracking_data.items():
if metadata.get("set_by") == "superclaude":
value = os.environ.get(env_var)
if value:
found_vars[env_var] = value
# Fallback: check known SuperClaude API key environment variables
# (for backwards compatibility with existing installations)
known_superclaude_env_vars = [
"TWENTYFIRST_API_KEY", # Magic server
"MORPH_API_KEY" # Morphllm server
"MORPH_API_KEY", # Morphllm server
]
for env_var in known_superclaude_env_vars:
if env_var not in found_vars:
value = os.environ.get(env_var)
if value:
found_vars[env_var] = value
return found_vars
def cleanup_environment_variables(env_vars_to_remove: Dict[str, str], create_restore_script: bool = True) -> bool:
def cleanup_environment_variables(
env_vars_to_remove: Dict[str, str], create_restore_script: bool = True
) -> bool:
"""
Safely remove environment variables with backup and restore options
Args:
env_vars_to_remove: Dictionary of environment variable names to remove
create_restore_script: Whether to create a script to restore the variables
Returns:
True if cleanup was successful, False otherwise
"""
logger = get_logger()
success = True
if not env_vars_to_remove:
return True
# Create restore script if requested
if create_restore_script:
restore_script_path = _create_restore_script(env_vars_to_remove)
@@ -291,50 +304,54 @@ def cleanup_environment_variables(env_vars_to_remove: Dict[str, str], create_res
display_info(f"Created restore script: {restore_script_path}")
else:
display_warning("Could not create restore script")
print(f"\n{Colors.BLUE}[INFO] Removing environment variables...{Colors.RESET}")
for env_var, value in env_vars_to_remove.items():
try:
# Remove from current session
if env_var in os.environ:
del os.environ[env_var]
logger.info(f"Removed {env_var} from current session")
if os.name == 'nt': # Windows
if os.name == "nt": # Windows
# Remove persistent user variable using reg command
result = subprocess.run(
['reg', 'delete', 'HKCU\\Environment', '/v', env_var, '/f'],
["reg", "delete", "HKCU\\Environment", "/v", env_var, "/f"],
capture_output=True,
text=True
text=True,
)
if result.returncode != 0:
# Variable might not exist in registry, which is fine
logger.debug(f"Registry deletion for {env_var}: {result.stderr.strip()}")
logger.debug(
f"Registry deletion for {env_var}: {result.stderr.strip()}"
)
else:
logger.info(f"Removed {env_var} from Windows registry")
else: # Unix-like systems
shell_config = detect_shell_config()
if shell_config and shell_config.exists():
_remove_env_var_from_shell_config(shell_config, env_var)
except Exception as e:
logger.error(f"Failed to remove {env_var}: {e}")
display_warning(f"Could not remove {env_var}: {e}")
success = False
if success:
# Remove from tracking
_remove_env_tracking(list(env_vars_to_remove.keys()))
display_success("Environment variables removed successfully")
if os.name != 'nt':
display_info("Restart your terminal or source your shell config to apply changes")
if os.name != "nt":
display_info(
"Restart your terminal or source your shell config to apply changes"
)
else:
display_info("Changes will take effect in new terminal sessions")
else:
display_warning("Some environment variables could not be removed")
return success
@@ -342,9 +359,9 @@ def _create_restore_script(env_vars: Dict[str, str]) -> Optional[Path]:
"""Create a script to restore environment variables"""
try:
home = get_home_directory()
if os.name == 'nt': # Windows
if os.name == "nt": # Windows
script_path = home / "restore_superclaude_env.bat"
with open(script_path, 'w') as f:
with open(script_path, "w") as f:
f.write("@echo off\n")
f.write("REM SuperClaude Environment Variable Restore Script\n")
f.write("REM Generated during uninstall\n\n")
@@ -354,7 +371,7 @@ def _create_restore_script(env_vars: Dict[str, str]) -> Optional[Path]:
f.write("pause\n")
else: # Unix-like
script_path = home / "restore_superclaude_env.sh"
with open(script_path, 'w') as f:
with open(script_path, "w") as f:
f.write("#!/bin/bash\n")
f.write("# SuperClaude Environment Variable Restore Script\n")
f.write("# Generated during uninstall\n\n")
@@ -362,14 +379,16 @@ def _create_restore_script(env_vars: Dict[str, str]) -> Optional[Path]:
for env_var, value in env_vars.items():
f.write(f'export {env_var}="{value}"\n')
if shell_config:
f.write(f'echo \'export {env_var}="{value}"\' >> {shell_config}\n')
f.write(
f"echo 'export {env_var}=\"{value}\"' >> {shell_config}\n"
)
f.write("\necho 'Environment variables restored'\n")
# Make script executable
script_path.chmod(0o755)
return script_path
except Exception as e:
get_logger().error(f"Failed to create restore script: {e}")
return None
@@ -379,90 +398,92 @@ def _remove_env_var_from_shell_config(shell_config: Path, env_var: str) -> bool:
"""Remove environment variable export from shell configuration file"""
try:
# Read current content
with open(shell_config, 'r') as f:
with open(shell_config, "r") as f:
lines = f.readlines()
# Filter out lines that export this variable
filtered_lines = []
skip_next_blank = False
for line in lines:
# Check if this line exports our variable
if f'export {env_var}=' in line or line.strip() == f'# SuperClaude API Key':
if f"export {env_var}=" in line or line.strip() == f"# SuperClaude API Key":
skip_next_blank = True
continue
# Skip blank line after removed export
if skip_next_blank and line.strip() == '':
if skip_next_blank and line.strip() == "":
skip_next_blank = False
continue
skip_next_blank = False
filtered_lines.append(line)
# Write back the filtered content
with open(shell_config, 'w') as f:
with open(shell_config, "w") as f:
f.writelines(filtered_lines)
get_logger().info(f"Removed {env_var} export from {shell_config.name}")
return True
except Exception as e:
get_logger().error(f"Failed to remove {env_var} from {shell_config}: {e}")
return False
def create_env_file(api_keys: Dict[str, str], env_file_path: Optional[Path] = None) -> bool:
def create_env_file(
api_keys: Dict[str, str], env_file_path: Optional[Path] = None
) -> bool:
"""
Create a .env file with the API keys (alternative to shell config)
Args:
api_keys: Dictionary of environment variable names to values
env_file_path: Path to the .env file (defaults to home directory)
Returns:
True if .env file was created successfully, False otherwise
"""
if env_file_path is None:
env_file_path = get_home_directory() / ".env"
logger = get_logger()
try:
# Read existing .env file if it exists
existing_content = ""
if env_file_path.exists():
with open(env_file_path, 'r') as f:
with open(env_file_path, "r") as f:
existing_content = f.read()
# Prepare new content
new_lines = []
for env_var, value in api_keys.items():
line = f'{env_var}="{value}"'
# Check if this variable already exists
if f'{env_var}=' in existing_content:
if f"{env_var}=" in existing_content:
logger.info(f"Variable {env_var} already exists in .env file")
else:
new_lines.append(line)
# Append new lines if any
if new_lines:
with open(env_file_path, 'a') as f:
if existing_content and not existing_content.endswith('\n'):
f.write('\n')
f.write('# SuperClaude API Keys\n')
with open(env_file_path, "a") as f:
if existing_content and not existing_content.endswith("\n"):
f.write("\n")
f.write("# SuperClaude API Keys\n")
for line in new_lines:
f.write(line + '\n')
f.write(line + "\n")
# Set file permissions (readable only by owner)
env_file_path.chmod(0o600)
display_success(f"Created .env file at {env_file_path}")
logger.info(f"Created .env file with {len(new_lines)} new variables")
return True
except Exception as e:
logger.error(f"Failed to create .env file: {e}")
display_warning(f"Could not create .env file: {e}")
@@ -472,13 +493,13 @@ def create_env_file(api_keys: Dict[str, str], env_file_path: Optional[Path] = No
def check_research_prerequisites() -> tuple[bool, list[str]]:
"""
Check if deep research prerequisites are met
Returns:
Tuple of (success: bool, warnings: List[str])
"""
warnings = []
logger = get_logger()
# Check Tavily API key
if not os.environ.get("TAVILY_API_KEY"):
warnings.append(
@@ -488,9 +509,10 @@ def check_research_prerequisites() -> tuple[bool, list[str]]:
logger.warning("TAVILY_API_KEY not found in environment")
else:
logger.info("Found TAVILY_API_KEY in environment")
# Check Node.js for MCP
import shutil
if not shutil.which("node"):
warnings.append(
"Node.js not found - Required for Tavily MCP\n"
@@ -499,7 +521,7 @@ def check_research_prerequisites() -> tuple[bool, list[str]]:
logger.warning("Node.js not found - required for Tavily MCP")
else:
logger.info("Node.js found")
# Check npm
if not shutil.which("npm"):
warnings.append(
@@ -509,5 +531,5 @@ def check_research_prerequisites() -> tuple[bool, list[str]]:
logger.warning("npm not found - required for MCP installation")
else:
logger.info("npm found")
return len(warnings) == 0, warnings
return len(warnings) == 0, warnings

View File

@@ -16,6 +16,7 @@ from .paths import get_home_directory
class LogLevel(Enum):
"""Log levels"""
DEBUG = logging.DEBUG
INFO = logging.INFO
WARNING = logging.WARNING
@@ -25,11 +26,17 @@ class LogLevel(Enum):
class Logger:
"""Enhanced logger with console and file output"""
def __init__(self, name: str = "superclaude", log_dir: Optional[Path] = None, console_level: LogLevel = LogLevel.INFO, file_level: LogLevel = LogLevel.DEBUG):
def __init__(
self,
name: str = "superclaude",
log_dir: Optional[Path] = None,
console_level: LogLevel = LogLevel.INFO,
file_level: LogLevel = LogLevel.DEBUG,
):
"""
Initialize logger
Args:
name: Logger name
log_dir: Directory for log files (defaults to ~/.claude/logs)
@@ -41,146 +48,146 @@ class Logger:
self.console_level = console_level
self.file_level = file_level
self.session_start = datetime.now()
# Create logger
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.DEBUG) # Accept all levels, handlers will filter
# Remove existing handlers to avoid duplicates
self.logger.handlers.clear()
# Setup handlers
self._setup_console_handler()
self._setup_file_handler()
self.log_counts: Dict[str, int] = {
'debug': 0,
'info': 0,
'warning': 0,
'error': 0,
'critical': 0
"debug": 0,
"info": 0,
"warning": 0,
"error": 0,
"critical": 0,
}
def _setup_console_handler(self) -> None:
"""Setup colorized console handler"""
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(self.console_level.value)
# Custom formatter with colors
class ColorFormatter(logging.Formatter):
def format(self, record):
# Color mapping
colors = {
'DEBUG': Colors.WHITE,
'INFO': Colors.BLUE,
'WARNING': Colors.YELLOW,
'ERROR': Colors.RED,
'CRITICAL': Colors.RED + Colors.BRIGHT
"DEBUG": Colors.WHITE,
"INFO": Colors.BLUE,
"WARNING": Colors.YELLOW,
"ERROR": Colors.RED,
"CRITICAL": Colors.RED + Colors.BRIGHT,
}
# Prefix mapping
prefixes = {
'DEBUG': '[DEBUG]',
'INFO': '[INFO]',
'WARNING': '[!]',
'ERROR': f'[{symbols.crossmark}]',
'CRITICAL': '[CRITICAL]'
"DEBUG": "[DEBUG]",
"INFO": "[INFO]",
"WARNING": "[!]",
"ERROR": f"[{symbols.crossmark}]",
"CRITICAL": "[CRITICAL]",
}
color = colors.get(record.levelname, Colors.WHITE)
prefix = prefixes.get(record.levelname, '[LOG]')
prefix = prefixes.get(record.levelname, "[LOG]")
return f"{color}{prefix} {record.getMessage()}{Colors.RESET}"
handler.setFormatter(ColorFormatter())
self.logger.addHandler(handler)
def _setup_file_handler(self) -> None:
"""Setup file handler with rotation"""
try:
# Ensure log directory exists
self.log_dir.mkdir(parents=True, exist_ok=True)
# Create timestamped log file
timestamp = self.session_start.strftime("%Y%m%d_%H%M%S")
log_file = self.log_dir / f"{self.name}_{timestamp}.log"
handler = logging.FileHandler(log_file, encoding='utf-8')
handler = logging.FileHandler(log_file, encoding="utf-8")
handler.setLevel(self.file_level.value)
# Detailed formatter for files
formatter = logging.Formatter(
'%(asctime)s | %(levelname)-8s | %(name)s | %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
"%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.log_file = log_file
# Clean up old log files (keep last 10)
self._cleanup_old_logs()
except Exception as e:
# If file logging fails, continue with console only
print(f"{Colors.YELLOW}[!] Could not setup file logging: {e}{Colors.RESET}")
self.log_file = None
def _cleanup_old_logs(self, keep_count: int = 10) -> None:
"""Clean up old log files"""
try:
# Get all log files for this logger
log_files = list(self.log_dir.glob(f"{self.name}_*.log"))
# Sort by modification time, newest first
log_files.sort(key=lambda f: f.stat().st_mtime, reverse=True)
# Remove old files
for old_file in log_files[keep_count:]:
try:
old_file.unlink()
except OSError:
pass # Ignore errors when cleaning up
except Exception:
pass # Ignore cleanup errors
def debug(self, message: str, **kwargs) -> None:
"""Log debug message"""
self.logger.debug(message, **kwargs)
self.log_counts['debug'] += 1
self.log_counts["debug"] += 1
def info(self, message: str, **kwargs) -> None:
"""Log info message"""
self.logger.info(message, **kwargs)
self.log_counts['info'] += 1
self.log_counts["info"] += 1
def warning(self, message: str, **kwargs) -> None:
"""Log warning message"""
self.logger.warning(message, **kwargs)
self.log_counts['warning'] += 1
self.log_counts["warning"] += 1
def error(self, message: str, **kwargs) -> None:
"""Log error message"""
self.logger.error(message, **kwargs)
self.log_counts['error'] += 1
self.log_counts["error"] += 1
def critical(self, message: str, **kwargs) -> None:
"""Log critical message"""
self.logger.critical(message, **kwargs)
self.log_counts['critical'] += 1
self.log_counts["critical"] += 1
def success(self, message: str, **kwargs) -> None:
"""Log success message (info level with special formatting)"""
# Use a custom success formatter for console
if self.logger.handlers:
console_handler = self.logger.handlers[0]
if hasattr(console_handler, 'formatter'):
if hasattr(console_handler, "formatter"):
original_format = console_handler.formatter.format
def success_format(record):
return f"{Colors.GREEN}[{symbols.checkmark}] {record.getMessage()}{Colors.RESET}"
console_handler.formatter.format = success_format
self.logger.info(message, **kwargs)
console_handler.formatter.format = original_format
@@ -188,92 +195,108 @@ class Logger:
self.logger.info(f"SUCCESS: {message}", **kwargs)
else:
self.logger.info(f"SUCCESS: {message}", **kwargs)
self.log_counts['info'] += 1
self.log_counts["info"] += 1
def step(self, step: int, total: int, message: str, **kwargs) -> None:
"""Log step progress"""
step_msg = f"[{step}/{total}] {message}"
self.info(step_msg, **kwargs)
def section(self, title: str, **kwargs) -> None:
"""Log section header"""
separator = "=" * min(50, len(title) + 4)
self.info(separator, **kwargs)
self.info(f" {title}", **kwargs)
self.info(separator, **kwargs)
def exception(self, message: str, exc_info: bool = True, **kwargs) -> None:
"""Log exception with traceback"""
self.logger.error(message, exc_info=exc_info, **kwargs)
self.log_counts['error'] += 1
self.log_counts["error"] += 1
def log_system_info(self, info: Dict[str, Any]) -> None:
"""Log system information"""
self.section("System Information")
for key, value in info.items():
self.info(f"{key}: {value}")
def log_operation_start(self, operation: str, details: Optional[Dict[str, Any]] = None) -> None:
def log_operation_start(
self, operation: str, details: Optional[Dict[str, Any]] = None
) -> None:
"""Log start of operation"""
self.section(f"Starting: {operation}")
if details:
for key, value in details.items():
self.info(f"{key}: {value}")
def log_operation_end(self, operation: str, success: bool, duration: float, details: Optional[Dict[str, Any]] = None) -> None:
def log_operation_end(
self,
operation: str,
success: bool,
duration: float,
details: Optional[Dict[str, Any]] = None,
) -> None:
"""Log end of operation"""
status = "SUCCESS" if success else "FAILED"
self.info(f"Operation {operation} completed: {status} (Duration: {duration:.2f}s)")
self.info(
f"Operation {operation} completed: {status} (Duration: {duration:.2f}s)"
)
if details:
for key, value in details.items():
self.info(f"{key}: {value}")
def get_statistics(self) -> Dict[str, Any]:
"""Get logging statistics"""
runtime = datetime.now() - self.session_start
return {
'session_start': self.session_start.isoformat(),
'runtime_seconds': runtime.total_seconds(),
'log_counts': self.log_counts.copy(),
'total_messages': sum(self.log_counts.values()),
'log_file': str(self.log_file) if hasattr(self, 'log_file') and self.log_file else None,
'has_errors': self.log_counts['error'] + self.log_counts['critical'] > 0
"session_start": self.session_start.isoformat(),
"runtime_seconds": runtime.total_seconds(),
"log_counts": self.log_counts.copy(),
"total_messages": sum(self.log_counts.values()),
"log_file": (
str(self.log_file)
if hasattr(self, "log_file") and self.log_file
else None
),
"has_errors": self.log_counts["error"] + self.log_counts["critical"] > 0,
}
def set_console_level(self, level: LogLevel) -> None:
"""Change console logging level"""
self.console_level = level
if self.logger.handlers:
self.logger.handlers[0].setLevel(level.value)
def set_file_level(self, level: LogLevel) -> None:
"""Change file logging level"""
self.file_level = level
if len(self.logger.handlers) > 1:
self.logger.handlers[1].setLevel(level.value)
def flush(self) -> None:
"""Flush all handlers"""
for handler in self.logger.handlers:
if hasattr(handler, 'flush'):
if hasattr(handler, "flush"):
handler.flush()
def close(self) -> None:
"""Close logger and handlers"""
self.section("Installation Session Complete")
stats = self.get_statistics()
self.info(f"Total runtime: {stats['runtime_seconds']:.1f} seconds")
self.info(f"Messages logged: {stats['total_messages']}")
if stats['has_errors']:
self.warning(f"Errors/warnings: {stats['log_counts']['error'] + stats['log_counts']['warning']}")
if stats['log_file']:
if stats["has_errors"]:
self.warning(
f"Errors/warnings: {stats['log_counts']['error'] + stats['log_counts']['warning']}"
)
if stats["log_file"]:
self.info(f"Full log saved to: {stats['log_file']}")
# Close all handlers
for handler in self.logger.handlers[:]:
handler.close()
@@ -287,14 +310,19 @@ _global_logger: Optional[Logger] = None
def get_logger(name: str = "superclaude") -> Logger:
"""Get or create global logger instance"""
global _global_logger
if _global_logger is None or _global_logger.name != name:
_global_logger = Logger(name)
return _global_logger
def setup_logging(name: str = "superclaude", log_dir: Optional[Path] = None, console_level: LogLevel = LogLevel.INFO, file_level: LogLevel = LogLevel.DEBUG) -> Logger:
def setup_logging(
name: str = "superclaude",
log_dir: Optional[Path] = None,
console_level: LogLevel = LogLevel.INFO,
file_level: LogLevel = LogLevel.DEBUG,
) -> Logger:
"""Setup logging with specified configuration"""
global _global_logger
_global_logger = Logger(name, log_dir, console_level, file_level)
@@ -329,4 +357,4 @@ def critical(message: str, **kwargs) -> None:
def success(message: str, **kwargs) -> None:
"""Log success message using global logger"""
get_logger().success(message, **kwargs)
get_logger().success(message, **kwargs)

View File

@@ -30,19 +30,19 @@ def get_home_directory() -> Path:
# Fallback methods for edge cases and immutable distros
# Method 1: Use $HOME environment variable
home_env = os.environ.get('HOME')
home_env = os.environ.get("HOME")
if home_env:
home_path = Path(home_env)
if home_path.exists() and home_path.is_dir():
return home_path
# Method 2: Check for immutable distro patterns
username = os.environ.get('USER') or os.environ.get('USERNAME')
username = os.environ.get("USER") or os.environ.get("USERNAME")
if username:
# Check common immutable distro paths
immutable_paths = [
Path(f'/var/home/{username}'), # Fedora Silverblue/Universal Blue
Path(f'/home/{username}'), # Standard Linux
Path(f"/var/home/{username}"), # Fedora Silverblue/Universal Blue
Path(f"/home/{username}"), # Standard Linux
]
for path in immutable_paths:
@@ -51,4 +51,4 @@ def get_home_directory() -> Path:
# Method 3: Last resort - use the original Path.home() even if it seems wrong
# This ensures we don't crash the installation
return Path.home()
return Path.home()

File diff suppressed because it is too large Load Diff

View File

@@ -21,14 +21,14 @@ def can_display_unicode() -> bool:
try:
# Test if we can encode common Unicode symbols
test_symbols = "✓✗█░⠋═"
test_symbols.encode(sys.stdout.encoding or 'cp1252')
test_symbols.encode(sys.stdout.encoding or "cp1252")
return True
except (UnicodeEncodeError, LookupError):
return False
# Check if stdout encoding supports Unicode
encoding = getattr(sys.stdout, 'encoding', None)
if encoding and encoding.lower() in ['utf-8', 'utf8']:
encoding = getattr(sys.stdout, "encoding", None)
if encoding and encoding.lower() in ["utf-8", "utf8"]:
return True
# Conservative fallback for unknown systems
@@ -131,8 +131,8 @@ def safe_print(*args, **kwargs):
for arg in args:
if isinstance(arg, str):
# Replace problematic Unicode characters
safe_arg = (arg
.replace("", "+")
safe_arg = (
arg.replace("", "+")
.replace("", "x")
.replace("", "#")
.replace("", "-")
@@ -165,7 +165,7 @@ def safe_print(*args, **kwargs):
final_args = []
for arg in safe_args:
if isinstance(arg, str):
final_args.append(arg.encode('ascii', 'replace').decode('ascii'))
final_args.append(arg.encode("ascii", "replace").decode("ascii"))
else:
final_args.append(str(arg))
print(*final_args, **kwargs)
@@ -195,4 +195,4 @@ def format_with_symbols(text: str) -> str:
for unicode_char, safe_char in replacements.items():
text = text.replace(unicode_char, safe_char)
return text
return text

View File

@@ -15,30 +15,33 @@ from .symbols import symbols, safe_print, format_with_symbols
try:
import colorama
from colorama import Fore, Back, Style
colorama.init(autoreset=True)
COLORAMA_AVAILABLE = True
except ImportError:
COLORAMA_AVAILABLE = False
# Fallback color codes for Unix-like systems
class MockFore:
RED = '\033[91m' if sys.platform != 'win32' else ''
GREEN = '\033[92m' if sys.platform != 'win32' else ''
YELLOW = '\033[93m' if sys.platform != 'win32' else ''
BLUE = '\033[94m' if sys.platform != 'win32' else ''
MAGENTA = '\033[95m' if sys.platform != 'win32' else ''
CYAN = '\033[96m' if sys.platform != 'win32' else ''
WHITE = '\033[97m' if sys.platform != 'win32' else ''
RED = "\033[91m" if sys.platform != "win32" else ""
GREEN = "\033[92m" if sys.platform != "win32" else ""
YELLOW = "\033[93m" if sys.platform != "win32" else ""
BLUE = "\033[94m" if sys.platform != "win32" else ""
MAGENTA = "\033[95m" if sys.platform != "win32" else ""
CYAN = "\033[96m" if sys.platform != "win32" else ""
WHITE = "\033[97m" if sys.platform != "win32" else ""
class MockStyle:
RESET_ALL = '\033[0m' if sys.platform != 'win32' else ''
BRIGHT = '\033[1m' if sys.platform != 'win32' else ''
RESET_ALL = "\033[0m" if sys.platform != "win32" else ""
BRIGHT = "\033[1m" if sys.platform != "win32" else ""
Fore = MockFore()
Style = MockStyle()
class Colors:
"""Color constants for console output"""
RED = Fore.RED
GREEN = Fore.GREEN
YELLOW = Fore.YELLOW
@@ -52,11 +55,11 @@ class Colors:
class ProgressBar:
"""Cross-platform progress bar with customizable display"""
def __init__(self, total: int, width: int = 50, prefix: str = '', suffix: str = ''):
def __init__(self, total: int, width: int = 50, prefix: str = "", suffix: str = ""):
"""
Initialize progress bar
Args:
total: Total number of items to process
width: Width of progress bar in characters
@@ -69,29 +72,31 @@ class ProgressBar:
self.suffix = suffix
self.current = 0
self.start_time = time.time()
# Get terminal width for responsive display
try:
self.terminal_width = shutil.get_terminal_size().columns
except OSError:
self.terminal_width = 80
def update(self, current: int, message: str = '') -> None:
def update(self, current: int, message: str = "") -> None:
"""
Update progress bar
Args:
current: Current progress value
message: Optional message to display
"""
self.current = current
percent = min(100, (current / self.total) * 100) if self.total > 0 else 100
# Calculate filled and empty portions
filled_width = int(self.width * current / self.total) if self.total > 0 else self.width
filled_width = (
int(self.width * current / self.total) if self.total > 0 else self.width
)
filled = symbols.block_filled * filled_width
empty = symbols.block_empty * (self.width - filled_width)
# Calculate elapsed time and ETA
elapsed = time.time() - self.start_time
if current > 0:
@@ -99,47 +104,51 @@ class ProgressBar:
eta_str = f" ETA: {self._format_time(eta)}"
else:
eta_str = ""
# Format progress line
if message:
status = f" {message}"
else:
status = ""
progress_line = (
f"\r{self.prefix}[{Colors.GREEN}{filled}{Colors.WHITE}{empty}{Colors.RESET}] "
f"{percent:5.1f}%{status}{eta_str}"
)
# Truncate if too long for terminal
max_length = self.terminal_width - 5
if len(progress_line) > max_length:
# Remove color codes for length calculation
plain_line = progress_line.replace(Colors.GREEN, '').replace(Colors.WHITE, '').replace(Colors.RESET, '')
plain_line = (
progress_line.replace(Colors.GREEN, "")
.replace(Colors.WHITE, "")
.replace(Colors.RESET, "")
)
if len(plain_line) > max_length:
progress_line = progress_line[:max_length] + "..."
safe_print(progress_line, end='', flush=True)
def increment(self, message: str = '') -> None:
safe_print(progress_line, end="", flush=True)
def increment(self, message: str = "") -> None:
"""
Increment progress by 1
Args:
message: Optional message to display
"""
self.update(self.current + 1, message)
def finish(self, message: str = 'Complete') -> None:
def finish(self, message: str = "Complete") -> None:
"""
Complete progress bar
Args:
message: Completion message
"""
self.update(self.total, message)
print() # New line after completion
def _format_time(self, seconds: float) -> str:
"""Format time duration as human-readable string"""
if seconds < 60:
@@ -154,11 +163,11 @@ class ProgressBar:
class Menu:
"""Interactive menu system with keyboard navigation"""
def __init__(self, title: str, options: List[str], multi_select: bool = False):
"""
Initialize menu
Args:
title: Menu title
options: List of menu options
@@ -168,42 +177,46 @@ class Menu:
self.options = options
self.multi_select = multi_select
self.selected = set() if multi_select else None
def display(self) -> Union[int, List[int]]:
"""
Display menu and get user selection
Returns:
Selected option index (single) or list of indices (multi-select)
"""
print(f"\n{Colors.CYAN}{Colors.BRIGHT}{self.title}{Colors.RESET}")
print("=" * len(self.title))
for i, option in enumerate(self.options, 1):
if self.multi_select:
marker = "[x]" if i-1 in (self.selected or set()) else "[ ]"
marker = "[x]" if i - 1 in (self.selected or set()) else "[ ]"
print(f"{Colors.YELLOW}{i:2d}.{Colors.RESET} {marker} {option}")
else:
print(f"{Colors.YELLOW}{i:2d}.{Colors.RESET} {option}")
if self.multi_select:
print(f"\n{Colors.BLUE}Enter numbers separated by commas (e.g., 1,3,5) or 'all' for all options:{Colors.RESET}")
print(
f"\n{Colors.BLUE}Enter numbers separated by commas (e.g., 1,3,5) or 'all' for all options:{Colors.RESET}"
)
else:
print(f"\n{Colors.BLUE}Enter your choice (1-{len(self.options)}):{Colors.RESET}")
print(
f"\n{Colors.BLUE}Enter your choice (1-{len(self.options)}):{Colors.RESET}"
)
while True:
try:
user_input = input("> ").strip().lower()
if self.multi_select:
if user_input == 'all':
if user_input == "all":
return list(range(len(self.options)))
elif user_input == '':
elif user_input == "":
return []
else:
# Parse comma-separated numbers
selections = []
for part in user_input.split(','):
for part in user_input.split(","):
part = part.strip()
if part.isdigit():
idx = int(part) - 1
@@ -220,10 +233,12 @@ class Menu:
if 0 <= choice < len(self.options):
return choice
else:
print(f"{Colors.RED}Invalid choice. Please enter a number between 1 and {len(self.options)}.{Colors.RESET}")
print(
f"{Colors.RED}Invalid choice. Please enter a number between 1 and {len(self.options)}.{Colors.RESET}"
)
else:
print(f"{Colors.RED}Please enter a valid number.{Colors.RESET}")
except (ValueError, KeyboardInterrupt) as e:
if isinstance(e, KeyboardInterrupt):
print(f"\n{Colors.YELLOW}Operation cancelled.{Colors.RESET}")
@@ -235,44 +250,46 @@ class Menu:
def confirm(message: str, default: bool = True) -> bool:
"""
Ask for user confirmation
Args:
message: Confirmation message
default: Default response if user just presses Enter
Returns:
True if confirmed, False otherwise
"""
suffix = "[Y/n]" if default else "[y/N]"
print(f"{Colors.BLUE}{message} {suffix}{Colors.RESET}")
while True:
try:
response = input("> ").strip().lower()
if response == '':
if response == "":
return default
elif response in ['y', 'yes', 'true', '1']:
elif response in ["y", "yes", "true", "1"]:
return True
elif response in ['n', 'no', 'false', '0']:
elif response in ["n", "no", "false", "0"]:
return False
else:
print(f"{Colors.RED}Please enter 'y' or 'n' (or press Enter for default).{Colors.RESET}")
print(
f"{Colors.RED}Please enter 'y' or 'n' (or press Enter for default).{Colors.RESET}"
)
except KeyboardInterrupt:
print(f"\n{Colors.YELLOW}Operation cancelled.{Colors.RESET}")
return False
def display_header(title: str, subtitle: str = '') -> None:
def display_header(title: str, subtitle: str = "") -> None:
"""
Display formatted header
Args:
title: Main title
subtitle: Optional subtitle
"""
from SuperClaude import __author__, __email__
from superclaude import __author__, __email__
print(f"\n{Colors.CYAN}{Colors.BRIGHT}{'='*60}{Colors.RESET}")
print(f"{Colors.CYAN}{Colors.BRIGHT}{title:^60}{Colors.RESET}")
@@ -280,13 +297,13 @@ def display_header(title: str, subtitle: str = '') -> None:
print(f"{Colors.WHITE}{subtitle:^60}{Colors.RESET}")
# Display authors
authors = [a.strip() for a in __author__.split(',')]
emails = [e.strip() for e in __email__.split(',')]
authors = [a.strip() for a in __author__.split(",")]
emails = [e.strip() for e in __email__.split(",")]
author_lines = []
for i in range(len(authors)):
name = authors[i]
email = emails[i] if i < len(emails) else ''
email = emails[i] if i < len(emails) else ""
author_lines.append(f"{name} <{email}>")
authors_str = " | ".join(author_lines)
@@ -297,20 +314,20 @@ def display_header(title: str, subtitle: str = '') -> None:
def display_authors() -> None:
"""Display author information"""
from SuperClaude import __author__, __email__, __github__
from superclaude import __author__, __email__, __github__
print(f"\n{Colors.CYAN}{Colors.BRIGHT}{'='*60}{Colors.RESET}")
print(f"{Colors.CYAN}{Colors.BRIGHT}{'SuperClaude Authors':^60}{Colors.RESET}")
print(f"{Colors.CYAN}{Colors.BRIGHT}{'superclaude Authors':^60}{Colors.RESET}")
print(f"{Colors.CYAN}{Colors.BRIGHT}{'='*60}{Colors.RESET}\n")
authors = [a.strip() for a in __author__.split(',')]
emails = [e.strip() for e in __email__.split(',')]
github_users = [g.strip() for g in __github__.split(',')]
authors = [a.strip() for a in __author__.split(",")]
emails = [e.strip() for e in __email__.split(",")]
github_users = [g.strip() for g in __github__.split(",")]
for i in range(len(authors)):
name = authors[i]
email = emails[i] if i < len(emails) else 'N/A'
github = github_users[i] if i < len(github_users) else 'N/A'
email = emails[i] if i < len(emails) else "N/A"
github = github_users[i] if i < len(github_users) else "N/A"
print(f" {Colors.BRIGHT}{name}{Colors.RESET}")
print(f" Email: {Colors.YELLOW}{email}{Colors.RESET}")
@@ -345,10 +362,10 @@ def display_step(step: int, total: int, message: str) -> None:
print(f"{Colors.CYAN}[{step}/{total}] {message}{Colors.RESET}")
def display_table(headers: List[str], rows: List[List[str]], title: str = '') -> None:
def display_table(headers: List[str], rows: List[List[str]], title: str = "") -> None:
"""
Display data in table format
Args:
headers: Column headers
rows: Data rows
@@ -356,64 +373,80 @@ def display_table(headers: List[str], rows: List[List[str]], title: str = '') ->
"""
if not rows:
return
# Calculate column widths
col_widths = [len(header) for header in headers]
for row in rows:
for i, cell in enumerate(row):
if i < len(col_widths):
col_widths[i] = max(col_widths[i], len(str(cell)))
# Display title
if title:
print(f"\n{Colors.CYAN}{Colors.BRIGHT}{title}{Colors.RESET}")
print()
# Display headers
header_line = " | ".join(f"{header:<{col_widths[i]}}" for i, header in enumerate(headers))
header_line = " | ".join(
f"{header:<{col_widths[i]}}" for i, header in enumerate(headers)
)
print(f"{Colors.YELLOW}{header_line}{Colors.RESET}")
print("-" * len(header_line))
# Display rows
for row in rows:
row_line = " | ".join(f"{str(cell):<{col_widths[i]}}" for i, cell in enumerate(row))
row_line = " | ".join(
f"{str(cell):<{col_widths[i]}}" for i, cell in enumerate(row)
)
print(row_line)
print()
def prompt_api_key(service_name: str, env_var_name: str) -> Optional[str]:
"""
Prompt for API key with security and UX best practices
Args:
service_name: Human-readable service name (e.g., "Magic", "Morphllm")
env_var_name: Environment variable name (e.g., "TWENTYFIRST_API_KEY")
Returns:
API key string if provided, None if skipped
"""
print(f"{Colors.BLUE}[API KEY] {service_name} requires: {Colors.BRIGHT}{env_var_name}{Colors.RESET}")
print(f"{Colors.WHITE}Visit the service documentation to obtain your API key{Colors.RESET}")
print(f"{Colors.YELLOW}Press Enter to skip (you can set this manually later){Colors.RESET}")
print(
f"{Colors.BLUE}[API KEY] {service_name} requires: {Colors.BRIGHT}{env_var_name}{Colors.RESET}"
)
print(
f"{Colors.WHITE}Visit the service documentation to obtain your API key{Colors.RESET}"
)
print(
f"{Colors.YELLOW}Press Enter to skip (you can set this manually later){Colors.RESET}"
)
try:
# Use getpass for hidden input
api_key = getpass.getpass(f"Enter {env_var_name}: ").strip()
if not api_key:
print(f"{Colors.YELLOW}[SKIPPED] {env_var_name} - set manually later{Colors.RESET}")
print(
f"{Colors.YELLOW}[SKIPPED] {env_var_name} - set manually later{Colors.RESET}"
)
return None
# Basic validation (non-empty, reasonable length)
if len(api_key) < 10:
print(f"{Colors.RED}[WARNING] API key seems too short. Continue anyway? (y/N){Colors.RESET}")
print(
f"{Colors.RED}[WARNING] API key seems too short. Continue anyway? (y/N){Colors.RESET}"
)
if not confirm("", default=False):
return None
safe_print(f"{Colors.GREEN}[{symbols.checkmark}] {env_var_name} configured{Colors.RESET}")
safe_print(
f"{Colors.GREEN}[{symbols.checkmark}] {env_var_name} configured{Colors.RESET}"
)
return api_key
except KeyboardInterrupt:
safe_print(f"\n{Colors.YELLOW}[SKIPPED] {env_var_name}{Colors.RESET}")
return None
@@ -430,16 +463,17 @@ def wait_for_key(message: str = "Press Enter to continue...") -> None:
def clear_screen() -> None:
"""Clear terminal screen"""
import os
os.system('cls' if os.name == 'nt' else 'clear')
os.system("cls" if os.name == "nt" else "clear")
class StatusSpinner:
"""Simple status spinner for long operations"""
def __init__(self, message: str = "Working..."):
"""
Initialize spinner
Args:
message: Message to display with spinner
"""
@@ -447,35 +481,39 @@ class StatusSpinner:
self.spinning = False
self.chars = symbols.spinner_chars
self.current = 0
def start(self) -> None:
"""Start spinner in background thread"""
import threading
def spin():
while self.spinning:
char = self.chars[self.current % len(self.chars)]
safe_print(f"\r{Colors.BLUE}{char} {self.message}{Colors.RESET}", end='', flush=True)
safe_print(
f"\r{Colors.BLUE}{char} {self.message}{Colors.RESET}",
end="",
flush=True,
)
self.current += 1
time.sleep(0.1)
self.spinning = True
self.thread = threading.Thread(target=spin, daemon=True)
self.thread.start()
def stop(self, final_message: str = '') -> None:
def stop(self, final_message: str = "") -> None:
"""
Stop spinner
Args:
final_message: Final message to display
"""
self.spinning = False
if hasattr(self, 'thread'):
if hasattr(self, "thread"):
self.thread.join(timeout=0.2)
# Clear spinner line
safe_print(f"\r{' ' * (len(self.message) + 5)}\r", end='')
safe_print(f"\r{' ' * (len(self.message) + 5)}\r", end="")
if final_message:
safe_print(final_message)
@@ -483,7 +521,7 @@ class StatusSpinner:
def format_size(size_bytes: int) -> str:
"""Format file size in human-readable format"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
for unit in ["B", "KB", "MB", "GB", "TB"]:
if size_bytes < 1024.0:
return f"{size_bytes:.1f} {unit}"
size_bytes /= 1024.0
@@ -510,5 +548,5 @@ def truncate_text(text: str, max_length: int, suffix: str = "...") -> str:
"""Truncate text to maximum length with optional suffix"""
if len(text) <= max_length:
return text
return text[:max_length - len(suffix)] + suffix
return text[: max_length - len(suffix)] + suffix

View File

@@ -22,94 +22,97 @@ from .paths import get_home_directory
class UpdateChecker:
"""Handles automatic update checking for SuperClaude"""
PYPI_URL = "https://pypi.org/pypi/SuperClaude/json"
PYPI_URL = "https://pypi.org/pypi/superclaude/json"
CACHE_FILE = get_home_directory() / ".claude" / ".update_check"
CHECK_INTERVAL = 86400 # 24 hours in seconds
TIMEOUT = 2 # seconds
def __init__(self, current_version: str):
"""
Initialize update checker
Args:
current_version: Current installed version
"""
self.current_version = current_version
self.logger = get_logger()
def should_check_update(self, force: bool = False) -> bool:
"""
Determine if we should check for updates based on last check time
Args:
force: Force check regardless of last check time
Returns:
True if update check should be performed
"""
if force:
return True
if not self.CACHE_FILE.exists():
return True
try:
with open(self.CACHE_FILE, 'r') as f:
with open(self.CACHE_FILE, "r") as f:
data = json.load(f)
last_check = data.get('last_check', 0)
last_check = data.get("last_check", 0)
# Check if 24 hours have passed
if time.time() - last_check > self.CHECK_INTERVAL:
return True
except (json.JSONDecodeError, KeyError):
return True
return False
def save_check_timestamp(self):
"""Save the current timestamp as last check time"""
self.CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
data = {}
if self.CACHE_FILE.exists():
try:
with open(self.CACHE_FILE, 'r') as f:
with open(self.CACHE_FILE, "r") as f:
data = json.load(f)
except:
pass
data['last_check'] = time.time()
with open(self.CACHE_FILE, 'w') as f:
data["last_check"] = time.time()
with open(self.CACHE_FILE, "w") as f:
json.dump(data, f)
def get_latest_version(self) -> Optional[str]:
"""
Query PyPI for the latest version of SuperClaude
Returns:
Latest version string or None if check fails
"""
try:
# Create request with timeout
req = urllib.request.Request(
self.PYPI_URL,
headers={'User-Agent': 'SuperClaude-Updater'}
self.PYPI_URL, headers={"User-Agent": "superclaude-Updater"}
)
# Set timeout for the request
with urllib.request.urlopen(req, timeout=self.TIMEOUT) as response:
data = json.loads(response.read().decode())
latest = data.get('info', {}).get('version')
latest = data.get("info", {}).get("version")
if self.logger:
self.logger.debug(f"Latest PyPI version: {latest}")
return latest
except (urllib.error.URLError, urllib.error.HTTPError, json.JSONDecodeError) as e:
except (
urllib.error.URLError,
urllib.error.HTTPError,
json.JSONDecodeError,
) as e:
if self.logger:
self.logger.debug(f"Failed to check PyPI: {e}")
return None
@@ -117,14 +120,14 @@ class UpdateChecker:
if self.logger:
self.logger.debug(f"Unexpected error checking updates: {e}")
return None
def compare_versions(self, latest: str) -> bool:
"""
Compare current version with latest version
Args:
latest: Latest version string
Returns:
True if update is available
"""
@@ -132,183 +135,195 @@ class UpdateChecker:
return version.parse(latest) > version.parse(self.current_version)
except Exception:
return False
def detect_installation_method(self) -> str:
"""
Detect how SuperClaude was installed (pip, pipx, etc.)
Returns:
Installation method string
"""
# Check pipx first
try:
result = subprocess.run(
['pipx', 'list'],
capture_output=True,
text=True,
timeout=2
["pipx", "list"], capture_output=True, text=True, timeout=2
)
if 'SuperClaude' in result.stdout or 'superclaude' in result.stdout:
return 'pipx'
if "superclaude" in result.stdout or "superclaude" in result.stdout:
return "pipx"
except:
pass
# Check if pip installation exists
try:
result = subprocess.run(
[sys.executable, '-m', 'pip', 'show', 'SuperClaude'],
[sys.executable, "-m", "pip", "show", "superclaude"],
capture_output=True,
text=True,
timeout=2
timeout=2,
)
if result.returncode == 0:
# Check if it's a user installation
if '--user' in result.stdout or get_home_directory() in Path(result.stdout):
return 'pip-user'
return 'pip'
if "--user" in result.stdout or get_home_directory() in Path(
result.stdout
):
return "pip-user"
return "pip"
except:
pass
return 'unknown'
return "unknown"
def get_update_command(self) -> str:
"""
Get the appropriate update command based on installation method
Returns:
Update command string
"""
method = self.detect_installation_method()
commands = {
'pipx': 'pipx upgrade SuperClaude',
'pip-user': 'pip install --upgrade --user SuperClaude',
'pip': 'pip install --upgrade SuperClaude',
'unknown': 'pip install --upgrade SuperClaude'
"pipx": "pipx upgrade SuperClaude",
"pip-user": "pip install --upgrade --user SuperClaude",
"pip": "pip install --upgrade SuperClaude",
"unknown": "pip install --upgrade SuperClaude",
}
return commands.get(method, commands['unknown'])
return commands.get(method, commands["unknown"])
def show_update_banner(self, latest: str, auto_update: bool = False) -> bool:
"""
Display update available banner
Args:
latest: Latest version available
auto_update: Whether to auto-update without prompting
Returns:
True if user wants to update
"""
update_cmd = self.get_update_command()
# Display banner
print(f"\n{Colors.CYAN}+================================================+{Colors.RESET}")
print(f"{Colors.CYAN}{Colors.YELLOW} 🚀 Update Available: {self.current_version}{latest} {Colors.CYAN}{Colors.RESET}")
print(f"{Colors.CYAN}{Colors.GREEN} Run: {update_cmd:<30} {Colors.CYAN}{Colors.RESET}")
print(f"{Colors.CYAN}+================================================+{Colors.RESET}\n")
print(
f"\n{Colors.CYAN}+================================================+{Colors.RESET}"
)
print(
f"{Colors.CYAN}{Colors.YELLOW} 🚀 Update Available: {self.current_version}{latest} {Colors.CYAN}{Colors.RESET}"
)
print(
f"{Colors.CYAN}{Colors.GREEN} Run: {update_cmd:<30} {Colors.CYAN}{Colors.RESET}"
)
print(
f"{Colors.CYAN}+================================================+{Colors.RESET}\n"
)
if auto_update:
return True
# Check if running in non-interactive mode
if not sys.stdin.isatty():
return False
# Prompt user
try:
response = input(f"{Colors.YELLOW}Would you like to update now? (y/N): {Colors.RESET}").strip().lower()
return response in ['y', 'yes']
response = (
input(
f"{Colors.YELLOW}Would you like to update now? (y/N): {Colors.RESET}"
)
.strip()
.lower()
)
return response in ["y", "yes"]
except (EOFError, KeyboardInterrupt):
return False
def perform_update(self) -> bool:
"""
Execute the update command
Returns:
True if update succeeded
"""
update_cmd = self.get_update_command()
print(f"{Colors.CYAN}🔄 Updating SuperClaude...{Colors.RESET}")
print(f"{Colors.CYAN}🔄 Updating superclaude...{Colors.RESET}")
try:
result = subprocess.run(
update_cmd.split(),
capture_output=False,
text=True
)
result = subprocess.run(update_cmd.split(), capture_output=False, text=True)
if result.returncode == 0:
display_success("Update completed successfully!")
print(f"{Colors.YELLOW}Please restart SuperClaude to use the new version.{Colors.RESET}")
print(
f"{Colors.YELLOW}Please restart SuperClaude to use the new version.{Colors.RESET}"
)
return True
else:
display_warning("Update failed. Please run manually:")
print(f" {update_cmd}")
return False
except Exception as e:
display_warning(f"Could not auto-update: {e}")
print(f"Please run manually: {update_cmd}")
return False
def check_and_notify(self, force: bool = False, auto_update: bool = False) -> bool:
"""
Main method to check for updates and notify user
Args:
force: Force check regardless of last check time
auto_update: Automatically update if available
Returns:
True if update was performed
"""
# Check if we should skip based on environment variable
if os.getenv('SUPERCLAUDE_NO_UPDATE_CHECK', '').lower() in ['true', '1', 'yes']:
if os.getenv("SUPERCLAUDE_NO_UPDATE_CHECK", "").lower() in ["true", "1", "yes"]:
return False
# Check if auto-update is enabled via environment
if os.getenv('SUPERCLAUDE_AUTO_UPDATE', '').lower() in ['true', '1', 'yes']:
if os.getenv("SUPERCLAUDE_AUTO_UPDATE", "").lower() in ["true", "1", "yes"]:
auto_update = True
# Check if enough time has passed
if not self.should_check_update(force):
return False
# Get latest version
latest = self.get_latest_version()
if not latest:
return False
# Save timestamp
self.save_check_timestamp()
# Compare versions
if not self.compare_versions(latest):
return False
# Show banner and potentially update
if self.show_update_banner(latest, auto_update):
return self.perform_update()
return False
def check_for_updates(current_version: str = None, **kwargs) -> bool:
"""
Convenience function to check for updates
Args:
current_version: Current installed version (defaults to reading from setup)
**kwargs: Additional arguments passed to check_and_notify
Returns:
True if update was performed
"""
if current_version is None:
from setup import __version__
current_version = __version__
checker = UpdateChecker(current_version)
return checker.check_and_notify(**kwargs)
return checker.check_and_notify(**kwargs)