added cli installer ]

This commit is contained in:
rarebuffalo
2026-05-15 12:54:58 +05:30
parent 61797fbb97
commit 542f607f25
13 changed files with 2050 additions and 0 deletions

174
cli/README.md Normal file
View File

@@ -0,0 +1,174 @@
# SecureLens AI — CLI
> Scan codebases and URLs for security vulnerabilities, right in your terminal.
> Powered by AI. Works like Gemini CLI.
---
## Install
```bash
# From the project root
chmod +x cli/install.sh
./cli/install.sh
# Then activate the venv
source venv/bin/activate
```
Or manually:
```bash
pip install click rich litellm httpx pyyaml pathspec questionary
pip install -e cli/ --no-build-isolation
```
---
## Quick Start
```bash
# 1. Set up your API key
securelens configure
# 2. Scan your current project
securelens scan .
# 3. Scan a URL
securelens web https://example.com
```
---
## Commands
### `securelens configure`
Interactive setup wizard. Saves config to `~/.securelens/config.yaml`.
```
securelens configure
```
### `securelens scan <path>`
Scan a local codebase. The AI triages files, analyzes them for OWASP vulnerabilities,
and gives you an executive summary. Then you drop into a Q&A chat.
```bash
securelens scan . # scan current directory
securelens scan ./my-project # scan a specific folder
securelens scan . --output markdown # save report as .md file
securelens scan . --model gpt-4o # use a different AI model
securelens scan . --max-files 30 # analyze more files
securelens scan . --no-ai # pattern-based only (no AI, fast)
securelens scan . --ci --fail-on high # CI mode — exits with code 1
```
### `securelens web <url>`
Scan a URL for HTTP security issues (HTTPS, headers, cookies, exposed paths, SSL).
```bash
securelens web https://example.com
securelens web https://my-app.com --output markdown
securelens web https://api.example.com --no-ai # skip AI summary
```
### `securelens version`
Print version and config info.
---
## Interactive REPL
After every scan, you drop into an interactive Q&A session (like Gemini CLI):
```
💬 Ask a follow-up (or press Ctrl+C to exit)
Type /help for available commands
> What's the most critical issue?
> How do I fix the SQL injection in auth.py?
> Show me all high severity issues
> /export markdown
> /files
> /model gpt-4o-mini
> /exit
```
### Slash Commands
| Command | Description |
|---|---|
| `/help` | Show available commands |
| `/files` | List files that were analyzed |
| `/score` | Show the security score |
| `/export markdown` | Save report as Markdown |
| `/export json` | Save report as JSON |
| `/model <name>` | Switch AI model mid-session |
| `/clear` | Clear the terminal |
| `/exit` | Exit the REPL |
---
## Config File
`~/.securelens/config.yaml`:
```yaml
default_model: gemini/gemini-2.0-flash
api_key: YOUR_API_KEY
output_format: terminal # terminal | json | markdown | all
max_files_to_scan: 20
max_file_size_kb: 200
scan_timeout: 10
ignore_patterns:
- "*.lock"
- "node_modules/**"
- ".git/**"
- "venv/**"
```
### Environment Variable Overrides
```bash
export SECURELENS_API_KEY=your-key
export SECURELENS_MODEL=gpt-4o-mini
```
---
## Supported AI Providers
| Provider | Model string |
|---|---|
| Google Gemini (default) | `gemini/gemini-2.0-flash` |
| OpenAI | `gpt-4o-mini`, `gpt-4o` |
| Anthropic | `claude-3-5-haiku-20241022` |
| OpenRouter | `openrouter/google/gemini-flash` |
| Ollama (local, no key) | `ollama/llama3.1` |
---
## CI/CD Usage
```bash
# GitHub Actions — fail the build if any high or critical issues found
securelens scan . --ci --fail-on high
# Pre-commit hook
# Add to .pre-commit-config.yaml:
# - id: securelens
# name: SecureLens Security Scan
# entry: securelens scan
# args: [".", "--ci", "--fail-on", "critical"]
# language: python
# pass_filenames: false
```
---
## Output Formats
| Format | Flag | Description |
|---|---|---|
| Terminal (default) | `--output terminal` | Rich colored display |
| Markdown | `--output markdown` | Saves `securelens-report-{timestamp}.md` |
| JSON | `--output json` | Machine-readable, good for CI |
| All | `--output all` | Terminal display + saves markdown |

36
cli/install.sh Normal file
View File

@@ -0,0 +1,36 @@
#!/usr/bin/env bash
# install.sh — Install SecureLens AI CLI into the project venv
set -e
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
BACKEND_ROOT="$(dirname "$SCRIPT_DIR")"
echo "🔍 SecureLens AI CLI — Installer"
echo "================================="
# Detect venv
VENV_PIP=""
if [ -f "$BACKEND_ROOT/venv/bin/pip" ]; then
VENV_PIP="$BACKEND_ROOT/venv/bin/pip"
echo " Using backend venv: $BACKEND_ROOT/venv"
elif command -v pip3 &>/dev/null; then
VENV_PIP="pip3"
echo " Using system pip3"
else
VENV_PIP="pip"
echo " Using system pip"
fi
echo ""
echo " Installing dependencies..."
$VENV_PIP install click rich litellm httpx pyyaml pathspec questionary --quiet
echo " Installing securelens-ai CLI..."
$VENV_PIP install -e "$SCRIPT_DIR" --no-build-isolation --quiet
echo ""
echo "✓ Done! Run: securelens --help"
echo ""
echo " Or if using venv directly:"
echo " source $BACKEND_ROOT/venv/bin/activate"
echo " securelens configure"

29
cli/pyproject.toml Normal file
View File

@@ -0,0 +1,29 @@
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.backends.legacy:build"
[project]
name = "securelens-ai"
version = "2.0.0"
description = "AI-powered CLI security scanner for codebases and URLs"
readme = "README.md"
requires-python = ">=3.10"
license = { text = "MIT" }
keywords = ["security", "cli", "sast", "ai", "scanner", "owasp"]
dependencies = [
"click>=8.1",
"rich>=13.7",
"litellm>=1.40",
"httpx>=0.27",
"pyyaml>=6.0",
"questionary>=2.0",
"pathspec>=0.12",
"asyncio-throttle>=1.0",
]
[project.scripts]
securelens = "securelens.cli:main"
[tool.setuptools.packages.find]
where = ["."]
include = ["securelens*"]

View File

@@ -0,0 +1,5 @@
"""
SecureLens AI — CLI Package
"""
__version__ = "2.0.0"
__app_name__ = "SecureLens AI"

View File

@@ -0,0 +1,77 @@
"""
AI Client
=========
Thin, model-agnostic wrapper around LiteLLM.
The CLI uses this instead of directly calling litellm
so we have one place to handle retries, logging, and key injection.
"""
import json
import asyncio
import logging
from typing import Optional
logger = logging.getLogger(__name__)
async def call_ai(
prompt: str,
api_key: str,
model: str,
temperature: float = 0.3,
json_mode: bool = False,
conversation_history: Optional[list] = None,
) -> str:
"""
Single entry-point for all AI calls in the CLI.
Parameters
----------
prompt : The prompt to send (added as last user message)
api_key : LiteLLM-compatible API key
model : LiteLLM model string (e.g. "gemini/gemini-2.0-flash")
temperature : Creativity (0=deterministic, 1=creative)
json_mode : Ask the model to respond with valid JSON only
conversation_history : Optional list of {"role": ..., "content": ...} dicts
for multi-turn chat sessions
"""
import litellm
litellm.suppress_debug_info = True
messages = list(conversation_history or [])
messages.append({"role": "user", "content": prompt})
kwargs: dict = {
"model": model,
"messages": messages,
"temperature": temperature,
"api_key": api_key if api_key else None,
}
if json_mode:
kwargs["response_format"] = {"type": "json_object"}
try:
response = await litellm.acompletion(**kwargs)
return response.choices[0].message.content or ""
except Exception as e:
logger.error(f"AI call failed [{model}]: {e}")
return ""
async def call_ai_json(
prompt: str,
api_key: str,
model: str,
temperature: float = 0.2,
) -> Optional[dict]:
"""Convenience wrapper — calls AI in JSON mode and parses the result."""
raw = await call_ai(prompt, api_key, model, temperature=temperature, json_mode=True)
if not raw:
return None
try:
return json.loads(raw)
except json.JSONDecodeError as e:
logger.error(f"JSON parse failed: {e}\nRaw: {raw[:300]}")
return None

