import json
import logging
import os
import re
import sys
from collections import defaultdict
from functools import lru_cache
from pathlib import Path
from typing import Dict, List, Optional, Tuple

# --- Configuration ---
# Verbosity of the script's own log output (DEBUG, INFO, WARNING, ERROR).
LOG_LEVEL = logging.INFO

# Input/output locations; both can be overridden through environment variables.
INPUT_FILE = Path(os.getenv("INPUT_FILE", "owasp_rules.json"))
OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "waf_patterns/nginx"))

# Generated files, always placed inside OUTPUT_DIR.
MAPS_FILE = OUTPUT_DIR / "waf_maps.conf"
RULES_FILE = OUTPUT_DIR / "waf_rules.conf"

# Unsupported Nginx directives (expand as needed).
# A rule whose pattern contains any of these substrings is skipped.
UNSUPPORTED_PATTERNS = [
    "@pmFromFile",  # No direct file lookups in Nginx map
]

# --- Logging Setup ---
logging.basicConfig(level=LOG_LEVEL, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


# --- Utility Functions ---
def load_owasp_rules(file_path: Path) -> List[Dict]:
    """Load OWASP rules from a JSON file.

    Args:
        file_path: Path of the JSON file to read (decoded as UTF-8).

    Returns:
        The top-level JSON array, one dict per rule.

    Raises:
        OSError: The file cannot be opened or read (covers FileNotFoundError).
        json.JSONDecodeError: The file content is not valid JSON.
        ValueError: The top-level JSON value is not a list.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            rules = json.load(f)
    except (OSError, json.JSONDecodeError) as e:
        logger.error(f"Error loading rules from {file_path}: {e}")
        raise  # Re-raise to prevent continuing

    # Fail here with a clear message instead of later with an AttributeError
    # when the generator iterates over something that is not a list of rules.
    if not isinstance(rules, list):
        raise ValueError(
            f"Expected a JSON array of rules in {file_path}, "
            f"got {type(rules).__name__}"
        )
    return rules


@lru_cache(maxsize=256)  # Rule sets repeat patterns; avoid recompiling them
def validate_regex(pattern: str) -> bool:
    """Check that *pattern* compiles with Python's ``re`` module.

    This is only a basic syntax check made with Python's regex engine.
    NOTE(review): Nginx compiles the pattern with its own regex library, so
    a pattern accepted here may still be rejected or behave differently
    there -- confirm with ``nginx -t``.

    Args:
        pattern: Regular expression source to check.

    Returns:
        True if the pattern compiles, False otherwise (a warning is logged).
    """
    try:
        re.compile(pattern)
    except (re.error, OverflowError, RecursionError) as e:
        # re.compile raises OverflowError for oversized repeat counts and
        # RecursionError for very deeply nested groups; treat both as an
        # invalid pattern so one bad rule cannot abort the whole run.
        logger.warning(f"Invalid regex: {pattern} - {e}")
        return False
    return True


def _sanitize_pattern ( pattern : str ) - > str :
""" Internal helper to clean and escape patterns for Nginx. """
pattern = pattern . replace ( " @rx " , " " ) . strip ( ) # Remove ModSecurity @rx
# Remove case-insensitive flag (?i) as Nginx uses ~* for that
pattern = re . sub ( r " \ ( \ ?i \ ) " , " " , pattern )
# Convert $ to \$
pattern = pattern . replace ( " $ " , r " \ $ " )
# Convert { or { to {
pattern = re . sub ( r " &l(?:brace|cub);? " , r " { " , pattern )
pattern = re . sub ( r " &r(?:brace|cub);? " , r " } " , pattern )
# Remove unnecessary \.*
pattern = re . sub ( r " \\ \ . \ * " , r " \ .* " , pattern )
pattern = re . sub ( r " (?<! \\ ) \ .(?![ \ w]) " , r " \ . " , pattern ) # Escape dots
# Replace non-capturing groups (?:...) with capturing groups (...)
pattern = re . sub ( r " \ ( \ ?: " , " ( " , pattern )
return pattern
def sanitize_pattern(pattern: str, location: str) -> Optional[str]:
    """Sanitize a pattern for use in an Nginx map directive.

    Args:
        pattern: Raw rule pattern.
        location: Rule location; currently unused, kept for API compatibility.

    Returns:
        The sanitized pattern, or None if the pattern is unsupported.
    """
    # Skip unsupported patterns.
    if any(unsupported in pattern for unsupported in UNSUPPORTED_PATTERNS):
        logger.warning(f"Skipping unsupported pattern: {pattern}")
        return None

    pattern = _sanitize_pattern(pattern)

    # The pattern is emitted inside a double-quoted Nginx string, so the only
    # character that must be escaped is an unescaped double quote. The regex
    # is deliberately NOT passed through re.escape(): that escapes '-' (which
    # turns a range like [a-z] into the three literals a, -, z) and mangles
    # sequences that are already escaped.
    # The first alternative consumes existing escape pairs (including \\ and
    # \") so that only quotes not preceded by an escaping backslash match.
    # NOTE(review): Nginx's config parser may itself treat some backslash
    # sequences specially inside strings -- confirm with `nginx -t`.
    return re.sub(
        r'(\\.)|"',
        lambda match: match.group(1) or '\\"',
        pattern,
        flags=re.DOTALL,
    )


# Rule "location" -> Nginx variable the rule's regex is matched against.
# Add more location mappings here.
_LOCATION_VARIABLES = {
    "request-uri": "$request_uri",
    "query-string": "$args",  # Use $args for query string
    "user-agent": "$http_user_agent",
    "host": "$http_host",
    "referer": "$http_referer",
    "content-type": "$http_content_type",
}

# Nginx picks the first matching regex of a map (in order of appearance), so
# stricter severities are written first; unknown severities go last.
_SEVERITY_ORDER = {"high": 0, "medium": 1, "low": 2}

_README_TEXT = (
    "# Nginx WAF Configuration\n\n"
    "This directory contains Nginx WAF configuration files generated from OWASP rules.\n\n"
    "## Usage\n\n"
    "1. **Include `waf_maps.conf` in your `http` block:**\n"
    "   ```nginx\n"
    "   http {\n"
    "       include /path/to/waf_patterns/nginx/waf_maps.conf;\n"
    "       # ... other http configurations ...\n"
    "   }\n"
    "   ```\n\n"
    "2. **Include `waf_rules.conf` in your `location` block:**\n"
    "   ```nginx\n"
    "   location / {\n"
    "       # ... other location configurations ...\n"
    "       include /path/to/waf_patterns/nginx/waf_rules.conf;\n"
    "   }\n"
    "   ```\n\n"
    "3. **Reload Nginx:**\n"
    "   ```bash\n"
    "   sudo nginx -t && sudo systemctl reload nginx\n"
    "   ```\n\n"
    "## Important Notes:\n\n"
    "* **Testing is Crucial:** Thoroughly test your WAF configuration with a variety of requests (both legitimate and malicious) to ensure it's working correctly and not causing false positives.\n"
    "* **False Positives:** WAF rules, especially those based on regex, can sometimes block legitimate traffic. Monitor your Nginx logs and adjust the rules as needed.\n"
    "* **Performance:** Complex regexes can impact performance. Use the simplest regex that accurately matches the threat.\n"
    "* **Updates:** Regularly update the OWASP rules (by re-running `owasp2json.py` and `json2nginx.py`) to stay protected against new threats.\n"
    "* **This is not a complete WAF:** This script provides a basic WAF based on pattern matching. For more comprehensive protection, consider using a dedicated WAF solution like Nginx App Protect or ModSecurity.\n"
)


def _nginx_token(value: object) -> str:
    """Lower-case *value* and replace every character outside [a-z0-9_] with '_'."""
    return re.sub(r"[^a-z0-9_]", "_", str(value).lower())


def _map_variable(category: str, variable: str) -> str:
    """Name of the map result variable for one (category, source variable) pair."""
    return f"$waf_{category}_{variable[1:]}"  # variable[1:] drops the leading '$'


def _group_rules(rules: List[Dict]) -> Dict[Tuple[str, str], List[Tuple[str, str]]]:
    """Group usable rules as (category, nginx variable) -> [(severity, pattern)]."""
    grouped: Dict[Tuple[str, str], List[Tuple[str, str]]] = defaultdict(list)
    for rule in rules:
        if not isinstance(rule, dict):
            logger.warning(f"Skipping malformed rule entry: {rule!r}")
            continue
        rule_id = rule.get("id", "no_id")
        pattern = rule.get("pattern")
        if not isinstance(pattern, str) or not pattern:
            logger.warning(f"Skipping rule without a pattern: {rule_id}")
            continue

        # Category and severity end up in Nginx identifiers/values, so they are
        # restricted to characters that are safe there.
        category = _nginx_token(rule.get("category") or "generic")
        severity = _nginx_token(rule.get("severity") or "medium")
        location = str(rule.get("location") or "request-uri").lower()

        variable = _LOCATION_VARIABLES.get(location)
        if variable is None:
            logger.warning(f"Unsupported location: {location} for rule: {rule_id}")
            continue

        sanitized_pattern = sanitize_pattern(pattern, location)
        if not sanitized_pattern or not validate_regex(sanitized_pattern):
            continue  # Skip invalid or unsupported patterns
        grouped[(category, variable)].append((severity, sanitized_pattern))
    return grouped


def _render_maps(grouped: Dict[Tuple[str, str], List[Tuple[str, str]]]) -> str:
    """Render one `map` block per (category, source variable) pair."""
    parts = [
        "# Nginx WAF Maps (Generated by json2nginx.py)\n\n",
        "# Include this file inside your 'http' block.\n\n",
    ]
    unknown_rank = len(_SEVERITY_ORDER)
    for (category, variable), entries in grouped.items():
        parts.append(f"map {variable} {_map_variable(category, variable)} {{\n")
        parts.append('    default "";\n')  # empty value: nothing matched
        # sorted() is stable, so rules of equal severity keep their input order.
        ordered = sorted(
            entries, key=lambda entry: _SEVERITY_ORDER.get(entry[0], unknown_rank)
        )
        parts.extend(
            f'    "~*{pattern}" {severity};\n' for severity, pattern in ordered
        )
        parts.append("}\n\n")
    return "".join(parts)


def _render_rules(grouped: Dict[Tuple[str, str], List[Tuple[str, str]]]) -> str:
    """Render the `if` blocks that act on each map variable's severity value."""
    parts = [
        "# Nginx WAF Rules (Generated by json2nginx.py)\n\n",
        "# Include this file in your 'location' block.\n\n",
    ]
    for category, variable in grouped:
        map_variable = _map_variable(category, variable)
        parts.append(f'if ({map_variable} = "high") {{\n    return 403;\n}}\n')
        parts.append(
            f'if ({map_variable} = "medium") {{\n'
            f'    add_header X-WAF-Blocked "medium-{category}";\n}}\n'
        )
        parts.append(
            f'if ({map_variable} = "low") {{\n'
            f'    add_header X-WAF-Blocked "low-{category}";\n}}\n\n'
        )
    return "".join(parts)


def _write_file(path: Path, content: str) -> None:
    """Write *content* to *path* as UTF-8; log and re-raise on I/O failure."""
    try:
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)
    except OSError as e:
        logger.error(f"Error writing to {path}: {e}")
        raise


