added random SQL errors, random server errors, XSS baits

This commit is contained in:
Patrick Di Fazio
2026-01-03 17:14:58 +01:00
parent bf73bc7e2c
commit 5f8bb73546
11 changed files with 589 additions and 13 deletions

BIN
src/data/krawl.db Normal file

Binary file not shown.

View File

@@ -6,6 +6,7 @@ import time
from datetime import datetime from datetime import datetime
from http.server import BaseHTTPRequestHandler from http.server import BaseHTTPRequestHandler
from typing import Optional, List from typing import Optional, List
from urllib.parse import urlparse, parse_qs
from config import Config from config import Config
from tracker import AccessTracker from tracker import AccessTracker
@@ -16,6 +17,9 @@ from generators import (
api_response, directory_listing api_response, directory_listing
) )
from wordlists import get_wordlists from wordlists import get_wordlists
from sql_errors import generate_sql_error_response, get_sql_response_with_data
from xss_detector import detect_xss_pattern, generate_xss_response
from server_errors import generate_server_error
class Handler(BaseHTTPRequestHandler): class Handler(BaseHTTPRequestHandler):
@@ -67,6 +71,67 @@ class Handler(BaseHTTPRequestHandler):
if not error_codes: if not error_codes:
error_codes = [400, 401, 403, 404, 500, 502, 503] error_codes = [400, 401, 403, 404, 500, 502, 503]
return random.choice(error_codes) return random.choice(error_codes)
def _parse_query_string(self) -> str:
"""Extract query string from the request path"""
parsed = urlparse(self.path)
return parsed.query
def _handle_sql_endpoint(self, path: str) -> bool:
"""
Handle SQL injection honeypot endpoints.
Returns True if the path was handled, False otherwise.
"""
# SQL-vulnerable endpoints
sql_endpoints = ['/api/search', '/api/sql', '/api/database']
base_path = urlparse(path).path
if base_path not in sql_endpoints:
return False
try:
# Get query parameters
query_string = self._parse_query_string()
# Log SQL injection attempt
client_ip = self._get_client_ip()
user_agent = self._get_user_agent()
# Always check for SQL injection patterns
error_msg, content_type, status_code = generate_sql_error_response(query_string or "")
if error_msg:
# SQL injection detected - log and return error
self.access_logger.warning(f"[SQL INJECTION DETECTED] {client_ip} - {base_path} - Query: {query_string[:100] if query_string else 'empty'}")
self.send_response(status_code)
self.send_header('Content-type', content_type)
self.end_headers()
self.wfile.write(error_msg.encode())
else:
# No injection detected - return fake data
self.access_logger.info(f"[SQL ENDPOINT] {client_ip} - {base_path} - Query: {query_string[:100] if query_string else 'empty'}")
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
response_data = get_sql_response_with_data(base_path, query_string or "")
self.wfile.write(response_data.encode())
return True
except BrokenPipeError:
# Client disconnected
return True
except Exception as e:
self.app_logger.error(f"Error handling SQL endpoint {path}: {str(e)}")
# Still send a response even on error
try:
self.send_response(500)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(b'{"error": "Internal server error"}')
except:
pass
return True
def generate_page(self, seed: str) -> str: def generate_page(self, seed: str) -> str:
"""Generate a webpage containing random links or canary token""" """Generate a webpage containing random links or canary token"""
@@ -207,6 +272,68 @@ class Handler(BaseHTTPRequestHandler):
user_agent = self._get_user_agent() user_agent = self._get_user_agent()
post_data = "" post_data = ""
from urllib.parse import urlparse
base_path = urlparse(self.path).path
if base_path in ['/api/search', '/api/sql', '/api/database']:
content_length = int(self.headers.get('Content-Length', 0))
if content_length > 0:
post_data = self.rfile.read(content_length).decode('utf-8', errors="replace")
self.access_logger.info(f"[SQL ENDPOINT POST] {client_ip} - {base_path} - Data: {post_data[:100] if post_data else 'empty'}")
error_msg, content_type, status_code = generate_sql_error_response(post_data)
try:
if error_msg:
self.access_logger.warning(f"[SQL INJECTION DETECTED POST] {client_ip} - {base_path}")
self.send_response(status_code)
self.send_header('Content-type', content_type)
self.end_headers()
self.wfile.write(error_msg.encode())
else:
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
response_data = get_sql_response_with_data(base_path, post_data)
self.wfile.write(response_data.encode())
except BrokenPipeError:
pass
except Exception as e:
self.app_logger.error(f"Error in SQL POST handler: {str(e)}")
return
if base_path == '/api/contact':
content_length = int(self.headers.get('Content-Length', 0))
if content_length > 0:
post_data = self.rfile.read(content_length).decode('utf-8', errors="replace")
parsed_data = {}
for pair in post_data.split('&'):
if '=' in pair:
key, value = pair.split('=', 1)
from urllib.parse import unquote_plus
parsed_data[unquote_plus(key)] = unquote_plus(value)
xss_detected = any(detect_xss_pattern(v) for v in parsed_data.values())
if xss_detected:
self.access_logger.warning(f"[XSS ATTEMPT DETECTED] {client_ip} - {base_path} - Data: {post_data[:200]}")
else:
self.access_logger.info(f"[XSS ENDPOINT POST] {client_ip} - {base_path}")
try:
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
response_html = generate_xss_response(parsed_data)
self.wfile.write(response_html.encode())
except BrokenPipeError:
pass
except Exception as e:
self.app_logger.error(f"Error in XSS POST handler: {str(e)}")
return
self.access_logger.warning(f"[LOGIN ATTEMPT] {client_ip} - {self.path} - {user_agent[:50]}") self.access_logger.warning(f"[LOGIN ATTEMPT] {client_ip} - {self.path} - {user_agent[:50]}")
content_length = int(self.headers.get('Content-Length', 0)) content_length = int(self.headers.get('Content-Length', 0))
@@ -215,20 +342,16 @@ class Handler(BaseHTTPRequestHandler):
self.access_logger.warning(f"[POST DATA] {post_data[:200]}") self.access_logger.warning(f"[POST DATA] {post_data[:200]}")
# Parse and log credentials
username, password = self.tracker.parse_credentials(post_data) username, password = self.tracker.parse_credentials(post_data)
if username or password: if username or password:
# Log to dedicated credentials.log file
timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ") timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
credential_line = f"{timestamp}|{client_ip}|{username or 'N/A'}|{password or 'N/A'}|{self.path}" credential_line = f"{timestamp}|{client_ip}|{username or 'N/A'}|{password or 'N/A'}|{self.path}"
self.credential_logger.info(credential_line) self.credential_logger.info(credential_line)
# Also record in tracker for dashboard
self.tracker.record_credential_attempt(client_ip, self.path, username or 'N/A', password or 'N/A') self.tracker.record_credential_attempt(client_ip, self.path, username or 'N/A', password or 'N/A')
self.access_logger.warning(f"[CREDENTIALS CAPTURED] {client_ip} - Username: {username or 'N/A'} - Path: {self.path}") self.access_logger.warning(f"[CREDENTIALS CAPTURED] {client_ip} - Username: {username or 'N/A'} - Path: {self.path}")
# send the post data (body) to the record_access function so the post data can be used to detect suspicious things.
self.tracker.record_access(client_ip, self.path, user_agent, post_data) self.tracker.record_access(client_ip, self.path, user_agent, post_data)
time.sleep(1) time.sleep(1)
@@ -248,6 +371,10 @@ class Handler(BaseHTTPRequestHandler):
def serve_special_path(self, path: str) -> bool: def serve_special_path(self, path: str) -> bool:
"""Serve special paths like robots.txt, API endpoints, etc.""" """Serve special paths like robots.txt, API endpoints, etc."""
# Check SQL injection honeypot endpoints first
if self._handle_sql_endpoint(path):
return True
try: try:
if path == '/robots.txt': if path == '/robots.txt':
self.send_response(200) self.send_response(200)
@@ -285,7 +412,28 @@ class Handler(BaseHTTPRequestHandler):
self.wfile.write(html_templates.login_form().encode()) self.wfile.write(html_templates.login_form().encode())
return True return True
# WordPress login page if path in ['/users', '/user', '/database', '/db', '/search']:
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(html_templates.product_search().encode())
return True
if path in ['/info', '/input', '/contact', '/feedback', '/comment']:
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(html_templates.input_form().encode())
return True
if path == '/server':
error_html, content_type = generate_server_error()
self.send_response(500)
self.send_header('Content-type', content_type)
self.end_headers()
self.wfile.write(error_html.encode())
return True
if path in ['/wp-login.php', '/wp-login', '/wp-admin', '/wp-admin/']: if path in ['/wp-login.php', '/wp-login', '/wp-admin', '/wp-admin/']:
self.send_response(200) self.send_response(200)
self.send_header('Content-type', 'text/html') self.send_header('Content-type', 'text/html')

