From 34a9802af56a082b3df99407215f1e79009e7ec4 Mon Sep 17 00:00:00 2001 From: Lorenzo Venerandi Date: Tue, 17 Feb 2026 14:46:54 +0100 Subject: [PATCH] feat: implement auto-tracking for honeypot requests based on attack patterns --- src/routes/honeypot.py | 65 ++++++++++++++++++++++++------------------ 1 file changed, 37 insertions(+), 28 deletions(-) diff --git a/src/routes/honeypot.py b/src/routes/honeypot.py index 44c38a5..5d2f0ff 100644 --- a/src/routes/honeypot.py +++ b/src/routes/honeypot.py @@ -41,7 +41,43 @@ from deception_responses import ( from wordlists import get_wordlists from logger import get_app_logger, get_access_logger, get_credential_logger -router = APIRouter() + +# --- Auto-tracking dependency --- +# Only records requests where an attack pattern is detected in the path or body. + + +async def _track_honeypot_request(request: Request): + """Record access only for requests with detected attack patterns.""" + tracker = request.app.state.tracker + client_ip = get_client_ip(request) + user_agent = request.headers.get("User-Agent", "") + path = request.url.path + + body = "" + if request.method in ("POST", "PUT"): + body_bytes = await request.body() + body = body_bytes.decode("utf-8", errors="replace") + + # Only record if an attack pattern is detected in the path or body + attack_findings = tracker.detect_attack_type(path) + + if body: + import urllib.parse + decoded_body = urllib.parse.unquote(body) + attack_findings.extend(tracker.detect_attack_type(decoded_body)) + + if attack_findings: + tracker.record_access( + ip=client_ip, + path=path, + user_agent=user_agent, + body=body, + method=request.method, + raw_request=build_raw_request(request, body), + ) + + +router = APIRouter(dependencies=[Depends(_track_honeypot_request)]) # --- Helper functions --- @@ -128,15 +164,6 @@ async def contact_post(request: Request): else: access_logger.info(f"[XSS ENDPOINT POST] {client_ip} - {request.url.path}") - tracker.record_access( - ip=client_ip, - path=str(request.url.path), - user_agent=user_agent, - body=post_data, - method="POST", - raw_request=build_raw_request(request, post_data), - ) - response_html = generate_xss_response(parsed_data) return HTMLResponse(content=response_html, status_code=200) @@ -176,15 +203,6 @@ async def credential_capture_post(request: Request, path: str): f"[CREDENTIALS CAPTURED] {client_ip} - Username: {username or 'N/A'} - Path: {full_path}" ) - tracker.record_access( - client_ip, - full_path, - user_agent, - post_data, - method="POST", - raw_request=build_raw_request(request, post_data), - ) - await asyncio.sleep(1) return HTMLResponse(content=html_templates.login_error(), status_code=200) @@ -373,15 +391,6 @@ async def trap_page(request: Request, path: str): if "wordpress" in full_path.lower(): return HTMLResponse(html_templates.wordpress()) - # Record access - tracker.record_access( - client_ip, - full_path, - user_agent, - method="GET", - raw_request=build_raw_request(request), - ) - if tracker.is_suspicious_user_agent(user_agent): access_logger.warning( f"[SUSPICIOUS] {client_ip} - {user_agent[:50]} - {full_path}"