add sync command and scan options to securelens cli

This commit is contained in:
rarebuffalo
2026-06-12 19:10:58 +05:30
parent c29acca5bc
commit 8623f28910

View File

@@ -123,7 +123,8 @@ def configure():
type=click.Choice(["critical", "high", "medium", "low"]),
help="In --ci mode, exit 1 if issues of this severity or above are found")
@click.option("--no-ai", is_flag=True, help="Skip AI triage & summary (pattern-based only, faster)")
def scan(path, model, output, max_files, ci, fail_on, no_ai):
@click.option("--sync", is_flag=True, help="Sync scan findings to backend database")
def scan(path, model, output, max_files, ci, fail_on, no_ai, sync):
"""
Scan a local codebase for security vulnerabilities.
@@ -134,10 +135,10 @@ def scan(path, model, output, max_files, ci, fail_on, no_ai):
securelens scan . --model gpt-4o --max-files 30
securelens scan . --ci --fail-on high
"""
_run(_scan_async(path, model, output, max_files, ci, fail_on, no_ai))
_run(_scan_async(path, model, output, max_files, ci, fail_on, no_ai, sync))
async def _scan_async(path, model, output, max_files, ci, fail_on, no_ai):
async def _scan_async(path, model, output, max_files, ci, fail_on, no_ai, sync):
from securelens.config import load_config
from securelens.output import print_banner, print_scan_header, print_code_scan_report, make_progress, print_error
from securelens.output.exporters import save_json, save_markdown, to_json
@@ -202,7 +203,17 @@ async def _scan_async(path, model, output, max_files, ci, fail_on, no_ai):
progress.update(task_analyze, completed=done, detail=filename)
if no_ai or not cfg.api_key:
from securelens.scanners.patterns import scan_file_content
vulnerabilities = []
for idx, p in enumerate(triaged):
rel_path = p.relative_to(root).as_posix()
try:
content = p.read_text(errors="replace")
file_vulns = scan_file_content(rel_path, content)
vulnerabilities.extend(file_vulns)
except Exception as e:
click.echo(f"Warning: Could not read {rel_path} for offline scan: {e}", err=True)
await on_progress(idx + 1, len(triaged), rel_path)
else:
vulnerabilities = await analyze_files(triaged, root, cfg, on_progress)
progress.update(task_analyze, completed=len(triaged),
@@ -248,6 +259,16 @@ async def _scan_async(path, model, output, max_files, ci, fail_on, no_ai):
if not ci:
console.print(f" [green]✓ Markdown report saved:[/green] [dim]{path_out}[/dim]\n")
# ── Sync to Central Backend ──────────────────────────────────────────────
if sync or cfg.token:
from securelens.scanners.sync import sync_scan_to_backend
console.print("[dim]Synchronizing scan with backend database...[/dim]")
scan_id = await sync_scan_to_backend(result, token=cfg.token)
if scan_id:
console.print(f" [bold green]✓ Scan synchronized successfully![/bold green] Backend Scan ID: [cyan]{scan_id}[/cyan]\n")
else:
console.print(" [bold yellow]⚠ Sync failed: Could not connect to backend or token is invalid.[/bold yellow]\n")
# ── CI exit code ─────────────────────────────────────────────────────────
if ci:
_ci_exit(result.vulnerabilities, fail_on, "code")
@@ -393,5 +414,59 @@ def _ci_exit(issues, fail_on, scan_type: str):
sys.exit(0)
# ── sync ──────────────────────────────────────────────────────────────────────
@main.command()
@click.argument("scan_file", type=click.Path(exists=True, dir_okay=False))
def sync(scan_file):
"""Synchronize a local JSON scan report with the backend database."""
_run(_sync_async(scan_file))
async def _sync_async(scan_file):
from securelens.config import load_config
from securelens.scanners import LocalScanResult, VulnerabilityFinding
from securelens.scanners.sync import sync_scan_to_backend
import json as _json
cfg = load_config()
try:
with open(scan_file) as f:
data = _json.load(f)
# Reconstruct LocalScanResult
findings = []
for v in data.get("vulnerabilities", []):
findings.append(VulnerabilityFinding(
file_path=v.get("file_path", v.get("file", "")),
severity=v.get("severity", "Medium"),
issue=v.get("issue", "Unknown"),
explanation=v.get("explanation", ""),
suggested_fix=v.get("suggested_fix", ""),
line_number=v.get("line_number")
))
result = LocalScanResult(
target=data.get("target", "unknown"),
total_files_found=data.get("total_files_found", 0),
files_triaged=data.get("files_triaged", []),
vulnerabilities=findings,
ai_summary=data.get("ai_summary", "")
)
result.compute_score()
console.print(f"\n[dim]Reading scan file: {scan_file}[/dim]")
console.print("[dim]Synchronizing scan with backend database...[/dim]")
scan_id = await sync_scan_to_backend(result, token=cfg.token)
if scan_id:
console.print(f" [bold green]✓ Scan synchronized successfully![/bold green] Backend Scan ID: [cyan]{scan_id}[/cyan]\n")
else:
console.print(" [bold yellow]⚠ Sync failed: Could not connect to backend or token is invalid.[/bold yellow]\n")
except Exception as e:
console.print(f" [bold red]✗ Failed to parse or sync scan file: {e}[/bold red]\n")
if __name__ == "__main__":
main()