Update owasp.py

Minor improvements.
This commit is contained in:
fab 2025-01-03 13:22:31 +01:00 committed by GitHub
parent 61e1a856c9
commit 9aba2163c2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -8,17 +8,19 @@ import logging
import requests import requests
from typing import List, Dict, Optional from typing import List, Dict, Optional
# Logging setup
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
# Constants
GITHUB_REPO_URL = "https://api.github.com/repos/coreruleset/coreruleset" GITHUB_REPO_URL = "https://api.github.com/repos/coreruleset/coreruleset"
OWASP_CRS_BASE_URL = f"{GITHUB_REPO_URL}/contents/rules" OWASP_CRS_BASE_URL = f"{GITHUB_REPO_URL}/contents/rules"
GITHUB_REF = "v4" GITHUB_REF = "v4" # Default version prefix
RATE_LIMIT_DELAY = 600 RATE_LIMIT_DELAY = 600 # Rate limit delay in seconds
RETRY_DELAY = 5 RETRY_DELAY = 5 # Base retry delay in seconds
MAX_RETRIES = 6 MAX_RETRIES = 6 # Maximum number of retries
EXPONENTIAL_BACKOFF = True EXPONENTIAL_BACKOFF = True # Use exponential backoff for retries
BACKOFF_MULTIPLIER = 2 BACKOFF_MULTIPLIER = 2 # Multiplier for exponential backoff
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN") GITHUB_TOKEN = os.getenv("GITHUB_TOKEN") # GitHub token for authentication
class GitHubRequestError(Exception): class GitHubRequestError(Exception):
@ -27,7 +29,7 @@ class GitHubRequestError(Exception):
def get_session() -> requests.Session: def get_session() -> requests.Session:
""" """
Creates and returns a requests.Session with optional GitHub token auth. Creates and returns a requests.Session with optional GitHub token authentication.
""" """
session = requests.Session() session = requests.Session()
if GITHUB_TOKEN: if GITHUB_TOKEN:
@ -128,13 +130,19 @@ def fetch_github_blob(session: requests.Session, sha: str) -> str:
def verify_blob_sha(file_sha: str, blob_content_b64: str) -> bool: def verify_blob_sha(file_sha: str, blob_content_b64: str) -> bool:
""" """
Verifies that the SHA of the decoded content matches the expected file_sha. Verifies that the SHA of the decoded content matches the expected file_sha.
Logs a warning if the verification fails but does not block execution.
""" """
decoded_bytes = base64.b64decode(blob_content_b64) decoded_bytes = base64.b64decode(blob_content_b64)
# Option 1: Verify Gits actual blob SHA (header + content) # Option 1: Verify Gits actual blob SHA (header + content)
blob_header = f"blob {len(decoded_bytes)}\0".encode("utf-8") blob_header = f"blob {len(decoded_bytes)}\0".encode("utf-8")
calculated_sha = hashlib.sha1(blob_header + decoded_bytes).hexdigest() calculated_sha = hashlib.sha1(blob_header + decoded_bytes).hexdigest()
return calculated_sha == file_sha if calculated_sha != file_sha:
logging.warning(
f"SHA mismatch for file. Expected: {file_sha}, Calculated: {calculated_sha}"
)
return False
return True
def fetch_owasp_rules(session: requests.Session, rule_files: List[Dict[str, str]]) -> List[Dict[str, str]]: def fetch_owasp_rules(session: requests.Session, rule_files: List[Dict[str, str]]) -> List[Dict[str, str]]:
@ -150,13 +158,8 @@ def fetch_owasp_rules(session: requests.Session, rule_files: List[Dict[str, str]
logging.warning(f"Skipping file {file['name']} due to empty blob content.") logging.warning(f"Skipping file {file['name']} due to empty blob content.")
continue continue
if not verify_blob_sha(file["sha"], blob_b64): # Verify SHA (non-blocking)
decoded_bytes = base64.b64decode(blob_b64) verify_blob_sha(file["sha"], blob_b64)
calculated_sha = hashlib.sha1(decoded_bytes).hexdigest()
logging.warning(
f"SHA mismatch for {file['name']}. "
f"Expected: {file['sha']}, Calculated: {calculated_sha}"
)
raw_text = base64.b64decode(blob_b64).decode("utf-8") raw_text = base64.b64decode(blob_b64).decode("utf-8")
sec_rules = re.findall(r'SecRule\s+.*?"((?:[^"\\]|\\.)+?)"', raw_text, re.DOTALL) sec_rules = re.findall(r'SecRule\s+.*?"((?:[^"\\]|\\.)+?)"', raw_text, re.DOTALL)