""" Local Code Scanner ================== Scans a local directory — no GitHub API needed. Pipeline: 1. Walk the filesystem, respecting .gitignore rules and config ignore patterns 2. Flag files matching known sensitive patterns (always include these) 3. Send the file list to the AI for triage (pick the most security-critical ones) 4. Read each triaged file and send to AI for OWASP vulnerability analysis 5. Return structured list of vulnerability findings """ import asyncio import logging from dataclasses import dataclass, field from pathlib import Path from typing import Optional import pathspec from securelens.ai import call_ai, call_ai_json from securelens.ai.prompts import triage_prompt, analysis_prompt from securelens.config import CLIConfig logger = logging.getLogger(__name__) # ── File extension blocklist (binary / generated — no security signal) ──────── BINARY_EXTENSIONS = { ".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg", ".ico", ".pdf", ".doc", ".docx", ".xls", ".xlsx", ".zip", ".tar", ".gz", ".rar", ".7z", ".whl", ".egg", ".jar", ".war", ".ear", ".mp4", ".mp3", ".avi", ".mov", ".ttf", ".woff", ".woff2", ".eot", ".pyc", ".pyo", ".class", ".so", ".dll", ".dylib", ".exe", ".db", ".sqlite", ".sqlite3", } # ── Files that are always included regardless of AI triage ─────────────────── ALWAYS_SCAN_PATTERNS = [ "*.env", ".env", ".env.*", "*.env.*", "config.py", "settings.py", "config.js", "config.ts", "secrets.py", "credentials.py", "keys.py", "Dockerfile", "docker-compose.yml", "docker-compose.yaml", "*.pem", "*.key", "*.p12", "*.pfx", "requirements.txt", "package.json", "Gemfile", ] SENSITIVE_NAME_KEYWORDS = [ "secret", "password", "passwd", "credential", "cred", "api_key", "apikey", "auth", "jwt", "token", "private", "priv_key", "access_key", ] @dataclass class VulnerabilityFinding: file_path: str severity: str # Critical | High | Medium | Low issue: str explanation: str suggested_fix: str line_number: Optional[int] = None @dataclass class LocalScanResult: target: str total_files_found: int files_triaged: list[str] = field(default_factory=list) vulnerabilities: list[VulnerabilityFinding] = field(default_factory=list) ai_summary: str = "" score: int = 100 grade: str = "A" def compute_score(self) -> None: """Deterministic score: deduct points by severity.""" weights = {"Critical": 20, "High": 12, "Medium": 5, "Low": 2} deduction = sum(weights.get(v.severity, 0) for v in self.vulnerabilities) self.score = max(100 - deduction, 0) self.grade = _score_to_grade(self.score) def _score_to_grade(score: int) -> str: if score >= 90: return "A" if score >= 80: return "B" if score >= 70: return "C" if score >= 60: return "D" return "F" # ── Phase 1: File Discovery ─────────────────────────────────────────────────── def discover_files(root: Path, cfg: CLIConfig) -> list[Path]: """ Walk the directory tree and return candidate files. Respects .gitignore in the root and cfg.ignore_patterns. Skips binaries and files larger than cfg.max_file_size_kb. """ import os # Build a combined spec from config ignore_patterns + .gitignore ignore_patterns = list(cfg.ignore_patterns) gitignore_path = root / ".gitignore" if gitignore_path.exists(): with open(gitignore_path) as f: ignore_patterns.extend( line.strip() for line in f if line.strip() and not line.startswith("#") ) spec = pathspec.PathSpec.from_lines("gitwildmatch", ignore_patterns) max_bytes = cfg.max_file_size_kb * 1024 # Hardcoded directory blacklist to prune execution paths immediately prune_dirs = { ".git", "node_modules", "venv", ".venv", "__pycache__", "dist", "build", ".next", ".cache", ".npm", ".cargo", ".rustup", ".local", ".ssh", ".gnupg", ".docker", ".vscode", ".idea", "Library", "Pictures", "Music", "Videos", "Documents" } candidates: list[Path] = [] for dirpath, dirnames, filenames in os.walk(root): # 1. Prune standard blacklisted folders in-place dirnames[:] = [d for d in dirnames if d not in prune_dirs] # 2. Prune directories matching the ignore spec active_dirs = [] for d in dirnames: rel_path = os.path.relpath(os.path.join(dirpath, d), root) if not spec.match_file(rel_path + "/"): active_dirs.append(d) dirnames[:] = active_dirs # 3. Process files in the active directory for f in filenames: p = Path(dirpath) / f rel = p.relative_to(root).as_posix() if spec.match_file(rel): continue if p.suffix.lower() in BINARY_EXTENSIONS: continue try: if p.stat().st_size > max_bytes: continue except OSError: continue candidates.append(p) # Capping safeguard: limit to 1000 candidate files if len(candidates) >= 1000: break if len(candidates) >= 1000: break return sorted(candidates) def _is_always_scan(path: Path) -> bool: """Returns True if this file should always be scanned regardless of triage.""" name = path.name.lower() # Check sensitive name keywords if any(kw in name for kw in SENSITIVE_NAME_KEYWORDS): return True # Check always-scan patterns for pattern in ALWAYS_SCAN_PATTERNS: if path.match(pattern): return True return False # ── Phase 2: AI Triage ─────────────────────────────────────────────────────── async def triage_files( candidates: list[Path], root: Path, cfg: CLIConfig, ) -> list[Path]: """ Ask the AI to pick the most security-relevant files. Always-scan files are added automatically regardless of AI choice. """ # Separate forced files from candidates forced = [p for p in candidates if _is_always_scan(p)] non_forced = [p for p in candidates if not _is_always_scan(p)] # Build file list for AI (relative paths — cleaner prompt) rel_paths = [p.relative_to(root).as_posix() for p in non_forced] remaining_budget = max(0, cfg.max_files_to_scan - len(forced)) ai_selected: list[Path] = [] if rel_paths and remaining_budget > 0 and cfg.api_key: file_list_str = "\n".join(rel_paths[:300]) # cap to ~300 paths for token budget prompt = triage_prompt(file_list_str, remaining_budget) result = await call_ai_json(prompt, cfg.api_key, cfg.default_model, temperature=0.1, api_base=cfg.api_base) if result and "critical_files" in result: for rel in result["critical_files"]: abs_path = root / rel if abs_path.exists(): ai_selected.append(abs_path) # Merge: forced first, then AI-selected (deduplicated) seen = set() final: list[Path] = [] for p in forced + ai_selected: if p not in seen: seen.add(p) final.append(p) return final[:cfg.max_files_to_scan] # ── Phase 3: File Analysis ──────────────────────────────────────────────────── async def analyze_file( path: Path, root: Path, cfg: CLIConfig, ) -> list[VulnerabilityFinding]: """Send a single file's content to the AI for OWASP analysis.""" rel = path.relative_to(root).as_posix() try: content = path.read_text(errors="replace") except Exception as e: logger.warning(f"Could not read {rel}: {e}") return [] # Cap content to avoid token overflow if len(content) > 30_000: content = content[:30_000] + "\n... (truncated)" prompt = analysis_prompt(rel, content) result = await call_ai_json(prompt, cfg.api_key, cfg.default_model, temperature=0.2, api_base=cfg.api_base) if not result: return [] findings: list[VulnerabilityFinding] = [] for v in result.get("vulnerabilities", []): findings.append(VulnerabilityFinding( file_path=rel, severity=v.get("severity", "Medium"), issue=v.get("issue", "Unknown Issue"), explanation=v.get("explanation", ""), suggested_fix=v.get("suggested_fix", ""), line_number=v.get("line_number"), )) return findings async def analyze_files( triaged: list[Path], root: Path, cfg: CLIConfig, progress_callback=None, ) -> list[VulnerabilityFinding]: """ Analyze all triaged files concurrently. Uses a semaphore to avoid hammering the API with too many simultaneous calls. """ semaphore = asyncio.Semaphore(4) all_findings: list[VulnerabilityFinding] = [] async def _analyze_with_sem(path: Path, idx: int) -> list[VulnerabilityFinding]: async with semaphore: result = await analyze_file(path, root, cfg) if progress_callback: await progress_callback(idx + 1, len(triaged), path.relative_to(root).as_posix()) return result tasks = [_analyze_with_sem(p, i) for i, p in enumerate(triaged)] results = await asyncio.gather(*tasks) for r in results: all_findings.extend(r) return all_findings