Update owasp.py

- Added exponential backoff and retry logic for GitHub API requests to handle rate limits and transient errors.
- Introduced SHA verification for fetched blobs to ensure data integrity.
- Implemented optional GitHub token support for authenticated requests.
- Improved handling of the latest matching tag by dynamically sorting and selecting the newest available version.
- Reorganized functions for better modularity and readability.
- Added error handling for blob fetching and decoding with base64 processing.
- Output directories are now created automatically before files are saved, improving save reliability.
This commit is contained in:
fab
2024-12-29 23:50:29 +01:00
committed by GitHub
parent 1e4bb70b5d
commit 7f7f7fecd4

192
owasp.py
View File

@@ -2,57 +2,90 @@ import requests
import re import re
import json import json
import logging import logging
from typing import List, Dict import os
import time
import base64
import hashlib
from typing import List, Dict, Optional
# Logging setup
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# GitHub Configuration
GITHUB_REPO_URL = "https://api.github.com/repos/coreruleset/coreruleset"
OWASP_CRS_BASE_URL = f"{GITHUB_REPO_URL}/contents/rules"  # directory listing endpoint for /rules
GITHUB_REF = "v4"  # Target latest v4.x version (adjust as needed)

# Rate Limit and Retry Configuration
RATE_LIMIT_DELAY = 600  # Default delay in seconds if rate limit headers are missing (10 mins)
RETRY_DELAY = 5  # Base retry delay in seconds
MAX_RETRIES = 6  # Maximum number of retries
EXPONENTIAL_BACKOFF = True  # Grow the retry delay geometrically when True
BACKOFF_MULTIPLIER = 2  # Factor applied to the delay on each retry

