# patterns/owasp.py


import os
import re
import time
import json
import base64
import hashlib
import logging
import requests
from typing import List, Dict, Optional
# Logging setup
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
# Constants
GITHUB_REPO_URL = "https://api.github.com/repos/coreruleset/coreruleset"
OWASP_CRS_BASE_URL = f"{GITHUB_REPO_URL}/contents/rules"
GITHUB_REF = "v4" # Default version prefix
RATE_LIMIT_DELAY = 600 # Rate limit delay in seconds
RETRY_DELAY = 5 # Base retry delay in seconds
MAX_RETRIES = 6 # Maximum number of retries
EXPONENTIAL_BACKOFF = True # Use exponential backoff for retries
BACKOFF_MULTIPLIER = 2 # Multiplier for exponential backoff
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN") # GitHub token for authentication
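# With the defaults above, the backoff schedule between retries is
# 5, 10, 20, 40, 80, 160 seconds (RETRY_DELAY * BACKOFF_MULTIPLIER ** retry).

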
class GitHubRequestError(Exception):
    """Raised when fetching data from GitHub fails after all retries."""


def get_session() -> requests.Session:
    """
    Creates and returns a requests.Session with optional GitHub token authentication.
    """
    session = requests.Session()
    # GitHub recommends requesting the REST API's JSON media type explicitly.
    session.headers.update({"Accept": "application/vnd.github+json"})
    if GITHUB_TOKEN:
        session.headers.update({"Authorization": f"token {GITHUB_TOKEN}"})
    return session


def fetch_with_retries(session: requests.Session, url: str) -> requests.Response:
    """
    Fetches the given URL with retries, handling rate limits and transient HTTP errors.
    Raises GitHubRequestError if the request cannot be completed after all retries.
    """
    retries = 0
    while retries < MAX_RETRIES:
        try:
            response = session.get(url, timeout=30)
            if response.status_code == 403 and "X-RateLimit-Remaining" in response.headers:
                # X-RateLimit-Reset is a Unix timestamp; wait at least RATE_LIMIT_DELAY.
                reset_time = int(response.headers.get("X-RateLimit-Reset", 0))
                wait_time = max(reset_time - int(time.time()), RATE_LIMIT_DELAY)
                logging.warning(f"Rate limit exceeded. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
                continue
            if response.ok:
                return response
            # Retry non-200 codes that are not rate-limit related.
            wait_time = (RETRY_DELAY * (BACKOFF_MULTIPLIER ** retries)
                         if EXPONENTIAL_BACKOFF else RETRY_DELAY)
            logging.warning(f"Retrying {url}... ({retries + 1}/{MAX_RETRIES}) in {wait_time} seconds.")
            time.sleep(wait_time)
            retries += 1
        except requests.RequestException as e:
            logging.error(f"Error fetching {url}: {e}")
            time.sleep(RETRY_DELAY)
            retries += 1
    raise GitHubRequestError(f"Failed to fetch {url} after {MAX_RETRIES} retries.")


def fetch_latest_tag(session: requests.Session, ref_prefix: str) -> Optional[str]:
    """
    Fetches the latest matching Git tag from the GitHub repository based on a version prefix.
    Falls back to the last listed tag if no matching prefix is found. Returns the tag reference.
    """
    ref_url = f"{GITHUB_REPO_URL}/git/refs/tags"
    try:
        response = fetch_with_retries(session, ref_url)
        tags = response.json()
        if not tags:
            logging.warning("No tags found in the repository.")
            return None
        matching = [r["ref"] for r in tags if r["ref"].startswith(f"refs/tags/{ref_prefix}.")]

        def version_key(ref: str) -> tuple:
            # Compare full numeric versions rather than a single component as a string.
            version = ref.rsplit("/", 1)[-1].lstrip("v")
            return tuple(int(p) if p.isdigit() else -1 for p in version.split("."))

        matching.sort(reverse=True, key=version_key)
        if matching:
            latest_tag = matching[0]
            logging.info(f"Latest matching tag: {latest_tag}")
            return latest_tag
        logging.warning(f"No matching refs found for prefix {ref_prefix}. Falling back to the latest tag.")
        return tags[-1]["ref"]
    except Exception as e:
        logging.error(f"Failed to fetch tags. Reason: {e}")
        return None


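# Example: with GITHUB_REF = "v4", version_key ranks refs/tags/v4.10.0 above
# refs/tags/v4.9.0 because (4, 10, 0) > (4, 9, 0); a plain string sort would
# rank "4.9.0" above "4.10.0".

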
def fetch_rule_files(session: requests.Session, ref: str) -> List[Dict[str, str]]:
    """
    Fetches the list of rule files (.conf) from the given ref in the repository.
    Returns a list of dictionaries containing file name and SHA.
    """
    ref_name = ref.rsplit("/", 1)[-1]  # "refs/tags/v4.x.y" -> "v4.x.y"
    rules_url = f"{OWASP_CRS_BASE_URL}?ref={ref_name}"
    try:
        response = fetch_with_retries(session, rules_url)
        files = response.json()
        return [{"name": f["name"], "sha": f["sha"]} for f in files if f["name"].endswith(".conf")]
    except (GitHubRequestError, requests.RequestException) as e:
        logging.error(f"Failed to fetch rule files from {rules_url}. Reason: {e}")
        return []


def fetch_github_blob(session: requests.Session, sha: str) -> str:
    """
    Fetches the blob content (base64-encoded) for a given SHA from GitHub.
    Returns the content if successful, or an empty string on failure.
    """
    blob_url = f"{GITHUB_REPO_URL}/git/blobs/{sha}"
    try:
        response = fetch_with_retries(session, blob_url)
        return response.json().get("content", "")
    except (GitHubRequestError, requests.RequestException) as e:
        logging.error(f"Failed to fetch blob for SHA {sha}. Reason: {e}")
        return ""


def verify_blob_sha(file_sha: str, blob_content_b64: str) -> bool:
    """
    Verifies that the SHA of the decoded content matches the expected file_sha.
    Logs a warning if the verification fails but does not block execution.
    """
    decoded_bytes = base64.b64decode(blob_content_b64)
    # Git's blob SHA covers a "blob <size>\0" header plus the content,
    # not the raw content alone.
    blob_header = f"blob {len(decoded_bytes)}\0".encode("utf-8")
    calculated_sha = hashlib.sha1(blob_header + decoded_bytes).hexdigest()
    if calculated_sha != file_sha:
        logging.warning(
            f"SHA mismatch for file. Expected: {file_sha}, Calculated: {calculated_sha}"
        )
        return False
    return True
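

# Sanity check: an empty blob hashes to Git's well-known empty-blob SHA:
#   verify_blob_sha("e69de29bb2d1d6434b8b29ae775ad8c2e48c5391", "")  # -> True

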
def fetch_owasp_rules(session: requests.Session, rule_files: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """
    Fetches the OWASP rule content for each rule file, extracts SecRule patterns,
    and returns a list of dicts with category and pattern.
    """
    rules = []
    for file in rule_files:
        logging.info(f"Fetching {file['name']}...")
        blob_b64 = fetch_github_blob(session, file["sha"])
        if not blob_b64:
            logging.warning(f"Skipping file {file['name']} due to empty blob content.")
            continue
        # Verify SHA (non-blocking)
        verify_blob_sha(file["sha"], blob_b64)
        raw_text = base64.b64decode(blob_b64).decode("utf-8")
        # Capture the first quoted argument of each SecRule directive.
        sec_rules = re.findall(r'SecRule\s+.*?"((?:[^"\\]|\\.)+?)"', raw_text, re.DOTALL)
        # "REQUEST-941-APPLICATION-ATTACK-XSS.conf" -> category "XSS"
        category = file["name"].split("-")[-1].replace(".conf", "")
        for rule in sec_rules:
            pattern = rule.strip().replace("\\", "")  # drop escaping backslashes
            if pattern:
                rules.append({"category": category, "pattern": pattern})
    logging.info(f"Fetched {len(rules)} rules.")
    return rules
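

# Illustrative extraction: from a line such as
#   SecRule REQUEST_COOKIES "@rx (?i)<script" "id:941101,phase:2,deny"
# the regex captures the first quoted argument, i.e. @rx (?i)<script.

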
def save_as_json(rules: List[Dict[str, str]], output_file: str) -> bool:
    """
    Saves the given list of rules to a JSON file. Returns True if successful, False otherwise.
    """
    try:
        output_dir = os.path.dirname(output_file)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(rules, f, indent=4)
        logging.info(f"Rules saved to {output_file}.")
        return True
    except IOError as e:
        logging.error(f"Failed to save rules to {output_file}. Reason: {e}")
        return False
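

# The output file is a flat JSON array of category/pattern pairs, e.g.:
#   [
#       {"category": "XSS", "pattern": "@rx (?i)<script"},
#       ...
#   ]

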
if __name__ == "__main__":
    session = get_session()
    latest_ref = fetch_latest_tag(session, GITHUB_REF)
    if latest_ref:
        rule_files = fetch_rule_files(session, latest_ref)
        if rule_files:
            rules = fetch_owasp_rules(session, rule_files)
            if rules and save_as_json(rules, "owasp_rules.json"):
                logging.info("All rules fetched and saved successfully.")
            else:
                logging.error("Failed to fetch or save rules.")
        else:
            logging.error("Failed to fetch rule files.")
    else:
        logging.error("Failed to fetch tags.")