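"""
Fetch OWASP Core Rule Set (CRS) rule files from the coreruleset/coreruleset
GitHub repository, extract the SecRule patterns they contain, and save the
result to owasp_rules.json.
"""
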
import os
import re
import time
import json
import base64
import hashlib
import logging
from typing import List, Dict, Optional

import requests

# Logging setup
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Constants
GITHUB_REPO_URL = "https://api.github.com/repos/coreruleset/coreruleset"
OWASP_CRS_BASE_URL = f"{GITHUB_REPO_URL}/contents/rules"
GITHUB_REF = "v4"  # Default version prefix for Git tags
RATE_LIMIT_DELAY = 600  # Minimum wait (seconds) after hitting the GitHub rate limit
RETRY_DELAY = 5  # Base retry delay in seconds
MAX_RETRIES = 6  # Maximum number of retries per request
EXPONENTIAL_BACKOFF = True  # Use exponential backoff between retries
BACKOFF_MULTIPLIER = 2  # Multiplier for exponential backoff
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")  # Optional GitHub token for authentication
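
# Note: unauthenticated requests to the GitHub API are rate-limited far more
# aggressively than authenticated ones (typically 60 vs. 5,000 requests per
# hour), so setting GITHUB_TOKEN is recommended when fetching the full rule set.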


class GitHubRequestError(Exception):
    """Raised when fetching data from GitHub fails after all retries."""


def get_session() -> requests.Session:
    """
    Create and return a requests.Session, attaching GitHub token
    authentication when the GITHUB_TOKEN environment variable is set.
    """
    session = requests.Session()
    if GITHUB_TOKEN:
        session.headers.update({"Authorization": f"token {GITHUB_TOKEN}"})
    return session


def fetch_with_retries(session: requests.Session, url: str) -> requests.Response:
    """
    Fetch the given URL with retries, handling GitHub rate limits and
    transient HTTP errors.

    Raises GitHubRequestError if the request cannot be completed after all retries.
    """
    retries = 0
    while retries < MAX_RETRIES:
        try:
            response = session.get(url)

            # A 403 with an exhausted rate limit means we have to wait until
            # the limit resets before trying again.
            if response.status_code == 403 and response.headers.get("X-RateLimit-Remaining") == "0":
                reset_time = int(response.headers.get("X-RateLimit-Reset", 0))
                wait_time = max(reset_time - int(time.time()), RATE_LIMIT_DELAY)
                logging.warning(f"Rate limit exceeded. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
                continue

            try:
                response.raise_for_status()
                return response
            except requests.HTTPError:
                # Non-2xx responses that are not rate-limit related fall
                # through to the generic retry logic below.
                pass

            # Back off before the next attempt.
            wait_time = (RETRY_DELAY * (BACKOFF_MULTIPLIER ** retries)
                         if EXPONENTIAL_BACKOFF else RETRY_DELAY)
            logging.warning(f"Retrying {url}... ({retries + 1}/{MAX_RETRIES}) in {wait_time} seconds.")
            time.sleep(wait_time)
            retries += 1
        except requests.RequestException as e:
            logging.error(f"Error fetching {url}: {e}")
            retries += 1

    raise GitHubRequestError(f"Failed to fetch {url} after {MAX_RETRIES} retries.")
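
# With the defaults above (RETRY_DELAY=5, BACKOFF_MULTIPLIER=2, MAX_RETRIES=6),
# the successive retry waits are 5, 10, 20, 40, 80 and 160 seconds.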


def fetch_latest_tag(session: requests.Session, ref_prefix: str) -> Optional[str]:
    """
    Fetch the latest Git tag matching the given version prefix from the
    GitHub repository.

    Falls back to the last tag returned by the API if no tag matches the
    prefix. Returns the tag reference (e.g. "refs/tags/v4.9.0") or None on failure.
    """
    ref_url = f"{GITHUB_REPO_URL}/git/refs/tags"
    try:
        response = fetch_with_retries(session, ref_url)
        tags = response.json()
        if not tags:
            logging.warning("No tags found in the repository.")
            return None

        matching = [r["ref"] for r in tags if r["ref"].startswith(f"refs/tags/{ref_prefix}.")]
        # Sort on the numeric version components so that e.g. v4.10.0 ranks
        # above v4.9.0 (a plain string sort would not).
        matching.sort(
            reverse=True,
            key=lambda ref: [
                int(part)
                for part in ref.rsplit("/", 1)[-1].lstrip("v").split(".")
                if part.isdigit()
            ],
        )
        if matching:
            latest_tag = matching[0]
            logging.info(f"Latest matching tag: {latest_tag}")
            return latest_tag

        logging.warning(f"No matching refs found for prefix {ref_prefix}. Falling back to the last tag returned by the API.")
        return tags[-1]["ref"]
    except Exception as e:
        logging.error(f"Failed to fetch tags. Reason: {e}")
        return None
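
# For reference, entries returned by the /git/refs/tags endpoint look roughly
# like {"ref": "refs/tags/v4.9.0", "object": {"sha": "...", ...}}; only the
# "ref" field is used here (the tag value shown is illustrative).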


def fetch_rule_files(session: requests.Session, ref: str) -> List[Dict[str, str]]:
    """
    Fetch the list of rule files (.conf) under rules/ at the given ref.

    Returns a list of dictionaries containing each file's name and blob SHA.
    """
    ref_name = ref.split("/")[-1] if "/" in ref else ref
    rules_url = f"{OWASP_CRS_BASE_URL}?ref={ref_name}"
    try:
        response = fetch_with_retries(session, rules_url)
        files = response.json()
        return [{"name": f["name"], "sha": f["sha"]} for f in files if f["name"].endswith(".conf")]
    except (GitHubRequestError, requests.RequestException) as e:
        logging.error(f"Failed to fetch rule files from {rules_url}. Reason: {e}")
        return []
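
# Each item returned by the contents API carries, among other fields, the file
# "name" and its blob "sha"; those two fields are all that fetch_github_blob
# and verify_blob_sha need downstream.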


def fetch_github_blob(session: requests.Session, sha: str) -> str:
    """
    Fetch the base64-encoded blob content for a given SHA from GitHub.

    Returns the content if successful, or an empty string on failure.
    """
    blob_url = f"{GITHUB_REPO_URL}/git/blobs/{sha}"
    try:
        response = fetch_with_retries(session, blob_url)
        return response.json().get("content", "")
    except (GitHubRequestError, requests.RequestException) as e:
        logging.error(f"Failed to fetch blob for SHA {sha}. Reason: {e}")
        return ""


def verify_blob_sha(file_sha: str, blob_content_b64: str) -> bool:
    """
    Verify that the Git blob SHA of the decoded content matches the expected
    file_sha. Logs a warning if the verification fails but does not block
    execution.
    """
    decoded_bytes = base64.b64decode(blob_content_b64)

    # Git hashes blobs as SHA-1 over a "blob <size>\0" header followed by the
    # raw content, so the same header has to be prepended here.
    blob_header = f"blob {len(decoded_bytes)}\0".encode("utf-8")
    calculated_sha = hashlib.sha1(blob_header + decoded_bytes).hexdigest()

    if calculated_sha != file_sha:
        logging.warning(
            f"SHA mismatch for file. Expected: {file_sha}, Calculated: {calculated_sha}"
        )
        return False
    return True
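
# Quick sanity check of the hashing scheme used above (illustrative, not run
# by this script):
#     >>> hashlib.sha1(b"blob 0\x00").hexdigest()
#     'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391'   # Git's well-known empty-blob SHA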


def fetch_owasp_rules(session: requests.Session, rule_files: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """
    Fetch the content of each OWASP rule file, extract its SecRule patterns,
    and return a list of dicts with category and pattern.
    """
    rules = []
    for file in rule_files:
        logging.info(f"Fetching {file['name']}...")
        blob_b64 = fetch_github_blob(session, file["sha"])
        if not blob_b64:
            logging.warning(f"Skipping file {file['name']} due to empty blob content.")
            continue

        # Verify SHA (non-blocking)
        verify_blob_sha(file["sha"], blob_b64)

        raw_text = base64.b64decode(blob_b64).decode("utf-8")
        # Capture the first double-quoted argument of each SecRule (typically
        # the operator/pattern), allowing escaped quotes inside it.
        sec_rules = re.findall(r'SecRule\s+.*?"((?:[^"\\]|\\.)+?)"', raw_text, re.DOTALL)
        # Derive the category from the file name, e.g. "...-SQLI.conf" -> "SQLI".
        category = file["name"].split("-")[-1].replace(".conf", "")
        for rule in sec_rules:
            pattern = rule.strip().replace("\\", "")
            if pattern:
                rules.append({"category": category, "pattern": pattern})

    logging.info(f"Fetched {len(rules)} rules.")
    return rules
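
# Each extracted entry has the shape {"category": ..., "pattern": ...}; for a
# file named like REQUEST-942-APPLICATION-ATTACK-SQLI.conf the category would
# be "SQLI" (the file name here is illustrative).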


def save_as_json(rules: List[Dict[str, str]], output_file: str) -> bool:
    """
    Save the given list of rules to a JSON file.

    Returns True if successful, False otherwise.
    """
    try:
        output_dir = os.path.dirname(output_file)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(rules, f, indent=4)
        logging.info(f"Rules saved to {output_file}.")
        return True
    except IOError as e:
        logging.error(f"Failed to save rules to {output_file}. Reason: {e}")
        return False
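
# Typical invocation (the script file name is illustrative):
#     GITHUB_TOKEN=<personal access token> python fetch_owasp_rules.py
# The extracted patterns are written to owasp_rules.json in the working directory.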


if __name__ == "__main__":
    session = get_session()
    latest_ref = fetch_latest_tag(session, GITHUB_REF)
    if latest_ref:
        rule_files = fetch_rule_files(session, latest_ref)
        if rule_files:
            rules = fetch_owasp_rules(session, rule_files)
            if rules and save_as_json(rules, "owasp_rules.json"):
                logging.info("All rules fetched and saved successfully.")
            else:
                logging.error("Failed to fetch or save rules.")
        else:
            logging.error("Failed to fetch rule files.")
    else:
        logging.error("Failed to fetch tags.")