# GitHub Token (optional)
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")  # Read from environment variable
def fetch_with_retries(url: str) -> requests.Response:
    """Fetch *url* from the GitHub API, retrying on transient failures.

    Honors GitHub rate-limit headers (sleeps until the reported reset time)
    and applies exponential backoff to other errors.

    Parameters:
        url (str): The URL to fetch.

    Returns:
        requests.Response: The successful (HTTP 200) response.

    Raises:
        requests.RequestException: If the request still fails after
            MAX_RETRIES attempts.
    """
    retries = 0
    headers = {}

    # Authenticated requests get a much higher rate limit.
    if GITHUB_TOKEN:
        headers['Authorization'] = f'token {GITHUB_TOKEN}'
        logging.info("Using GitHub token for authenticated request.")

    while retries < MAX_RETRIES:
        try:
            # A timeout prevents a hung connection from stalling the run forever.
            response = requests.get(url, headers=headers, timeout=30)
            if response.status_code == 200:
                return response

            if response.status_code == 403 and 'X-RateLimit-Remaining' in response.headers:
                # Rate limited: wait until the reported reset time; fall back
                # to RATE_LIMIT_DELAY only when the reset header is absent.
                reset_header = response.headers.get('X-RateLimit-Reset')
                if reset_header is not None:
                    wait_time = max(int(reset_header) - int(time.time()), 1)
                else:
                    wait_time = RATE_LIMIT_DELAY
                logging.warning(f"Rate limit exceeded. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                wait_time = RETRY_DELAY * (BACKOFF_MULTIPLIER ** retries) if EXPONENTIAL_BACKOFF else RETRY_DELAY
                logging.warning(f"Retrying {url}... ({retries + 1}/{MAX_RETRIES}) in {wait_time} seconds.")
                time.sleep(wait_time)
            retries += 1
        except requests.RequestException as e:
            # Back off on connection-level failures too, instead of
            # immediately hammering the server again.
            wait_time = RETRY_DELAY * (BACKOFF_MULTIPLIER ** retries) if EXPONENTIAL_BACKOFF else RETRY_DELAY
            logging.error(f"Error fetching {url}: {e}")
            time.sleep(wait_time)
            retries += 1

    raise requests.RequestException(f"Failed to fetch {url} after {MAX_RETRIES} retries.")
def fetch_latest_tag(ref_prefix: str) -> Optional[str]:
    """Return the newest tag ref matching *ref_prefix* (e.g. 'v4').

    Falls back to the repository's last-listed tag when no ref matches the
    prefix, and returns None when the tag list cannot be fetched at all.

    Parameters:
        ref_prefix (str): Major-version prefix of the tags to consider.

    Returns:
        Optional[str]: A full ref string like 'refs/tags/v4.10.0', or None.
    """
    logging.info("Fetching tags from GitHub...")
    ref_url = f"{GITHUB_REPO_URL}/git/refs/tags"
    try:
        ref_response = fetch_with_retries(ref_url)
        refs = ref_response.json()
        matching_refs = [ref['ref'] for ref in refs if ref['ref'].startswith(f"refs/tags/{ref_prefix}.")]

        # Sort numerically on the full version, not lexically on the last
        # dotted component: a string sort would rank "v4.9.0" above "v4.10.0".
        # Non-numeric parts (e.g. "-rc1" suffixes) are ignored in the key.
        def version_key(ref_name: str) -> tuple:
            tag = ref_name.rsplit('/', 1)[-1].lstrip('v')
            return tuple(int(part) for part in tag.split('.') if part.isdigit())

        matching_refs.sort(reverse=True, key=version_key)
        if matching_refs:
            latest_tag = matching_refs[0]
            logging.info(f"Latest matching tag: {latest_tag}")
            return latest_tag
        logging.warning(f"No matching refs found for prefix {ref_prefix}. Falling back to latest tag.")
        return refs[-1]['ref']
    except Exception as e:
        logging.error(f"Failed to fetch tags. Reason: {e}")
        return None
def fetch_rule_files(ref: str) -> List[Dict[str, str]]:
    """List the .conf rule files under /rules at the given git ref.

    Parameters:
        ref (str): A ref string such as 'refs/tags/v4.10.0'; only the final
            path component (the tag name) is passed to the API.

    Returns:
        List[Dict[str, str]]: One {'name': ..., 'sha': ...} entry per .conf
        file, or an empty list on failure.
    """
    logging.info(f"Fetching rule files for ref {ref}...")
    rules_url = f"{OWASP_CRS_BASE_URL}?ref={ref.split('/')[-1]}"
    try:
        rules_response = fetch_with_retries(rules_url)
        files = [
            {"name": item['name'], "sha": item['sha']}
            for item in rules_response.json()
            if item['name'].endswith('.conf')
        ]
        logging.info(f"Found {len(files)} rule files.")
        return files
    # ValueError covers a malformed/non-JSON response body from .json().
    except (requests.RequestException, ValueError) as e:
        logging.error(f"Failed to fetch rule files from {rules_url}. Reason: {e}")
        return []
def fetch_github_blob(sha: str) -> str:
    """Return the base64-encoded content of the git blob with SHA *sha*.

    Returns an empty string when the blob cannot be fetched.
    """
    blob_url = f"{GITHUB_REPO_URL}/git/blobs/{sha}"
    try:
        blob_data = fetch_with_retries(blob_url).json()
    except requests.RequestException as e:
        logging.error(f"Failed to fetch blob for SHA {sha}. Reason: {e}")
        return ""
    return blob_data['content']
def verify_blob_sha(file_sha: str, blob_content: str) -> bool:
    """Check that a base64-encoded blob matches its expected git SHA.

    Git does not hash raw file bytes: a blob's SHA-1 is computed over the
    header "blob <size>\\0" followed by the content. The header must be
    prepended here, otherwise every comparison against a GitHub-reported
    SHA would fail.

    Parameters:
        file_sha (str): The SHA the GitHub API reported for the file.
        blob_content (str): Base64-encoded blob content.

    Returns:
        bool: True if the computed git blob SHA equals file_sha.
    """
    data = base64.b64decode(blob_content)
    header = f"blob {len(data)}\0".encode()
    calculated_sha = hashlib.sha1(header + data).hexdigest()
    return calculated_sha == file_sha
def fetch_owasp_rules(rule_files: List[Dict[str, str]], ref: str) -> List[Dict[str, str]]:
    """Extract SecRule patterns from each listed rule file's blob.

    Parameters:
        rule_files: Entries with 'name' and 'sha' keys, as produced by
            fetch_rule_files().
        ref: The git ref the files were listed from (kept for interface
            compatibility; blobs are fetched by SHA, so it is unused here).

    Returns:
        A list of {'category': ..., 'pattern': ...} dictionaries.
    """
    logging.info("Fetching OWASP rules...")
    rules: List[Dict[str, str]] = []
    # Capture the first quoted argument after SecRule, honoring escaped quotes.
    secrule_re = re.compile(r'SecRule\s+.*?"((?:[^"\\]|\\.)+?)"', re.DOTALL)

    for entry in rule_files:
        name, sha = entry['name'], entry['sha']
        logging.info(f"Fetching {name}...")
        blob_content = fetch_github_blob(sha)

        if not verify_blob_sha(sha, blob_content):
            logging.warning(
                f"SHA mismatch for {name}. Expected: {sha}, "
                f"Calculated: {hashlib.sha1(base64.b64decode(blob_content)).hexdigest()}"
            )

        raw_text = base64.b64decode(blob_content).decode('utf-8')
        # e.g. 'REQUEST-901-INITIALIZATION.conf' -> 'INITIALIZATION'
        category = name.split('-')[-1].replace('.conf', '')
        for match in secrule_re.findall(raw_text):
            pattern = match.strip().replace("\\", "")
            if pattern:
                rules.append({"category": category, "pattern": pattern})

    logging.info(f"{len(rules)} rules fetched.")
    return rules
def save_as_json(rules: List[Dict[str, str]], output_file: str) -> None: def save_as_json(rules: List[Dict[str, str]], output_file: str) -> None:
"""
Saves the extracted rules as a JSON file.
Parameters:
rules (List[Dict[str, str]]): The list of extracted rules.
output_file (str): The path of the output JSON file.
"""
logging.info(f"Saving rules to {output_file}...") logging.info(f"Saving rules to {output_file}...")
try: try:
output_dir = os.path.dirname(output_file)
if output_dir:
os.makedirs(output_dir, exist_ok=True)
with open(output_file, 'w') as f: with open(output_file, 'w') as f:
json.dump(rules, f, indent=4) json.dump(rules, f, indent=4)
logging.info(f"Rules saved successfully to {output_file}.") logging.info(f"Rules saved successfully to {output_file}.")
@@ -111,10 +149,16 @@ def save_as_json(rules: List[Dict[str, str]], output_file: str) -> None:
if __name__ == "__main__":
    # Pipeline: resolve newest matching tag -> list rule files ->
    # extract patterns -> persist as JSON. Each stage guards the next.
    latest_ref = fetch_latest_tag(GITHUB_REF)
    if not latest_ref:
        logging.error("Failed to fetch tags. Exiting.")
    else:
        rule_files = fetch_rule_files(latest_ref)
        if not rule_files:
            logging.error("Failed to fetch rule files. Exiting.")
        else:
            rules = fetch_owasp_rules(rule_files, latest_ref)
            if not rules:
                logging.error("Failed to fetch rules. Exiting.")
            else:
                save_as_json(rules, "owasp_rules.json")