Update badbots.py

- Introduced concurrency with ThreadPoolExecutor for faster bot list fetching.
- Added rate limiting and exponential backoff for resilient fetch operations.
- Expanded bot list sources to include new datasets for improved coverage.
- Implemented GitHub token authentication for rate-limited endpoints.
- Refactored file generation to use pathlib for better path management.
- Replaced os.path with Path for consistency and readability.
- Modularized retry logic and enhanced error handling for robustness.
- Ensured bot lists are aggregated and deduplicated across sources.
- Improved logging for better visibility into fetch and write operations.
This commit is contained in:
fab 2024-12-30 00:08:12 +01:00 committed by GitHub
parent 7f7f7fecd4
commit f347ac2817
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -2,11 +2,15 @@ import requests
import os import os
import logging import logging
import json import json
import time
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import random
# Logging setup # Logging setup
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
# Output directories # Constants and Configuration
OUTPUT_DIRS = { OUTPUT_DIRS = {
"nginx": "waf_patterns/nginx/", "nginx": "waf_patterns/nginx/",
"caddy": "waf_patterns/caddy/", "caddy": "waf_patterns/caddy/",
@ -15,118 +19,152 @@ OUTPUT_DIRS = {
"haproxy": "waf_patterns/haproxy/" "haproxy": "waf_patterns/haproxy/"
} }
# Primary and fallback bot lists (corrected fallback URL)
BOT_LIST_SOURCES = [ BOT_LIST_SOURCES = [
"https://raw.githubusercontent.com/mitchellkrogza/nginx-ultimate-bad-bot-blocker/master/_generator_lists/bad-user-agents.list", "https://raw.githubusercontent.com/mitchellkrogza/nginx-ultimate-bad-bot-blocker/master/_generator_lists/bad-user-agents.list",
"https://raw.githubusercontent.com/atmire/COUNTER-Robots/master/COUNTER_Robots.txt", # Corrected fallback 1 "https://raw.githubusercontent.com/monperrus/crawler-user-agents/master/crawler-user-agents.json",
"https://raw.githubusercontent.com/monperrus/crawler-user-agents/master/crawler-user-agents.json" # Fallback 2 (JSON) "https://raw.githubusercontent.com/matomo-org/referrer-spam-blacklist/master/spammers.txt",
] "https://perishablepress.com/4g-ultimate-user-agent-blacklist/?format=txt"
]
RATE_LIMIT_DELAY = 600
RETRY_DELAY = 5
MAX_RETRIES = 3
EXPONENTIAL_BACKOFF = True
BACKOFF_MULTIPLIER = 2
MAX_WORKERS = 4
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
def fetch_with_retries(url: str) -> list:
retries = 0
headers = {}
if GITHUB_TOKEN:
headers['Authorization'] = f'token {GITHUB_TOKEN}'
logging.info(f"Using GitHub token for {url}")
while retries < MAX_RETRIES:
try:
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
logging.info(f"Fetched from {url}")
return parse_bot_list(url, response)
if response.status_code == 403 and 'X-RateLimit-Remaining' in response.headers:
reset_time = int(response.headers['X-RateLimit-Reset'])
wait_time = max(reset_time - int(time.time()), RATE_LIMIT_DELAY)
logging.warning(f"Rate limit exceeded for {url}. Retrying in {wait_time} seconds...")
time.sleep(wait_time)
else:
jitter = random.uniform(1, 3)
wait_time = (RETRY_DELAY * (BACKOFF_MULTIPLIER ** retries) if EXPONENTIAL_BACKOFF else RETRY_DELAY) + jitter
logging.warning(f"Retrying {url}... ({retries + 1}/{MAX_RETRIES}) in {wait_time:.2f} seconds.")
time.sleep(wait_time)
retries += 1
except requests.RequestException as e:
logging.error(f"Error fetching {url}: {e}")
retries += 1
logging.error(f"Failed to fetch {url} after {MAX_RETRIES} retries.")
return []
def parse_bot_list(url: str, response: requests.Response) -> list:
bot_patterns = set()
try:
if url.endswith(".json"):
json_data = response.json()
if isinstance(json_data, list):
for entry in json_data:
bot_patterns.add(entry.get('pattern', entry.get('ua', '')))
elif isinstance(json_data, dict):
for entry in json_data.get('test_cases', []):
bot_patterns.add(entry.get('user_agent_string', ''))
else:
bot_patterns.update(response.text.splitlines())
except (ValueError, json.JSONDecodeError) as e:
logging.warning(f"Error parsing {url}: {e}")
return list(bot_patterns)
def fetch_bot_list(): def fetch_bot_list():
for source in BOT_LIST_SOURCES: bot_patterns = set()
try:
logging.info(f"Fetching bad bot list from {source}...")
response = requests.get(source, timeout=10)
response.raise_for_status()
# Handle JSON fallback source with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
if source.endswith(".json"): future_to_url = {executor.submit(fetch_with_retries, url): url for url in BOT_LIST_SOURCES}
bots = [item['pattern'] for item in response.json()]
else:
bots = response.text.splitlines()
logging.info(f"Successfully fetched {len(bots)} bots from {source}") for future in as_completed(future_to_url):
return bots result = future.result()
bot_patterns.update(result)
except (requests.RequestException, ValueError) as e: if not bot_patterns:
logging.warning(f"Failed to fetch from {source}. Reason: {e}") logging.error("❌ No bots were fetched from any source. Exiting...")
logging.error("❌ All bot lists failed to fetch. Exiting...")
exit(1) exit(1)
logging.info(f"✅ Total unique bots collected: {len(bot_patterns)}")
return sorted(bot_patterns)
def write_to_file(path: Path, content: str):
try:
with path.open("w") as f:
f.write(content)
logging.info(f"Generated file: {path}")
except IOError as e:
logging.error(f"Failed to write to {path}: {e}")
def generate_nginx_conf(bots): def generate_nginx_conf(bots):
path = os.path.join(OUTPUT_DIRS['nginx'], "bots.conf") path = Path(OUTPUT_DIRS['nginx'], "bots.conf")
with open(path, "w") as f: content = "map $http_user_agent $bad_bot {\n"
f.write("# Nginx WAF - Bad Bot Blocker\n")
f.write("map $http_user_agent $bad_bot {\n")
for bot in bots: for bot in bots:
f.write(f' "~*{bot}" 1;\n') content += f' "~*{bot}" 1;\n'
f.write(" default 0;\n}\n") content += " default 0;\n}\n"
write_to_file(path, content)
# Evil bit simulation (header check) - added here
f.write("map $http_x_evil_bit $evil_bit_detected {\n default 0;\n \"1\" 1;\n}\n")
f.write("if ($bad_bot or $evil_bit_detected) {\n return 403;\n}\n")
logging.info(f"[+] Generated Nginx bot blocker: {path}")
def generate_caddy_conf(bots): def generate_caddy_conf(bots):
path = os.path.join(OUTPUT_DIRS['caddy'], "bots.conf") path = Path(OUTPUT_DIRS['caddy'], "bots.conf")
with open(path, "w") as f: content = "@bad_bot {\n"
f.write("# Caddy WAF - Bad Bot Blocker\n")
f.write("@bad_bot {\n")
for bot in bots: for bot in bots:
f.write(f' header User-Agent *{bot}*\n') content += f' header User-Agent *{bot}*\n'
f.write("}\n") content += "}\nrespond @bad_bot 403\n"
f.write("@evil_bit {\n header X-Evil-Bit 1\n}\n") # Evil bit simulation write_to_file(path, content)
f.write("respond @bad_bot 403\n")
f.write("respond @evil_bit 403\n")
logging.info(f"[+] Generated Caddy bot blocker: {path}")
def generate_apache_conf(bots): def generate_apache_conf(bots):
path = os.path.join(OUTPUT_DIRS['apache'], "bots.conf") path = Path(OUTPUT_DIRS['apache'], "bots.conf")
with open(path, "w") as f: content = "SecRuleEngine On\n"
f.write("# Apache ModSecurity - Bad Bot Blocker\n")
f.write("SecRuleEngine On\n")
for bot in bots: for bot in bots:
f.write(f'SecRule REQUEST_HEADERS:User-Agent "@contains {bot}" "id:3000,phase:1,deny,status:403,log,msg:\'Bad Bot Blocked\'"\n') content += f'SecRule REQUEST_HEADERS:User-Agent "@contains {bot}" "id:3000,phase:1,deny,status:403"\n'
f.write('SecRule REQUEST_HEADERS:X-Evil-Bit "@streq 1" "id:3001,phase:1,deny,status:403,log,msg:\'Evil Bit Blocked\'"\n') # Evil bit simulation write_to_file(path, content)
logging.info(f"[+] Generated Apache bot blocker: {path}")
def generate_traefik_conf(bots): def generate_traefik_conf(bots):
path = os.path.join(OUTPUT_DIRS['traefik'], "bots.toml") path = Path(OUTPUT_DIRS['traefik'], "bots.toml")
with open(path, "w") as f: content = "[http.middlewares]\n[http.middlewares.bad_bot_block]\n [http.middlewares.bad_bot_block.plugin.badbot]\n userAgent = [\n"
f.write("[http.middlewares]\n")
f.write("[http.middlewares.bad_bot_block]\n")
f.write(" [http.middlewares.bad_bot_block.plugin.badbot]\n")
f.write(" userAgent = [\n")
for bot in bots: for bot in bots:
f.write(f' "{bot}",\n') content += f' "{bot}",\n'
f.write(" ]\n") content += " ]\n"
# Evil bit simulation write_to_file(path, content)
f.write(" [http.middlewares.evil_bit_block]\n")
f.write(" [http.middlewares.evil_bit_block.headers]\n")
f.write(" headers = [\n")
f.write(' "X-Evil-Bit=1",\n')
f.write(" ]\n")
f.write(" [http.middlewares.bad_bot_block.chain.middlewares] = [\"evil_bit_block\"]\n")
f.write("[http.routers.my_router]\n")
f.write(" middlewares = [\"bad_bot_block\"]\n")
logging.info(f"[+] Generated Traefik bot blocker: {path}")
def generate_haproxy_conf(bots): def generate_haproxy_conf(bots):
path = os.path.join(OUTPUT_DIRS['haproxy'], "bots.acl") path = Path(OUTPUT_DIRS['haproxy'], "bots.acl")
with open(path, "w") as f: content = "# HAProxy WAF - Bad Bot Blocker\n"
f.write("# HAProxy WAF - Bad Bot Blocker\n")
for bot in bots: for bot in bots:
f.write(f'acl bad_bot hdr_sub(User-Agent) -i {bot}\n') content += f'acl bad_bot hdr_sub(User-Agent) -i {bot}\n'
f.write("acl evil_bit hdr(X-Evil-Bit) -i 1\n") # Evil bit simulation content += "http-request deny if bad_bot\n"
f.write("http-request deny if bad_bot\n") write_to_file(path, content)
f.write("http-request deny if evil_bit\n") # Evil bit simulation
logging.info(f"[+] Generated HAProxy bot blocker: {path}")
if __name__ == "__main__": if __name__ == "__main__":
# Ensure output directories exist for output_dir in OUTPUT_DIRS.values():
for path in OUTPUT_DIRS.values(): Path(output_dir).mkdir(parents=True, exist_ok=True)
os.makedirs(path, exist_ok=True)
# Fetch bot list
bots = fetch_bot_list() bots = fetch_bot_list()
# Generate bot blocker configs for each platform
generate_nginx_conf(bots) generate_nginx_conf(bots)
generate_caddy_conf(bots) generate_caddy_conf(bots)
generate_apache_conf(bots) generate_apache_conf(bots)