feat: add comprehensive validation framework

Add validators package with 6 specialized validators:
- base.py: Abstract base validator with common patterns
- context_contract.py: PM mode context validation
- dep_sanity.py: Dependency consistency checks
- runtime_policy.py: Runtime policy enforcement
- security_roughcheck.py: Security vulnerability scanning
- test_runner.py: Automated test execution validation

Supports validation gates for quality assurance and risk mitigation.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
kazuki
2025-10-20 03:52:40 +09:00
parent c5690e4a71
commit ca29e595fd
7 changed files with 868 additions and 0 deletions

View File

@@ -0,0 +1,27 @@
"""Validators for PM Mode
Enforce Context Contract rules before code execution:
- context_contract: Project-specific rules (Kong, Infisical, etc.)
- dep_sanity: Dependency existence and version sanity
- runtime_policy: Runtime (Node/Python) version validation
- test_runner: Test execution and validation
- security_roughcheck: Common security anti-patterns
"""
from .base import ValidationResult, Validator, ValidationStatus
from .context_contract import ContextContractValidator
from .dep_sanity import DependencySanityValidator
from .runtime_policy import RuntimePolicyValidator
from .test_runner import TestRunnerValidator
from .security_roughcheck import SecurityRoughcheckValidator
__all__ = [
"ValidationResult",
"ValidationStatus",
"Validator",
"ContextContractValidator",
"DependencySanityValidator",
"RuntimePolicyValidator",
"TestRunnerValidator",
"SecurityRoughcheckValidator",
]

View File

@@ -0,0 +1,145 @@
"""Base validator classes and utilities"""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Optional, Dict, Any
from enum import Enum
class ValidationStatus(Enum):
"""Validation result status"""
PASSED = "passed"
FAILED = "failed"
WARNING = "warning"
SKIPPED = "skipped"
@dataclass
class ValidationResult:
"""Result of a validation check"""
status: ValidationStatus
validator_name: str
message: str
details: Optional[Dict[str, Any]] = None
suggestions: Optional[List[str]] = None
@property
def passed(self) -> bool:
"""Check if validation passed"""
return self.status == ValidationStatus.PASSED
@property
def failed(self) -> bool:
"""Check if validation failed"""
return self.status == ValidationStatus.FAILED
def __str__(self) -> str:
"""String representation"""
symbol = {
ValidationStatus.PASSED: "",
ValidationStatus.FAILED: "",
ValidationStatus.WARNING: "⚠️",
ValidationStatus.SKIPPED: "⏭️"
}[self.status]
lines = [f"{symbol} {self.validator_name}: {self.message}"]
if self.details:
lines.append(f" Details: {self.details}")
if self.suggestions:
lines.append(" Suggestions:")
for suggestion in self.suggestions:
lines.append(f" - {suggestion}")
return "\n".join(lines)
class Validator(ABC):
"""Base validator class"""
def __init__(self, name: str):
self.name = name
@abstractmethod
def validate(self, context: Dict[str, Any]) -> ValidationResult:
"""
Validate against context.
Args:
context: Validation context (changes, contract, etc.)
Returns:
ValidationResult
"""
pass
def _pass(self, message: str, **kwargs) -> ValidationResult:
"""Create a PASSED result"""
return ValidationResult(
status=ValidationStatus.PASSED,
validator_name=self.name,
message=message,
**kwargs
)
def _fail(self, message: str, **kwargs) -> ValidationResult:
"""Create a FAILED result"""
return ValidationResult(
status=ValidationStatus.FAILED,
validator_name=self.name,
message=message,
**kwargs
)
def _warning(self, message: str, **kwargs) -> ValidationResult:
"""Create a WARNING result"""
return ValidationResult(
status=ValidationStatus.WARNING,
validator_name=self.name,
message=message,
**kwargs
)
def _skip(self, message: str, **kwargs) -> ValidationResult:
"""Create a SKIPPED result"""
return ValidationResult(
status=ValidationStatus.SKIPPED,
validator_name=self.name,
message=message,
**kwargs
)
class ValidatorChain:
"""Chain of validators that runs in sequence"""
def __init__(self, validators: List[Validator]):
self.validators = validators
def validate(self, context: Dict[str, Any]) -> List[ValidationResult]:
"""Run all validators"""
results = []
for validator in self.validators:
result = validator.validate(context)
results.append(result)
return results
def validate_with_early_stop(self, context: Dict[str, Any]) -> List[ValidationResult]:
"""Run validators until first failure"""
results = []
for validator in self.validators:
result = validator.validate(context)
results.append(result)
# Stop on first failure
if result.failed:
break
return results
@property
def all_passed(self) -> bool:
"""Check if all validations passed"""
return all(not r.failed for r in self.validate({}))