View File

@@ -0,0 +1,76 @@
"""
All AI prompts for the CLI agent — kept in one place so they're easy to tune.
"""
def triage_prompt(file_list: str, max_files: int) -> str:
return (
"You are a Senior Application Security Engineer. "
"I have a local codebase with the following files:\n"
f"{file_list}\n\n"
f"Select the {max_files} most critical files to review for security vulnerabilities. "
"Focus on: authentication logic, database access, API routes, config files, "
"secret/credential handling, input validation, and file upload handlers.\n"
"Also prioritise any file that contains the words: secret, password, token, key, "
"auth, login, admin, cred, jwt, session, crypto, hash.\n"
"Return a JSON object with a single key 'critical_files' containing the list of "
"exact file paths. Do not select more than "
f"{max_files} files."
)
def analysis_prompt(file_path: str, content: str) -> str:
return (
f"Review the following code from '{file_path}' for security vulnerabilities.\n"
"Focus on OWASP Top 10:\n"
" A01 Broken Access Control, A02 Cryptographic Failures, A03 Injection,\n"
" A04 Insecure Design, A05 Security Misconfiguration, A06 Vulnerable Components,\n"
" A07 Auth Failures, A08 Integrity Failures, A09 Logging Failures, A10 SSRF.\n"
"Also check for: hardcoded secrets/API keys, debug flags left on, insecure defaults.\n\n"
f"CODE:\n{content}\n\n"
"Return a JSON object with key 'vulnerabilities' — a list of objects, each with:\n"
" 'severity' : Critical | High | Medium | Low\n"
" 'issue' : Short title of the vulnerability\n"
" 'explanation' : 1-2 sentences explaining the risk\n"
" 'suggested_fix' : Concrete code snippet or clear instruction to fix it\n"
" 'line_number' : Integer line number, or null if not applicable\n"
"If no vulnerabilities are found, return {\"vulnerabilities\": []}."
)
def summary_prompt(target: str, issues_json: str) -> str:
return (
"You are a Senior AppSec Manager writing an executive security report.\n"
f"Target: {target}\n\n"
"Here are all vulnerabilities found in the automated scan:\n"
f"{issues_json}\n\n"
"Write a 2-3 paragraph executive summary of the overall security posture. "
"Highlight the most critical risks, explain what an attacker could do with them, "
"and recommend the top 3 immediate priorities. "
"Keep it professional, direct, and actionable — avoid generic fluff."
)
def chat_prompt(target: str, scan_context: str, user_question: str) -> str:
return (
"You are SecureLens AI, an expert cybersecurity assistant embedded in a CLI tool.\n"
f"The developer just scanned: {target}\n\n"
"Here are the full scan results:\n"
f"{scan_context}\n\n"
f"Developer's question: {user_question}\n\n"
"Answer clearly and practically. Reference specific findings from the scan when relevant. "
"If asked about a fix, show concrete code where possible."
)
def web_summary_prompt(url: str, issues_json: str, score: int, grade: str) -> str:
return (
"You are SecureLens AI, a web security expert.\n"
f"I just ran a security scan on: {url}\n"
f"Overall score: {score}/100 Grade: {grade}\n\n"
"Issues found:\n"
f"{issues_json}\n\n"
"Write a concise 2-paragraph summary: first explain what the key risks are and how "
"an attacker could exploit them; second, give the top 3 most impactful fixes. "
"Be direct — the reader is a developer, not a manager."
)

392
cli/securelens/cli.py Normal file
View File

