mirror of
https://github.com/Rarebuffalo/securelens-backend.git
synced 2026-06-19 07:00:30 +00:00
280 lines
9.6 KiB
Python
280 lines
9.6 KiB
Python
"""
|
|
Threat Intelligence Service
|
|
=============================
|
|
|
|
Enriches scan results with real-world reputation data from external
|
|
threat intelligence feeds. Two providers are integrated:
|
|
|
|
1. VirusTotal — Checks if any of 70+ AV/security vendors have flagged
|
|
the domain as malicious or suspicious.
|
|
Free tier: 4 lookups/minute, 500/day
|
|
Sign up: https://www.virustotal.com/
|
|
|
|
2. AbuseIPDB — Checks if the server's IP has been reported for
|
|
abuse (spam, attacks, scanning, etc.).
|
|
Free tier: 1000 lookups/day
|
|
Sign up: https://www.abuseipdb.com/
|
|
|
|
Both are OPTIONAL. If the API keys are not set in .env, the lookup is
|
|
gracefully skipped and the rest of the scan continues normally.
|
|
|
|
Usage:
|
|
from app.services.threat_intel import get_threat_intel_summary
|
|
intel = await get_threat_intel_summary("https://example.com")
|
|
# intel is a ThreatIntelReport or None
|
|
"""
|
|
|
|
import logging
|
|
import socket
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
from pydantic import BaseModel
|
|
|
|
from app.config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Pydantic response schema
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class VirusTotalResult(BaseModel):
    """
    Domain reputation figures produced by a VirusTotal lookup.

    Attributes
    ----------
    vendor_count : int
        Total number of vendors that checked the domain.
    malicious : int
        Number of vendors that flagged the domain as malicious.
    suspicious : int
        Number of vendors that flagged the domain as suspicious.
    harmless : int
        Number of vendors that marked the domain as clean.
    reputation_score : int
        VirusTotal's own reputation score; a negative value is bad.
    """

    vendor_count: int
    malicious: int
    suspicious: int
    harmless: int
    reputation_score: int
|
|
|
|
class AbuseIPDBResult(BaseModel):
    """
    IP reputation figures produced by an AbuseIPDB lookup.

    Attributes
    ----------
    ip_address : str
        The IP address the lookup was made for.
    abuse_confidence_score : int
        Score from 0 to 100, where 100 means definitely malicious.
    total_reports : int
        How many times this IP has been reported.
    country_code : str
        Country code reported for the IP.
    isp : str
        Internet service provider reported for the IP.
    usage_type : str
        Usage category, e.g. "Data Center/Web Hosting/Transit".
    """

    ip_address: str
    abuse_confidence_score: int
    total_reports: int
    country_code: str
    isp: str
    usage_type: str
