diff --git a/owasp.py b/owasp.py
index a42f289..2e1154d 100644
--- a/owasp.py
+++ b/owasp.py
@@ -1,164 +1,204 @@
-import requests
-import re
-import json
-import logging
 import os
+import re
 import time
+import json
 import base64
 import hashlib
+import logging
+import requests
 from typing import List, Dict, Optional
 
-# Logging setup
 logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
 
-# GitHub Configuration
 GITHUB_REPO_URL = "https://api.github.com/repos/coreruleset/coreruleset"
 OWASP_CRS_BASE_URL = f"{GITHUB_REPO_URL}/contents/rules"
-GITHUB_REF = "v4"  # Target latest v4.x version (adjust as needed)
-
-# Rate Limit and Retry Configuration
-RATE_LIMIT_DELAY = 600  # Default delay in seconds if rate limit headers are missing (10 mins)
-RETRY_DELAY = 5  # Base retry delay in seconds
-MAX_RETRIES = 6  # Maximum number of retries
+GITHUB_REF = "v4"  # Target the latest v4.x release (adjust as needed)
+RATE_LIMIT_DELAY = 600  # Fallback wait in seconds when rate-limit headers are missing
+RETRY_DELAY = 5  # Base retry delay in seconds
+MAX_RETRIES = 6
 EXPONENTIAL_BACKOFF = True
 BACKOFF_MULTIPLIER = 2
-
-# GitHub Token (optional)
-GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")  # Read from environment variable
+GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")  # Optional; raises the API rate limit when set
 
 
-def fetch_with_retries(url: str) -> requests.Response:
-    retries = 0
-    headers = {}
+class GitHubRequestError(Exception):
+    """Raised when fetching data from GitHub fails after all retries."""
 
-    # Add token if available
+
+def get_session() -> requests.Session:
+    """
+    Creates and returns a requests.Session with optional GitHub token auth.
+    """
+    session = requests.Session()
     if GITHUB_TOKEN:
-        headers['Authorization'] = f'token {GITHUB_TOKEN}'
-        logging.info("Using GitHub token for authenticated request.")
+        session.headers.update({"Authorization": f"token {GITHUB_TOKEN}"})
+    return session
+
+
+def fetch_with_retries(session: requests.Session, url: str) -> requests.Response:
+    """
+    Fetches the given URL with retries, handling rate limits and transient HTTP errors.
+    Raises GitHubRequestError if the request cannot be completed after all retries.
+    """
+    retries = 0
     while retries < MAX_RETRIES:
         try:
-            response = requests.get(url, headers=headers)
-            if response.status_code == 200:
-                return response
-            if response.status_code == 403 and 'X-RateLimit-Remaining' in response.headers:
-                reset_time = int(response.headers['X-RateLimit-Reset'])
+            response = session.get(url)
+            if response.status_code == 403 and response.headers.get("X-RateLimit-Remaining") == "0":
+                reset_time = int(response.headers.get("X-RateLimit-Reset", 0))
                 wait_time = max(reset_time - int(time.time()), RATE_LIMIT_DELAY)
                 logging.warning(f"Rate limit exceeded. Retrying in {wait_time} seconds...")
                 time.sleep(wait_time)
-            else:
-                wait_time = RETRY_DELAY * (BACKOFF_MULTIPLIER ** retries) if EXPONENTIAL_BACKOFF else RETRY_DELAY
-                logging.warning(f"Retrying {url}... ({retries + 1}/{MAX_RETRIES}) in {wait_time} seconds.")
-                time.sleep(wait_time)
-                retries += 1
+                # Count rate-limit waits toward the retry budget so a persistent 403 cannot loop forever.
+                retries += 1
+                continue
+            try:
+                response.raise_for_status()
+                return response
+            except requests.HTTPError:
+                # Non-rate-limit HTTP errors fall through to the generic retry path below.
+                pass
+
+            wait_time = (RETRY_DELAY * (BACKOFF_MULTIPLIER ** retries)
+                         if EXPONENTIAL_BACKOFF else RETRY_DELAY)
+            logging.warning(f"Retrying {url}... ({retries + 1}/{MAX_RETRIES}) in {wait_time} seconds.")
+            time.sleep(wait_time)
+            retries += 1
         except requests.RequestException as e:
             logging.error(f"Error fetching {url}: {e}")
             retries += 1
