diff --git a/owasp2json.py b/owasp2json.py
index 7ead530..49a380a 100644
--- a/owasp2json.py
+++ b/owasp2json.py
@@ -6,253 +6,363 @@ import base64
 import hashlib
 import logging
 import argparse
-from typing import List, Dict, Optional
+from typing import List, Dict, Optional, Match
 from pathlib import Path
 from concurrent.futures import ThreadPoolExecutor, as_completed
 import requests
 from tqdm import tqdm

-# Logging setup
-logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
-logger = logging.getLogger(__name__)
-
-# Constants
+# --- Configuration ---
+LOG_LEVEL = logging.INFO  # Set to DEBUG for more verbose output
 GITHUB_REPO_URL = "https://api.github.com/repos/coreruleset/coreruleset"
 OWASP_CRS_BASE_URL = f"{GITHUB_REPO_URL}/contents/rules"
-GITHUB_REF = "v4"  # Default version prefix
-RATE_LIMIT_DELAY = 600  # Rate limit delay in seconds
-RETRY_DELAY = 5  # Base retry delay in seconds
-MAX_RETRIES = 6  # Maximum number of retries
-EXPONENTIAL_BACKOFF = True  # Use exponential backoff for retries
-BACKOFF_MULTIPLIER = 2  # Multiplier for exponential backoff
+GITHUB_REF = "v4.0"  # More specific default: pin to the v4.0 release series
+RATE_LIMIT_DELAY = 60  # Shorter delay; rely on exponential backoff instead
+RETRY_DELAY = 2  # Shorter initial retry delay
+MAX_RETRIES = 8  # More retries
+EXPONENTIAL_BACKOFF = True
+BACKOFF_MULTIPLIER = 2
 GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")  # GitHub token for authentication
-CONNECTION_POOL_SIZE = 20  # Increased connection pool size
+CONNECTION_POOL_SIZE = 30  # More connections for faster parallel downloads


+# --- Custom Exceptions ---
 class GitHubRequestError(Exception):
-    """Raised when fetching data from GitHub fails after all retries."""
-
+    """Base exception for GitHub API request failures."""
+    pass


 class GitHubRateLimitError(GitHubRequestError):
-    """Raised when GitHub API rate limit is exceeded."""
-
+    """Raised when the GitHub API rate limit is exceeded."""
+    pass


 class GitHubBlobFetchError(GitHubRequestError):
-    """Raised when fetching a blob from GitHub fails."""
+    """Raised when fetching a blob (file content) fails."""
+    pass


+# --- Logging Setup ---
+logging.basicConfig(level=LOG_LEVEL, format="%(asctime)s - %(levelname)s - %(message)s")
+logger = logging.getLogger(__name__)
+
+
+# --- Utility Functions ---
 def get_session() -> requests.Session:
-    """
-    Creates and returns a requests.Session with optional GitHub token authentication.
-    """
+    """Creates and returns a requests.Session with optional GitHub token."""
     session = requests.Session()
     if GITHUB_TOKEN:
         session.headers.update({"Authorization": f"token {GITHUB_TOKEN}"})
-    # Increase connection pool size
+    # Increase connection pool size (important for parallel requests)
     adapter = requests.adapters.HTTPAdapter(pool_connections=CONNECTION_POOL_SIZE, pool_maxsize=CONNECTION_POOL_SIZE)
