Update json2haproxy.py

This commit is contained in:
fab 2025-02-28 11:00:40 +01:00 committed by GitHub
parent dc731a715c
commit a6307b5cf6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -27,10 +27,10 @@ OPERATOR_MAP = {
"@contains": "str -m sub", "@contains": "str -m sub",
"!@eq": "str -m !str", # Handle negation "!@eq": "str -m !str", # Handle negation
"!@within": "str -m !reg", # Approximate !@within (requires regex) "!@within": "str -m !reg", # Approximate !@within (requires regex)
"@lt": "int <", "@lt": "<",
"@ge": "int >=", "@ge": ">=",
"@gt": "int >", "@gt": ">",
"@eq": "int ==" "@eq": "==",
} }
@ -76,13 +76,17 @@ def sanitize_pattern(pattern: str) -> Tuple[Optional[str], str, Optional[str]]:
acl_type = "hdr_reg" # Default to regex matching acl_type = "hdr_reg" # Default to regex matching
transformed_pattern = None # optional transformation transformed_pattern = None # optional transformation
original_pattern = pattern # store original for logging original_pattern = pattern # store original for logging
int_comp_pattern = None # store the integer pattern
for modsecurity_op, haproxy_op in OPERATOR_MAP.items(): for modsecurity_op, haproxy_op in OPERATOR_MAP.items():
if pattern.startswith(modsecurity_op): if pattern.startswith(modsecurity_op):
# if it is an integer comparison we will want a different ACL # handle 'str' and 'int' matching in a different way
if haproxy_op.startswith("int"):
if haproxy_op in ['<','>=','>','==']:
acl_type = "int" acl_type = "int"
pattern = pattern.replace(modsecurity_op, haproxy_op).strip() int_comp_pattern = pattern.replace(modsecurity_op, haproxy_op).strip()
pattern = None # set to None to avoid regex validation
return pattern, acl_type, transformed_pattern return pattern, acl_type, transformed_pattern
acl_type = "hdr_sub" # String matching acl_type = "hdr_sub" # String matching
@ -132,6 +136,8 @@ def generate_haproxy_conf(rules: List[Dict]) -> None:
config_file = OUTPUT_DIR / "waf.acl" config_file = OUTPUT_DIR / "waf.acl"
acl_rules = {} # Dict to store ACL rule definitions based on 'location' acl_rules = {} # Dict to store ACL rule definitions based on 'location'
all_acl_names = [] # Store a full list of acl names for final deny all_acl_names = [] # Store a full list of acl names for final deny
int_comp_rules = [] # Collect the integer comparison rules
# Initialize lists for different deny actions # Initialize lists for different deny actions
deny_high = [] deny_high = []
log_medium = [] log_medium = []
@ -144,7 +150,6 @@ def generate_haproxy_conf(rules: List[Dict]) -> None:
unique_rules = set() # Prevent duplication rules unique_rules = set() # Prevent duplication rules
# Process each rule # Process each rule
for rule in rules: for rule in rules:
try: try:
@ -156,7 +161,14 @@ def generate_haproxy_conf(rules: List[Dict]) -> None:
sanitized_pattern, acl_type, transformed_pattern = sanitize_pattern(pattern) sanitized_pattern, acl_type, transformed_pattern = sanitize_pattern(pattern)
if sanitized_pattern and validate_regex(sanitized_pattern): if acl_type == "int": # Handle integer comparisons
if sanitized_pattern: # create the int condition pattern direct
action_string = "deny" if severity == "high" else "log" if severity == "medium" else "tarpit"
int_comp_rules.append(f"http-request {action_string} if {{ {location} {sanitized_pattern} }}") # Append direct the rule
else:
logging.warning(f"[!] Skipping integer rule with invalid pattern: {pattern}")
elif sanitized_pattern and validate_regex(sanitized_pattern): # continue to the other filters when integer is not valid
acl_name = f"block_{category}_{rule_id}" # Unique ACL name including ID acl_name = f"block_{category}_{rule_id}" # Unique ACL name including ID
if acl_name not in all_acl_names: if acl_name not in all_acl_names:
@ -164,9 +176,8 @@ def generate_haproxy_conf(rules: List[Dict]) -> None:
# Build the ACL rule string based on the 'location' # Build the ACL rule string based on the 'location'
acl_rule_string = None # Set the initial state acl_rule_string = None # Set the initial state
if acl_type == 'int': # if it is int then we want 'http-request' instead 'acl'
acl_rule_string = f"http-request if {{ {location} {sanitized_pattern} }}" if location == "Request-URI":
elif location == "Request-URI":
acl_rule_string = f"acl {acl_name} path_reg -i {sanitized_pattern}" acl_rule_string = f"acl {acl_name} path_reg -i {sanitized_pattern}"
elif location == "Query-String": elif location == "Query-String":
acl_rule_string = f"acl {acl_name} query_reg -i {sanitized_pattern}" acl_rule_string = f"acl {acl_name} query_reg -i {sanitized_pattern}"
@ -206,6 +217,12 @@ def generate_haproxy_conf(rules: List[Dict]) -> None:
with open(config_file, "w") as f: with open(config_file, "w") as f:
f.write("# HAProxy WAF ACL rules\n\n") f.write("# HAProxy WAF ACL rules\n\n")
# Write integer rules
if int_comp_rules:
f.write("# Integer Comparison Rules\n")
for int_rule in int_comp_rules:
f.write(f"{int_rule}\n")
f.write("\n")
# Write all ACL definitions by location # Write all ACL definitions by location
for location, rules in acl_rules.items(): for location, rules in acl_rules.items():
f.write(f"# Rules for {location}\n") f.write(f"# Rules for {location}\n")
@ -216,20 +233,10 @@ def generate_haproxy_conf(rules: List[Dict]) -> None:
f.write("\n") f.write("\n")
# Add all the actions based on rules # Add all the actions based on rules
for action, rules in all_deny_actions.items(): for action, rules in all_deny_actions.items():
action_string = 'deny' if action == "deny_high" else 'tarpit' if action == "tarpit_low" else 'log' action_string = 'deny' if action == "deny_high" else 'log' if action == "log_medium" else 'tarpit'
# if it is NOT a deny action then we will want http-request instead acl f.write(f"# {action.split('_')[1].capitalize()} Severity Rules ({action_string.capitalize()})\n") # comment action
if action == "deny_high": if rules:
f.write(f"# High Severity Rules (Deny)\n") f.write(f"http-request {action_string} if {' or '.join(rules)}\n")
if rules:
f.write(f"http-request {action_string} if {' or '.join(rules)}\n")
elif action == "log_medium":
f.write(f"# Medium Severity Rules (Log)\n")
if rules:
f.write(f"http-request {action_string} if {' or '.join(rules)}\n")
else:
f.write(f"# Low Severity Rules (Tarpit)\n")
if rules:
f.write(f"http-request {action_string} if {' or '.join(rules)}\n")
f.write("\n") f.write("\n")
logging.info(f"[+] HAProxy WAF rules generated at {config_file}") logging.info(f"[+] HAProxy WAF rules generated at {config_file}")