# Mirror of https://github.com/Rarebuffalo/securelens-backend.git
# Synced 2026-06-19 07:00:30 +00:00
# 144 lines, 5.9 KiB, Python
import json
|
|
import logging
|
|
from typing import List, Dict, Any
|
|
from google import genai
|
|
from google.genai import types
|
|
|
|
from app.config import settings
|
|
from app.services.code_scanner.github_client import GitHubClient
|
|
from app.schemas.code_scan import VulnerabilityIssue
|
|
|
|
logger = logging.getLogger(__name__)

# Module-wide google-genai client. It is only constructed when a Gemini API
# key is configured; otherwise the name is bound to None.
ai_client = (
    genai.Client(api_key=settings.gemini_api_key)
    if settings.gemini_api_key
    else None
)
|
|
|
|
class CodeScanOrchestrator:
    """Run an LLM-assisted security scan over files of a GitHub repository.

    The scan is split into three coroutines: ``triage_files`` chooses which
    paths to inspect, ``analyze_files`` asks the model for findings file by
    file, and ``generate_summary`` condenses the findings into prose.  Each
    stage returns a non-LLM fallback when ``settings.gemini_api_key`` is not
    set, and logs (rather than raises) failures of the model call itself.
    """

    # Character budget for the newline-joined file listing sent during triage.
    MAX_FILE_LIST_CHARS = 15000
    # Character budget for one file's content sent for analysis.
    MAX_FILE_CONTENT_CHARS = 30000
    # Upper bound on the number of files accepted from the triage response.
    MAX_TRIAGED_FILES = 15
    # Number of leading files used when triage cannot rely on the model.
    FALLBACK_FILE_COUNT = 10

    def __init__(self, repo_url: str, github_token: str, branch: str = "main"):
        """Store the scan target and create the GitHub client.

        Args:
            repo_url: Repository URL passed to the GitHub client on each fetch.
            github_token: Token handed to ``GitHubClient``.
            branch: Branch whose files are fetched; defaults to ``"main"``.
        """
        self.repo_url = repo_url
        self.branch = branch
        self.github = GitHubClient(token=github_token)
        # We use gemini-2.5-flash for fast and cost-effective analysis
        self.model_name = 'gemini-2.5-flash'

    @staticmethod
    def _select_known_files(data: Any, all_files: List[str], limit: int) -> List[str]:
        """Validate a parsed triage response against the real file listing.

        Keeps only string paths that actually appear in ``all_files``, drops
        duplicates while preserving the model's order, and caps the result at
        ``limit`` entries.

        Raises:
            ValueError: If ``data`` is not a JSON object, if its
                ``critical_files`` value is not a list, or if the model named
                files but none of them exist in ``all_files``.
        """
        if not isinstance(data, dict):
            raise ValueError("triage response is not a JSON object")

        candidates = data.get("critical_files", [])
        if not isinstance(candidates, list):
            raise ValueError("'critical_files' is not a list")

        known_paths = set(all_files)
        selected: List[str] = []
        seen = set()
        for path in candidates:
            if isinstance(path, str) and path in known_paths and path not in seen:
                seen.add(path)
                selected.append(path)

        # An empty selection is honoured when the model returned no paths, but
        # a selection consisting only of unknown paths is treated as a failure.
        if candidates and not selected:
            raise ValueError("none of the selected paths exist in the file listing")

        return selected[:limit]

    async def triage_files(self, all_files: List[str]) -> List[str]:
        """
        Uses the LLM to select which files are most likely to contain security vulnerabilities

        The model's answer is validated before it is returned: only paths
        present in ``all_files`` are kept, duplicates are removed, and at most
        ``MAX_TRIAGED_FILES`` entries are returned.

        Args:
            all_files: Every candidate file path in the repository.

        Returns:
            The validated selection, or the first ``FALLBACK_FILE_COUNT``
            entries of ``all_files`` when no API key is configured, the model
            call fails, the response is empty, or the response is unusable.
        """
        if not settings.gemini_api_key:
            logger.warning("GEMINI_API_KEY is not set. Triaging all files up to a limit.")
            return all_files[:self.FALLBACK_FILE_COUNT]

        files_str = "\n".join(all_files)
        if len(files_str) > self.MAX_FILE_LIST_CHARS:
            # The cut may land in the middle of a path, which is one reason the
            # response is checked against all_files below.
            files_str = files_str[:self.MAX_FILE_LIST_CHARS] + "\n... (truncated)"

        prompt = (
            "You are a Senior Application Security Engineer. I have a repository with the following files:\n"
            f"{files_str}\n\n"
            "Select the most critical files to review for security vulnerabilities (e.g., SAST, hardcoded secrets, SQLi, Auth bypass). "
            "Return a JSON object with a single key 'critical_files' containing a list of the exact file paths. "
            f"Do not select more than {self.MAX_TRIAGED_FILES} files."
        )

        try:
            response = await ai_client.aio.models.generate_content(
                model=self.model_name,
                contents=prompt,
                config=types.GenerateContentConfig(
                    response_mime_type="application/json",
                    temperature=0.1,
                ),
            )
            if response.text:
                return self._select_known_files(
                    json.loads(response.text), all_files, self.MAX_TRIAGED_FILES
                )
        except Exception as e:
            # Triage is best-effort: any failure falls through to the fallback.
            logger.error("Error triaging files: %s", e)

        return all_files[:self.FALLBACK_FILE_COUNT]

    async def analyze_files(self, triaged_files: List[str]) -> List[VulnerabilityIssue]:
        """Ask the model for vulnerabilities in each triaged file.

        Files are fetched and analysed one at a time.  A file whose content
        comes back falsy is skipped, and content longer than
        ``MAX_FILE_CONTENT_CHARS`` is truncated before being sent.  A failed
        model call or an unparsable response is logged and the scan moves on
        to the next file; a single malformed finding is logged and skipped
        without discarding the other findings for that file.

        Args:
            triaged_files: File paths to analyse.

        Returns:
            All findings collected across files; empty when no API key is
            configured.
        """
        vulnerabilities: List[VulnerabilityIssue] = []

        if not settings.gemini_api_key:
            return []

        for file_path in triaged_files:
            content = await self.github.get_file_content(self.repo_url, file_path, self.branch)
            if not content:
                continue

            if len(content) > self.MAX_FILE_CONTENT_CHARS:
                content = content[:self.MAX_FILE_CONTENT_CHARS]

            prompt = (
                f"Review the following code from the file '{file_path}' for security vulnerabilities.\n"
                "Focus on OWASP Top 10: SQLi, XSS, Hardcoded Secrets, IDOR, Misconfigurations, etc.\n\n"
                f"CODE:\n{content}\n\n"
                "Return a JSON object with a key 'vulnerabilities' containing a list of objects. "
                "Each object MUST have the following keys: "
                "'severity' (Critical, High, Medium, Low), "
                "'issue' (A short title), "
                "'explanation' (1-2 sentences explaining the vulnerability), "
                "'suggested_fix' (Code snippet or clear instructions to fix), "
                "'line_number' (integer or null if general)."
            )

            try:
                response = await ai_client.aio.models.generate_content(
                    model=self.model_name,
                    contents=prompt,
                    config=types.GenerateContentConfig(
                        response_mime_type="application/json",
                        temperature=0.2,
                    ),
                )
                if not response.text:
                    continue
                data = json.loads(response.text)
                if not isinstance(data, dict):
                    raise ValueError("analysis response is not a JSON object")
                findings = data.get("vulnerabilities", [])
                if not isinstance(findings, list):
                    raise ValueError("'vulnerabilities' is not a list")
            except Exception as e:
                logger.error("Error analyzing file %s: %s", file_path, e)
                continue

            for finding in findings:
                if not isinstance(finding, dict):
                    logger.warning(
                        "Skipping malformed vulnerability entry for %s: %r",
                        file_path,
                        finding,
                    )
                    continue
                try:
                    issue = VulnerabilityIssue(
                        file_path=file_path,
                        severity=finding.get("severity", "Medium"),
                        issue=finding.get("issue", "Unknown Issue"),
                        explanation=finding.get("explanation", ""),
                        suggested_fix=finding.get("suggested_fix"),
                        line_number=finding.get("line_number"),
                    )
                except Exception as e:
                    # The schema's own error type is defined outside this
                    # module, so the catch stays broad; only this one entry is
                    # dropped, not the rest of the file's findings.
                    logger.warning(
                        "Skipping invalid vulnerability entry for %s: %s",
                        file_path,
                        e,
                    )
                    continue
                vulnerabilities.append(issue)

        return vulnerabilities

    async def generate_summary(self, vulnerabilities: List[VulnerabilityIssue]) -> str:
        """Produce an executive summary of the findings.

        Args:
            vulnerabilities: Findings to summarise.

        Returns:
            A fixed "nothing found" sentence for an empty list; a plain count
            message when no API key is configured or the model call fails;
            otherwise the model's text, or "Could not generate summary." when
            the model returns no text.
        """
        if not vulnerabilities:
            return "No obvious security vulnerabilities found in the scanned files."

        if not settings.gemini_api_key:
            return f"Found {len(vulnerabilities)} potential issues."

        # NOTE(review): serialisation happens outside the try below, so a field
        # that json.dumps cannot encode would propagate to the caller - confirm
        # the schema only holds JSON-native values.
        issues_data = [v.model_dump() for v in vulnerabilities]
        prompt = (
            "You are a Senior AppSec Manager. Summarize the following list of vulnerabilities found in a recent scan. "
            "Provide a 2-3 paragraph executive summary of the repository's security posture. "
            "Keep it professional and highlight the most critical risks.\n\n"
            f"{json.dumps(issues_data)}"
        )

        try:
            response = await ai_client.aio.models.generate_content(
                model=self.model_name,
                contents=prompt,
                config=types.GenerateContentConfig(
                    temperature=0.4,
                ),
            )
            return response.text or "Could not generate summary."
        except Exception as e:
            logger.error("Error generating summary: %s", e)
            return f"Found {len(vulnerabilities)} potential issues."
|