65
src/server_errors.py Normal file
View File

@@ -0,0 +1,65 @@
#!/usr/bin/env python3
import random
from wordlists import get_wordlists
def generate_server_error() -> tuple[str, str]:
wl = get_wordlists()
server_errors = wl.server_errors
if not server_errors:
return ("500 Internal Server Error", "text/html")
server_type = random.choice(list(server_errors.keys()))
server_config = server_errors[server_type]
error_codes = {
400: "Bad Request",
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
500: "Internal Server Error",
502: "Bad Gateway",
503: "Service Unavailable"
}
code = random.choice(list(error_codes.keys()))
message = error_codes[code]
template = server_config.get('template', '')
version = random.choice(server_config.get('versions', ['1.0']))
html = template.replace('{code}', str(code))
html = html.replace('{message}', message)
html = html.replace('{version}', version)
if server_type == 'apache':
os = random.choice(server_config.get('os', ['Ubuntu']))
html = html.replace('{os}', os)
html = html.replace('{host}', 'localhost')
return (html, "text/html")
def get_server_header(server_type: str = None) -> str:
wl = get_wordlists()
server_errors = wl.server_errors
if not server_errors:
return "nginx/1.18.0"
if not server_type:
server_type = random.choice(list(server_errors.keys()))
server_config = server_errors.get(server_type, {})
version = random.choice(server_config.get('versions', ['1.0']))
server_headers = {
'nginx': f"nginx/{version}",
'apache': f"Apache/{version}",
'iis': f"Microsoft-IIS/{version}",
'tomcat': f"Apache-Coyote/1.1"
}
return server_headers.get(server_type, "nginx/1.18.0")