-    session.mount("https://", adapter)
+    session.mount("https://", adapter)  # Mount for all https:// requests
     return session


 def fetch_with_retries(session: requests.Session, url: str) -> requests.Response:
     """
-    Fetches the given URL with retries, handling rate limits and transient HTTP errors.
-    Raises GitHubRequestError if the request cannot be completed after all retries.
+    Fetches a URL with retries, handling rate limits and transient errors.
+    Raises: GitHubRequestError (or subclasses) if the request ultimately fails.
""" retries = 0 while retries < MAX_RETRIES: try: response = session.get(url) - if response.status_code == 403 and "X-RateLimit-Remaining" in response.headers: + + # Check for rate limiting (403 with specific header) + if response.status_code == 403 and "X-RateLimit-Remaining" in response.headers and response.headers["X-RateLimit-Remaining"] == '0': reset_time = int(response.headers.get("X-RateLimit-Reset", 0)) - wait_time = max(reset_time - int(time.time()), RATE_LIMIT_DELAY) + wait_time = max(0, reset_time - int(time.time())) # Ensure wait_time >= 0 + # If wait_time is very short, still wait a little bit to avoid hammering the API. + wait_time = max(wait_time, 1) logger.warning(f"Rate limit exceeded. Retrying in {wait_time} seconds...") time.sleep(wait_time) - continue + continue # Retry Immediately + + # Raise exceptions for other HTTP errors (4xx, 5xx) response.raise_for_status() return response - except requests.HTTPError as e: - logger.warning(f"HTTP error fetching {url}: {e}") + + except requests.exceptions.RequestException as e: + # Log the error, calculate wait time (exponential backoff) + logger.warning(f"Request failed ({type(e).__name__}): {e} - URL: {url}") wait_time = (RETRY_DELAY * (BACKOFF_MULTIPLIER ** retries) if EXPONENTIAL_BACKOFF else RETRY_DELAY) logger.warning(f"Retrying {url}... ({retries + 1}/{MAX_RETRIES}) in {wait_time} seconds.") time.sleep(wait_time) retries += 1 - except requests.RequestException as e: - logger.error(f"Error fetching {url}: {e}") - retries += 1 + # If we reach here, all retries failed. raise GitHubRequestError(f"Failed to fetch {url} after {MAX_RETRIES} retries.") def fetch_latest_tag(session: requests.Session, ref_prefix: str) -> Optional[str]: - """ - Fetches the latest matching Git tag from the GitHub repository based on a version prefix. - Falls back to the newest tag if no matching prefix is found. Returns the tag reference. - """ + """Fetches the latest matching Git tag, or falls back to the latest overall.""" ref_url = f"{GITHUB_REPO_URL}/git/refs/tags" try: response = fetch_with_retries(session, ref_url) tags = response.json() + if not tags: logger.warning("No tags found in the repository.") return None - matching = [r["ref"] for r in tags if r["ref"].startswith(f"refs/tags/{ref_prefix}.")] - matching.sort(reverse=True, key=lambda x: x.split(".")[-1]) - if matching: - latest_tag = matching[0] + + # Filter tags that start with the given prefix. + matching_tags = [ + r["ref"] for r in tags + if r["ref"].startswith(f"refs/tags/{ref_prefix}") + ] + # Sort matching tags to find the latest (lexicographically, assuming semver). + matching_tags.sort(reverse=True) + + if matching_tags: + latest_tag = matching_tags[0] # The first tag is the latest logger.info(f"Latest matching tag: {latest_tag}") return latest_tag - logger.warning(f"No matching refs found for prefix {ref_prefix}. Falling back to the latest tag.") - return tags[-1]["ref"] - except Exception as e: - logger.error(f"Failed to fetch tags. Reason: {e}") + + # Fallback: If no matching tags, return the *very* latest tag. + logger.warning(f"No matching refs found for prefix '{ref_prefix}'. Using latest tag.") + # Sort *all* tags and get the last one. 
+        tags.sort(key=lambda x: x["ref"], reverse=True)
+        return tags[0]["ref"] if tags else None
+
+    except GitHubRequestError as e:
+        logger.error(f"Failed to fetch tags: {e}")
         return None


 def fetch_rule_files(session: requests.Session, ref: str) -> List[Dict[str, str]]:
-    """
-    Fetches the list of rule files (.conf) from the given ref in the repository.
-    Returns a list of dictionaries containing file name and SHA.
-    """
-    ref_name = ref.split("/")[-1] if "/" in ref else ref
+    """Fetches the list of .conf rule files from the given ref."""
+    ref_name = ref.split("/")[-1] if "/" in ref else ref  # Extract the bare ref name
     rules_url = f"{OWASP_CRS_BASE_URL}?ref={ref_name}"
