import os import json import re import logging from pathlib import Path # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[logging.StreamHandler()], ) # Constants (configurable via environment variables) OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "waf_patterns/haproxy/")) # Output directory INPUT_FILE = Path(os.getenv("INPUT_FILE", "owasp_rules.json")) # Input JSON file UNSUPPORTED_PATTERNS = ["@pmFromFile", "!@eq", "!@within", "@lt", "@ge", "@gt", "@eq"] def load_owasp_rules(file_path): """ Load OWASP rules from a JSON file. """ try: with open(file_path, "r") as f: return json.load(f) except FileNotFoundError: logging.error(f"[!] Input file not found: {file_path}") raise except json.JSONDecodeError: logging.error(f"[!] Invalid JSON in file: {file_path}") raise except Exception as e: logging.error(f"[!] Error loading OWASP rules: {e}") raise def validate_regex(pattern): """ Validate regex pattern for HAProxy. """ try: re.compile(pattern) return True except re.error as e: logging.warning(f"[!] Invalid regex: {pattern} - {e}") return False def sanitize_pattern(pattern): """ Sanitize unsupported patterns and directives for HAProxy ACLs. """ # Skip unsupported patterns if any(directive in pattern for directive in UNSUPPORTED_PATTERNS): logging.warning(f"[!] Skipping unsupported pattern: {pattern}") return None # Remove @rx (regex indicator) for HAProxy compatibility pattern = pattern.replace("@rx ", "").strip() # Remove case-insensitive flag (?i) as HAProxy uses -i for that pattern = re.sub(r"\(\?i\)", "", pattern) # Convert $ to \$ pattern = pattern.replace("$", r"\$") # Convert { or { to { pattern = re.sub(r"&l(?:brace|cub);?", r"{", pattern) pattern = re.sub(r"&r(?:brace|cub);?", r"}", pattern) # Remove unnecessary \.* pattern = re.sub(r"\\\.\*", r"\.*", pattern) pattern = re.sub(r"(?