112
src/sql_errors.py Normal file
View File

@@ -0,0 +1,112 @@
#!/usr/bin/env python3
import random
import re
from typing import Optional, Tuple
from wordlists import get_wordlists
def detect_sql_injection_pattern(query_string: str) -> Optional[str]:
if not query_string:
return None
query_lower = query_string.lower()
patterns = {
'quote': [r"'", r'"', r'`'],
'comment': [r'--', r'#', r'/\*', r'\*/'],
'union': [r'\bunion\b', r'\bunion\s+select\b'],
'boolean': [r'\bor\b.*=.*', r'\band\b.*=.*', r"'.*or.*'.*=.*'"],
'time_based': [r'\bsleep\b', r'\bwaitfor\b', r'\bdelay\b', r'\bbenchmark\b'],
'stacked': [r';.*select', r';.*drop', r';.*insert', r';.*update', r';.*delete'],
'command': [r'\bexec\b', r'\bexecute\b', r'\bxp_cmdshell\b'],
'info_schema': [r'information_schema', r'table_schema', r'table_name'],
}
for injection_type, pattern_list in patterns.items():
for pattern in pattern_list:
if re.search(pattern, query_lower):
return injection_type
return None
def get_random_sql_error(db_type: str = None, injection_type: str = None) -> Tuple[str, str]:
wl = get_wordlists()
sql_errors = wl.sql_errors
if not sql_errors:
return ("Database error occurred", "text/plain")
if not db_type:
db_type = random.choice(list(sql_errors.keys()))
db_errors = sql_errors.get(db_type, {})
if injection_type and injection_type in db_errors:
errors = db_errors[injection_type]
elif 'generic' in db_errors:
errors = db_errors['generic']
else:
all_errors = []
for error_list in db_errors.values():
if isinstance(error_list, list):
all_errors.extend(error_list)
errors = all_errors if all_errors else ["Database error occurred"]
error_message = random.choice(errors) if errors else "Database error occurred"
if '{table}' in error_message:
tables = ['users', 'products', 'orders', 'customers', 'accounts', 'sessions']
error_message = error_message.replace('{table}', random.choice(tables))
if '{column}' in error_message:
columns = ['id', 'name', 'email', 'password', 'username', 'created_at']
error_message = error_message.replace('{column}', random.choice(columns))
return (error_message, "text/plain")
def generate_sql_error_response(query_string: str, db_type: str = None) -> Tuple[str, str, int]:
injection_type = detect_sql_injection_pattern(query_string)
if not injection_type:
return (None, None, None)
error_message, content_type = get_random_sql_error(db_type, injection_type)
status_code = 500
if random.random() < 0.3:
status_code = 200
return (error_message, content_type, status_code)
def get_sql_response_with_data(path: str, params: str) -> str:
import json
from generators import random_username, random_email, random_password
injection_type = detect_sql_injection_pattern(params)
if injection_type in ['union', 'boolean', 'stacked']:
data = {
"success": True,
"results": [
{
"id": i,
"username": random_username(),
"email": random_email(),
"password_hash": random_password(),
"role": random.choice(["admin", "user", "moderator"])
}
for i in range(1, random.randint(2, 5))
]
}
return json.dumps(data, indent=2)
return json.dumps({
"success": True,
"message": "Query executed successfully",
"results": []
}, indent=2)

