diff --git a/json2nginx.py b/json2nginx.py index 390f10c..a37ae9d 100644 --- a/json2nginx.py +++ b/json2nginx.py @@ -5,172 +5,219 @@ import logging from pathlib import Path from collections import defaultdict from functools import lru_cache +from typing import List, Dict, Optional, Tuple -# Configure logging -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(levelname)s - %(message)s", - handlers=[logging.StreamHandler()], -) - -# Input and output paths +# --- Configuration --- +LOG_LEVEL = logging.INFO # DEBUG, INFO, WARNING, ERROR INPUT_FILE = Path(os.getenv("INPUT_FILE", "owasp_rules.json")) OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "waf_patterns/nginx")) MAPS_FILE = OUTPUT_DIR / "waf_maps.conf" RULES_FILE = OUTPUT_DIR / "waf_rules.conf" +# Unsupported Nginx directives (expand as needed) +UNSUPPORTED_PATTERNS = [ + "@pmFromFile", # No direct file lookups in Nginx map +] -# Create output directory if it doesn't exist -OUTPUT_DIR.mkdir(parents=True, exist_ok=True) +# --- Logging Setup --- +logging.basicConfig(level=LOG_LEVEL, format="%(asctime)s - %(levelname)s - %(message)s") +logger = logging.getLogger(__name__) -def load_owasp_rules(file_path): - """Load OWASP rules from a JSON file.""" +# --- Utility Functions --- +def load_owasp_rules(file_path: Path) -> List[Dict]: + """Loads OWASP rules from a JSON file.""" try: - with open(file_path, "r") as f: + with open(file_path, "r", encoding="utf-8") as f: return json.load(f) - except FileNotFoundError: - logging.error(f"Input file not found: {file_path}") - raise - except json.JSONDecodeError: - logging.error(f"Invalid JSON in file: {file_path}") - raise + except (FileNotFoundError, json.JSONDecodeError, OSError) as e: + logger.error(f"Error loading rules from {file_path}: {e}") + raise # Re-raise to prevent continuing - -@lru_cache(maxsize=128) -def validate_regex(pattern): - """Validate if a pattern is a valid regex.""" +@lru_cache(maxsize=256) # Increased cache size +def validate_regex(pattern: str) -> bool: + """Validates a regex pattern (basic check).""" try: re.compile(pattern) return True - except re.error: + except re.error as e: + logger.warning(f"Invalid regex: {pattern} - {e}") return False +def _sanitize_pattern(pattern: str) -> str: + """Internal helper to clean and escape patterns for Nginx.""" + pattern = pattern.replace("@rx ", "").strip() # Remove ModSecurity @rx + # Remove case-insensitive flag (?i) as Nginx uses ~* for that + pattern = re.sub(r"\(\?i\)", "", pattern) -@lru_cache(maxsize=128) -def sanitize_pattern(pattern): - """Sanitize and validate OWASP patterns for Nginx compatibility.""" - # Directly proceed with check for unsupported patterns - unsupported_keywords = ["@pmFromFile", "!@eq", "!@within", "@lt"] - if any(keyword in pattern for keyword in unsupported_keywords): - logging.warning(f"Skipping unsupported pattern: {pattern}") - return None + # Convert $ to \$ + pattern = pattern.replace("$", r"\$") - # Faster check for patterns starting with "@rx " - if pattern.startswith("@rx "): - sanitized_pattern = pattern.replace("@rx ", "").strip() - if validate_regex(sanitized_pattern): - return re.escape(sanitized_pattern).replace(r'\@', '@') - else: - logging.warning(f"Invalid regex in pattern: {sanitized_pattern}") + # Convert { or { to { + pattern = re.sub(r"&l(?:brace|cub);?", r"{", pattern) + pattern = re.sub(r"&r(?:brace|cub);?", r"}", pattern) + + # Remove unnecessary \.* + pattern = re.sub(r"\\\.\*", r"\.*", pattern) + pattern = re.sub(r"(? Optional[str]: + """ + Sanitizes a pattern for use in an Nginx map directive. + Returns the sanitized pattern, or None if the pattern is unsupported. + """ + # Skip unsupported patterns. + for unsupported in UNSUPPORTED_PATTERNS: + if unsupported in pattern: + logger.warning(f"Skipping unsupported pattern: {pattern}") return None - if validate_regex(pattern): - return re.escape(pattern).replace(r'\@', '@') - else: - logging.warning(f"Invalid regex in pattern: {pattern}") - return None + # Sanitize the pattern + pattern = _sanitize_pattern(pattern) + # Escape special characters for Nginx map (most importantly, the ~) + # We use re.escape, but *selectively* unescape key regex metacharacters. + pattern = re.escape(pattern) + # Unescape: \. \( \) \[ \] \| \? \* \+ \{ \} \^ \$ \\ + pattern = re.sub(r'\\([.()[\]|?*+{}^$\\])', r'\1', pattern) + return pattern -def generate_nginx_waf(rules): - """Generate Nginx WAF configuration snippets from OWASP rules.""" - categorized_rules = defaultdict(set) +def generate_nginx_waf(rules: List[Dict]) -> None: + """Generates Nginx WAF configuration (maps and rules).""" + + OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + categorized_rules: Dict[str, Dict[str, str]] = defaultdict(lambda: defaultdict(str)) # category -> location - # Group rules by category for rule in rules: + rule_id = rule.get("id", "no_id") # Get rule ID category = rule.get("category", "generic").lower() - pattern = rule.get("pattern") + location = rule.get("location", "request-uri").lower() # set a default location + pattern = rule["pattern"] + severity = rule.get("severity", "medium").lower() # get severity - sanitized_pattern = sanitize_pattern(pattern) - if sanitized_pattern: - categorized_rules[category].add(sanitized_pattern) + sanitized_pattern = sanitize_pattern(pattern, location) + if not sanitized_pattern or not validate_regex(sanitized_pattern): + continue # Skip invalid or unsupported patterns + + + if location == "request-uri": + variable = "$request_uri" + elif location == "query-string": + variable = "$args" # Use $args for query string + elif location == "user-agent": + variable = "$http_user_agent" + elif location == "host": + variable = "$http_host" + elif location == "referer": + variable = "$http_referer" + elif location == "content-type": + variable = "$http_content_type" + # Add more location mappings here else: - logging.warning(f"Invalid or unsupported pattern skipped: {pattern}") + logger.warning(f"Unsupported location: {location} for rule: {rule_id}") + continue + # Add rule based on severity and location + categorized_rules[category][variable] += f' "~*{sanitized_pattern}" {severity};\n' # set severity as value - # Write map definitions to a dedicated file + + # --- Generate Maps (waf_maps.conf) --- try: - with open(MAPS_FILE, "w") as f: - f.write("# Nginx WAF Maps Definitions\n") - f.write("# Automatically generated from OWASP rules.\n\n") - f.write("http {\n") - for category, patterns in categorized_rules.items(): - f.write(f" map $request_uri $waf_block_{category} {{\n") - f.write(" default 0;\n") - for pattern in patterns: - escaped_pattern = pattern.replace('"', '\\"') - f.write(f' "~*{escaped_pattern}" 1;\n') - f.write(" }\n\n") - f.write("}\n") + with open(MAPS_FILE, "w", encoding="utf-8") as f: + f.write("# Nginx WAF Maps (Generated by json2nginx.py)\n\n") + f.write("http {\n") # Maps *must* be in the http context - logging.info(f"Generated {MAPS_FILE} containing map definitions") + for category, location_rules in categorized_rules.items(): + # Create the map with the high priority + f.write(f" map $1 $waf_{category} {{\n") # dynamic variable + f.write(' default "";\n') # default value empty + for location, rules in location_rules.items(): + f.write(f" # Rules for {location}\n") + f.write(rules) # Write the collected rules for this location + f.write("\n") + f.write(" }\n\n") + + f.write("}\n") # Close the http block + logger.info(f"Generated Nginx map file: {MAPS_FILE}") except IOError as e: - logging.error(f"Failed to write {MAPS_FILE}: {e}") + logger.error(f"Error writing to {MAPS_FILE}: {e}") + raise - - # Write if blocks to a dedicated file + # --- Generate Rules (waf_rules.conf) --- try: - with open(RULES_FILE, "w") as f: - f.write("# Nginx WAF Rules\n") - f.write("# Automatically generated from OWASP rules.\n") - f.write("# Include this file inside server block\n\n") - f.write(" # WAF rules\n") - for category in categorized_rules.keys(): - f.write(f" if ($waf_block_{category}) {{\n") - f.write(" return 403;\n") - f.write(" # Log the blocked request (optional)\n") - f.write(" # access_log /var/log/nginx/waf_blocked.log;\n") - f.write(" }\n\n") + with open(RULES_FILE, "w", encoding="utf-8") as f: + f.write("# Nginx WAF Rules (Generated by json2nginx.py)\n\n") + f.write("# Include this file in your 'server' or 'location' block.\n\n") + + # iterate for each rule + for category, location_rules in categorized_rules.items(): + # set map to correct WAF block + map_variable = f"$waf_{category}" + # create conditions based on priority + f.write(f' if ({map_variable} = "high") {{\n return 403;\n }}\n') + f.write(f' if ({map_variable} = "medium") {{\n add_header X-WAF-Blocked "medium-{category}";\n }}\n') # example for another action + f.write(f' if ({map_variable} = "low") {{\n add_header X-WAF-Blocked "low-{category}";\n }}\n\n') # expample for other action + logger.info(f"Generated Nginx rules file: {RULES_FILE}") - logging.info(f"Generated {RULES_FILE} containing rules") except IOError as e: - logging.error(f"Failed to write {RULES_FILE}: {e}") + logger.error(f"Error writing to {RULES_FILE}: {e}") + raise - # Generate a README file with usage instructions + + # --- Generate README --- readme_file = OUTPUT_DIR / "README.md" - with open(readme_file, "w") as f: - f.write("# Nginx WAF Configuration\n\n") - f.write("This directory contains Nginx WAF configuration files generated from OWASP rules.\n") - f.write("You can include these files in your existing Nginx configuration to enhance security.\n\n") - f.write("## Usage\n") - f.write("1. Include the `waf_maps.conf` file in your `nginx.conf` *inside the `http` block*:\n") - f.write(" ```nginx\n") - f.write(" http {\n") - f.write(" include /path/to/waf_patterns/nginx/waf_maps.conf;\n") - f.write(" # ... other http configurations ...\n") - f.write(" }\n") - f.write(" ```\n") - f.write("2. Include the `waf_rules.conf` file in your `server` block:\n") - f.write(" ```nginx\n") - f.write(" server {\n") - f.write(" # ... other server configurations ...\n") - f.write(" include /path/to/waf_patterns/nginx/waf_rules.conf;\n") - f.write(" }\n") - f.write(" ```\n") - f.write("3. Reload Nginx to apply the changes:\n") - f.write(" ```bash\n") - f.write(" sudo nginx -t && sudo systemctl reload nginx\n") - f.write(" ```\n") - f.write("\n## Notes\n") - f.write("- The rules use `map` directives for efficient pattern matching. The maps are defined in the `waf_maps.conf` file.\n") - f.write("- The rules (if statements) are defined in the `waf_rules.conf` file.\n") - f.write("- Blocked requests return a `403 Forbidden` response by default.\n") - f.write("- You can enable logging for blocked requests by uncommenting the `access_log` line.\n") + try: + with open(readme_file, "w", encoding="utf-8") as f: + f.write("# Nginx WAF Configuration\n\n") + f.write("This directory contains Nginx WAF configuration files generated from OWASP rules.\n\n") + f.write("## Usage\n\n") + f.write("1. **Include `waf_maps.conf` in your `http` block:**\n") + f.write(" ```nginx\n") + f.write(" http {\n") + f.write(" include /path/to/waf_patterns/nginx/waf_maps.conf;\n") + f.write(" # ... other http configurations ...\n") + f.write(" }\n") + f.write(" ```\n\n") + f.write("2. **Include `waf_rules.conf` in your `server` or `location` block:**\n") + f.write(" ```nginx\n") + f.write(" server {\n") + f.write(" # ... other server configurations ...\n") + f.write(" include /path/to/waf_patterns/nginx/waf_rules.conf;\n") + f.write(" }\n") + f.write(" ```\n\n") + f.write("3. **Reload Nginx:**\n") + f.write(" ```bash\n") + f.write(" sudo nginx -t && sudo systemctl reload nginx\n") + f.write(" ```\n\n") + f.write("## Important Notes:\n\n") + f.write("* **Testing is Crucial:** Thoroughly test your WAF configuration with a variety of requests (both legitimate and malicious) to ensure it's working correctly and not causing false positives.\n") + f.write("* **False Positives:** WAF rules, especially those based on regex, can sometimes block legitimate traffic. Monitor your Nginx logs and adjust the rules as needed.\n") + f.write("* **Performance:** Complex regexes can impact performance. Use the simplest regex that accurately matches the threat.\n") + f.write("* **Updates:** Regularly update the OWASP rules (by re-running `owasp2json.py` and `json2nginx.py`) to stay protected against new threats.\n") + f.write("* **This is not a complete WAF:** This script provides a basic WAF based on pattern matching. For more comprehensive protection, consider using a dedicated WAF solution like Nginx App Protect or ModSecurity.\n") + except IOError as e: + logger.error(f"Error writing to {readme_file}: {e}") + raise def main(): - """Main function to load rules and generate Nginx configurations.""" + """Main function.""" try: - logging.info("Loading OWASP rules...") + logger.info("Loading OWASP rules...") owasp_rules = load_owasp_rules(INPUT_FILE) + logger.info(f"Loaded {len(owasp_rules)} rules.") - logging.info(f"Generating Nginx WAF configs from {len(owasp_rules)} rules...") + logger.info("Generating Nginx WAF configuration...") generate_nginx_waf(owasp_rules) - logging.info("Nginx WAF configurations generated successfully.") + logger.info("Nginx WAF generation complete.") except Exception as e: - logging.critical(f"Script failed: {e}") - exit(1) - + logger.critical(f"Script failed: {e}") + exit(1) # Exit with an error code if __name__ == "__main__": - main() \ No newline at end of file + main()