diff --git a/src/handler.py b/src/handler.py
deleted file mode 100644
index 863c223..0000000
--- a/src/handler.py
+++ /dev/null
@@ -1,1264 +0,0 @@
-#!/usr/bin/env python3
-
-import logging
-import random
-import time
-from datetime import datetime
-from http.server import BaseHTTPRequestHandler
-from typing import Optional, List
-from urllib.parse import urlparse, parse_qs, unquote_plus
-import json
-import os
-
-from database import get_database
-from config import Config, get_config
-
-# imports for the __init_subclass__ method, do not remove pls
-from firewall.fwtype import FWType
-from firewall.iptables import Iptables
-from firewall.raw import Raw
-
-from tracker import AccessTracker
-from templates import html_templates
-from templates.dashboard_template import generate_dashboard
-from generators import (
- credentials_txt,
- passwords_txt,
- users_json,
- api_keys_json,
- api_response,
- directory_listing,
- random_server_header,
-)
-from wordlists import get_wordlists
-from deception_responses import (
- detect_and_respond_deception,
- generate_sql_error_response,
- get_sql_response_with_data,
- detect_xss_pattern,
- generate_xss_response,
- generate_server_error,
-)
-from models import AccessLog
-from ip_utils import is_valid_public_ip
-from sqlalchemy import distinct
-
-
-class Handler(BaseHTTPRequestHandler):
- """HTTP request handler for the deception server"""
-
- webpages: Optional[List[str]] = None
- config: Config = None
- tracker: AccessTracker = None
- counter: int = 0
- app_logger: logging.Logger = None
- access_logger: logging.Logger = None
- credential_logger: logging.Logger = None
-
- def _get_client_ip(self) -> str:
- """Extract client IP address from request, checking proxy headers first"""
- # Headers might not be available during early error logging
- if hasattr(self, "headers") and self.headers:
- # Check X-Forwarded-For header (set by load balancers/proxies)
- forwarded_for = self.headers.get("X-Forwarded-For")
- if forwarded_for:
- # X-Forwarded-For can contain multiple IPs, get the first (original client)
- return forwarded_for.split(",")[0].strip()
-
- # Check X-Real-IP header (set by nginx and other proxies)
- real_ip = self.headers.get("X-Real-IP")
- if real_ip:
- return real_ip.strip()
-
- # Fallback to direct connection IP
- return self.client_address[0]
-
- def _build_raw_request(self, body: str = "") -> str:
- """Build raw HTTP request string for forensic analysis"""
- try:
- # Request line
- raw = f"{self.command} {self.path} {self.request_version}\r\n"
-
- # Headers
- if hasattr(self, "headers") and self.headers:
- for header, value in self.headers.items():
- raw += f"{header}: {value}\r\n"
-
- raw += "\r\n"
-
- # Body (if present)
- if body:
- raw += body
-
- return raw
- except Exception as e:
- # Fallback to minimal representation if building fails
- return f"{self.command} {self.path} (error building full request: {str(e)})"
-
- def _get_category_by_ip(self, client_ip: str) -> str:
- """Get the category of an IP from the database"""
- return self.tracker.get_category_by_ip(client_ip)
-
- def _get_page_visit_count(self, client_ip: str) -> int:
- """Get current page visit count for an IP"""
- return self.tracker.get_page_visit_count(client_ip)
-
- def _increment_page_visit(self, client_ip: str) -> int:
- """Increment page visit counter for an IP and return new count"""
- return self.tracker.increment_page_visit(client_ip)
-
- def version_string(self) -> str:
- """Return custom server version for deception."""
- return random_server_header()
-
- def _should_return_error(self) -> bool:
- """Check if we should return an error based on probability"""
- if self.config.probability_error_codes <= 0:
- return False
- return random.randint(1, 100) <= self.config.probability_error_codes
-
- def _get_random_error_code(self) -> int:
- """Get a random error code from wordlists"""
- wl = get_wordlists()
- error_codes = wl.error_codes
- if not error_codes:
- error_codes = [400, 401, 403, 404, 500, 502, 503]
- return random.choice(error_codes)
-
- def _handle_sql_endpoint(self, path: str) -> bool:
- """
- Handle SQL injection honeypot endpoints.
- Returns True if the path was handled, False otherwise.
- """
- # SQL-vulnerable endpoints
- sql_endpoints = ["/api/search", "/api/sql", "/api/database"]
-
- base_path = urlparse(path).path
- if base_path not in sql_endpoints:
- return False
-
- try:
- parsed_url = urlparse(path)
- request_query = parsed_url.query
-
- # Log SQL injection attempt
- client_ip = self._get_client_ip()
- user_agent = self.headers.get("User-Agent", "")
-
- # Always check for SQL injection patterns
- error_msg, content_type, status_code = generate_sql_error_response(
- request_query or ""
- )
-
- if error_msg:
- # SQL injection detected - log and return error
- self.access_logger.warning(
- f"[SQL INJECTION DETECTED] {client_ip} - {base_path} - Query: {request_query[:100] if request_query else 'empty'}"
- )
- self.send_response(status_code)
- self.send_header("Content-type", content_type)
- self.end_headers()
- self.wfile.write(error_msg.encode())
- else:
- # No injection detected - return fake data
- self.access_logger.info(
- f"[SQL ENDPOINT] {client_ip} - {base_path} - Query: {request_query[:100] if request_query else 'empty'}"
- )
- self.send_response(200)
- self.send_header("Content-type", "application/json")
- self.end_headers()
- response_data = get_sql_response_with_data(
- base_path, request_query or ""
- )
- self.wfile.write(response_data.encode())
-
- return True
-
- except BrokenPipeError:
- # Client disconnected
- return True
- except Exception as e:
- self.app_logger.error(f"Error handling SQL endpoint {path}: {str(e)}")
- # Still send a response even on error
- try:
- self.send_response(500)
- self.send_header("Content-type", "application/json")
- self.end_headers()
- self.wfile.write(b'{"error": "Internal server error"}')
- except:
- pass
- return True
-
- def _handle_deception_response(
- self, path: str, query: str = "", body: str = "", method: str = "GET"
- ) -> bool:
- """
- Handle deception responses for path traversal, XXE, and command injection.
- Returns True if a deception response was sent, False otherwise.
- """
- try:
- self.app_logger.debug(f"Checking deception for: {method} {path}")
- result = detect_and_respond_deception(path, query, body, method)
-
- if result:
- response_body, content_type, status_code = result
- client_ip = self._get_client_ip()
- user_agent = self.headers.get("User-Agent", "")
-
- # Determine attack type using standardized names from wordlists
- full_input = f"{path} {query} {body}".lower()
- attack_type_db = None # For database (standardized)
- attack_type_log = "UNKNOWN" # For logging (human-readable)
-
- if (
- "passwd" in path.lower()
- or "shadow" in path.lower()
- or ".." in path
- or ".." in query
- ):
- attack_type_db = "path_traversal"
- attack_type_log = "PATH_TRAVERSAL"
- elif body and (" str:
- """Generate a webpage containing random links or canary token"""
-
- random.seed(seed)
- num_pages = random.randint(*self.config.links_per_page_range)
-
- # Check if this is a good crawler by IP category from database
- ip_category = self._get_category_by_ip(self._get_client_ip())
-
- # Determine if we should apply crawler page limit based on config and IP category
- should_apply_crawler_limit = False
- if self.config.infinite_pages_for_malicious:
- if (
- ip_category == "good_crawler" or ip_category == "regular_user"
- ) and page_visit_count >= self.config.max_pages_limit:
- should_apply_crawler_limit = True
- else:
- if (
- ip_category == "good_crawler"
- or ip_category == "bad_crawler"
- or ip_category == "attacker"
- ) and page_visit_count >= self.config.max_pages_limit:
- should_apply_crawler_limit = True
-
- # If good crawler reached max pages, return a simple page with no links
- if should_apply_crawler_limit:
- return html_templates.main_page(
- Handler.counter, "
Crawl limit reached.
"
- )
-
- num_pages = random.randint(*self.config.links_per_page_range)
-
- # Build the content HTML
- content = ""
-
- # Add canary token if needed
- if Handler.counter <= 0 and self.config.canary_token_url:
- content += f"""
-
-"""
-
- # Add links
- if self.webpages is None:
- for _ in range(num_pages):
- address = "".join(
- [
- random.choice(self.config.char_space)
- for _ in range(random.randint(*self.config.links_length_range))
- ]
- )
- content += f"""
-
-"""
- else:
- for _ in range(num_pages):
- address = random.choice(self.webpages)
- content += f"""
-
-"""
-
- # Return the complete page using the template
- return html_templates.main_page(Handler.counter, content)
-
- def do_HEAD(self):
- """Sends header information"""
- self.send_response(200)
- self.send_header("Content-type", "text/html")
- self.end_headers()
-
- def do_POST(self):
- """Handle POST requests (mainly login attempts)"""
- client_ip = self._get_client_ip()
- user_agent = self.headers.get("User-Agent", "")
- post_data = ""
-
- base_path = urlparse(self.path).path
-
- content_length = int(self.headers.get("Content-Length", 0))
- if content_length > 0:
- post_data = self.rfile.read(content_length).decode(
- "utf-8", errors="replace"
- )
-
- parsed_url = urlparse(self.path)
- query_string = parsed_url.query
-
- if self._handle_deception_response(self.path, query_string, post_data, "POST"):
- return
-
- if base_path in ["/api/search", "/api/sql", "/api/database"]:
- self.access_logger.info(
- f"[SQL ENDPOINT POST] {client_ip} - {base_path} - Data: {post_data[:100] if post_data else 'empty'}"
- )
-
- error_msg, content_type, status_code = generate_sql_error_response(
- post_data
- )
-
- try:
- if error_msg:
- self.access_logger.warning(
- f"[SQL INJECTION DETECTED POST] {client_ip} - {base_path}"
- )
- self.send_response(status_code)
- self.send_header("Content-type", content_type)
- self.end_headers()
- self.wfile.write(error_msg.encode())
- else:
- self.send_response(200)
- self.send_header("Content-type", "application/json")
- self.end_headers()
- response_data = get_sql_response_with_data(base_path, post_data)
- self.wfile.write(response_data.encode())
- except BrokenPipeError:
- pass
- except Exception as e:
- self.app_logger.error(f"Error in SQL POST handler: {str(e)}")
- return
-
- if base_path == "/api/contact":
- # Parse URL-encoded POST data properly
- parsed_data = {}
- if post_data:
- # Use parse_qs for proper URL decoding
- parsed_qs = parse_qs(post_data)
- # parse_qs returns lists, get first value of each
- parsed_data = {k: v[0] if v else "" for k, v in parsed_qs.items()}
-
- self.app_logger.debug(f"Parsed contact data: {parsed_data}")
-
- xss_detected = any(detect_xss_pattern(str(v)) for v in parsed_data.values())
-
- if xss_detected:
- self.access_logger.warning(
- f"[XSS ATTEMPT DETECTED] {client_ip} - {base_path} - Data: {post_data[:200]}"
- )
- else:
- self.access_logger.info(
- f"[XSS ENDPOINT POST] {client_ip} - {base_path}"
- )
-
- # Record access for dashboard tracking (including XSS detection)
- self.tracker.record_access(
- ip=client_ip,
- path=self.path,
- user_agent=user_agent,
- body=post_data,
- method="POST",
- raw_request=self._build_raw_request(post_data),
- )
-
- try:
- self.send_response(200)
- self.send_header("Content-type", "text/html")
- self.end_headers()
- response_html = generate_xss_response(parsed_data)
- self.wfile.write(response_html.encode())
- except BrokenPipeError:
- pass
- except Exception as e:
- self.app_logger.error(f"Error in XSS POST handler: {str(e)}")
- return
-
- self.access_logger.warning(
- f"[LOGIN ATTEMPT] {client_ip} - {self.path} - {user_agent[:50]}"
- )
-
- # post_data was already read at the beginning of do_POST, don't read again
- if post_data:
- self.access_logger.warning(f"[POST DATA] {post_data[:200]}")
-
- # Parse and log credentials
- username, password = self.tracker.parse_credentials(post_data)
- if username or password:
- # Log to dedicated credentials.log file
- timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
- credential_line = f"{timestamp}|{client_ip}|{username or 'N/A'}|{password or 'N/A'}|{self.path}"
- self.credential_logger.info(credential_line)
-
- # Also record in tracker for dashboard
- self.tracker.record_credential_attempt(
- client_ip, self.path, username or "N/A", password or "N/A"
- )
-
- self.access_logger.warning(
- f"[CREDENTIALS CAPTURED] {client_ip} - Username: {username or 'N/A'} - Path: {self.path}"
- )
-
- # send the post data (body) to the record_access function so the post data can be used to detect suspicious things.
- self.tracker.record_access(
- client_ip,
- self.path,
- user_agent,
- post_data,
- method="POST",
- raw_request=self._build_raw_request(post_data),
- )
-
- time.sleep(1)
-
- try:
- self.send_response(200)
- self.send_header("Content-type", "text/html")
- self.end_headers()
- self.wfile.write(html_templates.login_error().encode())
- except BrokenPipeError:
- # Client disconnected before receiving response, ignore silently
- pass
- except Exception as e:
- # Log other exceptions but don't crash
- self.app_logger.error(f"Failed to send response to {client_ip}: {str(e)}")
-
- def serve_special_path(self, path: str) -> bool:
- """Serve special paths like robots.txt, API endpoints, etc."""
-
- # Check SQL injection honeypot endpoints first
- if self._handle_sql_endpoint(path):
- return True
-
- try:
- if path == "/robots.txt":
- self.send_response(200)
- self.send_header("Content-type", "text/plain")
- self.end_headers()
- self.wfile.write(html_templates.robots_txt().encode())
- return True
-
- if path in ["/credentials.txt", "/passwords.txt", "/admin_notes.txt"]:
- self.send_response(200)
- self.send_header("Content-type", "text/plain")
- self.end_headers()
- if "credentials" in path:
- self.wfile.write(credentials_txt().encode())
- else:
- self.wfile.write(passwords_txt().encode())
- return True
-
- if path in ["/users.json", "/api_keys.json", "/config.json"]:
- self.send_response(200)
- self.send_header("Content-type", "application/json")
- self.end_headers()
- if "users" in path:
- self.wfile.write(users_json().encode())
- elif "api_keys" in path:
- self.wfile.write(api_keys_json().encode())
- else:
- self.wfile.write(api_response("/api/config").encode())
- return True
-
- if path in ["/admin", "/admin/", "/admin/login", "/login"]:
- self.send_response(200)
- self.send_header("Content-type", "text/html")
- self.end_headers()
- self.wfile.write(html_templates.login_form().encode())
- return True
-
- if path in ["/users", "/user", "/database", "/db", "/search"]:
- self.send_response(200)
- self.send_header("Content-type", "text/html")
- self.end_headers()
- self.wfile.write(html_templates.product_search().encode())
- return True
-
- if path in ["/info", "/input", "/contact", "/feedback", "/comment"]:
- self.send_response(200)
- self.send_header("Content-type", "text/html")
- self.end_headers()
- self.wfile.write(html_templates.input_form().encode())
- return True
-
- if path == "/server":
- error_html, content_type = generate_server_error()
- self.send_response(500)
- self.send_header("Content-type", content_type)
- self.end_headers()
- self.wfile.write(error_html.encode())
- return True
-
- if path in ["/wp-login.php", "/wp-login", "/wp-admin", "/wp-admin/"]:
- self.send_response(200)
- self.send_header("Content-type", "text/html")
- self.end_headers()
- self.wfile.write(html_templates.wp_login().encode())
- return True
-
- if path in ["/wp-content/", "/wp-includes/"] or "wordpress" in path.lower():
- self.send_response(200)
- self.send_header("Content-type", "text/html")
- self.end_headers()
- self.wfile.write(html_templates.wordpress().encode())
- return True
-
- if "phpmyadmin" in path.lower() or path in ["/pma/", "/phpMyAdmin/"]:
- self.send_response(200)
- self.send_header("Content-type", "text/html")
- self.end_headers()
- self.wfile.write(html_templates.phpmyadmin().encode())
- return True
-
- if path.startswith("/api/") or path.startswith("/api") or path in ["/.env"]:
- self.send_response(200)
- self.send_header("Content-type", "application/json")
- self.end_headers()
- self.wfile.write(api_response(path).encode())
- return True
-
- if path in [
- "/backup/",
- "/uploads/",
- "/private/",
- "/admin/",
- "/config/",
- "/database/",
- ]:
- self.send_response(200)
- self.send_header("Content-type", "text/html")
- self.end_headers()
- self.wfile.write(directory_listing(path).encode())
- return True
- except BrokenPipeError:
- # Client disconnected, ignore silently
- pass
- except Exception as e:
- self.app_logger.error(f"Failed to serve special path {path}: {str(e)}")
- pass
-
- return False
-
- def do_GET(self):
- """Responds to webpage requests"""
-
- client_ip = self._get_client_ip()
-
- # respond with HTTP error code if client is banned
- if self.tracker.is_banned_ip(client_ip):
- self.send_response(500)
- self.end_headers()
- return
-
- # get request data
- user_agent = self.headers.get("User-Agent", "")
- request_path = urlparse(self.path).path
- self.app_logger.info(f"request_query: {request_path}")
- parsed_url = urlparse(self.path)
- query_string = parsed_url.query
- query_params = parse_qs(query_string)
- self.app_logger.info(f"query_params: {query_params}")
-
- if self._handle_deception_response(self.path, query_string, "", "GET"):
- return
-
- # get database reference
- db = get_database()
- session = db.session
-
- # Handle static files for dashboard
- if self.config.dashboard_secret_path and self.path.startswith(
- f"{self.config.dashboard_secret_path}/static/"
- ):
-
- file_path = self.path.replace(
- f"{self.config.dashboard_secret_path}/static/", ""
- )
- static_dir = os.path.join(os.path.dirname(__file__), "templates", "static")
- full_path = os.path.join(static_dir, file_path)
-
- # Security check: ensure the path is within static directory
- if os.path.commonpath(
- [full_path, static_dir]
- ) == static_dir and os.path.exists(full_path):
- try:
- with open(full_path, "rb") as f:
- content = f.read()
- self.send_response(200)
- if file_path.endswith(".svg"):
- self.send_header("Content-type", "image/svg+xml")
- elif file_path.endswith(".css"):
- self.send_header("Content-type", "text/css")
- elif file_path.endswith(".js"):
- self.send_header("Content-type", "application/javascript")
- else:
- self.send_header("Content-type", "application/octet-stream")
- self.send_header("Content-Length", str(len(content)))
- self.end_headers()
- self.wfile.write(content)
- return
- except Exception as e:
- self.app_logger.error(f"Error serving static file: {e}")
-
- self.send_response(404)
- self.send_header("Content-type", "text/plain")
- self.end_headers()
- self.wfile.write(b"Not found")
- return
-
- if (
- self.config.dashboard_secret_path
- and self.path == self.config.dashboard_secret_path
- ):
- self.send_response(200)
- self.send_header("Content-type", "text/html")
- self.end_headers()
- try:
- stats = self.tracker.get_stats()
- self.wfile.write(
- generate_dashboard(
- stats, self.config.dashboard_secret_path
- ).encode()
- )
- except BrokenPipeError:
- pass
- except Exception as e:
- self.app_logger.error(f"Error generating dashboard: {e}")
- return
-
- # API endpoint for fetching all IP statistics
- if (
- self.config.dashboard_secret_path
- and self.path == f"{self.config.dashboard_secret_path}/api/all-ip-stats"
- ):
- self.send_response(200)
- self.send_header("Content-type", "application/json")
- self.send_header("Access-Control-Allow-Origin", "*")
- self.send_header(
- "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
- )
- self.send_header("Pragma", "no-cache")
- self.send_header("Expires", "0")
- self.end_headers()
- try:
-
- ip_stats_list = db.get_ip_stats(limit=500)
- self.wfile.write(json.dumps({"ips": ip_stats_list}).encode())
- except BrokenPipeError:
- pass
- except Exception as e:
- self.app_logger.error(f"Error fetching all IP stats: {e}")
- self.wfile.write(json.dumps({"error": str(e)}).encode())
- return
-
- # API endpoint for fetching paginated attackers
- if self.config.dashboard_secret_path and self.path.startswith(
- f"{self.config.dashboard_secret_path}/api/attackers"
- ):
- self.send_response(200)
- self.send_header("Content-type", "application/json")
- self.send_header("Access-Control-Allow-Origin", "*")
- self.send_header(
- "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
- )
- self.send_header("Pragma", "no-cache")
- self.send_header("Expires", "0")
- self.end_headers()
- try:
-
- page = int(query_params.get("page", ["1"])[0])
- page_size = int(query_params.get("page_size", ["25"])[0])
- sort_by = query_params.get("sort_by", ["total_requests"])[0]
- sort_order = query_params.get("sort_order", ["desc"])[0]
-
- # Ensure valid parameters
- page = max(1, page)
- page_size = min(max(1, page_size), 100) # Max 100 per page
-
- result = db.get_attackers_paginated(
- page=page,
- page_size=page_size,
- sort_by=sort_by,
- sort_order=sort_order,
- )
- self.wfile.write(json.dumps(result).encode())
- except BrokenPipeError:
- pass
- except Exception as e:
- self.app_logger.error(f"Error fetching attackers: {e}")
- self.wfile.write(json.dumps({"error": str(e)}).encode())
- return
-
- # API endpoint for fetching all IPs (all categories)
- if self.config.dashboard_secret_path and self.path.startswith(
- f"{self.config.dashboard_secret_path}/api/all-ips"
- ):
- self.send_response(200)
- self.send_header("Content-type", "application/json")
- self.send_header("Access-Control-Allow-Origin", "*")
- self.send_header(
- "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
- )
- self.send_header("Pragma", "no-cache")
- self.send_header("Expires", "0")
- self.end_headers()
- try:
-
- # Parse query parameters
- parsed_url = urlparse(self.path)
- query_params = parse_qs(parsed_url.query)
- page = int(query_params.get("page", ["1"])[0])
- page_size = int(query_params.get("page_size", ["25"])[0])
- sort_by = query_params.get("sort_by", ["total_requests"])[0]
- sort_order = query_params.get("sort_order", ["desc"])[0]
-
- # Ensure valid parameters
- page = max(1, page)
- page_size = min(max(1, page_size), 100) # Max 100 per page
-
- result = db.get_all_ips_paginated(
- page=page,
- page_size=page_size,
- sort_by=sort_by,
- sort_order=sort_order,
- )
- self.wfile.write(json.dumps(result).encode())
- except BrokenPipeError:
- pass
- except Exception as e:
- self.app_logger.error(f"Error fetching all IPs: {e}")
- self.wfile.write(json.dumps({"error": str(e)}).encode())
- return
-
- # API endpoint for fetching IP stats
- if self.config.dashboard_secret_path and self.path.startswith(
- f"{self.config.dashboard_secret_path}/api/ip-stats/"
- ):
- ip_address = self.path.replace(
- f"{self.config.dashboard_secret_path}/api/ip-stats/", ""
- )
- self.send_response(200)
- self.send_header("Content-type", "application/json")
- self.send_header("Access-Control-Allow-Origin", "*")
- # Prevent browser caching - force fresh data from database every time
- self.send_header(
- "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
- )
- self.send_header("Pragma", "no-cache")
- self.send_header("Expires", "0")
- self.end_headers()
- try:
-
- ip_stats = db.get_ip_stats_by_ip(ip_address)
- if ip_stats:
- self.wfile.write(json.dumps(ip_stats).encode())
- else:
- self.wfile.write(json.dumps({"error": "IP not found"}).encode())
- except BrokenPipeError:
- pass
- except Exception as e:
- self.app_logger.error(f"Error fetching IP stats: {e}")
- self.wfile.write(json.dumps({"error": str(e)}).encode())
- return
-
- # API endpoint for paginated honeypot triggers
- if self.config.dashboard_secret_path and self.path.startswith(
- f"{self.config.dashboard_secret_path}/api/honeypot"
- ):
- self.send_response(200)
- self.send_header("Content-type", "application/json")
- self.send_header("Access-Control-Allow-Origin", "*")
- self.send_header(
- "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
- )
- self.send_header("Pragma", "no-cache")
- self.send_header("Expires", "0")
- self.end_headers()
- try:
-
- parsed_url = urlparse(self.path)
- query_params = parse_qs(parsed_url.query)
- page = int(query_params.get("page", ["1"])[0])
- page_size = int(query_params.get("page_size", ["5"])[0])
- sort_by = query_params.get("sort_by", ["count"])[0]
- sort_order = query_params.get("sort_order", ["desc"])[0]
-
- page = max(1, page)
- page_size = min(max(1, page_size), 100)
-
- result = db.get_honeypot_paginated(
- page=page,
- page_size=page_size,
- sort_by=sort_by,
- sort_order=sort_order,
- )
- self.wfile.write(json.dumps(result).encode())
- except BrokenPipeError:
- pass
- except Exception as e:
- self.app_logger.error(f"Error fetching honeypot data: {e}")
- self.wfile.write(json.dumps({"error": str(e)}).encode())
- return
-
- # API endpoint for paginated credentials
- if self.config.dashboard_secret_path and self.path.startswith(
- f"{self.config.dashboard_secret_path}/api/credentials"
- ):
- self.send_response(200)
- self.send_header("Content-type", "application/json")
- self.send_header("Access-Control-Allow-Origin", "*")
- self.send_header(
- "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
- )
- self.send_header("Pragma", "no-cache")
- self.send_header("Expires", "0")
- self.end_headers()
- try:
-
- parsed_url = urlparse(self.path)
- query_params = parse_qs(parsed_url.query)
- page = int(query_params.get("page", ["1"])[0])
- page_size = int(query_params.get("page_size", ["5"])[0])
- sort_by = query_params.get("sort_by", ["timestamp"])[0]
- sort_order = query_params.get("sort_order", ["desc"])[0]
-
- page = max(1, page)
- page_size = min(max(1, page_size), 100)
-
- result = db.get_credentials_paginated(
- page=page,
- page_size=page_size,
- sort_by=sort_by,
- sort_order=sort_order,
- )
- self.wfile.write(json.dumps(result).encode())
- except BrokenPipeError:
- pass
- except Exception as e:
- self.app_logger.error(f"Error fetching credentials: {e}")
- self.wfile.write(json.dumps({"error": str(e)}).encode())
- return
-
- # API endpoint for paginated top IPs
- if self.config.dashboard_secret_path and self.path.startswith(
- f"{self.config.dashboard_secret_path}/api/top-ips"
- ):
- self.send_response(200)
- self.send_header("Content-type", "application/json")
- self.send_header("Access-Control-Allow-Origin", "*")
- self.send_header(
- "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
- )
- self.send_header("Pragma", "no-cache")
- self.send_header("Expires", "0")
- self.end_headers()
- try:
-
- parsed_url = urlparse(self.path)
- query_params = parse_qs(parsed_url.query)
- page = int(query_params.get("page", ["1"])[0])
- page_size = int(query_params.get("page_size", ["5"])[0])
- sort_by = query_params.get("sort_by", ["count"])[0]
- sort_order = query_params.get("sort_order", ["desc"])[0]
-
- page = max(1, page)
- page_size = min(max(1, page_size), 100)
-
- result = db.get_top_ips_paginated(
- page=page,
- page_size=page_size,
- sort_by=sort_by,
- sort_order=sort_order,
- )
- self.wfile.write(json.dumps(result).encode())
- except BrokenPipeError:
- pass
- except Exception as e:
- self.app_logger.error(f"Error fetching top IPs: {e}")
- self.wfile.write(json.dumps({"error": str(e)}).encode())
- return
-
- # API endpoint for paginated top paths
- if self.config.dashboard_secret_path and self.path.startswith(
- f"{self.config.dashboard_secret_path}/api/top-paths"
- ):
- self.send_response(200)
- self.send_header("Content-type", "application/json")
- self.send_header("Access-Control-Allow-Origin", "*")
- self.send_header(
- "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
- )
- self.send_header("Pragma", "no-cache")
- self.send_header("Expires", "0")
- self.end_headers()
- try:
-
- parsed_url = urlparse(self.path)
- query_params = parse_qs(parsed_url.query)
- page = int(query_params.get("page", ["1"])[0])
- page_size = int(query_params.get("page_size", ["5"])[0])
- sort_by = query_params.get("sort_by", ["count"])[0]
- sort_order = query_params.get("sort_order", ["desc"])[0]
-
- page = max(1, page)
- page_size = min(max(1, page_size), 100)
-
- result = db.get_top_paths_paginated(
- page=page,
- page_size=page_size,
- sort_by=sort_by,
- sort_order=sort_order,
- )
- self.wfile.write(json.dumps(result).encode())
- except BrokenPipeError:
- pass
- except Exception as e:
- self.app_logger.error(f"Error fetching top paths: {e}")
- self.wfile.write(json.dumps({"error": str(e)}).encode())
- return
-
- # API endpoint for paginated top user agents
- if self.config.dashboard_secret_path and self.path.startswith(
- f"{self.config.dashboard_secret_path}/api/top-user-agents"
- ):
- self.send_response(200)
- self.send_header("Content-type", "application/json")
- self.send_header("Access-Control-Allow-Origin", "*")
- self.send_header(
- "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
- )
- self.send_header("Pragma", "no-cache")
- self.send_header("Expires", "0")
- self.end_headers()
- try:
-
- parsed_url = urlparse(self.path)
- query_params = parse_qs(parsed_url.query)
- page = int(query_params.get("page", ["1"])[0])
- page_size = int(query_params.get("page_size", ["5"])[0])
- sort_by = query_params.get("sort_by", ["count"])[0]
- sort_order = query_params.get("sort_order", ["desc"])[0]
-
- page = max(1, page)
- page_size = min(max(1, page_size), 100)
-
- result = db.get_top_user_agents_paginated(
- page=page,
- page_size=page_size,
- sort_by=sort_by,
- sort_order=sort_order,
- )
- self.wfile.write(json.dumps(result).encode())
- except BrokenPipeError:
- pass
- except Exception as e:
- self.app_logger.error(f"Error fetching top user agents: {e}")
- self.wfile.write(json.dumps({"error": str(e)}).encode())
- return
-
- # API endpoint for paginated attack types
- if self.config.dashboard_secret_path and self.path.startswith(
- f"{self.config.dashboard_secret_path}/api/attack-types"
- ):
- self.send_response(200)
- self.send_header("Content-type", "application/json")
- self.send_header("Access-Control-Allow-Origin", "*")
- self.send_header(
- "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
- )
- self.send_header("Pragma", "no-cache")
- self.send_header("Expires", "0")
- self.end_headers()
- try:
-
- parsed_url = urlparse(self.path)
- query_params = parse_qs(parsed_url.query)
- page = int(query_params.get("page", ["1"])[0])
- page_size = int(query_params.get("page_size", ["5"])[0])
- sort_by = query_params.get("sort_by", ["timestamp"])[0]
- sort_order = query_params.get("sort_order", ["desc"])[0]
-
- page = max(1, page)
- page_size = min(max(1, page_size), 100)
-
- result = db.get_attack_types_paginated(
- page=page,
- page_size=page_size,
- sort_by=sort_by,
- sort_order=sort_order,
- )
- self.wfile.write(json.dumps(result).encode())
- except BrokenPipeError:
- pass
- except Exception as e:
- self.app_logger.error(f"Error fetching attack types: {e}")
- self.wfile.write(json.dumps({"error": str(e)}).encode())
- return
-
- # API endpoint for attack types statistics (aggregated)
- if self.config.dashboard_secret_path and self.path.startswith(
- f"{self.config.dashboard_secret_path}/api/attack-types-stats"
- ):
- self.send_response(200)
- self.send_header("Content-type", "application/json")
- self.send_header("Access-Control-Allow-Origin", "*")
- self.send_header(
- "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
- )
- self.send_header("Pragma", "no-cache")
- self.send_header("Expires", "0")
- self.end_headers()
- try:
- parsed_url = urlparse(self.path)
- query_params = parse_qs(parsed_url.query)
- limit = int(query_params.get("limit", ["20"])[0])
- limit = min(max(1, limit), 100) # Cap at 100
-
- result = db.get_attack_types_stats(limit=limit)
- self.wfile.write(json.dumps(result).encode())
- except BrokenPipeError:
- pass
- except Exception as e:
- self.app_logger.error(f"Error fetching attack types stats: {e}")
- self.wfile.write(json.dumps({"error": str(e)}).encode())
- return
-
- # API endpoint for fetching raw request by log ID
- if self.config.dashboard_secret_path and self.path.startswith(
- f"{self.config.dashboard_secret_path}/api/raw-request/"
- ):
- try:
- # Extract log ID from path: /api/raw-request/123
- log_id = int(self.path.split("/")[-1])
- raw_request = db.get_raw_request_by_id(log_id)
-
- if raw_request is None:
- self.send_response(404)
- self.send_header("Content-type", "application/json")
- self.end_headers()
- self.wfile.write(
- json.dumps({"error": "Raw request not found"}).encode()
- )
- else:
- self.send_response(200)
- self.send_header("Content-type", "application/json")
- self.send_header("Access-Control-Allow-Origin", "*")
- self.send_header(
- "Cache-Control",
- "no-store, no-cache, must-revalidate, max-age=0",
- )
- self.end_headers()
- self.wfile.write(json.dumps({"raw_request": raw_request}).encode())
- except (ValueError, IndexError):
- self.send_response(400)
- self.send_header("Content-type", "application/json")
- self.end_headers()
- self.wfile.write(json.dumps({"error": "Invalid log ID"}).encode())
- except Exception as e:
- self.app_logger.error(f"Error fetching raw request: {e}")
- self.send_response(500)
- self.send_header("Content-type", "application/json")
- self.end_headers()
- self.wfile.write(json.dumps({"error": str(e)}).encode())
- return
-
- # API endpoint for downloading malicious IPs blocklist file
- if (
- self.config.dashboard_secret_path
- and request_path == f"{self.config.dashboard_secret_path}/api/get_banlist"
- ):
-
- # get fwtype from request params
- fwtype = query_params.get("fwtype", ["iptables"])[0]
- filename = f"{fwtype}_banlist.txt"
- if fwtype == "raw":
- filename = f"malicious_ips.txt"
-
- file_path = os.path.join(self.config.exports_path, f"{filename}")
-
- try:
- if os.path.exists(file_path):
- with open(file_path, "rb") as f:
- content = f.read()
- self.send_response(200)
- self.send_header("Content-type", "text/plain")
- self.send_header(
- "Content-Disposition",
- f'attachment; filename="{filename}"',
- )
- self.send_header("Content-Length", str(len(content)))
- self.end_headers()
- self.wfile.write(content)
- else:
- self.send_response(404)
- self.send_header("Content-type", "text/plain")
- self.end_headers()
- self.wfile.write(b"File not found")
- except BrokenPipeError:
- pass
- except Exception as e:
- self.app_logger.error(f"Error serving malicious IPs file: {e}")
- self.send_response(500)
- self.send_header("Content-type", "text/plain")
- self.end_headers()
- self.wfile.write(b"Internal server error")
- return
-
- # API endpoint for downloading malicious IPs file
- if (
- self.config.dashboard_secret_path
- and self.path
- == f"{self.config.dashboard_secret_path}/api/download/malicious_ips.txt"
- ):
-
- file_path = os.path.join(
- os.path.dirname(__file__), "exports", "malicious_ips.txt"
- )
- try:
- if os.path.exists(file_path):
- with open(file_path, "rb") as f:
- content = f.read()
- self.send_response(200)
- self.send_header("Content-type", "text/plain")
- self.send_header(
- "Content-Disposition",
- 'attachment; filename="malicious_ips.txt"',
- )
- self.send_header("Content-Length", str(len(content)))
- self.end_headers()
- self.wfile.write(content)
- else:
- self.send_response(404)
- self.send_header("Content-type", "text/plain")
- self.end_headers()
- self.wfile.write(b"File not found")
- except BrokenPipeError:
- pass
- except Exception as e:
- self.app_logger.error(f"Error serving malicious IPs file: {e}")
- self.send_response(500)
- self.send_header("Content-type", "text/plain")
- self.end_headers()
- self.wfile.write(b"Internal server error")
- return
-
- self.tracker.record_access(
- client_ip,
- self.path,
- user_agent,
- method="GET",
- raw_request=self._build_raw_request(),
- )
-
- if self.tracker.is_suspicious_user_agent(user_agent):
- self.access_logger.warning(
- f"[SUSPICIOUS] {client_ip} - {user_agent[:50]} - {self.path}"
- )
-
- if self._should_return_error():
- error_code = self._get_random_error_code()
- self.access_logger.info(
- f"Returning error {error_code} to {client_ip} - {self.path}"
- )
- self.send_response(error_code)
- self.end_headers()
- return
-
- if self.serve_special_path(self.path):
- return
-
- time.sleep(self.config.delay / 1000.0)
- self.send_response(200)
- self.send_header("Content-type", "text/html")
- self.end_headers()
-
- try:
- # Increment page visit counter for this IP and get the current count
- current_visit_count = self._increment_page_visit(client_ip)
- self.wfile.write(
- self.generate_page(self.path, current_visit_count).encode()
- )
-
- Handler.counter -= 1
-
- if Handler.counter < 0:
- Handler.counter = self.config.canary_token_tries
- except BrokenPipeError:
- # Client disconnected, ignore silently
- pass
- except Exception as e:
- self.app_logger.error(f"Error generating page: {e}")
-
- def log_message(self, format, *args):
- """Override to customize logging - uses access logger"""
- client_ip = self._get_client_ip()
- self.access_logger.info(f"{client_ip} - {format % args}")
diff --git a/src/server.py b/src/server.py
deleted file mode 100644
index ed7ecad..0000000
--- a/src/server.py
+++ /dev/null
@@ -1,138 +0,0 @@
-#!/usr/bin/env python3
-
-"""
-Main server module for the deception honeypot.
-Run this file to start the server.
-"""
-
-import sys
-from http.server import HTTPServer
-
-from config import get_config
-from tracker import AccessTracker
-from handler import Handler
-from logger import (
- initialize_logging,
- get_app_logger,
- get_access_logger,
- get_credential_logger,
-)
-from database import initialize_database
-from tasks_master import get_tasksmaster
-
-
-def print_usage():
- """Print usage information"""
- print(f"Usage: {sys.argv[0]} [FILE]\n")
- print("FILE is file containing a list of webpage names to serve, one per line.")
- print("If no file is provided, random links will be generated.\n")
- print("Configuration:")
- print(" Configuration is loaded from a YAML file (default: config.yaml)")
- print("Set CONFIG_LOCATION environment variable to use a different file.\n")
- print("Example config.yaml structure:")
- print("server:")
- print("port: 5000")
- print("delay: 100")
- print("links:")
- print("min_length: 5")
- print("max_length: 15")
- print("min_per_page: 10")
- print("max_per_page: 15")
- print("canary:")
- print("token_url: null")
- print("token_tries: 10")
- print("dashboard:")
- print("secret_path: null # auto-generated if not set")
- print("database:")
- print('path: "data/krawl.db"')
- print("retention_days: 30")
- print("behavior:")
- print("probability_error_codes: 0")
-
-
-def main():
- """Main entry point for the deception server"""
- if "-h" in sys.argv or "--help" in sys.argv:
- print_usage()
- exit(0)
-
- config = get_config()
-
- # Initialize logging with timezone
- initialize_logging()
- app_logger = get_app_logger()
- access_logger = get_access_logger()
- credential_logger = get_credential_logger()
-
- # Initialize database for persistent storage
- try:
- initialize_database(config.database_path)
- app_logger.info(f"Database initialized at: {config.database_path}")
- except Exception as e:
- app_logger.warning(
- f"Database initialization failed: {e}. Continuing with in-memory only."
- )
-
- tracker = AccessTracker(config.max_pages_limit, config.ban_duration_seconds)
-
- Handler.config = config
- Handler.tracker = tracker
- Handler.counter = config.canary_token_tries
- Handler.app_logger = app_logger
- Handler.access_logger = access_logger
- Handler.credential_logger = credential_logger
-
- if len(sys.argv) == 2:
- try:
- with open(sys.argv[1], "r") as f:
- Handler.webpages = f.readlines()
-
- if not Handler.webpages:
- app_logger.warning(
- "The file provided was empty. Using randomly generated links."
- )
- Handler.webpages = None
- except IOError:
- app_logger.warning("Can't read input file. Using randomly generated links.")
-
- # tasks master init
- tasks_master = get_tasksmaster()
- tasks_master.run_scheduled_tasks()
-
- try:
-
- banner = f"""
-
-============================================================
-DASHBOARD AVAILABLE AT
-{config.dashboard_secret_path}
-============================================================
- """
- app_logger.info(banner)
- app_logger.info(f"Starting deception server on port {config.port}...")
- if config.canary_token_url:
- app_logger.info(
- f"Canary token will appear after {config.canary_token_tries} tries"
- )
- else:
- app_logger.info(
- "No canary token configured (set CANARY_TOKEN_URL to enable)"
- )
-
- server = HTTPServer(("0.0.0.0", config.port), Handler)
- app_logger.info("Server started. Use to stop.")
- server.serve_forever()
- except KeyboardInterrupt:
- app_logger.info("Stopping server...")
- server.socket.close()
- app_logger.info("Server stopped")
- except Exception as e:
- app_logger.error(f"Error starting HTTP server on port {config.port}: {e}")
- app_logger.error(
- f"Make sure you are root, if needed, and that port {config.port} is open."
- )
- exit(1)
-
-
-if __name__ == "__main__":
- main()
diff --git a/src/templates/dashboard_template.py b/src/templates/dashboard_template.py
deleted file mode 100644
index 1a312a1..0000000
--- a/src/templates/dashboard_template.py
+++ /dev/null
@@ -1,3612 +0,0 @@
-#!/usr/bin/env python3
-
-"""
-Dashboard template for viewing honeypot statistics.
-Customize this template to change the dashboard appearance.
-"""
-
-import html
-from datetime import datetime
-from zoneinfo import ZoneInfo
-
-# imports for the __init_subclass__ method, do not remove pls
-from firewall import fwtype
-
-
-def _escape(value) -> str:
- """Escape HTML special characters to prevent XSS attacks."""
- if value is None:
- return ""
- return html.escape(str(value))
-
-
-def format_timestamp(iso_timestamp: str, time_only: bool = False) -> str:
- """Format ISO timestamp for display with timezone conversion
-
- Args:
- iso_timestamp: ISO format timestamp string (UTC)
- time_only: If True, return only HH:MM:SS, otherwise full datetime
- """
- try:
- # Parse UTC timestamp
- dt = datetime.fromisoformat(iso_timestamp)
- if time_only:
- return dt.strftime("%H:%M:%S")
- return dt.strftime("%Y-%m-%d %H:%M:%S")
- except Exception:
- # Fallback for old format
- return (
- iso_timestamp.split("T")[1][:8] if "T" in iso_timestamp else iso_timestamp
- )
-
-
-def generate_dashboard(stats: dict, dashboard_path: str = "") -> str:
- """Generate dashboard HTML with access statistics
-
- Args:
- stats: Statistics dictionary
- dashboard_path: The secret dashboard path for generating API URLs
- """
-
- # Generate comprehensive suspicious activity rows combining all suspicious events
- suspicious_activities = []
-
- # Add recent suspicious accesses (attacks)
- for log in stats.get("recent_suspicious", [])[-20:]:
- suspicious_activities.append(
- {
- "type": "Attack",
- "ip": log["ip"],
- "path": log["path"],
- "user_agent": log["user_agent"][:60],
- "timestamp": log["timestamp"],
- "details": (
- ", ".join(log.get("attack_types", []))
- if log.get("attack_types")
- else "Suspicious behavior"
- ),
- }
- )
-
- # Add credential attempts
- for cred in stats.get("credential_attempts", [])[-20:]:
- suspicious_activities.append(
- {
- "type": "Credentials",
- "ip": cred["ip"],
- "path": cred["path"],
- "user_agent": "",
- "timestamp": cred["timestamp"],
- "details": f"User: {cred.get('username', 'N/A')}",
- }
- )
-
- # Add honeypot triggers
- for honeypot in stats.get("honeypot_triggered_ips", [])[-20:]:
- # honeypot is a tuple (ip, paths)
- ip = honeypot[0]
- paths = honeypot[1] if isinstance(honeypot[1], list) else []
- suspicious_activities.append(
- {
- "type": "Honeypot",
- "ip": ip,
- "path": paths[0] if paths else "Multiple",
- "user_agent": "",
- "timestamp": "", # Tuples don't have timestamp
- "details": f"{len(paths)} trap(s) triggered",
- }
- )
-
- # Sort by timestamp (most recent first) and take last 20
- # Put entries with empty timestamps at the end
- try:
- suspicious_activities.sort(
- key=lambda x: (x["timestamp"] == "", x["timestamp"]), reverse=True
- )
- except:
- pass
- suspicious_activities = suspicious_activities[:20]
-
- # Generate table rows
- suspicious_rows = (
- "\n".join([f"""
- | {_escape(activity["ip"])} |
- {_escape(activity["type"])} |
- {_escape(activity["path"])} |
- {_escape(activity["details"])} |
- {format_timestamp(activity["timestamp"], time_only=True)} |
-
-
- |
-
- |
-
""" for activity in suspicious_activities])
- or '| No suspicious activity detected |
'
- )
-
- return f"""
-
-
-
- Krawl Dashboard
-
-
-
-
-
-
-
-
-
-
- BlessedRebuS/Krawl
-
-
-
-
-
-
-
-
Krawl Dashboard
-
-
-
-
{stats['total_accesses']}
-
Total Accesses
-
-
-
{stats['unique_ips']}
-
Unique IPs
-
-
-
{stats['unique_paths']}
-
Unique Paths
-
-
-
{stats['suspicious_accesses']}
-
Suspicious Accesses
-
-
-
{stats.get('honeypot_ips', 0)}
-
Honeypot Caught
-
-
-
{len(stats.get('credential_attempts', []))}
-
Credentials Captured
-
-
-
{stats.get('unique_attackers', 0)}
-
Unique Attackers
-
-
-
-
-
-
-
-
Recent Suspicious Activity
-
-
-
- | IP Address |
- Type |
- Path |
- Details |
- Time |
-
-
-
- {suspicious_rows}
-
-
-
-
-
-
-
Honeypot Triggers by IP
-
-
-
-
-
- | # |
- IP Address |
- Accessed Paths |
- Count |
-
-
-
- | Loading... |
-
-
-
-
-
-
-
-
Top IP Addresses
-
-
-
-
-
- | # |
- IP Address |
- Access Count |
-
-
-
- | Loading... |
-
-
-
-
-
-
-
Top User-Agents
-
-
-
-
-
- | # |
- User-Agent |
- Count |
-
-
-
- | Loading... |
-
-
-
-
-
-
-
-
-
-
-
-
Attackers by Total Requests
-
-
-
-
-
-
- | # |
- IP Address |
- Total Requests |
- First Seen |
- Last Seen |
- Location |
-
-
-
-
-
-
-
-
-
-
-
Captured Credentials
-
-
-
-
-
- | # |
- IP Address |
- Username |
- Password |
- Path |
- Time |
-
-
-
- | Loading... |
-
-
-
-
-
-
-
Detected Attack Types
-
-
-
-
-
- | # |
- IP Address |
- Path |
- Attack Types |
- User-Agent |
- Time |
- Actions |
-
-
-
- | Loading... |
-
-
-
-
-
-
-
-
-
Most Recurring Attack Types
-
Top 10
-
-
-
-
-
-
-
-
-
-
-
Most Recurring Attack Patterns
-
-
-
-
-
-
- | # |
- Attack Pattern |
- Attack Type |
- Frequency |
- IPs |
-
-
-
- | Loading... |
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-"""