# securelens-backend/app/services/threat_intel.py (Python, 280 lines, 9.6 KiB)

"""
Threat Intelligence Service
===========================
Enriches scan results with real-world reputation data from external
threat intelligence feeds. Two providers are integrated:

1. VirusTotal — checks whether any of 70+ AV/security vendors have flagged
   the domain as malicious or suspicious.
   Free tier: 4 lookups/minute, 500/day
   Sign up: https://www.virustotal.com/
2. AbuseIPDB — checks whether the server's IP has been reported for
   abuse (spam, attacks, scanning, etc.).
   Free tier: 1000 lookups/day
   Sign up: https://www.abuseipdb.com/

Both are OPTIONAL. If an API key is not set in .env, that lookup is
gracefully skipped and the rest of the scan continues normally.

Usage:
    from app.services.threat_intel import get_threat_intel_summary
    intel = await get_threat_intel_summary("https://example.com")
    # intel is a ThreatIntelReport or None
"""
import logging
import socket
from typing import Optional
import httpx
from pydantic import BaseModel
from app.config import settings
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Pydantic response schema
# ---------------------------------------------------------------------------
class VirusTotalResult(BaseModel):
    """Vendor verdict counts and reputation taken from a VirusTotal domain report."""

    # Total number of vendors that checked the domain (all verdicts combined).
    vendor_count: int
    # Vendors that flagged the domain as malicious.
    malicious: int
    # Vendors that flagged the domain as suspicious.
    suspicious: int
    # Vendors that marked the domain as clean.
    harmless: int
    # VirusTotal's own reputation score; a negative value means "bad".
    reputation_score: int
class AbuseIPDBResult(BaseModel):
    """Reputation data for a single IP address as reported by AbuseIPDB."""

    # The IP address that was looked up.
    ip_address: str
    # Confidence that the IP is abusive, 0-100 (100 = definitely malicious).
    abuse_confidence_score: int
    # Number of abuse reports filed against this IP.
    total_reports: int
    # Country code reported for the IP.
    country_code: str
    # Internet service provider reported for the IP.
    isp: str
    # Usage category, e.g. "Data Center/Web Hosting/Transit".
    usage_type: str
class ThreatIntelReport(BaseModel):
    """
    Combined threat-intelligence findings for one scanned URL.

    The per-provider fields default to None and stay that way when the
    corresponding lookup produced no result.
    """

    # Hostname extracted from the scanned URL.
    domain: str
    # Resolved IP address of the domain, if resolution succeeded.
    ip_address: Optional[str] = None
    # VirusTotal findings for the domain, if available.
    virustotal: Optional[VirusTotalResult] = None
    # AbuseIPDB findings for the IP address, if available.
    abuseipdb: Optional[AbuseIPDBResult] = None
    # Human-readable one-paragraph summary of the findings.
    threat_summary: str = "No threat intelligence data available."
# ---------------------------------------------------------------------------
# VirusTotal lookup
# ---------------------------------------------------------------------------
async def check_virustotal(domain: str) -> Optional[VirusTotalResult]:
    """
    Look up a domain's reputation through the VirusTotal v3 API.

    Reads the per-verdict vendor counts (``last_analysis_stats``) and the
    ``reputation`` attribute from the domain report and packs them into a
    VirusTotalResult. A negative reputation means the domain was flagged.

    Returns None when no VirusTotal API key is configured, or when the
    request or the response parsing fails (the failure is logged as a
    warning rather than raised).
    """
    api_key = settings.virustotal_api_key
    if not api_key:
        return None

    endpoint = f"https://www.virustotal.com/api/v3/domains/{domain}"
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(endpoint, headers={"x-apikey": api_key})
            response.raise_for_status()
            attributes = response.json()["data"]["attributes"]
            verdicts = attributes["last_analysis_stats"]
            reputation = attributes.get("reputation", 0)
            return VirusTotalResult(
                # Every verdict bucket counts towards the vendor total.
                vendor_count=sum(verdicts.values()),
                malicious=verdicts.get("malicious", 0),
                suspicious=verdicts.get("suspicious", 0),
                harmless=verdicts.get("harmless", 0),
                reputation_score=reputation,
            )
    except httpx.HTTPStatusError as e:
        logger.warning(f"VirusTotal lookup failed for {domain}: HTTP {e.response.status_code}")
    except Exception as e:
        # Best-effort enrichment: any other failure is logged, never raised.
        logger.warning(f"VirusTotal lookup error for {domain}: {e}")
    return None
# ---------------------------------------------------------------------------
# AbuseIPDB lookup
# ---------------------------------------------------------------------------
async def check_abuseipdb(ip_address: str) -> Optional[AbuseIPDBResult]:
    """
    Query the AbuseIPDB v2 ``check`` endpoint for an IP address's reputation.

    Parameters
    ----------
    ip_address : str
        The IP address to look up. Reports from the last 90 days are
        considered.

    Returns
    -------
    AbuseIPDBResult with the abuse confidence score (0-100), the report
    count and ISP/usage metadata, or None when no AbuseIPDB API key is
    configured or the lookup fails (failures are logged as warnings, never
    raised).
    """
    if not settings.abuseipdb_api_key:
        return None
    url = "https://api.abuseipdb.com/api/v2/check"
    headers = {
        "Key": settings.abuseipdb_api_key,
        "Accept": "application/json",
    }
    # The "verbose" flag is intentionally not sent: it asks the API to attach
    # per-report detail, and none of that extra data is read below.
    params = {
        "ipAddress": ip_address,
        "maxAgeInDays": 90,
    }
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.get(url, headers=headers, params=params)
            resp.raise_for_status()
            d = resp.json()["data"]
            # Use `or` rather than dict.get defaults: a key that is present
            # with a JSON null value bypasses the .get() default, and the
            # resulting None would fail model validation and discard the
            # whole result.
            return AbuseIPDBResult(
                ip_address=ip_address,
                abuse_confidence_score=d.get("abuseConfidenceScore") or 0,
                total_reports=d.get("totalReports") or 0,
                country_code=d.get("countryCode") or "Unknown",
                isp=d.get("isp") or "Unknown",
                usage_type=d.get("usageType") or "Unknown",
            )
    except httpx.HTTPStatusError as e:
        logger.warning(
            "AbuseIPDB lookup failed for %s: HTTP %s",
            ip_address,
            e.response.status_code,
        )
    except Exception as e:
        # Best-effort enrichment: any other failure is logged, never raised.
        logger.warning("AbuseIPDB lookup error for %s: %s", ip_address, e)
    return None
