diff --git a/owasp.py b/owasp.py
index 2dde2ef..a42f289 100644
--- a/owasp.py
+++ b/owasp.py
@@ -2,57 +2,90 @@
 import requests
 import re
 import json
 import logging
-from typing import List, Dict
+import os
+import time
+import base64
+import hashlib
+from typing import List, Dict, Optional
 
 # Logging setup
 logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
 
-OWASP_CRS_BASE_URL = "https://api.github.com/repos/coreruleset/coreruleset/contents/rules"
-GITHUB_REF = "v4.0"
+# GitHub Configuration
 GITHUB_REPO_URL = "https://api.github.com/repos/coreruleset/coreruleset"
+OWASP_CRS_BASE_URL = f"{GITHUB_REPO_URL}/contents/rules"
+GITHUB_REF = "v4"  # Target latest v4.x version (adjust as needed)
+
+# Rate Limit and Retry Configuration
+RATE_LIMIT_DELAY = 600  # Default delay in seconds if rate limit headers are missing (10 mins)
+RETRY_DELAY = 5  # Base retry delay in seconds
+MAX_RETRIES = 6  # Maximum number of retries
+EXPONENTIAL_BACKOFF = True
+BACKOFF_MULTIPLIER = 2
+
+# GitHub Token (optional)
+GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")  # Read from environment variable
 
 
-def fetch_rule_files() -> List[str]:
-    """
-    Fetches a list of rule files from the OWASP Core Rule Set GitHub repository.
-    It attempts to match a specific tag, and falls back to latest.
+def fetch_with_retries(url: str) -> requests.Response:
+    retries = 0
+    headers = {}
 
-    Returns:
-        List[str]: A list of rule file names (e.g., 'REQUEST-901-INITIALIZATION.conf').
-    """
-    logging.info("Fetching available rule files from GitHub...")
-    # Step 1: Fetch all tags
+    # Add token if available
+    if GITHUB_TOKEN:
+        headers['Authorization'] = f'token {GITHUB_TOKEN}'
+        logging.info("Using GitHub token for authenticated request.")
+
+    while retries < MAX_RETRIES:
+        try:
+            response = requests.get(url, headers=headers)
+            if response.status_code == 200:
+                return response
+            if response.status_code == 403 and 'X-RateLimit-Remaining' in response.headers:
+                reset_time = int(response.headers['X-RateLimit-Reset'])
+                wait_time = max(reset_time - int(time.time()), RATE_LIMIT_DELAY)
+                logging.warning(f"Rate limit exceeded. Retrying in {wait_time} seconds...")
+                time.sleep(wait_time)
+            else:
+                wait_time = RETRY_DELAY * (BACKOFF_MULTIPLIER ** retries) if EXPONENTIAL_BACKOFF else RETRY_DELAY
+                logging.warning(f"Retrying {url}... ({retries + 1}/{MAX_RETRIES}) in {wait_time} seconds.")
+                time.sleep(wait_time)
+            retries += 1
+        except requests.RequestException as e:
+            logging.error(f"Error fetching {url}: {e}")
+            retries += 1
+    raise requests.RequestException(f"Failed to fetch {url} after {MAX_RETRIES} retries.")
+
+
+def fetch_latest_tag(ref_prefix: str) -> Optional[str]:
+    logging.info("Fetching tags from GitHub...")
     ref_url = f"{GITHUB_REPO_URL}/git/refs/tags"
     try:
-        ref_response = requests.get(ref_url)
-        ref_response.raise_for_status()
-        ref_data = ref_response.json()
-        available_refs = [ref['ref'] for ref in ref_data]
-        logging.debug(f"Available refs: {available_refs}")
-    except requests.RequestException as e:
-        logging.error(f"Failed to fetch tags from {ref_url}. Reason: {e}")
-        return []
+        ref_response = fetch_with_retries(ref_url)
+        refs = ref_response.json()
+        matching_refs = [ref['ref'] for ref in refs if ref['ref'].startswith(f"refs/tags/{ref_prefix}.")]
+        matching_refs.sort(reverse=True, key=lambda x: x.split('.')[-1])
+        if matching_refs:
+            latest_tag = matching_refs[0]
+            logging.info(f"Latest matching tag: {latest_tag}")
+            return latest_tag
+        logging.warning(f"No matching refs found for prefix {ref_prefix}. Falling back to latest tag.")
+        return refs[-1]['ref']
+    except Exception as e:
+        logging.error(f"Failed to fetch tags. Reason: {e}")
+        return None
 
-    # Step 2: Find the closest matching tag
-    matched_ref = next((ref for ref in available_refs if ref.endswith(f"{GITHUB_REF}.0")), None)
-    if matched_ref:
-        ref_sha = next(ref['object']['sha'] for ref in ref_data if ref['ref'] == matched_ref)
-        logging.info(f"Found exact match for {GITHUB_REF}: {matched_ref}")
-    else:
-        # Fallback to latest tag
-        latest_ref = ref_data[-1]
-        ref_sha = latest_ref['object']['sha']
-        logging.warning(f"{GITHUB_REF} not found. Using latest: {latest_ref['ref']}")
-
-    logging.info(f"Using ref SHA: {ref_sha}")
-
-    # Step 3: Fetch rule files using the selected SHA
-    rules_url = f"{OWASP_CRS_BASE_URL}?ref={ref_sha}"
+
+def fetch_rule_files(ref: str) -> List[Dict[str, str]]:
+    logging.info(f"Fetching rule files for ref {ref}...")
+    rules_url = f"{OWASP_CRS_BASE_URL}?ref={ref.split('/')[-1]}"
     try:
