mirror of
https://github.com/Rarebuffalo/securelens-backend.git
synced 2026-06-19 07:00:30 +00:00
277 lines
10 KiB
Python
277 lines
10 KiB
Python
"""
|
|
Scheduler Service
|
|
=================
|
|
|
|
Manages recurring automated scans using APScheduler with the AsyncIOScheduler
|
|
backend so it runs natively inside the same event loop as FastAPI.
|
|
|
|
How it works
|
|
------------
|
|
On startup, a single "master" job is registered that runs every hour. When
|
|
it fires, it queries the database for all active ScheduledScan rows whose
|
|
next run is due, executes the full scan pipeline (the same one the /scan
|
|
endpoint uses), saves the result, and — if the score dropped compared to
|
|
the previous run — fires the user's webhooks and sends Slack and email alerts.
|
|
|
|
We use a one-hour polling interval rather than creating a separate APScheduler
|
|
job per URL. This avoids the complexity of dynamically adding/removing jobs
|
|
when users create or delete scheduled scans, and keeps the scheduler state
|
|
in the database (the source of truth) rather than in APScheduler's job store.
|
|
|
|
Why AsyncIOScheduler
|
|
---------------------
|
|
FastAPI is an async framework running on uvicorn. APScheduler's AsyncIOScheduler
|
|
attaches to the running event loop rather than spawning a separate thread,
|
|
so async functions like database queries and httpx calls work naturally inside
|
|
scheduler jobs without any sync/async bridging hacks.
|
|
|
|
Database sessions
|
|
-----------------
|
|
Scheduler jobs cannot use FastAPI's Depends(get_db) — that only works inside
|
|
request handlers. Instead we create sessions directly via AsyncSessionLocal,
|
|
the same session factory used by the dependency injection system.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime, timezone, timedelta
|
|
|
|
import httpx
|
|
try:
    from apscheduler.schedulers.asyncio import AsyncIOScheduler
except ImportError:

    class AsyncIOScheduler:
        """
        No-op stand-in used when APScheduler is not installed.

        It exposes only the members this module touches (``add_job``,
        ``start``, ``shutdown`` and ``running``) so the application can still
        be imported. Jobs registered on this stand-in are never executed.
        """

        def __init__(self, *args, **kwargs):
            # Constructor arguments (e.g. ``timezone``) are accepted and ignored.
            self.running = False

        def add_job(self, *args, **kwargs):
            """Accept and discard a job definition; nothing is scheduled."""
            return None

        def start(self):
            """Mark the stand-in as running and warn that no job will ever fire."""
            self.running = True
            # Without this warning a missing dependency would silently disable
            # every scheduled scan.
            logging.getLogger(__name__).warning(
                "APScheduler is not installed — scheduled scans are disabled."
            )

        def shutdown(self, *args, **kwargs):
            """Mark the stand-in as stopped; there is nothing to tear down."""
            self.running = False
|
|
|
|
from sqlalchemy import select
|
|
|
|
from app.database import AsyncSessionLocal
|
|
from app.models.scheduled_scan import ScheduledScan
|
|
from app.models.scan import ScanResult
|
|
from app.models.webhook import Webhook
|
|
from app.services.scoring import calculate_layer_statuses, calculate_score
|
|
from app.services.ai import enhance_security_issues
|
|
from app.services.webhook_dispatcher import dispatch_webhooks
|
|
from app.services.alerting import (
|
|
send_slack_alert,
|
|
send_email_alert,
|
|
build_regression_email_body,
|
|
)
|
|
from app.config import settings
|
|
|
|
# Module-wide logger, named after this module's import path.
logger = logging.getLogger(__name__)


# Module-level scheduler instance — started in main.py lifespan.
# Created with timezone="UTC"; jobs are attached to it in start_scheduler().
scheduler = AsyncIOScheduler(timezone="UTC")
|
|
|
|
|
|
async def _run_single_scan(scheduled: ScheduledScan) -> None:
    """
    Run the full scan pipeline for one ScheduledScan row and save the result.

    Steps: validate the URL, fetch it once, run every scanner, score the
    issues, optionally enrich them with AI output, store a ScanResult and
    update the schedule's ``last_run_at`` / ``last_score``. If the new score
    is lower than ``scheduled.last_score``, the user's webhooks are dispatched
    and a Slack alert plus a regression email are sent.

    This mirrors the logic in scan.py/scan_website but runs outside a request
    context, so it manages its own DB sessions.

    Args:
        scheduled: The schedule row to execute. Only ``id``, ``url``,
            ``user_id`` and ``last_score`` are read from it.

    Returns:
        None. Any ``Exception`` raised along the way is logged, not propagated,
        so one failing scan cannot break the caller's batch.
    """
    # Imported at call time rather than module import time
    # (presumably to avoid import cycles — TODO confirm).
    from app.services.scanner.transport import TransportScanner
    from app.services.scanner.ssl_checker import SSLScanner
    from app.services.scanner.headers import HeaderScanner
    from app.services.scanner.cookies import CookieScanner
    from app.services.scanner.exposure import ExposureScanner
    from app.services.scanner.dns import DNSScanner
    from app.services.scanner.ports import PortScanner
    from app.utils.validators import validate_url

    url = scheduled.url
    user_id = scheduled.user_id

    logger.info(f"Scheduled scan starting: {url} (user={user_id})")

    try:
        validated_url = validate_url(url)
    except Exception as e:
        logger.error(f"Scheduled scan skipped — invalid URL '{url}': {e}")
        return

    try:
        transport_scanner = TransportScanner()
        ssl_scanner = SSLScanner()
        header_scanner = HeaderScanner()
        cookie_scanner = CookieScanner()
        exposure_scanner = ExposureScanner()
        dns_scanner = DNSScanner()
        port_scanner = PortScanner()

        # DNS and port scans do not need the HTTP response, so they are
        # started first and run while the page is being fetched.
        dns_task = asyncio.create_task(dns_scanner.scan(validated_url))
        port_task = asyncio.create_task(port_scanner.scan(validated_url))

        try:
            async with httpx.AsyncClient(
                timeout=httpx.Timeout(settings.scan_timeout),
                follow_redirects=True,
            ) as client:
                response = await client.get(validated_url)

            all_issues = []
            all_issues.extend(await transport_scanner.scan(validated_url, response))
            all_issues.extend(await ssl_scanner.scan(validated_url, response))
            all_issues.extend(await header_scanner.scan(validated_url, response))
            all_issues.extend(await cookie_scanner.scan(validated_url, response))
            all_issues.extend(await exposure_scanner.scan(validated_url, response))
            all_issues.extend(await dns_task)
            all_issues.extend(await port_task)
        finally:
            # If the fetch or any scanner raised, the background tasks would
            # otherwise keep running unobserved. Cancel whatever is still
            # pending and drain both so no exception is left unretrieved.
            for task in (dns_task, port_task):
                if not task.done():
                    task.cancel()
            await asyncio.gather(dns_task, port_task, return_exceptions=True)

        score = calculate_score(all_issues)
        layers = calculate_layer_statuses(all_issues)

        if settings.effective_ai_key and all_issues:
            issues_dict_list = [i.model_dump() for i in all_issues]
            ai_data = await enhance_security_issues(issues_dict_list)
            enhanced_list = ai_data.get("enhanced_issues", [])
            # Enhancements are matched back to the originals by issue text.
            enhancement_map = {e.get("issue"): e for e in enhanced_list}
            for original in all_issues:
                enh = enhancement_map.get(original.issue)
                if enh:
                    original.contextual_severity = enh.get("contextual_severity")
                    original.explanation = enh.get("explanation")
                    original.remediation_snippet = enh.get("remediation_snippet")

        layers_dict = {k: v.model_dump() for k, v in layers.items()}
        issues_list = [i.model_dump() for i in all_issues]

        previous_score = scheduled.last_score
        score_dropped = previous_score is not None and score < previous_score
        delta = previous_score - score if score_dropped else 0

        async with AsyncSessionLocal() as db:
            # Save the new scan result
            scan_record = ScanResult(
                user_id=user_id,
                url=validated_url,
                security_score=score,
                layers=layers_dict,
                issues=issues_list,
            )
            db.add(scan_record)
            await db.flush()
            # Read the id while it is freshly populated by the flush: a commit
            # may expire ORM attributes, and an async session cannot reload
            # them implicitly on attribute access.
            scan_id = scan_record.id

            # Update the scheduled scan metadata
            scheduled_row = await db.get(ScheduledScan, scheduled.id)
            if scheduled_row:
                scheduled_row.last_run_at = datetime.now(timezone.utc)
                scheduled_row.last_score = score

            await db.commit()

            # Webhook dispatch is given this session, so it runs while the
            # session is still open.
            if score_dropped:
                logger.warning(
                    f"Score dropped {delta} pts for {url} "
                    f"({previous_score} -> {score}). Sending regression alerts."
                )
                webhook_payload = {
                    "event": "scheduled_scan_regression",
                    "scan_id": scan_id,
                    "url": validated_url,
                    "score": score,
                    "previous_score": previous_score,
                    "score_delta": -delta,
                }
                await dispatch_webhooks(user_id, webhook_payload, db)

        # Slack and email do not need the scan session.
        if score_dropped:
            slack_title = f"Score regression detected for {validated_url}"
            slack_msg = (
                f"Previous score: {previous_score}/100\n"
                f"New score: {score}/100 ({-delta:+d} points)\n"
                f"Action: Review the latest scan in SecureLens."
            )
            await send_slack_alert(title=slack_title, message=slack_msg)

            # Fetch user email to send the regression alert
            from app.models.user import User

            async with AsyncSessionLocal() as email_db:
                user_result = await email_db.execute(
                    select(User).where(User.id == user_id)
                )
                user = user_result.scalar_one_or_none()

            if user:
                email_body = build_regression_email_body(
                    validated_url, previous_score, score
                )
                await send_email_alert(
                    to_email=user.email,
                    subject=f"SecureLens: Score regression detected for {validated_url}",
                    html_body=email_body,
                )

        logger.info(f"Scheduled scan complete: {url} → score={score}")

    except httpx.HTTPError as e:
        logger.error(f"Scheduled scan HTTP error for {url}: {e}")
    except Exception as e:
        logger.error(f"Scheduled scan failed for {url}: {e}", exc_info=True)
|
|
|
|
|
|
async def _run_due_scans() -> None:
    """
    Master job — runs every hour. Finds all active ScheduledScan rows that
    are due for a re-run and executes them concurrently.

    Due means:
      - daily  : last_run_at is None OR more than 24 hours ago
      - weekly : last_run_at is None OR more than 7 days ago

    Rows whose ``schedule`` value is neither "daily" nor "weekly" are skipped.

    Returns:
        None. Each due row is handed to ``_run_single_scan``.
    """
    now = datetime.now(timezone.utc)
    thresholds = {
        "daily": now - timedelta(hours=24),
        "weekly": now - timedelta(days=7),
    }

    due = []
    async with AsyncSessionLocal() as db:
        result = await db.execute(
            select(ScheduledScan).where(ScheduledScan.is_active == True)  # noqa: E712
        )

        for s in result.scalars().all():
            threshold = thresholds.get(s.schedule)
            if threshold is None:
                continue  # unknown schedule type — skip

            last_run = s.last_run_at
            # Some database backends return naive datetimes. Comparing those
            # with the timezone-aware threshold raises TypeError and would
            # abort the whole run. last_run_at is written as UTC by
            # _run_single_scan, so a naive value is interpreted as UTC.
            if last_run is not None and last_run.tzinfo is None:
                last_run = last_run.replace(tzinfo=timezone.utc)

            if last_run is None or last_run < threshold:
                due.append(s)

    if not due:
        logger.debug("Scheduled scan check: nothing due.")
        return

    logger.info(f"Running {len(due)} scheduled scan(s).")
    # The query session is already released here; each scan opens its own.
    await asyncio.gather(*(_run_single_scan(s) for s in due))
|
|
|
|
|
|
def start_scheduler() -> None:
    """
    Start the APScheduler background scheduler. Called from main.py lifespan.

    Registers the hourly master job (``_run_due_scans``) under the fixed id
    "scheduled_scan_master" and starts the scheduler. Calling this while the
    scheduler is already running is a no-op, mirroring the ``running`` guard
    in ``stop_scheduler``.
    """
    if scheduler.running:
        # Starting an already-running APScheduler instance raises, so a
        # repeated call is ignored instead.
        logger.debug("Scheduler already running — start request ignored.")
        return

    scheduler.add_job(
        _run_due_scans,
        trigger="interval",
        hours=1,
        id="scheduled_scan_master",
        # Re-registering under the same id replaces the job, not duplicates it.
        replace_existing=True,
    )
    scheduler.start()
    logger.info("Scheduler started — checking for due scans every hour.")
|
|
|
|
|
|
def stop_scheduler() -> None:
    """
    Shut the scheduler down if it is currently running.

    Called from main.py lifespan. Does nothing when the scheduler is not
    running; otherwise requests a shutdown with ``wait=False`` and logs it.
    """
    if not scheduler.running:
        return

    scheduler.shutdown(wait=False)
    logger.info("Scheduler stopped.")
|