View File

@@ -0,0 +1,94 @@
"""Context Contract Validator
Enforces project-specific rules from Context Contract:
- Kong/Traefik routing requirements
- Infisical usage requirements
- .env file prohibitions
- Security policies
"""
from typing import Dict, Any, List
import re
from pathlib import Path
from .base import Validator, ValidationResult
class ContextContractValidator(Validator):
"""Validates against Context Contract rules"""
def __init__(self):
super().__init__("Context Contract")
def validate(self, context: Dict[str, Any]) -> ValidationResult:
"""
Validate changes against Context Contract.
Context should contain:
- contract: Context Contract data
- changes: Dict of file changes (path -> content)
"""
contract = context.get("contract")
if not contract:
return self._skip("No Context Contract available")
changes = context.get("changes", {})
if not changes:
return self._pass("No changes to validate")
violations = []
suggestions = []
# Check principles
principles = contract.get("principles", {})
# Check 1: .env file creation prohibition
if principles.get("no_env_files"):
for file_path in changes.keys():
if ".env" in Path(file_path).name:
violations.append(f"❌ .env file creation prohibited: {file_path}")
suggestions.append("Use Infisical for secret management")
# Check 2: Hardcoded secrets
if principles.get("use_infisical_only"):
secret_patterns = [
r'INFISICAL_TOKEN\s*=\s*[\'"]st\.', # Infisical token
r'SUPABASE_SERVICE_ROLE_KEY\s*=\s*[\'"]eyJ', # Supabase JWT
r'OPENAI_API_KEY\s*=\s*[\'"]sk-', # OpenAI key
r'DATABASE_URL\s*=\s*[\'"]postgres.*password', # DB password
]
for file_path, content in changes.items():
for pattern in secret_patterns:
if re.search(pattern, content):
violations.append(f"❌ Hardcoded secret detected in {file_path}")
suggestions.append("Use Infisical or environment variables")
break
# Check 3: Outbound traffic routing
outbound_proxy = principles.get("outbound_through")
if outbound_proxy == "kong":
# Check if Kong routing is mentioned in docker-compose changes
for file_path, content in changes.items():
if "docker-compose" in file_path:
if "external" in content and "kong" not in content.lower():
violations.append(f"❌ External service without Kong routing in {file_path}")
suggestions.append("All external services must route through Kong")
elif outbound_proxy == "traefik":
# Check if Traefik labels are present
for file_path, content in changes.items():
if "docker-compose" in file_path:
if "external" in content and "traefik.enable" not in content:
violations.append(f"❌ External service without Traefik labels in {file_path}")
suggestions.append("All external services must use Traefik labels")
# Return result
if violations:
return self._fail(
f"Context Contract violations detected ({len(violations)} issues)",
details={"violations": violations},
suggestions=suggestions
)
return self._pass("All Context Contract rules satisfied")

View File

