Update json2traefik.py

This commit is contained in:
fab 2025-02-28 11:23:08 +01:00 committed by GitHub
parent 8277d8bc13
commit df23dd0d04
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,114 +1,154 @@
import os
import json
from pathlib import Path
from typing import List, Dict, Set
import re
import logging
from pathlib import Path
from typing import List, Dict, Set, Tuple, Optional
from functools import lru_cache
# --- Configuration ---
# Verbosity for the whole script: logging.DEBUG, INFO, WARNING or ERROR.
LOG_LEVEL = logging.INFO

# Input rules file and output directory. Both can be overridden through the
# INPUT_FILE / OUTPUT_DIR environment variables. OUTPUT_DIR is assigned exactly
# once so that MIDDLEWARE_FILE (derived from it) always points inside the
# directory that is actually created.
INPUT_FILE = Path(os.getenv("INPUT_FILE", "owasp_rules.json"))
OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "waf_patterns/traefik"))
MIDDLEWARE_FILE = OUTPUT_DIR / "middleware.toml"

# Unsupported patterns (for Traefik's badbot plugin, which uses regex).
# A rule whose pattern contains any of these substrings is skipped.
UNSUPPORTED_PATTERNS = [
    "@pmFromFile",  # No file lookups
    # Add other unsupported operators/patterns here.
]