@@ -0,0 +1,392 @@
"""
SecureLens AI — CLI Entry Point
================================
All Click commands live here.
Commands:
securelens configure Interactive setup wizard
securelens scan <path> Scan a local codebase
securelens web <url> Scan a URL
securelens version Print version info
"""
import asyncio
import json
import sys
from pathlib import Path
import click
from rich.console import Console
from rich.prompt import Prompt, Confirm
console = Console()
# ── Helpers ────────────────────────────────────────────────────────────────────
def _run(coro):
"""Run an async coroutine from a sync Click command."""
return asyncio.run(coro)
def _require_config(cfg):
"""Exit early with a friendly message if no API key is set."""
if not cfg.api_key:
console.print(
"\n[bold yellow]⚠ No API key configured.[/bold yellow]\n"
" Run [bold cyan]securelens configure[/bold cyan] to set one up.\n"
" Or set the [dim]SECURELENS_API_KEY[/dim] environment variable.\n"
)
sys.exit(1)
# ── Main group ─────────────────────────────────────────────────────────────────
@click.group()
@click.version_option("2.0.0", prog_name="SecureLens AI")
def main():
"""
\b
SecureLens AI — AI-powered security scanner
Scan codebases, URLs and get instant security reports.
"""
pass
# ── configure ─────────────────────────────────────────────────────────────────
@main.command()
def configure():
"""Interactive setup wizard — saves config to ~/.securelens/config.yaml"""
from securelens.config import load_config, save_config, CONFIG_FILE
from securelens.output import print_banner
print_banner()
console.print("[bold]Setup Wizard[/bold]\n")
cfg = load_config()
# Provider selection
providers = {
"1": ("gemini/gemini-2.0-flash", "Google Gemini 2.0 Flash [free tier available]"),
"2": ("gemini/gemini-1.5-pro", "Google Gemini 1.5 Pro"),
"3": ("gpt-4o-mini", "OpenAI GPT-4o Mini"),
"4": ("gpt-4o", "OpenAI GPT-4o"),
"5": ("claude-3-5-haiku-20241022","Anthropic Claude 3.5 Haiku"),
"6": ("ollama/llama3.1", "Ollama (local, no key needed)"),
"7": ("custom", "Custom model string"),
}
console.print("[bold]Choose AI Provider:[/bold]")
for k, (_, desc) in providers.items():
console.print(f" [{k}] {desc}")
console.print()
choice = Prompt.ask("Select", choices=list(providers.keys()), default="1")
model_str, _ = providers[choice]
if model_str == "custom":
model_str = Prompt.ask("Enter LiteLLM model string (e.g. openrouter/google/gemini-flash)")
cfg.default_model = model_str
# API key (skip for Ollama)
if not model_str.startswith("ollama/"):
key = Prompt.ask("API Key", password=True, default=cfg.api_key or "")
cfg.api_key = key.strip()
# Output format
console.print("\n[bold]Default output format:[/bold]")
console.print(" [1] terminal (rich display)")
console.print(" [2] markdown (save .md file)")
console.print(" [3] json (machine-readable)")
console.print(" [4] all (terminal + save markdown)")
fmt_choice = Prompt.ask("Select", choices=["1", "2", "3", "4"], default="1")
cfg.output_format = {"1": "terminal", "2": "markdown", "3": "json", "4": "all"}[fmt_choice]
save_config(cfg)
console.print(f"\n[bold green]✓ Config saved to {CONFIG_FILE}[/bold green]")
console.print(f" Model: [cyan]{cfg.default_model}[/cyan]")
console.print(f" Output: [cyan]{cfg.output_format}[/cyan]\n")
# ── scan ──────────────────────────────────────────────────────────────────────
@main.command()
@click.argument("path", default=".", type=click.Path(exists=True, file_okay=False, dir_okay=True))
@click.option("--model", "-m", default=None, help="Override AI model (e.g. gpt-4o-mini)")
@click.option("--output", "-o", default=None,
type=click.Choice(["terminal", "json", "markdown", "all"]),
help="Output format (overrides config)")
@click.option("--max-files", default=None, type=int, help="Max files to analyze (default: 20)")
@click.option("--ci", is_flag=True, help="CI mode: no REPL, exits with code 1 if issues found")
@click.option("--fail-on", default=None,
type=click.Choice(["critical", "high", "medium", "low"]),
help="In --ci mode, exit 1 if issues of this severity or above are found")
@click.option("--no-ai", is_flag=True, help="Skip AI triage & summary (pattern-based only, faster)")
def scan(path, model, output, max_files, ci, fail_on, no_ai):
"""
Scan a local codebase for security vulnerabilities.
\b
Examples:
securelens scan .
securelens scan ./my-project --output markdown
securelens scan . --model gpt-4o --max-files 30
securelens scan . --ci --fail-on high
"""
_run(_scan_async(path, model, output, max_files, ci, fail_on, no_ai))
async def _scan_async(path, model, output, max_files, ci, fail_on, no_ai):
from securelens.config import load_config
from securelens.output import print_banner, print_scan_header, print_code_scan_report, make_progress, print_error
from securelens.output.exporters import save_json, save_markdown, to_json
from securelens.scanners import (
discover_files, triage_files, analyze_files, LocalScanResult
)
from securelens.ai import call_ai
from securelens.ai.prompts import summary_prompt
from securelens.repl import run_repl, ReplContext
cfg = load_config()
if model:
cfg.default_model = model
if output:
cfg.output_format = output
if max_files:
cfg.max_files_to_scan = max_files
if not no_ai:
_require_config(cfg)
root = Path(path).resolve()
if not ci:
print_banner()
print_scan_header(str(root), cfg.default_model)
# ── Phase 1: Discover ────────────────────────────────────────────────────
with make_progress() as progress:
task_discover = progress.add_task(
"[1/4] Discovering files...", total=None, detail=""
)
candidates = discover_files(root, cfg)
progress.update(task_discover, completed=100, total=100,
detail=f"{len(candidates)} files found")
# ── Phase 2: Triage ──────────────────────────────────────────────────
task_triage = progress.add_task(
"[2/4] Triaging with AI...", total=None, detail=""
)
if no_ai:
# In --no-ai mode just take the top N by sensitivity heuristic
from securelens.scanners import _is_always_scan
triaged = [p for p in candidates if _is_always_scan(p)][:cfg.max_files_to_scan]
else:
triaged = await triage_files(candidates, root, cfg)
progress.update(task_triage, completed=100, total=100,
detail=f"{len(triaged)} files selected")
# ── Phase 3: Analyze ─────────────────────────────────────────────────
task_analyze = progress.add_task(
"[3/4] Analyzing security...", total=len(triaged), detail=""
)
analyzed_count = 0
async def on_progress(done, total, filename):
nonlocal analyzed_count
analyzed_count = done
progress.update(task_analyze, completed=done, detail=filename)
if no_ai or not cfg.api_key:
vulnerabilities = []
else:
vulnerabilities = await analyze_files(triaged, root, cfg, on_progress)
progress.update(task_analyze, completed=len(triaged),
detail=f"{len(vulnerabilities)} issues found")
# ── Phase 4: Summary ─────────────────────────────────────────────────
task_summary = progress.add_task(
"[4/4] Generating AI report...", total=None, detail=""
)
ai_summary = ""
if not no_ai and cfg.api_key and vulnerabilities:
import json as _json
issues_data = [
{"file": v.file_path, "severity": v.severity,
"issue": v.issue, "explanation": v.explanation}
for v in vulnerabilities
]
prompt = summary_prompt(str(root), _json.dumps(issues_data, indent=2))
ai_summary = await call_ai(prompt, cfg.api_key, cfg.default_model, temperature=0.4)
progress.update(task_summary, completed=100, total=100, detail="Done")
# ── Build result ─────────────────────────────────────────────────────────
result = LocalScanResult(
target=str(root),
total_files_found=len(candidates),
files_triaged=[p.relative_to(root).as_posix() for p in triaged],
vulnerabilities=vulnerabilities,
ai_summary=ai_summary,
)
result.compute_score()
# ── Output ───────────────────────────────────────────────────────────────
fmt = cfg.output_format
if fmt in ("terminal", "all"):
print_code_scan_report(result)
if fmt in ("json",):
console.print(to_json(result, "code"))
if fmt in ("markdown", "all"):
path_out = save_markdown(result, "code")
if not ci:
console.print(f" [green]✓ Markdown report saved:[/green] [dim]{path_out}[/dim]\n")
if fmt == "json" and not ci:
path_out = save_json(result, "code")
console.print(f" [green]✓ JSON report saved:[/green] [dim]{path_out}[/dim]\n")
# ── CI exit code ─────────────────────────────────────────────────────────
if ci:
_ci_exit(result.vulnerabilities, fail_on, "code")
return
# ── Interactive REPL ─────────────────────────────────────────────────────
if fmt in ("terminal", "all", "markdown") and not no_ai:
ctx = ReplContext(
target=str(root),
scan_result=result,
target_type="code",
api_key=cfg.api_key,
model=cfg.default_model,
)
await run_repl(ctx)
# ── web ───────────────────────────────────────────────────────────────────────
@main.command()
@click.argument("url")
@click.option("--model", "-m", default=None, help="Override AI model")
@click.option("--output", "-o", default=None,
type=click.Choice(["terminal", "json", "markdown", "all"]))
@click.option("--ci", is_flag=True, help="CI mode — no REPL")
@click.option("--fail-on", default=None,
type=click.Choice(["critical", "warning", "info"]))
@click.option("--no-ai", is_flag=True, help="Skip AI summary")
def web(url, model, output, ci, fail_on, no_ai):
"""
Scan a URL for web security issues.
\b
Examples:
securelens web https://example.com
securelens web https://my-app.com --output markdown
"""
_run(_web_async(url, model, output, ci, fail_on, no_ai))
async def _web_async(url, model, output, ci, fail_on, no_ai):
from securelens.config import load_config
from securelens.output import (
print_banner, print_scan_header, print_web_scan_report,
make_progress, console
)
from securelens.output.exporters import save_json, save_markdown, to_json
from securelens.scanners.web_scanner import scan_url
from securelens.ai import call_ai
from securelens.ai.prompts import web_summary_prompt
from securelens.repl import run_repl, ReplContext
import json as _json
# Normalise URL
if not url.startswith(("http://", "https://")):
url = "https://" + url
cfg = load_config()
if model:
cfg.default_model = model
if output:
cfg.output_format = output
if not ci:
print_banner()
print_scan_header(url, cfg.default_model)
with make_progress() as progress:
task = progress.add_task("[1/2] Running web security checks...", total=None, detail="")
result = await scan_url(url, timeout=cfg.scan_timeout)
progress.update(task, completed=100, total=100,
detail=f"{len(result.issues)} issues found")
task2 = progress.add_task("[2/2] Generating AI summary...", total=None, detail="")
if not no_ai and cfg.api_key and result.issues:
issues_data = [
{"layer": i.layer, "severity": i.severity, "issue": i.issue}
for i in result.issues
]
prompt = web_summary_prompt(url, _json.dumps(issues_data, indent=2),
result.score, result.grade)
result.ai_summary = await call_ai(prompt, cfg.api_key, cfg.default_model, temperature=0.4)
progress.update(task2, completed=100, total=100, detail="Done")
fmt = cfg.output_format
if fmt in ("terminal", "all"):
print_web_scan_report(result)
if fmt == "json":
console.print(to_json(result, "web"))
if fmt in ("markdown", "all"):
p = save_markdown(result, "web")
if not ci:
console.print(f" [green]✓ Markdown saved:[/green] [dim]{p}[/dim]\n")
if ci:
_ci_exit(result.issues, fail_on, "web")
return
if fmt in ("terminal", "all", "markdown") and not no_ai:
ctx = ReplContext(
target=url,
scan_result=result,
target_type="web",
api_key=cfg.api_key,
model=cfg.default_model,
)
await run_repl(ctx)
# ── version ───────────────────────────────────────────────────────────────────
@main.command()
def version():
"""Print SecureLens AI version and config info."""
from securelens.config import load_config, CONFIG_FILE
from securelens import __version__
cfg = load_config()
console.print(f"\n [bold cyan]SecureLens AI[/bold cyan] v{__version__}")
console.print(f" Model: [dim]{cfg.default_model}[/dim]")
console.print(f" Config: [dim]{CONFIG_FILE}[/dim]")
console.print(f" API Key: [dim]{'✓ set' if cfg.api_key else '✗ not set'}[/dim]\n")
# ── CI exit helper ─────────────────────────────────────────────────────────────
def _ci_exit(issues, fail_on, scan_type: str):
"""Exit with code 1 if issues meet or exceed the fail_on threshold."""
severity_rank = {"critical": 4, "high": 3, "warning": 3, "medium": 2, "low": 1, "info": 0}
if not fail_on:
# Default: fail on any critical
fail_on = "critical"
threshold = severity_rank.get(fail_on, 4)
for issue in issues:
sev = getattr(issue, "severity", "").lower()
if severity_rank.get(sev, 0) >= threshold:
sys.exit(1)
sys.exit(0)
if __name__ == "__main__":
main()