@@ -0,0 +1,151 @@
"""Dependency Sanity Validator
Validates that:
- Proposed packages exist on registries (npm, PyPI)
- Versions are compatible with lockfiles
- No conflicting dependencies
"""
from typing import Dict, Any, List, Optional
import subprocess
import json
import re
from .base import Validator, ValidationResult
class DependencySanityValidator(Validator):
"""Validates dependency sanity"""
def __init__(self):
super().__init__("Dependency Sanity")
def validate(self, context: Dict[str, Any]) -> ValidationResult:
"""
Validate dependency changes.
Context should contain:
- changes: Dict of file changes
- contract: Context Contract (for runtime info)
"""
changes = context.get("changes", {})
contract = context.get("contract", {})
issues = []
warnings = []
# Check package.json changes
for file_path, content in changes.items():
if "package.json" in file_path:
result = self._validate_npm_deps(content, contract)
issues.extend(result.get("issues", []))
warnings.extend(result.get("warnings", []))
elif "pyproject.toml" in file_path or "requirements.txt" in file_path:
result = self._validate_python_deps(content, contract)
issues.extend(result.get("issues", []))
warnings.extend(result.get("warnings", []))
# Return result
if issues:
return self._fail(
f"Dependency issues detected ({len(issues)} issues)",
details={"issues": issues, "warnings": warnings}
)
if warnings:
return self._warning(
f"Dependency warnings ({len(warnings)} warnings)",
details={"warnings": warnings}
)
return self._pass("All dependencies validated")
def _validate_npm_deps(self, package_json_content: str, contract: Dict[str, Any]) -> Dict[str, List[str]]:
"""Validate npm dependencies"""
issues = []
warnings = []
try:
# Parse package.json
data = json.loads(package_json_content)
dependencies = {**data.get("dependencies", {}), **data.get("devDependencies", {})}
# Check if packages exist (basic validation)
for pkg_name, version in dependencies.items():
# Check for common typos
if pkg_name.startswith("@"):
# Scoped package
if not re.match(r"^@[\w-]+/[\w-]+$", pkg_name):
issues.append(f"Invalid scoped package name: {pkg_name}")
else:
# Regular package
if not re.match(r"^[\w-]+$", pkg_name):
issues.append(f"Invalid package name: {pkg_name}")
# Check version format
if version and not re.match(r"^[\^~]?\d+\.\d+\.\d+|latest|workspace:\*", version):
warnings.append(f"Unusual version format for {pkg_name}: {version}")
except json.JSONDecodeError:
issues.append("Invalid package.json format")
return {"issues": issues, "warnings": warnings}
def _validate_python_deps(self, content: str, contract: Dict[str, Any]) -> Dict[str, List[str]]:
"""Validate Python dependencies"""
issues = []
warnings = []
# Extract package names from requirements.txt or pyproject.toml
if "[tool.poetry.dependencies]" in content or "[project.dependencies]" in content:
# pyproject.toml format - basic validation
# More sophisticated parsing would use tomli/tomlkit
pass
else:
# requirements.txt format
for line in content.split("\n"):
line = line.strip()
if not line or line.startswith("#"):
continue
# Check for common issues
if "==" in line:
pkg_spec = line.split("==")
if len(pkg_spec) != 2:
issues.append(f"Invalid requirement format: {line}")
else:
pkg_name, version = pkg_spec
# Basic package name validation
if not re.match(r"^[a-zA-Z0-9_-]+$", pkg_name):
issues.append(f"Invalid package name: {pkg_name}")
return {"issues": issues, "warnings": warnings}
def check_npm_package_exists(self, package_name: str) -> bool:
"""Check if npm package exists on registry"""
try:
result = subprocess.run(
["npm", "view", package_name, "version"],
capture_output=True,
text=True,
timeout=5,
check=False
)
return result.returncode == 0
except Exception:
return False
def check_pypi_package_exists(self, package_name: str) -> bool:
"""Check if PyPI package exists"""
try:
result = subprocess.run(
["pip", "index", "versions", package_name],
capture_output=True,
text=True,
timeout=5,
check=False
)
return result.returncode == 0
except Exception:
return False

View File