# ---------------------------------------------------------------------------
# Resolve domain → IP (sync wrapped in executor)
# ---------------------------------------------------------------------------
async def _resolve_ip(domain: str) -> Optional[str]:
    """
    Resolve a domain name to its IPv4 address using the system resolver.

    socket.gethostbyname is blocking, so it is run in the event loop's
    default executor instead of on the loop itself.

    Returns the address as a string, or None when the name cannot be
    resolved or is not a valid host name.
    """
    import asyncio

    loop = asyncio.get_running_loop()
    try:
        return await loop.run_in_executor(None, socket.gethostbyname, domain)
    except (OSError, ValueError):
        # OSError covers socket.gaierror / socket.herror. ValueError covers
        # UnicodeError, which gethostbyname raises when the name cannot be
        # IDNA-encoded (e.g. an empty or over-long label); letting that
        # escape would abort the whole scan instead of skipping the lookup.
        logger.debug("Could not resolve IP for domain: %s", domain)
        return None
# ---------------------------------------------------------------------------
# Main public function
# ---------------------------------------------------------------------------
async def get_threat_intel_summary(url: str) -> Optional[ThreatIntelReport]:
    """
    Run the VirusTotal and AbuseIPDB checks concurrently for a given URL.

    Parameters
    ----------
    url : str
        The full URL that was scanned (e.g. "https://example.com").

    Returns
    -------
    ThreatIntelReport if at least one lookup returned data, otherwise None.
    None is also returned when neither API key is configured, or when no
    host name can be extracted from ``url``.

    Example return value:
    {
        "domain": "example.com",
        "ip_address": "93.184.216.34",
        "virustotal": {
            "vendor_count": 82,
            "malicious": 0,
            "suspicious": 0,
            "harmless": 75,
            "reputation_score": 0
        },
        "abuseipdb": {
            "ip_address": "93.184.216.34",
            "abuse_confidence_score": 0,
            "total_reports": 0,
            "country_code": "US",
            "isp": "Edgecast Inc.",
            "usage_type": "Content Delivery Network"
        },
        "threat_summary": "✅ VirusTotal: No malicious flags from 82 vendors. ✅ AbuseIPDB: IP 93.184.216.34 has no reported abuse."
    }
    """
    import asyncio
    from urllib.parse import urlparse

    # Skip entirely if neither key is configured
    if not settings.virustotal_api_key and not settings.abuseipdb_api_key:
        logger.debug("No threat intel keys configured; skipping.")
        return None

    # Extract the bare host name from the URL. urlparse raises ValueError for
    # some malformed inputs (e.g. an unbalanced "[" in the host part); treat
    # that like any other URL without a usable host.
    try:
        domain = urlparse(url).hostname or ""
    except ValueError:
        logger.debug("Could not parse URL for threat intel lookup: %s", url)
        return None
    if not domain:
        return None

    # Resolve the IP first: AbuseIPDB needs it and the report includes it.
    ip_address = await _resolve_ip(domain)

    # Run the lookups concurrently. gather() also cancels the lookups still
    # in flight if this coroutine itself is cancelled, so no task is orphaned.
    lookups = [check_virustotal(domain)]
    if ip_address:
        lookups.append(check_abuseipdb(ip_address))
    results = await asyncio.gather(*lookups)
    vt_result = results[0]
    ab_result = results[1] if len(results) > 1 else None

    # Nothing came back (key missing for the only possible lookup, or the
    # lookups failed): there is nothing to report.
    if vt_result is None and ab_result is None:
        return None

    # Build a human-readable summary sentence
    parts = []
    if vt_result is not None:
        if vt_result.malicious > 0:
            parts.append(
                f"⚠️ VirusTotal: {vt_result.malicious}/{vt_result.vendor_count} vendors flagged this domain as malicious."
            )
        else:
            parts.append(
                f"✅ VirusTotal: No malicious flags from {vt_result.vendor_count} vendors."
            )
    if ab_result is not None:
        score = ab_result.abuse_confidence_score
        if score >= 50:
            parts.append(
                f"⚠️ AbuseIPDB: IP {ip_address} has a high abuse confidence score of {score}% "
                f"({ab_result.total_reports} reports)."
            )
        elif score > 0:
            parts.append(
                f"🔶 AbuseIPDB: IP {ip_address} has a low abuse score of {score}% "
                f"({ab_result.total_reports} reports)."
            )
        else:
            parts.append(f"✅ AbuseIPDB: IP {ip_address} has no reported abuse.")

    return ThreatIntelReport(
        domain=domain,
        ip_address=ip_address,
        virustotal=vt_result,
        abuseipdb=ab_result,
        # At least one result exists here, so `parts` is never empty.
        threat_summary=" ".join(parts),
    )