Update json2apache.py

fab 2025-02-28 11:17:52 +01:00 committed by GitHub
parent 4591dfa52e
commit 1a3c968ba2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194


@@ -1,206 +1,176 @@
import json
import os
import re
from collections import defaultdict
import logging
from pathlib import Path
from typing import List, Dict, Set, Tuple, Optional
from functools import lru_cache

# --- Configuration ---
LOG_LEVEL = logging.INFO  # Adjust as needed (DEBUG, INFO, WARNING, ERROR)
INPUT_FILE = Path(os.getenv("INPUT_FILE", "owasp_rules.json"))
OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "waf_patterns/apache"))

# ModSecurity rule template (more flexible)
MODSEC_RULE_TEMPLATE = (
    'SecRule {variables} "{pattern}" '
    '"id:{rule_id},phase:{phase},t:none,{actions},msg:\'{category} attack detected\',severity:{severity}"\n'
)

# Default actions
DEFAULT_ACTIONS = "deny,status:403,log"

# Unsupported ModSecurity directives (expand as needed)
UNSUPPORTED_PATTERNS = [
    "@pmFromFile",  # File lookups not directly supported
    # You might handle some of these with ctl:ruleRemoveTargetById later.
]

# Supported ModSecurity operators and their rough translations (for logging/info)
SUPPORTED_OPERATORS = {
    "@rx": "Regular Expression",
    "@streq": "String Equals",
    "@contains": "Contains String",
    "@beginsWith": "Begins With",
    "@endsWith": "Ends With",
    "@within": "Contained Within",
    "@ipMatch": "IP Address Match",
    # ... add more as needed
}
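# Illustrative rendering of the template above (hypothetical values, not taken from a
# real ruleset): a rule with pattern "etc/passwd", category "lfi", location
# "request-uri" and severity "CRITICAL" would come out roughly as:
#   SecRule REQUEST_URI "etc/passwd" \
#       "id:9000000,phase:2,t:none,deny,status:403,log,msg:'LFI attack detected',severity:CRITICAL"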
# --- Logging Setup ---
logging.basicConfig(level=LOG_LEVEL, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


# --- Utility Functions ---
@lru_cache(maxsize=None)
def validate_regex(pattern: str) -> bool:
    """Validates a regex pattern (basic check)."""
    try:
        re.compile(pattern)
        return True
    except re.error as e:
        logger.warning(f"Invalid regex: {pattern} - {e}")
        return False


def _sanitize_pattern(pattern: str) -> str:
    """Internal helper to perform basic pattern sanitization."""
    # Remove the @rx prefix, if present.
    pattern = pattern.replace("@rx ", "").strip()
    # You *could* add basic escaping here if needed, but be *very* careful
    # not to break valid regexes. It's generally better to handle this
    # in the `owasp2json.py` script.
    return pattern
def _determine_variables(location: str) -> str:
    """Maps the 'location' field to ModSecurity variables."""
    location = location.lower()  # Normalize to lowercase
    if location == "request-uri":
        return "REQUEST_URI"
    elif location == "query-string":
        return "ARGS"  # Or ARGS_GET, depending on your needs
    elif location == "user-agent":
        return "REQUEST_HEADERS:User-Agent"
    elif location == "host":
        return "REQUEST_HEADERS:Host"
    elif location == "referer":
        return "REQUEST_HEADERS:Referer"
    elif location == "content-type":
        return "REQUEST_HEADERS:Content-Type"
    # Add other location mappings as needed.
    else:
        logger.warning(f"Unknown location '{location}', defaulting to REQUEST_URI")
        return "REQUEST_URI"  # Default variable
def generate_apache_waf(rules: List[Dict]) -> None:
    """Generates Apache ModSecurity configuration files."""
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    # Use a dictionary to group rules by category. Sets prevent duplicates.
    categorized_rules: Dict[str, Set[str]] = defaultdict(set)
    rule_id_counter = 9000000  # Start with a high ID range (OWASP CRS convention)

    for rule in rules:
        rule_id = rule.get("id", "no_id")  # Get the rule ID
        if not isinstance(rule_id, int):  # Ensure the ID is an integer
            # Extract the numeric ID from the rule, falling back to the counter.
            match = re.search(r'id:(\d+)', rule_id)
            rule_id = int(match.group(1)) if match else rule_id_counter
            rule_id_counter += 1

        category = rule.get("category", "generic").lower()
        pattern = rule["pattern"]
        location = rule.get("location", "REQUEST_URI")  # Set a default variable
        severity = rule.get("severity", "CRITICAL").upper()  # CRITICAL, ERROR, WARNING, NOTICE

        # --- Operator Handling ---
        operator_used = "Unknown"  # Default
        for op in SUPPORTED_OPERATORS:
            if pattern.startswith(op):
                operator_used = SUPPORTED_OPERATORS[op]
                break  # Stop after finding the *first* matching operator

        # Skip unsupported patterns.
        if any(unsupported in pattern for unsupported in UNSUPPORTED_PATTERNS):
            logger.info(f"[!] Skipping unsupported pattern: {pattern}")
            continue

        sanitized_pattern = _sanitize_pattern(pattern)
        if not sanitized_pattern or not validate_regex(sanitized_pattern):
            continue  # Skip invalid regexes

        # Determine ModSecurity variables based on 'location'.
        variables = _determine_variables(location)

        # --- Rule Construction ---
        # Build the ModSecurity rule string.
        rule_str = MODSEC_RULE_TEMPLATE.format(
            variables=variables,
            pattern=re.escape(sanitized_pattern),  # Escape for ModSecurity
            rule_id=rule_id,
            category=category.upper(),  # Use uppercase for the category
            severity=severity,
            phase=2,  # Phase 2 (request body processing) is common; adjust if needed
            actions=DEFAULT_ACTIONS,
        )
        categorized_rules[category].add(rule_str)  # Add to the per-category set

    # --- File Output ---
    # Write rules to per-category files. This is good for organization.
    for category, rule_set in categorized_rules.items():
        output_file = OUTPUT_DIR / f"{category}.conf"
        try:
            with open(output_file, "w") as f:
                f.write(f"# ModSecurity Rules for Category: {category.upper()}\n")
                f.write("SecRuleEngine On\n\n")  # Enable the rule engine
                for rule in rule_set:
                    f.write(rule)
            logger.info(f"Generated {output_file} ({len(rule_set)} rules)")
        except IOError as e:
            logger.error(f"Error writing to {output_file}: {e}")
            # Consider raising the exception here if you want the script to *stop*
            # on any file write error.
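# Example output layout (assuming the defaults above): one file per category under
# waf_patterns/apache/, e.g. sqli.conf or xss.conf, each starting with
# "SecRuleEngine On" followed by that category's deduplicated SecRule lines.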
def load_owasp_rules(file_path: Path) -> List[Dict]:
    """Loads OWASP rules from the JSON file."""
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError, Exception) as e:
        logger.error(f"Error loading rules from {file_path}: {e}")
        raise
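# Expected input shape (assumed from the fields consumed above; the exact schema is
# whatever owasp2json.py emits):
# [
#   {"id": 9001000, "category": "sqli", "pattern": "@rx (?i)union\\s+select",
#    "location": "query-string", "severity": "CRITICAL"},
#   ...
# ]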
def main():
    """Main function."""
    try:
        rules = load_owasp_rules(INPUT_FILE)
        generate_apache_waf(rules)
        logger.info("Apache ModSecurity configuration generated successfully.")
    except Exception as e:
        logger.critical(f"Script failed: {e}")
        exit(1)


if __name__ == "__main__":
    main()
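# Usage sketch (assumed invocation; both paths are overridable via the environment):
#   INPUT_FILE=owasp_rules.json OUTPUT_DIR=waf_patterns/apache python3 json2apache.py
# The generated *.conf files can then be pulled into an Apache/ModSecurity setup with,
# for example:
#   IncludeOptional /path/to/waf_patterns/apache/*.conf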