parametrized into config.yaml + bug fix

This commit is contained in:
Leonardo Bambini
2026-01-04 22:20:10 +01:00
parent 48f38cb28e
commit ff98a77e1a
31 changed files with 1239 additions and 236 deletions

View File

@@ -6,6 +6,7 @@ import time
from datetime import datetime
from http.server import BaseHTTPRequestHandler
from typing import Optional, List
from urllib.parse import urlparse, parse_qs
from config import Config
from tracker import AccessTracker
@@ -17,6 +18,9 @@ from generators import (
api_response, directory_listing, random_server_header
)
from wordlists import get_wordlists
from sql_errors import generate_sql_error_response, get_sql_response_with_data
from xss_detector import detect_xss_pattern, generate_xss_response
from server_errors import generate_server_error
class Handler(BaseHTTPRequestHandler):
@@ -69,6 +73,67 @@ class Handler(BaseHTTPRequestHandler):
if not error_codes:
error_codes = [400, 401, 403, 404, 500, 502, 503]
return random.choice(error_codes)
def _parse_query_string(self) -> str:
"""Extract query string from the request path"""
parsed = urlparse(self.path)
return parsed.query
def _handle_sql_endpoint(self, path: str) -> bool:
"""
Handle SQL injection honeypot endpoints.
Returns True if the path was handled, False otherwise.
"""
# SQL-vulnerable endpoints
sql_endpoints = ['/api/search', '/api/sql', '/api/database']
base_path = urlparse(path).path
if base_path not in sql_endpoints:
return False
try:
# Get query parameters
query_string = self._parse_query_string()
# Log SQL injection attempt
client_ip = self._get_client_ip()
user_agent = self._get_user_agent()
# Always check for SQL injection patterns
error_msg, content_type, status_code = generate_sql_error_response(query_string or "")
if error_msg:
# SQL injection detected - log and return error
self.access_logger.warning(f"[SQL INJECTION DETECTED] {client_ip} - {base_path} - Query: {query_string[:100] if query_string else 'empty'}")
self.send_response(status_code)
self.send_header('Content-type', content_type)
self.end_headers()
self.wfile.write(error_msg.encode())
else:
# No injection detected - return fake data
self.access_logger.info(f"[SQL ENDPOINT] {client_ip} - {base_path} - Query: {query_string[:100] if query_string else 'empty'}")
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
response_data = get_sql_response_with_data(base_path, query_string or "")
self.wfile.write(response_data.encode())
return True
except BrokenPipeError:
# Client disconnected
return True
except Exception as e:
self.app_logger.error(f"Error handling SQL endpoint {path}: {str(e)}")
# Still send a response even on error
try:
self.send_response(500)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(b'{"error": "Internal server error"}')
except:
pass
return True
def generate_page(self, seed: str) -> str:
"""Generate a webpage containing random links or canary token"""
@@ -209,6 +274,68 @@ class Handler(BaseHTTPRequestHandler):
user_agent = self._get_user_agent()
post_data = ""
from urllib.parse import urlparse
base_path = urlparse(self.path).path
if base_path in ['/api/search', '/api/sql', '/api/database']:
content_length = int(self.headers.get('Content-Length', 0))
if content_length > 0:
post_data = self.rfile.read(content_length).decode('utf-8', errors="replace")
self.access_logger.info(f"[SQL ENDPOINT POST] {client_ip} - {base_path} - Data: {post_data[:100] if post_data else 'empty'}")
error_msg, content_type, status_code = generate_sql_error_response(post_data)
try:
if error_msg:
self.access_logger.warning(f"[SQL INJECTION DETECTED POST] {client_ip} - {base_path}")
self.send_response(status_code)
self.send_header('Content-type', content_type)
self.end_headers()
self.wfile.write(error_msg.encode())
else:
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
response_data = get_sql_response_with_data(base_path, post_data)
self.wfile.write(response_data.encode())
except BrokenPipeError:
pass
except Exception as e:
self.app_logger.error(f"Error in SQL POST handler: {str(e)}")
return
if base_path == '/api/contact':
content_length = int(self.headers.get('Content-Length', 0))
if content_length > 0:
post_data = self.rfile.read(content_length).decode('utf-8', errors="replace")
parsed_data = {}
for pair in post_data.split('&'):
if '=' in pair:
key, value = pair.split('=', 1)
from urllib.parse import unquote_plus
parsed_data[unquote_plus(key)] = unquote_plus(value)
xss_detected = any(detect_xss_pattern(v) for v in parsed_data.values())
if xss_detected:
self.access_logger.warning(f"[XSS ATTEMPT DETECTED] {client_ip} - {base_path} - Data: {post_data[:200]}")
else:
self.access_logger.info(f"[XSS ENDPOINT POST] {client_ip} - {base_path}")
try:
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
response_html = generate_xss_response(parsed_data)
self.wfile.write(response_html.encode())
except BrokenPipeError:
pass
except Exception as e:
self.app_logger.error(f"Error in XSS POST handler: {str(e)}")
return
self.access_logger.warning(f"[LOGIN ATTEMPT] {client_ip} - {self.path} - {user_agent[:50]}")
content_length = int(self.headers.get('Content-Length', 0))
@@ -250,6 +377,10 @@ class Handler(BaseHTTPRequestHandler):
def serve_special_path(self, path: str) -> bool:
"""Serve special paths like robots.txt, API endpoints, etc."""
# Check SQL injection honeypot endpoints first
if self._handle_sql_endpoint(path):
return True
try:
if path == '/robots.txt':
self.send_response(200)
@@ -287,7 +418,28 @@ class Handler(BaseHTTPRequestHandler):
self.wfile.write(html_templates.login_form().encode())
return True
# WordPress login page
if path in ['/users', '/user', '/database', '/db', '/search']:
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(html_templates.product_search().encode())
return True
if path in ['/info', '/input', '/contact', '/feedback', '/comment']:
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(html_templates.input_form().encode())
return True
if path == '/server':
error_html, content_type = generate_server_error()
self.send_response(500)
self.send_header('Content-type', content_type)
self.end_headers()
self.wfile.write(error_html.encode())
return True
if path in ['/wp-login.php', '/wp-login', '/wp-admin', '/wp-admin/']:
self.send_response(200)
self.send_header('Content-type', 'text/html')