Update json2nginx.py

This commit is contained in:
fab 2025-02-28 11:19:32 +01:00 committed by GitHub
parent 1a3c968ba2
commit 676c53d383
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -5,172 +5,219 @@ import logging
from pathlib import Path from pathlib import Path
from collections import defaultdict from collections import defaultdict
from functools import lru_cache from functools import lru_cache
from typing import List, Dict, Optional, Tuple
# Configure logging # --- Configuration ---
logging.basicConfig( LOG_LEVEL = logging.INFO # DEBUG, INFO, WARNING, ERROR
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler()],
)
# Input and output paths
INPUT_FILE = Path(os.getenv("INPUT_FILE", "owasp_rules.json")) INPUT_FILE = Path(os.getenv("INPUT_FILE", "owasp_rules.json"))
OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "waf_patterns/nginx")) OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "waf_patterns/nginx"))
MAPS_FILE = OUTPUT_DIR / "waf_maps.conf" MAPS_FILE = OUTPUT_DIR / "waf_maps.conf"
RULES_FILE = OUTPUT_DIR / "waf_rules.conf" RULES_FILE = OUTPUT_DIR / "waf_rules.conf"
# Unsupported Nginx directives (expand as needed)
UNSUPPORTED_PATTERNS = [
"@pmFromFile", # No direct file lookups in Nginx map
]
# Create output directory if it doesn't exist # --- Logging Setup ---
OUTPUT_DIR.mkdir(parents=True, exist_ok=True) logging.basicConfig(level=LOG_LEVEL, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
def load_owasp_rules(file_path): # --- Utility Functions ---
"""Load OWASP rules from a JSON file.""" def load_owasp_rules(file_path: Path) -> List[Dict]:
"""Loads OWASP rules from a JSON file."""
try: try:
with open(file_path, "r") as f: with open(file_path, "r", encoding="utf-8") as f:
return json.load(f) return json.load(f)
except FileNotFoundError: except (FileNotFoundError, json.JSONDecodeError, OSError) as e:
logging.error(f"Input file not found: {file_path}") logger.error(f"Error loading rules from {file_path}: {e}")
raise raise # Re-raise to prevent continuing
except json.JSONDecodeError:
logging.error(f"Invalid JSON in file: {file_path}")
raise
@lru_cache(maxsize=256) # Increased cache size
@lru_cache(maxsize=128) def validate_regex(pattern: str) -> bool:
def validate_regex(pattern): """Validates a regex pattern (basic check)."""
"""Validate if a pattern is a valid regex."""
try: try:
re.compile(pattern) re.compile(pattern)
return True return True
except re.error: except re.error as e:
logger.warning(f"Invalid regex: {pattern} - {e}")
return False return False
def _sanitize_pattern(pattern: str) -> str:
"""Internal helper to clean and escape patterns for Nginx."""
pattern = pattern.replace("@rx ", "").strip() # Remove ModSecurity @rx
# Remove case-insensitive flag (?i) as Nginx uses ~* for that
pattern = re.sub(r"\(\?i\)", "", pattern)
@lru_cache(maxsize=128) # Convert $ to \$
def sanitize_pattern(pattern): pattern = pattern.replace("$", r"\$")
"""Sanitize and validate OWASP patterns for Nginx compatibility."""
# Directly proceed with check for unsupported patterns # Convert { or { to {
unsupported_keywords = ["@pmFromFile", "!@eq", "!@within", "@lt"] pattern = re.sub(r"&l(?:brace|cub);?", r"{", pattern)
if any(keyword in pattern for keyword in unsupported_keywords): pattern = re.sub(r"&r(?:brace|cub);?", r"}", pattern)
logging.warning(f"Skipping unsupported pattern: {pattern}")
# Remove unnecessary \.*
pattern = re.sub(r"\\\.\*", r"\.*", pattern)
pattern = re.sub(r"(?<!\\)\.(?![\w])", r"\.", pattern) # Escape dots
# Replace non-capturing groups (?:...) with capturing groups (...)
pattern = re.sub(r"\(\?:", "(", pattern)
return pattern
def sanitize_pattern(pattern: str, location: str) -> Optional[str]:
"""
Sanitizes a pattern for use in an Nginx map directive.
Returns the sanitized pattern, or None if the pattern is unsupported.
"""
# Skip unsupported patterns.
for unsupported in UNSUPPORTED_PATTERNS:
if unsupported in pattern:
logger.warning(f"Skipping unsupported pattern: {pattern}")
return None return None
# Faster check for patterns starting with "@rx " # Sanitize the pattern
if pattern.startswith("@rx "): pattern = _sanitize_pattern(pattern)
sanitized_pattern = pattern.replace("@rx ", "").strip()
if validate_regex(sanitized_pattern):
return re.escape(sanitized_pattern).replace(r'\@', '@')
else:
logging.warning(f"Invalid regex in pattern: {sanitized_pattern}")
return None
if validate_regex(pattern): # Escape special characters for Nginx map (most importantly, the ~)
return re.escape(pattern).replace(r'\@', '@') # We use re.escape, but *selectively* unescape key regex metacharacters.
else: pattern = re.escape(pattern)
logging.warning(f"Invalid regex in pattern: {pattern}") # Unescape: \. \( \) \[ \] \| \? \* \+ \{ \} \^ \$ \\
return None pattern = re.sub(r'\\([.()[\]|?*+{}^$\\])', r'\1', pattern)
return pattern
def generate_nginx_waf(rules: List[Dict]) -> None:
"""Generates Nginx WAF configuration (maps and rules)."""
def generate_nginx_waf(rules): OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
"""Generate Nginx WAF configuration snippets from OWASP rules.""" categorized_rules: Dict[str, Dict[str, str]] = defaultdict(lambda: defaultdict(str)) # category -> location
categorized_rules = defaultdict(set)
# Group rules by category
for rule in rules: for rule in rules:
rule_id = rule.get("id", "no_id") # Get rule ID
category = rule.get("category", "generic").lower() category = rule.get("category", "generic").lower()
pattern = rule.get("pattern") location = rule.get("location", "request-uri").lower() # set a default location
pattern = rule["pattern"]
severity = rule.get("severity", "medium").lower() # get severity
sanitized_pattern = sanitize_pattern(pattern) sanitized_pattern = sanitize_pattern(pattern, location)
if sanitized_pattern: if not sanitized_pattern or not validate_regex(sanitized_pattern):
categorized_rules[category].add(sanitized_pattern) continue # Skip invalid or unsupported patterns
if location == "request-uri":
variable = "$request_uri"
elif location == "query-string":
variable = "$args" # Use $args for query string
elif location == "user-agent":
variable = "$http_user_agent"
elif location == "host":
variable = "$http_host"
elif location == "referer":
variable = "$http_referer"
elif location == "content-type":
variable = "$http_content_type"
# Add more location mappings here
else: else:
logging.warning(f"Invalid or unsupported pattern skipped: {pattern}") logger.warning(f"Unsupported location: {location} for rule: {rule_id}")
continue
# Add rule based on severity and location
categorized_rules[category][variable] += f' "~*{sanitized_pattern}" {severity};\n' # set severity as value
# Write map definitions to a dedicated file
# --- Generate Maps (waf_maps.conf) ---
try: try:
with open(MAPS_FILE, "w") as f: with open(MAPS_FILE, "w", encoding="utf-8") as f:
f.write("# Nginx WAF Maps Definitions\n") f.write("# Nginx WAF Maps (Generated by json2nginx.py)\n\n")
f.write("# Automatically generated from OWASP rules.\n\n") f.write("http {\n") # Maps *must* be in the http context
f.write("http {\n")
for category, patterns in categorized_rules.items():
f.write(f" map $request_uri $waf_block_{category} {{\n")
f.write(" default 0;\n")
for pattern in patterns:
escaped_pattern = pattern.replace('"', '\\"')
f.write(f' "~*{escaped_pattern}" 1;\n')
f.write(" }\n\n")
f.write("}\n")
logging.info(f"Generated {MAPS_FILE} containing map definitions") for category, location_rules in categorized_rules.items():
except IOError as e: # Create the map with the high priority
logging.error(f"Failed to write {MAPS_FILE}: {e}") f.write(f" map $1 $waf_{category} {{\n") # dynamic variable
f.write(' default "";\n') # default value empty
for location, rules in location_rules.items():
# Write if blocks to a dedicated file f.write(f" # Rules for {location}\n")
try: f.write(rules) # Write the collected rules for this location
with open(RULES_FILE, "w") as f: f.write("\n")
f.write("# Nginx WAF Rules\n")
f.write("# Automatically generated from OWASP rules.\n")
f.write("# Include this file inside server block\n\n")
f.write(" # WAF rules\n")
for category in categorized_rules.keys():
f.write(f" if ($waf_block_{category}) {{\n")
f.write(" return 403;\n")
f.write(" # Log the blocked request (optional)\n")
f.write(" # access_log /var/log/nginx/waf_blocked.log;\n")
f.write(" }\n\n") f.write(" }\n\n")
logging.info(f"Generated {RULES_FILE} containing rules") f.write("}\n") # Close the http block
logger.info(f"Generated Nginx map file: {MAPS_FILE}")
except IOError as e: except IOError as e:
logging.error(f"Failed to write {RULES_FILE}: {e}") logger.error(f"Error writing to {MAPS_FILE}: {e}")
raise
# Generate a README file with usage instructions # --- Generate Rules (waf_rules.conf) ---
try:
with open(RULES_FILE, "w", encoding="utf-8") as f:
f.write("# Nginx WAF Rules (Generated by json2nginx.py)\n\n")
f.write("# Include this file in your 'server' or 'location' block.\n\n")
# iterate for each rule
for category, location_rules in categorized_rules.items():
# set map to correct WAF block
map_variable = f"$waf_{category}"
# create conditions based on priority
f.write(f' if ({map_variable} = "high") {{\n return 403;\n }}\n')
f.write(f' if ({map_variable} = "medium") {{\n add_header X-WAF-Blocked "medium-{category}";\n }}\n') # example for another action
f.write(f' if ({map_variable} = "low") {{\n add_header X-WAF-Blocked "low-{category}";\n }}\n\n') # expample for other action
logger.info(f"Generated Nginx rules file: {RULES_FILE}")
except IOError as e:
logger.error(f"Error writing to {RULES_FILE}: {e}")
raise
# --- Generate README ---
readme_file = OUTPUT_DIR / "README.md" readme_file = OUTPUT_DIR / "README.md"
with open(readme_file, "w") as f: try:
with open(readme_file, "w", encoding="utf-8") as f:
f.write("# Nginx WAF Configuration\n\n") f.write("# Nginx WAF Configuration\n\n")
f.write("This directory contains Nginx WAF configuration files generated from OWASP rules.\n") f.write("This directory contains Nginx WAF configuration files generated from OWASP rules.\n\n")
f.write("You can include these files in your existing Nginx configuration to enhance security.\n\n") f.write("## Usage\n\n")
f.write("## Usage\n") f.write("1. **Include `waf_maps.conf` in your `http` block:**\n")
f.write("1. Include the `waf_maps.conf` file in your `nginx.conf` *inside the `http` block*:\n")
f.write(" ```nginx\n") f.write(" ```nginx\n")
f.write(" http {\n") f.write(" http {\n")
f.write(" include /path/to/waf_patterns/nginx/waf_maps.conf;\n") f.write(" include /path/to/waf_patterns/nginx/waf_maps.conf;\n")
f.write(" # ... other http configurations ...\n") f.write(" # ... other http configurations ...\n")
f.write(" }\n") f.write(" }\n")
f.write(" ```\n") f.write(" ```\n\n")
f.write("2. Include the `waf_rules.conf` file in your `server` block:\n") f.write("2. **Include `waf_rules.conf` in your `server` or `location` block:**\n")
f.write(" ```nginx\n") f.write(" ```nginx\n")
f.write(" server {\n") f.write(" server {\n")
f.write(" # ... other server configurations ...\n") f.write(" # ... other server configurations ...\n")
f.write(" include /path/to/waf_patterns/nginx/waf_rules.conf;\n") f.write(" include /path/to/waf_patterns/nginx/waf_rules.conf;\n")
f.write(" }\n") f.write(" }\n")
f.write(" ```\n") f.write(" ```\n\n")
f.write("3. Reload Nginx to apply the changes:\n") f.write("3. **Reload Nginx:**\n")
f.write(" ```bash\n") f.write(" ```bash\n")
f.write(" sudo nginx -t && sudo systemctl reload nginx\n") f.write(" sudo nginx -t && sudo systemctl reload nginx\n")
f.write(" ```\n") f.write(" ```\n\n")
f.write("\n## Notes\n") f.write("## Important Notes:\n\n")
f.write("- The rules use `map` directives for efficient pattern matching. The maps are defined in the `waf_maps.conf` file.\n") f.write("* **Testing is Crucial:** Thoroughly test your WAF configuration with a variety of requests (both legitimate and malicious) to ensure it's working correctly and not causing false positives.\n")
f.write("- The rules (if statements) are defined in the `waf_rules.conf` file.\n") f.write("* **False Positives:** WAF rules, especially those based on regex, can sometimes block legitimate traffic. Monitor your Nginx logs and adjust the rules as needed.\n")
f.write("- Blocked requests return a `403 Forbidden` response by default.\n") f.write("* **Performance:** Complex regexes can impact performance. Use the simplest regex that accurately matches the threat.\n")
f.write("- You can enable logging for blocked requests by uncommenting the `access_log` line.\n") f.write("* **Updates:** Regularly update the OWASP rules (by re-running `owasp2json.py` and `json2nginx.py`) to stay protected against new threats.\n")
f.write("* **This is not a complete WAF:** This script provides a basic WAF based on pattern matching. For more comprehensive protection, consider using a dedicated WAF solution like Nginx App Protect or ModSecurity.\n")
except IOError as e:
logger.error(f"Error writing to {readme_file}: {e}")
raise
def main(): def main():
"""Main function to load rules and generate Nginx configurations.""" """Main function."""
try: try:
logging.info("Loading OWASP rules...") logger.info("Loading OWASP rules...")
owasp_rules = load_owasp_rules(INPUT_FILE) owasp_rules = load_owasp_rules(INPUT_FILE)
logger.info(f"Loaded {len(owasp_rules)} rules.")
logging.info(f"Generating Nginx WAF configs from {len(owasp_rules)} rules...") logger.info("Generating Nginx WAF configuration...")
generate_nginx_waf(owasp_rules) generate_nginx_waf(owasp_rules)
logging.info("Nginx WAF configurations generated successfully.") logger.info("Nginx WAF generation complete.")
except Exception as e: except Exception as e:
logging.critical(f"Script failed: {e}") logger.critical(f"Script failed: {e}")
exit(1) exit(1) # Exit with an error code
if __name__ == "__main__": if __name__ == "__main__":
main() main()