From 815da4300b81c48cb204724bf11c36e43854c007 Mon Sep 17 00:00:00 2001 From: carnivuth Date: Sun, 15 Feb 2026 15:10:27 +0100 Subject: [PATCH 1/3] fixed bug on non existent database dump directory --- src/tasks/db_dump.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/tasks/db_dump.py b/src/tasks/db_dump.py index b1644d6..d254f45 100644 --- a/src/tasks/db_dump.py +++ b/src/tasks/db_dump.py @@ -3,7 +3,7 @@ from logger import get_app_logger from database import get_database from config import get_config -from sqlalchemy import MetaData, inspect +from sqlalchemy import MetaData from sqlalchemy.schema import CreateTable import os @@ -36,18 +36,16 @@ def main(): engine = db._engine metadata = MetaData() - - # Reflect the database structure metadata.reflect(bind=engine) + + # create backup directory + os.makedirs(config.backups_path,exist_ok=True) output_file = os.path.join(config.backups_path, "db_dump.sql") with open(output_file, "w") as f: # Write header app_logger.info(f"[Background Task] {task_name} started database dump") - # Get inspector for additional metadata - inspector = inspect(engine) - # Dump schema (CREATE TABLE statements) f.write("-- Schema\n") f.write("-- " + "=" * 70 + "\n\n") From 396b9b17106f3db1640970b2e42e673e62ff6d75 Mon Sep 17 00:00:00 2001 From: carnivuth Date: Sun, 15 Feb 2026 15:10:41 +0100 Subject: [PATCH 2/3] linted code --- src/database.py | 5 +- src/deception_responses.py | 201 +++++++++++++--------- src/handler.py | 104 +++++++---- src/migrations/add_performance_indexes.py | 44 ++--- src/migrations/add_raw_request_column.py | 28 ++- src/models.py | 8 +- src/tasks/analyze_ips.py | 7 +- src/tasks/db_dump.py | 2 +- src/templates/dashboard_template.py | 72 ++++---- src/tracker.py | 19 +- 10 files changed, 292 insertions(+), 198 deletions(-) diff --git a/src/database.py b/src/database.py index eed7f76..5c1a275 100644 --- a/src/database.py +++ b/src/database.py @@ -1697,7 +1697,7 @@ class DatabaseManager: results = ( session.query( AttackDetection.attack_type, - func.count(AttackDetection.id).label('count') + func.count(AttackDetection.id).label("count"), ) .group_by(AttackDetection.attack_type) .order_by(func.count(AttackDetection.id).desc()) @@ -1707,8 +1707,7 @@ class DatabaseManager: return { "attack_types": [ - {"type": row.attack_type, "count": row.count} - for row in results + {"type": row.attack_type, "count": row.count} for row in results ] } finally: diff --git a/src/deception_responses.py b/src/deception_responses.py index 4438d1e..e8ec551 100644 --- a/src/deception_responses.py +++ b/src/deception_responses.py @@ -8,20 +8,20 @@ from typing import Optional, Tuple, Dict from generators import random_username, random_password, random_email from wordlists import get_wordlists -logger = logging.getLogger('krawl') +logger = logging.getLogger("krawl") def detect_path_traversal(path: str, query: str = "", body: str = "") -> bool: """Detect path traversal attempts in request""" full_input = f"{path} {query} {body}" - + wl = get_wordlists() pattern = wl.attack_patterns.get("path_traversal", "") - + if not pattern: # Fallback pattern if wordlists not loaded - pattern = r'(\.\.|%2e%2e|/etc/passwd|/etc/shadow)' - + pattern = r"(\.\.|%2e%2e|/etc/passwd|/etc/shadow)" + if re.search(pattern, full_input, re.IGNORECASE): logger.debug(f"Path traversal detected in {full_input[:100]}") return True @@ -32,14 +32,14 @@ def detect_xxe_injection(body: str) -> bool: """Detect XXE injection attempts in XML payloads""" if not body: return False - + wl = get_wordlists() pattern = wl.attack_patterns.get("xxe_injection", "") - + if not pattern: # Fallback pattern if wordlists not loaded - pattern = r'( bool: def detect_command_injection(path: str, query: str = "", body: str = "") -> bool: """Detect command injection attempts""" full_input = f"{path} {query} {body}" - - logger.debug(f"[CMD_INJECTION_CHECK] path='{path}' query='{query}' body='{body[:50] if body else ''}'") + + logger.debug( + f"[CMD_INJECTION_CHECK] path='{path}' query='{query}' body='{body[:50] if body else ''}'" + ) logger.debug(f"[CMD_INJECTION_CHECK] full_input='{full_input[:200]}'") - + wl = get_wordlists() pattern = wl.attack_patterns.get("command_injection", "") - + if not pattern: # Fallback pattern if wordlists not loaded - pattern = r'(cmd=|exec=|command=|&&|;|\||whoami|id|uname|cat|ls)' - + pattern = r"(cmd=|exec=|command=|&&|;|\||whoami|id|uname|cat|ls)" + if re.search(pattern, full_input, re.IGNORECASE): logger.debug(f"[CMD_INJECTION_CHECK] Command injection pattern matched!") return True - + logger.debug(f"[CMD_INJECTION_CHECK] No command injection detected") return False @@ -71,23 +73,23 @@ def generate_fake_passwd() -> str: """Generate fake /etc/passwd content""" wl = get_wordlists() passwd_config = wl.fake_passwd - + if not passwd_config: # Fallback return "root:x:0:0:root:/root:/bin/bash\nwww-data:x:33:33:www-data:/var/www:/usr/sbin/nologin" - + users = passwd_config.get("system_users", []) uid_min = passwd_config.get("uid_min", 1000) uid_max = passwd_config.get("uid_max", 2000) gid_min = passwd_config.get("gid_min", 1000) gid_max = passwd_config.get("gid_max", 2000) shells = passwd_config.get("shells", ["/bin/bash"]) - + fake_users = [ f"{random_username()}:x:{random.randint(uid_min, uid_max)}:{random.randint(gid_min, gid_max)}::/home/{random_username()}:{random.choice(shells)}" for _ in range(3) ] - + return "\n".join(users + fake_users) @@ -95,21 +97,21 @@ def generate_fake_shadow() -> str: """Generate fake /etc/shadow content""" wl = get_wordlists() shadow_config = wl.fake_shadow - + if not shadow_config: # Fallback return "root:$6$rounds=656000$fake_salt_here$fake_hash_data:19000:0:99999:7:::" - + entries = shadow_config.get("system_entries", []) hash_prefix = shadow_config.get("hash_prefix", "$6$rounds=656000$") salt_length = shadow_config.get("salt_length", 16) hash_length = shadow_config.get("hash_length", 86) - + fake_entries = [ f"{random_username()}:{hash_prefix}{''.join(random.choices('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789', k=salt_length))}${''.join(random.choices('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789', k=hash_length))}:19000:0:99999:7:::" for _ in range(3) ] - + return "\n".join(entries + fake_entries) @@ -138,11 +140,11 @@ DB_PASSWORD=fake_env_password_789 API_KEY=fake_api_key_abc123 SECRET_TOKEN=fake_secret_token_xyz""", } - + for key in configs: if key.lower() in filename.lower(): return configs[key] - + return f"""# Configuration File api_endpoint = https://api.example.com api_key = fake_key_{random.randint(1000, 9999)} @@ -155,57 +157,59 @@ def generate_fake_directory_listing(path: str) -> str: """Generate fake directory listing""" wl = get_wordlists() dir_config = wl.directory_listing - + if not dir_config: # Fallback return f"Index of {path}