100
cli/securelens/config.py Normal file
View File

@@ -0,0 +1,100 @@
"""
Config Manager
==============
Reads and writes ~/.securelens/config.yaml.
Falls back to environment variables so the CLI works in CI/CD
without a config file.
"""
import os
import yaml
from pathlib import Path
from dataclasses import dataclass, field
CONFIG_DIR = Path.home() / ".securelens"
CONFIG_FILE = CONFIG_DIR / "config.yaml"
@dataclass
class CLIConfig:
# AI backend
default_model: str = "gemini/gemini-2.0-flash"
api_key: str = ""
# Scan behaviour
output_format: str = "terminal" # terminal | json | markdown | all
max_files_to_scan: int = 20
max_file_size_kb: int = 200
scan_timeout: int = 10 # seconds — for web scans
# File exclusions (gitignore-style globs)
ignore_patterns: list = field(default_factory=lambda: [
"*.lock",
"node_modules/**",
".git/**",
"venv/**",
".venv/**",
"__pycache__/**",
"*.pyc",
"dist/**",
"build/**",
".next/**",
"*.min.js",
"*.min.css",
"*.map",
])
def load_config() -> CLIConfig:
"""
Load config from ~/.securelens/config.yaml,
then overlay any env-var overrides.
"""
cfg = CLIConfig()
if CONFIG_FILE.exists():
with open(CONFIG_FILE) as f:
data = yaml.safe_load(f) or {}
cfg.default_model = data.get("default_model", cfg.default_model)
cfg.api_key = data.get("api_key", cfg.api_key)
cfg.output_format = data.get("output_format", cfg.output_format)
cfg.max_files_to_scan = data.get("max_files_to_scan", cfg.max_files_to_scan)
cfg.max_file_size_kb = data.get("max_file_size_kb", cfg.max_file_size_kb)
cfg.scan_timeout = data.get("scan_timeout", cfg.scan_timeout)
cfg.ignore_patterns = data.get("ignore_patterns", cfg.ignore_patterns)
# Env-var overrides (for CI/CD)
cfg.api_key = (
os.environ.get("SECURELENS_API_KEY")
or os.environ.get("AI_API_KEY")
or os.environ.get("GEMINI_API_KEY")
or os.environ.get("OPENAI_API_KEY")
or cfg.api_key
)
cfg.default_model = (
os.environ.get("SECURELENS_MODEL")
or os.environ.get("AI_MODEL")
or cfg.default_model
)
return cfg
def save_config(cfg: CLIConfig) -> None:
"""Persist the config object to ~/.securelens/config.yaml."""
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
data = {
"default_model": cfg.default_model,
"api_key": cfg.api_key,
"output_format": cfg.output_format,
"max_files_to_scan": cfg.max_files_to_scan,
"max_file_size_kb": cfg.max_file_size_kb,
"scan_timeout": cfg.scan_timeout,
"ignore_patterns": cfg.ignore_patterns,
}
with open(CONFIG_FILE, "w") as f:
yaml.safe_dump(data, f, default_flow_style=False)
def config_exists() -> bool:
return CONFIG_FILE.exists() and bool(load_config().api_key)

View File

@@ -0,0 +1,208 @@
"""
Terminal Renderer
=================
All Rich-based output for the CLI — banners, progress, tables, panels.
"""
import json
from datetime import datetime
from typing import Optional
from rich.console import Console
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeElapsedColumn
from rich.table import Table
from rich.text import Text
from rich import box
from rich.columns import Columns
from rich.rule import Rule
from rich.syntax import Syntax
from rich.live import Live
from rich.padding import Padding
console = Console()
# ── Severity colours ──────────────────────────────────────────────────────────
SEVERITY_COLOR = {
"Critical": "bold red",
"High": "bold orange1",
"Warning": "bold yellow",
"Medium": "bold yellow",
"Info": "bold blue",
"Low": "bold cyan",
}
GRADE_COLOR = {
"A": "bold green",
"B": "bold cyan",
"C": "bold yellow",
"D": "bold orange1",
"F": "bold red",
}
def print_banner() -> None:
banner = Text()
banner.append("\n")
banner.append(" ███████╗███████╗ ██████╗██╗ ██╗██████╗ ███████╗██╗ ███████╗███╗ ██╗███████╗\n", style="bold cyan")
banner.append(" ██╔════╝██╔════╝██╔════╝██║ ██║██╔══██╗██╔════╝██║ ██╔════╝████╗ ██║██╔════╝\n", style="bold cyan")
banner.append(" ███████╗█████╗ ██║ ██║ ██║██████╔╝█████╗ ██║ █████╗ ██╔██╗ ██║███████╗\n", style="bold blue")
banner.append(" ╚════██║██╔══╝ ██║ ██║ ██║██╔══██╗██╔══╝ ██║ ██╔══╝ ██║╚██╗██║╚════██║\n", style="bold blue")
banner.append(" ███████║███████╗╚██████╗╚██████╔╝██║ ██║███████╗███████╗███████╗██║ ╚████║███████║\n", style="bold magenta")
banner.append(" ╚══════╝╚══════╝ ╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚══════╝╚══════╝╚══════╝╚═╝ ╚═══╝╚══════╝\n", style="bold magenta")
banner.append(" AI Security Agent v2.0.0\n", style="dim")
console.print(banner)
def print_scan_header(target: str, model: str) -> None:
console.print(f" [bold]🔍 Target:[/bold] [cyan]{target}[/cyan]")
console.print(f" [bold]🧠 Model:[/bold] [dim]{model}[/dim]")
console.print()
def make_progress() -> Progress:
return Progress(
SpinnerColumn(),
TextColumn("[bold blue]{task.description}"),
BarColumn(bar_width=30),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TextColumn("• [dim]{task.fields[detail]}[/dim]"),
TimeElapsedColumn(),
console=console,
transient=False,
)
def print_code_scan_report(result) -> None:
"""Render a full local code scan report."""
console.print()
console.rule("[bold white] SECURITY REPORT [/bold white]", style="bright_black")
console.print()
# Score panel
grade_color = GRADE_COLOR.get(result.grade, "white")
score_text = Text()
score_text.append(f" {result.score}/100", style=f"bold {grade_color}")
score_text.append(" Grade: ", style="dim")
score_text.append(result.grade, style=grade_color)
score_text.append(f"{len(result.vulnerabilities)} issue(s) found", style="dim")
score_text.append(f"{len(result.files_triaged)} file(s) scanned", style="dim")
console.print(Panel(score_text, title="[bold]Overall Score[/bold]", border_style="bright_black"))
console.print()
if not result.vulnerabilities:
console.print(" [bold green]✓ No vulnerabilities found![/bold green]")
console.print()
else:
_print_vulnerability_table(result.vulnerabilities)
# AI Summary
if result.ai_summary:
console.print(Panel(
result.ai_summary,
title="[bold cyan]🤖 AI Security Summary[/bold cyan]",
border_style="cyan",
padding=(1, 2),
))
console.print()
def print_web_scan_report(result) -> None:
"""Render a full web scan report."""
console.print()
console.rule("[bold white] WEB SECURITY REPORT [/bold white]", style="bright_black")
console.print()
if not result.reachable:
console.print(" [bold red]✗ Could not reach the target URL[/bold red]")
return
grade_color = GRADE_COLOR.get(result.grade, "white")
score_text = Text()
score_text.append(f" {result.score}/100", style=f"bold {grade_color}")
score_text.append(" Grade: ", style="dim")
score_text.append(result.grade, style=grade_color)
if result.ssl_expiry_days is not None:
score_text.append(f" • SSL expires in {result.ssl_expiry_days} days", style="dim")
console.print(Panel(score_text, title="[bold]Overall Score[/bold]", border_style="bright_black"))
console.print()
if result.exposed_paths:
console.print(f" [bold red]⚠ Exposed sensitive paths:[/bold red] {', '.join(result.exposed_paths)}")
console.print()
if not result.issues:
console.print(" [bold green]✓ No issues found![/bold green]")
else:
_print_web_issue_table(result.issues)
if result.ai_summary:
console.print(Panel(
result.ai_summary,
title="[bold cyan]🤖 AI Security Summary[/bold cyan]",
border_style="cyan",
padding=(1, 2),
))
console.print()
def _print_vulnerability_table(vulns) -> None:
"""Render grouped vulnerability table by severity."""
severity_order = ["Critical", "High", "Medium", "Low"]
grouped: dict = {s: [] for s in severity_order}
for v in vulns:
sev = v.severity if v.severity in grouped else "Low"
grouped[sev].append(v)
for sev in severity_order:
items = grouped[sev]
if not items:
continue
color = SEVERITY_COLOR.get(sev, "white")
console.print(f" [{color}]▶ {sev.upper()} ({len(items)})[/{color}]")
for v in items:
loc = f"[dim]{v.file_path}"
if v.line_number:
loc += f":{v.line_number}"
loc += "[/dim]"
console.print(f" [bold]{v.issue}[/bold] {loc}")
console.print(f" [dim]{v.explanation}[/dim]")
console.print(f" [green]Fix:[/green] [dim]{v.suggested_fix}[/dim]")
console.print()
def _print_web_issue_table(issues) -> None:
"""Render web scan issues grouped by layer."""
layers: dict = {}
for issue in issues:
layers.setdefault(issue.layer, []).append(issue)
for layer, items in layers.items():
console.print(f" [bold bright_black]── {layer} ──[/bold bright_black]")
for item in items:
color = SEVERITY_COLOR.get(item.severity, "white")
console.print(f" [{color}]●[/{color}] [bold]{item.issue}[/bold]")
console.print(f" [green]Fix:[/green] [dim]{item.fix}[/dim]")
console.print()
def print_repl_prompt() -> None:
console.print("\n[bold cyan]💬 Ask a follow-up[/bold cyan] [dim](or press Ctrl+C to exit)[/dim]")
def print_ai_response(text: str) -> None:
console.print()
console.print(Panel(text, border_style="dim", padding=(0, 1)))
console.print()
def print_error(msg: str) -> None:
console.print(f"\n [bold red]✗ {msg}[/bold red]\n")
def print_success(msg: str) -> None:
console.print(f"\n [bold green]✓ {msg}[/bold green]\n")
def print_info(msg: str) -> None:
console.print(f" [dim]{msg}[/dim]")

