patterns/owasp.py
fab 9aba2163c2
Update owasp.py
Minor improvements.
2025-01-03 13:22:31 +01:00

208 lines
8.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import time
import json
import base64
import hashlib
import logging
import requests
from typing import List, Dict, Optional
# Root logger configuration: timestamped records at INFO level and above.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# --- GitHub endpoints ---
# REST API root of the coreruleset/coreruleset repository.
GITHUB_REPO_URL = "https://api.github.com/repos/coreruleset/coreruleset"
# Contents listing of the repository's "rules" directory.
OWASP_CRS_BASE_URL = f"{GITHUB_REPO_URL}/contents/rules"
# Default version prefix used when picking a tag.
GITHUB_REF = "v4"

# --- Retry / rate-limit tuning ---
# Rate limit delay in seconds.
RATE_LIMIT_DELAY = 600
# Base retry delay in seconds.
RETRY_DELAY = 5
# Maximum number of retries.
MAX_RETRIES = 6
# Use exponential backoff for retries.
EXPONENTIAL_BACKOFF = True
# Multiplier for exponential backoff.
BACKOFF_MULTIPLIER = 2

# --- Authentication ---
# GitHub token for authentication; None when the variable is not set.
GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN")
class GitHubRequestError(Exception):
    """Signals that a GitHub request still failed once every retry was used up."""
def get_session() -> requests.Session:
    """
    Build the requests.Session used for the GitHub API calls.

    When GITHUB_TOKEN is set, its value is attached as a token-style
    Authorization header; otherwise the session is returned without it.
    """
    http = requests.Session()
    if not GITHUB_TOKEN:
        return http
    http.headers["Authorization"] = f"token {GITHUB_TOKEN}"
    return http
def _rate_limit_wait(headers) -> int:
    """Return the number of seconds to pause after a rate-limited response."""
    retry_after = str(headers.get("Retry-After", ""))
    if retry_after.isdigit():
        return int(retry_after)
    reset_at = str(headers.get("X-RateLimit-Reset", ""))
    if reset_at.isdigit():
        # Never negative; +1 so the next request lands just after the reset.
        return max(int(reset_at) - int(time.time()), 0) + 1
    # No usable hint from the server: fall back to the configured delay.
    return RATE_LIMIT_DELAY


def fetch_with_retries(session: requests.Session, url: str) -> requests.Response:
    """
    Fetches the given URL with bounded retries, handling rate limits and transient errors.

    Every failed attempt (rate limit, HTTP error or network error) counts
    towards MAX_RETRIES, so the function always terminates.

    Args:
        session: Session used to issue the GET request.
        url: Absolute URL to fetch.

    Returns:
        The first response whose status code is below 400.

    Raises:
        GitHubRequestError: If the server answers with a client error that a
            retry cannot fix, or if all MAX_RETRIES attempts fail.
    """
    # Without a timeout a stalled connection would block the script forever.
    request_timeout = 30  # seconds

    for attempt in range(MAX_RETRIES):
        # Default pause for any failure that is not an explicit rate limit.
        if EXPONENTIAL_BACKOFF:
            wait_time = RETRY_DELAY * (BACKOFF_MULTIPLIER ** attempt)
        else:
            wait_time = RETRY_DELAY
        rate_limited = False

        try:
            response = session.get(url, timeout=request_timeout)
        except requests.RequestException as e:
            logging.error(f"Error fetching {url}: {e}")
        else:
            status = response.status_code
            if status < 400:
                return response

            headers = response.headers
            # X-RateLimit-Remaining is present on ordinary responses too, so
            # its mere presence does not mean the limit was hit; require an
            # exhausted quota or an explicit Retry-After instead.
            rate_limited = status == 429 or (
                status == 403
                and ("Retry-After" in headers or headers.get("X-RateLimit-Remaining") == "0")
            )
            if rate_limited:
                wait_time = _rate_limit_wait(headers)
            elif status < 500 and status not in (403, 408):
                # Client errors such as 401/404/422 will not change on retry.
                raise GitHubRequestError(f"Failed to fetch {url}: HTTP {status} is not retryable.")

        if attempt + 1 == MAX_RETRIES:
            # Out of attempts: give up right away instead of sleeping first.
            break
        if rate_limited:
            logging.warning(f"Rate limit exceeded. Retrying in {wait_time} seconds...")
        else:
            logging.warning(f"Retrying {url}... ({attempt + 1}/{MAX_RETRIES}) in {wait_time} seconds.")
        time.sleep(wait_time)

    raise GitHubRequestError(f"Failed to fetch {url} after {MAX_RETRIES} retries.")
