Update owasp2json.py

fab 2025-02-28 11:16:46 +01:00 committed by GitHub
parent 95b1b4a784
commit 4591dfa52e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194


@@ -6,252 +6,362 @@ import base64
import hashlib
import logging
import argparse
from typing import List, Dict, Optional, Match
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
from tqdm import tqdm

# --- Configuration ---
LOG_LEVEL = logging.INFO  # Set to DEBUG for more verbose output

GITHUB_REPO_URL = "https://api.github.com/repos/coreruleset/coreruleset"
OWASP_CRS_BASE_URL = f"{GITHUB_REPO_URL}/contents/rules"
GITHUB_REF = "v4.0"  # Default tag prefix (major.minor)
RATE_LIMIT_DELAY = 60  # Shorter fallback delay; exponential backoff handles the rest
RETRY_DELAY = 2  # Initial retry delay in seconds
MAX_RETRIES = 8  # Maximum number of retries
EXPONENTIAL_BACKOFF = True
BACKOFF_MULTIPLIER = 2
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")  # GitHub token for authentication
CONNECTION_POOL_SIZE = 30  # More connections for faster parallel downloads
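# Usage sketch (assumed invocation; --ref/--output/--dry-run are the flags
# defined in main() below, and GITHUB_TOKEN is optional but avoids the low
# unauthenticated rate limit; the token value is a placeholder):
#   GITHUB_TOKEN=ghp_xxx python owasp2json.py --ref v4.0 --output owasp_rules.json
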
# --- Custom Exceptions ---
class GitHubRequestError(Exception):
    """Base exception for GitHub API request failures."""
    pass


class GitHubRateLimitError(GitHubRequestError):
    """Raised when the GitHub API rate limit is exceeded."""
    pass


class GitHubBlobFetchError(GitHubRequestError):
    """Raised when fetching a blob (file content) fails."""
    pass


# --- Logging Setup ---
logging.basicConfig(level=LOG_LEVEL, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# --- Utility Functions ---
def get_session() -> requests.Session:
    """Creates and returns a requests.Session with optional GitHub token."""
    session = requests.Session()
    if GITHUB_TOKEN:
        session.headers.update({"Authorization": f"token {GITHUB_TOKEN}"})
    # Increase the connection pool size (important for parallel requests).
    adapter = requests.adapters.HTTPAdapter(pool_connections=CONNECTION_POOL_SIZE, pool_maxsize=CONNECTION_POOL_SIZE)
    session.mount("https://", adapter)  # Mount for all https:// requests
    return session

def fetch_with_retries(session: requests.Session, url: str) -> requests.Response:
    """
    Fetches a URL with retries, handling rate limits and transient errors.
    Raises: GitHubRequestError (or subclasses) if the request ultimately fails.
    """
    retries = 0
    while retries < MAX_RETRIES:
        try:
            response = session.get(url)

            # Check for rate limiting (403 with the rate-limit quota exhausted).
            if (response.status_code == 403
                    and response.headers.get("X-RateLimit-Remaining") == "0"):
                reset_time = int(response.headers.get("X-RateLimit-Reset", 0))
                wait_time = max(0, reset_time - int(time.time()))  # Ensure wait_time >= 0
                # If wait_time is very short, still wait a little to avoid hammering the API.
                wait_time = max(wait_time, 1)
                logger.warning(f"Rate limit exceeded. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
                continue  # Retry without consuming a retry attempt

            # Raise exceptions for other HTTP errors (4xx, 5xx).
            response.raise_for_status()
            return response

        except requests.exceptions.RequestException as e:
            # Log the error, then back off (exponentially by default).
            logger.warning(f"Request failed ({type(e).__name__}): {e} - URL: {url}")
            wait_time = (RETRY_DELAY * (BACKOFF_MULTIPLIER ** retries)
                         if EXPONENTIAL_BACKOFF else RETRY_DELAY)
            logger.warning(f"Retrying {url}... ({retries + 1}/{MAX_RETRIES}) in {wait_time} seconds.")
            time.sleep(wait_time)
            retries += 1

    # If we reach here, all retries failed.
    raise GitHubRequestError(f"Failed to fetch {url} after {MAX_RETRIES} retries.")

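# Quick sanity check of the backoff schedule above (a sketch built on the
# module's own constants; it is not called anywhere). With the defaults
# (RETRY_DELAY=2, BACKOFF_MULTIPLIER=2, MAX_RETRIES=8) the waits are
# [2, 4, 8, 16, 32, 64, 128, 256] seconds, roughly 8.5 minutes in total.
def _backoff_schedule() -> List[int]:
    """Per-attempt wait times used by fetch_with_retries on repeated failures."""
    return [RETRY_DELAY * (BACKOFF_MULTIPLIER ** i) for i in range(MAX_RETRIES)]
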
def fetch_latest_tag(session: requests.Session, ref_prefix: str) -> Optional[str]:
    """Fetches the latest matching Git tag, or falls back to the latest overall."""
    ref_url = f"{GITHUB_REPO_URL}/git/refs/tags"
    try:
        response = fetch_with_retries(session, ref_url)
        tags = response.json()
        if not tags:
            logger.warning("No tags found in the repository.")
            return None

        # Filter tags that start with the given prefix.
        matching_tags = [
            r["ref"] for r in tags
            if r["ref"].startswith(f"refs/tags/{ref_prefix}")
        ]
        # Sort lexicographically and take the first entry as the latest
        # (adequate for single-digit patch versions; not true semver ordering).
        matching_tags.sort(reverse=True)

        if matching_tags:
            latest_tag = matching_tags[0]
            logger.info(f"Latest matching tag: {latest_tag}")
            return latest_tag

        # Fallback: no matching tags, so return the newest tag overall.
        logger.warning(f"No matching refs found for prefix '{ref_prefix}'. Using latest tag.")
        tags.sort(key=lambda x: x["ref"], reverse=True)
        return tags[0]["ref"] if tags else None

    except GitHubRequestError as e:
        logger.error(f"Failed to fetch tags: {e}")
        return None

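# Illustrative shape of the refs payload consumed above (values made up):
#   [{"ref": "refs/tags/v4.0.0", ...}, {"ref": "refs/tags/v4.1.0", ...}]
# With ref_prefix "v4.0", only the "refs/tags/v4.0.*" entries survive the
# filter, and the reverse lexicographic sort picks the highest of them.
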
def fetch_rule_files(session: requests.Session, ref: str) -> List[Dict[str, str]]:
    """Fetches the list of .conf rule files from the given ref."""
    ref_name = ref.split("/")[-1] if "/" in ref else ref  # Extract the bare ref name
    rules_url = f"{OWASP_CRS_BASE_URL}?ref={ref_name}"
    try:
        response = fetch_with_retries(session, rules_url)
        files = response.json()
        # Filter for .conf files and extract the relevant fields.
        return [
            {"name": f["name"], "sha": f["sha"]}
            for f in files if f["name"].endswith(".conf")
        ]
    except GitHubRequestError as e:
        logger.error(f"Failed to fetch rule files from {rules_url}: {e}")
        return []  # Return an empty list on failure

def fetch_github_blob(session: requests.Session, sha: str) -> str:
    """Fetches the base64-encoded content of a blob (file) given its SHA."""
    blob_url = f"{GITHUB_REPO_URL}/git/blobs/{sha}"
    try:
        response = fetch_with_retries(session, blob_url)
        blob_data = response.json()
        return blob_data.get("content", "")  # Empty string if no content
    except GitHubRequestError as e:
        logger.error(f"Failed to fetch blob for SHA {sha}: {e}")
        return ""

def verify_blob_sha(file_sha: str, blob_content_b64: str) -> bool:
    """Verifies the SHA-1 hash of the decoded blob content against the expected SHA."""
    decoded_bytes = base64.b64decode(blob_content_b64)
    blob_header = f"blob {len(decoded_bytes)}\0".encode("utf-8")
    calculated_sha = hashlib.sha1(blob_header + decoded_bytes).hexdigest()
    if calculated_sha != file_sha:
        logger.warning(f"SHA mismatch! Expected: {file_sha}, Calculated: {calculated_sha}")
        return False  # Integrity failure
    return True

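# Worked example of the Git blob hashing scheme used above (a sketch; the
# sample content is arbitrary and the helper is not called anywhere):
def _git_blob_sha1(content: bytes = b"hello\n") -> str:
    """Returns the SHA-1 Git would assign to `content` stored as a blob."""
    header = f"blob {len(content)}\0".encode("utf-8")
    return hashlib.sha1(header + content).hexdigest()
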
def _extract_rule_id(secrule_text: str) -> str:
    """Extracts the rule ID from a SecRule directive."""
    match = re.search(r'id:(\d+)', secrule_text)
    return match.group(1) if match else "no_id"


def _extract_rule_severity(secrule_text: str) -> str:
    """Extracts the severity (CRS writes it as severity:'CRITICAL' or severity:CRITICAL)."""
    match = re.search(r"severity:'?(\w+)'?", secrule_text)
    return match.group(1) if match else "medium"  # Default to medium


def _extract_rule_location(secrule_text: str) -> str:
    """
    Extracts the location (variable) from a SecRule directive. Handles
    multiple variables and chained rules.
    """
    match = re.search(r'SecRule\s+([^"\s]+)', secrule_text)
    if not match:
        return "UNKNOWN"

    variables_str = match.group(1)
    variables = variables_str.split("|")  # Split multiple variables

    # Map each variable to a location label.
    locations = []
    for var in variables:
        var = var.upper()  # Normalize case
        if var.startswith("REQUEST_HEADERS"):
            if ":" in var:  # Specific header, e.g. REQUEST_HEADERS:User-Agent
                locations.append(var.split(":")[1].replace("_", "-").strip())
            else:
                locations.append("REQUEST_HEADERS")  # Generic header location
        elif var.startswith("ARGS"):
            locations.append("Query-String")
        elif var == "REQUEST_COOKIES":
            locations.append("Cookie")
        elif var == "REQUEST_URI":
            locations.append("Request-URI")
        elif var == "QUERY_STRING":
            locations.append("Query-String")
        elif var in ("REQUEST_LINE", "REQUEST_BODY", "RESPONSE_BODY", "RESPONSE_HEADERS"):
            locations.append(var)  # Keep explicit locations as-is
        # Add more location mappings as needed.

    # Prioritize specific locations, then fall back to the first match.
    if "Request-URI" in locations:
        return "Request-URI"  # Request-URI takes top priority
    elif "Query-String" in locations:
        return "Query-String"
    if locations:
        return locations[0]  # Return the first extracted location
    return "UNKNOWN"  # Default location

def extract_sec_rules(raw_text: str) -> List[Dict[str, str]]:
    """
    Extracts SecRule patterns and associated metadata from raw text.
    Returns a list of dictionaries, each representing a SecRule.
    """
    rules = []
    # Find all SecRule directives (including those spanning multiple lines).
    # The first quoted string is the operator/pattern; the optional second
    # quoted string holds the actions (id, severity, ...), which the helper
    # extractors above need to see.
    secrule_re = re.compile(
        r'SecRule\s+\S+\s+"((?:[^"\\]|\\.)+?)"(?:\s*\\?\s*"(?:[^"\\]|\\.)+?")?',
        re.DOTALL,
    )
    for match in secrule_re.finditer(raw_text):
        secrule_text = match.group(0)  # Full SecRule text (variables, pattern, actions)
        pattern = match.group(1).strip().replace("\\\\", "\\")  # Extract and clean pattern
        if not pattern:  # Skip rules without a pattern
            continue

        rule_id = _extract_rule_id(secrule_text)
        location = _extract_rule_location(secrule_text)
        severity = _extract_rule_severity(secrule_text)

        rules.append({
            "id": rule_id,
            "pattern": pattern,
            "location": location,
            "severity": severity
        })
    return rules

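# Illustrative input/output for extract_sec_rules (the directive below is a
# simplified, made-up example, not an actual CRS rule):
#
#   SecRule REQUEST_URI "@rx /etc/passwd" \
#       "id:900001,phase:2,deny,severity:'CRITICAL'"
#
# yields one dictionary:
#   {"id": "900001", "pattern": "@rx /etc/passwd",
#    "location": "Request-URI", "severity": "CRITICAL"}
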
def process_rule_file(file: Dict[str, str], session: requests.Session) -> List[Dict[str, str]]:
    """Processes a single rule file, extracting rules and metadata."""
    blob_b64 = fetch_github_blob(session, file["sha"])
    if not blob_b64:
        logger.warning(f"Skipping {file['name']} (empty blob).")
        return []

    # Integrity check: logs a warning on mismatch but continues, since the data is present.
    verify_blob_sha(file["sha"], blob_b64)

    try:
        raw_text = base64.b64decode(blob_b64).decode("utf-8")
    except Exception as e:
        logger.error(f"Failed to decode {file['name']}: {e}")
        return []

    category = file["name"].split("-")[-1].replace(".conf", "")
    extracted_rules = extract_sec_rules(raw_text)  # List of rule dictionaries

    # Add the category to each extracted rule.
    for rule in extracted_rules:
        rule["category"] = category
    return extracted_rules

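# Category derivation example (the file name is typical of the CRS rules
# directory; treat it as illustrative):
#   "REQUEST-941-APPLICATION-ATTACK-XSS.conf" -> split("-")[-1] -> "XSS.conf" -> "XSS"
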
def fetch_owasp_rules(session: requests.Session, rule_files: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Fetches and processes rule files in parallel, returning all extracted rules."""
    all_rules = []
    with ThreadPoolExecutor(max_workers=CONNECTION_POOL_SIZE) as executor:
        future_to_file = {
            executor.submit(process_rule_file, file, session): file
            for file in rule_files
        }
        # tqdm shows progress; as_completed yields futures as they finish.
        for future in tqdm(as_completed(future_to_file), total=len(rule_files), desc="Processing rules"):
            file = future_to_file[future]
            try:
                rules = future.result()  # Get the result (or re-raise the worker's exception)
                all_rules.extend(rules)
            except Exception as e:
                logger.error(f"Error processing {file['name']}: {e}")
                # Continue even if individual files fail.

    logger.info(f"Fetched a total of {len(all_rules)} rules.")
    return all_rules

def save_as_json(rules: List[Dict[str, str]], output_file: str) -> bool:
    """Saves the extracted rules to a JSON file (atomically)."""
    try:
        output_dir = Path(output_file).parent
        if output_dir:
            output_dir.mkdir(parents=True, exist_ok=True)

        temp_file = f"{output_file}.tmp"  # Write to a temporary file first
        with open(temp_file, "w", encoding="utf-8") as f:
            json.dump(rules, f, indent=4)
        os.replace(temp_file, output_file)  # Atomic rename

        logger.info(f"Rules saved to {output_file}")
        return True
    except Exception as e:
        logger.error(f"Failed to save rules to {output_file}: {e}")
        return False

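# The resulting JSON is a flat list of rule objects, e.g. (illustrative values):
# [
#     {
#         "id": "900001",
#         "pattern": "@rx /etc/passwd",
#         "location": "Request-URI",
#         "severity": "CRITICAL",
#         "category": "LFI"
#     }
# ]
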
def main():
    """Main function: fetches, processes, and saves OWASP CRS rules."""
    parser = argparse.ArgumentParser(
        description="Fetches OWASP Core Rule Set rules and saves them as JSON."
    )
    parser.add_argument("--output", type=str, default="owasp_rules.json",
                        help="Output JSON file path.")
    parser.add_argument("--ref", type=str, default=GITHUB_REF,
                        help="Git reference (tag or branch prefix), e.g. 'v4.0', 'v3.3', 'dev'.")
    parser.add_argument("--dry-run", action="store_true",
                        help="Simulate fetching and processing (no file save).")
    args = parser.parse_args()

    session = get_session()  # Create a requests session

    # 1. Fetch the latest tag matching the requested prefix.
    latest_ref = fetch_latest_tag(session, args.ref)
    if not latest_ref:
        logger.error("Could not determine the latest tag. Exiting.")
        return

    # 2. Fetch the list of rule files.
    rule_files = fetch_rule_files(session, latest_ref)
    if not rule_files:
        logger.error("Could not fetch the list of rule files. Exiting.")
        return

    # 3. Fetch and process the rules (in parallel).
    rules = fetch_owasp_rules(session, rule_files)

    # 4. Save the rules to a JSON file (unless this is a dry run).
    if not args.dry_run:
        if rules:
            if save_as_json(rules, args.output):
                logger.info("Successfully saved rules to JSON.")
            else:
                logger.error("Failed to save rules to JSON.")
        else:
            logger.warning("No rules were extracted.")
    else:
        logger.info("Dry-run mode: rules were fetched and processed, but not saved.")
        # Print one extracted rule for verification.
        if rules:
            logger.info(f"Example rule: {rules[0]}")

if __name__ == "__main__":
    main()