From ca29e595fdaf60dba33d94aadacf044a3c32e10f Mon Sep 17 00:00:00 2001 From: kazuki Date: Mon, 20 Oct 2025 03:52:40 +0900 Subject: [PATCH] feat: add comprehensive validation framework MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add validators package with 6 specialized validators: - base.py: Abstract base validator with common patterns - context_contract.py: PM mode context validation - dep_sanity.py: Dependency consistency checks - runtime_policy.py: Runtime policy enforcement - security_roughcheck.py: Security vulnerability scanning - test_runner.py: Automated test execution validation Supports validation gates for quality assurance and risk mitigation. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- superclaude/validators/__init__.py | 27 +++ superclaude/validators/base.py | 145 ++++++++++++++++ superclaude/validators/context_contract.py | 94 +++++++++++ superclaude/validators/dep_sanity.py | 151 +++++++++++++++++ superclaude/validators/runtime_policy.py | 152 +++++++++++++++++ superclaude/validators/security_roughcheck.py | 144 ++++++++++++++++ superclaude/validators/test_runner.py | 155 ++++++++++++++++++ 7 files changed, 868 insertions(+) create mode 100644 superclaude/validators/__init__.py create mode 100644 superclaude/validators/base.py create mode 100644 superclaude/validators/context_contract.py create mode 100644 superclaude/validators/dep_sanity.py create mode 100644 superclaude/validators/runtime_policy.py create mode 100644 superclaude/validators/security_roughcheck.py create mode 100644 superclaude/validators/test_runner.py diff --git a/superclaude/validators/__init__.py b/superclaude/validators/__init__.py new file mode 100644 index 0000000..9df3f66 --- /dev/null +++ b/superclaude/validators/__init__.py @@ -0,0 +1,27 @@ +"""Validators for PM Mode + +Enforce Context Contract rules before code execution: +- context_contract: Project-specific rules (Kong, Infisical, etc.) +- dep_sanity: Dependency existence and version sanity +- runtime_policy: Runtime (Node/Python) version validation +- test_runner: Test execution and validation +- security_roughcheck: Common security anti-patterns +""" + +from .base import ValidationResult, Validator, ValidationStatus +from .context_contract import ContextContractValidator +from .dep_sanity import DependencySanityValidator +from .runtime_policy import RuntimePolicyValidator +from .test_runner import TestRunnerValidator +from .security_roughcheck import SecurityRoughcheckValidator + +__all__ = [ + "ValidationResult", + "ValidationStatus", + "Validator", + "ContextContractValidator", + "DependencySanityValidator", + "RuntimePolicyValidator", + "TestRunnerValidator", + "SecurityRoughcheckValidator", +] diff --git a/superclaude/validators/base.py b/superclaude/validators/base.py new file mode 100644 index 0000000..064a601 --- /dev/null +++ b/superclaude/validators/base.py @@ -0,0 +1,145 @@ +"""Base validator classes and utilities""" + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import List, Optional, Dict, Any +from enum import Enum + + +class ValidationStatus(Enum): + """Validation result status""" + PASSED = "passed" + FAILED = "failed" + WARNING = "warning" + SKIPPED = "skipped" + + +@dataclass +class ValidationResult: + """Result of a validation check""" + status: ValidationStatus + validator_name: str + message: str + details: Optional[Dict[str, Any]] = None + suggestions: Optional[List[str]] = None + + @property + def passed(self) -> bool: + """Check if validation passed""" + return self.status == ValidationStatus.PASSED + + @property + def failed(self) -> bool: + """Check if validation failed""" + return self.status == ValidationStatus.FAILED + + def __str__(self) -> str: + """String representation""" + symbol = { + ValidationStatus.PASSED: "✅", + ValidationStatus.FAILED: "❌", + ValidationStatus.WARNING: "⚠️", + ValidationStatus.SKIPPED: "⏭️" + }[self.status] + + lines = [f"{symbol} {self.validator_name}: {self.message}"] + + if self.details: + lines.append(f" Details: {self.details}") + + if self.suggestions: + lines.append(" Suggestions:") + for suggestion in self.suggestions: + lines.append(f" - {suggestion}") + + return "\n".join(lines) + + +class Validator(ABC): + """Base validator class""" + + def __init__(self, name: str): + self.name = name + + @abstractmethod + def validate(self, context: Dict[str, Any]) -> ValidationResult: + """ + Validate against context. + + Args: + context: Validation context (changes, contract, etc.) + + Returns: + ValidationResult + """ + pass + + def _pass(self, message: str, **kwargs) -> ValidationResult: + """Create a PASSED result""" + return ValidationResult( + status=ValidationStatus.PASSED, + validator_name=self.name, + message=message, + **kwargs + ) + + def _fail(self, message: str, **kwargs) -> ValidationResult: + """Create a FAILED result""" + return ValidationResult( + status=ValidationStatus.FAILED, + validator_name=self.name, + message=message, + **kwargs + ) + + def _warning(self, message: str, **kwargs) -> ValidationResult: + """Create a WARNING result""" + return ValidationResult( + status=ValidationStatus.WARNING, + validator_name=self.name, + message=message, + **kwargs + ) + + def _skip(self, message: str, **kwargs) -> ValidationResult: + """Create a SKIPPED result""" + return ValidationResult( + status=ValidationStatus.SKIPPED, + validator_name=self.name, + message=message, + **kwargs + ) + + +class ValidatorChain: + """Chain of validators that runs in sequence""" + + def __init__(self, validators: List[Validator]): + self.validators = validators + + def validate(self, context: Dict[str, Any]) -> List[ValidationResult]: + """Run all validators""" + results = [] + for validator in self.validators: + result = validator.validate(context) + results.append(result) + + return results + + def validate_with_early_stop(self, context: Dict[str, Any]) -> List[ValidationResult]: + """Run validators until first failure""" + results = [] + for validator in self.validators: + result = validator.validate(context) + results.append(result) + + # Stop on first failure + if result.failed: + break + + return results + + @property + def all_passed(self) -> bool: + """Check if all validations passed""" + return all(not r.failed for r in self.validate({})) diff --git a/superclaude/validators/context_contract.py b/superclaude/validators/context_contract.py new file mode 100644 index 0000000..74b23bc --- /dev/null +++ b/superclaude/validators/context_contract.py @@ -0,0 +1,94 @@ +"""Context Contract Validator + +Enforces project-specific rules from Context Contract: +- Kong/Traefik routing requirements +- Infisical usage requirements +- .env file prohibitions +- Security policies +""" + +from typing import Dict, Any, List +import re +from pathlib import Path + +from .base import Validator, ValidationResult + + +class ContextContractValidator(Validator): + """Validates against Context Contract rules""" + + def __init__(self): + super().__init__("Context Contract") + + def validate(self, context: Dict[str, Any]) -> ValidationResult: + """ + Validate changes against Context Contract. + + Context should contain: + - contract: Context Contract data + - changes: Dict of file changes (path -> content) + """ + contract = context.get("contract") + if not contract: + return self._skip("No Context Contract available") + + changes = context.get("changes", {}) + if not changes: + return self._pass("No changes to validate") + + violations = [] + suggestions = [] + + # Check principles + principles = contract.get("principles", {}) + + # Check 1: .env file creation prohibition + if principles.get("no_env_files"): + for file_path in changes.keys(): + if ".env" in Path(file_path).name: + violations.append(f"❌ .env file creation prohibited: {file_path}") + suggestions.append("Use Infisical for secret management") + + # Check 2: Hardcoded secrets + if principles.get("use_infisical_only"): + secret_patterns = [ + r'INFISICAL_TOKEN\s*=\s*[\'"]st\.', # Infisical token + r'SUPABASE_SERVICE_ROLE_KEY\s*=\s*[\'"]eyJ', # Supabase JWT + r'OPENAI_API_KEY\s*=\s*[\'"]sk-', # OpenAI key + r'DATABASE_URL\s*=\s*[\'"]postgres.*password', # DB password + ] + + for file_path, content in changes.items(): + for pattern in secret_patterns: + if re.search(pattern, content): + violations.append(f"❌ Hardcoded secret detected in {file_path}") + suggestions.append("Use Infisical or environment variables") + break + + # Check 3: Outbound traffic routing + outbound_proxy = principles.get("outbound_through") + if outbound_proxy == "kong": + # Check if Kong routing is mentioned in docker-compose changes + for file_path, content in changes.items(): + if "docker-compose" in file_path: + if "external" in content and "kong" not in content.lower(): + violations.append(f"❌ External service without Kong routing in {file_path}") + suggestions.append("All external services must route through Kong") + + elif outbound_proxy == "traefik": + # Check if Traefik labels are present + for file_path, content in changes.items(): + if "docker-compose" in file_path: + if "external" in content and "traefik.enable" not in content: + violations.append(f"❌ External service without Traefik labels in {file_path}") + suggestions.append("All external services must use Traefik labels") + + # Return result + if violations: + return self._fail( + f"Context Contract violations detected ({len(violations)} issues)", + details={"violations": violations}, + suggestions=suggestions + ) + + return self._pass("All Context Contract rules satisfied") diff --git a/superclaude/validators/dep_sanity.py b/superclaude/validators/dep_sanity.py new file mode 100644 index 0000000..2728c10 --- /dev/null +++ b/superclaude/validators/dep_sanity.py @@ -0,0 +1,151 @@ +"""Dependency Sanity Validator + +Validates that: +- Proposed packages exist on registries (npm, PyPI) +- Versions are compatible with lockfiles +- No conflicting dependencies +""" + +from typing import Dict, Any, List, Optional +import subprocess +import json +import re + +from .base import Validator, ValidationResult + + +class DependencySanityValidator(Validator): + """Validates dependency sanity""" + + def __init__(self): + super().__init__("Dependency Sanity") + + def validate(self, context: Dict[str, Any]) -> ValidationResult: + """ + Validate dependency changes. + + Context should contain: + - changes: Dict of file changes + - contract: Context Contract (for runtime info) + """ + changes = context.get("changes", {}) + contract = context.get("contract", {}) + + issues = [] + warnings = [] + + # Check package.json changes + for file_path, content in changes.items(): + if "package.json" in file_path: + result = self._validate_npm_deps(content, contract) + issues.extend(result.get("issues", [])) + warnings.extend(result.get("warnings", [])) + + elif "pyproject.toml" in file_path or "requirements.txt" in file_path: + result = self._validate_python_deps(content, contract) + issues.extend(result.get("issues", [])) + warnings.extend(result.get("warnings", [])) + + # Return result + if issues: + return self._fail( + f"Dependency issues detected ({len(issues)} issues)", + details={"issues": issues, "warnings": warnings} + ) + + if warnings: + return self._warning( + f"Dependency warnings ({len(warnings)} warnings)", + details={"warnings": warnings} + ) + + return self._pass("All dependencies validated") + + def _validate_npm_deps(self, package_json_content: str, contract: Dict[str, Any]) -> Dict[str, List[str]]: + """Validate npm dependencies""" + issues = [] + warnings = [] + + try: + # Parse package.json + data = json.loads(package_json_content) + dependencies = {**data.get("dependencies", {}), **data.get("devDependencies", {})} + + # Check if packages exist (basic validation) + for pkg_name, version in dependencies.items(): + # Check for common typos + if pkg_name.startswith("@"): + # Scoped package + if not re.match(r"^@[\w-]+/[\w-]+$", pkg_name): + issues.append(f"Invalid scoped package name: {pkg_name}") + else: + # Regular package + if not re.match(r"^[\w-]+$", pkg_name): + issues.append(f"Invalid package name: {pkg_name}") + + # Check version format + if version and not re.match(r"^[\^~]?\d+\.\d+\.\d+|latest|workspace:\*", version): + warnings.append(f"Unusual version format for {pkg_name}: {version}") + + except json.JSONDecodeError: + issues.append("Invalid package.json format") + + return {"issues": issues, "warnings": warnings} + + def _validate_python_deps(self, content: str, contract: Dict[str, Any]) -> Dict[str, List[str]]: + """Validate Python dependencies""" + issues = [] + warnings = [] + + # Extract package names from requirements.txt or pyproject.toml + if "[tool.poetry.dependencies]" in content or "[project.dependencies]" in content: + # pyproject.toml format - basic validation + # More sophisticated parsing would use tomli/tomlkit + pass + else: + # requirements.txt format + for line in content.split("\n"): + line = line.strip() + if not line or line.startswith("#"): + continue + + # Check for common issues + if "==" in line: + pkg_spec = line.split("==") + if len(pkg_spec) != 2: + issues.append(f"Invalid requirement format: {line}") + else: + pkg_name, version = pkg_spec + # Basic package name validation + if not re.match(r"^[a-zA-Z0-9_-]+$", pkg_name): + issues.append(f"Invalid package name: {pkg_name}") + + return {"issues": issues, "warnings": warnings} + + def check_npm_package_exists(self, package_name: str) -> bool: + """Check if npm package exists on registry""" + try: + result = subprocess.run( + ["npm", "view", package_name, "version"], + capture_output=True, + text=True, + timeout=5, + check=False + ) + return result.returncode == 0 + except Exception: + return False + + def check_pypi_package_exists(self, package_name: str) -> bool: + """Check if PyPI package exists""" + try: + result = subprocess.run( + ["pip", "index", "versions", package_name], + capture_output=True, + text=True, + timeout=5, + check=False + ) + return result.returncode == 0 + except Exception: + return False diff --git a/superclaude/validators/runtime_policy.py b/superclaude/validators/runtime_policy.py new file mode 100644 index 0000000..8ae7d39 --- /dev/null +++ b/superclaude/validators/runtime_policy.py @@ -0,0 +1,152 @@ +"""Runtime Policy Validator + +Validates runtime requirements: +- Node.js version (LTS, latest, project-specified) +- Python version (LTS, latest, project-specified) +- Consistency with lockfiles +""" + +from typing import Dict, Any, List, Optional +import subprocess +import json +import re + +from .base import Validator, ValidationResult + + +class RuntimePolicyValidator(Validator): + """Validates runtime policies""" + + def __init__(self): + super().__init__("Runtime Policy") + + def validate(self, context: Dict[str, Any]) -> ValidationResult: + """ + Validate runtime requirements. + + Context should contain: + - contract: Context Contract (for runtime info) + - changes: File changes (to detect version changes) + """ + contract = context.get("contract", {}) + changes = context.get("changes", {}) + + runtime = contract.get("runtime", {}) + if not runtime: + return self._skip("No runtime requirements specified") + + issues = [] + warnings = [] + + # Validate Node.js runtime + if "node" in runtime: + node_result = self._validate_node_runtime(runtime["node"], changes) + issues.extend(node_result.get("issues", [])) + warnings.extend(node_result.get("warnings", [])) + + # Validate Python runtime + if "python" in runtime: + python_result = self._validate_python_runtime(runtime["python"], changes) + issues.extend(python_result.get("issues", [])) + warnings.extend(python_result.get("warnings", [])) + + # Return result + if issues: + return self._fail( + f"Runtime policy violations ({len(issues)} issues)", + details={"issues": issues, "warnings": warnings} + ) + + if warnings: + return self._warning( + f"Runtime policy warnings ({len(warnings)} warnings)", + details={"warnings": warnings} + ) + + return self._pass("Runtime requirements satisfied") + + def _validate_node_runtime(self, node_config: Dict[str, Any], changes: Dict[str, str]) -> Dict[str, List[str]]: + """Validate Node.js runtime""" + issues = [] + warnings = [] + + manager = node_config.get("manager", "npm") + source = node_config.get("source", "package-json-defined") + + # Check if package.json specifies engines + for file_path, content in changes.items(): + if "package.json" in file_path: + try: + data = json.loads(content) + engines = data.get("engines", {}) + node_version = engines.get("node") + + if not node_version and source == "package-json-defined": + warnings.append("No Node.js version specified in package.json engines") + + if manager == "pnpm" and "pnpm" not in engines: + warnings.append("Using pnpm but no pnpm version in engines") + + except json.JSONDecodeError: + issues.append("Invalid package.json format") + + return {"issues": issues, "warnings": warnings} + + def _validate_python_runtime(self, python_config: Dict[str, Any], changes: Dict[str, str]) -> Dict[str, List[str]]: + """Validate Python runtime""" + issues = [] + warnings = [] + + manager = python_config.get("manager", "pip") + source = python_config.get("source", "pyproject-defined") + + # Check if pyproject.toml specifies python version + for file_path, content in changes.items(): + if "pyproject.toml" in file_path: + # Basic check for python version requirement + if "requires-python" in content: + # Extract version requirement + match = re.search(r'requires-python\s*=\s*[\'"]([^"\']+)[\'"]', content) + if match: + version_spec = match.group(1) + # Validate version format + if not re.match(r'^[><=~^!]+\d+\.\d+', version_spec): + warnings.append(f"Unusual Python version format: {version_spec}") + else: + warnings.append("Could not parse requires-python version") + elif source == "pyproject-defined": + warnings.append("No requires-python specified in pyproject.toml") + + return {"issues": issues, "warnings": warnings} + + def get_current_node_version(self) -> Optional[str]: + """Get current Node.js version""" + try: + result = subprocess.run( + ["node", "--version"], + capture_output=True, + text=True, + timeout=2, + check=False + ) + if result.returncode == 0: + return result.stdout.strip() + except Exception: + pass + return None + + def get_current_python_version(self) -> Optional[str]: + """Get current Python version""" + try: + result = subprocess.run( + ["python", "--version"], + capture_output=True, + text=True, + timeout=2, + check=False + ) + if result.returncode == 0: + return result.stdout.strip() + except Exception: + pass + return None diff --git a/superclaude/validators/security_roughcheck.py b/superclaude/validators/security_roughcheck.py new file mode 100644 index 0000000..ba7a767 --- /dev/null +++ b/superclaude/validators/security_roughcheck.py @@ -0,0 +1,144 @@ +"""Security Roughcheck Validator + +Detects common security anti-patterns: +- Hardcoded secrets (API keys, tokens, passwords) +- .env file creation in commits +- Exposed credentials in code +- Unsafe practices +""" + +from typing import Dict, Any, List +import re +from pathlib import Path + +from .base import Validator, ValidationResult + + +class SecurityRoughcheckValidator(Validator): + """Validates against common security issues""" + + # Secret detection patterns + SECRET_PATTERNS = [ + (r'sk_live_[a-zA-Z0-9]{24,}', 'Stripe live secret key'), + (r'pk_live_[a-zA-Z0-9]{24,}', 'Stripe live publishable key'), + (r'sk_test_[a-zA-Z0-9]{24,}', 'Stripe test secret key'), + (r'SUPABASE_SERVICE_ROLE_KEY\s*=\s*[\'"]eyJ', 'Supabase service role key'), + (r'SUPABASE_ANON_KEY\s*=\s*[\'"]eyJ', 'Supabase anon key'), + (r'OPENAI_API_KEY\s*=\s*[\'"]sk-', 'OpenAI API key'), + (r'TWILIO_AUTH_TOKEN\s*=\s*[\'"][a-f0-9]{32}', 'Twilio auth token'), + (r'INFISICAL_TOKEN\s*=\s*[\'"]st\.', 'Infisical token'), + (r'DATABASE_URL\s*=\s*[\'"]postgres.*password', 'Database password in URL'), + (r'AWS_SECRET_ACCESS_KEY\s*=\s*[\'"][\w/+=]{40}', 'AWS secret access key'), + (r'GITHUB_TOKEN\s*=\s*[\'"]gh[ps]_[a-zA-Z0-9]{36}', 'GitHub token'), + ] + + # Unsafe patterns + UNSAFE_PATTERNS = [ + (r'eval\s*\(', 'Use of eval() function'), + (r'exec\s*\(', 'Use of exec() function'), + (r'__import__\s*\(', 'Dynamic import with __import__'), + (r'shell=True', 'Shell command execution'), + (r'pickle\.loads?\s*\(', 'Unsafe pickle deserialization'), + ] + + def __init__(self): + super().__init__("Security Roughcheck") + + def validate(self, context: Dict[str, Any]) -> ValidationResult: + """ + Validate security. + + Context should contain: + - changes: File changes + """ + changes = context.get("changes", {}) + if not changes: + return self._pass("No changes to validate") + + critical_issues = [] + warnings = [] + + for file_path, content in changes.items(): + # Check 1: .env file creation + if ".env" in Path(file_path).name: + critical_issues.append(f"❌ CRITICAL: .env file detected: {file_path}") + + # Check 2: Hardcoded secrets + for pattern, description in self.SECRET_PATTERNS: + matches = re.findall(pattern, content) + if matches: + critical_issues.append( + f"❌ CRITICAL: {description} detected in {file_path}" + ) + + # Check 3: Unsafe patterns + for pattern, description in self.UNSAFE_PATTERNS: + matches = re.findall(pattern, content) + if matches: + warnings.append(f"⚠️ {description} in {file_path}") + + # Check 4: Exposed API endpoints without auth + if self._looks_like_api_route(file_path): + if not self._has_auth_check(content): + warnings.append( + f"⚠️ Possible unauthenticated API endpoint in {file_path}" + ) + + # Generate suggestions + suggestions = [] + if critical_issues: + suggestions.extend([ + "Remove hardcoded secrets immediately", + "Use environment variables or secret management (Infisical)", + "Never commit .env files - add to .gitignore" + ]) + + if warnings: + suggestions.extend([ + "Review security warnings carefully", + "Consider safer alternatives where possible" + ]) + + # Return result + if critical_issues: + return self._fail( + f"CRITICAL security issues detected ({len(critical_issues)} issues)", + details={ + "critical": critical_issues, + "warnings": warnings + }, + suggestions=suggestions + ) + + if warnings: + return self._warning( + f"Security warnings detected ({len(warnings)} warnings)", + details={"warnings": warnings}, + suggestions=suggestions + ) + + return self._pass("No security issues detected") + + def _looks_like_api_route(self, file_path: str) -> bool: + """Check if file looks like an API route""" + api_indicators = [ + "/api/", + "/routes/", + "/endpoints/", + ".route.", + ".api.", + ] + return any(indicator in file_path.lower() for indicator in api_indicators) + + def _has_auth_check(self, content: str) -> bool: + """Check if content has authentication checks""" + auth_patterns = [ + r'@auth', # Decorator + r'authenticate', + r'authorize', + r'requireAuth', + r'verifyToken', + r'checkAuth', + r'isAuthenticated', + ] + return any(re.search(pattern, content, re.IGNORECASE) for pattern in auth_patterns) diff --git a/superclaude/validators/test_runner.py b/superclaude/validators/test_runner.py new file mode 100644 index 0000000..3fd86f6 --- /dev/null +++ b/superclaude/validators/test_runner.py @@ -0,0 +1,155 @@ +"""Test Runner Validator + +Validates that: +- Unit tests exist for changes +- Tests pass before implementation is approved +- Test coverage is maintained +""" + +from typing import Dict, Any, List, Optional +import subprocess +from pathlib import Path + +from .base import Validator, ValidationResult + + +class TestRunnerValidator(Validator): + """Validates test execution""" + + def __init__(self): + super().__init__("Test Runner") + + def validate(self, context: Dict[str, Any]) -> ValidationResult: + """ + Validate tests. + + Context should contain: + - changes: File changes + - git_root: Repository root + - contract: Context Contract + - test_command: Optional custom test command + """ + changes = context.get("changes", {}) + git_root = context.get("git_root") + test_command = context.get("test_command") + + if not git_root: + return self._skip("No git root provided") + + # Detect test files in changes + test_files = [ + path for path in changes.keys() + if self._is_test_file(path) + ] + + # If no tests and no test files changed, skip + if not test_files and not test_command: + return self._warning("No tests detected for changes") + + # Run tests + test_result = self._run_tests(git_root, test_command) + + if test_result["success"]: + return self._pass( + "Tests passed", + details={ + "test_files": test_files, + "output": test_result.get("output", "")[:500] # First 500 chars + } + ) + else: + return self._fail( + "Tests failed", + details={ + "test_files": test_files, + "output": test_result.get("output", "")[:1000], # First 1000 chars + "error": test_result.get("error", "")[:500] + }, + suggestions=[ + "Fix failing tests before proceeding", + "Review test output for specific failures" + ] + ) + + def _is_test_file(self, file_path: str) -> bool: + """Check if file is a test file""" + path = Path(file_path) + + # Common test file patterns + test_patterns = [ + "test_", # Python: test_*.py + "_test.", # Go: *_test.go + ".test.", # JS/TS: *.test.js, *.test.ts + ".spec.", # JS/TS: *.spec.js, *.spec.ts + "/tests/", # In tests directory + "/test/", # In test directory + "/__tests__/", # React convention + ] + + file_path_lower = file_path.lower() + return any(pattern in file_path_lower for pattern in test_patterns) + + def _run_tests(self, git_root: Path, test_command: Optional[str] = None) -> Dict[str, Any]: + """Run tests and return results""" + if test_command: + # Use custom test command + return self._execute_test_command(git_root, test_command) + + # Auto-detect test framework + if (git_root / "package.json").exists(): + return self._run_npm_tests(git_root) + elif (git_root / "pyproject.toml").exists(): + return self._run_python_tests(git_root) + else: + return { + "success": False, + "output": "", + "error": "Could not detect test framework" + } + + def _execute_test_command(self, git_root: Path, command: str) -> Dict[str, Any]: + """Execute custom test command""" + try: + result = subprocess.run( + command, + shell=True, + cwd=git_root, + capture_output=True, + text=True, + timeout=300, # 5 minutes max + check=False + ) + + return { + "success": result.returncode == 0, + "output": result.stdout, + "error": result.stderr + } + except subprocess.TimeoutExpired: + return { + "success": False, + "output": "", + "error": "Test execution timed out (5 minutes)" + } + except Exception as e: + return { + "success": False, + "output": "", + "error": f"Test execution failed: {str(e)}" + } + + def _run_npm_tests(self, git_root: Path) -> Dict[str, Any]: + """Run npm/pnpm tests""" + # Try pnpm first, fall back to npm + if (git_root / "pnpm-lock.yaml").exists(): + return self._execute_test_command(git_root, "pnpm test") + else: + return self._execute_test_command(git_root, "npm test") + + def _run_python_tests(self, git_root: Path) -> Dict[str, Any]: + """Run Python tests (pytest/unittest)""" + # Try UV first, fall back to pytest + if (git_root / "uv.lock").exists(): + return self._execute_test_command(git_root, "uv run pytest") + else: + return self._execute_test_command(git_root, "pytest")