mirror of
https://github.com/Rarebuffalo/securelens-backend.git
synced 2026-06-19 07:00:30 +00:00
add VirusTotal ,AbuseIPDB threat intelligence to scan results
This commit is contained in:
@@ -1,7 +1,12 @@
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
# Import the ThreatIntelReport schema from the service layer.
|
||||
# We import it here for use in ScanResponse so the schema stays clean.
|
||||
from app.services.threat_intel import ThreatIntelReport
|
||||
|
||||
|
||||
class ScanRequest(BaseModel):
|
||||
url: str = Field(..., description="The URL of the website to scan")
|
||||
@@ -29,6 +34,8 @@ class ScanResponse(BaseModel):
|
||||
layers: dict[str, LayerStatus]
|
||||
issues: list[Issue]
|
||||
created_at: datetime | None = None
|
||||
# Step 3: Threat intelligence enrichment (optional — only present when API keys are set)
|
||||
threat_intel: Optional[ThreatIntelReport] = None
|
||||
|
||||
|
||||
class ScanHistoryItem(BaseModel):
|
||||
|
||||
279
app/services/threat_intel.py
Normal file
279
app/services/threat_intel.py
Normal file
@@ -0,0 +1,279 @@
|
||||
"""
|
||||
Threat Intelligence Service
|
||||
=============================
|
||||
|
||||
Enriches scan results with real-world reputation data from external
|
||||
threat intelligence feeds. Two providers are integrated:
|
||||
|
||||
1. VirusTotal — Checks if any of 70+ AV/security vendors have flagged
|
||||
the domain as malicious or suspicious.
|
||||
Free tier: 4 lookups/minute, 500/day
|
||||
Sign up: https://www.virustotal.com/
|
||||
|
||||
2. AbuseIPDB — Checks if the server's IP has been reported for
|
||||
abuse (spam, attacks, scanning, etc.).
|
||||
Free tier: 1000 lookups/day
|
||||
Sign up: https://www.abuseipdb.com/
|
||||
|
||||
Both are OPTIONAL. If the API keys are not set in .env, the lookup is
|
||||
gracefully skipped and the rest of the scan continues normally.
|
||||
|
||||
Usage:
|
||||
from app.services.threat_intel import get_threat_intel_summary
|
||||
intel = await get_threat_intel_summary("https://example.com")
|
||||
# intel is a ThreatIntelReport or None
|
||||
"""
|
||||
|
||||
import logging
|
||||
import socket
|
||||
from typing import Optional
|
||||
|
||||
import httpx
|
||||
from pydantic import BaseModel
|
||||
|
||||
from app.config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pydantic response schema
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class VirusTotalResult(BaseModel):
|
||||
"""Result from VirusTotal domain reputation lookup."""
|
||||
vendor_count: int # total vendors that checked this domain
|
||||
malicious: int # vendors that flagged it as malicious
|
||||
suspicious: int # vendors that flagged it as suspicious
|
||||
harmless: int # vendors that marked it as clean
|
||||
reputation_score: int # VirusTotal's own reputation score (negative = bad)
|
||||
|
||||
class AbuseIPDBResult(BaseModel):
|
||||
"""Result from AbuseIPDB IP reputation lookup."""
|
||||
ip_address: str
|
||||
abuse_confidence_score: int # 0-100; 100 = definitely malicious
|
||||
total_reports: int # how many times this IP has been reported
|
||||
country_code: str
|
||||
isp: str
|
||||
usage_type: str # e.g. "Data Center/Web Hosting/Transit"
|
||||
|
||||
class ThreatIntelReport(BaseModel):
|
||||
"""
|
||||
Aggregated threat intelligence for a scanned URL.
|
||||
Both fields are Optional — only populated when the respective API key is set.
|
||||
"""
|
||||
domain: str
|
||||
ip_address: Optional[str] = None
|
||||
virustotal: Optional[VirusTotalResult] = None
|
||||
abuseipdb: Optional[AbuseIPDBResult] = None
|
||||
threat_summary: str = "No threat intelligence data available."
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# VirusTotal lookup
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def check_virustotal(domain: str) -> Optional[VirusTotalResult]:
|
||||
"""
|
||||
Queries the VirusTotal v3 API for domain reputation.
|
||||
|
||||
The domain report endpoint returns counts from 70+ security vendors.
|
||||
We extract malicious/suspicious/harmless counts and the overall
|
||||
reputation score (a negative number means the community flagged it).
|
||||
"""
|
||||
if not settings.virustotal_api_key:
|
||||
return None
|
||||
|
||||
url = f"https://www.virustotal.com/api/v3/domains/{domain}"
|
||||
headers = {"x-apikey": settings.virustotal_api_key}
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
resp = await client.get(url, headers=headers)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
|
||||
stats = data["data"]["attributes"]["last_analysis_stats"]
|
||||
reputation = data["data"]["attributes"].get("reputation", 0)
|
||||
|
||||
return VirusTotalResult(
|
||||
vendor_count=sum(stats.values()),
|
||||
malicious=stats.get("malicious", 0),
|
||||
suspicious=stats.get("suspicious", 0),
|
||||
harmless=stats.get("harmless", 0),
|
||||
reputation_score=reputation,
|
||||
)
|
||||
except httpx.HTTPStatusError as e:
|
||||
logger.warning(f"VirusTotal lookup failed for {domain}: HTTP {e.response.status_code}")
|
||||
except Exception as e:
|
||||
logger.warning(f"VirusTotal lookup error for {domain}: {e}")
|
||||
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AbuseIPDB lookup
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def check_abuseipdb(ip_address: str) -> Optional[AbuseIPDBResult]:
|
||||
"""
|
||||
Queries the AbuseIPDB v2 API for IP address reputation.
|
||||
|
||||
Returns an abuse confidence score (0-100) and metadata about
|
||||
the IP address, including ISP and how many times it's been reported.
|
||||
"""
|
||||
if not settings.abuseipdb_api_key:
|
||||
return None
|
||||
|
||||
url = "https://api.abuseipdb.com/api/v2/check"
|
||||
headers = {
|
||||
"Key": settings.abuseipdb_api_key,
|
||||
"Accept": "application/json",
|
||||
}
|
||||
params = {
|
||||
"ipAddress": ip_address,
|
||||
"maxAgeInDays": 90,
|
||||
"verbose": "",
|
||||
}
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
resp = await client.get(url, headers=headers, params=params)
|
||||
resp.raise_for_status()
|
||||
d = resp.json()["data"]
|
||||
|
||||
return AbuseIPDBResult(
|
||||
ip_address=ip_address,
|
||||
abuse_confidence_score=d.get("abuseConfidenceScore", 0),
|
||||
total_reports=d.get("totalReports", 0),
|
||||
country_code=d.get("countryCode", "Unknown"),
|
||||
isp=d.get("isp", "Unknown"),
|
||||
usage_type=d.get("usageType", "Unknown"),
|
||||
)
|
||||
except httpx.HTTPStatusError as e:
|
||||
logger.warning(f"AbuseIPDB lookup failed for {ip_address}: HTTP {e.response.status_code}")
|
||||
except Exception as e:
|
||||
logger.warning(f"AbuseIPDB lookup error for {ip_address}: {e}")
|
||||
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Resolve domain → IP (sync wrapped in executor)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def _resolve_ip(domain: str) -> Optional[str]:
|
||||
"""
|
||||
Resolves a domain name to its IPv4 address using the system resolver.
|
||||
Runs in a thread pool since socket.gethostbyname is blocking.
|
||||
"""
|
||||
import asyncio
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
ip = await loop.run_in_executor(None, socket.gethostbyname, domain)
|
||||
return ip
|
||||
except socket.gaierror:
|
||||
logger.debug(f"Could not resolve IP for domain: {domain}")
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main public function
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def get_threat_intel_summary(url: str) -> Optional[ThreatIntelReport]:
|
||||
"""
|
||||
Runs both VirusTotal and AbuseIPDB checks concurrently for a given URL.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
url : str
|
||||
The full URL that was scanned (e.g. "https://example.com").
|
||||
|
||||
Returns
|
||||
-------
|
||||
ThreatIntelReport if at least one check ran, otherwise None.
|
||||
|
||||
Example return value:
|
||||
{
|
||||
"domain": "example.com",
|
||||
"ip_address": "93.184.216.34",
|
||||
"virustotal": {
|
||||
"vendor_count": 82,
|
||||
"malicious": 0,
|
||||
"suspicious": 0,
|
||||
"harmless": 75,
|
||||
"reputation_score": 0
|
||||
},
|
||||
"abuseipdb": {
|
||||
"ip_address": "93.184.216.34",
|
||||
"abuse_confidence_score": 0,
|
||||
"total_reports": 0,
|
||||
"country_code": "US",
|
||||
"isp": "Edgecast Inc.",
|
||||
"usage_type": "Content Delivery Network"
|
||||
},
|
||||
"threat_summary": "Domain appears clean. No vendor flags on VirusTotal. IP has 0% abuse confidence."
|
||||
}
|
||||
"""
|
||||
# Skip entirely if neither key is configured
|
||||
if not settings.virustotal_api_key and not settings.abuseipdb_api_key:
|
||||
logger.debug("No threat intel keys configured; skipping.")
|
||||
return None
|
||||
|
||||
# Extract the bare domain from the URL
|
||||
from urllib.parse import urlparse
|
||||
parsed = urlparse(url)
|
||||
domain = parsed.hostname or ""
|
||||
if not domain:
|
||||
return None
|
||||
|
||||
import asyncio
|
||||
|
||||
# Run IP resolution first (needed for AbuseIPDB)
|
||||
ip_address = await _resolve_ip(domain)
|
||||
|
||||
# Run both checks concurrently
|
||||
vt_task = asyncio.create_task(check_virustotal(domain))
|
||||
ab_task = asyncio.create_task(check_abuseipdb(ip_address)) if ip_address else None
|
||||
|
||||
vt_result = await vt_task
|
||||
ab_result = await ab_task if ab_task else None
|
||||
|
||||
# If nothing ran (both keys missing despite the early check above), bail
|
||||
if not vt_result and not ab_result:
|
||||
return None
|
||||
|
||||
# Build a human-readable summary sentence
|
||||
parts = []
|
||||
if vt_result:
|
||||
if vt_result.malicious > 0:
|
||||
parts.append(
|
||||
f"⚠️ VirusTotal: {vt_result.malicious}/{vt_result.vendor_count} vendors flagged this domain as malicious."
|
||||
)
|
||||
else:
|
||||
parts.append(
|
||||
f"✅ VirusTotal: No malicious flags from {vt_result.vendor_count} vendors."
|
||||
)
|
||||
if ab_result:
|
||||
score = ab_result.abuse_confidence_score
|
||||
if score >= 50:
|
||||
parts.append(
|
||||
f"⚠️ AbuseIPDB: IP {ip_address} has a high abuse confidence score of {score}% "
|
||||
f"({ab_result.total_reports} reports)."
|
||||
)
|
||||
elif score > 0:
|
||||
parts.append(
|
||||
f"🔶 AbuseIPDB: IP {ip_address} has a low abuse score of {score}% "
|
||||
f"({ab_result.total_reports} reports)."
|
||||
)
|
||||
else:
|
||||
parts.append(f"✅ AbuseIPDB: IP {ip_address} has no reported abuse.")
|
||||
|
||||
return ThreatIntelReport(
|
||||
domain=domain,
|
||||
ip_address=ip_address,
|
||||
virustotal=vt_result,
|
||||
abuseipdb=ab_result,
|
||||
threat_summary=" ".join(parts) if parts else "No threat signals detected.",
|
||||
)
|
||||
Reference in New Issue
Block a user