updated new features

This commit is contained in:
rarebuffalo
2026-05-07 22:32:20 +05:30
parent 17c14dabf9
commit 61797fbb97
9 changed files with 327 additions and 17 deletions

View File

@@ -207,3 +207,82 @@ async def generate_diff_narrative(diff_data: dict) -> str:
result = await call_ai(prompt, temperature=0.4)
return result or "Could not generate diff narrative."
async def generate_remediation_plan(issues: list[dict], url: str) -> dict:
"""
Generates a prioritized, actionable remediation roadmap from a list of issues.
Instead of per-issue snippets (which the scanner already provides), this
function looks at the full picture and produces a sequenced plan that a
developer can actually follow: what to fix first, how hard each fix is,
and a realistic estimate of total effort.
Returns a dict matching the RemediationPlan schema:
{
"summary": str,
"steps": [
{
"priority": int,
"issue": str,
"severity": str,
"effort": "Easy" | "Medium" | "Hard",
"fix_summary": str,
"code_snippet": str | null
}
],
"estimated_total_effort": str
}
"""
if not settings.effective_ai_key:
return {
"summary": "AI remediation plans require an AI API key to be configured.",
"steps": [],
"estimated_total_effort": "N/A",
}
if not issues:
return {
"summary": "No issues were found in the scan. No remediation required.",
"steps": [],
"estimated_total_effort": "0 hours",
}
prompt = (
"You are a senior application security consultant reviewing scan results for a website.\n"
f"Target URL: {url}\n"
f"Issues found:\n{json.dumps(issues, indent=2)}\n\n"
"Generate a prioritized remediation roadmap. Return a JSON object with exactly these keys:\n"
" 'summary' : A 2-3 sentence overall assessment of the security posture.\n"
" 'steps' : A list of objects, one per issue, ordered by priority "
"(most critical first). Each step object must have:\n"
" 'priority' : Integer starting at 1\n"
" 'issue' : The exact issue name from the input\n"
" 'severity' : Critical | High | Medium | Low\n"
" 'effort' : Easy | Medium | Hard\n"
" 'fix_summary' : A concrete, actionable description of how to fix it (2-3 sentences)\n"
" 'code_snippet' : A relevant code or config example, or null if not applicable\n"
" 'estimated_total_effort' : A realistic total time estimate for all fixes combined "
"(e.g. '2-4 hours', '1-2 days').\n\n"
"Order steps strictly by: Critical first, then High, Medium, Low. "
"Within the same severity, put Easy fixes before Hard ones."
)
raw = await call_ai(prompt, temperature=0.2, json_mode=True)
if not raw:
return {
"summary": "Could not generate remediation plan — AI returned an empty response.",
"steps": [],
"estimated_total_effort": "N/A",
}
try:
return json.loads(raw)
except json.JSONDecodeError as e:
logger.error(f"Failed to parse remediation plan JSON: {e}\nRaw: {raw[:500]}")
return {
"summary": "Could not parse the AI-generated remediation plan.",
"steps": [],
"estimated_total_effort": "N/A",
}

View File

@@ -47,6 +47,11 @@ from app.models.webhook import Webhook
from app.services.scoring import calculate_layer_statuses, calculate_score
from app.services.ai import enhance_security_issues
from app.services.webhook_dispatcher import dispatch_webhooks
from app.services.alerting import (
send_slack_alert,
send_email_alert,
build_regression_email_body,
)
from app.config import settings
logger = logging.getLogger(__name__)
@@ -150,13 +155,13 @@ async def _run_single_scan(scheduled: ScheduledScan) -> None:
await db.commit()
# Fire webhooks if the score dropped
# Fire webhooks, Slack alert, and email if the score dropped
score_dropped = previous_score is not None and score < previous_score
if score_dropped:
delta = previous_score - score
logger.warning(
f"Score dropped {delta} pts for {url} "
f"({previous_score} {score}). Firing webhooks."
f"({previous_score} -> {score}). Sending regression alerts."
)
webhook_payload = {
"event": "scheduled_scan_regression",
@@ -168,6 +173,32 @@ async def _run_single_scan(scheduled: ScheduledScan) -> None:
}
await dispatch_webhooks(user_id, webhook_payload, db)
slack_title = f"Score regression detected for {validated_url}"
slack_msg = (
f"Previous score: {previous_score}/100\n"
f"New score: {score}/100 ({-delta:+d} points)\n"
f"Action: Review the latest scan in SecureLens."
)
await send_slack_alert(title=slack_title, message=slack_msg)
# Fetch user email to send the regression alert
from sqlalchemy import select as _select
from app.models.user import User
async with AsyncSessionLocal() as email_db:
user_result = await email_db.execute(
_select(User).where(User.id == user_id)
)
user = user_result.scalar_one_or_none()
if user:
email_body = build_regression_email_body(
validated_url, previous_score, score
)
await send_email_alert(
to_email=user.email,
subject=f"SecureLens: Score regression detected for {validated_url}",
html_body=email_body,
)
logger.info(f"Scheduled scan complete: {url} → score={score}")
except httpx.HTTPError as e: