import os
import re
import time
import json
import base64
import hashlib
import logging
import argparse
from typing import List, Dict, Optional, Match
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
from tqdm import tqdm


# --- Configuration ---
LOG_LEVEL = logging.INFO  # Set to DEBUG for more verbose output
GITHUB_REPO_URL = "https://api.github.com/repos/coreruleset/coreruleset"
OWASP_CRS_BASE_URL = f"{GITHUB_REPO_URL}/contents/rules"
GITHUB_REF = "v4.0"  # Default ref prefix: major version only
RATE_LIMIT_DELAY = 60  # Shorter delay; rely on exponential backoff
RETRY_DELAY = 2  # Shorter initial retry delay (seconds)
MAX_RETRIES = 8  # More retries
EXPONENTIAL_BACKOFF = True
BACKOFF_MULTIPLIER = 2
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")  # GitHub token for authentication
CONNECTION_POOL_SIZE = 30  # More connections for faster parallel downloads

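# Note: unauthenticated GitHub API requests are limited to roughly 60 per hour,
# while authenticated requests get a much larger quota, so exporting GITHUB_TOKEN
# is strongly recommended when fetching many rule files in parallel.

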
# --- Custom Exceptions ---
class GitHubRequestError(Exception):
    """Base exception for GitHub API request failures."""
    pass


class GitHubRateLimitError(GitHubRequestError):
    """Raised when the GitHub API rate limit is exceeded."""
    pass


class GitHubBlobFetchError(GitHubRequestError):
    """Raised when fetching a blob (file content) fails."""
    pass


# --- Logging Setup ---
logging.basicConfig(level=LOG_LEVEL, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


# --- Utility Functions ---
def get_session() -> requests.Session:
    """Creates and returns a requests.Session with an optional GitHub token."""
    session = requests.Session()
    if GITHUB_TOKEN:
        session.headers.update({"Authorization": f"token {GITHUB_TOKEN}"})
    # Increase the connection pool size (important for parallel requests)
    adapter = requests.adapters.HTTPAdapter(
        pool_connections=CONNECTION_POOL_SIZE, pool_maxsize=CONNECTION_POOL_SIZE
    )
    session.mount("https://", adapter)  # Mount the adapter for all https:// requests
    return session


def fetch_with_retries(session: requests.Session, url: str) -> requests.Response:
    """
    Fetches a URL with retries, handling rate limits and transient errors.

    Raises:
        GitHubRequestError (or a subclass) if the request ultimately fails.
    """
    retries = 0
    while retries < MAX_RETRIES:
        try:
            response = session.get(url)

            # Check for rate limiting (403 with the rate-limit quota exhausted)
            if (response.status_code == 403
                    and response.headers.get("X-RateLimit-Remaining") == "0"):
                reset_time = int(response.headers.get("X-RateLimit-Reset", 0))
                wait_time = max(0, reset_time - int(time.time()))  # Ensure wait_time >= 0
                # Even if the reset is imminent, wait at least a second to avoid hammering the API.
                wait_time = max(wait_time, 1)
                logger.warning(f"Rate limit exceeded. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
                continue  # Retry immediately after the wait

            # Raise exceptions for other HTTP errors (4xx, 5xx)
            response.raise_for_status()
            return response

        except requests.exceptions.RequestException as e:
            # Log the error and compute the wait time (exponential backoff)
            logger.warning(f"Request failed ({type(e).__name__}): {e} - URL: {url}")
            wait_time = (RETRY_DELAY * (BACKOFF_MULTIPLIER ** retries)
                         if EXPONENTIAL_BACKOFF else RETRY_DELAY)
            logger.warning(f"Retrying {url}... ({retries + 1}/{MAX_RETRIES}) in {wait_time} seconds.")
            time.sleep(wait_time)
            retries += 1

    # If we reach here, all retries failed.
    raise GitHubRequestError(f"Failed to fetch {url} after {MAX_RETRIES} retries.")


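# With the defaults above (RETRY_DELAY=2, BACKOFF_MULTIPLIER=2, MAX_RETRIES=8),
# fetch_with_retries waits 2, 4, 8, ..., 256 seconds between attempts, i.e. up to
# about 510 seconds in total before giving up on a URL.

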
def fetch_latest_tag(session: requests.Session, ref_prefix: str) -> Optional[str]:
    """Fetches the latest matching Git tag, or falls back to the latest tag overall."""
    ref_url = f"{GITHUB_REPO_URL}/git/refs/tags"
    try:
        response = fetch_with_retries(session, ref_url)
        tags = response.json()

        if not tags:
            logger.warning("No tags found in the repository.")
            return None

        # Filter tags that start with the given prefix.
        matching_tags = [
            r["ref"] for r in tags
            if r["ref"].startswith(f"refs/tags/{ref_prefix}")
        ]
        # Sort matching tags to find the latest (lexicographically, assuming semver).
        matching_tags.sort(reverse=True)

        if matching_tags:
            latest_tag = matching_tags[0]  # The first tag is the latest
            logger.info(f"Latest matching tag: {latest_tag}")
            return latest_tag

        # Fallback: if no tag matches the prefix, return the very latest tag overall.
        logger.warning(f"No matching refs found for prefix '{ref_prefix}'. Using latest tag.")
        # Sort *all* tags and take the first one.
        tags.sort(key=lambda x: x["ref"], reverse=True)
        return tags[0]["ref"] if tags else None

    except GitHubRequestError as e:
        logger.error(f"Failed to fetch tags: {e}")
        return None


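# For reference, the Git refs endpoint used by fetch_latest_tag returns a JSON
# array of objects whose "ref" field looks like "refs/tags/v4.0.0"; only that
# field is used here (shape per the public GitHub REST API; the tag value shown
# is illustrative).

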
def fetch_rule_files(session: requests.Session, ref: str) -> List[Dict[str, str]]:
    """Fetches the list of .conf rule files from the given ref."""
    ref_name = ref.split("/")[-1] if "/" in ref else ref  # Extract the bare ref name
    rules_url = f"{OWASP_CRS_BASE_URL}?ref={ref_name}"

    try:
        response = fetch_with_retries(session, rules_url)
        files = response.json()
        # Filter for .conf files and keep only the fields we need.
        return [
            {"name": f["name"], "sha": f["sha"]}
            for f in files if f["name"].endswith(".conf")
        ]
    except GitHubRequestError as e:
        logger.error(f"Failed to fetch rule files from {rules_url}: {e}")
        return []  # Return an empty list on failure


def fetch_github_blob(session: requests.Session, sha: str) -> str:
    """Fetches the base64-encoded content of a blob (file) given its SHA."""
    blob_url = f"{GITHUB_REPO_URL}/git/blobs/{sha}"
    try:
        response = fetch_with_retries(session, blob_url)
        blob_data = response.json()
        return blob_data.get("content", "")  # Return an empty string if there is no content
    except GitHubRequestError as e:
        logger.error(f"Failed to fetch blob for SHA {sha}: {e}")
        return ""


def verify_blob_sha(file_sha: str, blob_content_b64: str) -> bool:
    """Verifies the SHA-1 hash of the decoded blob content."""
    decoded_bytes = base64.b64decode(blob_content_b64)
    # Git hashes blobs as SHA-1 over "blob <size>\0" followed by the raw bytes.
    blob_header = f"blob {len(decoded_bytes)}\0".encode("utf-8")
    calculated_sha = hashlib.sha1(blob_header + decoded_bytes).hexdigest()

    if calculated_sha != file_sha:
        logger.warning(f"SHA mismatch! Expected: {file_sha}, Calculated: {calculated_sha}")
        return False  # Integrity check failed
    return True


def _extract_rule_id(secrule_text: str) -> str:
    """Extracts the rule ID from a SecRule directive."""
    match = re.search(r'id:(\d+)', secrule_text)
    return match.group(1) if match else "no_id"


def _extract_rule_severity(secrule_text: str) -> str:
    """Extracts the severity from a SecRule directive."""
    match = re.search(r'severity:(\w+)', secrule_text)
    return match.group(1) if match else "medium"  # Default to medium


def _extract_rule_location(secrule_text: str) -> str:
    """
    Extracts the location (variable) from a SecRule directive. Handles
    multiple variables and chained rules.
    """
    match = re.search(r'SecRule\s+([^"\s]+)', secrule_text)
    if not match:
        return "UNKNOWN"

    variables_str = match.group(1)
    variables = variables_str.split("|")  # Split multiple variables
    # Map each variable to a location label
    locations = []

    for var in variables:
        var = var.upper()  # Normalize variable names to upper case
        if var.startswith("REQUEST_HEADERS"):
            if ":" in var:  # Specific header, e.g. User-Agent
                locations.append(var.split(":")[1].replace("_", "-").strip())
            else:
                locations.append("REQUEST_HEADERS")  # Generic header location
        elif var.startswith("ARGS"):  # Request arguments
            locations.append("Query-String")
        elif var == "REQUEST_COOKIES":
            locations.append("Cookie")
        elif var == "REQUEST_URI":
            locations.append("Request-URI")
        elif var == "QUERY_STRING":
            locations.append("Query-String")
        elif var in ("REQUEST_LINE", "REQUEST_BODY", "RESPONSE_BODY", "RESPONSE_HEADERS"):
            locations.append(var)  # Keep the variable name as-is
        # Add more location mappings as needed

    # Prioritize specific locations, fall back to generic ones
    if "Request-URI" in locations:
        return "Request-URI"  # Request-URI has top priority
    elif "Query-String" in locations:
        return "Query-String"
    if locations:
        return locations[0]  # Return the first extracted location
    return "UNKNOWN"  # Default location


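# Illustrative example (hypothetical rule, not taken from the CRS): for
#     SecRule REQUEST_HEADERS:User_Agent "badbot" "id:12345,deny,severity:critical"
# the helpers above return id "12345", severity "critical", and location
# "USER-AGENT". Real CRS rules often quote the severity (severity:'CRITICAL'),
# which the simple severity regex does not match, so such rules fall back to the
# default "medium".

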
def extract_sec_rules(raw_text: str) -> List[Dict[str, str]]:
    """
    Extracts SecRule patterns and associated metadata from raw text.
    Returns a list of dictionaries, one per SecRule.
    """
    rules = []
    # Find all SecRule directives (including those spanning multiple lines).
    for match in re.finditer(r'SecRule\s+.*?"((?:[^"\\]|\\.)+?)"', raw_text, re.DOTALL):
        secrule_text = match.group(0)  # Full SecRule text
        pattern = match.group(1).strip().replace("\\\\", "\\")  # Extract and clean the pattern

        if not pattern:  # Skip rules without a pattern
            continue

        rule_id = _extract_rule_id(secrule_text)
        location = _extract_rule_location(secrule_text)
        severity = _extract_rule_severity(secrule_text)

        rules.append({
            "id": rule_id,
            "pattern": pattern,
            "location": location,
            "severity": severity
        })
    return rules


def process_rule_file(file: Dict[str, str], session: requests.Session) -> List[Dict[str, str]]:
    """Processes a single rule file, extracting rules and metadata."""
    blob_b64 = fetch_github_blob(session, file["sha"])
    if not blob_b64:
        logger.warning(f"Skipping {file['name']} (empty blob).")
        return []

    if not verify_blob_sha(file["sha"], blob_b64):
        pass  # The mismatch is already logged; continue, since the data is present

    try:
        raw_text = base64.b64decode(blob_b64).decode("utf-8")
    except Exception as e:
        logger.error(f"Failed to decode the file: {file['name']}. Reason: {e}")
        return []

    category = file["name"].split("-")[-1].replace(".conf", "")
    extracted_rules = extract_sec_rules(raw_text)  # List of rule dictionaries

    # Add the category to each extracted rule.
    for rule in extracted_rules:
        rule["category"] = category

    return extracted_rules


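# Each extracted rule is a flat dictionary of the form
#     {"id": ..., "pattern": ..., "location": ..., "severity": ..., "category": ...}
# where "category" is the trailing token of the rule file name; e.g. a file named
# like REQUEST-942-APPLICATION-ATTACK-SQLI.conf would yield the category "SQLI".

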
def fetch_owasp_rules(session: requests.Session, rule_files: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Fetches and processes rule files in parallel, returning all extracted rules."""
    all_rules = []
    with ThreadPoolExecutor(max_workers=CONNECTION_POOL_SIZE) as executor:
        future_to_file = {
            executor.submit(process_rule_file, file, session): file
            for file in rule_files
        }
        # Use tqdm for progress display; as_completed yields futures as they finish.
        for future in tqdm(as_completed(future_to_file), total=len(rule_files), desc="Processing rules"):
            file = future_to_file[future]
            try:
                rules = future.result()  # Get the result (or re-raise its exception)
                all_rules.extend(rules)
            except Exception as e:
                logger.error(f"Error processing {file['name']}: {e}")
                # Continue even if an individual file fails

    logger.info(f"Fetched a total of {len(all_rules)} rules.")
    return all_rules


def save_as_json(rules: List[Dict[str, str]], output_file: str) -> bool:
    """Saves the extracted rules to a JSON file (atomically)."""
    try:
        output_dir = Path(output_file).parent
        if output_dir:
            output_dir.mkdir(parents=True, exist_ok=True)

        temp_file = f"{output_file}.tmp"  # Write to a temporary file first
        with open(temp_file, "w", encoding="utf-8") as f:
            json.dump(rules, f, indent=4)
        os.replace(temp_file, output_file)  # Atomic rename over the final path

        logger.info(f"Rules saved to {output_file}")
        return True
    except Exception as e:
        logger.error(f"Failed to save rules to {output_file}: {e}")
        return False


def main():
    """Main function: fetches, processes, and saves OWASP CRS rules."""
    parser = argparse.ArgumentParser(
        description="Fetches OWASP Core Rule Set rules and saves them as JSON."
    )
    parser.add_argument("--output", type=str, default="owasp_rules.json",
                        help="Output JSON file path.")
    parser.add_argument("--ref", type=str, default=GITHUB_REF,
                        help="Git reference (tag or branch prefix), e.g. 'v4.0', 'v3.3', 'dev'.")
    parser.add_argument("--dry-run", action="store_true",
                        help="Simulate fetching and processing (no file save).")
    args = parser.parse_args()

    session = get_session()  # Create a requests session

    # 1. Fetch the latest tag matching the requested ref prefix.
    latest_ref = fetch_latest_tag(session, args.ref)
    if not latest_ref:
        logger.error("Could not determine the latest tag. Exiting.")
        return

    # 2. Fetch the list of rule files.
    rule_files = fetch_rule_files(session, latest_ref)
    if not rule_files:
        logger.error("Could not fetch the list of rule files. Exiting.")
        return

    # 3. Fetch and process the rules (in parallel).
    rules = fetch_owasp_rules(session, rule_files)

    # 4. Save the rules to a JSON file (unless this is a dry run).
    if not args.dry_run:
        if rules:
            if save_as_json(rules, args.output):
                logger.info("Successfully saved rules to JSON.")
            else:
                logger.error("Failed to save rules to JSON.")
        else:
            logger.warning("No rules were extracted.")
    else:
        logger.info("Dry-run mode: rules were fetched and processed, but not saved.")
        # Print one extracted rule for verification.
        if rules:
            logger.info(f"Example rule: {rules[0]}")


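# Example invocations (illustrative; the actual script filename may differ):
#   GITHUB_TOKEN=<your-token> python fetch_owasp_rules.py --ref v4.0 --output owasp_rules.json
#   python fetch_owasp_rules.py --dry-run
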
if __name__ == "__main__":
    main()