View File

@@ -0,0 +1,66 @@
<!DOCTYPE html>
<html>
<head>
<title>Search</title>
<style>
body {
font-family: Arial, sans-serif;
max-width: 600px;
margin: 50px auto;
padding: 20px;
}
h1 {
color: #333;
}
input {
width: 100%;
padding: 8px;
margin: 10px 0;
box-sizing: border-box;
}
button {
background: #4CAF50;
color: white;
padding: 10px 20px;
border: none;
cursor: pointer;
}
button:hover {
background: #45a049;
}
#results {
margin-top: 20px;
padding: 10px;
border: 1px solid #ddd;
background: #f9f9f9;
display: none;
}
</style>
</head>
<body>
<h1>Search</h1>
<form id="searchForm">
<input type="text" id="searchQuery" placeholder="Enter search query..." required>
<button type="submit">Search</button>
</form>
<div id="results"></div>
<script>
document.getElementById('searchForm').addEventListener('submit', async (e) => {
e.preventDefault();
const query = document.getElementById('searchQuery').value;
const results = document.getElementById('results');
try {
const response = await fetch(`/api/search?q=${encodeURIComponent(query)}`);
const text = await response.text();
results.innerHTML = `<pre>${text}</pre>`;
results.style.display = 'block';
} catch (err) {
results.innerHTML = `<p>Error: ${err.message}</p>`;
results.style.display = 'block';
}
});
</script>
</body>
</html>

View File

@@ -0,0 +1,74 @@
<!DOCTYPE html>
<html>
<head>
<title>Contact</title>
<style>
body {
font-family: Arial, sans-serif;
max-width: 500px;
margin: 50px auto;
padding: 20px;
}
h1 {
color: #333;
}
input, textarea {
width: 100%;
padding: 8px;
margin: 10px 0;
border: 1px solid #ddd;
box-sizing: border-box;
}
textarea {
min-height: 100px;
}
button {
background: #4CAF50;
color: white;
padding: 10px 20px;
border: none;
cursor: pointer;
}
button:hover {
background: #45a049;
}
#response {
margin-top: 20px;
padding: 10px;
display: none;
}
</style>
</head>
<body>
<h1>Contact</h1>
<form id="contactForm">
<input type="text" name="name" placeholder="Name" required>
<input type="email" name="email" placeholder="Email" required>
<textarea name="message" placeholder="Message" required></textarea>
<button type="submit">Submit</button>
</form>
<div id="response"></div>
<script>
document.getElementById('contactForm').addEventListener('submit', function(e) {
e.preventDefault();
const formData = new FormData(this);
fetch('/api/contact', {
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams(formData)
})
.then(response => response.text())
.then(text => {
document.getElementById('response').innerHTML = text;
document.getElementById('response').style.display = 'block';
})
.catch(error => {
document.getElementById('response').innerHTML = 'Error: ' + error.message;
document.getElementById('response').style.display = 'block';
});
});
</script>
</body>
</html>

View File

@@ -11,8 +11,18 @@ Disallow: /login/
Disallow: /admin/login Disallow: /admin/login
Disallow: /phpMyAdmin/ Disallow: /phpMyAdmin/
Disallow: /admin/login.php Disallow: /admin/login.php
Disallow: /users
Disallow: /search
Disallow: /contact
Disallow: /info
Disallow: /input
Disallow: /feedback
Disallow: /server
Disallow: /api/v1/users Disallow: /api/v1/users
Disallow: /api/v2/secrets Disallow: /api/v2/secrets
Disallow: /api/search
Disallow: /api/sql
Disallow: /api/database
Disallow: /.env Disallow: /.env
Disallow: /credentials.txt Disallow: /credentials.txt
Disallow: /passwords.txt Disallow: /passwords.txt

View File

@@ -50,3 +50,13 @@ def directory_listing(path: str, dirs: list, files: list) -> str:
rows += row_template.format(href=f, name=f, date="2024-12-01 14:22", size=size) rows += row_template.format(href=f, name=f, date="2024-12-01 14:22", size=size)
return load_template("directory_listing", path=path, rows=rows) return load_template("directory_listing", path=path, rows=rows)
def product_search() -> str:
"""Generate product search page with SQL injection honeypot"""
return load_template("generic_search")
def input_form() -> str:
"""Generate input form page for XSS honeypot"""
return load_template("input_form")

