Update import_traefik_waf.py

This commit is contained in:
fab 2025-02-28 11:24:10 +01:00 committed by GitHub
parent df23dd0d04
commit 23d11b6552
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -3,113 +3,167 @@ import subprocess
import logging
from pathlib import Path
import shutil
import filecmp
import time
import toml # Add toml library
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler()],
)
# --- Configuration ---
LOG_LEVEL = logging.INFO # DEBUG, INFO, WARNING, ERROR
WAF_DIR = Path(os.getenv("WAF_DIR", "waf_patterns/traefik")).resolve()
TRAEFIK_WAF_DIR = Path(os.getenv("TRAEFIK_WAF_DIR", "/etc/traefik/waf/")).resolve()
TRAEFIK_DYNAMIC_CONF = Path(os.getenv("TRAEFIK_DYNAMIC_CONF", "/etc/traefik/dynamic.toml")).resolve()
BACKUP_DIR = Path(os.getenv("BACKUP_DIR", "/etc/traefik/waf_backup/")).resolve()
# No longer a simple string; will be constructed dynamically
# INCLUDE_STATEMENT = 'middlewares = ["bad_bot_block"]'
# --- Logging Setup ---
logging.basicConfig(level=LOG_LEVEL, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# Constants (configurable via environment variables or command-line arguments)
WAF_DIR = Path(os.getenv("WAF_DIR", "waf_patterns/traefik")).resolve() # Source directory for WAF files
TRAEFIK_WAF_DIR = Path(os.getenv("TRAEFIK_WAF_DIR", "/etc/traefik/waf/")).resolve() # Target directory
TRAEFIK_DYNAMIC_CONF = Path(os.getenv("TRAEFIK_DYNAMIC_CONF", "/etc/traefik/dynamic_conf.toml")).resolve() # Dynamic config file
INCLUDE_STATEMENT = 'middlewares = ["bad_bot_block"]' # Configuration to check/append
def copy_waf_files():
"""
Copy WAF pattern files (middleware.toml and bots.toml) to the Traefik WAF directory.
"""
logging.info("Copying Traefik WAF patterns...")
"""Copies WAF files, handling existing files and creating backups."""
logger.info("Copying Traefik WAF patterns...")
try:
# Ensure the target directory exists
TRAEFIK_WAF_DIR.mkdir(parents=True, exist_ok=True)
logging.info(f"[+] Created or verified directory: {TRAEFIK_WAF_DIR}")
TRAEFIK_WAF_DIR.mkdir(parents=True, exist_ok=True)
BACKUP_DIR.mkdir(parents=True, exist_ok=True)
# Copy middleware and bot files
for file in ["middleware.toml", "bots.toml"]:
src_path = WAF_DIR / file
dst_path = TRAEFIK_WAF_DIR / file
for toml_file in WAF_DIR.glob("*.toml"): # Find all .toml files
dst_path = TRAEFIK_WAF_DIR / toml_file.name
if src_path.exists():
shutil.copy2(src_path, dst_path) # Safer copy with metadata preservation
logging.info(f"[+] {file} copied to {TRAEFIK_WAF_DIR}")
else:
logging.warning(f"[!] {file} not found in {WAF_DIR}")
except Exception as e:
logging.error(f"[!] Error copying WAF files: {e}")
raise # Re-raise the exception to halt execution
try:
if dst_path.exists():
# Compare and backup if different
if filecmp.cmp(toml_file, dst_path, shallow=False):
logger.info(f"Skipping {toml_file.name} (identical file exists).")
continue
# Backup different file
backup_path = BACKUP_DIR / f"{dst_path.name}.{int(time.time())}"
logger.warning(f"Existing {dst_path.name} differs. Backing up to {backup_path}")
shutil.copy2(dst_path, backup_path)
# Copy the (new/updated) file
shutil.copy2(toml_file, dst_path)
logger.info(f"Copied {toml_file.name} to {dst_path}")
except OSError as e:
logger.error(f"Error copying {toml_file.name}: {e}")
raise
def update_traefik_conf():
"""
Ensure WAF patterns are referenced in Traefik's dynamic configuration file.
Ensures Traefik's dynamic config includes the generated middlewares.
This function now *intelligently* adds the middlewares to a router,
creating the necessary sections if they don't exist. It also avoids
duplicate middleware entries.
"""
logging.info("Ensuring WAF patterns are referenced in dynamic_conf.toml...")
logger.info("Updating Traefik dynamic configuration...")
try:
# Create dynamic_conf.toml if it doesn't exist
# Create dynamic_conf.toml if it doesn't exist.
if not TRAEFIK_DYNAMIC_CONF.exists():
TRAEFIK_DYNAMIC_CONF.parent.mkdir(parents=True, exist_ok=True)
with TRAEFIK_DYNAMIC_CONF.open("w") as f:
f.write("[http.middlewares]\n")
logging.info(f"[+] Created {TRAEFIK_DYNAMIC_CONF}")
# Initialize with empty http section
with open(TRAEFIK_DYNAMIC_CONF, "w") as f:
f.write("[http]\n [http.middlewares]\n [http.routers]\n [http.routers.default]\n rule = \"PathPrefix(`/`)\"\n service = \"default-service\"\n") # added default service to make it work
logger.info(f"Created initial dynamic config file: {TRAEFIK_DYNAMIC_CONF}")
# Read the current configuration
with TRAEFIK_DYNAMIC_CONF.open("r") as f:
config = f.read()
# Append middleware reference if not present
if INCLUDE_STATEMENT not in config:
logging.info("Adding WAF middleware to dynamic_conf.toml...")
with TRAEFIK_DYNAMIC_CONF.open("a") as f:
f.write(
f'\n[http.routers.my_router]\n'
f' rule = "PathPrefix(`/`)"\n'
f' service = "my_service"\n'
f' {INCLUDE_STATEMENT}\n'
)
logging.info("[+] WAF middleware added to dynamic_conf.toml.")
else:
logging.info("WAF middleware already referenced in dynamic_conf.toml.")
except Exception as e:
logging.error(f"[!] Error updating Traefik configuration: {e}")
# Load existing TOML configuration (if any).
try:
config = toml.load(TRAEFIK_DYNAMIC_CONF)
except toml.TomlDecodeError as e:
logger.error(f"Error decoding TOML file {TRAEFIK_DYNAMIC_CONF}: {e}")
raise
# 1. Collect *all* middleware names from the generated file.
middleware_names = []
middleware_file = TRAEFIK_WAF_DIR / "middleware.toml"
if middleware_file.exists():
try:
middleware_config = toml.load(middleware_file)
if "http" in middleware_config and "middlewares" in middleware_config["http"]:
middleware_names = list(middleware_config["http"]["middlewares"].keys())
except toml.TomlDecodeError as e:
logger.error(f"Error reading generated middleware file: {e}")
# Don't raise here; we can still try to proceed
if not middleware_names:
logger.warning("No middlewares found in generated file. Skipping configuration update.")
return # added return to avoid errors
# 2. Ensure the necessary sections exist in the dynamic config.
if "http" not in config:
config["http"] = {}
if "routers" not in config["http"]:
config["http"]["routers"] = {}
if "my_router" not in config["http"]["routers"]: # Use a specific router name
config["http"]["routers"]["my_router"] = {
"rule": "PathPrefix(`/`)", # Default rule - adjust as needed!
"service": "my_service", # Default service - MUST BE DEFINED!
"middlewares": [],
}
# check default values exists
if "rule" not in config["http"]["routers"]["my_router"]:
config["http"]["routers"]["my_router"]["rule"] = "PathPrefix(`/`)"
if "service" not in config["http"]["routers"]["my_router"]:
config["http"]["routers"]["my_router"]["service"] = "my_service" # needs to have a service
# 3. Add middlewares to the router's 'middlewares' list, avoiding duplicates
existing_middlewares = config["http"]["routers"]["my_router"].get("middlewares", [])
for middleware_name in middleware_names:
if middleware_name not in existing_middlewares:
existing_middlewares.append(middleware_name)
config["http"]["routers"]["my_router"]["middlewares"] = existing_middlewares
# 4. Write the updated configuration back to the file.
try:
with open(TRAEFIK_DYNAMIC_CONF, "w") as f:
toml.dump(config, f)
logger.info(f"Updated Traefik dynamic configuration: {TRAEFIK_DYNAMIC_CONF}")
except OSError as e:
logger.error(f"Error writing to {TRAEFIK_DYNAMIC_CONF}: {e}")
raise
except Exception as e: # Catch broader exception during file ops
logger.error(f"Error updating Traefik dynamic configuration: {e}")
raise
def reload_traefik():
"""
Reload the Traefik service to apply new WAF rules.
"""
logging.info("Reloading Traefik to apply new WAF rules...")
"""Tests the Traefik configuration (if possible) and reloads."""
logger.info("Reloading Traefik...")
try:
subprocess.run(["systemctl", "reload", "traefik"], check=True)
logging.info("[+] Traefik reloaded successfully.")
# Traefik doesn't have a built-in config test like nginx or apachectl.
# We'll rely on systemctl to do a basic check during reload.
# Reload Traefik
result = subprocess.run(["systemctl", "reload", "traefik"],
capture_output=True, text=True, check=True)
logger.info("Traefik reloaded.")
except subprocess.CalledProcessError as e:
logging.error(f"[!] Failed to reload Traefik: {e}")
logger.error(f"Traefik command failed: {e.cmd} - Return code: {e.returncode}")
logger.error(f"Stdout: {e.stdout}")
logger.error(f"Stderr: {e.stderr}")
raise
except FileNotFoundError:
logging.error("[!] 'systemctl' command not found. Are you on a systemd-based system?")
logger.error("'systemctl' command not found. Is systemd installed?")
raise
def main():
"""
Main function to execute the script.
"""
"""Main function."""
try:
copy_waf_files()
update_traefik_conf()
reload_traefik()
logging.info("[✔] Traefik configured with latest WAF rules.")
logger.info("Traefik WAF configuration updated successfully.")
except Exception as e:
logging.critical(f"[!] Script failed: {e}")
logger.critical(f"Script failed: {e}")
exit(1)
if __name__ == "__main__":
main()
main()