View File

@@ -0,0 +1,148 @@
"""
Export Formatters
=================
Serializes scan results to JSON and Markdown.
"""
import json
from datetime import datetime
from pathlib import Path
# ── JSON ──────────────────────────────────────────────────────────────────────
def to_json(result, target_type: str = "code") -> str:
"""Serialize a scan result to a JSON string."""
if target_type == "code":
data = {
"scan_type": "code",
"target": result.target,
"timestamp": datetime.now().isoformat(),
"score": result.score,
"grade": result.grade,
"files_scanned": result.files_triaged,
"total_issues": len(result.vulnerabilities),
"vulnerabilities": [
{
"file": v.file_path,
"line": v.line_number,
"severity": v.severity,
"issue": v.issue,
"explanation": v.explanation,
"fix": v.suggested_fix,
}
for v in result.vulnerabilities
],
"ai_summary": result.ai_summary,
}
else: # web
data = {
"scan_type": "web",
"target": result.url,
"timestamp": datetime.now().isoformat(),
"score": result.score,
"grade": result.grade,
"ssl_expiry_days": result.ssl_expiry_days,
"exposed_paths": result.exposed_paths,
"total_issues": len(result.issues),
"issues": [
{
"layer": i.layer,
"severity": i.severity,
"issue": i.issue,
"fix": i.fix,
}
for i in result.issues
],
"ai_summary": result.ai_summary,
}
return json.dumps(data, indent=2)
def save_json(result, target_type: str = "code") -> Path:
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
path = Path(f"securelens-report-{ts}.json")
path.write_text(to_json(result, target_type))
return path
# ── Markdown ──────────────────────────────────────────────────────────────────
def to_markdown(result, target_type: str = "code") -> str:
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
lines = []
if target_type == "code":
lines.append(f"# SecureLens AI — Code Security Report\n")
lines.append(f"**Target:** `{result.target}` ")
lines.append(f"**Score:** {result.score}/100 **Grade:** {result.grade} ")
lines.append(f"**Files Scanned:** {len(result.files_triaged)} ")
lines.append(f"**Issues Found:** {len(result.vulnerabilities)} ")
lines.append(f"**Generated:** {ts}\n")
if result.ai_summary:
lines.append("## AI Summary\n")
lines.append(result.ai_summary)
lines.append("\n")
severity_order = ["Critical", "High", "Medium", "Low"]
grouped: dict = {s: [] for s in severity_order}
for v in result.vulnerabilities:
sev = v.severity if v.severity in grouped else "Low"
grouped[sev].append(v)
for sev in severity_order:
items = grouped[sev]
if not items:
continue
lines.append(f"## {sev} ({len(items)})\n")
for v in items:
loc = v.file_path
if v.line_number:
loc += f":{v.line_number}"
lines.append(f"### `{v.issue}`")
lines.append(f"**File:** `{loc}` ")
lines.append(f"**Risk:** {v.explanation} ")
lines.append(f"**Fix:** {v.suggested_fix}\n")
lines.append("## Files Scanned\n")
for f in result.files_triaged:
lines.append(f"- `{f}`")
else: # web
lines.append(f"# SecureLens AI — Web Security Report\n")
lines.append(f"**Target:** {result.url} ")
lines.append(f"**Score:** {result.score}/100 **Grade:** {result.grade} ")
lines.append(f"**Issues Found:** {len(result.issues)} ")
if result.ssl_expiry_days is not None:
lines.append(f"**SSL Expires In:** {result.ssl_expiry_days} days ")
lines.append(f"**Generated:** {ts}\n")
if result.ai_summary:
lines.append("## AI Summary\n")
lines.append(result.ai_summary)
lines.append("\n")
if result.exposed_paths:
lines.append("## Exposed Paths\n")
for p in result.exposed_paths:
lines.append(f"- `{p}`")
lines.append("")
layers: dict = {}
for issue in result.issues:
layers.setdefault(issue.layer, []).append(issue)
for layer, items in layers.items():
lines.append(f"## {layer}\n")
for item in items:
lines.append(f"**[{item.severity}]** {item.issue} ")
lines.append(f"*Fix:* {item.fix}\n")
return "\n".join(lines)
def save_markdown(result, target_type: str = "code") -> Path:
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
path = Path(f"securelens-report-{ts}.md")
path.write_text(to_markdown(result, target_type))
return path

218
cli/securelens/repl.py Normal file
View File

