#!/usr/bin/env python3
"""
Test script to insert fake external IPs into the database for testing the dashboard.

This generates realistic-looking test data including:
- Access logs with various suspicious activities
- Credential attempts
- Attack detections (SQL injection, XSS, etc.)
- Category behavior changes for timeline demonstration
- Geolocation data fetched from API with reverse geocoded city names
- Real good crawler IPs (Googlebot, Bingbot, etc.)

Usage:
    python test_insert_fake_ips.py [num_ips] [logs_per_ip] [credentials_per_ip] [--no-cleanup]

Examples:
    python test_insert_fake_ips.py                # Generate 20 IPs with defaults, cleanup DB first
    python test_insert_fake_ips.py 30             # Generate 30 IPs with defaults
    python test_insert_fake_ips.py 30 20 5        # Generate 30 IPs, 20 logs each, 5 credentials each
    python test_insert_fake_ips.py --no-cleanup   # Generate data without cleaning DB first

Note: This script makes API calls to fetch geolocation data, so it may take a while.
"""

import random
import time
import sys
from datetime import datetime
from zoneinfo import ZoneInfo
from pathlib import Path

import requests

# Add parent src directory to path so we can import database and logger
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))

from database import get_database
from logger import get_app_logger
from geo_utils import extract_geolocation_from_ip

# ----------------------
# TEST DATA GENERATORS
# ----------------------

# Fake IPs for testing - geolocation data will be fetched from API.
# These are real public IPs from various locations around the world.
FAKE_IPS = [
    # VPN
    "31.13.189.236", "37.120.215.246", "37.120.215.247", "37.120.215.248", "37.120.215.249",
    # United States
    "45.142.120.10", "107.189.10.143", "162.243.175.23", "198.51.100.89",
    # Europe
    "185.220.101.45", "195.154.133.20", "178.128.83.165", "87.251.67.90",
    "91.203.5.165", "46.105.57.169", "217.182.143.207", "188.166.123.45",
    # Asia
    "103.253.145.36", "42.112.28.216", "118.163.74.160", "43.229.53.35",
    "115.78.208.140", "14.139.56.18", "61.19.25.207", "121.126.219.198",
    "202.134.4.212", "171.244.140.134",
    # South America
    "177.87.169.20", "200.21.19.58", "181.13.140.98", "190.150.24.34",
    # Middle East & Africa
    "41.223.53.141", "196.207.35.152", "5.188.62.214", "37.48.93.125", "102.66.137.29",
    # Australia & Oceania
    "103.28.248.110", "202.168.45.33",
    # Additional European IPs
    "94.102.49.190", "213.32.93.140", "79.137.79.167", "37.9.169.146",
    "188.92.80.123", "80.240.25.198",
]

# Real good crawler IPs (Googlebot, Bingbot, etc.) - geolocation will be fetched from API
GOOD_CRAWLER_IPS = [
    "66.249.66.1",    # Googlebot
    "66.249.79.23",   # Googlebot
    "40.77.167.52",   # Bingbot
    "157.55.39.145",  # Bingbot
    "17.58.98.100",   # Applebot
    "199.59.150.39",  # Twitterbot
    "54.236.1.15",    # Amazon Bot
]

FAKE_PATHS = [
    "/admin",
    "/login",
    "/admin/login",
    "/api/users",
    "/wp-admin",
    "/.env",
    "/config.php",
    "/admin.php",
    "/shell.php",
    "/../../../etc/passwd",
    "/sqlmap",
    "/w00t.php",
    "/shell",
    "/joomla/administrator",
]

FAKE_USER_AGENTS = [
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
    "Nmap Scripting Engine",
    "curl/7.68.0",
    "python-requests/2.28.1",
    "sqlmap/1.6.0",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
    "ZmEu",
    "nikto/2.1.6",
]

FAKE_CREDENTIALS = [
    ("admin", "admin"),
    ("admin", "password"),
    ("root", "123456"),
    ("test", "test"),
    ("guest", "guest"),
    ("user", "12345"),
]

ATTACK_TYPES = [
    "sql_injection",
    "xss_attempt",
    "path_traversal",
    "suspicious_pattern",
    "credential_submission",
]

CATEGORIES = [
    "attacker",
    "bad_crawler",
    "good_crawler",
    "regular_user",
    "unknown",
]


def generate_category_scores():
    """Generate random category scores."""
    scores = {
        "attacker": random.randint(0, 100),
        "good_crawler": random.randint(0, 100),
        "bad_crawler": random.randint(0, 100),
        "regular_user": random.randint(0, 100),
        "unknown": random.randint(0, 100),
    }
    return scores


def generate_analyzed_metrics():
    """Generate random analyzed metrics."""
    return {
        "request_frequency": random.uniform(0.1, 100.0),
        "suspicious_patterns": random.randint(0, 20),
        "credential_attempts": random.randint(0, 10),
        "attack_diversity": random.uniform(0, 1.0),
    }


def cleanup_database(db_manager, app_logger):
    """
    Clean up all existing test data from the database.

    Args:
        db_manager: Database manager instance
        app_logger: Logger instance
    """
    from models import (
        AccessLog,
        CredentialAttempt,
        AttackDetection,
        IpStats,
        CategoryHistory,
    )

    app_logger.info("=" * 60)
    app_logger.info("Cleaning up existing database data")
    app_logger.info("=" * 60)

    session = db_manager.session
    try:
        # Delete all records from each table
        deleted_attack_detections = session.query(AttackDetection).delete()
        deleted_access_logs = session.query(AccessLog).delete()
        deleted_credentials = session.query(CredentialAttempt).delete()
        deleted_category_history = session.query(CategoryHistory).delete()
        deleted_ip_stats = session.query(IpStats).delete()
        session.commit()

        app_logger.info(f"Deleted {deleted_access_logs} access logs")
        app_logger.info(f"Deleted {deleted_attack_detections} attack detections")
        app_logger.info(f"Deleted {deleted_credentials} credential attempts")
        app_logger.info(f"Deleted {deleted_category_history} category history records")
        app_logger.info(f"Deleted {deleted_ip_stats} IP statistics")
        app_logger.info("✓ Database cleanup complete")
    except Exception as e:
        session.rollback()
        app_logger.error(f"Error during database cleanup: {e}")
        raise
    finally:
        db_manager.close_session()
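

# fetch_geolocation_from_api() below wraps extract_geolocation_from_ip() and, on
# success, returns a 4-tuple in the order (country_code, city, asn, asn_org),
# for example (illustrative values only, not a real lookup):
#     ("US", "Mountain View", 15169, "Google LLC")
# On failure it returns None, so callers should handle both cases.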
def fetch_geolocation_from_api(ip: str, app_logger) -> tuple:
    """
    Fetch geolocation data using ip-api.com.

    Args:
        ip: IP address to lookup
        app_logger: Logger instance

    Returns:
        Tuple of (country_code, city, asn, asn_org) or None if failed
    """
    try:
        geoloc_data = extract_geolocation_from_ip(ip)
        if geoloc_data:
            country_code = geoloc_data.get("country_code")
            city = geoloc_data.get("city")
            asn = geoloc_data.get("asn")
            asn_org = geoloc_data.get("org")
            return (country_code, city, asn, asn_org)
    except requests.RequestException as e:
        app_logger.warning(f"Failed to fetch geolocation for {ip}: {e}")
    except Exception as e:
        app_logger.error(f"Error processing geolocation for {ip}: {e}")
    return None


def generate_fake_data(
    num_ips: int = 20,
    logs_per_ip: int = 15,
    credentials_per_ip: int = 3,
    include_good_crawlers: bool = True,
    cleanup: bool = True,
):
    """
    Generate and insert fake test data into the database.

    Args:
        num_ips: Number of unique fake IPs to generate (default: 20)
        logs_per_ip: Number of access logs per IP (default: 15)
        credentials_per_ip: Number of credential attempts per IP (default: 3)
        include_good_crawlers: Whether to add real good crawler IPs with API-fetched geolocation (default: True)
        cleanup: Whether to clean up existing database data before generating new data (default: True)
    """
    db_manager = get_database()
    app_logger = get_app_logger()

    # Ensure database is initialized
    if not db_manager._initialized:
        db_manager.initialize()

    # Clean up existing data if requested
    if cleanup:
        cleanup_database(db_manager, app_logger)
        print()  # Add blank line for readability

    app_logger.info("=" * 60)
    app_logger.info("Starting fake IP data generation for testing")
    app_logger.info("=" * 60)

    total_logs = 0
    total_credentials = 0
    total_attacks = 0
    total_category_changes = 0

    # Select random IPs from the pool
    selected_ips = random.sample(FAKE_IPS, min(num_ips, len(FAKE_IPS)))

    # Create a varied distribution of request counts for better visualization:
    # some IPs with very few requests, some with moderate, some with high.
    # Note: these varied counts are used in place of the logs_per_ip argument.
    request_counts = []
    for i in range(len(selected_ips)):
        if i < len(selected_ips) // 5:  # 20% high-traffic IPs
            count = random.randint(100, 1000)
        elif i < len(selected_ips) // 2:  # 30% medium-traffic IPs
            count = random.randint(10, 100)
        else:  # 50% low-traffic IPs
            count = random.randint(5, 100)
        request_counts.append(count)
    random.shuffle(request_counts)  # Randomize the order

    for idx, ip in enumerate(selected_ips):
        current_logs_count = request_counts[idx]
        app_logger.info(
            f"\nGenerating data for IP: {ip} ({current_logs_count} requests)"
        )

        # Generate access logs for this IP
        for _ in range(current_logs_count):
            path = random.choice(FAKE_PATHS)
            user_agent = random.choice(FAKE_USER_AGENTS)
            is_suspicious = random.choice(
                [True, False, False]
            )  # 33% chance of suspicious
            is_honeypot = random.choice(
                [True, False, False, False]
            )  # 25% chance of honeypot trigger

            # Randomly decide if this log has attack detections
            attack_types = None
            if random.choice([True, False, False]):  # 33% chance
                num_attacks = random.randint(1, 3)
                attack_types = random.sample(ATTACK_TYPES, num_attacks)

            log_id = db_manager.persist_access(
                ip=ip,
                path=path,
                user_agent=user_agent,
                method=random.choice(["GET", "POST"]),
                is_suspicious=is_suspicious,
                is_honeypot_trigger=is_honeypot,
                attack_types=attack_types,
            )
            if log_id:
                total_logs += 1
                if attack_types:
                    total_attacks += len(attack_types)

        # Generate credential attempts for this IP
        for _ in range(credentials_per_ip):
            username, password = random.choice(FAKE_CREDENTIALS)
            path = random.choice(["/login", "/admin/login", "/api/auth"])
            cred_id = db_manager.persist_credential(
                ip=ip,
                path=path,
                username=username,
                password=password,
            )
            if cred_id:
                total_credentials += 1

        app_logger.info(f" ✓ Generated {current_logs_count} access logs")
        app_logger.info(f" ✓ Generated {credentials_per_ip} credential attempts")

        # Fetch geolocation data from API
        app_logger.info(" 🌍 Fetching geolocation from API...")
        geo_data = fetch_geolocation_from_api(ip, app_logger)
        if geo_data:
            country_code, city, asn, asn_org = geo_data
            db_manager.update_ip_rep_infos(
                ip=ip,
                country_code=country_code,
                asn=asn if asn else 12345,
                asn_org=asn_org or "Unknown",
                list_on={},
                city=city,
            )
            location_display = (
                f"{city}, {country_code}" if city else country_code or "Unknown"
            )
            app_logger.info(
                f" 📍 API-fetched geolocation: {location_display} ({asn_org or 'Unknown'})"
            )
        else:
            app_logger.warning(f" ⚠ Could not fetch geolocation for {ip}")

        # Small delay to be nice to the API
        time.sleep(0.5)

        # Trigger behavior/category changes to demonstrate timeline feature
        # First analysis
        initial_category = random.choice(CATEGORIES)
        app_logger.info(
            f" ⟳ Analyzing behavior - Initial category: {initial_category}"
        )
        db_manager.update_ip_stats_analysis(
            ip=ip,
            analyzed_metrics=generate_analyzed_metrics(),
            category=initial_category,
            category_scores=generate_category_scores(),
            last_analysis=datetime.now(tz=ZoneInfo("UTC")),
        )
        total_category_changes += 1

        # Small delay to ensure timestamps are different
        time.sleep(0.1)

        # Second analysis with potential category change (70% chance)
        if random.random() < 0.7:
            new_category = random.choice(
                [c for c in CATEGORIES if c != initial_category]
            )
            app_logger.info(
                f" ⟳ Behavior change detected: {initial_category} → {new_category}"
            )
            db_manager.update_ip_stats_analysis(
                ip=ip,
                analyzed_metrics=generate_analyzed_metrics(),
                category=new_category,
                category_scores=generate_category_scores(),
                last_analysis=datetime.now(tz=ZoneInfo("UTC")),
            )
            total_category_changes += 1

            # Optional third change (40% chance)
            if random.random() < 0.4:
                final_category = random.choice(
                    [c for c in CATEGORIES if c != new_category]
                )
                app_logger.info(
                    f" ⟳ Another behavior change: {new_category} → {final_category}"
                )
                time.sleep(0.1)
                db_manager.update_ip_stats_analysis(
                    ip=ip,
                    analyzed_metrics=generate_analyzed_metrics(),
                    category=final_category,
                    category_scores=generate_category_scores(),
                    last_analysis=datetime.now(tz=ZoneInfo("UTC")),
                )
                total_category_changes += 1

    # Add good crawler IPs with real geolocation from API
    total_good_crawlers = 0
    if include_good_crawlers:
        app_logger.info("\n" + "=" * 60)
        app_logger.info("Adding Good Crawler IPs with API-fetched geolocation")
        app_logger.info("=" * 60)

        for crawler_ip in GOOD_CRAWLER_IPS:
            app_logger.info(f"\nProcessing Good Crawler: {crawler_ip}")

            # Fetch real geolocation from API
            geo_data = fetch_geolocation_from_api(crawler_ip, app_logger)

            # Don't generate access logs for good crawlers to prevent re-categorization.
            # We'll just create the IP stats entry with the category set.
            app_logger.info(
                " ✓ Adding as good crawler (no logs to prevent re-categorization)"
            )

            # First, we need to create the IP in the database via persist_access
            # (but we'll only create one minimal log entry)
            db_manager.persist_access(
                ip=crawler_ip,
                path="/robots.txt",  # Minimal, normal crawler behavior
                user_agent="Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)",
                method="GET",
                is_suspicious=False,
                is_honeypot_trigger=False,
                attack_types=None,
            )

            # Add geolocation if API fetch was successful
            if geo_data:
                country_code, city, asn, asn_org = geo_data
                db_manager.update_ip_rep_infos(
                    ip=crawler_ip,
                    country_code=country_code,
                    asn=asn if asn else 12345,
                    asn_org=asn_org,
                    list_on={},
                    city=city,
                )
                app_logger.info(
                    f" 📍 API-fetched geolocation: {city}, {country_code} ({asn_org})"
                )
            else:
                app_logger.warning(f" ⚠ Could not fetch geolocation for {crawler_ip}")

            # Set category to good_crawler - this sets manual_category=True to prevent re-analysis
            db_manager.update_ip_stats_analysis(
                ip=crawler_ip,
                analyzed_metrics={
                    "request_frequency": 0.1,  # Very low frequency
                    "suspicious_patterns": 0,
                    "credential_attempts": 0,
                    "attack_diversity": 0.0,
                },
                category="good_crawler",
                category_scores={
                    "attacker": 0,
                    "good_crawler": 100,
                    "bad_crawler": 0,
                    "regular_user": 0,
                    "unknown": 0,
                },
                last_analysis=datetime.now(tz=ZoneInfo("UTC")),
            )

            total_good_crawlers += 1
            time.sleep(0.5)  # Small delay between API calls

    # Print summary
    app_logger.info("\n" + "=" * 60)
    app_logger.info("Test Data Generation Complete!")
    app_logger.info("=" * 60)
    app_logger.info(f"Total IPs created: {len(selected_ips) + total_good_crawlers}")
    app_logger.info(f" - Attackers/Mixed: {len(selected_ips)}")
    app_logger.info(f" - Good Crawlers: {total_good_crawlers}")
    app_logger.info(f"Total access logs: {total_logs}")
    app_logger.info(f"Total attack detections: {total_attacks}")
    app_logger.info(f"Total credential attempts: {total_credentials}")
    app_logger.info(f"Total category changes: {total_category_changes}")
    app_logger.info("=" * 60)
    app_logger.info("\nYou can now view the dashboard with this test data.")
    app_logger.info(
        "The 'Behavior Timeline' will show category transitions for each IP."
    )
    app_logger.info(
        "All IPs have API-fetched geolocation with reverse geocoded city names."
    )
    app_logger.info("Run: python server.py")
    app_logger.info("=" * 60)


if __name__ == "__main__":
    # Allow command-line arguments for customization.
    # The --no-cleanup flag skips database cleanup; flags are filtered out before
    # parsing the positional arguments so that
    # `python test_insert_fake_ips.py --no-cleanup` does not break the int()
    # conversions below.
    cleanup = "--no-cleanup" not in sys.argv
    positional = [arg for arg in sys.argv[1:] if not arg.startswith("--")]

    num_ips = int(positional[0]) if len(positional) > 0 else 20
    logs_per_ip = int(positional[1]) if len(positional) > 1 else 15
    credentials_per_ip = int(positional[2]) if len(positional) > 2 else 3

    generate_fake_data(
        num_ips,
        logs_per_ip,
        credentials_per_ip,
        include_good_crawlers=True,
        cleanup=cleanup,
    )
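

# Optional verification sketch (illustrative only; the model names are the ones
# cleanup_database() imports). After a run, row counts can be spot-checked with
# the same session API used above, e.g.:
#
#     from models import AccessLog, IpStats
#     db = get_database()
#     if not db._initialized:
#         db.initialize()
#     session = db.session
#     print("access_logs:", session.query(AccessLog).count())
#     print("ip_stats:", session.query(IpStats).count())
#     db.close_session()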