def _tag_version_key(ref: str):
    """Sort key ordering tag refs by numeric version, releases above pre-releases."""
    name = ref.rsplit("/", 1)[-1]
    match = re.match(r"v?(\d+(?:\.\d+)*)(.*)", name)
    if match is None:
        # Tags that do not look like versions sort below every real version.
        return ((), False, name)
    numbers = tuple(int(part) for part in match.group(1).split("."))
    suffix = match.group(2)
    # An empty suffix marks a final release, which outranks e.g. "-rc1".
    return (numbers, suffix == "", suffix)


def fetch_latest_tag(session: requests.Session, ref_prefix: str) -> Optional[str]:
    """
    Fetches the highest-versioned Git tag ref that matches a version prefix.

    Tags are compared numerically component by component, so "v4.10.0"
    ranks above "v4.9.1", and a final release ranks above its pre-releases.
    If no tag starts with "<ref_prefix>.", the highest-versioned tag overall
    is returned instead.

    Args:
        session: Session used for the GitHub API request.
        ref_prefix: Version prefix such as "v4".

    Returns:
        The full ref (e.g. "refs/tags/v4.10.0"), or None if the tag list is
        empty or could not be fetched or parsed.
    """
    # NOTE(review): only one API response is read here; confirm whether this
    # endpoint paginates for repositories with many tags.
    ref_url = f"{GITHUB_REPO_URL}/git/refs/tags"
    try:
        response = fetch_with_retries(session, ref_url)
        tags = response.json()
        if not tags:
            logging.warning("No tags found in the repository.")
            return None
        refs = [entry["ref"] for entry in tags]
    except (GitHubRequestError, requests.RequestException, ValueError, KeyError, TypeError) as e:
        logging.error(f"Failed to fetch tags. Reason: {e}")
        return None

    wanted_prefix = f"refs/tags/{ref_prefix}."
    matching = [ref for ref in refs if ref.startswith(wanted_prefix)]
    if matching:
        latest_tag = max(matching, key=_tag_version_key)
        logging.info(f"Latest matching tag: {latest_tag}")
        return latest_tag

    logging.warning(f"No matching refs found for prefix {ref_prefix}. Falling back to the latest tag.")
    # The sort is stable, so tags with equal keys keep the API order and the
    # last one still wins.
    return sorted(refs, key=_tag_version_key)[-1]
def fetch_rule_files(session: requests.Session, ref: str) -> List[Dict[str, str]]:
    """
    Fetches the list of rule files (.conf) from the given ref in the repository.

    Args:
        session: Session used for the GitHub API request.
        ref: Tag ref or plain name; only the part after the last "/" is used.

    Returns:
        A list of {"name": ..., "sha": ...} dictionaries, one per .conf file.
        An empty list is returned if the request fails or the response is
        not a JSON list.
    """
    ref_name = ref.rsplit("/", 1)[-1]
    rules_url = f"{OWASP_CRS_BASE_URL}?ref={ref_name}"
    try:
        response = fetch_with_retries(session, rules_url)
        files = response.json()
    except (GitHubRequestError, requests.RequestException, ValueError) as e:
        logging.error(f"Failed to fetch rule files from {rules_url}. Reason: {e}")
        return []

    # Anything other than a list (e.g. an error object) cannot be a directory
    # listing, so report it instead of crashing while iterating.
    if not isinstance(files, list):
        logging.error(f"Failed to fetch rule files from {rules_url}. Reason: unexpected response payload.")
        return []

    rule_files = []
    for entry in files:
        if not isinstance(entry, dict):
            continue
        name = entry.get("name")
        sha = entry.get("sha")
        if isinstance(name, str) and name.endswith(".conf") and sha:
            rule_files.append({"name": name, "sha": sha})
    return rule_files
def fetch_github_blob(session: requests.Session, sha: str) -> str:
    """
    Download the Git blob identified by `sha` and return its "content" field.

    The value is returned as delivered by the API (base64 text, per the
    callers in this file). An empty string is returned when the request
    fails or the response has no "content" key.
    """
    blob_url = f"{GITHUB_REPO_URL}/git/blobs/{sha}"
    try:
        payload = fetch_with_retries(session, blob_url).json()
    except (GitHubRequestError, requests.RequestException) as e:
        logging.error(f"Failed to fetch blob for SHA {sha}. Reason: {e}")
        return ""
    return payload.get("content", "")