@@ -0,0 +1,218 @@
"""
Interactive REPL
================
Post-scan Q&A loop — the "Gemini CLI feel".
After a scan completes, the user drops into this loop where they can:
- Ask natural-language questions about the scan results
- Use slash commands (/export, /files, /model, /clear, /help)
- Ctrl+C to exit
The AI is given full scan context at the start of the conversation
and remembers the entire chat history during the session.
"""
import asyncio
import json
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
from rich.console import Console
from securelens.ai import call_ai
from securelens.ai.prompts import chat_prompt
from securelens.output import console, print_ai_response, print_info, print_error, print_success
from securelens.output.exporters import save_json, save_markdown
console_out = Console()
HELP_TEXT = """
[bold cyan]Available commands:[/bold cyan]
[bold]/help[/bold] Show this help message
[bold]/files[/bold] List files that were scanned
[bold]/score[/bold] Show the security score
[bold]/export markdown[/bold] Save the report as a Markdown file
[bold]/export json[/bold] Save the report as a JSON file
[bold]/model <name>[/bold] Switch AI model (e.g. /model gpt-4o-mini)
[bold]/clear[/bold] Clear the terminal
[bold]/exit[/bold] Exit the REPL
Or just type a question in plain English, e.g.:
[dim]> How do I fix the SQL injection?[/dim]
[dim]> What's the most critical issue?[/dim]
[dim]> Show me all issues in auth.py[/dim]
"""
@dataclass
class ReplContext:
target: str
scan_result: object # LocalScanResult or WebScanResult
target_type: str # "code" | "web"
api_key: str
model: str
conversation_history: list = field(default_factory=list)
async def run_repl(ctx: ReplContext) -> None:
"""
Enter the interactive REPL. Blocks until the user exits.
"""
# Build initial scan context string (injected into every AI prompt)
scan_ctx_str = _build_scan_context(ctx)
console_out.print()
console_out.print("[bold cyan]💬 Ask a follow-up[/bold cyan] [dim](or press Ctrl+C / type /exit to quit)[/dim]")
console_out.print("[dim]Type /help for available commands[/dim]")
console_out.print()
while True:
try:
user_input = _prompt_user()
except (KeyboardInterrupt, EOFError):
console_out.print("\n[dim]Goodbye![/dim]\n")
break
user_input = user_input.strip()
if not user_input:
continue
# ── Slash commands ──────────────────────────────────────────────────
if user_input.startswith("/"):
should_exit = await _handle_slash_command(user_input, ctx)
if should_exit:
break
continue
# ── AI response ─────────────────────────────────────────────────────
if not ctx.api_key:
print_error("No API key configured. Run `securelens configure` to set one.")
continue
prompt = chat_prompt(ctx.target, scan_ctx_str, user_input)
console_out.print("[dim] Thinking...[/dim]")
response = await call_ai(
prompt=prompt,
api_key=ctx.api_key,
model=ctx.model,
temperature=0.5,
conversation_history=ctx.conversation_history,
)
if response:
# Save to history for multi-turn context
ctx.conversation_history.append({"role": "user", "content": user_input})
ctx.conversation_history.append({"role": "assistant", "content": response})
print_ai_response(response)
else:
print_error("No response from AI. Check your API key and network connection.")
def _prompt_user() -> str:
"""Read a line from stdin with a styled prompt."""
sys.stdout.write("[dim bold cyan]>[/dim bold cyan] ")
sys.stdout.flush()
# Use input() — rich.prompt not used here to keep it simple
try:
from rich.prompt import Prompt
return Prompt.ask("[bold cyan]>[/bold cyan]")
except Exception:
return input("> ")
def _build_scan_context(ctx: ReplContext) -> str:
"""Serialize the scan result into a compact string for the AI context."""
result = ctx.scan_result
if ctx.target_type == "code":
vulns = [
{
"file": v.file_path,
"line": v.line_number,
"severity": v.severity,
"issue": v.issue,
"explanation": v.explanation,
"fix": v.suggested_fix,
}
for v in result.vulnerabilities
]
return json.dumps({
"target": result.target,
"score": result.score,
"grade": result.grade,
"files_scanned": result.files_triaged,
"vulnerabilities": vulns,
"ai_summary": result.ai_summary,
}, indent=2)
else: # web
issues = [
{"layer": i.layer, "severity": i.severity, "issue": i.issue, "fix": i.fix}
for i in result.issues
]
return json.dumps({
"target": result.url,
"score": result.score,
"grade": result.grade,
"ssl_expiry_days": result.ssl_expiry_days,
"exposed_paths": result.exposed_paths,
"issues": issues,
"ai_summary": result.ai_summary,
}, indent=2)
async def _handle_slash_command(cmd: str, ctx: ReplContext) -> bool:
"""
Process a slash command. Returns True if the REPL should exit.
"""
parts = cmd.split()
command = parts[0].lower()
if command == "/exit":
console_out.print("\n[dim]Goodbye![/dim]\n")
return True
elif command == "/help":
console_out.print(HELP_TEXT)
elif command == "/clear":
console_out.clear()
elif command == "/files":
result = ctx.scan_result
if ctx.target_type == "code" and hasattr(result, "files_triaged"):
console_out.print("\n[bold]Files analyzed:[/bold]")
for f in result.files_triaged:
console_out.print(f" [dim]• {f}[/dim]")
console_out.print()
else:
print_info("File list not available for web scans.")
elif command == "/score":
r = ctx.scan_result
score = r.score
grade = r.grade
console_out.print(f"\n [bold]Score:[/bold] {score}/100 [bold]Grade:[/bold] {grade}\n")
elif command == "/model":
if len(parts) < 2:
print_info(f"Current model: {ctx.model}")
print_info("Usage: /model <model-name> e.g. /model gpt-4o-mini")
else:
ctx.model = parts[1]
print_success(f"Model switched to: {ctx.model}")
elif command == "/export":
fmt = parts[1].lower() if len(parts) > 1 else "markdown"
if fmt == "json":
path = save_json(ctx.scan_result, ctx.target_type)
print_success(f"JSON report saved to: {path}")
else:
path = save_markdown(ctx.scan_result, ctx.target_type)
print_success(f"Markdown report saved to: {path}")
else:
print_error(f"Unknown command: {command}. Type /help for available commands.")
return False

View File

