chore: update Dockerfile and requirements for Python 3.13, enhance error handling in config and tracker modules
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import re
|
||||
import random
|
||||
import secrets
|
||||
import logging
|
||||
import json
|
||||
from typing import Optional, Tuple, Dict
|
||||
@@ -9,6 +9,7 @@ from generators import random_username, random_password, random_email
|
||||
from wordlists import get_wordlists
|
||||
|
||||
logger = logging.getLogger("krawl")
|
||||
_sysrand = secrets.SystemRandom()
|
||||
|
||||
|
||||
def detect_path_traversal(path: str, query: str = "", body: str = "") -> bool:
|
||||
@@ -86,7 +87,7 @@ def generate_fake_passwd() -> str:
|
||||
shells = passwd_config.get("shells", ["/bin/bash"])
|
||||
|
||||
fake_users = [
|
||||
f"{random_username()}:x:{random.randint(uid_min, uid_max)}:{random.randint(gid_min, gid_max)}::/home/{random_username()}:{random.choice(shells)}"
|
||||
f"{random_username()}:x:{_sysrand.randint(uid_min, uid_max)}:{_sysrand.randint(gid_min, gid_max)}::/home/{random_username()}:{secrets.choice(shells)}"
|
||||
for _ in range(3)
|
||||
]
|
||||
|
||||
@@ -108,7 +109,7 @@ def generate_fake_shadow() -> str:
|
||||
hash_length = shadow_config.get("hash_length", 86)
|
||||
|
||||
fake_entries = [
|
||||
f"{random_username()}:{hash_prefix}{''.join(random.choices('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789', k=salt_length))}${''.join(random.choices('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789', k=hash_length))}:19000:0:99999:7:::"
|
||||
f"{random_username()}:{hash_prefix}{''.join(_sysrand.choices('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789', k=salt_length))}${''.join(_sysrand.choices('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789', k=hash_length))}:19000:0:99999:7:::"
|
||||
for _ in range(3)
|
||||
]
|
||||
|
||||
@@ -147,9 +148,9 @@ SECRET_TOKEN=fake_secret_token_xyz""",
|
||||
|
||||
return f"""# Configuration File
|
||||
api_endpoint = https://api.example.com
|
||||
api_key = fake_key_{random.randint(1000, 9999)}
|
||||
api_key = fake_key_{_sysrand.randint(1000, 9999)}
|
||||
database_url = mysql://user:fake_pass@localhost/db
|
||||
secret = fake_secret_{random.randint(10000, 99999)}
|
||||
secret = fake_secret_{_sysrand.randint(10000, 99999)}
|
||||
"""
|
||||
|
||||
|
||||
@@ -167,7 +168,7 @@ def generate_fake_directory_listing(path: str) -> str:
|
||||
|
||||
directories = [(d["name"], d["size"], d["perms"]) for d in fake_dirs]
|
||||
files = [
|
||||
(f["name"], str(random.randint(f["size_min"], f["size_max"])), f["perms"])
|
||||
(f["name"], str(_sysrand.randint(f["size_min"], f["size_max"])), f["perms"])
|
||||
for f in fake_files
|
||||
]
|
||||
|
||||
@@ -208,7 +209,7 @@ def generate_path_traversal_response(path: str) -> Tuple[str, str, int]:
|
||||
|
||||
if "proc/self" in path_lower:
|
||||
logger.debug("Returning fake proc info")
|
||||
return (f"{random.randint(1000, 9999)}", "text/plain", 200)
|
||||
return (f"{_sysrand.randint(1000, 9999)}", "text/plain", 200)
|
||||
|
||||
logger.debug("Returning fake directory listing")
|
||||
return (generate_fake_directory_listing(path), "text/html", 200)
|
||||
@@ -246,7 +247,7 @@ def generate_xxe_response(body: str) -> Tuple[str, str, int]:
|
||||
if xxe_config and "entity_processed" in xxe_config:
|
||||
template = xxe_config["entity_processed"]["template"]
|
||||
entity_values = xxe_config["entity_processed"]["entity_values"]
|
||||
entity_value = random.choice(entity_values)
|
||||
entity_value = secrets.choice(entity_values)
|
||||
response = template.replace("{entity_value}", entity_value)
|
||||
else:
|
||||
response = """<?xml version="1.0"?>
|
||||
@@ -260,7 +261,7 @@ def generate_xxe_response(body: str) -> Tuple[str, str, int]:
|
||||
if xxe_config and "error" in xxe_config:
|
||||
template = xxe_config["error"]["template"]
|
||||
messages = xxe_config["error"]["messages"]
|
||||
message = random.choice(messages)
|
||||
message = secrets.choice(messages)
|
||||
response = template.replace("{message}", message)
|
||||
else:
|
||||
response = """<?xml version="1.0"?>
|
||||
@@ -281,22 +282,22 @@ def generate_command_injection_response(input_text: str) -> Tuple[str, str, int]
|
||||
# id command
|
||||
if re.search(r"\bid\b", input_lower):
|
||||
if cmd_config and "id" in cmd_config:
|
||||
uid = random.randint(
|
||||
uid = _sysrand.randint(
|
||||
cmd_config.get("uid_min", 1000), cmd_config.get("uid_max", 2000)
|
||||
)
|
||||
gid = random.randint(
|
||||
gid = _sysrand.randint(
|
||||
cmd_config.get("gid_min", 1000), cmd_config.get("gid_max", 2000)
|
||||
)
|
||||
template = random.choice(cmd_config["id"])
|
||||
template = secrets.choice(cmd_config["id"])
|
||||
output = template.replace("{uid}", str(uid)).replace("{gid}", str(gid))
|
||||
else:
|
||||
output = f"uid={random.randint(1000, 2000)}(www-data) gid={random.randint(1000, 2000)}(www-data) groups={random.randint(1000, 2000)}(www-data)"
|
||||
output = f"uid={_sysrand.randint(1000, 2000)}(www-data) gid={_sysrand.randint(1000, 2000)}(www-data) groups={_sysrand.randint(1000, 2000)}(www-data)"
|
||||
return (output, "text/plain", 200)
|
||||
|
||||
# whoami command
|
||||
if re.search(r"\bwhoami\b", input_lower):
|
||||
users = cmd_config.get("whoami", ["www-data"]) if cmd_config else ["www-data"]
|
||||
return (random.choice(users), "text/plain", 200)
|
||||
return (secrets.choice(users), "text/plain", 200)
|
||||
|
||||
# uname command
|
||||
if re.search(r"\buname\b", input_lower):
|
||||
@@ -305,7 +306,7 @@ def generate_command_injection_response(input_text: str) -> Tuple[str, str, int]
|
||||
if cmd_config
|
||||
else ["Linux server 5.4.0 x86_64"]
|
||||
)
|
||||
return (random.choice(outputs), "text/plain", 200)
|
||||
return (secrets.choice(outputs), "text/plain", 200)
|
||||
|
||||
# pwd command
|
||||
if re.search(r"\bpwd\b", input_lower):
|
||||
@@ -314,16 +315,16 @@ def generate_command_injection_response(input_text: str) -> Tuple[str, str, int]
|
||||
if cmd_config
|
||||
else ["/var/www/html"]
|
||||
)
|
||||
return (random.choice(paths), "text/plain", 200)
|
||||
return (secrets.choice(paths), "text/plain", 200)
|
||||
|
||||
# ls command
|
||||
if re.search(r"\bls\b", input_lower):
|
||||
if cmd_config and "ls" in cmd_config:
|
||||
files = random.choice(cmd_config["ls"])
|
||||
files = secrets.choice(cmd_config["ls"])
|
||||
else:
|
||||
files = ["index.php", "config.php", "uploads"]
|
||||
output = "\n".join(
|
||||
random.sample(files, k=random.randint(3, min(6, len(files))))
|
||||
_sysrand.sample(files, k=_sysrand.randint(3, min(6, len(files))))
|
||||
)
|
||||
return (output, "text/plain", 200)
|
||||
|
||||
@@ -351,27 +352,27 @@ def generate_command_injection_response(input_text: str) -> Tuple[str, str, int]
|
||||
if any(cmd in input_lower for cmd in ["wget", "curl", "nc", "netcat"]):
|
||||
if cmd_config and "network_commands" in cmd_config:
|
||||
outputs = cmd_config["network_commands"]
|
||||
output = random.choice(outputs)
|
||||
output = secrets.choice(outputs)
|
||||
if "{size}" in output:
|
||||
size = random.randint(
|
||||
size = _sysrand.randint(
|
||||
cmd_config.get("download_size_min", 100),
|
||||
cmd_config.get("download_size_max", 10000),
|
||||
)
|
||||
output = output.replace("{size}", str(size))
|
||||
else:
|
||||
outputs = ["bash: command not found", "Connection timeout"]
|
||||
output = random.choice(outputs)
|
||||
output = secrets.choice(outputs)
|
||||
return (output, "text/plain", 200)
|
||||
|
||||
# generic outputs
|
||||
if cmd_config and "generic" in cmd_config:
|
||||
generic_outputs = cmd_config["generic"]
|
||||
output = random.choice(generic_outputs)
|
||||
output = secrets.choice(generic_outputs)
|
||||
if "{num}" in output:
|
||||
output = output.replace("{num}", str(random.randint(1, 99)))
|
||||
output = output.replace("{num}", str(_sysrand.randint(1, 99)))
|
||||
else:
|
||||
generic_outputs = ["", "Command executed successfully", "sh: syntax error"]
|
||||
output = random.choice(generic_outputs)
|
||||
output = secrets.choice(generic_outputs)
|
||||
|
||||
return (output, "text/plain", 200)
|
||||
|
||||
@@ -414,7 +415,7 @@ def get_random_sql_error(
|
||||
return ("Database error occurred", "text/plain")
|
||||
|
||||
if not db_type:
|
||||
db_type = random.choice(list(sql_errors.keys()))
|
||||
db_type = secrets.choice(list(sql_errors.keys()))
|
||||
|
||||
db_errors = sql_errors.get(db_type, {})
|
||||
|
||||
@@ -429,15 +430,15 @@ def get_random_sql_error(
|
||||
all_errors.extend(error_list)
|
||||
errors = all_errors if all_errors else ["Database error occurred"]
|
||||
|
||||
error_message = random.choice(errors) if errors else "Database error occurred"
|
||||
error_message = secrets.choice(errors) if errors else "Database error occurred"
|
||||
|
||||
if "{table}" in error_message:
|
||||
tables = ["users", "products", "orders", "customers", "accounts", "sessions"]
|
||||
error_message = error_message.replace("{table}", random.choice(tables))
|
||||
error_message = error_message.replace("{table}", secrets.choice(tables))
|
||||
|
||||
if "{column}" in error_message:
|
||||
columns = ["id", "name", "email", "password", "username", "created_at"]
|
||||
error_message = error_message.replace("{column}", random.choice(columns))
|
||||
error_message = error_message.replace("{column}", secrets.choice(columns))
|
||||
|
||||
return (error_message, "text/plain")
|
||||
|
||||
@@ -455,7 +456,7 @@ def generate_sql_error_response(
|
||||
|
||||
status_code = 500
|
||||
|
||||
if random.random() < 0.3:
|
||||
if _sysrand.random() < 0.3:
|
||||
status_code = 200
|
||||
|
||||
logger.info(f"SQL injection detected: {injection_type}")
|
||||
@@ -475,9 +476,9 @@ def get_sql_response_with_data(path: str, params: str) -> str:
|
||||
"username": random_username(),
|
||||
"email": random_email(),
|
||||
"password_hash": random_password(),
|
||||
"role": random.choice(["admin", "user", "moderator"]),
|
||||
"role": secrets.choice(["admin", "user", "moderator"]),
|
||||
}
|
||||
for i in range(1, random.randint(2, 5))
|
||||
for i in range(1, _sysrand.randint(2, 5))
|
||||
],
|
||||
}
|
||||
return json.dumps(data, indent=2)
|
||||
@@ -570,7 +571,7 @@ def generate_server_error() -> Tuple[str, str]:
|
||||
if not server_errors:
|
||||
return ("500 Internal Server Error", "text/html")
|
||||
|
||||
server_type = random.choice(list(server_errors.keys()))
|
||||
server_type = secrets.choice(list(server_errors.keys()))
|
||||
server_config = server_errors[server_type]
|
||||
|
||||
error_codes = {
|
||||
@@ -583,18 +584,18 @@ def generate_server_error() -> Tuple[str, str]:
|
||||
503: "Service Unavailable",
|
||||
}
|
||||
|
||||
code = random.choice(list(error_codes.keys()))
|
||||
code = secrets.choice(list(error_codes.keys()))
|
||||
message = error_codes[code]
|
||||
|
||||
template = server_config.get("template", "")
|
||||
version = random.choice(server_config.get("versions", ["1.0"]))
|
||||
version = secrets.choice(server_config.get("versions", ["1.0"]))
|
||||
|
||||
html = template.replace("{code}", str(code))
|
||||
html = html.replace("{message}", message)
|
||||
html = html.replace("{version}", version)
|
||||
|
||||
if server_type == "apache":
|
||||
os = random.choice(server_config.get("os", ["Ubuntu"]))
|
||||
os = secrets.choice(server_config.get("os", ["Ubuntu"]))
|
||||
html = html.replace("{os}", os)
|
||||
html = html.replace("{host}", "localhost")
|
||||
|
||||
@@ -611,10 +612,10 @@ def get_server_header(server_type: str = None) -> str:
|
||||
return "nginx/1.18.0"
|
||||
|
||||
if not server_type:
|
||||
server_type = random.choice(list(server_errors.keys()))
|
||||
server_type = secrets.choice(list(server_errors.keys()))
|
||||
|
||||
server_config = server_errors.get(server_type, {})
|
||||
version = random.choice(server_config.get("versions", ["1.0"]))
|
||||
version = secrets.choice(server_config.get("versions", ["1.0"]))
|
||||
|
||||
server_headers = {
|
||||
"nginx": f"nginx/{version}",
|
||||
|
||||
Reference in New Issue
Block a user