diff --git a/badbots.py b/badbots.py
index 02e2123..4a7ed2c 100644
--- a/badbots.py
+++ b/badbots.py
@@ -2,11 +2,15 @@
 import requests
 import os
 import logging
 import json
+import time
+from pathlib import Path
+from concurrent.futures import ThreadPoolExecutor, as_completed
+import random
 
 # Logging setup
 logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
 
-# Output directories
+# Constants and Configuration
 OUTPUT_DIRS = {
     "nginx": "waf_patterns/nginx/",
     "caddy": "waf_patterns/caddy/",
@@ -15,118 +19,152 @@ OUTPUT_DIRS = {
     "haproxy": "waf_patterns/haproxy/"
 }
 
-# Primary and fallback bot lists (corrected fallback URL)
 BOT_LIST_SOURCES = [
     "https://raw.githubusercontent.com/mitchellkrogza/nginx-ultimate-bad-bot-blocker/master/_generator_lists/bad-user-agents.list",
-    "https://raw.githubusercontent.com/atmire/COUNTER-Robots/master/COUNTER_Robots.txt",  # Corrected fallback 1
-    "https://raw.githubusercontent.com/monperrus/crawler-user-agents/master/crawler-user-agents.json"  # Fallback 2 (JSON)
-]
+    "https://raw.githubusercontent.com/monperrus/crawler-user-agents/master/crawler-user-agents.json",
+    "https://raw.githubusercontent.com/matomo-org/referrer-spam-blacklist/master/spammers.txt",
+    "https://perishablepress.com/4g-ultimate-user-agent-blacklist/?format=txt"
+]
+
+RATE_LIMIT_DELAY = 600  # Minimum wait (seconds) after hitting a rate limit
+RETRY_DELAY = 5
+MAX_RETRIES = 3
+EXPONENTIAL_BACKOFF = True
+BACKOFF_MULTIPLIER = 2
+MAX_WORKERS = 4
+GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
+
+
+def fetch_with_retries(url: str) -> list:
+    """Fetch one bot list with retries, exponential backoff, and rate-limit handling."""
+    retries = 0
+    headers = {}
+
+    # Only attach the token for GitHub-hosted sources; sending it to
+    # third-party hosts would leak the credential.
+    if GITHUB_TOKEN and "githubusercontent.com" in url:
+        headers['Authorization'] = f'token {GITHUB_TOKEN}'
+        logging.info(f"Using GitHub token for {url}")
+
+    while retries < MAX_RETRIES:
+        try:
+            response = requests.get(url, headers=headers, timeout=10)
+            if response.status_code == 200:
+                logging.info(f"Fetched from {url}")
+                return parse_bot_list(url, response)
+
+            if response.status_code == 403 and response.headers.get('X-RateLimit-Remaining') == '0':
+                # Rate limited: wait until the advertised reset, but at least RATE_LIMIT_DELAY.
+                reset_time = int(response.headers.get('X-RateLimit-Reset', 0))
+                wait_time = max(reset_time - int(time.time()), RATE_LIMIT_DELAY)
+                logging.warning(f"Rate limit exceeded for {url}. Retrying in {wait_time} seconds...")
+                time.sleep(wait_time)
+            else:
+                # Exponential backoff with jitter so parallel workers don't retry in lockstep.
+                jitter = random.uniform(1, 3)
+                wait_time = (RETRY_DELAY * (BACKOFF_MULTIPLIER ** retries) if EXPONENTIAL_BACKOFF else RETRY_DELAY) + jitter
+                logging.warning(f"Retrying {url}... ({retries + 1}/{MAX_RETRIES}) in {wait_time:.2f} seconds.")
+                time.sleep(wait_time)
+            retries += 1
+        except requests.RequestException as e:
+            logging.error(f"Error fetching {url}: {e}")
+            retries += 1
+
+    logging.error(f"Failed to fetch {url} after {MAX_RETRIES} retries.")
+    return []
+
+
+def parse_bot_list(url: str, response: requests.Response) -> list:
+    """Extract user-agent patterns from a JSON or plain-text source."""
+    bot_patterns = set()
+    try:
+        if url.endswith(".json"):
+            json_data = response.json()
+            if isinstance(json_data, list):
+                for entry in json_data:
+                    pattern = entry.get('pattern') or entry.get('ua')
+                    if pattern:
+                        bot_patterns.add(pattern)
+            elif isinstance(json_data, dict):
+                for entry in json_data.get('test_cases', []):
+                    ua = entry.get('user_agent_string')
+                    if ua:
+                        bot_patterns.add(ua)
+        else:
+            # Plain-text lists: drop blank lines and comments so an empty
+            # pattern never turns into a match-everything WAF rule.
+            for line in response.text.splitlines():
+                line = line.strip()
+                if line and not line.startswith("#"):
+                    bot_patterns.add(line)
+    except ValueError as e:
+        logging.warning(f"Error parsing {url}: {e}")
+
+    return list(bot_patterns)
+
 def fetch_bot_list():
-    for source in BOT_LIST_SOURCES:
-        try:
-            logging.info(f"Fetching bad bot list from {source}...")
-            response = requests.get(source, timeout=10)
-            response.raise_for_status()
+    bot_patterns = set()
 
-            # Handle JSON fallback source
-            if source.endswith(".json"):
-                bots = [item['pattern'] for item in response.json()]
-            else:
-                bots = response.text.splitlines()
+    # Fetch all sources concurrently and merge the results into one set.
+    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+        future_to_url = {executor.submit(fetch_with_retries, url): url for url in BOT_LIST_SOURCES}
 
-            logging.info(f"Successfully fetched {len(bots)} bots from {source}")
-            return bots
+        for future in as_completed(future_to_url):
+            bot_patterns.update(future.result())
 
-        except (requests.RequestException, ValueError) as e:
-            logging.warning(f"Failed to fetch from {source}. Reason: {e}")
+    if not bot_patterns:
+        logging.error("❌ No bots were fetched from any source. Exiting...")
+        raise SystemExit(1)
+
+    logging.info(f"✅ Total unique bots collected: {len(bot_patterns)}")
+    return sorted(bot_patterns)
+
+
+def write_to_file(path: Path, content: str):
+    try:
+        with path.open("w") as f:
+            f.write(content)
+        logging.info(f"Generated file: {path}")
+    except IOError as e:
+        logging.error(f"Failed to write to {path}: {e}")
 
-    logging.error("❌ All bot lists failed to fetch. Exiting...")
-    exit(1)
 
 def generate_nginx_conf(bots):
-    path = os.path.join(OUTPUT_DIRS['nginx'], "bots.conf")
-    with open(path, "w") as f:
-        f.write("# Nginx WAF - Bad Bot Blocker\n")
-        f.write("map $http_user_agent $bad_bot {\n")
-        for bot in bots:
-            f.write(f'    "~*{bot}" 1;\n')
-        f.write("    default 0;\n}\n")
+    path = Path(OUTPUT_DIRS['nginx'], "bots.conf")
+    content = "map $http_user_agent $bad_bot {\n"
+    for bot in bots:
+        content += f'    "~*{bot}" 1;\n'
+    content += "    default 0;\n}\n"
+    write_to_file(path, content)
 
-        # Evil bit simulation (header check) - added here
-        f.write("map $http_x_evil_bit $evil_bit_detected {\n    default 0;\n    \"1\" 1;\n}\n")
-
-        f.write("if ($bad_bot or $evil_bit_detected) {\n    return 403;\n}\n")
-    logging.info(f"[+] Generated Nginx bot blocker: {path}")
 
 def generate_caddy_conf(bots):
-    path = os.path.join(OUTPUT_DIRS['caddy'], "bots.conf")
-    with open(path, "w") as f:
-        f.write("# Caddy WAF - Bad Bot Blocker\n")
-        f.write("@bad_bot {\n")
-        for bot in bots:
-            f.write(f'    header User-Agent *{bot}*\n')
-        f.write("}\n")
-        f.write("@evil_bit {\n    header X-Evil-Bit 1\n}\n")  # Evil bit simulation
-        f.write("respond @bad_bot 403\n")
-        f.write("respond @evil_bit 403\n")
-    logging.info(f"[+] Generated Caddy bot blocker: {path}")
+    path = Path(OUTPUT_DIRS['caddy'], "bots.conf")
+    content = "@bad_bot {\n"
+    for bot in bots:
+        # Quote the value so patterns containing spaces stay a single Caddyfile token.
+        content += f'    header User-Agent "*{bot}*"\n'
+    content += "}\nrespond @bad_bot 403\n"
+    write_to_file(path, content)
 
 
 def generate_apache_conf(bots):
-    path = os.path.join(OUTPUT_DIRS['apache'], "bots.conf")
-    with open(path, "w") as f:
-        f.write("# Apache ModSecurity - Bad Bot Blocker\n")
-        f.write("SecRuleEngine On\n")
-        for bot in bots:
-            f.write(f'SecRule REQUEST_HEADERS:User-Agent "@contains {bot}" "id:3000,phase:1,deny,status:403,log,msg:\'Bad Bot Blocked\'"\n')
-        f.write('SecRule REQUEST_HEADERS:X-Evil-Bit "@streq 1" "id:3001,phase:1,deny,status:403,log,msg:\'Evil Bit Blocked\'"\n')  # Evil bit simulation
-    logging.info(f"[+] Generated Apache bot blocker: {path}")
+    path = Path(OUTPUT_DIRS['apache'], "bots.conf")
+    content = "SecRuleEngine On\n"
+    # ModSecurity requires a unique id per rule, so number them sequentially.
+    for i, bot in enumerate(bots):
+        content += f'SecRule REQUEST_HEADERS:User-Agent "@contains {bot}" "id:{3000 + i},phase:1,deny,status:403"\n'
+    write_to_file(path, content)
 
 
 def generate_traefik_conf(bots):
-    path = os.path.join(OUTPUT_DIRS['traefik'], "bots.toml")
-    with open(path, "w") as f:
-        f.write("[http.middlewares]\n")
-        f.write("[http.middlewares.bad_bot_block]\n")
-        f.write("  [http.middlewares.bad_bot_block.plugin.badbot]\n")
-        f.write("    userAgent = [\n")
-        for bot in bots:
-            f.write(f'      "{bot}",\n')
-        f.write("    ]\n")
-        # Evil bit simulation
-        f.write("  [http.middlewares.evil_bit_block]\n")
-        f.write("  [http.middlewares.evil_bit_block.headers]\n")
-        f.write("    headers = [\n")
-        f.write('      "X-Evil-Bit=1",\n')
-        f.write("    ]\n")
-        f.write("  [http.middlewares.bad_bot_block.chain.middlewares] = [\"evil_bit_block\"]\n")
-        f.write("[http.routers.my_router]\n")
-        f.write("  middlewares = [\"bad_bot_block\"]\n")
-
-
-    logging.info(f"[+] Generated Traefik bot blocker: {path}")
+    path = Path(OUTPUT_DIRS['traefik'], "bots.toml")
+    # Assumes a Traefik plugin ("badbot") that accepts a userAgent list.
+    content = "[http.middlewares]\n[http.middlewares.bad_bot_block]\n  [http.middlewares.bad_bot_block.plugin.badbot]\n    userAgent = [\n"
+    for bot in bots:
+        content += f'      "{bot}",\n'
+    content += "    ]\n"
+    write_to_file(path, content)
 
 
 def generate_haproxy_conf(bots):
-    path = os.path.join(OUTPUT_DIRS['haproxy'], "bots.acl")
-    with open(path, "w") as f:
-        f.write("# HAProxy WAF - Bad Bot Blocker\n")
-        for bot in bots:
-            f.write(f'acl bad_bot hdr_sub(User-Agent) -i {bot}\n')
-        f.write("acl evil_bit hdr(X-Evil-Bit) -i 1\n")  # Evil bit simulation
-        f.write("http-request deny if bad_bot\n")
-        f.write("http-request deny if evil_bit\n")  # Evil bit simulation
-    logging.info(f"[+] Generated HAProxy bot blocker: {path}")
+    path = Path(OUTPUT_DIRS['haproxy'], "bots.acl")
+    content = "# HAProxy WAF - Bad Bot Blocker\n"
+    for bot in bots:
+        content += f'acl bad_bot hdr_sub(User-Agent) -i {bot}\n'
+    content += "http-request deny if bad_bot\n"
+    write_to_file(path, content)
 
 
 if __name__ == "__main__":
-    # Ensure output directories exist
-    for path in OUTPUT_DIRS.values():
-        os.makedirs(path, exist_ok=True)
+    for output_dir in OUTPUT_DIRS.values():
+        Path(output_dir).mkdir(parents=True, exist_ok=True)
 
-    # Fetch bot list
     bots = fetch_bot_list()
 
-    # Generate bot blocker configs for each platform
     generate_nginx_conf(bots)
     generate_caddy_conf(bots)
     generate_apache_conf(bots)
     generate_traefik_conf(bots)
     generate_haproxy_conf(bots)
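For reviewers, two small standalone sketches. First, the retry schedule fetch_with_retries settles into when a source keeps failing without a rate-limit header; this reuses the constants from the patch, and the random jitter means exact timings vary:

    import random

    RETRY_DELAY = 5
    BACKOFF_MULTIPLIER = 2
    MAX_RETRIES = 3

    # Expected waits: ~5s, ~10s, ~20s, each padded with 1-3s of jitter.
    for retries in range(MAX_RETRIES):
        jitter = random.uniform(1, 3)
        wait_time = RETRY_DELAY * (BACKOFF_MULTIPLIER ** retries) + jitter
        print(f"attempt {retries + 1}/{MAX_RETRIES}: sleeping {wait_time:.2f}s")

Second, a hypothetical smoke test for parse_bot_list using a stub in place of requests.Response (assumes badbots.py is importable from the working directory; FakeResponse is invented for illustration):

    import badbots

    class FakeResponse:
        # Stand-in for requests.Response; only .text and .json() are used.
        text = "BadBot\n# comment line\n\nEvilScraper/1.0\n"
        def json(self):
            return [{"pattern": "[Bb]adBot"}, {"pattern": "EvilScraper"}]

    print(badbots.parse_bot_list("bots.txt", FakeResponse()))   # text branch: comments and blanks dropped
    print(badbots.parse_bot_list("bots.json", FakeResponse()))  # JSON branch: 'pattern' keys collected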