Index of {path}

" - + fake_dirs = dir_config.get("fake_directories", []) fake_files = dir_config.get("fake_files", []) - + directories = [(d["name"], d["size"], d["perms"]) for d in fake_dirs] files = [ (f["name"], str(random.randint(f["size_min"], f["size_max"])), f["perms"]) for f in fake_files ] - + html = f"Index of {path}" html += f"

Index of {path}


"
     html += f"{'Name':<40} {'Size':<10} {'Permissions':<15}\n"
     html += "-" * 70 + "\n"
-    
+
     for name, size, perms in directories:
         html += f"{name + '/':<40} {size:<10} {perms:<15}\n"
-    
+
     for name, size, perms in files:
         html += f"{name:<40} {size:<10} {perms:<15}\n"
-    
+
     html += "

" return html def generate_path_traversal_response(path: str) -> Tuple[str, str, int]: """Generate fake response for path traversal attempts""" - + path_lower = path.lower() logger.debug(f"Generating path traversal response for: {path}") - + if "passwd" in path_lower: logger.debug("Returning fake passwd file") return (generate_fake_passwd(), "text/plain", 200) - + if "shadow" in path_lower: logger.debug("Returning fake shadow file") return (generate_fake_shadow(), "text/plain", 200) - - if any(ext in path_lower for ext in [".conf", ".config", ".php", ".env", ".properties"]): + + if any( + ext in path_lower for ext in [".conf", ".config", ".php", ".env", ".properties"] + ): logger.debug("Returning fake config file") return (generate_fake_config_file(path), "text/plain", 200) - + if "proc/self" in path_lower: logger.debug("Returning fake proc info") return (f"{random.randint(1000, 9999)}", "text/plain", 200) - + logger.debug("Returning fake directory listing") return (generate_fake_directory_listing(path), "text/html", 200) @@ -214,15 +218,19 @@ def generate_xxe_response(body: str) -> Tuple[str, str, int]: """Generate fake response for XXE injection attempts""" wl = get_wordlists() xxe_config = wl.xxe_responses - + if "file://" in body: if "passwd" in body: content = generate_fake_passwd() elif "shadow" in body: content = generate_fake_shadow() else: - content = xxe_config.get("default_content", "root:x:0:0:root:/root:/bin/bash") if xxe_config else "root:x:0:0:root:/root:/bin/bash" - + content = ( + xxe_config.get("default_content", "root:x:0:0:root:/root:/bin/bash") + if xxe_config + else "root:x:0:0:root:/root:/bin/bash" + ) + if xxe_config and "file_access" in xxe_config: template = xxe_config["file_access"]["template"] response = template.replace("{content}", content) @@ -233,7 +241,7 @@ def generate_xxe_response(body: str) -> Tuple[str, str, int]: {content} """ return (response, "application/xml", 200) - + if "ENTITY" in body: if xxe_config and "entity_processed" in xxe_config: template = xxe_config["entity_processed"]["template"] @@ -248,7 +256,7 @@ def generate_xxe_response(body: str) -> Tuple[str, str, int]: fake_entity_content_12345 """ return (response, "application/xml", 200) - + if xxe_config and "error" in xxe_config: template = xxe_config["error"]["template"] messages = xxe_config["error"]["messages"] @@ -267,73 +275,94 @@ def generate_command_injection_response(input_text: str) -> Tuple[str, str, int] """Generate fake command execution output""" wl = get_wordlists() cmd_config = wl.command_outputs - + input_lower = input_text.lower() - + # id command - if re.search(r'\bid\b', input_lower): + if re.search(r"\bid\b", input_lower): if cmd_config and "id" in cmd_config: - uid = random.randint(cmd_config.get("uid_min", 1000), cmd_config.get("uid_max", 2000)) - gid = random.randint(cmd_config.get("gid_min", 1000), cmd_config.get("gid_max", 2000)) + uid = random.randint( + cmd_config.get("uid_min", 1000), cmd_config.get("uid_max", 2000) + ) + gid = random.randint( + cmd_config.get("gid_min", 1000), cmd_config.get("gid_max", 2000) + ) template = random.choice(cmd_config["id"]) output = template.replace("{uid}", str(uid)).replace("{gid}", str(gid)) else: output = f"uid={random.randint(1000, 2000)}(www-data) gid={random.randint(1000, 2000)}(www-data) groups={random.randint(1000, 2000)}(www-data)" return (output, "text/plain", 200) - + # whoami command - if re.search(r'\bwhoami\b', input_lower): + if re.search(r"\bwhoami\b", input_lower): users = cmd_config.get("whoami", ["www-data"]) if cmd_config else ["www-data"] return (random.choice(users), "text/plain", 200) - + # uname command - if re.search(r'\buname\b', input_lower): - outputs = cmd_config.get("uname", ["Linux server 5.4.0 x86_64"]) if cmd_config else ["Linux server 5.4.0 x86_64"] + if re.search(r"\buname\b", input_lower): + outputs = ( + cmd_config.get("uname", ["Linux server 5.4.0 x86_64"]) + if cmd_config + else ["Linux server 5.4.0 x86_64"] + ) return (random.choice(outputs), "text/plain", 200) - + # pwd command - if re.search(r'\bpwd\b', input_lower): - paths = cmd_config.get("pwd", ["/var/www/html"]) if cmd_config else ["/var/www/html"] + if re.search(r"\bpwd\b", input_lower): + paths = ( + cmd_config.get("pwd", ["/var/www/html"]) + if cmd_config + else ["/var/www/html"] + ) return (random.choice(paths), "text/plain", 200) - + # ls command - if re.search(r'\bls\b', input_lower): + if re.search(r"\bls\b", input_lower): if cmd_config and "ls" in cmd_config: files = random.choice(cmd_config["ls"]) else: files = ["index.php", "config.php", "uploads"] - output = "\n".join(random.sample(files, k=random.randint(3, min(6, len(files))))) + output = "\n".join( + random.sample(files, k=random.randint(3, min(6, len(files)))) + ) return (output, "text/plain", 200) - + # cat command - if re.search(r'\bcat\b', input_lower): + if re.search(r"\bcat\b", input_lower): if "passwd" in input_lower: return (generate_fake_passwd(), "text/plain", 200) if "shadow" in input_lower: return (generate_fake_shadow(), "text/plain", 200) - cat_content = cmd_config.get("cat_config", "") if cmd_config else "" + cat_content = ( + cmd_config.get("cat_config", "") + if cmd_config + else "" + ) return (cat_content, "text/plain", 200) - + # echo command - if re.search(r'\becho\b', input_lower): - match = re.search(r'echo\s+(.+?)(?:[;&|]|$)', input_text, re.IGNORECASE) + if re.search(r"\becho\b", input_lower): + match = re.search(r"echo\s+(.+?)(?:[;&|]|$)", input_text, re.IGNORECASE) if match: - return (match.group(1).strip('"\''), "text/plain", 200) + return (match.group(1).strip("\"'"), "text/plain", 200) return ("", "text/plain", 200) - + # network commands - if any(cmd in input_lower for cmd in ['wget', 'curl', 'nc', 'netcat']): + if any(cmd in input_lower for cmd in ["wget", "curl", "nc", "netcat"]): if cmd_config and "network_commands" in cmd_config: outputs = cmd_config["network_commands"] output = random.choice(outputs) if "{size}" in output: - size = random.randint(cmd_config.get("download_size_min", 100), cmd_config.get("download_size_max", 10000)) + size = random.randint( + cmd_config.get("download_size_min", 100), + cmd_config.get("download_size_max", 10000), + ) output = output.replace("{size}", str(size)) else: outputs = ["bash: command not found", "Connection timeout"] output = random.choice(outputs) return (output, "text/plain", 200) - + # generic outputs if cmd_config and "generic" in cmd_config: generic_outputs = cmd_config["generic"] @@ -343,7 +372,7 @@ def generate_command_injection_response(input_text: str) -> Tuple[str, str, int] else: generic_outputs = ["", "Command executed successfully", "sh: syntax error"] output = random.choice(generic_outputs) - + return (output, "text/plain", 200) @@ -374,7 +403,9 @@ def detect_sql_injection_pattern(query_string: str) -> Optional[str]: return None -def get_random_sql_error(db_type: str = None, injection_type: str = None) -> Tuple[str, str]: +def get_random_sql_error( + db_type: str = None, injection_type: str = None +) -> Tuple[str, str]: """Generate a random SQL error message""" wl = get_wordlists() sql_errors = wl.sql_errors @@ -411,7 +442,9 @@ def get_random_sql_error(db_type: str = None, injection_type: str = None) -> Tup return (error_message, "text/plain") -def generate_sql_error_response(query_string: str, db_type: str = None) -> Tuple[Optional[str], Optional[str], Optional[int]]: +def generate_sql_error_response( + query_string: str, db_type: str = None +) -> Tuple[Optional[str], Optional[str], Optional[int]]: """Generate SQL error response for detected injection attempts""" injection_type = detect_sql_injection_pattern(query_string) @@ -593,25 +626,29 @@ def get_server_header(server_type: str = None) -> str: return server_headers.get(server_type, "nginx/1.18.0") -def detect_and_respond_deception(path: str, query: str = "", body: str = "", method: str = "GET") -> Optional[Tuple[str, str, int]]: +def detect_and_respond_deception( + path: str, query: str = "", body: str = "", method: str = "GET" +) -> Optional[Tuple[str, str, int]]: """ Main deception detection and response function. Returns (response_body, content_type, status_code) if deception should be applied, None otherwise. """ - - logger.debug(f"Checking deception for {method} {path} query={query[:50] if query else 'empty'}") - + + logger.debug( + f"Checking deception for {method} {path} query={query[:50] if query else 'empty'}" + ) + if detect_path_traversal(path, query, body): logger.info(f"Path traversal detected in: {path}") return generate_path_traversal_response(f"{path}?{query}" if query else path) - + if body and detect_xxe_injection(body): logger.info(f"XXE injection detected") return generate_xxe_response(body) - + if detect_command_injection(path, query, body): logger.info(f"Command injection detected in: {path}") full_input = f"{path} {query} {body}" return generate_command_injection_response(full_input) - + return None diff --git a/src/handler.py b/src/handler.py index 45e9de3..863c223 100644 --- a/src/handler.py +++ b/src/handler.py @@ -78,18 +78,18 @@ class Handler(BaseHTTPRequestHandler): try: # Request line raw = f"{self.command} {self.path} {self.request_version}\r\n" - + # Headers if hasattr(self, "headers") and self.headers: for header, value in self.headers.items(): raw += f"{header}: {value}\r\n" - + raw += "\r\n" - + # Body (if present) if body: raw += body - + return raw except Exception as e: # Fallback to minimal representation if building fails @@ -189,7 +189,9 @@ class Handler(BaseHTTPRequestHandler): pass return True - def _handle_deception_response(self, path: str, query: str = "", body: str = "", method: str = "GET") -> bool: + def _handle_deception_response( + self, path: str, query: str = "", body: str = "", method: str = "GET" + ) -> bool: """ Handle deception responses for path traversal, XXE, and command injection. Returns True if a deception response was sent, False otherwise. @@ -197,32 +199,55 @@ class Handler(BaseHTTPRequestHandler): try: self.app_logger.debug(f"Checking deception for: {method} {path}") result = detect_and_respond_deception(path, query, body, method) - + if result: response_body, content_type, status_code = result client_ip = self._get_client_ip() user_agent = self.headers.get("User-Agent", "") - + # Determine attack type using standardized names from wordlists full_input = f"{path} {query} {body}".lower() attack_type_db = None # For database (standardized) attack_type_log = "UNKNOWN" # For logging (human-readable) - - if "passwd" in path.lower() or "shadow" in path.lower() or ".." in path or ".." in query: + + if ( + "passwd" in path.lower() + or "shadow" in path.lower() + or ".." in path + or ".." in query + ): attack_type_db = "path_traversal" attack_type_log = "PATH_TRAVERSAL" elif body and (" str: @@ -329,16 +356,16 @@ class Handler(BaseHTTPRequestHandler): post_data = "" base_path = urlparse(self.path).path - + content_length = int(self.headers.get("Content-Length", 0)) if content_length > 0: post_data = self.rfile.read(content_length).decode( "utf-8", errors="replace" ) - + parsed_url = urlparse(self.path) query_string = parsed_url.query - + if self._handle_deception_response(self.path, query_string, post_data, "POST"): return @@ -379,8 +406,8 @@ class Handler(BaseHTTPRequestHandler): # Use parse_qs for proper URL decoding parsed_qs = parse_qs(post_data) # parse_qs returns lists, get first value of each - parsed_data = {k: v[0] if v else '' for k, v in parsed_qs.items()} - + parsed_data = {k: v[0] if v else "" for k, v in parsed_qs.items()} + self.app_logger.debug(f"Parsed contact data: {parsed_data}") xss_detected = any(detect_xss_pattern(str(v)) for v in parsed_data.values()) @@ -401,7 +428,7 @@ class Handler(BaseHTTPRequestHandler): user_agent=user_agent, body=post_data, method="POST", - raw_request=self._build_raw_request(post_data) + raw_request=self._build_raw_request(post_data), ) try: @@ -443,8 +470,12 @@ class Handler(BaseHTTPRequestHandler): # send the post data (body) to the record_access function so the post data can be used to detect suspicious things. self.tracker.record_access( - client_ip, self.path, user_agent, post_data, method="POST", - raw_request=self._build_raw_request(post_data) + client_ip, + self.path, + user_agent, + post_data, + method="POST", + raw_request=self._build_raw_request(post_data), ) time.sleep(1) @@ -596,7 +627,7 @@ class Handler(BaseHTTPRequestHandler): query_string = parsed_url.query query_params = parse_qs(query_string) self.app_logger.info(f"query_params: {query_params}") - + if self._handle_deception_response(self.path, query_string, "", "GET"): return @@ -1069,17 +1100,22 @@ class Handler(BaseHTTPRequestHandler): # Extract log ID from path: /api/raw-request/123 log_id = int(self.path.split("/")[-1]) raw_request = db.get_raw_request_by_id(log_id) - + if raw_request is None: self.send_response(404) self.send_header("Content-type", "application/json") self.end_headers() - self.wfile.write(json.dumps({"error": "Raw request not found"}).encode()) + self.wfile.write( + json.dumps({"error": "Raw request not found"}).encode() + ) else: self.send_response(200) self.send_header("Content-type", "application/json") self.send_header("Access-Control-Allow-Origin", "*") - self.send_header("Cache-Control", "no-store, no-cache, must-revalidate, max-age=0") + self.send_header( + "Cache-Control", + "no-store, no-cache, must-revalidate, max-age=0", + ) self.end_headers() self.wfile.write(json.dumps({"raw_request": raw_request}).encode()) except (ValueError, IndexError): @@ -1175,9 +1211,13 @@ class Handler(BaseHTTPRequestHandler): self.wfile.write(b"Internal server error") return - self.tracker.record_access(client_ip, self.path, user_agent, method="GET", - raw_request=self._build_raw_request()) - + self.tracker.record_access( + client_ip, + self.path, + user_agent, + method="GET", + raw_request=self._build_raw_request(), + ) if self.tracker.is_suspicious_user_agent(user_agent): self.access_logger.warning( diff --git a/src/migrations/add_performance_indexes.py b/src/migrations/add_performance_indexes.py index 3359612..b44be15 100644 --- a/src/migrations/add_performance_indexes.py +++ b/src/migrations/add_performance_indexes.py @@ -12,17 +12,19 @@ import os def index_exists(cursor, index_name: str) -> bool: """Check if an index exists.""" - cursor.execute("SELECT name FROM sqlite_master WHERE type='index' AND name=?", (index_name,)) + cursor.execute( + "SELECT name FROM sqlite_master WHERE type='index' AND name=?", (index_name,) + ) return cursor.fetchone() is not None def add_performance_indexes(db_path: str) -> bool: """ Add performance indexes to optimize queries. - + Args: db_path: Path to the SQLite database file - + Returns: True if indexes were added or already exist, False on error """ @@ -31,14 +33,14 @@ def add_performance_indexes(db_path: str) -> bool: if not os.path.exists(db_path): print(f"Database file not found: {db_path}") return False - + # Connect to database conn = sqlite3.connect(db_path) cursor = conn.cursor() - + indexes_added = [] indexes_existed = [] - + # Index 1: attack_type for efficient GROUP BY operations if not index_exists(cursor, "ix_attack_detections_attack_type"): print("Adding index on attack_detections.attack_type...") @@ -49,10 +51,12 @@ def add_performance_indexes(db_path: str) -> bool: indexes_added.append("ix_attack_detections_attack_type") else: indexes_existed.append("ix_attack_detections_attack_type") - + # Index 2: Composite index for attack_type + access_log_id if not index_exists(cursor, "ix_attack_detections_type_log"): - print("Adding composite index on attack_detections(attack_type, access_log_id)...") + print( + "Adding composite index on attack_detections(attack_type, access_log_id)..." + ) cursor.execute(""" CREATE INDEX ix_attack_detections_type_log ON attack_detections(attack_type, access_log_id) @@ -60,26 +64,26 @@ def add_performance_indexes(db_path: str) -> bool: indexes_added.append("ix_attack_detections_type_log") else: indexes_existed.append("ix_attack_detections_type_log") - + conn.commit() conn.close() - + # Report results if indexes_added: print(f"Successfully added {len(indexes_added)} index(es):") for idx in indexes_added: print(f" - {idx}") - + if indexes_existed: print(f"â„šī¸ {len(indexes_existed)} index(es) already existed:") for idx in indexes_existed: print(f" - {idx}") - + if not indexes_added and not indexes_existed: print("No indexes processed") - + return True - + except sqlite3.Error as e: print(f"SQLite error: {e}") return False @@ -92,19 +96,17 @@ def main(): """Main migration function.""" # Default database path default_db_path = os.path.join( - os.path.dirname(os.path.dirname(__file__)), - "data", - "krawl.db" + os.path.dirname(os.path.dirname(__file__)), "data", "krawl.db" ) - + # Allow custom path as command line argument db_path = sys.argv[1] if len(sys.argv) > 1 else default_db_path - + print(f"Adding performance indexes to database: {db_path}") print("=" * 60) - + success = add_performance_indexes(db_path) - + print("=" * 60) if success: print("Migration completed successfully") diff --git a/src/migrations/add_raw_request_column.py b/src/migrations/add_raw_request_column.py index 8cb63ee..81c3fd7 100644 --- a/src/migrations/add_raw_request_column.py +++ b/src/migrations/add_raw_request_column.py @@ -21,10 +21,10 @@ def column_exists(cursor, table_name: str, column_name: str) -> bool: def add_raw_request_column(db_path: str) -> bool: """ Add raw_request column to access_logs table if it doesn't exist. - + Args: db_path: Path to the SQLite database file - + Returns: True if column was added or already exists, False on error """ @@ -33,30 +33,30 @@ def add_raw_request_column(db_path: str) -> bool: if not os.path.exists(db_path): print(f"Database file not found: {db_path}") return False - + # Connect to database conn = sqlite3.connect(db_path) cursor = conn.cursor() - + # Check if column already exists if column_exists(cursor, "access_logs", "raw_request"): print("Column 'raw_request' already exists in access_logs table") conn.close() return True - + # Add the column print("Adding 'raw_request' column to access_logs table...") cursor.execute(""" ALTER TABLE access_logs ADD COLUMN raw_request TEXT """) - + conn.commit() conn.close() - + print("✅ Successfully added 'raw_request' column to access_logs table") return True - + except sqlite3.Error as e: print(f"SQLite error: {e}") return False @@ -69,19 +69,17 @@ def main(): """Main migration function.""" # Default database path default_db_path = os.path.join( - os.path.dirname(os.path.dirname(__file__)), - "data", - "krawl.db" + os.path.dirname(os.path.dirname(__file__)), "data", "krawl.db" ) - + # Allow custom path as command line argument db_path = sys.argv[1] if len(sys.argv) > 1 else default_db_path - + print(f"🔄 Running migration on database: {db_path}") print("=" * 60) - + success = add_raw_request_column(db_path) - + print("=" * 60) if success: print("Migration completed successfully") diff --git a/src/models.py b/src/models.py index c9d6a26..a38b1f6 100644 --- a/src/models.py +++ b/src/models.py @@ -64,9 +64,7 @@ class AccessLog(Base): DateTime, nullable=False, default=datetime.utcnow, index=True ) # Raw HTTP request for forensic analysis (nullable for backward compatibility) - raw_request: Mapped[Optional[str]] = mapped_column( - String, nullable=True - ) + raw_request: Mapped[Optional[str]] = mapped_column(String, nullable=True) # Relationship to attack detections attack_detections: Mapped[List["AttackDetection"]] = relationship( @@ -141,7 +139,9 @@ class AttackDetection(Base): ) # Composite index for efficient aggregation queries - __table_args__ = (Index("ix_attack_detections_type_log", "attack_type", "access_log_id"),) + __table_args__ = ( + Index("ix_attack_detections_type_log", "attack_type", "access_log_id"), + ) def __repr__(self) -> str: return f"" diff --git a/src/tasks/analyze_ips.py b/src/tasks/analyze_ips.py index e51ab9a..f5fea5b 100644 --- a/src/tasks/analyze_ips.py +++ b/src/tasks/analyze_ips.py @@ -1,7 +1,4 @@ -from sqlalchemy import select -from typing import Optional -from database import get_database, DatabaseManager -from zoneinfo import ZoneInfo +from database import get_database from pathlib import Path from datetime import datetime, timedelta import re @@ -9,8 +6,6 @@ import urllib.parse from wordlists import get_wordlists from config import get_config from logger import get_app_logger -import requests -from sanitizer import sanitize_for_storage, sanitize_dict # ---------------------- # TASK CONFIG diff --git a/src/tasks/db_dump.py b/src/tasks/db_dump.py index d254f45..93d55e3 100644 --- a/src/tasks/db_dump.py +++ b/src/tasks/db_dump.py @@ -39,7 +39,7 @@ def main(): metadata.reflect(bind=engine) # create backup directory - os.makedirs(config.backups_path,exist_ok=True) + os.makedirs(config.backups_path, exist_ok=True) output_file = os.path.join(config.backups_path, "db_dump.sql") with open(output_file, "w") as f: diff --git a/src/templates/dashboard_template.py b/src/templates/dashboard_template.py index dab9a4a..1a312a1 100644 --- a/src/templates/dashboard_template.py +++ b/src/templates/dashboard_template.py @@ -50,51 +50,63 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str: # Generate comprehensive suspicious activity rows combining all suspicious events suspicious_activities = [] - + # Add recent suspicious accesses (attacks) for log in stats.get("recent_suspicious", [])[-20:]: - suspicious_activities.append({ - "type": "Attack", - "ip": log["ip"], - "path": log["path"], - "user_agent": log["user_agent"][:60], - "timestamp": log["timestamp"], - "details": ", ".join(log.get("attack_types", [])) if log.get("attack_types") else "Suspicious behavior" - }) - + suspicious_activities.append( + { + "type": "Attack", + "ip": log["ip"], + "path": log["path"], + "user_agent": log["user_agent"][:60], + "timestamp": log["timestamp"], + "details": ( + ", ".join(log.get("attack_types", [])) + if log.get("attack_types") + else "Suspicious behavior" + ), + } + ) + # Add credential attempts for cred in stats.get("credential_attempts", [])[-20:]: - suspicious_activities.append({ - "type": "Credentials", - "ip": cred["ip"], - "path": cred["path"], - "user_agent": "", - "timestamp": cred["timestamp"], - "details": f"User: {cred.get('username', 'N/A')}" - }) - + suspicious_activities.append( + { + "type": "Credentials", + "ip": cred["ip"], + "path": cred["path"], + "user_agent": "", + "timestamp": cred["timestamp"], + "details": f"User: {cred.get('username', 'N/A')}", + } + ) + # Add honeypot triggers for honeypot in stats.get("honeypot_triggered_ips", [])[-20:]: # honeypot is a tuple (ip, paths) ip = honeypot[0] paths = honeypot[1] if isinstance(honeypot[1], list) else [] - suspicious_activities.append({ - "type": "Honeypot", - "ip": ip, - "path": paths[0] if paths else "Multiple", - "user_agent": "", - "timestamp": "", # Tuples don't have timestamp - "details": f"{len(paths)} trap(s) triggered" - }) - + suspicious_activities.append( + { + "type": "Honeypot", + "ip": ip, + "path": paths[0] if paths else "Multiple", + "user_agent": "", + "timestamp": "", # Tuples don't have timestamp + "details": f"{len(paths)} trap(s) triggered", + } + ) + # Sort by timestamp (most recent first) and take last 20 # Put entries with empty timestamps at the end try: - suspicious_activities.sort(key=lambda x: (x["timestamp"] == "", x["timestamp"]), reverse=True) + suspicious_activities.sort( + key=lambda x: (x["timestamp"] == "", x["timestamp"]), reverse=True + ) except: pass suspicious_activities = suspicious_activities[:20] - + # Generate table rows suspicious_rows = ( "\n".join([f""" diff --git a/src/tracker.py b/src/tracker.py index a951e20..b7b97d5 100644 --- a/src/tracker.py +++ b/src/tracker.py @@ -161,13 +161,24 @@ class AccessTracker: except Exception: # If parsing fails, try simple regex patterns wl = get_wordlists() - username_fields = wl.username_fields or ["username", "user", "login", "email", "log"] - password_fields = wl.password_fields or ["password", "pass", "passwd", "pwd"] - + username_fields = wl.username_fields or [ + "username", + "user", + "login", + "email", + "log", + ] + password_fields = wl.password_fields or [ + "password", + "pass", + "passwd", + "pwd", + ] + # Build regex pattern from wordlist fields username_pattern = "(?:" + "|".join(username_fields) + ")=([^&\\s]+)" password_pattern = "(?:" + "|".join(password_fields) + ")=([^&\\s]+)" - + username_match = re.search(username_pattern, post_data, re.IGNORECASE) password_match = re.search(password_pattern, post_data, re.IGNORECASE) From 17b657744b5171e43c47dd7717888710f32d2827 Mon Sep 17 00:00:00 2001 From: carnivuth Date: Sun, 15 Feb 2026 18:07:39 +0100 Subject: [PATCH 3/3] set default port to 5000 --- config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config.yaml b/config.yaml index 23a2ae8..08f9fcc 100644 --- a/config.yaml +++ b/config.yaml @@ -1,7 +1,7 @@ # Krawl Honeypot Configuration server: - port: 1234 + port: 5000 delay: 100 # Response delay in milliseconds # manually set the server header, if null a random one will be used.