# patterns/owasp.py

import os
import re
import time
import json
import base64
import hashlib
import logging
import requests
from typing import List, Dict, Optional

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

GITHUB_REPO_URL = "https://api.github.com/repos/coreruleset/coreruleset"
OWASP_CRS_BASE_URL = f"{GITHUB_REPO_URL}/contents/rules"
GITHUB_REF = "v4"  # Tag prefix to match, e.g. refs/tags/v4.x.y
RATE_LIMIT_DELAY = 600  # Minimum wait (seconds) after hitting the API rate limit
RETRY_DELAY = 5  # Base wait (seconds) between retries
MAX_RETRIES = 6
EXPONENTIAL_BACKOFF = True
BACKOFF_MULTIPLIER = 2
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")  # Optional; authenticated requests get a higher rate limit
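# With the defaults above, the retry waits work out to
#   wait = RETRY_DELAY * BACKOFF_MULTIPLIER ** retries
# i.e. attempt 1 -> 5 s, 2 -> 10 s, 3 -> 20 s, 4 -> 40 s, 5 -> 80 s, 6 -> 160 s.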


class GitHubRequestError(Exception):
    """Raised when fetching data from GitHub fails after all retries."""

def get_session() -> requests.Session:
    """
    Creates and returns a requests.Session with optional GitHub token auth.
    """
    session = requests.Session()
    if GITHUB_TOKEN:
        session.headers.update({"Authorization": f"token {GITHUB_TOKEN}"})
    return session

def fetch_with_retries(session: requests.Session, url: str) -> requests.Response:
    """
    Fetches the given URL with retries, handling rate limits and transient HTTP errors.
    Raises GitHubRequestError if the request cannot be completed after all retries.
    """
    retries = 0
    while retries < MAX_RETRIES:
        try:
            response = session.get(url)
            # GitHub signals rate limiting with a 403 whose X-RateLimit-Remaining is 0.
            # (The header is present on every response, so checking its value matters.)
            if response.status_code == 403 and response.headers.get("X-RateLimit-Remaining") == "0":
                reset_time = int(response.headers.get("X-RateLimit-Reset", 0))
                wait_time = max(reset_time - int(time.time()), RATE_LIMIT_DELAY)
                logging.warning(f"Rate limit exceeded. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
                continue  # Waiting out the rate limit does not consume a retry.
            if response.ok:
                return response
            # Retry non-2xx responses that are not rate-limit related.
            wait_time = (RETRY_DELAY * (BACKOFF_MULTIPLIER ** retries)
                         if EXPONENTIAL_BACKOFF else RETRY_DELAY)
            logging.warning(f"Retrying {url}... ({retries + 1}/{MAX_RETRIES}) in {wait_time} seconds.")
            time.sleep(wait_time)
            retries += 1
        except requests.RequestException as e:
            logging.error(f"Error fetching {url}: {e}")
            retries += 1
    raise GitHubRequestError(f"Failed to fetch {url} after {MAX_RETRIES} retries.")
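# Typical use (illustrative, mirroring the __main__ block below):
#   session = get_session()
#   response = fetch_with_retries(session, f"{GITHUB_REPO_URL}/git/refs/tags")
#   tags = response.json()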

def fetch_latest_tag(session: requests.Session, ref_prefix: str) -> Optional[str]:
    """
    Fetches the latest matching Git tag from the GitHub repository based on a version prefix.
    Falls back to the newest tag if no matching prefix is found. Returns the tag reference.
    """
    ref_url = f"{GITHUB_REPO_URL}/git/refs/tags"
    try:
        response = fetch_with_retries(session, ref_url)
        tags = response.json()
        if not tags:
            logging.warning("No tags found in the repository.")
            return None
        matching = [r["ref"] for r in tags if r["ref"].startswith(f"refs/tags/{ref_prefix}.")]

        def version_key(ref: str) -> tuple:
            # Compare versions numerically so that e.g. 4.10.0 sorts above 4.9.0;
            # a plain string sort on the last dotted component gets this wrong.
            version = ref.rsplit("/", 1)[-1].lstrip("v")
            return tuple(int(part) for part in version.split(".") if part.isdigit())

        matching.sort(reverse=True, key=version_key)
        if matching:
            latest_tag = matching[0]
            logging.info(f"Latest matching tag: {latest_tag}")
            return latest_tag
        logging.warning(f"No matching refs found for prefix {ref_prefix}. Falling back to the latest tag.")
        return tags[-1]["ref"]
    except Exception as e:
        logging.error(f"Failed to fetch tags. Reason: {e}")
        return None
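# Example (illustrative): with GITHUB_REF = "v4" this returns a ref such as
# "refs/tags/v4.9.0", for which version_key("refs/tags/v4.9.0") == (4, 9, 0).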

def fetch_rule_files(session: requests.Session, ref: str) -> List[Dict[str, str]]:
    """
    Fetches the list of rule files (.conf) from the given ref in the repository.
    Returns a list of dictionaries containing file name and SHA.
    """
    ref_name = ref.rsplit("/", 1)[-1]  # "refs/tags/v4.9.0" -> "v4.9.0"; bare names pass through
    rules_url = f"{OWASP_CRS_BASE_URL}?ref={ref_name}"
    try:
        response = fetch_with_retries(session, rules_url)
        files = response.json()
        return [{"name": f["name"], "sha": f["sha"]} for f in files if f["name"].endswith(".conf")]
    except (GitHubRequestError, requests.RequestException) as e:
        logging.error(f"Failed to fetch rule files from {rules_url}. Reason: {e}")
        return []
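# The contents API returns one JSON object per directory entry; after filtering,
# the result looks like (illustrative):
#   [{"name": "REQUEST-942-APPLICATION-ATTACK-SQLI.conf", "sha": "<git blob sha>"}, ...]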

def fetch_github_blob(session: requests.Session, sha: str) -> str:
    """
    Fetches the blob content (base64-encoded) for a given SHA from GitHub.
    Returns the content if successful, or an empty string on failure.
    """
    blob_url = f"{GITHUB_REPO_URL}/git/blobs/{sha}"
    try:
        response = fetch_with_retries(session, blob_url)
        return response.json().get("content", "")
    except (GitHubRequestError, requests.RequestException) as e:
        logging.error(f"Failed to fetch blob for SHA {sha}. Reason: {e}")
        return ""

def verify_blob_sha(file_sha: str, blob_content_b64: str) -> bool:
    """
    Verifies that the SHA of the decoded content matches the expected file_sha.
    """
    decoded_bytes = base64.b64decode(blob_content_b64)
    # Git hashes blobs as sha1(b"blob <size>\0" + content), not the bare content.
    blob_header = f"blob {len(decoded_bytes)}\0".encode("utf-8")
    calculated_sha = hashlib.sha1(blob_header + decoded_bytes).hexdigest()
    return calculated_sha == file_sha
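# Example: Git's well-known empty-blob SHA follows directly from this scheme:
#   hashlib.sha1(b"blob 0\x00").hexdigest()
#   == "e69de29bb2d1d6434b8b29ae775ad8c2e48c5391"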

def fetch_owasp_rules(session: requests.Session, rule_files: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """
    Fetches the OWASP rule content for each rule file, extracts SecRule patterns,
    and returns a list of dicts with category and pattern.
    """
    rules = []
    for file in rule_files:
        logging.info(f"Fetching {file['name']}...")
        blob_b64 = fetch_github_blob(session, file["sha"])
        if not blob_b64:
            logging.warning(f"Skipping file {file['name']} due to empty blob content.")
            continue
        if not verify_blob_sha(file["sha"], blob_b64):
            decoded_bytes = base64.b64decode(blob_b64)
            blob_header = f"blob {len(decoded_bytes)}\0".encode("utf-8")
            calculated_sha = hashlib.sha1(blob_header + decoded_bytes).hexdigest()
            logging.warning(
                f"SHA mismatch for {file['name']}. "
                f"Expected: {file['sha']}, Calculated: {calculated_sha}"
            )
        raw_text = base64.b64decode(blob_b64).decode("utf-8")
        # Capture the first quoted argument of each SecRule directive (the operator),
        # allowing escaped quotes inside it.
        sec_rules = re.findall(r'SecRule\s+.*?"((?:[^"\\]|\\.)+?)"', raw_text, re.DOTALL)
        category = file["name"].split("-")[-1].replace(".conf", "")
        for rule in sec_rules:
            pattern = rule.strip().replace("\\", "")  # Strip backslash escapes
            if pattern:
                rules.append({"category": category, "pattern": pattern})
    logging.info(f"Fetched {len(rules)} rules.")
    return rules
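# Illustrative extraction: given a CRS-style line such as
#   SecRule ARGS "@rx (?i:select.+from)" "id:942999,phase:2,deny"
# in a file named REQUEST-942-APPLICATION-ATTACK-SQLI.conf, the regex captures
# "@rx (?i:select.+from)" and files it under category "SQLI".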

def save_as_json(rules: List[Dict[str, str]], output_file: str) -> bool:
    """
    Saves the given list of rules to a JSON file. Returns True if successful, False otherwise.
    """
    try:
        output_dir = os.path.dirname(output_file)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(rules, f, indent=4)
        logging.info(f"Rules saved to {output_file}.")
        return True
    except IOError as e:
        logging.error(f"Failed to save rules to {output_file}. Reason: {e}")
        return False
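# Resulting file contents (illustrative):
# [
#     {"category": "SQLI", "pattern": "@rx (?i:select.+from)"},
#     ...
# ]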

if __name__ == "__main__":
    session = get_session()
    latest_ref = fetch_latest_tag(session, GITHUB_REF)
    if latest_ref:
        rule_files = fetch_rule_files(session, latest_ref)
        if rule_files:
            rules = fetch_owasp_rules(session, rule_files)
            if rules and save_as_json(rules, "owasp_rules.json"):
                logging.info("All rules fetched and saved successfully.")
            else:
                logging.error("Failed to fetch or save rules.")
        else:
            logging.error("Failed to fetch rule files.")
    else:
        logging.error("Failed to fetch tags.")