@@ -0,0 +1,152 @@
"""Runtime Policy Validator
Validates runtime requirements:
- Node.js version (LTS, latest, project-specified)
- Python version (LTS, latest, project-specified)
- Consistency with lockfiles
"""
from typing import Dict, Any, List, Optional
import subprocess
import json
import re
from .base import Validator, ValidationResult
class RuntimePolicyValidator(Validator):
"""Validates runtime policies"""
def __init__(self):
super().__init__("Runtime Policy")
def validate(self, context: Dict[str, Any]) -> ValidationResult:
"""
Validate runtime requirements.
Context should contain:
- contract: Context Contract (for runtime info)
- changes: File changes (to detect version changes)
"""
contract = context.get("contract", {})
changes = context.get("changes", {})
runtime = contract.get("runtime", {})
if not runtime:
return self._skip("No runtime requirements specified")
issues = []
warnings = []
# Validate Node.js runtime
if "node" in runtime:
node_result = self._validate_node_runtime(runtime["node"], changes)
issues.extend(node_result.get("issues", []))
warnings.extend(node_result.get("warnings", []))
# Validate Python runtime
if "python" in runtime:
python_result = self._validate_python_runtime(runtime["python"], changes)
issues.extend(python_result.get("issues", []))
warnings.extend(python_result.get("warnings", []))
# Return result
if issues:
return self._fail(
f"Runtime policy violations ({len(issues)} issues)",
details={"issues": issues, "warnings": warnings}
)
if warnings:
return self._warning(
f"Runtime policy warnings ({len(warnings)} warnings)",
details={"warnings": warnings}
)
return self._pass("Runtime requirements satisfied")
def _validate_node_runtime(self, node_config: Dict[str, Any], changes: Dict[str, str]) -> Dict[str, List[str]]:
"""Validate Node.js runtime"""
issues = []
warnings = []
manager = node_config.get("manager", "npm")
source = node_config.get("source", "package-json-defined")
# Check if package.json specifies engines
for file_path, content in changes.items():
if "package.json" in file_path:
try:
data = json.loads(content)
engines = data.get("engines", {})
node_version = engines.get("node")
if not node_version and source == "package-json-defined":
warnings.append("No Node.js version specified in package.json engines")
if manager == "pnpm" and "pnpm" not in engines:
warnings.append("Using pnpm but no pnpm version in engines")
except json.JSONDecodeError:
issues.append("Invalid package.json format")
return {"issues": issues, "warnings": warnings}
def _validate_python_runtime(self, python_config: Dict[str, Any], changes: Dict[str, str]) -> Dict[str, List[str]]:
"""Validate Python runtime"""
issues = []
warnings = []
manager = python_config.get("manager", "pip")
source = python_config.get("source", "pyproject-defined")
# Check if pyproject.toml specifies python version
for file_path, content in changes.items():
if "pyproject.toml" in file_path:
# Basic check for python version requirement
if "requires-python" in content:
# Extract version requirement
match = re.search(r'requires-python\s*=\s*[\'"]([^"\']+)[\'"]', content)
if match:
version_spec = match.group(1)
# Validate version format
if not re.match(r'^[><=~^!]+\d+\.\d+', version_spec):
warnings.append(f"Unusual Python version format: {version_spec}")
else:
warnings.append("Could not parse requires-python version")
elif source == "pyproject-defined":
warnings.append("No requires-python specified in pyproject.toml")
return {"issues": issues, "warnings": warnings}
def get_current_node_version(self) -> Optional[str]:
"""Get current Node.js version"""
try:
result = subprocess.run(
["node", "--version"],
capture_output=True,
text=True,
timeout=2,
check=False
)
if result.returncode == 0:
return result.stdout.strip()
except Exception:
pass
return None
def get_current_python_version(self) -> Optional[str]:
"""Get current Python version"""
try:
result = subprocess.run(
["python", "--version"],
capture_output=True,
text=True,
timeout=2,
check=False
)
if result.returncode == 0:
return result.stdout.strip()
except Exception:
pass
return None

View File