# --- Logging Setup ---
# Configure the root logger once; a second basicConfig() call would be ignored.
logging.basicConfig(level=LOG_LEVEL, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
@lru_cache(maxsize=256)
def validate_regex(pattern: str) -> bool:
    """Return True if *pattern* compiles as a regular expression.

    The check uses Python's ``re`` module. Results are memoized for up to 256
    distinct patterns, so a repeated invalid pattern is only logged once while
    it stays in the cache.

    NOTE(review): this only proves the pattern is valid for Python's ``re``;
    confirm that the engine consuming the generated config accepts the same
    syntax.

    Args:
        pattern: Regular expression source text.

    Returns:
        True when the pattern compiles; False otherwise (a warning is logged).
    """
    try:
        re.compile(pattern)
    except re.error as e:
        logger.warning(f"Invalid regex: {pattern} - {e}")
        return False
    return True
def _sanitize_pattern(pattern: str) -> str:
    """Rewrite a rule pattern into the regex form written to the config.

    Steps, in order:
      1. Drop the ``@rx `` operator prefix and surrounding whitespace.
      2. Remove the inline case-insensitive flag ``(?i)``.
      3. Escape every bare ``$`` as ``\\$``.
      4. Convert the HTML entities ``&lbrace;``/``&lcub;`` and
         ``&rbrace;``/``&rcub;`` (trailing ``;`` optional) to ``{`` / ``}``.
      5. Escape a ``.`` that is not already escaped and is not followed by a
         word character.
      6. Turn non-capturing groups ``(?:...)`` into capturing groups ``(...)``.

    Steps 3 and 6 walk the pattern one escape pair at a time, so text that is
    already escaped (``\\$`` or ``\\(`` followed by ``?:``) is left untouched
    instead of being rewritten into something with a different meaning.

    Args:
        pattern: Raw rule pattern, optionally prefixed with ``@rx ``.

    Returns:
        The sanitized regex string.
    """
    pattern = pattern.replace("@rx ", "").strip()
    pattern = pattern.replace("(?i)", "")  # Remove case-insensitive flag

    # Convert $ to \$. The first alternative consumes any existing
    # backslash-escape pair unchanged, so "\$" is not turned into "\\$".
    pattern = re.sub(r"(\\.)|\$", lambda m: m.group(1) or r"\$", pattern)

    # Convert &lbrace; or &lcub; to { (and the closing counterparts to }).
    pattern = re.sub(r"&l(?:brace|cub);?", "{", pattern)
    pattern = re.sub(r"&r(?:brace|cub);?", "}", pattern)

    # Escape dots that are unescaped and not followed by a word character.
    # NOTE(review): this also rewrites wildcards such as ".*" into "\.*"
    # (literal dots) -- confirm that is intended before relying on it.
    pattern = re.sub(r"(?<!\\)\.(?![\w])", r"\.", pattern)

    # Replace non-capturing groups (?:...) with capturing groups (...),
    # skipping escape pairs so a literal "\(" followed by "?:" is preserved.
    pattern = re.sub(r"(\\.)|\(\?:", lambda m: m.group(1) or "(", pattern)
    return pattern
def sanitize_pattern(pattern: str) -> Optional[str]:
    """Return a regex usable by Traefik's badbot plugin, or None to skip the rule.

    A pattern is skipped (None) when it contains an entry of
    UNSUPPORTED_PATTERNS (a warning is logged) or when it contains one of the
    comparison operators listed below, i.e. it is not a regex rule. Every
    other pattern is passed through _sanitize_pattern().
    """
    if any(marker in pattern for marker in UNSUPPORTED_PATTERNS):
        logger.warning(f"Skipping unsupported pattern: {pattern}")
        return None

    # Operators that denote string/numeric/IP comparisons rather than regexes.
    non_regex_operators = (
        "@streq",
        "@contains",
        "!@eq",
        "!@within",
        "@lt",
        "@ge",
        "@gt",
        "@eq",
        "@ipMatch",
        "@endsWith",
    )
    for operator in non_regex_operators:
        if operator in pattern:
            return None  # not a regex rule

    return _sanitize_pattern(pattern)
def generate_traefik_conf(rules: List[Dict]) -> None:
    """Generate the Traefik middleware configuration (middleware.toml).

    Rules are grouped by category and location. Only rules whose location is
    ``user-agent`` are emitted; each pattern goes through sanitize_pattern()
    and validate_regex(), and rules that fail either step are skipped. One
    ``[http.middlewares.waf_<category>_<location>]`` table is written per
    group, with the group's patterns in its ``userAgent`` array.

    The full document is built in memory and written in a single step, so an
    error while processing rules does not leave a truncated file behind.

    Args:
        rules: Rule dictionaries. ``pattern`` is required; ``id``,
            ``category`` and ``location`` are optional.

    Raises:
        KeyError: If a rule has no ``pattern`` key.
        OSError: If the output directory or file cannot be written.
    """
    # Group rules by category AND location. A set deduplicates patterns.
    categorized_rules: Dict[str, Dict[str, Set[str]]] = {}
    for rule in rules:
        rule_id = rule.get("id", "no_id")
        category = rule.get("category", "generic").lower()
        location = rule.get("location", "user-agent").lower()  # default value!
        pattern = rule["pattern"]

        # Only User-Agent rules are supported; regexes are not applied to
        # any other location here.
        if location != "user-agent":
            logger.warning(f"Skipping rule with unsupported location '{location}' for Traefik: {rule_id}")
            continue

        sanitized_pattern = sanitize_pattern(pattern)
        if not sanitized_pattern or not validate_regex(sanitized_pattern):
            continue  # skip

        categorized_rules.setdefault(category, {}).setdefault(location, set()).add(sanitized_pattern)

    # Render the configuration.
    lines: List[str] = ["[http.middlewares]\n\n"]
    for category, location_rules in categorized_rules.items():
        for location, patterns in location_rules.items():
            # Create a unique middleware name. Anything outside [A-Za-z0-9_]
            # (including "-") becomes "_" so the name is always a valid bare
            # TOML key and a "." in a category cannot create a nested table.
            middleware_name = re.sub(r"[^A-Za-z0-9_]", "_", f"waf_{category}_{location}")

            lines.append(f"[http.middlewares.{middleware_name}]\n")
            lines.append(f" [http.middlewares.{middleware_name}.plugin.badbot]\n")
            lines.append(" userAgent = [\n")
            # Sort for a reproducible file (set order varies between runs).
            for pattern in sorted(patterns):
                # json.dumps() yields a double-quoted string with backslashes,
                # quotes and control characters escaped, which is also a valid
                # TOML basic string; the regex survives the round trip intact.
                lines.append(f" {json.dumps(pattern, ensure_ascii=False)},\n")
            lines.append(" ]\n\n")

    try:
        OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        with open(MIDDLEWARE_FILE, "w", encoding="utf-8") as f:
            f.writelines(lines)
    except OSError as e:
        logger.error(f"Error writing to {MIDDLEWARE_FILE}: {e}")
        raise

    logger.info(f"Generated Traefik middleware file: {MIDDLEWARE_FILE}")
def load_owasp_rules(file_path: Path) -> List[Dict]:
    """Load OWASP rules from a JSON file.

    Args:
        file_path: Path to the JSON file containing the rules.

    Returns:
        The list of rule dictionaries stored in the file.

    Raises:
        OSError: If the file cannot be opened or read (this includes
            FileNotFoundError, which is a subclass of OSError).
        json.JSONDecodeError: If the file does not contain valid JSON.
        ValueError: If the top-level JSON value is not a list.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            rules = json.load(f)
    except (json.JSONDecodeError, OSError) as e:
        logger.error(f"Error loading rules from {file_path}: {e}")
        raise

    # Fail early with a clear message instead of letting the caller crash
    # later while iterating something that is not a list of rules.
    if not isinstance(rules, list):
        raise ValueError(
            f"Expected a JSON list of rules in {file_path}, got {type(rules).__name__}"
        )
    return rules
def main() -> None:
    """Load the rules from INPUT_FILE and generate the Traefik configuration.

    Any exception raised while loading or generating is logged at CRITICAL
    level and converted into exit status 1.

    Raises:
        SystemExit: With code 1 if loading or generation fails.
    """
    try:
        logger.info("Loading OWASP rules...")
        owasp_rules = load_owasp_rules(INPUT_FILE)
        logger.info(f"Loaded {len(owasp_rules)} rules.")

        logger.info("Generating Traefik WAF configuration...")
        generate_traefik_conf(owasp_rules)
        logger.info("Traefik WAF generation complete.")
    except Exception as e:
        logger.critical(f"Script failed: {e}")
        # Raise SystemExit directly: the exit() builtin is injected by the
        # `site` module and is not guaranteed to exist in every interpreter.
        raise SystemExit(1) from e


if __name__ == "__main__":
    main()