Files
securelens-backend/app/services/scanner/ssl_checker.py
2026-04-07 18:13:43 +05:30

137 lines
4.8 KiB
Python

import asyncio
import datetime
import logging
import socket
import ssl
from urllib.parse import urlparse
import httpx
from app.schemas.scan import Issue
from app.services.scanner.base import BaseScanner
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Protocol version strings that the scanner reports as weak when they match
# the ``tls_version`` value collected by ``_check_ssl``.
WEAK_TLS_VERSIONS = {"TLSv1", "TLSv1.1"}
def _check_ssl(hostname: str, port: int) -> dict:
result: dict = {
"error": None,
"cert": None,
"tls_version": None,
"self_signed": False,
}
context = ssl.create_default_context()
try:
with socket.create_connection((hostname, port), timeout=5) as sock:
with context.wrap_socket(sock, server_hostname=hostname) as ssock:
result["cert"] = ssock.getpeercert()
result["tls_version"] = ssock.version()
except ssl.SSLCertVerificationError as e:
error_msg = str(e)
result["error"] = error_msg
if "self-signed" in error_msg.lower() or "self signed" in error_msg.lower():
result["self_signed"] = True
try:
ctx_no_verify = ssl.create_default_context()
ctx_no_verify.check_hostname = False
ctx_no_verify.verify_mode = ssl.CERT_NONE
with socket.create_connection((hostname, port), timeout=5) as sock:
with ctx_no_verify.wrap_socket(sock, server_hostname=hostname) as ssock:
result["tls_version"] = ssock.version()
except Exception:
pass
except (socket.timeout, socket.gaierror, OSError) as e:
result["error"] = str(e)
return result
_SSL_LAYER = "SSL/TLS Layer"
# Certificates with fewer than this many whole days left produce a "Warning".
_EXPIRY_WARNING_DAYS = 30
_SECONDS_PER_DAY = 86400


def _ssl_issue(issue: str, severity: str, fix: str) -> Issue:
    """Build an ``Issue`` attributed to the SSL/TLS layer."""
    return Issue(issue=issue, severity=severity, layer=_SSL_LAYER, fix=fix)


def _expiry_issues(not_after: str) -> list[Issue]:
    """Return the issue (if any) implied by a certificate ``notAfter`` value.

    An unparseable value is logged at debug level and yields no issue.
    """
    try:
        # cert_time_to_seconds parses the certificate date format as UTC and
        # does not depend on the process locale (strptime's %b does).
        expiry_ts = ssl.cert_time_to_seconds(not_after)
    except ValueError:
        logger.debug("Could not parse cert expiry: %s", not_after)
        return []

    now_ts = datetime.datetime.now(datetime.timezone.utc).timestamp()
    seconds_left = expiry_ts - now_ts
    if seconds_left < 0:
        return [
            _ssl_issue(
                "SSL certificate has expired",
                "Critical",
                "Renew the SSL certificate immediately",
            )
        ]

    # Whole days remaining, rounded down (same as timedelta.days).
    days_left = int(seconds_left // _SECONDS_PER_DAY)
    if days_left < _EXPIRY_WARNING_DAYS:
        return [
            _ssl_issue(
                f"SSL certificate expires in {days_left} days",
                "Warning",
                "Renew the SSL certificate before it expires",
            )
        ]
    return []


class SSLScanner(BaseScanner):
    """Scanner that reports SSL/TLS certificate and protocol issues for HTTPS URLs."""

    async def scan(self, url: str, response: httpx.Response) -> list[Issue]:
        """Inspect the TLS endpoint behind ``url`` and return any issues found.

        Args:
            url: Target URL. Only ``https`` URLs with a hostname are checked;
                the port defaults to 443 when the URL does not specify one.
            response: HTTP response for the URL; not used by this scanner.

        Returns:
            A list of ``Issue`` objects. The list is empty when the URL is
            not checkable (non-HTTPS, no hostname, malformed port), when the
            check itself raised, or when nothing was found.
        """
        issues: list[Issue] = []
        parsed = urlparse(url)
        if parsed.scheme != "https":
            return issues

        hostname = parsed.hostname
        if not hostname:
            return issues

        try:
            # Accessing .port raises ValueError for a non-numeric or
            # out-of-range port in the URL.
            port = parsed.port or 443
        except ValueError:
            logger.warning("Invalid port in URL %s; skipping SSL check", url)
            return issues

        try:
            # _check_ssl does blocking socket I/O, so keep it off the event loop.
            result = await asyncio.to_thread(_check_ssl, hostname, port)
        except Exception as e:
            logger.warning("SSL check failed for %s: %s", hostname, e)
            return issues

        if result["self_signed"]:
            issues.append(
                _ssl_issue(
                    "SSL certificate is self-signed",
                    "Critical",
                    "Obtain a valid SSL certificate from a trusted Certificate Authority (e.g., Let's Encrypt)",
                )
            )
        elif result["error"]:
            # NOTE(review): _check_ssl also stores connection and handshake
            # errors (OSError) in result["error"], so those are reported
            # under this "verification failed" wording as well.
            issues.append(
                _ssl_issue(
                    f"SSL certificate verification failed: {result['error'][:120]}",
                    "Critical",
                    "Ensure the SSL certificate is valid, not expired, and issued by a trusted CA",
                )
            )

        cert = result.get("cert")
        if cert:
            not_after = cert.get("notAfter")
            if not_after:
                issues.extend(_expiry_issues(not_after))

            # Identical subject and issuer means the certificate signed
            # itself; skip if that was already reported above.
            subject = cert.get("subject", ())
            issuer = cert.get("issuer", ())
            if subject and issuer and subject == issuer and not result["self_signed"]:
                issues.append(
                    _ssl_issue(
                        "SSL certificate is self-signed",
                        "Critical",
                        "Obtain a valid SSL certificate from a trusted Certificate Authority",
                    )
                )

        tls_version = result.get("tls_version")
        if tls_version and tls_version in WEAK_TLS_VERSIONS:
            issues.append(
                _ssl_issue(
                    f"Server supports weak TLS version: {tls_version}",
                    "Critical",
                    "Disable TLS 1.0 and TLS 1.1; enforce TLS 1.2 or higher",
                )
            )
        return issues