From e1c1b4d9531f95d37ffa7c6d632533d48e8f94db Mon Sep 17 00:00:00 2001 From: Lorenzo Venerandi Date: Sun, 1 Mar 2026 21:52:27 +0100 Subject: [PATCH] chore: update Dockerfile and requirements for Python 3.13, enhance error handling in config and tracker modules --- Dockerfile | 7 ++-- requirements.txt | 2 +- src/config.py | 2 +- src/deception_responses.py | 75 +++++++++++++++++++------------------- src/tasks/db_retention.py | 4 +- src/tracker.py | 18 ++++----- 6 files changed, 55 insertions(+), 53 deletions(-) diff --git a/Dockerfile b/Dockerfile index f6caa8b..e93a55c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,15 +1,16 @@ -FROM python:3.11-slim +FROM python:3.13-slim LABEL org.opencontainers.image.source=https://github.com/BlessedRebuS/Krawl WORKDIR /app # Install gosu for dropping privileges -RUN apt-get update && apt-get install -y --no-install-recommends gosu && \ +RUN apt-get update && apt-get upgrade -y && apt-get install -y --no-install-recommends gosu && \ rm -rf /var/lib/apt/lists/* COPY requirements.txt /app/ -RUN pip install --no-cache-dir -r requirements.txt +RUN pip install --no-cache-dir --upgrade pip && \ + pip install --no-cache-dir -r requirements.txt COPY src/ /app/src/ COPY wordlists.json /app/ diff --git a/requirements.txt b/requirements.txt index 2c0b7b5..56dc4d4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,5 +15,5 @@ requests>=2.32.5 # Web framework fastapi>=0.115.0 uvicorn[standard]>=0.30.0 -jinja2>=3.1.0 +jinja2>=3.1.5 python-multipart>=0.0.9 \ No newline at end of file diff --git a/src/config.py b/src/config.py index 8344883..cb46bf6 100644 --- a/src/config.py +++ b/src/config.py @@ -94,7 +94,7 @@ class Config: ip = response.text.strip() if ip: break - except Exception: + except requests.RequestException: continue if not ip: diff --git a/src/deception_responses.py b/src/deception_responses.py index e8ec551..6e90ed3 100644 --- a/src/deception_responses.py +++ b/src/deception_responses.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 import re -import random +import secrets import logging import json from typing import Optional, Tuple, Dict @@ -9,6 +9,7 @@ from generators import random_username, random_password, random_email from wordlists import get_wordlists logger = logging.getLogger("krawl") +_sysrand = secrets.SystemRandom() def detect_path_traversal(path: str, query: str = "", body: str = "") -> bool: @@ -86,7 +87,7 @@ def generate_fake_passwd() -> str: shells = passwd_config.get("shells", ["/bin/bash"]) fake_users = [ - f"{random_username()}:x:{random.randint(uid_min, uid_max)}:{random.randint(gid_min, gid_max)}::/home/{random_username()}:{random.choice(shells)}" + f"{random_username()}:x:{_sysrand.randint(uid_min, uid_max)}:{_sysrand.randint(gid_min, gid_max)}::/home/{random_username()}:{secrets.choice(shells)}" for _ in range(3) ] @@ -108,7 +109,7 @@ def generate_fake_shadow() -> str: hash_length = shadow_config.get("hash_length", 86) fake_entries = [ - f"{random_username()}:{hash_prefix}{''.join(random.choices('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789', k=salt_length))}${''.join(random.choices('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789', k=hash_length))}:19000:0:99999:7:::" + f"{random_username()}:{hash_prefix}{''.join(_sysrand.choices('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789', k=salt_length))}${''.join(_sysrand.choices('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789', k=hash_length))}:19000:0:99999:7:::" for _ in range(3) ] @@ -147,9 +148,9 @@ SECRET_TOKEN=fake_secret_token_xyz""", return f"""# Configuration File api_endpoint = https://api.example.com -api_key = fake_key_{random.randint(1000, 9999)} +api_key = fake_key_{_sysrand.randint(1000, 9999)} database_url = mysql://user:fake_pass@localhost/db -secret = fake_secret_{random.randint(10000, 99999)} +secret = fake_secret_{_sysrand.randint(10000, 99999)} """ @@ -167,7 +168,7 @@ def generate_fake_directory_listing(path: str) -> str: directories = [(d["name"], d["size"], d["perms"]) for d in fake_dirs] files = [ - (f["name"], str(random.randint(f["size_min"], f["size_max"])), f["perms"]) + (f["name"], str(_sysrand.randint(f["size_min"], f["size_max"])), f["perms"]) for f in fake_files ] @@ -208,7 +209,7 @@ def generate_path_traversal_response(path: str) -> Tuple[str, str, int]: if "proc/self" in path_lower: logger.debug("Returning fake proc info") - return (f"{random.randint(1000, 9999)}", "text/plain", 200) + return (f"{_sysrand.randint(1000, 9999)}", "text/plain", 200) logger.debug("Returning fake directory listing") return (generate_fake_directory_listing(path), "text/html", 200) @@ -246,7 +247,7 @@ def generate_xxe_response(body: str) -> Tuple[str, str, int]: if xxe_config and "entity_processed" in xxe_config: template = xxe_config["entity_processed"]["template"] entity_values = xxe_config["entity_processed"]["entity_values"] - entity_value = random.choice(entity_values) + entity_value = secrets.choice(entity_values) response = template.replace("{entity_value}", entity_value) else: response = """ @@ -260,7 +261,7 @@ def generate_xxe_response(body: str) -> Tuple[str, str, int]: if xxe_config and "error" in xxe_config: template = xxe_config["error"]["template"] messages = xxe_config["error"]["messages"] - message = random.choice(messages) + message = secrets.choice(messages) response = template.replace("{message}", message) else: response = """ @@ -281,22 +282,22 @@ def generate_command_injection_response(input_text: str) -> Tuple[str, str, int] # id command if re.search(r"\bid\b", input_lower): if cmd_config and "id" in cmd_config: - uid = random.randint( + uid = _sysrand.randint( cmd_config.get("uid_min", 1000), cmd_config.get("uid_max", 2000) ) - gid = random.randint( + gid = _sysrand.randint( cmd_config.get("gid_min", 1000), cmd_config.get("gid_max", 2000) ) - template = random.choice(cmd_config["id"]) + template = secrets.choice(cmd_config["id"]) output = template.replace("{uid}", str(uid)).replace("{gid}", str(gid)) else: - output = f"uid={random.randint(1000, 2000)}(www-data) gid={random.randint(1000, 2000)}(www-data) groups={random.randint(1000, 2000)}(www-data)" + output = f"uid={_sysrand.randint(1000, 2000)}(www-data) gid={_sysrand.randint(1000, 2000)}(www-data) groups={_sysrand.randint(1000, 2000)}(www-data)" return (output, "text/plain", 200) # whoami command if re.search(r"\bwhoami\b", input_lower): users = cmd_config.get("whoami", ["www-data"]) if cmd_config else ["www-data"] - return (random.choice(users), "text/plain", 200) + return (secrets.choice(users), "text/plain", 200) # uname command if re.search(r"\buname\b", input_lower): @@ -305,7 +306,7 @@ def generate_command_injection_response(input_text: str) -> Tuple[str, str, int] if cmd_config else ["Linux server 5.4.0 x86_64"] ) - return (random.choice(outputs), "text/plain", 200) + return (secrets.choice(outputs), "text/plain", 200) # pwd command if re.search(r"\bpwd\b", input_lower): @@ -314,16 +315,16 @@ def generate_command_injection_response(input_text: str) -> Tuple[str, str, int] if cmd_config else ["/var/www/html"] ) - return (random.choice(paths), "text/plain", 200) + return (secrets.choice(paths), "text/plain", 200) # ls command if re.search(r"\bls\b", input_lower): if cmd_config and "ls" in cmd_config: - files = random.choice(cmd_config["ls"]) + files = secrets.choice(cmd_config["ls"]) else: files = ["index.php", "config.php", "uploads"] output = "\n".join( - random.sample(files, k=random.randint(3, min(6, len(files)))) + _sysrand.sample(files, k=_sysrand.randint(3, min(6, len(files)))) ) return (output, "text/plain", 200) @@ -351,27 +352,27 @@ def generate_command_injection_response(input_text: str) -> Tuple[str, str, int] if any(cmd in input_lower for cmd in ["wget", "curl", "nc", "netcat"]): if cmd_config and "network_commands" in cmd_config: outputs = cmd_config["network_commands"] - output = random.choice(outputs) + output = secrets.choice(outputs) if "{size}" in output: - size = random.randint( + size = _sysrand.randint( cmd_config.get("download_size_min", 100), cmd_config.get("download_size_max", 10000), ) output = output.replace("{size}", str(size)) else: outputs = ["bash: command not found", "Connection timeout"] - output = random.choice(outputs) + output = secrets.choice(outputs) return (output, "text/plain", 200) # generic outputs if cmd_config and "generic" in cmd_config: generic_outputs = cmd_config["generic"] - output = random.choice(generic_outputs) + output = secrets.choice(generic_outputs) if "{num}" in output: - output = output.replace("{num}", str(random.randint(1, 99))) + output = output.replace("{num}", str(_sysrand.randint(1, 99))) else: generic_outputs = ["", "Command executed successfully", "sh: syntax error"] - output = random.choice(generic_outputs) + output = secrets.choice(generic_outputs) return (output, "text/plain", 200) @@ -414,7 +415,7 @@ def get_random_sql_error( return ("Database error occurred", "text/plain") if not db_type: - db_type = random.choice(list(sql_errors.keys())) + db_type = secrets.choice(list(sql_errors.keys())) db_errors = sql_errors.get(db_type, {}) @@ -429,15 +430,15 @@ def get_random_sql_error( all_errors.extend(error_list) errors = all_errors if all_errors else ["Database error occurred"] - error_message = random.choice(errors) if errors else "Database error occurred" + error_message = secrets.choice(errors) if errors else "Database error occurred" if "{table}" in error_message: tables = ["users", "products", "orders", "customers", "accounts", "sessions"] - error_message = error_message.replace("{table}", random.choice(tables)) + error_message = error_message.replace("{table}", secrets.choice(tables)) if "{column}" in error_message: columns = ["id", "name", "email", "password", "username", "created_at"] - error_message = error_message.replace("{column}", random.choice(columns)) + error_message = error_message.replace("{column}", secrets.choice(columns)) return (error_message, "text/plain") @@ -455,7 +456,7 @@ def generate_sql_error_response( status_code = 500 - if random.random() < 0.3: + if _sysrand.random() < 0.3: status_code = 200 logger.info(f"SQL injection detected: {injection_type}") @@ -475,9 +476,9 @@ def get_sql_response_with_data(path: str, params: str) -> str: "username": random_username(), "email": random_email(), "password_hash": random_password(), - "role": random.choice(["admin", "user", "moderator"]), + "role": secrets.choice(["admin", "user", "moderator"]), } - for i in range(1, random.randint(2, 5)) + for i in range(1, _sysrand.randint(2, 5)) ], } return json.dumps(data, indent=2) @@ -570,7 +571,7 @@ def generate_server_error() -> Tuple[str, str]: if not server_errors: return ("500 Internal Server Error", "text/html") - server_type = random.choice(list(server_errors.keys())) + server_type = secrets.choice(list(server_errors.keys())) server_config = server_errors[server_type] error_codes = { @@ -583,18 +584,18 @@ def generate_server_error() -> Tuple[str, str]: 503: "Service Unavailable", } - code = random.choice(list(error_codes.keys())) + code = secrets.choice(list(error_codes.keys())) message = error_codes[code] template = server_config.get("template", "") - version = random.choice(server_config.get("versions", ["1.0"])) + version = secrets.choice(server_config.get("versions", ["1.0"])) html = template.replace("{code}", str(code)) html = html.replace("{message}", message) html = html.replace("{version}", version) if server_type == "apache": - os = random.choice(server_config.get("os", ["Ubuntu"])) + os = secrets.choice(server_config.get("os", ["Ubuntu"])) html = html.replace("{os}", os) html = html.replace("{host}", "localhost") @@ -611,10 +612,10 @@ def get_server_header(server_type: str = None) -> str: return "nginx/1.18.0" if not server_type: - server_type = random.choice(list(server_errors.keys())) + server_type = secrets.choice(list(server_errors.keys())) server_config = server_errors.get(server_type, {}) - version = random.choice(server_config.get("versions", ["1.0"])) + version = secrets.choice(server_config.get("versions", ["1.0"])) server_headers = { "nginx": f"nginx/{version}", diff --git a/src/tasks/db_retention.py b/src/tasks/db_retention.py index b4feaa7..af803c6 100644 --- a/src/tasks/db_retention.py +++ b/src/tasks/db_retention.py @@ -77,5 +77,5 @@ def main(): finally: try: db.close_session() - except Exception: - pass + except Exception as e: + app_logger.error(f"Error closing DB session after retention cleanup: {e}") diff --git a/src/tracker.py b/src/tracker.py index c52cf24..c5683b7 100644 --- a/src/tracker.py +++ b/src/tracker.py @@ -1,12 +1,15 @@ #!/usr/bin/env python3 from typing import Dict, Tuple, Optional +import logging import re import urllib.parse from wordlists import get_wordlists from database import get_database, DatabaseManager +logger = logging.getLogger("krawl") + # Module-level singleton for background task access _tracker_instance: "AccessTracker | None" = None @@ -103,9 +106,8 @@ class AccessTracker: if self._db_manager is None: try: self._db_manager = get_database() - except Exception: - # Database not initialized, persistence disabled - pass + except Exception as e: + logger.error(f"Failed to initialize database manager: {e}") return self._db_manager def parse_credentials(self, post_data: str) -> Tuple[str, str]: @@ -206,9 +208,8 @@ class AccessTracker: self.db.persist_credential( ip=ip, path=path, username=username, password=password ) - except Exception: - # Don't crash if database persistence fails - pass + except Exception as e: + logger.error(f"Failed to persist credential attempt: {e}") def record_access( self, @@ -271,9 +272,8 @@ class AccessTracker: attack_types=attack_findings if attack_findings else None, raw_request=raw_request if raw_request else None, ) - except Exception: - # Don't crash if database persistence fails - pass + except Exception as e: + logger.error(f"Failed to persist access record: {e}") def detect_attack_type(self, data: str) -> list[str]: """