Files
krawl.es/src/routes/htmx.py

402 lines
12 KiB
Python

#!/usr/bin/env python3
"""
HTMX fragment endpoints.
Server-rendered HTML partials for table pagination, sorting, IP details, and search.
"""
from fastapi import APIRouter, Request, Response, Query
from dependencies import get_db, get_templates
router = APIRouter()
def _dashboard_path(request: Request) -> str:
config = request.app.state.config
return "/" + config.dashboard_secret_path.lstrip("/")
# ── Honeypot Triggers ────────────────────────────────────────────────
@router.get("/htmx/honeypot")
async def htmx_honeypot(
request: Request,
page: int = Query(1),
sort_by: str = Query("count"),
sort_order: str = Query("desc"),
):
db = get_db()
result = db.get_honeypot_paginated(
page=max(1, page), page_size=5, sort_by=sort_by, sort_order=sort_order
)
templates = get_templates()
return templates.TemplateResponse(
"dashboard/partials/honeypot_table.html",
{
"request": request,
"dashboard_path": _dashboard_path(request),
"items": result["honeypots"],
"pagination": result["pagination"],
"sort_by": sort_by,
"sort_order": sort_order,
},
)
# ── Top IPs ──────────────────────────────────────────────────────────
@router.get("/htmx/top-ips")
async def htmx_top_ips(
request: Request,
page: int = Query(1),
sort_by: str = Query("count"),
sort_order: str = Query("desc"),
):
db = get_db()
result = db.get_top_ips_paginated(
page=max(1, page), page_size=8, sort_by=sort_by, sort_order=sort_order
)
templates = get_templates()
return templates.TemplateResponse(
"dashboard/partials/top_ips_table.html",
{
"request": request,
"dashboard_path": _dashboard_path(request),
"items": result["ips"],
"pagination": result["pagination"],
"sort_by": sort_by,
"sort_order": sort_order,
},
)
# ── Top Paths ────────────────────────────────────────────────────────
@router.get("/htmx/top-paths")
async def htmx_top_paths(
request: Request,
page: int = Query(1),
sort_by: str = Query("count"),
sort_order: str = Query("desc"),
):
db = get_db()
result = db.get_top_paths_paginated(
page=max(1, page), page_size=5, sort_by=sort_by, sort_order=sort_order
)
templates = get_templates()
return templates.TemplateResponse(
"dashboard/partials/top_paths_table.html",
{
"request": request,
"dashboard_path": _dashboard_path(request),
"items": result["paths"],
"pagination": result["pagination"],
"sort_by": sort_by,
"sort_order": sort_order,
},
)
# ── Top User-Agents ─────────────────────────────────────────────────
@router.get("/htmx/top-ua")
async def htmx_top_ua(
request: Request,
page: int = Query(1),
sort_by: str = Query("count"),
sort_order: str = Query("desc"),
):
db = get_db()
result = db.get_top_user_agents_paginated(
page=max(1, page), page_size=5, sort_by=sort_by, sort_order=sort_order
)
templates = get_templates()
return templates.TemplateResponse(
"dashboard/partials/top_ua_table.html",
{
"request": request,
"dashboard_path": _dashboard_path(request),
"items": result["user_agents"],
"pagination": result["pagination"],
"sort_by": sort_by,
"sort_order": sort_order,
},
)
# ── Attackers ────────────────────────────────────────────────────────
@router.get("/htmx/attackers")
async def htmx_attackers(
request: Request,
page: int = Query(1),
sort_by: str = Query("total_requests"),
sort_order: str = Query("desc"),
):
db = get_db()
result = db.get_attackers_paginated(
page=max(1, page), page_size=25, sort_by=sort_by, sort_order=sort_order
)
# Normalize pagination key (DB returns total_attackers, template expects total)
pagination = result["pagination"]
if "total_attackers" in pagination and "total" not in pagination:
pagination["total"] = pagination["total_attackers"]
templates = get_templates()
return templates.TemplateResponse(
"dashboard/partials/attackers_table.html",
{
"request": request,
"dashboard_path": _dashboard_path(request),
"items": result["attackers"],
"pagination": pagination,
"sort_by": sort_by,
"sort_order": sort_order,
},
)
# ── Access logs by ip ────────────────────────────────────────────────────────
@router.get("/htmx/access-logs")
async def htmx_access_logs_by_ip(
request: Request,
page: int = Query(1),
sort_by: str = Query("total_requests"),
sort_order: str = Query("desc"),
ip_filter: str = Query("ip_filter"),
):
db = get_db()
result = db.get_access_logs_paginated(
page=max(1, page), page_size=25, ip_filter=ip_filter
)
# Normalize pagination key (DB returns total_attackers, template expects total)
pagination = result["pagination"]
if "total_access_logs" in pagination and "total" not in pagination:
pagination["total"] = pagination["total_access_logs"]
templates = get_templates()
return templates.TemplateResponse(
"dashboard/partials/access_by_ip_table.html",
{
"request": request,
"dashboard_path": _dashboard_path(request),
"items": result["access_logs"],
"pagination": pagination,
"sort_by": sort_by,
"sort_order": sort_order,
"ip_filter": ip_filter,
},
)
# ── Credentials ──────────────────────────────────────────────────────
@router.get("/htmx/credentials")
async def htmx_credentials(
request: Request,
page: int = Query(1),
sort_by: str = Query("timestamp"),
sort_order: str = Query("desc"),
):
db = get_db()
result = db.get_credentials_paginated(
page=max(1, page), page_size=5, sort_by=sort_by, sort_order=sort_order
)
templates = get_templates()
return templates.TemplateResponse(
"dashboard/partials/credentials_table.html",
{
"request": request,
"dashboard_path": _dashboard_path(request),
"items": result["credentials"],
"pagination": result["pagination"],
"sort_by": sort_by,
"sort_order": sort_order,
},
)
# ── Attack Types ─────────────────────────────────────────────────────
@router.get("/htmx/attacks")
async def htmx_attacks(
request: Request,
page: int = Query(1),
sort_by: str = Query("timestamp"),
sort_order: str = Query("desc"),
):
db = get_db()
result = db.get_attack_types_paginated(
page=max(1, page), page_size=5, sort_by=sort_by, sort_order=sort_order
)
# Transform attack data for template (join attack_types list, map id to log_id)
items = []
for attack in result["attacks"]:
items.append(
{
"ip": attack["ip"],
"path": attack["path"],
"attack_type": ", ".join(attack.get("attack_types", [])),
"user_agent": attack.get("user_agent", ""),
"timestamp": attack.get("timestamp"),
"log_id": attack.get("id"),
}
)
templates = get_templates()
return templates.TemplateResponse(
"dashboard/partials/attack_types_table.html",
{
"request": request,
"dashboard_path": _dashboard_path(request),
"items": items,
"pagination": result["pagination"],
"sort_by": sort_by,
"sort_order": sort_order,
},
)
# ── Attack Patterns ──────────────────────────────────────────────────
@router.get("/htmx/patterns")
async def htmx_patterns(
request: Request,
page: int = Query(1),
):
db = get_db()
page = max(1, page)
page_size = 10
# Get all attack type stats and paginate manually
result = db.get_attack_types_stats(limit=100)
all_patterns = [
{"pattern": item["type"], "count": item["count"]}
for item in result.get("attack_types", [])
]
total = len(all_patterns)
total_pages = max(1, (total + page_size - 1) // page_size)
offset = (page - 1) * page_size
items = all_patterns[offset : offset + page_size]
templates = get_templates()
return templates.TemplateResponse(
"dashboard/partials/patterns_table.html",
{
"request": request,
"dashboard_path": _dashboard_path(request),
"items": items,
"pagination": {
"page": page,
"page_size": page_size,
"total": total,
"total_pages": total_pages,
},
},
)
# ── IP Insight (full IP page as partial) ─────────────────────────────
@router.get("/htmx/ip-insight/{ip_address:path}")
async def htmx_ip_insight(ip_address: str, request: Request):
db = get_db()
stats = db.get_ip_stats_by_ip(ip_address)
if not stats:
stats = {"ip": ip_address, "total_requests": "N/A"}
# Transform fields for template compatibility
list_on = stats.get("list_on") or {}
stats["blocklist_memberships"] = list(list_on.keys()) if list_on else []
stats["reverse_dns"] = stats.get("reverse")
templates = get_templates()
return templates.TemplateResponse(
"dashboard/partials/ip_insight.html",
{
"request": request,
"dashboard_path": _dashboard_path(request),
"stats": stats,
"ip_address": ip_address,
},
)
# ── IP Detail ────────────────────────────────────────────────────────
@router.get("/htmx/ip-detail/{ip_address:path}")
async def htmx_ip_detail(ip_address: str, request: Request):
db = get_db()
stats = db.get_ip_stats_by_ip(ip_address)
if not stats:
stats = {"ip": ip_address, "total_requests": "N/A"}
# Transform fields for template compatibility
list_on = stats.get("list_on") or {}
stats["blocklist_memberships"] = list(list_on.keys()) if list_on else []
stats["reverse_dns"] = stats.get("reverse")
templates = get_templates()
return templates.TemplateResponse(
"dashboard/partials/ip_detail.html",
{
"request": request,
"dashboard_path": _dashboard_path(request),
"stats": stats,
},
)
# ── Search ───────────────────────────────────────────────────────────
@router.get("/htmx/search")
async def htmx_search(
request: Request,
q: str = Query(""),
page: int = Query(1),
):
q = q.strip()
if not q:
return Response(content="", media_type="text/html")
db = get_db()
result = db.search_attacks_and_ips(query=q, page=max(1, page), page_size=20)
templates = get_templates()
return templates.TemplateResponse(
"dashboard/partials/search_results.html",
{
"request": request,
"dashboard_path": _dashboard_path(request),
"attacks": result["attacks"],
"ips": result["ips"],
"query": q,
"pagination": result["pagination"],
},
)