From 95b1b4a78475e5e60cbfff1d0491eb4f4f488c71 Mon Sep 17 00:00:00 2001 From: fab Date: Fri, 28 Feb 2025 11:15:14 +0100 Subject: [PATCH] Update json2haproxy.py --- json2haproxy.py | 343 +++++++++++++++++++++--------------------------- 1 file changed, 149 insertions(+), 194 deletions(-) diff --git a/json2haproxy.py b/json2haproxy.py index 3920a31..72713de 100644 --- a/json2haproxy.py +++ b/json2haproxy.py @@ -6,262 +6,217 @@ from pathlib import Path from typing import List, Dict, Optional, Tuple from functools import lru_cache -# Configure logging -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(levelname)s - %(message)s", - handlers=[logging.StreamHandler()], -) +# --- Configuration --- +LOG_LEVEL = logging.INFO # Adjust as needed (DEBUG, INFO, WARNING, ERROR) +OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "waf_patterns/haproxy/")) +INPUT_FILE = Path(os.getenv("INPUT_FILE", "owasp_rules.json")) -# Constants (configurable via environment variables) -OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "waf_patterns/haproxy/")) # Output directory -INPUT_FILE = Path(os.getenv("INPUT_FILE", "owasp_rules.json")) # Input JSON file +UNSUPPORTED_PATTERNS = [ + "@pmFromFile", "@detectSQLi", "@validateByteRange", "@detectXSS", # Core unsupported + # Add any other unsupported patterns discovered during testing +] -UNSUPPORTED_PATTERNS = ["@pmFromFile", "@detectSQLi", "@validateByteRange", "@detectXSS"] # ADDED REMOVE xss and added more unsupported patterns - -# Operator mapping (ModSecurity to HAProxy) - Added more mappings +# Operator Mapping: ModSecurity -> HAProxy OPERATOR_MAP = { + # String Comparisons "@streq": "str -m str", - "@ipMatch": "src_ip", "@endsWith": "str -m end", "@contains": "str -m sub", - "!@eq": "str -m !str", # Handle negation - "!@within": "str -m !reg", # Approximate !@within (requires regex) + "!@eq": "str -m !str", # Negated string equality + "!@within": "str -m !reg", # Negated regex (approximate) + # Integer Comparisons (These are handled separately) "@lt": "<", "@ge": ">=", "@gt": ">", "@eq": "==", + # IP address matching + "@ipMatch": "src_ip", } -def load_owasp_rules(file_path: Path) -> List[Dict]: - """ - Load OWASP rules from a JSON file. - """ - try: - with open(file_path, "r") as f: - return json.load(f) - except FileNotFoundError as e: - logging.error(f"[!] Input file not found: {file_path}") - raise - except json.JSONDecodeError as e: - logging.error(f"[!] Invalid JSON in file: {file_path}") - raise - except Exception as e: - logging.error(f"[!] Error loading OWASP rules: {e}") - raise +# --- Logging Setup --- +logging.basicConfig(level=LOG_LEVEL, format="%(asctime)s - %(levelname)s - %(message)s") +logger = logging.getLogger(__name__) -@lru_cache(maxsize=None) +# --- Utility Functions --- +@lru_cache(maxsize=None) # Cache regex compilation for performance def validate_regex(pattern: str) -> bool: - """ - Validate regex pattern for HAProxy. Added complexity check - """ + """Validates a regex pattern and checks for excessive complexity.""" try: - # Simple complexity check (can be improved) - if pattern.count(".*") > 5: - logging.warning(f"[!] Regex too complex: {pattern}") - return False - + if pattern.count(".*") > 5: # Basic complexity check + logger.warning(f"Regex potentially too complex: {pattern}") + # Optionally return False here to *reject* complex regexes re.compile(pattern) return True except re.error as e: - logging.warning(f"[!] Invalid regex: {pattern} - {e}") + logger.warning(f"Invalid regex: {pattern} - {e}") return False -def sanitize_pattern(pattern: str) -> Tuple[Optional[str], str, Optional[str]]: +def load_owasp_rules(file_path: Path) -> List[Dict]: + """Loads OWASP rules from the JSON file.""" + try: + with open(file_path, "r") as f: + return json.load(f) + except (FileNotFoundError, json.JSONDecodeError, Exception) as e: + logger.error(f"Error loading rules from {file_path}: {e}") + raise # Re-raise to prevent the script from continuing + +def _sanitize_regex_pattern(pattern: str) -> str: + """Helper function to clean up regex patterns.""" + pattern = pattern.replace("@rx ", "").strip() + pattern = re.sub(r"\(\?i\)", "", pattern) # Remove (?i) + pattern = pattern.replace("$", r"\$") # $ -> \$ + pattern = re.sub(r"&l(?:brace|cub);?", r"{", pattern) # { + pattern = re.sub(r"&r(?:brace|cub);?", r"}", pattern) # } + pattern = re.sub(r"\\\.\*", r"\.*", pattern) # Remove unnecessary escapes + pattern = re.sub(r"(? ( + return pattern + + +def sanitize_pattern(pattern: str, location: str) -> Tuple[Optional[str], str]: """ - Sanitize and convert ModSecurity patterns to HAProxy. - Returns: sanitized pattern, ACL type, and transformed pattern (if needed) + Sanitizes and converts a ModSecurity pattern to its HAProxy equivalent. + Returns: (sanitized_pattern, acl_type) or (None, "") if unsupported. """ - acl_type = "hdr_reg" # Default to regex matching - transformed_pattern = None # optional transformation - original_pattern = pattern # store original for logging - int_comp_pattern = None # store the integer pattern + original_pattern = pattern # Keep for logging - for modsecurity_op, haproxy_op in OPERATOR_MAP.items(): - if pattern.startswith(modsecurity_op): - # handle 'str' and 'int' matching in a different way + # 1. Handle ModSecurity operators *first*. + for modsec_op, haproxy_op in OPERATOR_MAP.items(): + if pattern.startswith(modsec_op): + if haproxy_op in ("<", ">=", ">", "=="): # Integer comparisons + # Integer comparisons are handled *separately* + return pattern.replace(modsec_op, haproxy_op).strip(), "int" + else: # String comparisons + return pattern.replace(modsec_op, haproxy_op).strip(), "hdr_sub" - if haproxy_op in ['<','>=','>','==']: - acl_type = "int" - int_comp_pattern = pattern.replace(modsecurity_op, haproxy_op).strip() - pattern = None # set to None to avoid regex validation - - return pattern, acl_type, transformed_pattern - - acl_type = "hdr_sub" # String matching - pattern = pattern.replace(modsecurity_op, haproxy_op).strip() - return pattern, acl_type, transformed_pattern - - # Skip unsupported patterns with more detailed logging + # 2. Check for unsupported patterns *after* operator handling. for directive in UNSUPPORTED_PATTERNS: if directive in pattern: - logging.warning(f"[!] Skipping unsupported pattern (contains {directive}): {pattern}") - return None, acl_type, transformed_pattern # Indicate skip + logger.warning(f"Skipping unsupported pattern (contains {directive}): {original_pattern}") + return None, "" - if "@rx" in pattern: # only remove @rx for REGEX cases to reduce bugs - acl_type = "hdr_reg" - pattern = pattern.replace("@rx ", "").strip() + # 3. Handle regular expressions (@rx) + if "@rx" in pattern: + return _sanitize_regex_pattern(pattern), "hdr_reg" - # Remove case-insensitive flag (?i) as HAProxy uses -i for that - pattern = re.sub(r"\(\?i\)", "", pattern) - - # Convert $ to \$ - pattern = pattern.replace("$", r"\$") - - # Convert { or { to { - pattern = re.sub(r"&l(?:brace|cub);?", r"{", pattern) - pattern = re.sub(r"&r(?:brace|cub);?", r"}", pattern) - - # Remove unnecessary \.* - pattern = re.sub(r"\\\.\*", r"\.*", pattern) - pattern = re.sub(r"(? None: - """ - Generate HAProxy ACL rules from OWASP rules with prioritization and parameter selection. - """ + """Generates the HAProxy WAF configuration (waf.acl).""" + try: OUTPUT_DIR.mkdir(parents=True, exist_ok=True) - logging.info(f"[+] Created or verified directory: {OUTPUT_DIR}") - config_file = OUTPUT_DIR / "waf.acl" - acl_rules = {} # Dict to store ACL rule definitions based on 'location' - all_acl_names = [] # Store a full list of acl names for final deny - int_comp_rules = [] # Collect the integer comparison rules - # Initialize lists for different deny actions - deny_high = [] - log_medium = [] - tarpit_low = [] - all_deny_actions = { - "deny_high" : deny_high, - "log_medium" : log_medium, - "tarpit_low" : tarpit_low - } + acl_rules: Dict[str, List[str]] = {} # { location: [acl_rules] } + int_rules: List[str] = [] + deny_high: List[str] = [] + log_medium: List[str] = [] + tarpit_low: List[str] = [] - unique_rules = set() # Prevent duplication rules - - # Process each rule + # Process each OWASP rule for rule in rules: - try: - rule_id = rule.get("id", "no_id") # Get ID, default "no_id" - category = rule["category"].lower() - location = rule.get("location", "User-Agent") # Get location, default User-Agent - pattern = rule["pattern"] - severity = rule.get("severity", "medium").lower() # severity for different actions + rule_id = rule.get("id", "no_id") + category = rule.get("category", "uncategorized").lower() + location = rule.get("location", "User-Agent").lower() #important! lowercase + pattern = rule["pattern"] + severity = rule.get("severity", "medium").lower() - sanitized_pattern, acl_type, transformed_pattern = sanitize_pattern(pattern) + sanitized_pattern, acl_type = sanitize_pattern(pattern, location) - if acl_type == "int": # Handle integer comparisons - if sanitized_pattern: # create the int condition pattern direct - action_string = "deny" if severity == "high" else "log" if severity == "medium" else "tarpit" - - int_comp_rules.append(f"http-request {action_string} if {{ {location} {sanitized_pattern} }}") # Append direct the rule - else: - logging.warning(f"[!] Skipping integer rule with invalid pattern: {pattern}") - elif sanitized_pattern and validate_regex(sanitized_pattern): # continue to the other filters when integer is not valid - acl_name = f"block_{category}_{rule_id}" # Unique ACL name including ID - - if acl_name not in all_acl_names: - all_acl_names.append(acl_name) # Add to the list of ACLs - - # Build the ACL rule string based on the 'location' - acl_rule_string = None # Set the initial state - - if location == "Request-URI": - acl_rule_string = f"acl {acl_name} path_reg -i {sanitized_pattern}" - elif location == "Query-String": - acl_rule_string = f"acl {acl_name} query_reg -i {sanitized_pattern}" - elif location == "Host": - acl_rule_string = f"acl {acl_name} hdr_reg(Host) -i {sanitized_pattern}" - elif location == "Content-Type": - acl_rule_string = f"acl {acl_name} hdr_reg(Content-Type) -i {sanitized_pattern}" - elif location == "Referer": - acl_rule_string = f"acl {acl_name} hdr_reg(Referer) -i {sanitized_pattern}" - else: # Default case: User-Agent - if acl_type == 'hdr_reg': - acl_rule_string = f"acl {acl_name} hdr_reg(User-Agent) -i {sanitized_pattern}" - else: # hdr_sub - acl_rule_string = f"acl {acl_name} hdr_sub(User-Agent) -i {sanitized_pattern}" - - if acl_rule_string: # Check that a rule string has a value. - # Get the corresponding action based on severity - if severity == "high": - deny_high.append(acl_name) - elif severity == "medium": - log_medium.append(acl_name) - elif severity == "low": - tarpit_low.append(acl_name) - - if location not in acl_rules: - acl_rules[location] = [] # Initialize if it is not already existent - - acl_rules[location].append(acl_rule_string) # Append rule - else: - logging.warning(f"[!] Skipping invalid rule: {pattern}") - - except KeyError as e: - logging.warning(f"[!] Skipping invalid rule (missing key: {e}): {rule}") + if sanitized_pattern is None: # Unsupported/invalid pattern continue - # Write HAProxy ACL rules to the file + if acl_type == "int": # Int comparison + action = "deny" if severity == "high" else "log" if severity == "medium" else "tarpit" + # Special cases: some locations cannot be used directly + if location in ("query-string", "request-uri"): + int_rules.append(f"http-request {action} if {{ {location} {sanitized_pattern} }}") + else: + int_rules.append(f"http-request {action} if {{ {location},{sanitized_pattern} }}") + + elif acl_type in ("hdr_reg", "hdr_sub"): # String comparison + acl_name = f"block_{category}_{rule_id}" + + # Build the ACL rule string + if location == "request-uri": + acl_string = f"acl {acl_name} path_reg -i {sanitized_pattern}" + elif location == "query-string": + # No direct query_reg in HAProxy. Need to use path, url, or url_param + acl_string = f"acl {acl_name} url_param_reg -i {sanitized_pattern}" + elif location in ("host", "content-type", "referer","user-agent"): + hdr_func = "hdr_reg" if acl_type == "hdr_reg" else "hdr_sub" + acl_string = f"acl {acl_name} {hdr_func}({location.replace('-','')}) -i {sanitized_pattern}" + else: + logger.warning(f"Unsupported location: {location} for rule: {rule_id}") + continue # Skip unsupported locations + + if location not in acl_rules: + acl_rules[location] = [] + acl_rules[location].append(acl_string) + + + if severity == "high": + deny_high.append(acl_name) + elif severity == "medium": + log_medium.append(acl_name) + elif severity == "low": + tarpit_low.append(acl_name) + + # Write the configuration to the file with open(config_file, "w") as f: f.write("# HAProxy WAF ACL rules\n\n") - # Write integer rules - if int_comp_rules: - f.write("# Integer Comparison Rules\n") - for int_rule in int_comp_rules: - f.write(f"{int_rule}\n") - f.write("\n") - # Write all ACL definitions by location - for location, rules in acl_rules.items(): - f.write(f"# Rules for {location}\n") - for acl_rule in rules: - f.write(f"{acl_rule}\n") + # Integer Comparison Rules (if any) + if int_rules: + f.write("# Integer Comparison Rules\n") + for rule in int_rules: + f.write(f"{rule}\n") f.write("\n") - f.write("\n") - # Add all the actions based on rules - f.write("# Deny Actions\n") # start final actions block + # ACL Rules (by location) + for location, rules in acl_rules.items(): + f.write(f"# Rules for {location.title()}\n") # title() + for rule in rules: + f.write(f"{rule}\n") + f.write("\n") + + # Deny/Action Logic + f.write("# Deny/Action Logic\n") if deny_high: f.write(f"http-request deny if {' or '.join(deny_high)}\n") if log_medium: f.write(f"http-request log if {' or '.join(log_medium)}\n") if tarpit_low: f.write(f"http-request tarpit if {' or '.join(tarpit_low)}\n") - f.write("\n") # end of actions log - logging.info(f"[+] HAProxy WAF rules generated at {config_file}") + logger.info(f"HAProxy WAF configuration generated at: {config_file}") except Exception as e: - logging.error(f"[!] Error generating HAProxy configuration: {e}") + logger.error(f"Error generating HAProxy configuration: {e}") raise -def main() -> None: - """ - Main function to execute the script. - """ - try: - logging.info("[*] Loading OWASP rules...") - owasp_rules = load_owasp_rules(INPUT_FILE) - logging.info(f"[*] Generating HAProxy WAF configs from {len(owasp_rules)} rules...") + +def main() -> None: + """Main function.""" + try: + logger.info("Loading OWASP rules...") + owasp_rules = load_owasp_rules(INPUT_FILE) + logger.info(f"Loaded {len(owasp_rules)} rules.") + + logger.info("Generating HAProxy WAF configuration...") generate_haproxy_conf(owasp_rules) - logging.info("[✔] HAProxy WAF configurations generated successfully.") + logger.info("HAProxy WAF generation complete.") + except Exception as e: - logging.critical(f"[!] Script failed: {e}") - exit(1) + logger.critical(f"Script failed: {e}") + exit(1) # Exit with an error code + if __name__ == "__main__": main()