-    raise requests.RequestException(f"Failed to fetch {url} after {MAX_RETRIES} retries.")
+
+    raise GitHubRequestError(f"Failed to fetch {url} after {MAX_RETRIES} retries.")
 
 
-def fetch_latest_tag(ref_prefix: str) -> Optional[str]:
-    logging.info("Fetching tags from GitHub...")
+def fetch_latest_tag(session: requests.Session, ref_prefix: str) -> Optional[str]:
+    """
+    Fetches the latest matching Git tag from the GitHub repository based on a version prefix.
+    Falls back to the last listed tag if no matching prefix is found. Returns the tag reference.
+    """
     ref_url = f"{GITHUB_REPO_URL}/git/refs/tags"
     try:
-        ref_response = fetch_with_retries(ref_url)
-        refs = ref_response.json()
-        matching_refs = [ref['ref'] for ref in refs if ref['ref'].startswith(f"refs/tags/{ref_prefix}.")]
-        matching_refs.sort(reverse=True, key=lambda x: x.split('.')[-1])
-        if matching_refs:
-            latest_tag = matching_refs[0]
+        response = fetch_with_retries(session, ref_url)
+        tags = response.json()
+        if not tags:
+            logging.warning("No tags found in the repository.")
+            return None
+        matching = [r["ref"] for r in tags if r["ref"].startswith(f"refs/tags/{ref_prefix}.")]
+        # Sort on the numeric version components; a plain string sort would rank "v4.9.0" above "v4.10.0".
+        matching.sort(
+            reverse=True,
+            key=lambda x: tuple(int(p) for p in x.split("/")[-1].lstrip("v").split(".") if p.isdigit()),
+        )
+        if matching:
+            latest_tag = matching[0]
             logging.info(f"Latest matching tag: {latest_tag}")
             return latest_tag
-        logging.warning(f"No matching refs found for prefix {ref_prefix}. Falling back to latest tag.")
-        return refs[-1]['ref']
+        logging.warning(f"No matching refs found for prefix {ref_prefix}. Falling back to the last listed tag.")
+        return tags[-1]["ref"]
     except Exception as e:
         logging.error(f"Failed to fetch tags. Reason: {e}")
         return None
 
 
-def fetch_rule_files(ref: str) -> List[Dict[str, str]]:
-    logging.info(f"Fetching rule files for ref {ref}...")
-    rules_url = f"{OWASP_CRS_BASE_URL}?ref={ref.split('/')[-1]}"
+def fetch_rule_files(session: requests.Session, ref: str) -> List[Dict[str, str]]:
+    """
+    Fetches the list of rule files (.conf) from the given ref in the repository.
+    Returns a list of dictionaries containing file name and SHA.
+    """
+    ref_name = ref.split("/")[-1]  # Accepts both "refs/tags/<tag>" and a bare tag name
+    rules_url = f"{OWASP_CRS_BASE_URL}?ref={ref_name}"
     try:
-        rules_response = fetch_with_retries(rules_url)
-        files = [
-            {"name": item['name'], "sha": item['sha']}
-            for item in rules_response.json()
-            if item['name'].endswith('.conf')
-        ]
-        logging.info(f"Found {len(files)} rule files.")
-        return files
-    except requests.RequestException as e:
+        response = fetch_with_retries(session, rules_url)
+        files = response.json()
+        return [{"name": f["name"], "sha": f["sha"]} for f in files if f["name"].endswith(".conf")]
+    except (GitHubRequestError, requests.RequestException) as e:
         logging.error(f"Failed to fetch rule files from {rules_url}. Reason: {e}")
         return []
 
 