|
|
|
|
class ThreatIntelReport(BaseModel):
    """
    Combined threat intelligence gathered for one scanned URL.

    The provider fields default to None and are filled in only when the
    corresponding lookup produced a result.

    Attributes
    ----------
    domain : str
        Host name extracted from the scanned URL.
    ip_address : Optional[str]
        Address the domain resolved to, if resolution succeeded.
    virustotal : Optional[VirusTotalResult]
        VirusTotal domain reputation, when available.
    abuseipdb : Optional[AbuseIPDBResult]
        AbuseIPDB IP reputation, when available.
    threat_summary : str
        Human-readable summary of the findings.
    """

    domain: str
    ip_address: Optional[str] = None
    virustotal: Optional[VirusTotalResult] = None
    abuseipdb: Optional[AbuseIPDBResult] = None
    threat_summary: str = "No threat intelligence data available."
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# VirusTotal lookup
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def check_virustotal(domain: str) -> Optional[VirusTotalResult]:
    """
    Look up a domain's reputation through the VirusTotal v3 API.

    Parameters
    ----------
    domain : str
        Bare domain name to query.

    Returns
    -------
    VirusTotalResult built from the report's ``last_analysis_stats``
    counters and its ``reputation`` attribute, or None when no
    VirusTotal API key is configured or the lookup fails. Failures are
    logged as warnings and never raised to the caller.
    """
    api_key = settings.virustotal_api_key
    if not api_key:
        return None

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(
                f"https://www.virustotal.com/api/v3/domains/{domain}",
                headers={"x-apikey": api_key},
            )
            response.raise_for_status()
            payload = response.json()

        attributes = payload["data"]["attributes"]
        analysis = attributes["last_analysis_stats"]
        community_score = attributes.get("reputation", 0)

        # vendor_count is the sum over every verdict category in the stats.
        return VirusTotalResult(
            vendor_count=sum(analysis.values()),
            malicious=analysis.get("malicious", 0),
            suspicious=analysis.get("suspicious", 0),
            harmless=analysis.get("harmless", 0),
            reputation_score=community_score,
        )
    except httpx.HTTPStatusError as e:
        logger.warning(f"VirusTotal lookup failed for {domain}: HTTP {e.response.status_code}")
        return None
    except Exception as e:
        # Best-effort enrichment: any other failure just yields no result.
        logger.warning(f"VirusTotal lookup error for {domain}: {e}")
        return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AbuseIPDB lookup
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def check_abuseipdb(ip_address: str) -> Optional[AbuseIPDBResult]:
    """
    Look up an IP address's reputation through the AbuseIPDB v2 API.

    Parameters
    ----------
    ip_address : str
        The IP address to check.

    Returns
    -------
    AbuseIPDBResult with the abuse confidence score, report count and
    metadata (country code, ISP, usage type), or None when no AbuseIPDB
    API key is configured or the lookup fails. Failures are logged as
    warnings and never raised to the caller.
    """
    if not settings.abuseipdb_api_key:
        return None

    url = "https://api.abuseipdb.com/api/v2/check"
    headers = {
        "Key": settings.abuseipdb_api_key,
        "Accept": "application/json",
    }
    params = {
        "ipAddress": ip_address,
        "maxAgeInDays": 90,
        "verbose": "",
    }

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.get(url, headers=headers, params=params)
            resp.raise_for_status()
            d = resp.json()["data"]

        # A field may be present with a JSON null value (for example
        # countryCode on reserved/private addresses). dict.get's default
        # only applies to a *missing* key, so fall back with `or`; otherwise
        # a None would fail model validation and discard the whole result.
        return AbuseIPDBResult(
            ip_address=ip_address,
            abuse_confidence_score=d.get("abuseConfidenceScore") or 0,
            total_reports=d.get("totalReports") or 0,
            country_code=d.get("countryCode") or "Unknown",
            isp=d.get("isp") or "Unknown",
            usage_type=d.get("usageType") or "Unknown",
        )
    except httpx.HTTPStatusError as e:
        logger.warning(f"AbuseIPDB lookup failed for {ip_address}: HTTP {e.response.status_code}")
    except Exception as e:
        # Best-effort enrichment: any other failure just yields no result.
        logger.warning(f"AbuseIPDB lookup error for {ip_address}: {e}")

    return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Resolve domain → IP (sync wrapped in executor)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _resolve_ip(domain: str) -> Optional[str]:
    """
    Resolve a domain name to its IPv4 address using the system resolver.

    The lookup runs in the event loop's default executor because
    ``socket.gethostbyname`` is a blocking call.

    Parameters
    ----------
    domain : str
        Host name (or IPv4 literal) to resolve.

    Returns
    -------
    The resolved IPv4 address as a string, or None when the name cannot
    be resolved or is not a valid host name.
    """
    import asyncio

    try:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, socket.gethostbyname, domain)
    except (OSError, UnicodeError):
        # OSError covers socket.gaierror (resolution failure). UnicodeError is
        # raised by the IDNA encoding step, before any lookup happens, for
        # host names with an empty or over-long label (e.g. "a..com").
        logger.debug(f"Could not resolve IP for domain: {domain}")
        return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main public function
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def get_threat_intel_summary(url: str) -> Optional[ThreatIntelReport]:
    """
    Gather VirusTotal and AbuseIPDB reputation data for a scanned URL.

    The host name is extracted from the URL and resolved to an IP first;
    the VirusTotal (domain) and AbuseIPDB (IP) lookups are then scheduled
    as tasks so they run concurrently. The AbuseIPDB lookup is skipped
    when the host name could not be resolved.

    Parameters
    ----------
    url : str
        The full URL that was scanned (e.g. "https://example.com").

    Returns
    -------
    ThreatIntelReport when at least one lookup produced a result. None
    when neither API key is configured, the URL has no host name, or
    both lookups came back empty.

    The report's ``threat_summary`` is one sentence per provider that
    returned data, joined by spaces, e.g.
    "✅ VirusTotal: No malicious flags from 82 vendors.
    ✅ AbuseIPDB: IP 93.184.216.34 has no reported abuse."
    """
    import asyncio
    from urllib.parse import urlparse

    # Nothing to do unless at least one provider key is configured.
    if not (settings.virustotal_api_key or settings.abuseipdb_api_key):
        logger.debug("No threat intel keys configured; skipping.")
        return None

    # Reduce the URL to its bare host name.
    domain = urlparse(url).hostname or ""
    if not domain:
        return None

    # AbuseIPDB works on IPs, so resolution has to happen before scheduling.
    ip_address = await _resolve_ip(domain)

    # Schedule both lookups before awaiting either so they overlap.
    vt_task = asyncio.create_task(check_virustotal(domain))
    ab_task = None
    if ip_address:
        ab_task = asyncio.create_task(check_abuseipdb(ip_address))

    vt_result = await vt_task
    ab_result = await ab_task if ab_task else None

    # Neither provider returned anything usable.
    if not (vt_result or ab_result):
        return None

    # Assemble the human-readable summary, one sentence per provider.
    sentences = []

    if vt_result:
        if vt_result.malicious > 0:
            vt_sentence = (
                f"⚠️ VirusTotal: {vt_result.malicious}/{vt_result.vendor_count} vendors flagged this domain as malicious."
            )
        else:
            vt_sentence = (
                f"✅ VirusTotal: No malicious flags from {vt_result.vendor_count} vendors."
            )
        sentences.append(vt_sentence)

    if ab_result:
        score = ab_result.abuse_confidence_score
        if score >= 50:
            ab_sentence = (
                f"⚠️ AbuseIPDB: IP {ip_address} has a high abuse confidence score of {score}% "
                f"({ab_result.total_reports} reports)."
            )
        elif score > 0:
            ab_sentence = (
                f"🔶 AbuseIPDB: IP {ip_address} has a low abuse score of {score}% "
                f"({ab_result.total_reports} reports)."
            )
        else:
            ab_sentence = f"✅ AbuseIPDB: IP {ip_address} has no reported abuse."
        sentences.append(ab_sentence)

    if sentences:
        summary_text = " ".join(sentences)
    else:
        summary_text = "No threat signals detected."

    return ThreatIntelReport(
        domain=domain,
        ip_address=ip_address,
        virustotal=vt_result,
        abuseipdb=ab_result,
        threat_summary=summary_text,
    )
|