@@ -0,0 +1,144 @@
"""Security Roughcheck Validator
Detects common security anti-patterns:
- Hardcoded secrets (API keys, tokens, passwords)
- .env file creation in commits
- Exposed credentials in code
- Unsafe practices
"""
from typing import Dict, Any, List
import re
from pathlib import Path
from .base import Validator, ValidationResult
class SecurityRoughcheckValidator(Validator):
"""Validates against common security issues"""
# Secret detection patterns
SECRET_PATTERNS = [
(r'sk_live_[a-zA-Z0-9]{24,}', 'Stripe live secret key'),
(r'pk_live_[a-zA-Z0-9]{24,}', 'Stripe live publishable key'),
(r'sk_test_[a-zA-Z0-9]{24,}', 'Stripe test secret key'),
(r'SUPABASE_SERVICE_ROLE_KEY\s*=\s*[\'"]eyJ', 'Supabase service role key'),
(r'SUPABASE_ANON_KEY\s*=\s*[\'"]eyJ', 'Supabase anon key'),
(r'OPENAI_API_KEY\s*=\s*[\'"]sk-', 'OpenAI API key'),
(r'TWILIO_AUTH_TOKEN\s*=\s*[\'"][a-f0-9]{32}', 'Twilio auth token'),
(r'INFISICAL_TOKEN\s*=\s*[\'"]st\.', 'Infisical token'),
(r'DATABASE_URL\s*=\s*[\'"]postgres.*password', 'Database password in URL'),
(r'AWS_SECRET_ACCESS_KEY\s*=\s*[\'"][\w/+=]{40}', 'AWS secret access key'),
(r'GITHUB_TOKEN\s*=\s*[\'"]gh[ps]_[a-zA-Z0-9]{36}', 'GitHub token'),
]
# Unsafe patterns
UNSAFE_PATTERNS = [
(r'eval\s*\(', 'Use of eval() function'),
(r'exec\s*\(', 'Use of exec() function'),
(r'__import__\s*\(', 'Dynamic import with __import__'),
(r'shell=True', 'Shell command execution'),
(r'pickle\.loads?\s*\(', 'Unsafe pickle deserialization'),
]
def __init__(self):
super().__init__("Security Roughcheck")
def validate(self, context: Dict[str, Any]) -> ValidationResult:
"""
Validate security.
Context should contain:
- changes: File changes
"""
changes = context.get("changes", {})
if not changes:
return self._pass("No changes to validate")
critical_issues = []
warnings = []
for file_path, content in changes.items():
# Check 1: .env file creation
if ".env" in Path(file_path).name:
critical_issues.append(f"❌ CRITICAL: .env file detected: {file_path}")
# Check 2: Hardcoded secrets
for pattern, description in self.SECRET_PATTERNS:
matches = re.findall(pattern, content)
if matches:
critical_issues.append(
f"❌ CRITICAL: {description} detected in {file_path}"
)
# Check 3: Unsafe patterns
for pattern, description in self.UNSAFE_PATTERNS:
matches = re.findall(pattern, content)
if matches:
warnings.append(f"⚠️ {description} in {file_path}")
# Check 4: Exposed API endpoints without auth
if self._looks_like_api_route(file_path):
if not self._has_auth_check(content):
warnings.append(
f"⚠️ Possible unauthenticated API endpoint in {file_path}"
)
# Generate suggestions
suggestions = []
if critical_issues:
suggestions.extend([
"Remove hardcoded secrets immediately",
"Use environment variables or secret management (Infisical)",
"Never commit .env files - add to .gitignore"
])
if warnings:
suggestions.extend([
"Review security warnings carefully",
"Consider safer alternatives where possible"
])
# Return result
if critical_issues:
return self._fail(
f"CRITICAL security issues detected ({len(critical_issues)} issues)",
details={
"critical": critical_issues,
"warnings": warnings
},
suggestions=suggestions
)
if warnings:
return self._warning(
f"Security warnings detected ({len(warnings)} warnings)",
details={"warnings": warnings},
suggestions=suggestions
)
return self._pass("No security issues detected")
def _looks_like_api_route(self, file_path: str) -> bool:
"""Check if file looks like an API route"""
api_indicators = [
"/api/",
"/routes/",
"/endpoints/",
".route.",
".api.",
]
return any(indicator in file_path.lower() for indicator in api_indicators)
def _has_auth_check(self, content: str) -> bool:
"""Check if content has authentication checks"""
auth_patterns = [
r'@auth', # Decorator
r'authenticate',
r'authorize',
r'requireAuth',
r'verifyToken',
r'checkAuth',
r'isAuthenticated',
]
return any(re.search(pattern, content, re.IGNORECASE) for pattern in auth_patterns)