-def fetch_github_blob(sha: str) -> str:
+def fetch_github_blob(session: requests.Session, sha: str) -> str:
+    """
+    Fetches the blob content (base64-encoded) for a given SHA from GitHub.
+    Returns the content if successful, or an empty string on failure.
+ """ blob_url = f"{GITHUB_REPO_URL}/git/blobs/{sha}" try: - response = fetch_with_retries(blob_url) - blob_data = response.json() - return blob_data['content'] - except requests.RequestException as e: + response = fetch_with_retries(session, blob_url) + return response.json().get("content", "") + except (GitHubRequestError, requests.RequestException) as e: logging.error(f"Failed to fetch blob for SHA {sha}. Reason: {e}") return "" -def verify_blob_sha(file_sha: str, blob_content: str) -> bool: - calculated_sha = hashlib.sha1(base64.b64decode(blob_content)).hexdigest() +def verify_blob_sha(file_sha: str, blob_content_b64: str) -> bool: + """ + Verifies that the SHA of the decoded content matches the expected file_sha. + """ + decoded_bytes = base64.b64decode(blob_content_b64) + # Option 1: Verify Git’s actual blob SHA (header + content) + blob_header = f"blob {len(decoded_bytes)}\0".encode("utf-8") + calculated_sha = hashlib.sha1(blob_header + decoded_bytes).hexdigest() + return calculated_sha == file_sha -def fetch_owasp_rules(rule_files: List[Dict[str, str]], ref: str) -> List[Dict[str, str]]: - logging.info("Fetching OWASP rules...") +def fetch_owasp_rules(session: requests.Session, rule_files: List[Dict[str, str]]) -> List[Dict[str, str]]: + """ + Fetches the OWASP rule content for each rule file, extracts SecRule patterns, + and returns a list of dicts with category and pattern. + """ rules = [] - for file in rule_files: logging.info(f"Fetching {file['name']}...") - blob_content = fetch_github_blob(file['sha']) + blob_b64 = fetch_github_blob(session, file["sha"]) + if not blob_b64: + logging.warning(f"Skipping file {file['name']} due to empty blob content.") + continue - if not verify_blob_sha(file['sha'], blob_content): + if not verify_blob_sha(file["sha"], blob_b64): + decoded_bytes = base64.b64decode(blob_b64) + calculated_sha = hashlib.sha1(decoded_bytes).hexdigest() logging.warning( - f"SHA mismatch for {file['name']}. Expected: {file['sha']}, " - f"Calculated: {hashlib.sha1(base64.b64decode(blob_content)).hexdigest()}" + f"SHA mismatch for {file['name']}. " + f"Expected: {file['sha']}, Calculated: {calculated_sha}" ) - raw_text = base64.b64decode(blob_content).decode('utf-8') + raw_text = base64.b64decode(blob_b64).decode("utf-8") sec_rules = re.findall(r'SecRule\s+.*?"((?:[^"\\]|\\.)+?)"', raw_text, re.DOTALL) + category = file["name"].split("-")[-1].replace(".conf", "") for rule in sec_rules: pattern = rule.strip().replace("\\", "") - category = file['name'].split('-')[-1].replace('.conf', '') if pattern: rules.append({"category": category, "pattern": pattern}) - logging.info(f"{len(rules)} rules fetched.") + logging.info(f"Fetched {len(rules)} rules.") return rules -def save_as_json(rules: List[Dict[str, str]], output_file: str) -> None: - logging.info(f"Saving rules to {output_file}...") +def save_as_json(rules: List[Dict[str, str]], output_file: str) -> bool: + """ + Saves the given list of rules to a JSON file. Returns True if successful, False otherwise. + """ try: output_dir = os.path.dirname(output_file) if output_dir: os.makedirs(output_dir, exist_ok=True) - with open(output_file, 'w') as f: + with open(output_file, "w", encoding="utf-8") as f: json.dump(rules, f, indent=4) - logging.info(f"Rules saved successfully to {output_file}.") + logging.info(f"Rules saved to {output_file}.") + return True except IOError as e: logging.error(f"Failed to save rules to {output_file}. 
Reason: {e}") + return False if __name__ == "__main__": - latest_ref = fetch_latest_tag(GITHUB_REF) + session = get_session() + latest_ref = fetch_latest_tag(session, GITHUB_REF) if latest_ref: - rule_files = fetch_rule_files(latest_ref) + rule_files = fetch_rule_files(session, latest_ref) if rule_files: - rules = fetch_owasp_rules(rule_files, latest_ref) - if rules: - save_as_json(rules, "owasp_rules.json") + rules = fetch_owasp_rules(session, rule_files) + if rules and save_as_json(rules, "owasp_rules.json"): + logging.info("All rules fetched and saved successfully.") else: - logging.error("Failed to fetch rules. Exiting.") + logging.error("Failed to fetch or save rules.") else: - logging.error("Failed to fetch rule files. Exiting.") + logging.error("Failed to fetch rule files.") else: - logging.error("Failed to fetch tags. Exiting.") + logging.error("Failed to fetch tags.")