diff --git a/app/config.py b/app/config.py index 7246f91..7b2b898 100644 --- a/app/config.py +++ b/app/config.py @@ -1,9 +1,32 @@ +""" +Application Configuration +========================== + +All configuration is loaded from environment variables (or a .env file). + +AI Provider Configuration +-------------------------- +SecureLens supports multiple AI providers via LiteLLM. +Set AI_MODEL and AI_API_KEY in your .env to choose a provider: + + Provider | AI_MODEL example | Where to get key + -------------|--------------------------------------|----------------------------- + Gemini | gemini/gemini-2.0-flash | aistudio.google.com + OpenAI | gpt-4o-mini | platform.openai.com + Claude | claude-3-5-haiku-20241022 | console.anthropic.com + OpenRouter | openrouter/google/gemini-2.0-flash | openrouter.ai + Ollama | ollama/llama3.1 | ollama.com (local, no key) + +If you only set GEMINI_API_KEY (legacy), it will still work — the app +automatically maps it to the Gemini provider for backward compatibility. +""" + from pydantic_settings import BaseSettings, SettingsConfigDict class Settings(BaseSettings): app_name: str = "SecureLens AI" - app_version: str = "1.0.0" + app_version: str = "1.1.0" debug: bool = False host: str = "0.0.0.0" @@ -22,13 +45,39 @@ class Settings(BaseSettings): jwt_algorithm: str = "HS256" jwt_expiry_minutes: int = 1440 + # ------------------------------------------------------------------------- + # AI Provider Settings (new, provider-agnostic) + # ------------------------------------------------------------------------- + # AI_MODEL: the LiteLLM model string (see table in module docstring above) + ai_model: str = "gemini/gemini-2.0-flash" + + # AI_API_KEY: the API key for the chosen provider. + # Leave blank for Ollama (local, no key needed). + ai_api_key: str | None = None + + # ------------------------------------------------------------------------- + # Legacy Gemini key — kept for backward compatibility. + # If AI_API_KEY is not set but GEMINI_API_KEY is, we use that automatically. + # ------------------------------------------------------------------------- gemini_api_key: str | None = None + # Threat Intelligence API keys (Step 3) + virustotal_api_key: str | None = None + abuseipdb_api_key: str | None = None + model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8") @property def cors_origin_list(self) -> list[str]: return [origin.strip() for origin in self.cors_origins.split(",") if origin.strip()] + @property + def effective_ai_key(self) -> str | None: + """ + Returns the resolved AI API key. + Prefers AI_API_KEY; falls back to GEMINI_API_KEY for backward compatibility. + """ + return self.ai_api_key or self.gemini_api_key + settings = Settings() diff --git a/app/routers/scan.py b/app/routers/scan.py index b4a8d1d..36764af 100644 --- a/app/routers/scan.py +++ b/app/routers/scan.py @@ -22,6 +22,7 @@ from app.services.scanner.dns import DNSScanner from app.services.scanner.ports import PortScanner from app.services.scoring import calculate_layer_statuses, calculate_score from app.services.ai import enhance_security_issues +from app.services.threat_intel import get_threat_intel_summary from app.utils.validators import validate_url logger = logging.getLogger(__name__) @@ -78,6 +79,8 @@ async def scan_website( dns_task = asyncio.create_task(dns_scanner.scan(url)) port_task = asyncio.create_task(port_scanner.scan(url)) + # Step 3: Run threat intel lookup concurrently — zero extra latency + threat_intel_task = asyncio.create_task(get_threat_intel_summary(url)) async with httpx.AsyncClient( timeout=httpx.Timeout(settings.scan_timeout), @@ -95,11 +98,12 @@ async def scan_website( # Await infrastructure scans all_issues.extend(await dns_task) all_issues.extend(await port_task) + threat_intel = await threat_intel_task score = calculate_score(all_issues) layers = calculate_layer_statuses(all_issues) - if settings.gemini_api_key and all_issues: + if settings.effective_ai_key and all_issues: issues_dict_list = [i.model_dump() for i in all_issues] ai_data = await enhance_security_issues(issues_dict_list) enhanced_list = ai_data.get("enhanced_issues", []) @@ -144,6 +148,7 @@ async def scan_website( layers=layers, issues=all_issues, created_at=created_at, + threat_intel=threat_intel, # Step 3: attach threat intelligence ) except httpx.HTTPError as e: diff --git a/app/services/ai.py b/app/services/ai.py index 125c56a..848e8d5 100644 --- a/app/services/ai.py +++ b/app/services/ai.py @@ -1,24 +1,108 @@ +""" +AI Service Layer — Provider-Agnostic via LiteLLM +================================================== + +Why LiteLLM? + Previously every AI call used the google-genai SDK directly, which meant + the entire codebase was hard-wired to Gemini. Switching to OpenAI or + Claude would require rewriting every file that touched AI. + + LiteLLM is a thin translation layer. You call one function, it handles + the right SDK under the hood based on the model string you pass: + - "gpt-4o-mini" → OpenAI + - "claude-3-5-haiku-20241022" → Anthropic + - "gemini/gemini-2.0-flash" → Google Gemini + - "ollama/llama3.1" → local Ollama instance + - "openrouter/..." → OpenRouter + + Now you only need to change two env vars (AI_MODEL, AI_API_KEY) to switch + providers — no code changes required. + +Public API (used by the rest of the app): + call_ai(prompt, temperature, json_mode) → str + enhance_security_issues(issues) → dict + chat_with_scan_context(...) → str + generate_threat_narrative(context_data) → str +""" + import json import logging -import asyncio -from google import genai -from google.genai import types from app.config import settings logger = logging.getLogger(__name__) -if settings.gemini_api_key: - # Initialize google-genai client - ai_client = genai.Client(api_key=settings.gemini_api_key) -else: - ai_client = None -async def get_gemini_model(): - return 'gemini-2.0-flash' +# --------------------------------------------------------------------------- +# Core LiteLLM wrapper +# --------------------------------------------------------------------------- + +async def call_ai( + prompt: str, + temperature: float = 0.3, + json_mode: bool = False, +) -> str: + """ + The single entry-point for all AI calls in SecureLens. + + Parameters + ---------- + prompt : The full prompt string to send to the model. + temperature : Creativity level (0 = deterministic, 1 = creative). + json_mode : If True, instruct the model to return valid JSON only. + This maps to response_format={"type":"json_object"} on + providers that support it (OpenAI, Gemini via LiteLLM). + + Returns + ------- + The model's text response as a plain string. Empty string on failure. + """ + import litellm + + api_key = settings.effective_ai_key + model = settings.ai_model + + if not api_key and not model.startswith("ollama/"): + logger.warning("No AI API key configured. Skipping AI call.") + return "" + + messages = [{"role": "user", "content": prompt}] + + kwargs: dict = { + "model": model, + "messages": messages, + "temperature": temperature, + "api_key": api_key, + } + + # JSON mode: supported natively by OpenAI and LiteLLM proxied Gemini. + # For providers that don't support it, LiteLLM silently ignores the flag. + if json_mode: + kwargs["response_format"] = {"type": "json_object"} + + try: + response = await litellm.acompletion(**kwargs) + return response.choices[0].message.content or "" + except Exception as e: + logger.error(f"LiteLLM call failed [model={model}]: {e}") + return "" + + +# --------------------------------------------------------------------------- +# Domain-specific AI functions +# --------------------------------------------------------------------------- async def enhance_security_issues(issues: list[dict]) -> dict: - if not settings.gemini_api_key: - logger.warning("GEMINI_API_KEY is not set. AI enhancements are skipped.") + """ + Takes a raw list of scanner-detected issues and enriches each one with: + - contextual_severity : AI-assessed severity in the real-world context + - explanation : Plain-English description of the risk + - remediation_snippet : Concrete code or config fix + + Returns a dict {"enhanced_issues": [...]} that mirrors the original list + with the three new fields merged in. + """ + if not settings.effective_ai_key: + logger.warning("AI enhancements skipped — no AI API key set.") return {"enhanced_issues": issues} prompt = ( @@ -28,75 +112,61 @@ async def enhance_security_issues(issues: list[dict]) -> dict: "Return a JSON object with a single key 'enhanced_issues' containing a list of objects. " "Each object MUST correspond to one of the original issues and have the following keys: " "'issue' (exact string of the original issue), " - "'contextual_severity' (Low, Medium, High, Critical), " - "'explanation' (a 1-2 sentence non-technical explanation), " - "'remediation_snippet' (Actionable code snippet, e.g. Nginx config, or 'N/A')." + "'contextual_severity' (Low, Medium, High, or Critical), " + "'explanation' (a 1-2 sentence non-technical explanation of the real risk), " + "'remediation_snippet' (an actionable code snippet or config fix, or 'N/A')." ) + raw = await call_ai(prompt, temperature=0.2, json_mode=True) + if not raw: + return {"enhanced_issues": issues, "ai_error": "Empty response from AI"} + try: - model_name = await get_gemini_model() - response = await ai_client.aio.models.generate_content( - model=model_name, - contents=prompt, - config=types.GenerateContentConfig( - response_mime_type="application/json", - temperature=0.2, - ) - ) - if response.text: - return json.loads(response.text) - return {"enhanced_issues": issues, "ai_error": "Empty response"} - except Exception as e: - logger.error(f"AI Generation Error: {str(e)}") - return {"enhanced_issues": issues, "ai_error": str(e)} + return json.loads(raw) + except json.JSONDecodeError as e: + logger.error(f"Failed to parse AI JSON response: {e}\nRaw: {raw[:500]}") + return {"enhanced_issues": issues, "ai_error": "JSON parse error"} + async def chat_with_scan_context(scan_id: str, context_data: dict, user_message: str) -> str: - if not settings.gemini_api_key: - return "AI Chat is disabled because GEMINI_API_KEY is not configured." + """ + Powers the conversational chat feature for web scans. + + The full scan context (score, layers, issues) is injected into the prompt + so the model can answer specific questions about the scan results. + """ + if not settings.effective_ai_key: + return "AI Chat is disabled because no AI API key is configured." prompt = ( "You are SecureLens AI, an expert cybersecurity assistant. " "You are helping a developer understand a security scan report for their website. " - f"Here is the context of the scan: {json.dumps(context_data)}\n\n" - f"User Message: {user_message}" + f"Here is the context of the scan:\n{json.dumps(context_data, indent=2)}\n\n" + f"Developer's question: {user_message}\n\n" + "Answer clearly and professionally. Reference specific findings from the scan when relevant." ) - try: - model_name = await get_gemini_model() - response = await ai_client.aio.models.generate_content( - model=model_name, - contents=prompt, - config=types.GenerateContentConfig( - temperature=0.5, - ) - ) - return response.text or "No response from AI." - except Exception as e: - logger.error(f"AI Chat Error: {str(e)}") - return "I encountered an error trying to process your request." + result = await call_ai(prompt, temperature=0.5) + return result or "I couldn't generate a response. Please try again." + async def generate_threat_narrative(context_data: dict) -> str: - if not settings.gemini_api_key: - return "AI Threat Narrative is disabled because GEMINI_API_KEY is not configured." + """ + Generates a 2-3 paragraph red-team style threat narrative. + + Explains how an attacker could chain the discovered vulnerabilities + together to compromise the system. Used in the PDF report. + """ + if not settings.effective_ai_key: + return "AI Threat Narrative is disabled because no AI API key is configured." prompt = ( "You are a senior cybersecurity red-teamer. Analyze the following security scan results " "and weave them into a single, cohesive 'Threat Narrative'. Explain how an attacker might " "chain these specific vulnerabilities together to compromise the system. " "Keep it professional, concise (2-3 paragraphs), and actionable.\n\n" - f"Context: {json.dumps(context_data)}" + f"Scan Context:\n{json.dumps(context_data, indent=2)}" ) - try: - model_name = await get_gemini_model() - response = await ai_client.aio.models.generate_content( - model=model_name, - contents=prompt, - config=types.GenerateContentConfig( - temperature=0.7, - ) - ) - return response.text or "Could not generate threat narrative." - except Exception as e: - logger.error(f"AI Narrative Error: {str(e)}") - return "I encountered an error trying to generate the threat narrative." + result = await call_ai(prompt, temperature=0.7) + return result or "Could not generate threat narrative." diff --git a/app/services/code_scanner/orchestrator.py b/app/services/code_scanner/orchestrator.py index fd8f754..600c141 100644 --- a/app/services/code_scanner/orchestrator.py +++ b/app/services/code_scanner/orchestrator.py @@ -1,36 +1,50 @@ +""" +Code Scan Orchestrator +======================= + +Coordinates the three phases of an agentic code security scan: + 1. Triage — Ask the AI which files are worth scanning. + 2. Analyze — Send each file's code to the AI for OWASP vulnerability review. + 3. Summarize— Generate an executive summary of all findings. + +Previously this used the google-genai SDK directly. It now delegates all AI +calls to app.services.ai.call_ai(), which is provider-agnostic via LiteLLM. +This means switching from Gemini to Claude (or any other model) automatically +applies to the code scanner without any changes here. +""" + import json import logging -from typing import List, Dict, Any -from google import genai -from google.genai import types import asyncio +from typing import List from app.config import settings +from app.services.ai import call_ai from app.services.code_scanner.github_client import GitHubClient from app.schemas.code_scan import VulnerabilityIssue logger = logging.getLogger(__name__) -if settings.gemini_api_key: - # google-genai client init - ai_client = genai.Client(api_key=settings.gemini_api_key) -else: - ai_client = None class CodeScanOrchestrator: def __init__(self, repo_url: str, github_token: str, branch: str = "main"): self.repo_url = repo_url self.branch = branch self.github = GitHubClient(token=github_token) - # We use gemini-2.0-flash for high rate limits and stability - self.model_name = 'gemini-2.0-flash' async def triage_files(self, all_files: List[str]) -> List[str]: """ - Uses the LLM to select which files are most likely to contain security vulnerabilities + Phase 1 — AI-driven file triage. + + Sends the full file tree to the LLM and asks it to select the + most security-critical files (e.g. auth handlers, DB queries, + config files). Caps at 5 files to stay within token budgets. + + Falls back to the first 5 files if the AI call fails or no key + is configured. """ - if not settings.gemini_api_key: - logger.warning("GEMINI_API_KEY is not set. Triaging all files up to a limit.") + if not settings.effective_ai_key: + logger.warning("No AI key set. Falling back to first 5 files.") return all_files[:5] files_str = "\n".join(all_files) @@ -40,117 +54,118 @@ class CodeScanOrchestrator: prompt = ( "You are a Senior Application Security Engineer. I have a repository with the following files:\n" f"{files_str}\n\n" - "Select the most critical files to review for security vulnerabilities (e.g., SAST, hardcoded secrets, SQLi, Auth bypass). " - "Return a JSON object with a single key 'critical_files' containing a list of the exact file paths. " - "Do not select more than 5 files." + "Select the most critical files to review for security vulnerabilities " + "(e.g. authentication, database access, config, API routes, secrets handling). " + "Return a JSON object with a single key 'critical_files' containing a list of " + "the exact file paths from the list above. Do not select more than 5 files." ) try: - response = await ai_client.aio.models.generate_content( - model=self.model_name, - contents=prompt, - config=types.GenerateContentConfig( - response_mime_type="application/json", - temperature=0.1, - ) - ) - if response.text: - data = json.loads(response.text) + raw = await call_ai(prompt, temperature=0.1, json_mode=True) + if raw: + data = json.loads(raw) return data.get("critical_files", []) except Exception as e: - logger.error(f"Error triaging files: {e}") - + logger.error(f"File triage failed: {e}") + return all_files[:5] async def analyze_files(self, triaged_files: List[str]) -> List[VulnerabilityIssue]: - if not settings.gemini_api_key: + """ + Phase 2 — Per-file SAST analysis. + + Downloads each file's source code from GitHub and sends it to + the AI for a focused OWASP Top-10 vulnerability review. + + Concurrency is throttled with a semaphore to avoid hitting + provider rate limits (max 5 simultaneous AI requests). + """ + if not settings.effective_ai_key: return [] vulnerabilities = [] - semaphore = asyncio.Semaphore(5) # Max 5 concurrent requests to avoid rate limits - - async def process_file(file_path: str): - # Skip massive dependency lock files as they are too slow and unhelpful for SAST - if file_path.endswith('package-lock.json') or file_path.endswith('yarn.lock'): + # Limit concurrent AI calls to avoid rate-limiting + semaphore = asyncio.Semaphore(5) + + async def process_file(file_path: str) -> List[VulnerabilityIssue]: + # Skip lock files — huge, slow, zero security signal + if file_path.endswith(("package-lock.json", "yarn.lock", "poetry.lock")): return [] - - content = await self.github.get_file_content(self.repo_url, file_path, self.branch) + + content = await self.github.get_file_content( + self.repo_url, file_path, self.branch + ) if not content: return [] - + + # Cap file size to avoid token overflows if len(content) > 30000: content = content[:30000] prompt = ( - f"Review the following code from the file '{file_path}' for security vulnerabilities.\n" - "Focus on OWASP Top 10: SQLi, XSS, Hardcoded Secrets, IDOR, Misconfigurations, etc.\n\n" + f"Review the following code from '{file_path}' for security vulnerabilities.\n" + "Focus on OWASP Top 10: SQL Injection, XSS, Hardcoded Secrets, IDOR, " + "Insecure Deserialization, Broken Auth, Misconfigurations, SSRF, etc.\n\n" f"CODE:\n{content}\n\n" "Return a JSON object with a key 'vulnerabilities' containing a list of objects. " - "Each object MUST have the following keys: " - "'severity' (Critical, High, Medium, Low), " - "'issue' (A short title), " - "'explanation' (1-2 sentences explaining the vulnerability), " - "'suggested_fix' (Code snippet or clear instructions to fix), " - "'line_number' (integer or null if general)." + "Each object MUST have the following keys:\n" + " 'severity' : Critical | High | Medium | Low\n" + " 'issue' : Short title of the vulnerability\n" + " 'explanation' : 1-2 sentences explaining the risk\n" + " 'suggested_fix': Code snippet or clear instruction to fix it\n" + " 'line_number' : Integer line number, or null if not applicable\n" + "If no vulnerabilities are found, return {\"vulnerabilities\": []}." ) file_vulns = [] async with semaphore: try: - response = await ai_client.aio.models.generate_content( - model=self.model_name, - contents=prompt, - config=types.GenerateContentConfig( - response_mime_type="application/json", - temperature=0.2, - ) - ) - if response.text: - data = json.loads(response.text) - vulns = data.get("vulnerabilities", []) - for v in vulns: - file_vulns.append(VulnerabilityIssue( - file_path=file_path, - severity=v.get("severity", "Medium"), - issue=v.get("issue", "Unknown Issue"), - explanation=v.get("explanation", ""), - suggested_fix=v.get("suggested_fix"), - line_number=v.get("line_number") - )) + raw = await call_ai(prompt, temperature=0.2, json_mode=True) + if raw: + data = json.loads(raw) + for v in data.get("vulnerabilities", []): + file_vulns.append( + VulnerabilityIssue( + file_path=file_path, + severity=v.get("severity", "Medium"), + issue=v.get("issue", "Unknown Issue"), + explanation=v.get("explanation", ""), + suggested_fix=v.get("suggested_fix"), + line_number=v.get("line_number"), + ) + ) except Exception as e: - logger.error(f"Error analyzing file {file_path}: {e}") + logger.error(f"Analysis failed for {file_path}: {e}") + return file_vulns results = await asyncio.gather(*(process_file(f) for f in triaged_files)) for res in results: vulnerabilities.extend(res) - + return vulnerabilities async def generate_summary(self, vulnerabilities: List[VulnerabilityIssue]) -> str: + """ + Phase 3 — Executive summary. + + Asks the AI to distill all findings into a 2-3 paragraph summary + suitable for a security report or management briefing. + """ if not vulnerabilities: - return "No obvious security vulnerabilities found in the scanned files." - - if not settings.gemini_api_key: - return f"Found {len(vulnerabilities)} potential issues." + return "No security vulnerabilities were identified in the scanned files." + + if not settings.effective_ai_key: + return f"Found {len(vulnerabilities)} potential issue(s) across the scanned files." issues_data = [v.model_dump() for v in vulnerabilities] prompt = ( - "You are a Senior AppSec Manager. Summarize the following list of vulnerabilities found in a recent scan. " - "Provide a 2-3 paragraph executive summary of the repository's security posture. " - "Keep it professional and highlight the most critical risks.\n\n" - f"{json.dumps(issues_data)}" + "You are a Senior AppSec Manager. Summarize the following list of vulnerabilities " + "found in a recent automated security scan. Provide a 2-3 paragraph executive summary " + "of the repository's overall security posture. Highlight the most critical risks " + "and recommend the immediate priorities. Keep it professional and actionable.\n\n" + f"Findings:\n{json.dumps(issues_data, indent=2)}" ) - try: - response = await ai_client.aio.models.generate_content( - model=self.model_name, - contents=prompt, - config=types.GenerateContentConfig( - temperature=0.4, - ) - ) - return response.text or "Could not generate summary." - except Exception as e: - logger.error(f"Error generating summary: {e}") - return f"Found {len(vulnerabilities)} potential issues." + result = await call_ai(prompt, temperature=0.4) + return result or f"Found {len(vulnerabilities)} potential issue(s)." diff --git a/requirements.txt b/requirements.txt index ebe8ffb..dca3593 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,3 +17,7 @@ alembic google-genai aiodns fpdf2 +# ---- Step 2: LiteLLM for provider-agnostic AI calls ---- +litellm +# ---- Step 3: Threat Intelligence ---- +# (VirusTotal and AbuseIPDB are queried via httpx, no extra SDK needed)