From 542f607f25dd1b1257b10b245e38444266b98492 Mon Sep 17 00:00:00 2001 From: rarebuffalo Date: Fri, 15 May 2026 12:54:58 +0530 Subject: [PATCH] added cli installer ] --- cli/README.md | 174 +++++++++++ cli/install.sh | 36 +++ cli/pyproject.toml | 29 ++ cli/securelens/__init__.py | 5 + cli/securelens/ai/__init__.py | 77 +++++ cli/securelens/ai/prompts.py | 76 +++++ cli/securelens/cli.py | 392 +++++++++++++++++++++++++ cli/securelens/config.py | 100 +++++++ cli/securelens/output/__init__.py | 208 +++++++++++++ cli/securelens/output/exporters.py | 148 ++++++++++ cli/securelens/repl.py | 218 ++++++++++++++ cli/securelens/scanners/__init__.py | 248 ++++++++++++++++ cli/securelens/scanners/web_scanner.py | 339 +++++++++++++++++++++ 13 files changed, 2050 insertions(+) create mode 100644 cli/README.md create mode 100644 cli/install.sh create mode 100644 cli/pyproject.toml create mode 100644 cli/securelens/__init__.py create mode 100644 cli/securelens/ai/__init__.py create mode 100644 cli/securelens/ai/prompts.py create mode 100644 cli/securelens/cli.py create mode 100644 cli/securelens/config.py create mode 100644 cli/securelens/output/__init__.py create mode 100644 cli/securelens/output/exporters.py create mode 100644 cli/securelens/repl.py create mode 100644 cli/securelens/scanners/__init__.py create mode 100644 cli/securelens/scanners/web_scanner.py diff --git a/cli/README.md b/cli/README.md new file mode 100644 index 0000000..ce197c6 --- /dev/null +++ b/cli/README.md @@ -0,0 +1,174 @@ +# SecureLens AI — CLI + +> Scan codebases and URLs for security vulnerabilities, right in your terminal. +> Powered by AI. Works like Gemini CLI. + +--- + +## Install + +```bash +# From the project root +chmod +x cli/install.sh +./cli/install.sh + +# Then activate the venv +source venv/bin/activate +``` + +Or manually: +```bash +pip install click rich litellm httpx pyyaml pathspec questionary +pip install -e cli/ --no-build-isolation +``` + +--- + +## Quick Start + +```bash +# 1. 
Set up your API key +securelens configure + +# 2. Scan your current project +securelens scan . + +# 3. Scan a URL +securelens web https://example.com +``` + +--- + +## Commands + +### `securelens configure` +Interactive setup wizard. Saves config to `~/.securelens/config.yaml`. +``` +securelens configure +``` + +### `securelens scan [PATH]` +Scan a local codebase. The AI triages files, analyzes them for OWASP vulnerabilities, +and gives you an executive summary. Then you drop into a Q&A chat. + +```bash +securelens scan . # scan current directory +securelens scan ./my-project # scan a specific folder +securelens scan . --output markdown # save report as .md file +securelens scan . --model gpt-4o # use a different AI model +securelens scan . --max-files 30 # analyze more files +securelens scan . --no-ai # pattern-based only (no AI, fast) +securelens scan . --ci --fail-on high # CI mode — exits with code 1 +``` + +### `securelens web URL` +Scan a URL for HTTP security issues (HTTPS, headers, cookies, exposed paths, SSL). + +```bash +securelens web https://example.com +securelens web https://my-app.com --output markdown +securelens web https://api.example.com --no-ai # skip AI summary +``` + +### `securelens version` +Print version and config info. + +--- + +## Interactive REPL + +After an interactive scan (not with `--ci`, `--no-ai`, or `--output json`), you drop into an interactive Q&A session (like Gemini CLI): + +``` +💬 Ask a follow-up (or press Ctrl+C to exit) +Type /help for available commands + +> What's the most critical issue? +> How do I fix the SQL injection in auth.py? 
+> Show me all high severity issues +> /export markdown +> /files +> /model gpt-4o-mini +> /exit +``` + +### Slash Commands + +| Command | Description | +|---|---| +| `/help` | Show available commands | +| `/files` | List files that were analyzed | +| `/score` | Show the security score | +| `/export markdown` | Save report as Markdown | +| `/export json` | Save report as JSON | +| `/model MODEL` | Switch AI model mid-session | +| `/clear` | Clear the terminal | +| `/exit` | Exit the REPL | + +--- + +## Config File + +`~/.securelens/config.yaml`: + +```yaml +default_model: gemini/gemini-2.0-flash +api_key: YOUR_API_KEY +output_format: terminal # terminal | json | markdown | all +max_files_to_scan: 20 +max_file_size_kb: 200 +scan_timeout: 10 +ignore_patterns: + - "*.lock" + - "node_modules/**" + - ".git/**" + - "venv/**" +``` + +### Environment Variable Overrides + +```bash +export SECURELENS_API_KEY=your-key +export SECURELENS_MODEL=gpt-4o-mini +``` + +--- + +## Supported AI Providers + +| Provider | Model string | +|---|---| +| Google Gemini (default) | `gemini/gemini-2.0-flash` | +| OpenAI | `gpt-4o-mini`, `gpt-4o` | +| Anthropic | `claude-3-5-haiku-20241022` | +| OpenRouter | `openrouter/google/gemini-flash` | +| Ollama (local, no key) | `ollama/llama3.1` | + +--- + +## CI/CD Usage + +```bash +# GitHub Actions — fail the build if any high or critical issues found +securelens scan . 
--ci --fail-on high + +# Pre-commit hook +# Add to .pre-commit-config.yaml: +# - id: securelens +# name: SecureLens Security Scan +# entry: securelens scan +# args: [".", "--ci", "--fail-on", "critical"] +# language: python +# pass_filenames: false +``` + +--- + +## Output Formats + +| Format | Flag | Description | +|---|---|---| +| Terminal (default) | `--output terminal` | Rich colored display | +| Markdown | `--output markdown` | Saves `securelens-report-{timestamp}.md` | +| JSON | `--output json` | Machine-readable, good for CI | +| All | `--output all` | Terminal display + saves markdown | diff --git a/cli/install.sh b/cli/install.sh new file mode 100644 index 0000000..0d0cf9c --- /dev/null +++ b/cli/install.sh @@ -0,0 +1,36 @@ +#!/usr/bin/env bash +# install.sh — Install SecureLens AI CLI into the project venv +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +BACKEND_ROOT="$(dirname "$SCRIPT_DIR")" + +echo "🔍 SecureLens AI CLI — Installer" +echo "=================================" + +# Detect venv +VENV_PIP="" +if [ -f "$BACKEND_ROOT/venv/bin/pip" ]; then + VENV_PIP="$BACKEND_ROOT/venv/bin/pip" + echo " Using backend venv: $BACKEND_ROOT/venv" +elif command -v pip3 &>/dev/null; then + VENV_PIP="pip3" + echo " Using system pip3" +else + VENV_PIP="pip" + echo " Using system pip" +fi + +echo "" +echo " Installing dependencies..." +$VENV_PIP install click rich litellm httpx pyyaml pathspec questionary --quiet + +echo " Installing securelens-ai CLI..." +$VENV_PIP install -e "$SCRIPT_DIR" --no-build-isolation --quiet + +echo "" +echo "✓ Done! 
Run: securelens --help" +echo "" +echo " Or if using venv directly:" +echo " source $BACKEND_ROOT/venv/bin/activate" +echo " securelens configure" diff --git a/cli/pyproject.toml b/cli/pyproject.toml new file mode 100644 index 0000000..e378cf8 --- /dev/null +++ b/cli/pyproject.toml @@ -0,0 +1,29 @@ +[build-system] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.backends.legacy:build" + +[project] +name = "securelens-ai" +version = "2.0.0" +description = "AI-powered CLI security scanner for codebases and URLs" +readme = "README.md" +requires-python = ">=3.10" +license = { text = "MIT" } +keywords = ["security", "cli", "sast", "ai", "scanner", "owasp"] +dependencies = [ + "click>=8.1", + "rich>=13.7", + "litellm>=1.40", + "httpx>=0.27", + "pyyaml>=6.0", + "questionary>=2.0", + "pathspec>=0.12", + "asyncio-throttle>=1.0", +] + +[project.scripts] +securelens = "securelens.cli:main" + +[tool.setuptools.packages.find] +where = ["."] +include = ["securelens*"] diff --git a/cli/securelens/__init__.py b/cli/securelens/__init__.py new file mode 100644 index 0000000..5971016 --- /dev/null +++ b/cli/securelens/__init__.py @@ -0,0 +1,5 @@ +""" +SecureLens AI — CLI Package +""" +__version__ = "2.0.0" +__app_name__ = "SecureLens AI" diff --git a/cli/securelens/ai/__init__.py b/cli/securelens/ai/__init__.py new file mode 100644 index 0000000..e148511 --- /dev/null +++ b/cli/securelens/ai/__init__.py @@ -0,0 +1,77 @@ +""" +AI Client +========= +Thin, model-agnostic wrapper around LiteLLM. +The CLI uses this instead of directly calling litellm +so we have one place to handle retries, logging, and key injection. 
+""" + +import json +import asyncio +import logging +from typing import Optional + +logger = logging.getLogger(__name__) + + +async def call_ai( + prompt: str, + api_key: str, + model: str, + temperature: float = 0.3, + json_mode: bool = False, + conversation_history: Optional[list] = None, +) -> str: + """ + Single entry-point for all AI calls in the CLI. + + Parameters + ---------- + prompt : The prompt to send (added as last user message) + api_key : LiteLLM-compatible API key + model : LiteLLM model string (e.g. "gemini/gemini-2.0-flash") + temperature : Creativity (0=deterministic, 1=creative) + json_mode : Ask the model to respond with valid JSON only + conversation_history : Optional list of {"role": ..., "content": ...} dicts + for multi-turn chat sessions + """ + import litellm + + litellm.suppress_debug_info = True + + messages = list(conversation_history or []) + messages.append({"role": "user", "content": prompt}) + + kwargs: dict = { + "model": model, + "messages": messages, + "temperature": temperature, + "api_key": api_key if api_key else None, + } + + if json_mode: + kwargs["response_format"] = {"type": "json_object"} + + try: + response = await litellm.acompletion(**kwargs) + return response.choices[0].message.content or "" + except Exception as e: + logger.error(f"AI call failed [{model}]: {e}") + return "" + + +async def call_ai_json( + prompt: str, + api_key: str, + model: str, + temperature: float = 0.2, +) -> Optional[dict]: + """Convenience wrapper — calls AI in JSON mode and parses the result.""" + raw = await call_ai(prompt, api_key, model, temperature=temperature, json_mode=True) + if not raw: + return None + try: + return json.loads(raw) + except json.JSONDecodeError as e: + logger.error(f"JSON parse failed: {e}\nRaw: {raw[:300]}") + return None diff --git a/cli/securelens/ai/prompts.py b/cli/securelens/ai/prompts.py new file mode 100644 index 0000000..81523dd --- /dev/null +++ b/cli/securelens/ai/prompts.py @@ -0,0 +1,76 @@ +""" +All AI 
prompts for the CLI agent — kept in one place so they're easy to tune. +""" + + +def triage_prompt(file_list: str, max_files: int) -> str: + return ( + "You are a Senior Application Security Engineer. " + "I have a local codebase with the following files:\n" + f"{file_list}\n\n" + f"Select the {max_files} most critical files to review for security vulnerabilities. " + "Focus on: authentication logic, database access, API routes, config files, " + "secret/credential handling, input validation, and file upload handlers.\n" + "Also prioritise any file that contains the words: secret, password, token, key, " + "auth, login, admin, cred, jwt, session, crypto, hash.\n" + "Return a JSON object with a single key 'critical_files' containing the list of " + "exact file paths. Do not select more than " + f"{max_files} files." + ) + + +def analysis_prompt(file_path: str, content: str) -> str: + return ( + f"Review the following code from '{file_path}' for security vulnerabilities.\n" + "Focus on OWASP Top 10:\n" + " A01 Broken Access Control, A02 Cryptographic Failures, A03 Injection,\n" + " A04 Insecure Design, A05 Security Misconfiguration, A06 Vulnerable Components,\n" + " A07 Auth Failures, A08 Integrity Failures, A09 Logging Failures, A10 SSRF.\n" + "Also check for: hardcoded secrets/API keys, debug flags left on, insecure defaults.\n\n" + f"CODE:\n{content}\n\n" + "Return a JSON object with key 'vulnerabilities' — a list of objects, each with:\n" + " 'severity' : Critical | High | Medium | Low\n" + " 'issue' : Short title of the vulnerability\n" + " 'explanation' : 1-2 sentences explaining the risk\n" + " 'suggested_fix' : Concrete code snippet or clear instruction to fix it\n" + " 'line_number' : Integer line number, or null if not applicable\n" + "If no vulnerabilities are found, return {\"vulnerabilities\": []}." 
+ ) + + +def summary_prompt(target: str, issues_json: str) -> str: + return ( + "You are a Senior AppSec Manager writing an executive security report.\n" + f"Target: {target}\n\n" + "Here are all vulnerabilities found in the automated scan:\n" + f"{issues_json}\n\n" + "Write a 2-3 paragraph executive summary of the overall security posture. " + "Highlight the most critical risks, explain what an attacker could do with them, " + "and recommend the top 3 immediate priorities. " + "Keep it professional, direct, and actionable — avoid generic fluff." + ) + + +def chat_prompt(target: str, scan_context: str, user_question: str) -> str: + return ( + "You are SecureLens AI, an expert cybersecurity assistant embedded in a CLI tool.\n" + f"The developer just scanned: {target}\n\n" + "Here are the full scan results:\n" + f"{scan_context}\n\n" + f"Developer's question: {user_question}\n\n" + "Answer clearly and practically. Reference specific findings from the scan when relevant. " + "If asked about a fix, show concrete code where possible." + ) + + +def web_summary_prompt(url: str, issues_json: str, score: int, grade: str) -> str: + return ( + "You are SecureLens AI, a web security expert.\n" + f"I just ran a security scan on: {url}\n" + f"Overall score: {score}/100 Grade: {grade}\n\n" + "Issues found:\n" + f"{issues_json}\n\n" + "Write a concise 2-paragraph summary: first explain what the key risks are and how " + "an attacker could exploit them; second, give the top 3 most impactful fixes. " + "Be direct — the reader is a developer, not a manager." + ) diff --git a/cli/securelens/cli.py b/cli/securelens/cli.py new file mode 100644 index 0000000..420708b --- /dev/null +++ b/cli/securelens/cli.py @@ -0,0 +1,392 @@ +""" +SecureLens AI — CLI Entry Point +================================ +All Click commands live here. 
+ +Commands: + securelens configure Interactive setup wizard + securelens scan Scan a local codebase + securelens web Scan a URL + securelens version Print version info +""" + +import asyncio +import json +import sys +from pathlib import Path + +import click +from rich.console import Console +from rich.prompt import Prompt, Confirm + +console = Console() + + +# ── Helpers ──────────────────────────────────────────────────────────────────── + +def _run(coro): + """Run an async coroutine from a sync Click command.""" + return asyncio.run(coro) + + +def _require_config(cfg): + """Exit early with a friendly message if no API key is set.""" + if not cfg.api_key: + console.print( + "\n[bold yellow]⚠ No API key configured.[/bold yellow]\n" + " Run [bold cyan]securelens configure[/bold cyan] to set one up.\n" + " Or set the [dim]SECURELENS_API_KEY[/dim] environment variable.\n" + ) + sys.exit(1) + + +# ── Main group ───────────────────────────────────────────────────────────────── + +@click.group() +@click.version_option("2.0.0", prog_name="SecureLens AI") +def main(): + """ + \b + SecureLens AI — AI-powered security scanner + Scan codebases, URLs and get instant security reports. 
+ """ + pass + + +# ── configure ───────────────────────────────────────────────────────────────── + +@main.command() +def configure(): + """Interactive setup wizard — saves config to ~/.securelens/config.yaml""" + from securelens.config import load_config, save_config, CONFIG_FILE + from securelens.output import print_banner + + print_banner() + console.print("[bold]Setup Wizard[/bold]\n") + + cfg = load_config() + + # Provider selection + providers = { + "1": ("gemini/gemini-2.0-flash", "Google Gemini 2.0 Flash [free tier available]"), + "2": ("gemini/gemini-1.5-pro", "Google Gemini 1.5 Pro"), + "3": ("gpt-4o-mini", "OpenAI GPT-4o Mini"), + "4": ("gpt-4o", "OpenAI GPT-4o"), + "5": ("claude-3-5-haiku-20241022","Anthropic Claude 3.5 Haiku"), + "6": ("ollama/llama3.1", "Ollama (local, no key needed)"), + "7": ("custom", "Custom model string"), + } + console.print("[bold]Choose AI Provider:[/bold]") + for k, (_, desc) in providers.items(): + console.print(f" [{k}] {desc}") + console.print() + + choice = Prompt.ask("Select", choices=list(providers.keys()), default="1") + model_str, _ = providers[choice] + + if model_str == "custom": + model_str = Prompt.ask("Enter LiteLLM model string (e.g. 
openrouter/google/gemini-flash)") + + cfg.default_model = model_str + + # API key (skip for Ollama) + if not model_str.startswith("ollama/"): + key = Prompt.ask("API Key", password=True, default=cfg.api_key or "") + cfg.api_key = key.strip() + + # Output format + console.print("\n[bold]Default output format:[/bold]") + console.print(" [1] terminal (rich display)") + console.print(" [2] markdown (save .md file)") + console.print(" [3] json (machine-readable)") + console.print(" [4] all (terminal + save markdown)") + fmt_choice = Prompt.ask("Select", choices=["1", "2", "3", "4"], default="1") + cfg.output_format = {"1": "terminal", "2": "markdown", "3": "json", "4": "all"}[fmt_choice] + + save_config(cfg) + console.print(f"\n[bold green]✓ Config saved to {CONFIG_FILE}[/bold green]") + console.print(f" Model: [cyan]{cfg.default_model}[/cyan]") + console.print(f" Output: [cyan]{cfg.output_format}[/cyan]\n") + + +# ── scan ────────────────────────────────────────────────────────────────────── + +@main.command() +@click.argument("path", default=".", type=click.Path(exists=True, file_okay=False, dir_okay=True)) +@click.option("--model", "-m", default=None, help="Override AI model (e.g. gpt-4o-mini)") +@click.option("--output", "-o", default=None, + type=click.Choice(["terminal", "json", "markdown", "all"]), + help="Output format (overrides config)") +@click.option("--max-files", default=None, type=int, help="Max files to analyze (default: 20)") +@click.option("--ci", is_flag=True, help="CI mode: no REPL, exits with code 1 if issues found") +@click.option("--fail-on", default=None, + type=click.Choice(["critical", "high", "medium", "low"]), + help="In --ci mode, exit 1 if issues of this severity or above are found") +@click.option("--no-ai", is_flag=True, help="Skip AI triage & summary (pattern-based only, faster)") +def scan(path, model, output, max_files, ci, fail_on, no_ai): + """ + Scan a local codebase for security vulnerabilities. 
+ + \b + Examples: + securelens scan . + securelens scan ./my-project --output markdown + securelens scan . --model gpt-4o --max-files 30 + securelens scan . --ci --fail-on high + """ + _run(_scan_async(path, model, output, max_files, ci, fail_on, no_ai)) + + +async def _scan_async(path, model, output, max_files, ci, fail_on, no_ai): + from securelens.config import load_config + from securelens.output import print_banner, print_scan_header, print_code_scan_report, make_progress, print_error + from securelens.output.exporters import save_json, save_markdown, to_json + from securelens.scanners import ( + discover_files, triage_files, analyze_files, LocalScanResult + ) + from securelens.ai import call_ai + from securelens.ai.prompts import summary_prompt + from securelens.repl import run_repl, ReplContext + + cfg = load_config() + if model: + cfg.default_model = model + if output: + cfg.output_format = output + if max_files: + cfg.max_files_to_scan = max_files + + if not no_ai: + _require_config(cfg) + + root = Path(path).resolve() + + if not ci: + print_banner() + print_scan_header(str(root), cfg.default_model) + + # ── Phase 1: Discover ──────────────────────────────────────────────────── + with make_progress() as progress: + task_discover = progress.add_task( + "[1/4] Discovering files...", total=None, detail="" + ) + candidates = discover_files(root, cfg) + progress.update(task_discover, completed=100, total=100, + detail=f"{len(candidates)} files found") + + # ── Phase 2: Triage ────────────────────────────────────────────────── + task_triage = progress.add_task( + "[2/4] Triaging with AI...", total=None, detail="" + ) + if no_ai: + # In --no-ai mode just take the top N by sensitivity heuristic + from securelens.scanners import _is_always_scan + triaged = [p for p in candidates if _is_always_scan(p)][:cfg.max_files_to_scan] + else: + triaged = await triage_files(candidates, root, cfg) + progress.update(task_triage, completed=100, total=100, + 
detail=f"{len(triaged)} files selected") + + # ── Phase 3: Analyze ───────────────────────────────────────────────── + task_analyze = progress.add_task( + "[3/4] Analyzing security...", total=len(triaged), detail="" + ) + analyzed_count = 0 + + async def on_progress(done, total, filename): + nonlocal analyzed_count + analyzed_count = done + progress.update(task_analyze, completed=done, detail=filename) + + if no_ai or not cfg.api_key: + vulnerabilities = [] + else: + vulnerabilities = await analyze_files(triaged, root, cfg, on_progress) + progress.update(task_analyze, completed=len(triaged), + detail=f"{len(vulnerabilities)} issues found") + + # ── Phase 4: Summary ───────────────────────────────────────────────── + task_summary = progress.add_task( + "[4/4] Generating AI report...", total=None, detail="" + ) + ai_summary = "" + if not no_ai and cfg.api_key and vulnerabilities: + import json as _json + issues_data = [ + {"file": v.file_path, "severity": v.severity, + "issue": v.issue, "explanation": v.explanation} + for v in vulnerabilities + ] + prompt = summary_prompt(str(root), _json.dumps(issues_data, indent=2)) + ai_summary = await call_ai(prompt, cfg.api_key, cfg.default_model, temperature=0.4) + progress.update(task_summary, completed=100, total=100, detail="Done") + + # ── Build result ───────────────────────────────────────────────────────── + result = LocalScanResult( + target=str(root), + total_files_found=len(candidates), + files_triaged=[p.relative_to(root).as_posix() for p in triaged], + vulnerabilities=vulnerabilities, + ai_summary=ai_summary, + ) + result.compute_score() + + # ── Output ─────────────────────────────────────────────────────────────── + fmt = cfg.output_format + + if fmt in ("terminal", "all"): + print_code_scan_report(result) + if fmt in ("json",): + console.print(to_json(result, "code")) + if fmt in ("markdown", "all"): + path_out = save_markdown(result, "code") + if not ci: + console.print(f" [green]✓ Markdown report saved:[/green] 
[dim]{path_out}[/dim]\n") + if fmt == "json" and not ci: + path_out = save_json(result, "code") + console.print(f" [green]✓ JSON report saved:[/green] [dim]{path_out}[/dim]\n") + + # ── CI exit code ───────────────────────────────────────────────────────── + if ci: + _ci_exit(result.vulnerabilities, fail_on, "code") + return + + # ── Interactive REPL ───────────────────────────────────────────────────── + if fmt in ("terminal", "all", "markdown") and not no_ai: + ctx = ReplContext( + target=str(root), + scan_result=result, + target_type="code", + api_key=cfg.api_key, + model=cfg.default_model, + ) + await run_repl(ctx) + + +# ── web ─────────────────────────────────────────────────────────────────────── + +@main.command() +@click.argument("url") +@click.option("--model", "-m", default=None, help="Override AI model") +@click.option("--output", "-o", default=None, + type=click.Choice(["terminal", "json", "markdown", "all"])) +@click.option("--ci", is_flag=True, help="CI mode — no REPL") +@click.option("--fail-on", default=None, + type=click.Choice(["critical", "warning", "info"])) +@click.option("--no-ai", is_flag=True, help="Skip AI summary") +def web(url, model, output, ci, fail_on, no_ai): + """ + Scan a URL for web security issues. 
+ + \b + Examples: + securelens web https://example.com + securelens web https://my-app.com --output markdown + """ + _run(_web_async(url, model, output, ci, fail_on, no_ai)) + + +async def _web_async(url, model, output, ci, fail_on, no_ai): + from securelens.config import load_config + from securelens.output import ( + print_banner, print_scan_header, print_web_scan_report, + make_progress, console + ) + from securelens.output.exporters import save_json, save_markdown, to_json + from securelens.scanners.web_scanner import scan_url + from securelens.ai import call_ai + from securelens.ai.prompts import web_summary_prompt + from securelens.repl import run_repl, ReplContext + import json as _json + + # Normalise URL + if not url.startswith(("http://", "https://")): + url = "https://" + url + + cfg = load_config() + if model: + cfg.default_model = model + if output: + cfg.output_format = output + + if not ci: + print_banner() + print_scan_header(url, cfg.default_model) + + with make_progress() as progress: + task = progress.add_task("[1/2] Running web security checks...", total=None, detail="") + result = await scan_url(url, timeout=cfg.scan_timeout) + progress.update(task, completed=100, total=100, + detail=f"{len(result.issues)} issues found") + + task2 = progress.add_task("[2/2] Generating AI summary...", total=None, detail="") + if not no_ai and cfg.api_key and result.issues: + issues_data = [ + {"layer": i.layer, "severity": i.severity, "issue": i.issue} + for i in result.issues + ] + prompt = web_summary_prompt(url, _json.dumps(issues_data, indent=2), + result.score, result.grade) + result.ai_summary = await call_ai(prompt, cfg.api_key, cfg.default_model, temperature=0.4) + progress.update(task2, completed=100, total=100, detail="Done") + + fmt = cfg.output_format + if fmt in ("terminal", "all"): + print_web_scan_report(result) + if fmt == "json": + console.print(to_json(result, "web")) + if fmt in ("markdown", "all"): + p = save_markdown(result, "web") + if not 
ci: + console.print(f" [green]✓ Markdown saved:[/green] [dim]{p}[/dim]\n") + + if ci: + _ci_exit(result.issues, fail_on, "web") + return + + if fmt in ("terminal", "all", "markdown") and not no_ai: + ctx = ReplContext( + target=url, + scan_result=result, + target_type="web", + api_key=cfg.api_key, + model=cfg.default_model, + ) + await run_repl(ctx) + + +# ── version ─────────────────────────────────────────────────────────────────── + +@main.command() +def version(): + """Print SecureLens AI version and config info.""" + from securelens.config import load_config, CONFIG_FILE + from securelens import __version__ + + cfg = load_config() + console.print(f"\n [bold cyan]SecureLens AI[/bold cyan] v{__version__}") + console.print(f" Model: [dim]{cfg.default_model}[/dim]") + console.print(f" Config: [dim]{CONFIG_FILE}[/dim]") + console.print(f" API Key: [dim]{'✓ set' if cfg.api_key else '✗ not set'}[/dim]\n") + + +# ── CI exit helper ───────────────────────────────────────────────────────────── + +def _ci_exit(issues, fail_on, scan_type: str): + """Exit with code 1 if issues meet or exceed the fail_on threshold.""" + severity_rank = {"critical": 4, "high": 3, "warning": 3, "medium": 2, "low": 1, "info": 0} + + if not fail_on: + # Default: fail on any critical + fail_on = "critical" + + threshold = severity_rank.get(fail_on, 4) + for issue in issues: + sev = getattr(issue, "severity", "").lower() + if severity_rank.get(sev, 0) >= threshold: + sys.exit(1) + sys.exit(0) + + +if __name__ == "__main__": + main() diff --git a/cli/securelens/config.py b/cli/securelens/config.py new file mode 100644 index 0000000..43e917d --- /dev/null +++ b/cli/securelens/config.py @@ -0,0 +1,100 @@ +""" +Config Manager +============== +Reads and writes ~/.securelens/config.yaml. +Falls back to environment variables so the CLI works in CI/CD +without a config file. 
+""" + +import os +import yaml +from pathlib import Path +from dataclasses import dataclass, field + +CONFIG_DIR = Path.home() / ".securelens" +CONFIG_FILE = CONFIG_DIR / "config.yaml" + + +@dataclass +class CLIConfig: + # AI backend + default_model: str = "gemini/gemini-2.0-flash" + api_key: str = "" + + # Scan behaviour + output_format: str = "terminal" # terminal | json | markdown | all + max_files_to_scan: int = 20 + max_file_size_kb: int = 200 + scan_timeout: int = 10 # seconds — for web scans + + # File exclusions (gitignore-style globs) + ignore_patterns: list = field(default_factory=lambda: [ + "*.lock", + "node_modules/**", + ".git/**", + "venv/**", + ".venv/**", + "__pycache__/**", + "*.pyc", + "dist/**", + "build/**", + ".next/**", + "*.min.js", + "*.min.css", + "*.map", + ]) + + +def load_config() -> CLIConfig: + """ + Load config from ~/.securelens/config.yaml, + then overlay any env-var overrides. + """ + cfg = CLIConfig() + + if CONFIG_FILE.exists(): + with open(CONFIG_FILE) as f: + data = yaml.safe_load(f) or {} + cfg.default_model = data.get("default_model", cfg.default_model) + cfg.api_key = data.get("api_key", cfg.api_key) + cfg.output_format = data.get("output_format", cfg.output_format) + cfg.max_files_to_scan = data.get("max_files_to_scan", cfg.max_files_to_scan) + cfg.max_file_size_kb = data.get("max_file_size_kb", cfg.max_file_size_kb) + cfg.scan_timeout = data.get("scan_timeout", cfg.scan_timeout) + cfg.ignore_patterns = data.get("ignore_patterns", cfg.ignore_patterns) + + # Env-var overrides (for CI/CD) + cfg.api_key = ( + os.environ.get("SECURELENS_API_KEY") + or os.environ.get("AI_API_KEY") + or os.environ.get("GEMINI_API_KEY") + or os.environ.get("OPENAI_API_KEY") + or cfg.api_key + ) + cfg.default_model = ( + os.environ.get("SECURELENS_MODEL") + or os.environ.get("AI_MODEL") + or cfg.default_model + ) + + return cfg + + +def save_config(cfg: CLIConfig) -> None: + """Persist the config object to ~/.securelens/config.yaml.""" + 
CONFIG_DIR.mkdir(parents=True, exist_ok=True) + data = { + "default_model": cfg.default_model, + "api_key": cfg.api_key, + "output_format": cfg.output_format, + "max_files_to_scan": cfg.max_files_to_scan, + "max_file_size_kb": cfg.max_file_size_kb, + "scan_timeout": cfg.scan_timeout, + "ignore_patterns": cfg.ignore_patterns, + } + with open(CONFIG_FILE, "w") as f: + yaml.safe_dump(data, f, default_flow_style=False) + + +def config_exists() -> bool: + return CONFIG_FILE.exists() and bool(load_config().api_key) diff --git a/cli/securelens/output/__init__.py b/cli/securelens/output/__init__.py new file mode 100644 index 0000000..61d9624 --- /dev/null +++ b/cli/securelens/output/__init__.py @@ -0,0 +1,208 @@ +""" +Terminal Renderer +================= +All Rich-based output for the CLI — banners, progress, tables, panels. +""" + +import json +from datetime import datetime +from typing import Optional + +from rich.console import Console +from rich.panel import Panel +from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeElapsedColumn +from rich.table import Table +from rich.text import Text +from rich import box +from rich.columns import Columns +from rich.rule import Rule +from rich.syntax import Syntax +from rich.live import Live +from rich.padding import Padding + +console = Console() + +# ── Severity colours ────────────────────────────────────────────────────────── +SEVERITY_COLOR = { + "Critical": "bold red", + "High": "bold orange1", + "Warning": "bold yellow", + "Medium": "bold yellow", + "Info": "bold blue", + "Low": "bold cyan", +} + +GRADE_COLOR = { + "A": "bold green", + "B": "bold cyan", + "C": "bold yellow", + "D": "bold orange1", + "F": "bold red", +} + + +def print_banner() -> None: + banner = Text() + banner.append("\n") + banner.append(" ███████╗███████╗ ██████╗██╗ ██╗██████╗ ███████╗██╗ ███████╗███╗ ██╗███████╗\n", style="bold cyan") + banner.append(" ██╔════╝██╔════╝██╔════╝██║ ██║██╔══██╗██╔════╝██║ ██╔════╝████╗ 
██║██╔════╝\n", style="bold cyan") + banner.append(" ███████╗█████╗ ██║ ██║ ██║██████╔╝█████╗ ██║ █████╗ ██╔██╗ ██║███████╗\n", style="bold blue") + banner.append(" ╚════██║██╔══╝ ██║ ██║ ██║██╔══██╗██╔══╝ ██║ ██╔══╝ ██║╚██╗██║╚════██║\n", style="bold blue") + banner.append(" ███████║███████╗╚██████╗╚██████╔╝██║ ██║███████╗███████╗███████╗██║ ╚████║███████║\n", style="bold magenta") + banner.append(" ╚══════╝╚══════╝ ╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚══════╝╚══════╝╚══════╝╚═╝ ╚═══╝╚══════╝\n", style="bold magenta") + banner.append(" AI Security Agent v2.0.0\n", style="dim") + console.print(banner) + + +def print_scan_header(target: str, model: str) -> None: + console.print(f" [bold]🔍 Target:[/bold] [cyan]{target}[/cyan]") + console.print(f" [bold]🧠 Model:[/bold] [dim]{model}[/dim]") + console.print() + + +def make_progress() -> Progress: + return Progress( + SpinnerColumn(), + TextColumn("[bold blue]{task.description}"), + BarColumn(bar_width=30), + TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), + TextColumn("• [dim]{task.fields[detail]}[/dim]"), + TimeElapsedColumn(), + console=console, + transient=False, + ) + + +def print_code_scan_report(result) -> None: + """Render a full local code scan report.""" + console.print() + console.rule("[bold white] SECURITY REPORT [/bold white]", style="bright_black") + console.print() + + # Score panel + grade_color = GRADE_COLOR.get(result.grade, "white") + score_text = Text() + score_text.append(f" {result.score}/100", style=f"bold {grade_color}") + score_text.append(" Grade: ", style="dim") + score_text.append(result.grade, style=grade_color) + score_text.append(f" • {len(result.vulnerabilities)} issue(s) found", style="dim") + score_text.append(f" • {len(result.files_triaged)} file(s) scanned", style="dim") + console.print(Panel(score_text, title="[bold]Overall Score[/bold]", border_style="bright_black")) + console.print() + + if not result.vulnerabilities: + console.print(" [bold green]✓ No vulnerabilities found![/bold 
def print_web_scan_report(result) -> None:
    """Render the complete report for a web (URL) scan on the console.

    Output order: report rule, then either an "unreachable" notice (and
    nothing else) or the score panel, the exposed-path warning, the issue
    list and finally the AI summary panel when one is present.

    Args:
        result: Web scan result exposing ``reachable``, ``grade``, ``score``,
            ``ssl_expiry_days``, ``exposed_paths``, ``issues`` and
            ``ai_summary``.
    """
    console.print()
    console.rule("[bold white] WEB SECURITY REPORT [/bold white]", style="bright_black")
    console.print()

    # Nothing else can be reported for a host that never answered.
    if not result.reachable:
        console.print(" [bold red]✗ Could not reach the target URL[/bold red]")
        return

    accent = GRADE_COLOR.get(result.grade, "white")
    segments = [
        (f" {result.score}/100", f"bold {accent}"),
        (" Grade: ", "dim"),
        (result.grade, accent),
    ]
    if result.ssl_expiry_days is not None:
        segments.append((f" • SSL expires in {result.ssl_expiry_days} days", "dim"))

    headline = Text()
    for fragment, fragment_style in segments:
        headline.append(fragment, style=fragment_style)
    console.print(Panel(headline, title="[bold]Overall Score[/bold]", border_style="bright_black"))
    console.print()

    if result.exposed_paths:
        joined_paths = ", ".join(result.exposed_paths)
        console.print(f" [bold red]⚠ Exposed sensitive paths:[/bold red] {joined_paths}")
        console.print()

    if result.issues:
        _print_web_issue_table(result.issues)
    else:
        console.print(" [bold green]✓ No issues found![/bold green]")

    if not result.ai_summary:
        return

    # NOTE(review): the summary string is handed to Panel as-is, so Rich
    # parses it as console markup — confirm the AI output cannot contain
    # stray "[tag]"-like text.
    summary_panel = Panel(
        result.ai_summary,
        title="[bold cyan]🤖 AI Security Summary[/bold cyan]",
        border_style="cyan",
        padding=(1, 2),
    )
    console.print(summary_panel)
    console.print()
def _print_web_issue_table(issues) -> None:
    """Render web scan issues grouped by layer.

    Issues are grouped by their ``layer`` in first-seen order; each group is
    printed under a heading, one bullet per issue followed by its fix.

    Issue text can embed values taken from the scanned server's response
    (e.g. the ``Server`` / ``X-Powered-By`` header values and cookie names),
    so every interpolated field is escaped before being placed inside Rich
    markup. Unescaped, a value containing ``[...]`` would be swallowed as a
    style tag or abort the report with ``MarkupError``.

    Args:
        issues: Iterable of objects exposing ``layer``, ``severity``,
            ``issue`` and ``fix``.
    """
    # Local import: keeps this function self-contained with respect to the
    # module's import list. Rich is already a dependency of this module.
    from rich.markup import escape

    layers: dict = {}
    for issue in issues:
        layers.setdefault(issue.layer, []).append(issue)

    for layer, items in layers.items():
        console.print(f" [bold bright_black]── {escape(str(layer))} ──[/bold bright_black]")
        for item in items:
            # Unknown severities fall back to plain white rather than failing.
            color = SEVERITY_COLOR.get(item.severity, "white")
            console.print(f" [{color}]●[/{color}] [bold]{escape(str(item.issue))}[/bold]")
            console.print(f" [green]Fix:[/green] [dim]{escape(str(item.fix))}[/dim]")
        console.print()
def to_json(result, target_type: str = "code") -> str:
    """Serialize a scan result to a pretty-printed JSON string.

    Args:
        result: Scan result object. For ``"code"`` it must expose ``target``,
            ``score``, ``grade``, ``files_triaged``, ``vulnerabilities`` and
            ``ai_summary``; for any other ``target_type`` it is treated as a
            web result exposing ``url``, ``score``, ``grade``,
            ``ssl_expiry_days``, ``exposed_paths``, ``issues`` and
            ``ai_summary``.
        target_type: ``"code"`` for a local code scan; anything else selects
            the web layout.

    Returns:
        The JSON document, indented by two spaces. Key order is part of the
        output and is kept stable.
    """
    is_code = target_type == "code"

    # Fields shared by both layouts, in output order.
    payload = {
        "scan_type": "code" if is_code else "web",
        "target": result.target if is_code else result.url,
        "timestamp": datetime.now().isoformat(),
        "score": result.score,
        "grade": result.grade,
    }

    if is_code:
        findings = []
        for vuln in result.vulnerabilities:
            findings.append({
                "file": vuln.file_path,
                "line": vuln.line_number,
                "severity": vuln.severity,
                "issue": vuln.issue,
                "explanation": vuln.explanation,
                "fix": vuln.suggested_fix,
            })
        payload["files_scanned"] = result.files_triaged
        payload["total_issues"] = len(result.vulnerabilities)
        payload["vulnerabilities"] = findings
    else:
        entries = []
        for entry in result.issues:
            entries.append({
                "layer": entry.layer,
                "severity": entry.severity,
                "issue": entry.issue,
                "fix": entry.fix,
            })
        payload["ssl_expiry_days"] = result.ssl_expiry_days
        payload["exposed_paths"] = result.exposed_paths
        payload["total_issues"] = len(result.issues)
        payload["issues"] = entries

    payload["ai_summary"] = result.ai_summary
    return json.dumps(payload, indent=2)
def save_markdown(result, target_type: str = "code") -> Path:
    """Write the Markdown report to a timestamped file in the working directory.

    The file is named ``securelens-report-<YYYYmmdd_HHMMSS>.md``.

    Args:
        result: Scan result object accepted by ``to_markdown``.
        target_type: ``"code"`` or ``"web"``; forwarded to ``to_markdown``.

    Returns:
        Relative path of the file that was written.

    Raises:
        OSError: If the file cannot be created or written.
    """
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    path = Path(f"securelens-report-{ts}.md")
    # The report always contains non-ASCII text (the em dash in its title,
    # plus free-form AI prose), so pin UTF-8 instead of relying on the locale
    # default, which raises UnicodeEncodeError on some platforms.
    path.write_text(to_markdown(result, target_type), encoding="utf-8")
    return path
@dataclass
class ReplContext:
    """State shared by the REPL loop (``run_repl``) and its slash commands."""

    # Scan target; passed to chat_prompt() for every question.
    target: str
    # Result of the scan; serialized by _build_scan_context() and handed to
    # the exporters by the /export command.
    scan_result: object  # LocalScanResult or WebScanResult
    # Selects which result layout the helpers read from scan_result.
    target_type: str  # "code" | "web"
    # Key forwarded to call_ai(); when empty, run_repl() refuses AI questions.
    api_key: str
    # Model name forwarded to call_ai(); replaced by the /model command.
    model: str
    # {"role": ..., "content": ...} dicts for user and assistant turns,
    # appended by run_repl() after each answered question.
    conversation_history: list = field(default_factory=list)
Run `securelens configure` to set one.") + continue + + prompt = chat_prompt(ctx.target, scan_ctx_str, user_input) + console_out.print("[dim] Thinking...[/dim]") + response = await call_ai( + prompt=prompt, + api_key=ctx.api_key, + model=ctx.model, + temperature=0.5, + conversation_history=ctx.conversation_history, + ) + + if response: + # Save to history for multi-turn context + ctx.conversation_history.append({"role": "user", "content": user_input}) + ctx.conversation_history.append({"role": "assistant", "content": response}) + print_ai_response(response) + else: + print_error("No response from AI. Check your API key and network connection.") + + +def _prompt_user() -> str: + """Read a line from stdin with a styled prompt.""" + sys.stdout.write("[dim bold cyan]>[/dim bold cyan] ") + sys.stdout.flush() + # Use input() — rich.prompt not used here to keep it simple + try: + from rich.prompt import Prompt + return Prompt.ask("[bold cyan]>[/bold cyan]") + except Exception: + return input("> ") + + +def _build_scan_context(ctx: ReplContext) -> str: + """Serialize the scan result into a compact string for the AI context.""" + result = ctx.scan_result + if ctx.target_type == "code": + vulns = [ + { + "file": v.file_path, + "line": v.line_number, + "severity": v.severity, + "issue": v.issue, + "explanation": v.explanation, + "fix": v.suggested_fix, + } + for v in result.vulnerabilities + ] + return json.dumps({ + "target": result.target, + "score": result.score, + "grade": result.grade, + "files_scanned": result.files_triaged, + "vulnerabilities": vulns, + "ai_summary": result.ai_summary, + }, indent=2) + else: # web + issues = [ + {"layer": i.layer, "severity": i.severity, "issue": i.issue, "fix": i.fix} + for i in result.issues + ] + return json.dumps({ + "target": result.url, + "score": result.score, + "grade": result.grade, + "ssl_expiry_days": result.ssl_expiry_days, + "exposed_paths": result.exposed_paths, + "issues": issues, + "ai_summary": result.ai_summary, + }, 
async def _handle_slash_command(cmd: str, ctx: ReplContext) -> bool:
    """Process one slash command entered in the REPL.

    Supported commands: ``/exit``, ``/help``, ``/clear``, ``/files``,
    ``/score``, ``/model [name]`` and ``/export [markdown|md|json]``.
    ``/export`` without an argument writes Markdown. An unrecognised export
    format is reported as an error instead of silently producing Markdown,
    and a failure to write the report is reported without ending the session.

    Args:
        cmd: The raw command line, starting with ``/``.
        ctx: REPL state; ``ctx.model`` is updated by ``/model``.

    Returns:
        True if the REPL should exit, False otherwise.
    """
    parts = cmd.split()
    command = parts[0].lower()

    if command == "/exit":
        console_out.print("\n[dim]Goodbye![/dim]\n")
        return True

    if command == "/help":
        console_out.print(HELP_TEXT)

    elif command == "/clear":
        console_out.clear()

    elif command == "/files":
        result = ctx.scan_result
        if ctx.target_type == "code" and hasattr(result, "files_triaged"):
            console_out.print("\n[bold]Files analyzed:[/bold]")
            for f in result.files_triaged:
                console_out.print(f" [dim]• {f}[/dim]")
            console_out.print()
        else:
            print_info("File list not available for web scans.")

    elif command == "/score":
        r = ctx.scan_result
        console_out.print(f"\n [bold]Score:[/bold] {r.score}/100 [bold]Grade:[/bold] {r.grade}\n")

    elif command == "/model":
        if len(parts) < 2:
            print_info(f"Current model: {ctx.model}")
            print_info("Usage: /model e.g. /model gpt-4o-mini")
        else:
            ctx.model = parts[1]
            print_success(f"Model switched to: {ctx.model}")

    elif command == "/export":
        fmt = parts[1].lower() if len(parts) > 1 else "markdown"
        if fmt == "json":
            saver, label = save_json, "JSON"
        elif fmt in ("markdown", "md"):
            saver, label = save_markdown, "Markdown"
        else:
            print_error(f"Unknown export format: {fmt}. Use /export markdown or /export json.")
            return False
        try:
            path = saver(ctx.scan_result, ctx.target_type)
        except OSError as exc:
            # A read-only or full disk should not end the Q&A session.
            print_error(f"Could not write {label} report: {exc}")
        else:
            print_success(f"{label} report saved to: {path}")

    else:
        print_error(f"Unknown command: {command}. Type /help for available commands.")

    return False
@dataclass
class VulnerabilityFinding:
    """One vulnerability reported for a single scanned file."""

    # File the finding belongs to; analyze_file() stores the POSIX-style path
    # relative to the scan root.
    file_path: str
    # Severity label; LocalScanResult.compute_score() deducts points only for
    # the four values listed here.
    severity: str  # Critical | High | Medium | Low
    # Short title of the problem.
    issue: str
    # Longer description of the problem.
    explanation: str
    # Suggested remediation.
    suggested_fix: str
    # Line the finding refers to, or None when the analysis gave no line.
    line_number: Optional[int] = None
def discover_files(root: Path, cfg: CLIConfig) -> list[Path]:
    """Walk ``root`` and return the files that are candidates for scanning.

    A file is skipped when it matches ``cfg.ignore_patterns`` or a pattern
    from ``root/.gitignore`` (only the root-level file is read), when its
    extension is in ``BINARY_EXTENSIONS``, or when it is larger than
    ``cfg.max_file_size_kb``. Files that cannot be inspected (removed during
    the walk, permission denied) are skipped instead of aborting the scan.

    Args:
        root: Directory to scan.
        cfg: CLI configuration providing ``ignore_patterns`` and
            ``max_file_size_kb``.

    Returns:
        Sorted list of candidate file paths.
    """
    ignore_patterns = list(cfg.ignore_patterns)
    gitignore_path = root / ".gitignore"
    # is_file() rather than exists(): a directory named ".gitignore" must not
    # be opened as text.
    if gitignore_path.is_file():
        try:
            # Explicit UTF-8 so decoding does not depend on the locale;
            # undecodable bytes are replaced rather than failing the scan.
            gitignore_text = gitignore_path.read_text(encoding="utf-8", errors="replace")
        except OSError as exc:
            logger.warning("Could not read %s: %s", gitignore_path, exc)
        else:
            for raw_line in gitignore_text.splitlines():
                pattern = raw_line.strip()
                if pattern and not pattern.startswith("#"):
                    ignore_patterns.append(pattern)

    spec = pathspec.PathSpec.from_lines("gitwildmatch", ignore_patterns)
    max_bytes = cfg.max_file_size_kb * 1024

    candidates: list[Path] = []
    for p in root.rglob("*"):
        try:
            if not p.is_file():
                continue
            rel = p.relative_to(root).as_posix()
            if spec.match_file(rel):
                continue
            if p.suffix.lower() in BINARY_EXTENSIONS:
                continue
            if p.stat().st_size > max_bytes:
                continue
        except OSError as exc:
            # The entry vanished or became unreadable between listing and
            # stat(); one bad entry should not stop discovery.
            logger.debug("Skipping %s: %s", p, exc)
            continue
        candidates.append(p)

    return sorted(candidates)
async def triage_files(
    candidates: list[Path],
    root: Path,
    cfg: CLIConfig,
) -> list[Path]:
    """Choose which candidate files get a full AI analysis.

    Files matching ``_is_always_scan`` are always included. The remaining
    budget (``cfg.max_files_to_scan`` minus the forced files) is filled with
    files picked by the AI from the list of relative paths it is shown.

    The AI reply is treated as untrusted: a pick is accepted only if it names
    one of the supplied candidates. This rejects paths that escape ``root``
    (``../x``, absolute paths), directories, files excluded by the ignore
    rules, and non-string entries. Previously any existing path was accepted.

    Args:
        candidates: Files returned by discovery, all located under ``root``.
        root: Scan root used to build relative paths.
        cfg: CLI configuration providing ``max_files_to_scan``, ``api_key``
            and ``default_model``.

    Returns:
        Forced files first, then AI-selected files, de-duplicated and capped
        at ``cfg.max_files_to_scan``.
    """
    forced = [p for p in candidates if _is_always_scan(p)]
    forced_set = set(forced)

    # Relative path -> candidate, for everything the AI is allowed to choose.
    selectable = {
        p.relative_to(root).as_posix(): p
        for p in candidates
        if p not in forced_set
    }
    rel_paths = list(selectable)
    remaining_budget = max(0, cfg.max_files_to_scan - len(forced))

    ai_selected: list[Path] = []
    if rel_paths and remaining_budget > 0 and cfg.api_key:
        file_list_str = "\n".join(rel_paths[:300])  # cap to ~300 paths for token budget
        prompt = triage_prompt(file_list_str, remaining_budget)
        result = await call_ai_json(prompt, cfg.api_key, cfg.default_model, temperature=0.1)
        picks = result.get("critical_files") if isinstance(result, dict) else None
        if isinstance(picks, list):
            for rel in picks:
                if not isinstance(rel, str):
                    continue
                # Tolerate cosmetic differences in how the model echoes a path.
                chosen = selectable.get(rel.strip().removeprefix("./"))
                if chosen is not None:
                    ai_selected.append(chosen)

    # Merge: forced first, then AI-selected (deduplicated, order preserved).
    seen = set()
    final: list[Path] = []
    for p in forced + ai_selected:
        if p not in seen:
            seen.add(p)
            final.append(p)

    return final[:cfg.max_files_to_scan]
async def analyze_files(
    triaged: list[Path],
    root: Path,
    cfg: CLIConfig,
    progress_callback=None,
) -> list[VulnerabilityFinding]:
    """Analyze all triaged files concurrently and collect their findings.

    At most four files are analyzed at the same time so the API is not hit
    with too many simultaneous calls.

    Args:
        triaged: Files to analyze, all located under ``root``.
        root: Scan root used to build relative paths.
        cfg: CLI configuration forwarded to ``analyze_file``.
        progress_callback: Optional awaitable callable invoked after each
            file as ``callback(done, total, relative_path)``. ``done`` is the
            number of files finished so far, so it increases by one per call
            even when files complete out of order (it used to be the file's
            position in ``triaged``, which made progress jump backwards).

    Returns:
        Findings of all files, grouped in the order of ``triaged``.
    """
    semaphore = asyncio.Semaphore(4)
    total = len(triaged)
    completed = 0

    async def _analyze_with_sem(path: Path) -> list[VulnerabilityFinding]:
        nonlocal completed
        async with semaphore:
            findings = await analyze_file(path, root, cfg)
            # No await between the increment and reading it for the callback,
            # so each call reports a unique, increasing count.
            completed += 1
            if progress_callback:
                await progress_callback(completed, total, path.relative_to(root).as_posix())
            return findings

    per_file = await asyncio.gather(*(_analyze_with_sem(p) for p in triaged))

    all_findings: list[VulnerabilityFinding] = []
    for findings in per_file:
        all_findings.extend(findings)
    return all_findings
Exposed sensitive paths (.env, /admin, etc.) + 5. SSL certificate validity +""" + +import asyncio +import ssl +import socket +import datetime +import logging +from dataclasses import dataclass, field +from typing import Optional +from urllib.parse import urlparse + +import httpx + +logger = logging.getLogger(__name__) + +SENSITIVE_PATHS = [ + "/.env", "/.env.local", "/.env.production", "/.env.backup", + "/admin", "/admin/", "/wp-admin/", + "/phpinfo.php", "/info.php", "/test.php", + "/.git/config", "/.git/HEAD", + "/config.yml", "/config.yaml", "/config.json", + "/backup.sql", "/dump.sql", "/database.sql", + "/robots.txt", "/sitemap.xml", # not dangerous but worth noting + "/.DS_Store", + "/server-status", "/server-info", + "/actuator", "/actuator/health", "/actuator/env", + "/__debug__/", +] + +MIN_HSTS_MAX_AGE = 15_768_000 # 6 months + + +@dataclass +class WebIssue: + issue: str + severity: str # Critical | Warning | Info + layer: str + fix: str + + +@dataclass +class WebScanResult: + url: str + reachable: bool = True + issues: list[WebIssue] = field(default_factory=list) + ai_summary: str = "" + score: int = 100 + grade: str = "A" + ssl_expiry_days: Optional[int] = None + exposed_paths: list[str] = field(default_factory=list) + + def compute_score(self) -> None: + weights = {"Critical": 15, "Warning": 5, "Info": 2} + deduction = sum(weights.get(i.severity, 0) for i in self.issues) + self.score = max(100 - deduction, 0) + self.grade = _score_to_grade(self.score) + + +def _score_to_grade(score: int) -> str: + if score >= 90: return "A" + if score >= 80: return "B" + if score >= 70: return "C" + if score >= 60: return "D" + return "F" + + +async def scan_url(url: str, timeout: int = 10) -> WebScanResult: + """Run all web security checks against the given URL.""" + result = WebScanResult(url=url) + + try: + async with httpx.AsyncClient( + follow_redirects=True, + timeout=timeout, + verify=False, # we do our own cert check + ) as client: + response = await 
def _check_transport(url: str, response: httpx.Response, result: WebScanResult) -> None:
    """Check HTTPS usage and the Strict-Transport-Security (HSTS) header.

    Appends issues to ``result.issues`` when the URL is not HTTPS, the HSTS
    header is missing, its ``max-age`` is below ``MIN_HSTS_MAX_AGE``, or it
    lacks ``includeSubDomains``. The HSTS checks are skipped for non-HTTPS
    URLs.

    Args:
        url: The URL that was requested.
        response: Response whose headers are inspected.
        result: Scan result that collects the issues.
    """
    headers = response.headers

    # Compare the parsed scheme (urlparse lower-cases it) instead of a raw
    # string prefix, so "HTTPS://host" is recognised as HTTPS.
    if urlparse(url).scheme != "https":
        result.issues.append(WebIssue(
            issue="Site is not using HTTPS",
            severity="Critical",
            layer="Transport Layer",
            fix="Install an SSL certificate and redirect all HTTP traffic to HTTPS",
        ))
        return

    hsts = headers.get("Strict-Transport-Security", "")
    if not hsts:
        result.issues.append(WebIssue(
            issue="Missing HSTS (Strict-Transport-Security) header",
            severity="Warning",
            layer="Transport Layer",
            fix="Add: Strict-Transport-Security: max-age=31536000; includeSubDomains; preload",
        ))
        return

    max_age = 0
    directive_names = set()
    for directive in hsts.lower().split(";"):
        name, _, value = directive.strip().partition("=")
        name = name.strip()
        directive_names.add(name)
        if name == "max-age":
            try:
                # The value may be sent as a quoted string: max-age="31536000".
                max_age = int(value.strip().strip('"'))
            except ValueError:
                # Unparseable value: keep 0 so it is reported as too short.
                pass

    if max_age < MIN_HSTS_MAX_AGE:
        result.issues.append(WebIssue(
            issue=f"HSTS max-age is too short ({max_age}s)",
            severity="Warning",
            layer="Transport Layer",
            fix="Set HSTS max-age to at least 31536000 (1 year)",
        ))
    # Match the directive by name rather than as a substring of the header.
    if "includesubdomains" not in directive_names:
        result.issues.append(WebIssue(
            issue="HSTS missing includeSubDomains",
            severity="Info",
            layer="Transport Layer",
            fix="Add includeSubDomains to the HSTS header",
        ))
_check_headers(url: str, response: httpx.Response, result: WebScanResult) -> None: + h = response.headers + + if "Content-Security-Policy" not in h: + result.issues.append(WebIssue( + issue="Missing Content-Security-Policy header", + severity="Warning", + layer="Security Headers", + fix="Add: Content-Security-Policy: default-src 'self';", + )) + else: + csp = h["Content-Security-Policy"] + if "'unsafe-inline'" in csp: + result.issues.append(WebIssue( + issue="CSP allows 'unsafe-inline'", + severity="Warning", + layer="Security Headers", + fix="Remove 'unsafe-inline' from CSP; use nonces or hashes instead", + )) + if "'unsafe-eval'" in csp: + result.issues.append(WebIssue( + issue="CSP allows 'unsafe-eval'", + severity="Warning", + layer="Security Headers", + fix="Remove 'unsafe-eval' from CSP to prevent eval()-based code execution", + )) + + if "X-Frame-Options" not in h: + result.issues.append(WebIssue( + issue="Missing X-Frame-Options header", + severity="Warning", + layer="Security Headers", + fix="Add: X-Frame-Options: SAMEORIGIN", + )) + + if "X-Content-Type-Options" not in h: + result.issues.append(WebIssue( + issue="Missing X-Content-Type-Options header", + severity="Warning", + layer="Security Headers", + fix="Add: X-Content-Type-Options: nosniff", + )) + + if "Referrer-Policy" not in h: + result.issues.append(WebIssue( + issue="Missing Referrer-Policy header", + severity="Info", + layer="Security Headers", + fix="Add: Referrer-Policy: strict-origin-when-cross-origin", + )) + + if "Permissions-Policy" not in h: + result.issues.append(WebIssue( + issue="Missing Permissions-Policy header", + severity="Info", + layer="Security Headers", + fix="Add: Permissions-Policy: geolocation=(), camera=(), microphone=()", + )) + + if h.get("Access-Control-Allow-Origin") == "*": + result.issues.append(WebIssue( + issue="CORS allows all origins (*)", + severity="Warning", + layer="Security Headers", + fix="Restrict Access-Control-Allow-Origin to trusted domains", + )) + + 
def _check_cookies(url: str, response: httpx.Response, result: WebScanResult) -> None:
    """Flag cookies set by the response that lack security attributes.

    For every cookie parsed from the ``Set-Cookie`` headers, an issue is
    appended to ``result.issues`` when ``HttpOnly`` is missing, when
    ``Secure`` is missing on an HTTPS URL, or when ``SameSite`` is missing.

    Attributes are read from the parsed cookie instead of searching the raw
    header text. The text search reported ``Secure`` as missing when the
    header was written without a space after the semicolon (``;Secure``) and
    could be satisfied by a cookie *value* that merely contained a word such
    as ``httponly``.

    Args:
        url: The URL that was requested; its scheme decides whether the
            ``Secure`` check applies.
        response: Response whose ``Set-Cookie`` headers are inspected.
        result: Scan result that collects the issues.
    """
    from http.cookies import CookieError, SimpleCookie

    is_https = urlparse(url).scheme == "https"
    set_cookie_headers = [
        value
        for key, value in response.headers.multi_items()
        if key.lower() == "set-cookie"
    ]

    for cookie_str in set_cookie_headers:
        cookie = SimpleCookie()
        try:
            cookie.load(cookie_str)
        except CookieError:
            # Unparseable header: there is nothing reliable to report on.
            continue

        for name, morsel in cookie.items():
            # Morsel attribute values are empty strings when the attribute
            # was not present in the header.
            if not morsel["httponly"]:
                result.issues.append(WebIssue(
                    issue=f"Cookie '{name}' missing HttpOnly flag",
                    severity="Warning",
                    layer="Cookie Security",
                    fix=f"Set HttpOnly on cookie '{name}' to prevent JS access",
                ))
            if is_https and not morsel["secure"]:
                result.issues.append(WebIssue(
                    issue=f"Cookie '{name}' missing Secure flag",
                    severity="Warning",
                    layer="Cookie Security",
                    fix=f"Set Secure flag on cookie '{name}'",
                ))
            if not morsel["samesite"]:
                result.issues.append(WebIssue(
                    issue=f"Cookie '{name}' missing SameSite attribute",
                    severity="Warning",
                    layer="Cookie Security",
                    fix=f"Set SameSite=Lax or SameSite=Strict on cookie '{name}'",
                ))
def _check_ssl(url: str, result: WebScanResult) -> None:
    """Validate the server certificate and warn when it is close to expiry.

    Opens a verified TLS connection to the URL's host (port 443 unless the
    URL names another), stores the number of days until the certificate
    expires in ``result.ssl_expiry_days`` and appends an issue when fewer
    than 14 days (Critical) or fewer than 30 days (Warning) remain. A
    certificate that fails verification is reported as Critical. Any other
    error (DNS failure, timeout, ...) is logged at debug level and ignored.
    Non-HTTPS URLs and URLs without a host are skipped.

    Args:
        url: The URL being scanned.
        result: Scan result that collects the issues.
    """
    parsed = urlparse(url)
    if parsed.scheme != "https":
        return
    hostname = parsed.hostname
    if not hostname:
        # Nothing to connect to; avoid handing None to the socket layer.
        return
    port = parsed.port or 443

    try:
        ctx = ssl.create_default_context()
        with socket.create_connection((hostname, port), timeout=5) as raw_sock:
            with ctx.wrap_socket(raw_sock, server_hostname=hostname) as tls_sock:
                cert = tls_sock.getpeercert()

        expiry_str = cert.get("notAfter", "")
        if expiry_str:
            # Attach UTC explicitly and compare against an aware "now":
            # datetime.utcnow() is deprecated and returns a naive value.
            expiry = datetime.datetime.strptime(
                expiry_str, "%b %d %H:%M:%S %Y %Z"
            ).replace(tzinfo=datetime.timezone.utc)
            now = datetime.datetime.now(datetime.timezone.utc)
            days_left = (expiry - now).days
            result.ssl_expiry_days = days_left
            if days_left < 14:
                result.issues.append(WebIssue(
                    issue=f"SSL certificate expires in {days_left} days",
                    severity="Critical",
                    layer="SSL/TLS",
                    fix="Renew the SSL certificate immediately",
                ))
            elif days_left < 30:
                result.issues.append(WebIssue(
                    issue=f"SSL certificate expires soon ({days_left} days)",
                    severity="Warning",
                    layer="SSL/TLS",
                    fix="Renew the SSL certificate within the next 30 days",
                ))
    except ssl.SSLCertVerificationError:
        result.issues.append(WebIssue(
            issue="SSL certificate is invalid or self-signed",
            severity="Critical",
            layer="SSL/TLS",
            fix="Install a valid SSL certificate from a trusted CA (e.g. Let's Encrypt)",
        ))
    except Exception as e:
        # Best effort: the certificate check must never fail the whole scan.
        logger.debug(f"SSL check error: {e}")