linted code

This commit is contained in:
carnivuth
2026-02-15 15:10:41 +01:00
parent 815da4300b
commit 396b9b1710
10 changed files with 292 additions and 198 deletions

View File

@@ -8,20 +8,20 @@ from typing import Optional, Tuple, Dict
from generators import random_username, random_password, random_email
from wordlists import get_wordlists
logger = logging.getLogger('krawl')
logger = logging.getLogger("krawl")
def detect_path_traversal(path: str, query: str = "", body: str = "") -> bool:
"""Detect path traversal attempts in request"""
full_input = f"{path} {query} {body}"
wl = get_wordlists()
pattern = wl.attack_patterns.get("path_traversal", "")
if not pattern:
# Fallback pattern if wordlists not loaded
pattern = r'(\.\.|%2e%2e|/etc/passwd|/etc/shadow)'
pattern = r"(\.\.|%2e%2e|/etc/passwd|/etc/shadow)"
if re.search(pattern, full_input, re.IGNORECASE):
logger.debug(f"Path traversal detected in {full_input[:100]}")
return True
@@ -32,14 +32,14 @@ def detect_xxe_injection(body: str) -> bool:
"""Detect XXE injection attempts in XML payloads"""
if not body:
return False
wl = get_wordlists()
pattern = wl.attack_patterns.get("xxe_injection", "")
if not pattern:
# Fallback pattern if wordlists not loaded
pattern = r'(<!ENTITY|<!DOCTYPE|SYSTEM|PUBLIC|file://)'
pattern = r"(<!ENTITY|<!DOCTYPE|SYSTEM|PUBLIC|file://)"
if re.search(pattern, body, re.IGNORECASE):
return True
return False
@@ -48,21 +48,23 @@ def detect_xxe_injection(body: str) -> bool:
def detect_command_injection(path: str, query: str = "", body: str = "") -> bool:
"""Detect command injection attempts"""
full_input = f"{path} {query} {body}"
logger.debug(f"[CMD_INJECTION_CHECK] path='{path}' query='{query}' body='{body[:50] if body else ''}'")
logger.debug(
f"[CMD_INJECTION_CHECK] path='{path}' query='{query}' body='{body[:50] if body else ''}'"
)
logger.debug(f"[CMD_INJECTION_CHECK] full_input='{full_input[:200]}'")
wl = get_wordlists()
pattern = wl.attack_patterns.get("command_injection", "")
if not pattern:
# Fallback pattern if wordlists not loaded
pattern = r'(cmd=|exec=|command=|&&|;|\||whoami|id|uname|cat|ls)'
pattern = r"(cmd=|exec=|command=|&&|;|\||whoami|id|uname|cat|ls)"
if re.search(pattern, full_input, re.IGNORECASE):
logger.debug(f"[CMD_INJECTION_CHECK] Command injection pattern matched!")
return True
logger.debug(f"[CMD_INJECTION_CHECK] No command injection detected")
return False
@@ -71,23 +73,23 @@ def generate_fake_passwd() -> str:
"""Generate fake /etc/passwd content"""
wl = get_wordlists()
passwd_config = wl.fake_passwd
if not passwd_config:
# Fallback
return "root:x:0:0:root:/root:/bin/bash\nwww-data:x:33:33:www-data:/var/www:/usr/sbin/nologin"
users = passwd_config.get("system_users", [])
uid_min = passwd_config.get("uid_min", 1000)
uid_max = passwd_config.get("uid_max", 2000)
gid_min = passwd_config.get("gid_min", 1000)
gid_max = passwd_config.get("gid_max", 2000)
shells = passwd_config.get("shells", ["/bin/bash"])
fake_users = [
f"{random_username()}:x:{random.randint(uid_min, uid_max)}:{random.randint(gid_min, gid_max)}::/home/{random_username()}:{random.choice(shells)}"
for _ in range(3)
]
return "\n".join(users + fake_users)
@@ -95,21 +97,21 @@ def generate_fake_shadow() -> str:
"""Generate fake /etc/shadow content"""
wl = get_wordlists()
shadow_config = wl.fake_shadow
if not shadow_config:
# Fallback
return "root:$6$rounds=656000$fake_salt_here$fake_hash_data:19000:0:99999:7:::"
entries = shadow_config.get("system_entries", [])
hash_prefix = shadow_config.get("hash_prefix", "$6$rounds=656000$")
salt_length = shadow_config.get("salt_length", 16)
hash_length = shadow_config.get("hash_length", 86)
fake_entries = [
f"{random_username()}:{hash_prefix}{''.join(random.choices('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789', k=salt_length))}${''.join(random.choices('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789', k=hash_length))}:19000:0:99999:7:::"
for _ in range(3)
]
return "\n".join(entries + fake_entries)
@@ -138,11 +140,11 @@ DB_PASSWORD=fake_env_password_789
API_KEY=fake_api_key_abc123
SECRET_TOKEN=fake_secret_token_xyz""",
}
for key in configs:
if key.lower() in filename.lower():
return configs[key]
return f"""# Configuration File
api_endpoint = https://api.example.com
api_key = fake_key_{random.randint(1000, 9999)}
@@ -155,57 +157,59 @@ def generate_fake_directory_listing(path: str) -> str:
"""Generate fake directory listing"""
wl = get_wordlists()
dir_config = wl.directory_listing
if not dir_config:
# Fallback
return f"<html><head><title>Index of {path}</title></head><body><h1>Index of {path}</h1></body></html>"
fake_dirs = dir_config.get("fake_directories", [])
fake_files = dir_config.get("fake_files", [])
directories = [(d["name"], d["size"], d["perms"]) for d in fake_dirs]
files = [
(f["name"], str(random.randint(f["size_min"], f["size_max"])), f["perms"])
for f in fake_files
]
html = f"<html><head><title>Index of {path}</title></head><body>"
html += f"<h1>Index of {path}</h1><hr><pre>"
html += f"{'Name':<40} {'Size':<10} {'Permissions':<15}\n"
html += "-" * 70 + "\n"
for name, size, perms in directories:
html += f"{name + '/':<40} {size:<10} {perms:<15}\n"
for name, size, perms in files:
html += f"{name:<40} {size:<10} {perms:<15}\n"
html += "</pre><hr></body></html>"
return html
def generate_path_traversal_response(path: str) -> Tuple[str, str, int]:
"""Generate fake response for path traversal attempts"""
path_lower = path.lower()
logger.debug(f"Generating path traversal response for: {path}")
if "passwd" in path_lower:
logger.debug("Returning fake passwd file")
return (generate_fake_passwd(), "text/plain", 200)
if "shadow" in path_lower:
logger.debug("Returning fake shadow file")
return (generate_fake_shadow(), "text/plain", 200)
if any(ext in path_lower for ext in [".conf", ".config", ".php", ".env", ".properties"]):
if any(
ext in path_lower for ext in [".conf", ".config", ".php", ".env", ".properties"]
):
logger.debug("Returning fake config file")
return (generate_fake_config_file(path), "text/plain", 200)
if "proc/self" in path_lower:
logger.debug("Returning fake proc info")
return (f"{random.randint(1000, 9999)}", "text/plain", 200)
logger.debug("Returning fake directory listing")
return (generate_fake_directory_listing(path), "text/html", 200)
@@ -214,15 +218,19 @@ def generate_xxe_response(body: str) -> Tuple[str, str, int]:
"""Generate fake response for XXE injection attempts"""
wl = get_wordlists()
xxe_config = wl.xxe_responses
if "file://" in body:
if "passwd" in body:
content = generate_fake_passwd()
elif "shadow" in body:
content = generate_fake_shadow()
else:
content = xxe_config.get("default_content", "root:x:0:0:root:/root:/bin/bash") if xxe_config else "root:x:0:0:root:/root:/bin/bash"
content = (
xxe_config.get("default_content", "root:x:0:0:root:/root:/bin/bash")
if xxe_config
else "root:x:0:0:root:/root:/bin/bash"
)
if xxe_config and "file_access" in xxe_config:
template = xxe_config["file_access"]["template"]
response = template.replace("{content}", content)
@@ -233,7 +241,7 @@ def generate_xxe_response(body: str) -> Tuple[str, str, int]:
<data>{content}</data>
</response>"""
return (response, "application/xml", 200)
if "ENTITY" in body:
if xxe_config and "entity_processed" in xxe_config:
template = xxe_config["entity_processed"]["template"]
@@ -248,7 +256,7 @@ def generate_xxe_response(body: str) -> Tuple[str, str, int]:
<entity_value>fake_entity_content_12345</entity_value>
</response>"""
return (response, "application/xml", 200)
if xxe_config and "error" in xxe_config:
template = xxe_config["error"]["template"]
messages = xxe_config["error"]["messages"]
@@ -267,73 +275,94 @@ def generate_command_injection_response(input_text: str) -> Tuple[str, str, int]
"""Generate fake command execution output"""
wl = get_wordlists()
cmd_config = wl.command_outputs
input_lower = input_text.lower()
# id command
if re.search(r'\bid\b', input_lower):
if re.search(r"\bid\b", input_lower):
if cmd_config and "id" in cmd_config:
uid = random.randint(cmd_config.get("uid_min", 1000), cmd_config.get("uid_max", 2000))
gid = random.randint(cmd_config.get("gid_min", 1000), cmd_config.get("gid_max", 2000))
uid = random.randint(
cmd_config.get("uid_min", 1000), cmd_config.get("uid_max", 2000)
)
gid = random.randint(
cmd_config.get("gid_min", 1000), cmd_config.get("gid_max", 2000)
)
template = random.choice(cmd_config["id"])
output = template.replace("{uid}", str(uid)).replace("{gid}", str(gid))
else:
output = f"uid={random.randint(1000, 2000)}(www-data) gid={random.randint(1000, 2000)}(www-data) groups={random.randint(1000, 2000)}(www-data)"
return (output, "text/plain", 200)
# whoami command
if re.search(r'\bwhoami\b', input_lower):
if re.search(r"\bwhoami\b", input_lower):
users = cmd_config.get("whoami", ["www-data"]) if cmd_config else ["www-data"]
return (random.choice(users), "text/plain", 200)
# uname command
if re.search(r'\buname\b', input_lower):
outputs = cmd_config.get("uname", ["Linux server 5.4.0 x86_64"]) if cmd_config else ["Linux server 5.4.0 x86_64"]
if re.search(r"\buname\b", input_lower):
outputs = (
cmd_config.get("uname", ["Linux server 5.4.0 x86_64"])
if cmd_config
else ["Linux server 5.4.0 x86_64"]
)
return (random.choice(outputs), "text/plain", 200)
# pwd command
if re.search(r'\bpwd\b', input_lower):
paths = cmd_config.get("pwd", ["/var/www/html"]) if cmd_config else ["/var/www/html"]
if re.search(r"\bpwd\b", input_lower):
paths = (
cmd_config.get("pwd", ["/var/www/html"])
if cmd_config
else ["/var/www/html"]
)
return (random.choice(paths), "text/plain", 200)
# ls command
if re.search(r'\bls\b', input_lower):
if re.search(r"\bls\b", input_lower):
if cmd_config and "ls" in cmd_config:
files = random.choice(cmd_config["ls"])
else:
files = ["index.php", "config.php", "uploads"]
output = "\n".join(random.sample(files, k=random.randint(3, min(6, len(files)))))
output = "\n".join(
random.sample(files, k=random.randint(3, min(6, len(files))))
)
return (output, "text/plain", 200)
# cat command
if re.search(r'\bcat\b', input_lower):
if re.search(r"\bcat\b", input_lower):
if "passwd" in input_lower:
return (generate_fake_passwd(), "text/plain", 200)
if "shadow" in input_lower:
return (generate_fake_shadow(), "text/plain", 200)
cat_content = cmd_config.get("cat_config", "<?php\n$config = 'fake';\n?>") if cmd_config else "<?php\n$config = 'fake';\n?>"
cat_content = (
cmd_config.get("cat_config", "<?php\n$config = 'fake';\n?>")
if cmd_config
else "<?php\n$config = 'fake';\n?>"
)
return (cat_content, "text/plain", 200)
# echo command
if re.search(r'\becho\b', input_lower):
match = re.search(r'echo\s+(.+?)(?:[;&|]|$)', input_text, re.IGNORECASE)
if re.search(r"\becho\b", input_lower):
match = re.search(r"echo\s+(.+?)(?:[;&|]|$)", input_text, re.IGNORECASE)
if match:
return (match.group(1).strip('"\''), "text/plain", 200)
return (match.group(1).strip("\"'"), "text/plain", 200)
return ("", "text/plain", 200)
# network commands
if any(cmd in input_lower for cmd in ['wget', 'curl', 'nc', 'netcat']):
if any(cmd in input_lower for cmd in ["wget", "curl", "nc", "netcat"]):
if cmd_config and "network_commands" in cmd_config:
outputs = cmd_config["network_commands"]
output = random.choice(outputs)
if "{size}" in output:
size = random.randint(cmd_config.get("download_size_min", 100), cmd_config.get("download_size_max", 10000))
size = random.randint(
cmd_config.get("download_size_min", 100),
cmd_config.get("download_size_max", 10000),
)
output = output.replace("{size}", str(size))
else:
outputs = ["bash: command not found", "Connection timeout"]
output = random.choice(outputs)
return (output, "text/plain", 200)
# generic outputs
if cmd_config and "generic" in cmd_config:
generic_outputs = cmd_config["generic"]
@@ -343,7 +372,7 @@ def generate_command_injection_response(input_text: str) -> Tuple[str, str, int]
else:
generic_outputs = ["", "Command executed successfully", "sh: syntax error"]
output = random.choice(generic_outputs)
return (output, "text/plain", 200)
@@ -374,7 +403,9 @@ def detect_sql_injection_pattern(query_string: str) -> Optional[str]:
return None
def get_random_sql_error(db_type: str = None, injection_type: str = None) -> Tuple[str, str]:
def get_random_sql_error(
db_type: str = None, injection_type: str = None
) -> Tuple[str, str]:
"""Generate a random SQL error message"""
wl = get_wordlists()
sql_errors = wl.sql_errors
@@ -411,7 +442,9 @@ def get_random_sql_error(db_type: str = None, injection_type: str = None) -> Tup
return (error_message, "text/plain")
def generate_sql_error_response(query_string: str, db_type: str = None) -> Tuple[Optional[str], Optional[str], Optional[int]]:
def generate_sql_error_response(
query_string: str, db_type: str = None
) -> Tuple[Optional[str], Optional[str], Optional[int]]:
"""Generate SQL error response for detected injection attempts"""
injection_type = detect_sql_injection_pattern(query_string)
@@ -593,25 +626,29 @@ def get_server_header(server_type: str = None) -> str:
return server_headers.get(server_type, "nginx/1.18.0")
def detect_and_respond_deception(path: str, query: str = "", body: str = "", method: str = "GET") -> Optional[Tuple[str, str, int]]:
def detect_and_respond_deception(
path: str, query: str = "", body: str = "", method: str = "GET"
) -> Optional[Tuple[str, str, int]]:
"""
Main deception detection and response function.
Returns (response_body, content_type, status_code) if deception should be applied, None otherwise.
"""
logger.debug(f"Checking deception for {method} {path} query={query[:50] if query else 'empty'}")
logger.debug(
f"Checking deception for {method} {path} query={query[:50] if query else 'empty'}"
)
if detect_path_traversal(path, query, body):
logger.info(f"Path traversal detected in: {path}")
return generate_path_traversal_response(f"{path}?{query}" if query else path)
if body and detect_xxe_injection(body):
logger.info(f"XXE injection detected")
return generate_xxe_response(body)
if detect_command_injection(path, query, body):
logger.info(f"Command injection detected in: {path}")
full_input = f"{path} {query} {body}"
return generate_command_injection_response(full_input)
return None