View File

@@ -5,6 +5,7 @@ from collections import defaultdict
from datetime import datetime from datetime import datetime
import re import re
import urllib.parse import urllib.parse
from wordlists import get_wordlists
class AccessTracker: class AccessTracker:
@@ -21,14 +22,19 @@ class AccessTracker:
'burp', 'zap', 'w3af', 'metasploit', 'nuclei', 'gobuster', 'dirbuster' 'burp', 'zap', 'w3af', 'metasploit', 'nuclei', 'gobuster', 'dirbuster'
] ]
# common attack types such as xss, shell injection, probes # Load attack patterns from wordlists
self.attack_types = { wl = get_wordlists()
'path_traversal': r'\.\.', self.attack_types = wl.attack_patterns
'sql_injection': r"('|--|;|\bOR\b|\bUNION\b|\bSELECT\b|\bDROP\b)",
'xss_attempt': r'(<script|javascript:|onerror=|onload=)', # Fallback if wordlists not loaded
'common_probes': r'(wp-admin|phpmyadmin|\.env|\.git|/admin|/config)', if not self.attack_types:
'shell_injection': r'(\||;|`|\$\(|&&)', self.attack_types = {
} 'path_traversal': r'\.\.',
'sql_injection': r"('|--|;|\bOR\b|\bUNION\b|\bSELECT\b|\bDROP\b)",
'xss_attempt': r'(<script|javascript:|onerror=|onload=)',
'common_probes': r'(wp-admin|phpmyadmin|\.env|\.git|/admin|/config)',
'shell_injection': r'(\||;|`|\$\(|&&)',
}
# Track IPs that accessed honeypot paths from robots.txt # Track IPs that accessed honeypot paths from robots.txt
self.honeypot_triggered: Dict[str, List[str]] = defaultdict(list) self.honeypot_triggered: Dict[str, List[str]] = defaultdict(list)

View File

@@ -111,6 +111,18 @@ class Wordlists:
@property @property
def error_codes(self): def error_codes(self):
return self._data.get("error_codes", []) return self._data.get("error_codes", [])
@property
def sql_errors(self):
return self._data.get("sql_errors", {})
@property
def attack_patterns(self):
return self._data.get("attack_patterns", {})
@property
def server_errors(self):
return self._data.get("server_errors", {})
_wordlists_instance = None _wordlists_instance = None

73
src/xss_detector.py Normal file
View File

@@ -0,0 +1,73 @@
#!/usr/bin/env python3
import re
from typing import Optional
from wordlists import get_wordlists
def detect_xss_pattern(input_string: str) -> bool:
if not input_string:
return False
wl = get_wordlists()
xss_pattern = wl.attack_patterns.get('xss_attempt', '')
if not xss_pattern:
xss_pattern = r'(<script|</script|javascript:|onerror=|onload=|onclick=|<iframe|<img|<svg|eval\(|alert\()'
return bool(re.search(xss_pattern, input_string, re.IGNORECASE))
def generate_xss_response(input_data: dict) -> str:
xss_detected = False
reflected_content = []
for key, value in input_data.items():
if detect_xss_pattern(value):
xss_detected = True
reflected_content.append(f"<p><strong>{key}:</strong> {value}</p>")
if xss_detected:
html = f"""
<!DOCTYPE html>
<html>
<head>
<title>Submission Received</title>
<style>
body {{ font-family: Arial, sans-serif; max-width: 600px; margin: 50px auto; padding: 20px; }}
.success {{ background: #d4edda; padding: 20px; border-radius: 8px; border: 1px solid #c3e6cb; }}
h2 {{ color: #155724; }}
p {{ margin: 10px 0; }}
</style>
</head>
<body>
<div class="success">
<h2>Thank you for your submission!</h2>
<p>We have received your information:</p>
{''.join(reflected_content)}
<p><em>We will get back to you shortly.</em></p>
</div>
</body>
</html>
"""
return html
return """
<!DOCTYPE html>
<html>
<head>
<title>Submission Received</title>
<style>
body { font-family: Arial, sans-serif; max-width: 600px; margin: 50px auto; padding: 20px; }
.success { background: #d4edda; padding: 20px; border-radius: 8px; border: 1px solid #c3e6cb; }
h2 { color: #155724; }
</style>
</head>
<body>
<div class="success">
<h2>Thank you for your submission!</h2>
<p>Your message has been received and we will respond soon.</p>
</div>
</body>
</html>
"""