@@ -0,0 +1,248 @@
"""
Local Code Scanner
==================
Scans a local directory — no GitHub API needed.
Pipeline:
1. Walk the filesystem, respecting .gitignore rules and config ignore patterns
2. Flag files matching known sensitive patterns (always include these)
3. Send the file list to the AI for triage (pick the most security-critical ones)
4. Read each triaged file and send to AI for OWASP vulnerability analysis
5. Return structured list of vulnerability findings
"""
import asyncio
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
import pathspec
from securelens.ai import call_ai, call_ai_json
from securelens.ai.prompts import triage_prompt, analysis_prompt
from securelens.config import CLIConfig
logger = logging.getLogger(__name__)
# ── File extension blocklist (binary / generated — no security signal) ────────
BINARY_EXTENSIONS = {
".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg", ".ico",
".pdf", ".doc", ".docx", ".xls", ".xlsx",
".zip", ".tar", ".gz", ".rar", ".7z",
".whl", ".egg", ".jar", ".war", ".ear",
".mp4", ".mp3", ".avi", ".mov",
".ttf", ".woff", ".woff2", ".eot",
".pyc", ".pyo", ".class",
".so", ".dll", ".dylib", ".exe",
".db", ".sqlite", ".sqlite3",
}
# ── Files that are always included regardless of AI triage ───────────────────
ALWAYS_SCAN_PATTERNS = [
"*.env", ".env", ".env.*", "*.env.*",
"config.py", "settings.py", "config.js", "config.ts",
"secrets.py", "credentials.py", "keys.py",
"Dockerfile", "docker-compose.yml", "docker-compose.yaml",
"*.pem", "*.key", "*.p12", "*.pfx",
"requirements.txt", "package.json", "Gemfile",
]
SENSITIVE_NAME_KEYWORDS = [
"secret", "password", "passwd", "credential", "cred",
"api_key", "apikey", "auth", "jwt", "token",
"private", "priv_key", "access_key",
]
@dataclass
class VulnerabilityFinding:
file_path: str
severity: str # Critical | High | Medium | Low
issue: str
explanation: str
suggested_fix: str
line_number: Optional[int] = None
@dataclass
class LocalScanResult:
target: str
total_files_found: int
files_triaged: list[str] = field(default_factory=list)
vulnerabilities: list[VulnerabilityFinding] = field(default_factory=list)
ai_summary: str = ""
score: int = 100
grade: str = "A"
def compute_score(self) -> None:
"""Deterministic score: deduct points by severity."""
weights = {"Critical": 20, "High": 12, "Medium": 5, "Low": 2}
deduction = sum(weights.get(v.severity, 0) for v in self.vulnerabilities)
self.score = max(100 - deduction, 0)
self.grade = _score_to_grade(self.score)
def _score_to_grade(score: int) -> str:
if score >= 90: return "A"
if score >= 80: return "B"
if score >= 70: return "C"
if score >= 60: return "D"
return "F"
# ── Phase 1: File Discovery ───────────────────────────────────────────────────
def discover_files(root: Path, cfg: CLIConfig) -> list[Path]:
"""
Walk the directory tree and return candidate files.
Respects .gitignore in the root and cfg.ignore_patterns.
Skips binaries and files larger than cfg.max_file_size_kb.
"""
# Build a combined spec from config ignore_patterns + .gitignore
ignore_patterns = list(cfg.ignore_patterns)
gitignore_path = root / ".gitignore"
if gitignore_path.exists():
with open(gitignore_path) as f:
ignore_patterns.extend(
line.strip()
for line in f
if line.strip() and not line.startswith("#")
)
spec = pathspec.PathSpec.from_lines("gitwildmatch", ignore_patterns)
max_bytes = cfg.max_file_size_kb * 1024
candidates: list[Path] = []
for p in root.rglob("*"):
if not p.is_file():
continue
rel = p.relative_to(root).as_posix()
if spec.match_file(rel):
continue
if p.suffix.lower() in BINARY_EXTENSIONS:
continue
if p.stat().st_size > max_bytes:
continue
candidates.append(p)
return sorted(candidates)
def _is_always_scan(path: Path) -> bool:
"""Returns True if this file should always be scanned regardless of triage."""
name = path.name.lower()
# Check sensitive name keywords
if any(kw in name for kw in SENSITIVE_NAME_KEYWORDS):
return True
# Check always-scan patterns
for pattern in ALWAYS_SCAN_PATTERNS:
if path.match(pattern):
return True
return False
# ── Phase 2: AI Triage ───────────────────────────────────────────────────────
async def triage_files(
candidates: list[Path],
root: Path,
cfg: CLIConfig,
) -> list[Path]:
"""
Ask the AI to pick the most security-relevant files.
Always-scan files are added automatically regardless of AI choice.
"""
# Separate forced files from candidates
forced = [p for p in candidates if _is_always_scan(p)]
non_forced = [p for p in candidates if not _is_always_scan(p)]
# Build file list for AI (relative paths — cleaner prompt)
rel_paths = [p.relative_to(root).as_posix() for p in non_forced]
remaining_budget = max(0, cfg.max_files_to_scan - len(forced))
ai_selected: list[Path] = []
if rel_paths and remaining_budget > 0 and cfg.api_key:
file_list_str = "\n".join(rel_paths[:300]) # cap to ~300 paths for token budget
prompt = triage_prompt(file_list_str, remaining_budget)
result = await call_ai_json(prompt, cfg.api_key, cfg.default_model, temperature=0.1)
if result and "critical_files" in result:
for rel in result["critical_files"]:
abs_path = root / rel
if abs_path.exists():
ai_selected.append(abs_path)
# Merge: forced first, then AI-selected (deduplicated)
seen = set()
final: list[Path] = []
for p in forced + ai_selected:
if p not in seen:
seen.add(p)
final.append(p)
return final[:cfg.max_files_to_scan]
# ── Phase 3: File Analysis ────────────────────────────────────────────────────
async def analyze_file(
path: Path,
root: Path,
cfg: CLIConfig,
) -> list[VulnerabilityFinding]:
"""Send a single file's content to the AI for OWASP analysis."""
rel = path.relative_to(root).as_posix()
try:
content = path.read_text(errors="replace")
except Exception as e:
logger.warning(f"Could not read {rel}: {e}")
return []
# Cap content to avoid token overflow
if len(content) > 30_000:
content = content[:30_000] + "\n... (truncated)"
prompt = analysis_prompt(rel, content)
result = await call_ai_json(prompt, cfg.api_key, cfg.default_model, temperature=0.2)
if not result:
return []
findings: list[VulnerabilityFinding] = []
for v in result.get("vulnerabilities", []):
findings.append(VulnerabilityFinding(
file_path=rel,
severity=v.get("severity", "Medium"),
issue=v.get("issue", "Unknown Issue"),
explanation=v.get("explanation", ""),
suggested_fix=v.get("suggested_fix", ""),
line_number=v.get("line_number"),
))
return findings
async def analyze_files(
triaged: list[Path],
root: Path,
cfg: CLIConfig,
progress_callback=None,
) -> list[VulnerabilityFinding]:
"""
Analyze all triaged files concurrently.
Uses a semaphore to avoid hammering the API with too many simultaneous calls.
"""
semaphore = asyncio.Semaphore(4)
all_findings: list[VulnerabilityFinding] = []
async def _analyze_with_sem(path: Path, idx: int) -> list[VulnerabilityFinding]:
async with semaphore:
result = await analyze_file(path, root, cfg)
if progress_callback:
await progress_callback(idx + 1, len(triaged), path.relative_to(root).as_posix())
return result
tasks = [_analyze_with_sem(p, i) for i, p in enumerate(triaged)]
results = await asyncio.gather(*tasks)
for r in results:
all_findings.extend(r)
return all_findings

View File