View File

@@ -0,0 +1,155 @@
"""Test Runner Validator
Validates that:
- Unit tests exist for changes
- Tests pass before implementation is approved
- Test coverage is maintained
"""
from typing import Dict, Any, List, Optional
import subprocess
from pathlib import Path
from .base import Validator, ValidationResult
class TestRunnerValidator(Validator):
"""Validates test execution"""
def __init__(self):
super().__init__("Test Runner")
def validate(self, context: Dict[str, Any]) -> ValidationResult:
"""
Validate tests.
Context should contain:
- changes: File changes
- git_root: Repository root
- contract: Context Contract
- test_command: Optional custom test command
"""
changes = context.get("changes", {})
git_root = context.get("git_root")
test_command = context.get("test_command")
if not git_root:
return self._skip("No git root provided")
# Detect test files in changes
test_files = [
path for path in changes.keys()
if self._is_test_file(path)
]
# If no tests and no test files changed, skip
if not test_files and not test_command:
return self._warning("No tests detected for changes")
# Run tests
test_result = self._run_tests(git_root, test_command)
if test_result["success"]:
return self._pass(
"Tests passed",
details={
"test_files": test_files,
"output": test_result.get("output", "")[:500] # First 500 chars
}
)
else:
return self._fail(
"Tests failed",
details={
"test_files": test_files,
"output": test_result.get("output", "")[:1000], # First 1000 chars
"error": test_result.get("error", "")[:500]
},
suggestions=[
"Fix failing tests before proceeding",
"Review test output for specific failures"
]
)
def _is_test_file(self, file_path: str) -> bool:
"""Check if file is a test file"""
path = Path(file_path)
# Common test file patterns
test_patterns = [
"test_", # Python: test_*.py
"_test.", # Go: *_test.go
".test.", # JS/TS: *.test.js, *.test.ts
".spec.", # JS/TS: *.spec.js, *.spec.ts
"/tests/", # In tests directory
"/test/", # In test directory
"/__tests__/", # React convention
]
file_path_lower = file_path.lower()
return any(pattern in file_path_lower for pattern in test_patterns)
def _run_tests(self, git_root: Path, test_command: Optional[str] = None) -> Dict[str, Any]:
"""Run tests and return results"""
if test_command:
# Use custom test command
return self._execute_test_command(git_root, test_command)
# Auto-detect test framework
if (git_root / "package.json").exists():
return self._run_npm_tests(git_root)
elif (git_root / "pyproject.toml").exists():
return self._run_python_tests(git_root)
else:
return {
"success": False,
"output": "",
"error": "Could not detect test framework"
}
def _execute_test_command(self, git_root: Path, command: str) -> Dict[str, Any]:
"""Execute custom test command"""
try:
result = subprocess.run(
command,
shell=True,
cwd=git_root,
capture_output=True,
text=True,
timeout=300, # 5 minutes max
check=False
)
return {
"success": result.returncode == 0,
"output": result.stdout,
"error": result.stderr
}
except subprocess.TimeoutExpired:
return {
"success": False,
"output": "",
"error": "Test execution timed out (5 minutes)"
}
except Exception as e:
return {
"success": False,
"output": "",
"error": f"Test execution failed: {str(e)}"
}
def _run_npm_tests(self, git_root: Path) -> Dict[str, Any]:
"""Run npm/pnpm tests"""
# Try pnpm first, fall back to npm
if (git_root / "pnpm-lock.yaml").exists():
return self._execute_test_command(git_root, "pnpm test")
else:
return self._execute_test_command(git_root, "npm test")
def _run_python_tests(self, git_root: Path) -> Dict[str, Any]:
"""Run Python tests (pytest/unittest)"""
# Try UV first, fall back to pytest
if (git_root / "uv.lock").exists():
return self._execute_test_command(git_root, "uv run pytest")
else:
return self._execute_test_command(git_root, "pytest")