def verify_blob_sha(file_sha: str, blob_content_b64: str) -> bool:
    """
    Verifies that the Git blob SHA-1 of the decoded content matches file_sha.

    The check never raises for bad input: content that cannot be
    base64-decoded is logged and reported as a failed verification.

    Args:
        file_sha: Expected blob SHA-1 as a hex string (case-insensitive).
        blob_content_b64: Base64-encoded blob content.

    Returns:
        True if the calculated SHA matches, False on mismatch or if the
        content cannot be decoded.
    """
    try:
        decoded_bytes = base64.b64decode(blob_content_b64)
    except (TypeError, ValueError) as e:  # binascii.Error is a ValueError
        logging.warning(f"Could not decode blob content for SHA check. Expected: {file_sha}, Reason: {e}")
        return False

    # Git hashes a blob as sha1(b"blob <size>\0" + content), not the bare content.
    blob_header = f"blob {len(decoded_bytes)}\0".encode("utf-8")
    calculated_sha = hashlib.sha1(blob_header + decoded_bytes).hexdigest()
    # hexdigest() is lowercase; normalize the expected value to match.
    if calculated_sha != str(file_sha).strip().lower():
        logging.warning(
            f"SHA mismatch for file. Expected: {file_sha}, Calculated: {calculated_sha}"
        )
        return False
    return True
# Captures the first double-quoted argument that follows each "SecRule"
# keyword; backslash-escaped characters inside the quotes are allowed.
_SEC_RULE_RE = re.compile(r'SecRule\s+.*?"((?:[^"\\]|\\.)+?)"', re.DOTALL)


def fetch_owasp_rules(session: requests.Session, rule_files: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """
    Fetches the content of each rule file and extracts its SecRule patterns.

    A file whose blob is empty or cannot be decoded is logged and skipped,
    so one bad file does not abort the whole run.

    Args:
        session: Session used for the GitHub API requests.
        rule_files: Dictionaries with "name" and "sha" keys, one per file.

    Returns:
        A list of {"category": ..., "pattern": ...} dictionaries. The
        category is the part of the file name after the last "-", with
        ".conf" removed.
    """
    rules = []
    for file in rule_files:
        name = file["name"]
        logging.info(f"Fetching {name}...")
        blob_b64 = fetch_github_blob(session, file["sha"])
        if not blob_b64:
            logging.warning(f"Skipping file {name} due to empty blob content.")
            continue

        # Decode before anything else so malformed content is skipped cleanly.
        try:
            raw_text = base64.b64decode(blob_b64).decode("utf-8")
        except ValueError as e:  # covers binascii.Error and UnicodeDecodeError
            logging.warning(f"Skipping file {name}: content could not be decoded. Reason: {e}")
            continue

        # Verify SHA (non-blocking): a mismatch is reported but the file is kept.
        if not verify_blob_sha(file["sha"], blob_b64):
            logging.warning(f"SHA verification failed for {name}; using its content anyway.")

        category = name.split("-")[-1].replace(".conf", "")
        # NOTE(review): the regex also matches "SecRule" inside comment lines.
        for rule in _SEC_RULE_RE.findall(raw_text):
            # NOTE(review): this drops every backslash, including regex escapes
            # such as "\d" or "\."; confirm that consumers expect this.
            pattern = rule.strip().replace("\\", "")
            if pattern:
                rules.append({"category": category, "pattern": pattern})

    logging.info(f"Fetched {len(rules)} rules.")
    return rules
def save_as_json(rules: List[Dict[str, str]], output_file: str) -> bool:
    """
    Write `rules` to `output_file` as JSON indented by four spaces.

    The parent directory is created first when the path has one. Returns
    True once the file has been written, or False if an I/O error occurred
    (the error is logged).
    """
    parent_dir = os.path.dirname(output_file)
    try:
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(output_file, "w", encoding="utf-8") as handle:
            json.dump(rules, handle, indent=4)
    except IOError as e:
        logging.error(f"Failed to save rules to {output_file}. Reason: {e}")
        return False
    else:
        logging.info(f"Rules saved to {output_file}.")
        return True
def main() -> None:
    """
    Run the pipeline: resolve the tag, list rule files, extract rules, save JSON.

    Each stage logs an error and stops the run when it yields nothing.
    """
    session = get_session()

    latest_ref = fetch_latest_tag(session, GITHUB_REF)
    if not latest_ref:
        logging.error("Failed to fetch tags.")
        return

    rule_files = fetch_rule_files(session, latest_ref)
    if not rule_files:
        logging.error("Failed to fetch rule files.")
        return

    rules = fetch_owasp_rules(session, rule_files)
    if rules and save_as_json(rules, "owasp_rules.json"):
        logging.info("All rules fetched and saved successfully.")
    else:
        logging.error("Failed to fetch or save rules.")


if __name__ == "__main__":
    main()