mirror of
https://github.com/fabriziosalmi/patterns.git
synced 2025-12-29 16:15:12 +00:00
nginx snippets generation fix + others minor improvements.
This commit is contained in:
@@ -156,6 +156,37 @@ def verify_blob_sha(file_sha: str, blob_content_b64: str) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
def extract_sec_rules(raw_text: str) -> List[str]:
|
||||
"""
|
||||
Extracts SecRule patterns from the raw text.
|
||||
"""
|
||||
return re.findall(r'SecRule\s+.*?"((?:[^"\\]|\\.)+?)"', raw_text, re.DOTALL)
|
||||
|
||||
|
||||
def process_rule_file(file: Dict[str, str], session: requests.Session) -> List[Dict[str, str]]:
|
||||
"""
|
||||
Processes a single rule file, fetching its content and extracting SecRule patterns.
|
||||
"""
|
||||
rules = []
|
||||
blob_b64 = fetch_github_blob(session, file["sha"])
|
||||
if not blob_b64:
|
||||
logger.warning(f"Skipping file {file['name']} due to empty blob content.")
|
||||
return rules
|
||||
|
||||
# Verify SHA (non-blocking)
|
||||
verify_blob_sha(file["sha"], blob_b64)
|
||||
|
||||
raw_text = base64.b64decode(blob_b64).decode("utf-8")
|
||||
sec_rules = extract_sec_rules(raw_text)
|
||||
category = file["name"].split("-")[-1].replace(".conf", "")
|
||||
for rule in sec_rules:
|
||||
pattern = rule.strip().replace("\\", "")
|
||||
if pattern:
|
||||
rules.append({"category": category, "pattern": pattern})
|
||||
|
||||
return rules
|
||||
|
||||
|
||||
def fetch_owasp_rules(session: requests.Session, rule_files: List[Dict[str, str]]) -> List[Dict[str, str]]:
|
||||
"""
|
||||
Fetches the OWASP rule content for each rule file, extracts SecRule patterns,
|
||||
@@ -164,28 +195,13 @@ def fetch_owasp_rules(session: requests.Session, rule_files: List[Dict[str, str]
|
||||
rules = []
|
||||
with ThreadPoolExecutor(max_workers=CONNECTION_POOL_SIZE) as executor:
|
||||
futures = {
|
||||
executor.submit(fetch_github_blob, session, file["sha"]): file for file in rule_files
|
||||
executor.submit(process_rule_file, file, session): file for file in rule_files
|
||||
}
|
||||
for future in tqdm(as_completed(futures), total=len(rule_files), desc="Fetching rule files"):
|
||||
file = futures[future]
|
||||
try:
|
||||
blob_b64 = future.result()
|
||||
if not blob_b64:
|
||||
logger.warning(f"Skipping file {file['name']} due to empty blob content.")
|
||||
continue
|
||||
|
||||
# Verify SHA (non-blocking)
|
||||
verify_blob_sha(file["sha"], blob_b64)
|
||||
|
||||
raw_text = base64.b64decode(blob_b64).decode("utf-8")
|
||||
sec_rules = re.findall(r'SecRule\s+.*?"((?:[^"\\]|\\.)+?)"', raw_text, re.DOTALL)
|
||||
category = file["name"].split("-")[-1].replace(".conf", "")
|
||||
for rule in sec_rules:
|
||||
pattern = rule.strip().replace("\\", "")
|
||||
if pattern:
|
||||
rules.append({"category": category, "pattern": pattern})
|
||||
rules.extend(future.result())
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to process file {file['name']}. Reason: {e}")
|
||||
logger.error(f"Failed to process file. Reason: {e}")
|
||||
|
||||
logger.info(f"Fetched {len(rules)} rules.")
|
||||
return rules
|
||||
@@ -239,4 +255,4 @@ def main():
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
main()
|
||||
Reference in New Issue
Block a user