Update owasp.py

- fixed the GitHub blob SHA verification function
fab 2024-12-30 00:18:50 +01:00 committed by GitHub
parent a3065eb405
commit 36f08db3eb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

owasp.py

@@ -1,164 +1,204 @@
import os
import re
import time
import json
import base64
import hashlib
import logging
import requests
from typing import List, Dict, Optional

# Logging setup
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# GitHub Configuration
GITHUB_REPO_URL = "https://api.github.com/repos/coreruleset/coreruleset"
OWASP_CRS_BASE_URL = f"{GITHUB_REPO_URL}/contents/rules"
GITHUB_REF = "v4"  # Target the latest v4.x version (adjust as needed)

# Rate Limit and Retry Configuration
RATE_LIMIT_DELAY = 600  # Fallback delay in seconds if rate limit headers are missing (10 minutes)
RETRY_DELAY = 5  # Base retry delay in seconds
MAX_RETRIES = 6  # Maximum number of retries
EXPONENTIAL_BACKOFF = True
BACKOFF_MULTIPLIER = 2

# GitHub Token (optional)
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")  # Read from environment variable

class GitHubRequestError(Exception):
    """Raised when fetching data from GitHub fails after all retries."""


def get_session() -> requests.Session:
    """
    Creates and returns a requests.Session with optional GitHub token auth.
    """
    session = requests.Session()
    if GITHUB_TOKEN:
        session.headers.update({"Authorization": f"token {GITHUB_TOKEN}"})
        logging.info("Using GitHub token for authenticated requests.")
    return session

def fetch_with_retries(session: requests.Session, url: str) -> requests.Response:
    """
    Fetches the given URL with retries, handling rate limits and transient HTTP errors.
    Raises GitHubRequestError if the request cannot be completed after all retries.
    """
    retries = 0
    while retries < MAX_RETRIES:
        try:
            response = session.get(url)
            if response.status_code == 403 and "X-RateLimit-Remaining" in response.headers:
                reset_time = int(response.headers.get("X-RateLimit-Reset", 0))
                wait_time = max(reset_time - int(time.time()), RATE_LIMIT_DELAY)
                logging.warning(f"Rate limit exceeded. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
                continue
            try:
                response.raise_for_status()
                return response
            except requests.HTTPError:
                # Handle non-200 codes that are not rate-limit related
                pass
            # Retry logic for other errors
            wait_time = (RETRY_DELAY * (BACKOFF_MULTIPLIER ** retries)
                         if EXPONENTIAL_BACKOFF else RETRY_DELAY)
            logging.warning(f"Retrying {url}... ({retries + 1}/{MAX_RETRIES}) in {wait_time} seconds.")
            time.sleep(wait_time)
            retries += 1
        except requests.RequestException as e:
            logging.error(f"Error fetching {url}: {e}")
            retries += 1
    raise GitHubRequestError(f"Failed to fetch {url} after {MAX_RETRIES} retries.")
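
# With the defaults above, the backoff waits grow as 5, 10, 20, 40, 80, 160
# seconds (RETRY_DELAY * BACKOFF_MULTIPLIER ** retries for retries 0..5).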

def fetch_latest_tag(session: requests.Session, ref_prefix: str) -> Optional[str]:
    """
    Fetches the latest matching Git tag from the GitHub repository based on a version prefix.
    Falls back to the newest tag if no matching prefix is found. Returns the tag reference.
    """
    logging.info("Fetching tags from GitHub...")
    ref_url = f"{GITHUB_REPO_URL}/git/refs/tags"
    try:
        response = fetch_with_retries(session, ref_url)
        tags = response.json()
        if not tags:
            logging.warning("No tags found in the repository.")
            return None
        matching = [r["ref"] for r in tags if r["ref"].startswith(f"refs/tags/{ref_prefix}.")]
        matching.sort(reverse=True, key=lambda x: x.split(".")[-1])
        if matching:
            latest_tag = matching[0]
            logging.info(f"Latest matching tag: {latest_tag}")
            return latest_tag
        logging.warning(f"No matching refs found for prefix {ref_prefix}. Falling back to the latest tag.")
        return tags[-1]["ref"]
    except Exception as e:
        logging.error(f"Failed to fetch tags. Reason: {e}")
        return None
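
# Caveat: the sort above compares the final version component as a string, so
# "9" orders after "10". A numeric tuple sort would be more robust, e.g.
# (a sketch, assuming tags shaped like refs/tags/v4.3.0):
#     key=lambda x: tuple(int(p) for p in x.split("/")[-1].lstrip("v").split("."))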

def fetch_rule_files(session: requests.Session, ref: str) -> List[Dict[str, str]]:
    """
    Fetches the list of rule files (.conf) from the given ref in the repository.
    Returns a list of dictionaries containing file name and SHA.
    """
    logging.info(f"Fetching rule files for ref {ref}...")
    ref_name = ref.split("/")[-1] if "/" in ref else ref
    rules_url = f"{OWASP_CRS_BASE_URL}?ref={ref_name}"
    try:
        response = fetch_with_retries(session, rules_url)
        files = response.json()
        return [{"name": f["name"], "sha": f["sha"]} for f in files if f["name"].endswith(".conf")]
    except (GitHubRequestError, requests.RequestException) as e:
        logging.error(f"Failed to fetch rule files from {rules_url}. Reason: {e}")
        return []
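
# Note: the contents API returns at most 1,000 entries per directory. The CRS
# rules/ directory is far below that limit, so no pagination is needed here.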

def fetch_github_blob(session: requests.Session, sha: str) -> str:
    """
    Fetches the blob content (base64-encoded) for a given SHA from GitHub.
    Returns the content if successful, or an empty string on failure.
    """
    blob_url = f"{GITHUB_REPO_URL}/git/blobs/{sha}"
    try:
        response = fetch_with_retries(session, blob_url)
        return response.json().get("content", "")
    except (GitHubRequestError, requests.RequestException) as e:
        logging.error(f"Failed to fetch blob for SHA {sha}. Reason: {e}")
        return ""

def verify_blob_sha(file_sha: str, blob_content_b64: str) -> bool:
    """
    Verifies that the SHA of the decoded content matches the expected file_sha.
    """
    decoded_bytes = base64.b64decode(blob_content_b64)
    # Verify Git's actual blob SHA, which hashes a "blob <size>\0" header
    # followed by the raw content, not the content alone.
    blob_header = f"blob {len(decoded_bytes)}\0".encode("utf-8")
    calculated_sha = hashlib.sha1(blob_header + decoded_bytes).hexdigest()
    return calculated_sha == file_sha
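
# Sanity check with a well-known value: for the 12-byte content b"hello world\n",
#     hashlib.sha1(b"blob 12\x00hello world\n").hexdigest()
# yields "3b18e512dbbc01349cb82a86c6a7ed01b35f9f11", matching `git hash-object`.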

def fetch_owasp_rules(session: requests.Session, rule_files: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """
    Fetches the OWASP rule content for each rule file, extracts SecRule patterns,
    and returns a list of dicts with category and pattern.
    """
    logging.info("Fetching OWASP rules...")
    rules = []
    for file in rule_files:
        logging.info(f"Fetching {file['name']}...")
        blob_b64 = fetch_github_blob(session, file["sha"])
        if not blob_b64:
            logging.warning(f"Skipping file {file['name']} due to empty blob content.")
            continue
        if not verify_blob_sha(file["sha"], blob_b64):
            decoded_bytes = base64.b64decode(blob_b64)
            # Recompute the Git blob SHA (header + content) so the logged value
            # is directly comparable to the expected one.
            blob_header = f"blob {len(decoded_bytes)}\0".encode("utf-8")
            calculated_sha = hashlib.sha1(blob_header + decoded_bytes).hexdigest()
            logging.warning(
                f"SHA mismatch for {file['name']}. "
                f"Expected: {file['sha']}, Calculated: {calculated_sha}"
            )
        raw_text = base64.b64decode(blob_b64).decode("utf-8")
        sec_rules = re.findall(r'SecRule\s+.*?"((?:[^"\\]|\\.)+?)"', raw_text, re.DOTALL)
        category = file["name"].split("-")[-1].replace(".conf", "")
        for rule in sec_rules:
            pattern = rule.strip().replace("\\", "")
            if pattern:
                rules.append({"category": category, "pattern": pattern})
    logging.info(f"Fetched {len(rules)} rules.")
    return rules
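
# Example: for a file named REQUEST-942-APPLICATION-ATTACK-SQLI.conf the derived
# category is "SQLI", and for a rule such as
#     SecRule REQUEST_URI "@rx select.*from" "id:1001,phase:2,deny"
# the regex captures the first quoted argument ("@rx select.*from"), i.e. the
# operator/pattern rather than the action list.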

def save_as_json(rules: List[Dict[str, str]], output_file: str) -> bool:
    """
    Saves the given list of rules to a JSON file. Returns True if successful, False otherwise.
    """
    logging.info(f"Saving rules to {output_file}...")
    try:
        output_dir = os.path.dirname(output_file)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(rules, f, indent=4)
        logging.info(f"Rules saved to {output_file}.")
        return True
    except IOError as e:
        logging.error(f"Failed to save rules to {output_file}. Reason: {e}")
        return False
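
# The resulting file is a flat JSON list of objects, for example:
#     [{"category": "SQLI", "pattern": "@rx select.*from"}, ...]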

if __name__ == "__main__":
    session = get_session()
    latest_ref = fetch_latest_tag(session, GITHUB_REF)
    if latest_ref:
        rule_files = fetch_rule_files(session, latest_ref)
        if rule_files:
            rules = fetch_owasp_rules(session, rule_files)
            if rules and save_as_json(rules, "owasp_rules.json"):
                logging.info("All rules fetched and saved successfully.")
            else:
                logging.error("Failed to fetch or save rules.")
        else:
            logging.error("Failed to fetch rule files.")
    else:
        logging.error("Failed to fetch tags.")