diff --git a/owasp.py b/owasp.py index 2e1154d..7192186 100644 --- a/owasp.py +++ b/owasp.py @@ -8,17 +8,19 @@ import logging import requests from typing import List, Dict, Optional +# Logging setup logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") +# Constants GITHUB_REPO_URL = "https://api.github.com/repos/coreruleset/coreruleset" OWASP_CRS_BASE_URL = f"{GITHUB_REPO_URL}/contents/rules" -GITHUB_REF = "v4" -RATE_LIMIT_DELAY = 600 -RETRY_DELAY = 5 -MAX_RETRIES = 6 -EXPONENTIAL_BACKOFF = True -BACKOFF_MULTIPLIER = 2 -GITHUB_TOKEN = os.getenv("GITHUB_TOKEN") +GITHUB_REF = "v4" # Default version prefix +RATE_LIMIT_DELAY = 600 # Rate limit delay in seconds +RETRY_DELAY = 5 # Base retry delay in seconds +MAX_RETRIES = 6 # Maximum number of retries +EXPONENTIAL_BACKOFF = True # Use exponential backoff for retries +BACKOFF_MULTIPLIER = 2 # Multiplier for exponential backoff +GITHUB_TOKEN = os.getenv("GITHUB_TOKEN") # GitHub token for authentication class GitHubRequestError(Exception): @@ -27,7 +29,7 @@ class GitHubRequestError(Exception): def get_session() -> requests.Session: """ - Creates and returns a requests.Session with optional GitHub token auth. + Creates and returns a requests.Session with optional GitHub token authentication. """ session = requests.Session() if GITHUB_TOKEN: @@ -128,13 +130,19 @@ def fetch_github_blob(session: requests.Session, sha: str) -> str: def verify_blob_sha(file_sha: str, blob_content_b64: str) -> bool: """ Verifies that the SHA of the decoded content matches the expected file_sha. + Logs a warning if the verification fails but does not block execution. """ decoded_bytes = base64.b64decode(blob_content_b64) # Option 1: Verify Git’s actual blob SHA (header + content) blob_header = f"blob {len(decoded_bytes)}\0".encode("utf-8") calculated_sha = hashlib.sha1(blob_header + decoded_bytes).hexdigest() - return calculated_sha == file_sha + if calculated_sha != file_sha: + logging.warning( + f"SHA mismatch for file. Expected: {file_sha}, Calculated: {calculated_sha}" + ) + return False + return True def fetch_owasp_rules(session: requests.Session, rule_files: List[Dict[str, str]]) -> List[Dict[str, str]]: @@ -150,13 +158,8 @@ def fetch_owasp_rules(session: requests.Session, rule_files: List[Dict[str, str] logging.warning(f"Skipping file {file['name']} due to empty blob content.") continue - if not verify_blob_sha(file["sha"], blob_b64): - decoded_bytes = base64.b64decode(blob_b64) - calculated_sha = hashlib.sha1(decoded_bytes).hexdigest() - logging.warning( - f"SHA mismatch for {file['name']}. " - f"Expected: {file['sha']}, Calculated: {calculated_sha}" - ) + # Verify SHA (non-blocking) + verify_blob_sha(file["sha"], blob_b64) raw_text = base64.b64decode(blob_b64).decode("utf-8") sec_rules = re.findall(r'SecRule\s+.*?"((?:[^"\\]|\\.)+?)"', raw_text, re.DOTALL)