+
     try:
         response = fetch_with_retries(session, rules_url)
         files = response.json()
-        return [{"name": f["name"], "sha": f["sha"]} for f in files if f["name"].endswith(".conf")]
-    except (GitHubRequestError, requests.RequestException) as e:
-        logger.error(f"Failed to fetch rule files from {rules_url}. Reason: {e}")
-        return []
+        # Filter for .conf files and extract the relevant data.
+        return [
+            {"name": f["name"], "sha": f["sha"]}
+            for f in files if f["name"].endswith(".conf")
+        ]
+    except GitHubRequestError as e:
+        logger.error(f"Failed to fetch rule files from {rules_url}: {e}")
+        return []  # Return an empty list on failure


 def fetch_github_blob(session: requests.Session, sha: str) -> str:
-    """
-    Fetches the blob content (base64-encoded) for a given SHA from GitHub.
-    Returns the content if successful, or an empty string on failure.
-    """
+    """Fetches the base64-encoded content of a blob (file) given its SHA."""
     blob_url = f"{GITHUB_REPO_URL}/git/blobs/{sha}"
     try:
         response = fetch_with_retries(session, blob_url)
-        return response.json().get("content", "")
-    except (GitHubRequestError, requests.RequestException) as e:
-        logger.error(f"Failed to fetch blob for SHA {sha}. Reason: {e}")
+        blob_data = response.json()
+        return blob_data.get("content", "")  # Return an empty string if there is no content
+    except GitHubRequestError as e:
+        logger.error(f"Failed to fetch blob for SHA {sha}: {e}")
         return ""


 def verify_blob_sha(file_sha: str, blob_content_b64: str) -> bool:
-    """
-    Verifies that the SHA of the decoded content matches the expected file_sha.
-    Logs a warning if the verification fails but does not block execution.
-    """
+    """Verifies the SHA1 hash of the decoded blob content."""
     decoded_bytes = base64.b64decode(blob_content_b64)
     blob_header = f"blob {len(decoded_bytes)}\0".encode("utf-8")
     calculated_sha = hashlib.sha1(blob_header + decoded_bytes).hexdigest()
     if calculated_sha != file_sha:
-        logger.warning(f"SHA mismatch for file. Expected: {file_sha}, Calculated: {calculated_sha}")
-        return False
+        logger.warning(f"SHA mismatch! Expected: {file_sha}, Calculated: {calculated_sha}")
+        return False  # Treat a mismatch as an integrity failure
     return True


