Update import_nginx_waf.py

This commit is contained in:
fab 2025-02-28 11:22:21 +01:00 committed by GitHub
parent edd338a311
commit 8277d8bc13
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -2,122 +2,150 @@ import os
import subprocess
import logging
from pathlib import Path
import shutil
import filecmp
import time
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler()],
)
# --- Configuration ---
LOG_LEVEL = logging.INFO # DEBUG, INFO, WARNING, ERROR
WAF_DIR = Path(os.getenv("WAF_DIR", "waf_patterns/nginx")).resolve()
NGINX_WAF_DIR = Path(os.getenv("NGINX_WAF_DIR", "/etc/nginx/waf/")).resolve()
NGINX_CONF = Path(os.getenv("NGINX_CONF", "/etc/nginx/nginx.conf")).resolve()
BACKUP_DIR = Path(os.getenv("BACKUP_DIR", "/etc/nginx/waf_backup/")).resolve()
INCLUDE_STATEMENT = "include /etc/nginx/waf/*.conf;"
# --- Logging Setup ---
logging.basicConfig(level=LOG_LEVEL, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# Constants (configurable via environment variables)
WAF_DIR = Path(os.getenv("WAF_DIR", "/tmp/waf_patterns/nginx")).resolve() # Source directory for WAF files
NGINX_WAF_DIR = Path(os.getenv("NGINX_WAF_DIR", "/etc/nginx/waf/")).resolve() # Target directory
NGINX_CONF = Path(os.getenv("NGINX_CONF", "/etc/nginx/nginx.conf")).resolve() # Nginx config file
INCLUDE_STATEMENT = "include /etc/nginx/waf/*.conf;" # Include directive
def copy_waf_files():
"""
Copy Nginx WAF configuration files to the target directory.
"""Copies WAF files, handling existing files, and creating backups."""
logger.info("Copying Nginx WAF patterns...")
Raises:
Exception: If there is an error copying files.
"""
logging.info("Copying Nginx WAF patterns...")
try:
# Ensure the target directory exists
NGINX_WAF_DIR.mkdir(parents=True, exist_ok=True)
logging.info(f"[+] Created or verified directory: {NGINX_WAF_DIR}")
BACKUP_DIR.mkdir(parents=True, exist_ok=True)
# Copy .conf files from source to target directory
for conf_file in WAF_DIR.glob("*.conf"):
dst_path = NGINX_WAF_DIR / conf_file.name
if dst_path.exists():
logging.warning(f"[!] File already exists: {dst_path}")
continue
try:
subprocess.run(["cp", str(conf_file), str(dst_path)], check=True)
logging.info(f"[+] Copied {conf_file} to {NGINX_WAF_DIR}")
except subprocess.CalledProcessError as e:
logging.error(f"[!] Failed to copy {conf_file}: {e}")
raise
except Exception as e:
logging.error(f"[!] Error copying WAF files: {e}")
if dst_path.exists():
# Compare and backup if different
if filecmp.cmp(conf_file, dst_path, shallow=False):
logger.info(f"Skipping {conf_file.name} (identical file exists).")
continue
# Backup different file
backup_path = BACKUP_DIR / f"{dst_path.name}.{int(time.time())}"
logger.warning(f"Existing {dst_path.name} differs. Backing up to {backup_path}")
shutil.copy2(dst_path, backup_path)
# Copy the (new/updated) file
shutil.copy2(conf_file, dst_path)
logger.info(f"Copied {conf_file.name} to {dst_path}")
except OSError as e:
logger.error(f"Error copying {conf_file.name}: {e}")
raise
def update_nginx_conf():
"""
Ensure the WAF include statement is present in the Nginx configuration file.
Raises:
Exception: If there is an error updating the Nginx configuration.
"""
logging.info("Ensuring WAF patterns are included in nginx.conf...")
"""Ensures the include directive is present in nginx.conf (http context)."""
logger.info("Checking Nginx configuration for WAF include...")
try:
# Read the current configuration
with open(NGINX_CONF, "r") as f:
config = f.read()
config_lines = f.readlines()
# Append include statement if not present
if INCLUDE_STATEMENT not in config:
logging.info("Adding WAF include to nginx.conf...")
# Check if the include statement is already present.
include_present = any(INCLUDE_STATEMENT in line for line in config_lines)
if not include_present:
# Find the 'http' block. This is where the include belongs.
http_start = -1
for i, line in enumerate(config_lines):
if line.strip().startswith("http {"):
http_start = i
break
if http_start == -1:
# No http block found. Add the include *and* the http block.
logger.warning("No 'http' block found. Adding to end of file.")
with open(NGINX_CONF, "a") as f:
f.write(f"\n{INCLUDE_STATEMENT}\n")
logging.info("[+] WAF include statement added to nginx.conf.")
f.write(f"\nhttp {{\n {INCLUDE_STATEMENT}\n}}\n")
logger.info(f"Added 'http' block and WAF include to {NGINX_CONF}")
return
# Find the end of the 'http' block
http_end = -1
for i in range(http_start + 1, len(config_lines)):
if line.strip() == "}":
http_end = i
break
if http_end == -1:
# Malformed config? Shouldn't happen, but handle it.
http_end = len(config_lines)
logger.warning("Malformed Nginx config (no closing brace for 'http' block).")
# Insert the include statement *within* the http block.
config_lines.insert(http_end, f" {INCLUDE_STATEMENT}\n")
# Write the modified configuration back.
with open(NGINX_CONF, "w") as f:
f.writelines(config_lines)
logger.info(f"Added WAF include to 'http' block in {NGINX_CONF}")
else:
logging.info("WAF already included in nginx.conf.")
except Exception as e:
logging.error(f"[!] Error updating Nginx configuration: {e}")
logger.info("WAF include statement already present.")
except FileNotFoundError:
logger.error(f"Nginx configuration file not found: {NGINX_CONF}")
raise
except OSError as e:
logger.error(f"Error updating Nginx configuration: {e}")
raise
def reload_nginx():
"""
Reload Nginx to apply the new WAF rules.
Raises:
Exception: If there is an error reloading Nginx.
"""
logging.info("Reloading Nginx to apply new WAF rules...")
"""Tests the Nginx configuration and reloads if valid."""
logger.info("Reloading Nginx...")
try:
# Test Nginx configuration
subprocess.run(["nginx", "-t"], check=True)
logging.info("[+] Nginx configuration test passed.")
# Test configuration
result = subprocess.run(["nginx", "-t"],
capture_output=True, text=True, check=True)
logger.info(f"Nginx configuration test successful:\n{result.stdout}")
# Reload Nginx
subprocess.run(["systemctl", "reload", "nginx"], check=True)
logging.info("[+] Nginx reloaded successfully.")
result = subprocess.run(["systemctl", "reload", "nginx"],
capture_output=True, text=True, check=True)
logger.info("Nginx reloaded.")
except subprocess.CalledProcessError as e:
logging.error(f"[!] Nginx configuration test failed: {e}")
raise
logger.error(f"Nginx command failed: {e.cmd} - Return code: {e.returncode}")
logger.error(f"Stdout: {e.stdout}")
logger.error(f"Stderr: {e.stderr}")
raise # Re-raise to signal failure
except FileNotFoundError:
logging.error("[!] 'nginx' or 'systemctl' command not found. Are you on a supported system?")
logger.error("'nginx' or 'systemctl' command not found. Is Nginx/systemd installed?")
raise
except Exception as e:
logging.error(f"[!] Error reloading Nginx: {e}")
except Exception as e: # added extra exception
logger.error(f"[!] Error reloading Nginx: {e}")
raise
def main():
"""
Main function to execute the script.
"""
"""Main function."""
try:
copy_waf_files()
update_nginx_conf()
reload_nginx()
logging.info("[✔] Nginx configured with latest WAF rules.")
logger.info("Nginx WAF configuration updated successfully.")
except Exception as e:
logging.critical(f"[!] Script failed: {e}")
logger.critical(f"Script failed: {e}")
exit(1)
if __name__ == "__main__":
main()