-        rules_response = requests.get(rules_url)
-        rules_response.raise_for_status()
-        files = [item['name'] for item in rules_response.json() if item['name'].endswith('.conf')]
+        rules_response = fetch_with_retries(rules_url)
+        files = [
+            {"name": item['name'], "sha": item['sha']}
+            for item in rules_response.json()
+            if item['name'].endswith('.conf')
+        ]
         logging.info(f"Found {len(files)} rule files.")
         return files
     except requests.RequestException as e:
@@ -60,49 +93,54 @@ def fetch_rule_files() -> List[str]:
         return []
 
 
-def fetch_owasp_rules(rule_files: List[str]) -> List[Dict[str, str]]:
-    """
-    Fetches SecRule patterns from OWASP CRS files, categorizes them, and returns a list of dictionaries.
+def fetch_github_blob(sha: str) -> str:
+    blob_url = f"{GITHUB_REPO_URL}/git/blobs/{sha}"
+    try:
+        response = fetch_with_retries(blob_url)
+        blob_data = response.json()
+        return blob_data['content']
+    except requests.RequestException as e:
+        logging.error(f"Failed to fetch blob for SHA {sha}. Reason: {e}")
+        return ""
 
-    Parameters:
-        rule_files (List[str]): A list of rule file names (e.g., ['REQUEST-901-INITIALIZATION.conf',...])
 
-    Returns:
-        List[Dict[str,str]]: A list of dictionaries, each containing a pattern and its category.
-        e.g. [{'category': 'INITIALIZATION', 'pattern':'...'},...]
-    """
+def verify_blob_sha(file_sha: str, blob_content: str) -> bool:
+    calculated_sha = hashlib.sha1(base64.b64decode(blob_content)).hexdigest()
+    return calculated_sha == file_sha
+
+
+def fetch_owasp_rules(rule_files: List[Dict[str, str]], ref: str) -> List[Dict[str, str]]:
     logging.info("Fetching OWASP rules...")
-    base_url = f"https://raw.githubusercontent.com/coreruleset/coreruleset/{GITHUB_REF}.0/rules/"
     rules = []
     for file in rule_files:
-        logging.info(f"Fetching {file}...")
-        try:
-            response = requests.get(base_url + file)
-            response.raise_for_status()
-            raw_text = response.text
-            sec_rules = re.findall(r'SecRule.*?"(.*?)"', raw_text, re.DOTALL)
-            for rule in sec_rules:
-                pattern = rule.strip().replace("\\", "")
-                category = file.split('-')[-1].replace('.conf', '')
-                if pattern:
-                    rules.append({"category": category, "pattern": pattern})
-        except requests.RequestException as e:
-            logging.error(f"Failed to fetch or process {file}. Reason: {e}")
+        logging.info(f"Fetching {file['name']}...")
+        blob_content = fetch_github_blob(file['sha'])
+
+        if not verify_blob_sha(file['sha'], blob_content):
+            logging.warning(
+                f"SHA mismatch for {file['name']}. Expected: {file['sha']}, "
+                f"Calculated: {hashlib.sha1(base64.b64decode(blob_content)).hexdigest()}"
+            )
+
+        raw_text = base64.b64decode(blob_content).decode('utf-8')
+        sec_rules = re.findall(r'SecRule\s+.*?"((?:[^"\\]|\\.)+?)"', raw_text, re.DOTALL)
+        for rule in sec_rules:
+            pattern = rule.strip().replace("\\", "")
+            category = file['name'].split('-')[-1].replace('.conf', '')
+            if pattern:
+                rules.append({"category": category, "pattern": pattern})
+
 
     logging.info(f"{len(rules)} rules fetched.")
     return rules
 
 
 def save_as_json(rules: List[Dict[str, str]], output_file: str) -> None:
-    """
-    Saves the extracted rules as a JSON file.
-
-    Parameters:
-        rules (List[Dict[str, str]]): The list of extracted rules.
-        output_file (str): The path of the output JSON file.
-    """
     logging.info(f"Saving rules to {output_file}...")
     try:
+        output_dir = os.path.dirname(output_file)
+        if output_dir:
+            os.makedirs(output_dir, exist_ok=True)
         with open(output_file, 'w') as f:
             json.dump(rules, f, indent=4)
         logging.info(f"Rules saved successfully to {output_file}.")
@@ -111,10 +149,16 @@ def save_as_json(rules: List[Dict[str, str]], output_file: str) -> None:
 
 
 if __name__ == "__main__":
-    rule_files = fetch_rule_files()
-    if rule_files:
-        rules = fetch_owasp_rules(rule_files)
-        if rules:
-            save_as_json(rules, "owasp_rules.json")
+    latest_ref = fetch_latest_tag(GITHUB_REF)
+    if latest_ref:
+        rule_files = fetch_rule_files(latest_ref)
+        if rule_files:
+            rules = fetch_owasp_rules(rule_files, latest_ref)
+            if rules:
+                save_as_json(rules, "owasp_rules.json")
+            else:
+                logging.error("Failed to fetch rules. Exiting.")
+        else:
+            logging.error("Failed to fetch rule files. Exiting.")
     else:
-        logging.error("Failed to fetch rule files. Exiting.")
+        logging.error("Failed to fetch tags. Exiting.")