@@ -0,0 +1,339 @@
"""
Web URL Scanner
===============
Runs the full HTTP security check suite against a live URL.
Lifted from the backend scanner/ services — no FastAPI dependency.
Checks:
1. Transport (HTTPS, HSTS)
2. Security Headers (CSP, X-Frame-Options, etc.)
3. Cookie flags (HttpOnly, Secure, SameSite)
4. Exposed sensitive paths (.env, /admin, etc.)
5. SSL certificate validity
"""
import asyncio
import ssl
import socket
import datetime
import logging
from dataclasses import dataclass, field
from typing import Optional
from urllib.parse import urlparse
import httpx
logger = logging.getLogger(__name__)
SENSITIVE_PATHS = [
"/.env", "/.env.local", "/.env.production", "/.env.backup",
"/admin", "/admin/", "/wp-admin/",
"/phpinfo.php", "/info.php", "/test.php",
"/.git/config", "/.git/HEAD",
"/config.yml", "/config.yaml", "/config.json",
"/backup.sql", "/dump.sql", "/database.sql",
"/robots.txt", "/sitemap.xml", # not dangerous but worth noting
"/.DS_Store",
"/server-status", "/server-info",
"/actuator", "/actuator/health", "/actuator/env",
"/__debug__/",
]
MIN_HSTS_MAX_AGE = 15_768_000 # 6 months
@dataclass
class WebIssue:
issue: str
severity: str # Critical | Warning | Info
layer: str
fix: str
@dataclass
class WebScanResult:
url: str
reachable: bool = True
issues: list[WebIssue] = field(default_factory=list)
ai_summary: str = ""
score: int = 100
grade: str = "A"
ssl_expiry_days: Optional[int] = None
exposed_paths: list[str] = field(default_factory=list)
def compute_score(self) -> None:
weights = {"Critical": 15, "Warning": 5, "Info": 2}
deduction = sum(weights.get(i.severity, 0) for i in self.issues)
self.score = max(100 - deduction, 0)
self.grade = _score_to_grade(self.score)
def _score_to_grade(score: int) -> str:
if score >= 90: return "A"
if score >= 80: return "B"
if score >= 70: return "C"
if score >= 60: return "D"
return "F"
async def scan_url(url: str, timeout: int = 10) -> WebScanResult:
"""Run all web security checks against the given URL."""
result = WebScanResult(url=url)
try:
async with httpx.AsyncClient(
follow_redirects=True,
timeout=timeout,
verify=False, # we do our own cert check
) as client:
response = await client.get(url)
_check_transport(url, response, result)
_check_headers(url, response, result)
_check_cookies(url, response, result)
await _check_exposed_paths(url, result, timeout)
_check_ssl(url, result)
except httpx.ConnectError:
result.reachable = False
result.issues.append(WebIssue(
issue="Could not connect to host",
severity="Critical",
layer="Transport Layer",
fix="Verify the URL is correct and the server is running",
))
except Exception as e:
logger.error(f"Web scan error: {e}")
result.reachable = False
result.compute_score()
return result
# ── Individual checkers ────────────────────────────────────────────────────────
def _check_transport(url: str, response: httpx.Response, result: WebScanResult) -> None:
headers = response.headers
if not url.startswith("https"):
result.issues.append(WebIssue(
issue="Site is not using HTTPS",
severity="Critical",
layer="Transport Layer",
fix="Install an SSL certificate and redirect all HTTP traffic to HTTPS",
))
return
hsts = headers.get("Strict-Transport-Security", "")
if not hsts:
result.issues.append(WebIssue(
issue="Missing HSTS (Strict-Transport-Security) header",
severity="Warning",
layer="Transport Layer",
fix="Add: Strict-Transport-Security: max-age=31536000; includeSubDomains; preload",
))
else:
max_age = 0
for directive in hsts.lower().split(";"):
d = directive.strip()
if d.startswith("max-age="):
try:
max_age = int(d.split("=", 1)[1])
except ValueError:
pass
if max_age < MIN_HSTS_MAX_AGE:
result.issues.append(WebIssue(
issue=f"HSTS max-age is too short ({max_age}s)",
severity="Warning",
layer="Transport Layer",
fix="Set HSTS max-age to at least 31536000 (1 year)",
))
if "includesubdomains" not in hsts.lower():
result.issues.append(WebIssue(
issue="HSTS missing includeSubDomains",
severity="Info",
layer="Transport Layer",
fix="Add includeSubDomains to the HSTS header",
))
def _check_headers(url: str, response: httpx.Response, result: WebScanResult) -> None:
h = response.headers
if "Content-Security-Policy" not in h:
result.issues.append(WebIssue(
issue="Missing Content-Security-Policy header",
severity="Warning",
layer="Security Headers",
fix="Add: Content-Security-Policy: default-src 'self';",
))
else:
csp = h["Content-Security-Policy"]
if "'unsafe-inline'" in csp:
result.issues.append(WebIssue(
issue="CSP allows 'unsafe-inline'",
severity="Warning",
layer="Security Headers",
fix="Remove 'unsafe-inline' from CSP; use nonces or hashes instead",
))
if "'unsafe-eval'" in csp:
result.issues.append(WebIssue(
issue="CSP allows 'unsafe-eval'",
severity="Warning",
layer="Security Headers",
fix="Remove 'unsafe-eval' from CSP to prevent eval()-based code execution",
))
if "X-Frame-Options" not in h:
result.issues.append(WebIssue(
issue="Missing X-Frame-Options header",
severity="Warning",
layer="Security Headers",
fix="Add: X-Frame-Options: SAMEORIGIN",
))
if "X-Content-Type-Options" not in h:
result.issues.append(WebIssue(
issue="Missing X-Content-Type-Options header",
severity="Warning",
layer="Security Headers",
fix="Add: X-Content-Type-Options: nosniff",
))
if "Referrer-Policy" not in h:
result.issues.append(WebIssue(
issue="Missing Referrer-Policy header",
severity="Info",
layer="Security Headers",
fix="Add: Referrer-Policy: strict-origin-when-cross-origin",
))
if "Permissions-Policy" not in h:
result.issues.append(WebIssue(
issue="Missing Permissions-Policy header",
severity="Info",
layer="Security Headers",
fix="Add: Permissions-Policy: geolocation=(), camera=(), microphone=()",
))
if h.get("Access-Control-Allow-Origin") == "*":
result.issues.append(WebIssue(
issue="CORS allows all origins (*)",
severity="Warning",
layer="Security Headers",
fix="Restrict Access-Control-Allow-Origin to trusted domains",
))
server = h.get("Server", "")
if server:
result.issues.append(WebIssue(
issue=f"Server header reveals technology: {server}",
severity="Info",
layer="Security Headers",
fix="Remove or mask the Server header",
))
if "X-Powered-By" in h:
result.issues.append(WebIssue(
issue=f"X-Powered-By header reveals stack: {h['X-Powered-By']}",
severity="Info",
layer="Security Headers",
fix="Remove the X-Powered-By header",
))
def _check_cookies(url: str, response: httpx.Response, result: WebScanResult) -> None:
from http.cookies import SimpleCookie
is_https = url.startswith("https")
raw_cookies = response.headers.multi_items()
set_cookie_headers = [v for k, v in raw_cookies if k.lower() == "set-cookie"]
for cookie_str in set_cookie_headers:
cookie = SimpleCookie()
try:
cookie.load(cookie_str)
except Exception:
continue
cookie_lower = cookie_str.lower()
for name, _ in cookie.items():
if "httponly" not in cookie_lower:
result.issues.append(WebIssue(
issue=f"Cookie '{name}' missing HttpOnly flag",
severity="Warning",
layer="Cookie Security",
fix=f"Set HttpOnly on cookie '{name}' to prevent JS access",
))
if is_https and "; secure" not in cookie_lower:
result.issues.append(WebIssue(
issue=f"Cookie '{name}' missing Secure flag",
severity="Warning",
layer="Cookie Security",
fix=f"Set Secure flag on cookie '{name}'",
))
if "samesite" not in cookie_lower:
result.issues.append(WebIssue(
issue=f"Cookie '{name}' missing SameSite attribute",
severity="Warning",
layer="Cookie Security",
fix=f"Set SameSite=Lax or SameSite=Strict on cookie '{name}'",
))
async def _check_exposed_paths(url: str, result: WebScanResult, timeout: int) -> None:
base = url.rstrip("/")
async with httpx.AsyncClient(verify=False, timeout=timeout) as client:
async def check_path(path: str):
try:
r = await client.get(base + path)
if r.status_code == 200 and path not in ("/robots.txt", "/sitemap.xml"):
result.exposed_paths.append(path)
result.issues.append(WebIssue(
issue=f"Sensitive path exposed: {path}",
severity="Critical" if ".env" in path or ".git" in path else "Warning",
layer="Exposure",
fix=f"Block or restrict access to {path} via your web server config",
))
except Exception:
pass
await asyncio.gather(*(check_path(p) for p in SENSITIVE_PATHS))
def _check_ssl(url: str, result: WebScanResult) -> None:
if not url.startswith("https"):
return
parsed = urlparse(url)
hostname = parsed.hostname
port = parsed.port or 443
try:
ctx = ssl.create_default_context()
with ctx.wrap_socket(socket.create_connection((hostname, port), timeout=5), server_hostname=hostname) as s:
cert = s.getpeercert()
expiry_str = cert.get("notAfter", "")
if expiry_str:
expiry = datetime.datetime.strptime(expiry_str, "%b %d %H:%M:%S %Y %Z")
days_left = (expiry - datetime.datetime.utcnow()).days
result.ssl_expiry_days = days_left
if days_left < 14:
result.issues.append(WebIssue(
issue=f"SSL certificate expires in {days_left} days",
severity="Critical",
layer="SSL/TLS",
fix="Renew the SSL certificate immediately",
))
elif days_left < 30:
result.issues.append(WebIssue(
issue=f"SSL certificate expires soon ({days_left} days)",
severity="Warning",
layer="SSL/TLS",
fix="Renew the SSL certificate within the next 30 days",
))
except ssl.SSLCertVerificationError:
result.issues.append(WebIssue(
issue="SSL certificate is invalid or self-signed",
severity="Critical",
layer="SSL/TLS",
fix="Install a valid SSL certificate from a trusted CA (e.g. Let's Encrypt)",
))
except Exception as e:
logger.debug(f"SSL check error: {e}")