""" Alerting Service ================ Sends scan result notifications via two channels: - Slack : Posts a formatted message to a Slack Incoming Webhook URL. - Email : Sends an HTML email via SMTP using Python's stdlib smtplib, wrapped in asyncio.to_thread so it doesn't block the event loop. Both channels are optional. If the required config is not set, the function logs a debug message and returns silently — it never raises or crashes the caller. Configuration (via .env): SLACK_WEBHOOK_URL — Slack Incoming Webhook URL. Create one at: https://api.slack.com/messaging/webhooks SMTP_HOST — SMTP server hostname (e.g. smtp.gmail.com) SMTP_PORT — SMTP server port (587 for STARTTLS, 465 for SSL) SMTP_USERNAME — SMTP login username / email address SMTP_PASSWORD — SMTP login password or app password SMTP_FROM_EMAIL — The From: address shown in the email SMTP_USE_SSL — Set to true for port 465 (SMTP_SSL). Defaults to false (STARTTLS on port 587). When to call these: - After a website scan completes for an authenticated user (scan.py) - When the background scheduler detects a score regression (scheduler.py) """ import asyncio import logging import smtplib from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText import httpx from app.config import settings logger = logging.getLogger(__name__) async def send_slack_alert(title: str, message: str, color: str = "#e53e3e") -> None: """ POST a notification to the configured Slack Incoming Webhook URL. Parameters ---------- title : Short heading for the Slack attachment. message : Body text shown under the heading. color : Left-border colour of the Slack attachment block. Use "#e53e3e" for regressions/critical, "#38a169" for clean scans. If SLACK_WEBHOOK_URL is not set in config, this is a no-op. """ if not settings.slack_webhook_url: logger.debug("Slack alerting skipped — SLACK_WEBHOOK_URL not configured.") return payload = { "attachments": [ { "color": color, "title": title, "text": message, "footer": "SecureLens AI", } ] } try: async with httpx.AsyncClient() as client: resp = await client.post( settings.slack_webhook_url, json=payload, timeout=10.0, ) if resp.status_code != 200: logger.warning( f"Slack webhook returned unexpected status {resp.status_code}: {resp.text[:200]}" ) else: logger.debug("Slack alert sent.") except Exception as e: logger.warning(f"Slack alert failed: {e}") async def send_email_alert(to_email: str, subject: str, html_body: str) -> None: """ Send an HTML email via SMTP. Runs the blocking smtplib call in a thread via asyncio.to_thread so it does not hold the event loop. Supports both STARTTLS (port 587, default) and SMTP_SSL (port 465, set SMTP_USE_SSL=true). If any SMTP setting is missing, this is a no-op. """ required = [ settings.smtp_host, settings.smtp_port, settings.smtp_username, settings.smtp_password, settings.smtp_from_email, ] if not all(required): logger.debug("Email alerting skipped — SMTP settings not fully configured.") return def _send_blocking() -> None: msg = MIMEMultipart("alternative") msg["Subject"] = subject msg["From"] = settings.smtp_from_email msg["To"] = to_email msg.attach(MIMEText(html_body, "html")) if settings.smtp_use_ssl: with smtplib.SMTP_SSL(settings.smtp_host, settings.smtp_port) as smtp: smtp.login(settings.smtp_username, settings.smtp_password) smtp.send_message(msg) else: with smtplib.SMTP(settings.smtp_host, settings.smtp_port) as smtp: smtp.starttls() smtp.login(settings.smtp_username, settings.smtp_password) smtp.send_message(msg) try: await asyncio.to_thread(_send_blocking) logger.debug(f"Email alert sent to {to_email}.") except Exception as e: logger.warning(f"Email alert to {to_email} failed: {e}") def build_scan_email_body(url: str, score: int, issue_count: int) -> str: """ Render a simple HTML email body for a completed scan notification. Kept minimal to maximise email client compatibility. """ score_color = "#e53e3e" if score < 50 else "#dd6b20" if score < 75 else "#38a169" return f"""
A security scan has been completed for:
{url}
| Security Score | {score}/100 |
| Issues Found | {issue_count} |
Sent by SecureLens AI — automated security monitoring
""" def build_regression_email_body(url: str, old_score: int, new_score: int) -> str: """Render an HTML email body for a scheduled scan score regression alert.""" delta = new_score - old_score return f"""A scheduled scan detected a score drop for:
{url}
| Previous Score | {old_score}/100 |
| New Score | {new_score}/100 ({delta:+d}) |
Log in to SecureLens to review the new findings and take action.
Sent by SecureLens AI — automated security monitoring
"""