-def extract_sec_rules(raw_text: str) -> List[str]:
+def _extract_rule_id(secrule_text: str) -> str:
+    """Extracts the rule ID from a SecRule directive."""
+    match = re.search(r'id:(\d+)', secrule_text)
+    return match.group(1) if match else "no_id"
+
+def _extract_rule_severity(secrule_text: str) -> str:
+    """Extracts the severity from a SecRule directive."""
+    match = re.search(r'severity:(\w+)', secrule_text)
+    return match.group(1) if match else "medium"  # Default to medium
+
+
+def _extract_rule_location(secrule_text: str) -> str:
+    """
-    Extracts SecRule patterns from the raw text.
+    Extracts the location (variable) from a SecRule directive. Handles
+    multiple variables and chained rules.
""" - return re.findall(r'SecRule\s+.*?"((?:[^"\\]|\\.)+?)"', raw_text, re.DOTALL) + match = re.search(r'SecRule\s+([^"\s]+)', secrule_text) + if not match: + return "UNKNOWN" + + variables_str = match.group(1) + variables = variables_str.split("|") # Split multiple variables + # Process variables for location extraction + locations = [] + + for var in variables: + var = var.upper() # Set all vars to upper case + if var.startswith("REQUEST_HEADERS"): + if ":" in var: # Specific header + locations.append(var.split(":")[1].replace("_","-").strip()) # add support to user-agent + else: + locations.append("REQUEST_HEADERS") # Generic header location + elif var.startswith("ARGS"): # add support to args + locations.append("Query-String") + elif var == "REQUEST_COOKIES": + locations.append("Cookie") + elif var == "REQUEST_URI": + locations.append("Request-URI") + elif var == "QUERY_STRING": + locations.append("Query-String") + elif var in ("REQUEST_LINE", "REQUEST_BODY", "RESPONSE_BODY", "RESPONSE_HEADERS"): + locations.append(var) # if it has an explicit direct + # Add more location mappings as needed + + # Prioritize specific locations, fall back to generic ones + if "REQUEST_URI" in locations: + return "Request-URI" # set request uri as top priority + elif "Query-String" in locations: + return "Query-String" + if locations: + return locations[0] # Return the first extracted location + return "UNKNOWN" # default locatioN + + +def extract_sec_rules(raw_text: str) -> List[Dict[str, str]]: + """ + Extracts SecRule patterns and associated metadata from raw text. + Now returns a *list of dictionaries*, each representing a SecRule. + """ + rules = [] + # Find all SecRule directives (including those spanning multiple lines). + for match in re.finditer(r'SecRule\s+.*?"((?:[^"\\]|\\.)+?)"', raw_text, re.DOTALL): + secrule_text = match.group(0) # Full SecRule text + pattern = match.group(1).strip().replace("\\\\", "\\") # Extract and clean pattern + + if not pattern: # if there are not pattern then skipp + continue + + rule_id = _extract_rule_id(secrule_text) # Extract rule ID + location = _extract_rule_location(secrule_text) # Extract location + severity = _extract_rule_severity(secrule_text) + + rules.append({ + "id": rule_id, + "pattern": pattern, + "location": location, + "severity": severity + }) + return rules def process_rule_file(file: Dict[str, str], session: requests.Session) -> List[Dict[str, str]]: - """ - Processes a single rule file, fetching its content and extracting SecRule patterns. - """ - rules = [] + """Processes a single rule file, extracting rules and metadata.""" blob_b64 = fetch_github_blob(session, file["sha"]) if not blob_b64: - logger.warning(f"Skipping file {file['name']} due to empty blob content.") - return rules + logger.warning(f"Skipping {file['name']} (empty blob).") + return [] - # Verify SHA (non-blocking) - verify_blob_sha(file["sha"], blob_b64) + if not verify_blob_sha(file["sha"], blob_b64): + pass # We check before but continue, since data is present + + try: + raw_text = base64.b64decode(blob_b64).decode("utf-8") + except Exception as e: + logger.error(f"Failed to decode the file: {file['name']}. 
Reason: {e}") + return [] - raw_text = base64.b64decode(blob_b64).decode("utf-8") - sec_rules = extract_sec_rules(raw_text) category = file["name"].split("-")[-1].replace(".conf", "") - for rule in sec_rules: - pattern = rule.strip().replace("\\", "") - if pattern: - rules.append({"category": category, "pattern": pattern}) + extracted_rules = extract_sec_rules(raw_text) # Get list of dicts - return rules + # Add category to each extracted rule. + for rule in extracted_rules: + rule["category"] = category + + return extracted_rules def fetch_owasp_rules(session: requests.Session, rule_files: List[Dict[str, str]]) -> List[Dict[str, str]]: - """ - Fetches the OWASP rule content for each rule file, extracts SecRule patterns, - and returns a list of dicts with category and pattern. - """ - rules = [] + """Fetches and processes rule files in parallel, returning all extracted rules.""" + all_rules = [] with ThreadPoolExecutor(max_workers=CONNECTION_POOL_SIZE) as executor: - futures = { - executor.submit(process_rule_file, file, session): file for file in rule_files + future_to_file = { + executor.submit(process_rule_file, file, session): file + for file in rule_files } - for future in tqdm(as_completed(futures), total=len(rule_files), desc="Fetching rule files"): + # Use tqdm for progress display. as_completed yields futures as they finish. + for future in tqdm(as_completed(future_to_file), total=len(rule_files), desc="Processing rules"): + file = future_to_file[future] try: - rules.extend(future.result()) + rules = future.result() # Get result (or raise exception) + all_rules.extend(rules) except Exception as e: - logger.error(f"Failed to process file. Reason: {e}") + logger.error(f"Error processing {file['name']}: {e}") + # Consider continuing even on individual file errors - logger.info(f"Fetched {len(rules)} rules.") - return rules + logger.info(f"Fetched a total of {len(all_rules)} rules.") + return all_rules def save_as_json(rules: List[Dict[str, str]], output_file: str) -> bool: - """ - Saves the given list of rules to a JSON file. Returns True if successful, False otherwise. - """ + """Saves the extracted rules to a JSON file (atomically).""" try: output_dir = Path(output_file).parent if output_dir: - output_dir.mkdir(parents=True, exist_ok=True) - # Atomic write using a temporary file - temp_file = f"{output_file}.tmp" + output_dir.mkdir(parents=True, exist_ok=True) + temp_file = f"{output_file}.tmp" # Use a temporary file with open(temp_file, "w", encoding="utf-8") as f: json.dump(rules, f, indent=4) - # Rename temp file to the final output file - os.replace(temp_file, output_file) - logger.info(f"Rules saved to {output_file}.") + os.replace(temp_file, output_file) # Atomic rename + logger.info(f"Rules saved to {output_file}") return True - except IOError as e: - logger.error(f"Failed to save rules to {output_file}. 
Reason: {e}") + except Exception as e: + logger.error(f"Failed to save rules to {output_file}: {e}") return False def main(): - """Main function to fetch and save OWASP rules.""" - parser = argparse.ArgumentParser(description="Fetch OWASP Core Rule Set rules from GitHub.") - parser.add_argument("--output", type=str, default="owasp_rules.json", help="Output JSON file path.") - parser.add_argument("--ref", type=str, default=GITHUB_REF, help="Git reference (e.g., tag or branch).") - parser.add_argument("--dry-run", action="store_true", help="Simulate fetching without saving.") + """Main function: Fetches, processes, and saves OWASP CRS rules.""" + parser = argparse.ArgumentParser( + description="Fetches OWASP Core Rule Set rules and saves them as JSON." + ) + parser.add_argument("--output", type=str, default="owasp_rules.json", + help="Output JSON file path.") + parser.add_argument("--ref", type=str, default=GITHUB_REF, + help="Git reference (tag or branch prefix). E.g., 'v4.0', 'v3.3', 'dev'") + parser.add_argument("--dry-run", action="store_true", + help="Simulate fetching and processing (no file save).") args = parser.parse_args() - session = get_session() + session = get_session() # Create a requests session + + # 1. Fetch the latest tag (or use the provided ref directly) latest_ref = fetch_latest_tag(session, args.ref) - if latest_ref: - rule_files = fetch_rule_files(session, latest_ref) - if rule_files: - rules = fetch_owasp_rules(session, rule_files) - if args.dry_run: - logger.info("Dry-run mode enabled. Skipping file save.") - elif rules and save_as_json(rules, args.output): - logger.info("All rules fetched and saved successfully.") + if not latest_ref: + logger.error("Could not determine the latest tag. Exiting.") + return # Exit if we can't get a ref + + # 2. Fetch the list of rule files. + rule_files = fetch_rule_files(session, latest_ref) + if not rule_files: + logger.error("Could not fetch the list of rule files. Exiting.") + return + + # 3. Fetch and process the rules (in parallel). + rules = fetch_owasp_rules(session, rule_files) + + # 4. Save the rules to a JSON file (unless it's a dry run). + if not args.dry_run: + if rules: + if save_as_json(rules, args.output): + logger.info("Successfully saved rules to JSON.") else: - logger.error("Failed to fetch or save rules.") + logger.error("Failed to save rules to JSON.") # if the save fail else: - logger.error("Failed to fetch rule files.") + logger.warning("No rules were extracted.") # Warn if no rules else: - logger.error("Failed to fetch tags.") + logger.info("Dry-run mode: Rules were fetched and processed, but not saved.") + # Optionally print some of the extracted rules here for verification. + if rules: + logger.info(f"Example rule: {rules[0]}") if __name__ == "__main__": - main() \ No newline at end of file + main()