mirror of
https://github.com/fabriziosalmi/patterns.git
synced 2025-12-17 17:55:48 +00:00
224 lines
9.5 KiB
Python
224 lines
9.5 KiB
Python
import json
|
|
import os
|
|
import re
|
|
import logging
|
|
from pathlib import Path
|
|
from collections import defaultdict
|
|
from functools import lru_cache
|
|
from typing import List, Dict, Optional, Tuple
|
|
|
|
# --- Configuration ---
|
|
LOG_LEVEL = logging.INFO # DEBUG, INFO, WARNING, ERROR
|
|
INPUT_FILE = Path(os.getenv("INPUT_FILE", "owasp_rules.json"))
|
|
OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "waf_patterns/nginx"))
|
|
MAPS_FILE = OUTPUT_DIR / "waf_maps.conf"
|
|
RULES_FILE = OUTPUT_DIR / "waf_rules.conf"
|
|
|
|
# Unsupported Nginx directives (expand as needed)
|
|
UNSUPPORTED_PATTERNS = [
|
|
"@pmFromFile", # No direct file lookups in Nginx map
|
|
]
|
|
|
|
# --- Logging Setup ---
|
|
logging.basicConfig(level=LOG_LEVEL, format="%(asctime)s - %(levelname)s - %(message)s")
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# --- Utility Functions ---
|
|
def load_owasp_rules(file_path: Path) -> List[Dict]:
|
|
"""Loads OWASP rules from a JSON file."""
|
|
try:
|
|
with open(file_path, "r", encoding="utf-8") as f:
|
|
return json.load(f)
|
|
except (FileNotFoundError, json.JSONDecodeError, OSError) as e:
|
|
logger.error(f"Error loading rules from {file_path}: {e}")
|
|
raise # Re-raise to prevent continuing
|
|
|
|
@lru_cache(maxsize=256) # Increased cache size
|
|
def validate_regex(pattern: str) -> bool:
|
|
"""Validates a regex pattern (basic check)."""
|
|
try:
|
|
re.compile(pattern)
|
|
return True
|
|
except re.error as e:
|
|
logger.warning(f"Invalid regex: {pattern} - {e}")
|
|
return False
|
|
|
|
def _sanitize_pattern(pattern: str) -> str:
|
|
"""Internal helper to clean and escape patterns for Nginx."""
|
|
pattern = pattern.replace("@rx ", "").strip() # Remove ModSecurity @rx
|
|
# Remove case-insensitive flag (?i) as Nginx uses ~* for that
|
|
pattern = re.sub(r"\(\?i\)", "", pattern)
|
|
|
|
# Convert $ to \$
|
|
pattern = pattern.replace("$", r"\$")
|
|
|
|
# Convert { or { to {
|
|
pattern = re.sub(r"&l(?:brace|cub);?", r"{", pattern)
|
|
pattern = re.sub(r"&r(?:brace|cub);?", r"}", pattern)
|
|
|
|
# Remove unnecessary \.*
|
|
pattern = re.sub(r"\\\.\*", r"\.*", pattern)
|
|
pattern = re.sub(r"(?<!\\)\.(?![\w])", r"\.", pattern) # Escape dots
|
|
|
|
# Replace non-capturing groups (?:...) with capturing groups (...)
|
|
pattern = re.sub(r"\(\?:", "(", pattern)
|
|
|
|
return pattern
|
|
|
|
def sanitize_pattern(pattern: str, location: str) -> Optional[str]:
|
|
"""
|
|
Sanitizes a pattern for use in an Nginx map directive.
|
|
Returns the sanitized pattern, or None if the pattern is unsupported.
|
|
"""
|
|
# Skip unsupported patterns.
|
|
for unsupported in UNSUPPORTED_PATTERNS:
|
|
if unsupported in pattern:
|
|
logger.warning(f"Skipping unsupported pattern: {pattern}")
|
|
return None
|
|
|
|
# Sanitize the pattern
|
|
pattern = _sanitize_pattern(pattern)
|
|
|
|
# Escape special characters for Nginx map (most importantly, the ~)
|
|
# We use re.escape, but *selectively* unescape key regex metacharacters.
|
|
pattern = re.escape(pattern)
|
|
# Unescape: \. \( \) \[ \] \| \? \* \+ \{ \} \^ \$ \\
|
|
pattern = re.sub(r'\\([.()[\]|?*+{}^$\\])', r'\1', pattern)
|
|
return pattern
|
|
|
|
def generate_nginx_waf(rules: List[Dict]) -> None:
|
|
"""Generates Nginx WAF configuration (maps and rules)."""
|
|
|
|
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
|
|
categorized_rules: Dict[str, Dict[str, str]] = defaultdict(lambda: defaultdict(str)) # category -> location
|
|
|
|
for rule in rules:
|
|
rule_id = rule.get("id", "no_id") # Get rule ID
|
|
category = rule.get("category", "generic").lower()
|
|
location = rule.get("location", "request-uri").lower() # set a default location
|
|
pattern = rule["pattern"]
|
|
severity = rule.get("severity", "medium").lower() # get severity
|
|
|
|
sanitized_pattern = sanitize_pattern(pattern, location)
|
|
if not sanitized_pattern or not validate_regex(sanitized_pattern):
|
|
continue # Skip invalid or unsupported patterns
|
|
|
|
|
|
if location == "request-uri":
|
|
variable = "$request_uri"
|
|
elif location == "query-string":
|
|
variable = "$args" # Use $args for query string
|
|
elif location == "user-agent":
|
|
variable = "$http_user_agent"
|
|
elif location == "host":
|
|
variable = "$http_host"
|
|
elif location == "referer":
|
|
variable = "$http_referer"
|
|
elif location == "content-type":
|
|
variable = "$http_content_type"
|
|
# Add more location mappings here
|
|
else:
|
|
logger.warning(f"Unsupported location: {location} for rule: {rule_id}")
|
|
continue
|
|
# Add rule based on severity and location
|
|
categorized_rules[category][variable] += f' "~*{sanitized_pattern}" {severity};\n' # set severity as value
|
|
|
|
|
|
# --- Generate Maps (waf_maps.conf) ---
|
|
try:
|
|
with open(MAPS_FILE, "w", encoding="utf-8") as f:
|
|
f.write("# Nginx WAF Maps (Generated by json2nginx.py)\n\n")
|
|
f.write("http {\n") # Maps *must* be in the http context
|
|
|
|
for category, location_rules in categorized_rules.items():
|
|
# Create the map with the high priority
|
|
f.write(f" map $1 $waf_{category} {{\n") # dynamic variable
|
|
f.write(' default "";\n') # default value empty
|
|
for location, rules in location_rules.items():
|
|
f.write(f" # Rules for {location}\n")
|
|
f.write(rules) # Write the collected rules for this location
|
|
f.write("\n")
|
|
f.write(" }\n\n")
|
|
|
|
f.write("}\n") # Close the http block
|
|
logger.info(f"Generated Nginx map file: {MAPS_FILE}")
|
|
except IOError as e:
|
|
logger.error(f"Error writing to {MAPS_FILE}: {e}")
|
|
raise
|
|
|
|
# --- Generate Rules (waf_rules.conf) ---
|
|
try:
|
|
with open(RULES_FILE, "w", encoding="utf-8") as f:
|
|
f.write("# Nginx WAF Rules (Generated by json2nginx.py)\n\n")
|
|
f.write("# Include this file in your 'server' or 'location' block.\n\n")
|
|
|
|
# iterate for each rule
|
|
for category, location_rules in categorized_rules.items():
|
|
# set map to correct WAF block
|
|
map_variable = f"$waf_{category}"
|
|
# create conditions based on priority
|
|
f.write(f' if ({map_variable} = "high") {{\n return 403;\n }}\n')
|
|
f.write(f' if ({map_variable} = "medium") {{\n add_header X-WAF-Blocked "medium-{category}";\n }}\n') # example for another action
|
|
f.write(f' if ({map_variable} = "low") {{\n add_header X-WAF-Blocked "low-{category}";\n }}\n\n') # expample for other action
|
|
logger.info(f"Generated Nginx rules file: {RULES_FILE}")
|
|
|
|
except IOError as e:
|
|
logger.error(f"Error writing to {RULES_FILE}: {e}")
|
|
raise
|
|
|
|
|
|
# --- Generate README ---
|
|
readme_file = OUTPUT_DIR / "README.md"
|
|
try:
|
|
with open(readme_file, "w", encoding="utf-8") as f:
|
|
f.write("# Nginx WAF Configuration\n\n")
|
|
f.write("This directory contains Nginx WAF configuration files generated from OWASP rules.\n\n")
|
|
f.write("## Usage\n\n")
|
|
f.write("1. **Include `waf_maps.conf` in your `http` block:**\n")
|
|
f.write(" ```nginx\n")
|
|
f.write(" http {\n")
|
|
f.write(" include /path/to/waf_patterns/nginx/waf_maps.conf;\n")
|
|
f.write(" # ... other http configurations ...\n")
|
|
f.write(" }\n")
|
|
f.write(" ```\n\n")
|
|
f.write("2. **Include `waf_rules.conf` in your `server` or `location` block:**\n")
|
|
f.write(" ```nginx\n")
|
|
f.write(" server {\n")
|
|
f.write(" # ... other server configurations ...\n")
|
|
f.write(" include /path/to/waf_patterns/nginx/waf_rules.conf;\n")
|
|
f.write(" }\n")
|
|
f.write(" ```\n\n")
|
|
f.write("3. **Reload Nginx:**\n")
|
|
f.write(" ```bash\n")
|
|
f.write(" sudo nginx -t && sudo systemctl reload nginx\n")
|
|
f.write(" ```\n\n")
|
|
f.write("## Important Notes:\n\n")
|
|
f.write("* **Testing is Crucial:** Thoroughly test your WAF configuration with a variety of requests (both legitimate and malicious) to ensure it's working correctly and not causing false positives.\n")
|
|
f.write("* **False Positives:** WAF rules, especially those based on regex, can sometimes block legitimate traffic. Monitor your Nginx logs and adjust the rules as needed.\n")
|
|
f.write("* **Performance:** Complex regexes can impact performance. Use the simplest regex that accurately matches the threat.\n")
|
|
f.write("* **Updates:** Regularly update the OWASP rules (by re-running `owasp2json.py` and `json2nginx.py`) to stay protected against new threats.\n")
|
|
f.write("* **This is not a complete WAF:** This script provides a basic WAF based on pattern matching. For more comprehensive protection, consider using a dedicated WAF solution like Nginx App Protect or ModSecurity.\n")
|
|
|
|
except IOError as e:
|
|
logger.error(f"Error writing to {readme_file}: {e}")
|
|
raise
|
|
|
|
def main():
|
|
"""Main function."""
|
|
try:
|
|
logger.info("Loading OWASP rules...")
|
|
owasp_rules = load_owasp_rules(INPUT_FILE)
|
|
logger.info(f"Loaded {len(owasp_rules)} rules.")
|
|
|
|
logger.info("Generating Nginx WAF configuration...")
|
|
generate_nginx_waf(owasp_rules)
|
|
|
|
logger.info("Nginx WAF generation complete.")
|
|
except Exception as e:
|
|
logger.critical(f"Script failed: {e}")
|
|
exit(1) # Exit with an error code
|
|
|
|
if __name__ == "__main__":
|
|
main()
|