def generate_nginx_waf(rules: List[Dict]) -> None:
    """Generate the Nginx WAF configuration (maps, rules and a README).

    Writes three files into OUTPUT_DIR (created if missing):

    * MAPS_FILE  -- one `map` per (category, source variable) pair, mapping
      the source variable to the severity of the first matching pattern.
      The file contains bare `map` blocks and is meant to be included from
      inside the `http` block; it does not open an `http` block itself.
    * RULES_FILE -- `if` blocks that return 403 for "high" and add an
      `X-WAF-Blocked` header for "medium"/"low". They use `add_header`
      inside `if`, which Nginx only permits in a location.
    * README.md  -- usage instructions.

    Rules that are malformed, have no pattern, use an unsupported location,
    or whose pattern is unsupported/invalid are skipped.

    Args:
        rules: Rule dicts; the keys read are "id", "category", "location",
            "pattern" and "severity".

    Raises:
        OSError: The output directory or one of the files cannot be written.
    """
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    grouped = _group_rules(rules)

    _write_file(MAPS_FILE, _render_maps(grouped))
    logger.info(f"Generated Nginx map file: {MAPS_FILE}")

    _write_file(RULES_FILE, _render_rules(grouped))
    logger.info(f"Generated Nginx rules file: {RULES_FILE}")

    _write_file(OUTPUT_DIR / "README.md", _README_TEXT)


def main() -> None:
    """Load the OWASP rules from INPUT_FILE and generate the Nginx WAF files.

    Any failure is logged at CRITICAL level and the process exits with
    status 1.
    """
    try:
        logger.info("Loading OWASP rules...")
        owasp_rules = load_owasp_rules(INPUT_FILE)
        logger.info(f"Loaded {len(owasp_rules)} rules.")

        logger.info("Generating Nginx WAF configuration...")
        generate_nginx_waf(owasp_rules)
        logger.info("Nginx WAF generation complete.")
    except Exception as e:
        # Top-level boundary: report the failure and signal it to the caller.
        logger.critical(f"Script failed: {e}")
        # sys.exit rather than the exit() helper, which is only installed by
        # the `site` module and is not guaranteed to exist.
        sys.exit(1)


if __name__ == "__main__":
    main()