Files
krawl.es/tests/test_insert_fake_ips.py
Lorenzo Venerandi 152682de2c code linted
2026-02-18 00:02:44 +01:00

565 lines
18 KiB
Python

#!/usr/bin/env python3
"""
Test script to insert fake external IPs into the database for testing the dashboard.
This generates realistic-looking test data including:
- Access logs with various suspicious activities
- Credential attempts
- Attack detections (SQL injection, XSS, etc.)
- Category behavior changes for timeline demonstration
- Geolocation data fetched from API with reverse geocoded city names
- Real good crawler IPs (Googlebot, Bingbot, etc.)
Usage:
python test_insert_fake_ips.py [num_ips] [logs_per_ip] [credentials_per_ip] [--no-cleanup]
Examples:
python test_insert_fake_ips.py # Generate 20 IPs with defaults, cleanup DB first
python test_insert_fake_ips.py 30 # Generate 30 IPs with defaults
python test_insert_fake_ips.py 30 20 5 # Generate 30 IPs, 20 logs each, 5 credentials each
python test_insert_fake_ips.py --no-cleanup # Generate data without cleaning DB first
Note: This script will make API calls to fetch geolocation data, so it may take a while.
"""
import random
import time
import sys
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from pathlib import Path
import requests
# Add parent src directory to path so we can import database and logger
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from database import get_database
from logger import get_app_logger
from geo_utils import extract_geolocation_from_ip
# ----------------------
# TEST DATA GENERATORS
# ----------------------
# Fake IPs for testing - geolocation data will be fetched from API
# These are real public IPs from various locations around the world
FAKE_IPS = [
# VPN
"31.13.189.236",
"37.120.215.246",
"37.120.215.247",
"37.120.215.248",
"37.120.215.249",
# United States
"45.142.120.10",
"107.189.10.143",
"162.243.175.23",
"198.51.100.89",
# Europe
"185.220.101.45",
"195.154.133.20",
"178.128.83.165",
"87.251.67.90",
"91.203.5.165",
"46.105.57.169",
"217.182.143.207",
"188.166.123.45",
# Asia
"103.253.145.36",
"42.112.28.216",
"118.163.74.160",
"43.229.53.35",
"115.78.208.140",
"14.139.56.18",
"61.19.25.207",
"121.126.219.198",
"202.134.4.212",
"171.244.140.134",
# South America
"177.87.169.20",
"200.21.19.58",
"181.13.140.98",
"190.150.24.34",
# Middle East & Africa
"41.223.53.141",
"196.207.35.152",
"5.188.62.214",
"37.48.93.125",
"102.66.137.29",
# Australia & Oceania
"103.28.248.110",
"202.168.45.33",
# Additional European IPs
"94.102.49.190",
"213.32.93.140",
"79.137.79.167",
"37.9.169.146",
"188.92.80.123",
"80.240.25.198",
]
# Real good crawler IPs (Googlebot, Bingbot, etc.) - geolocation will be fetched from API
GOOD_CRAWLER_IPS = [
"66.249.66.1", # Googlebot
"66.249.79.23", # Googlebot
"40.77.167.52", # Bingbot
"157.55.39.145", # Bingbot
"17.58.98.100", # Applebot
"199.59.150.39", # Twitterbot
"54.236.1.15", # Amazon Bot
]
FAKE_PATHS = [
"/admin",
"/login",
"/admin/login",
"/api/users",
"/wp-admin",
"/.env",
"/config.php",
"/admin.php",
"/shell.php",
"/../../../etc/passwd",
"/sqlmap",
"/w00t.php",
"/shell",
"/joomla/administrator",
]
FAKE_USER_AGENTS = [
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
"Nmap Scripting Engine",
"curl/7.68.0",
"python-requests/2.28.1",
"sqlmap/1.6.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
"ZmEu",
"nikto/2.1.6",
]
FAKE_CREDENTIALS = [
("admin", "admin"),
("admin", "password"),
("root", "123456"),
("test", "test"),
("guest", "guest"),
("user", "12345"),
]
ATTACK_TYPES = [
"sql_injection",
"xss_attempt",
"path_traversal",
"suspicious_pattern",
"credential_submission",
]
CATEGORIES = [
"attacker",
"bad_crawler",
"good_crawler",
"regular_user",
"unknown",
]
def generate_category_scores():
"""Generate random category scores."""
scores = {
"attacker": random.randint(0, 100),
"good_crawler": random.randint(0, 100),
"bad_crawler": random.randint(0, 100),
"regular_user": random.randint(0, 100),
"unknown": random.randint(0, 100),
}
return scores
def generate_analyzed_metrics():
"""Generate random analyzed metrics."""
return {
"request_frequency": random.uniform(0.1, 100.0),
"suspicious_patterns": random.randint(0, 20),
"credential_attempts": random.randint(0, 10),
"attack_diversity": random.uniform(0, 1.0),
}
def cleanup_database(db_manager, app_logger):
"""
Clean up all existing test data from the database.
Args:
db_manager: Database manager instance
app_logger: Logger instance
"""
from models import (
AccessLog,
CredentialAttempt,
AttackDetection,
IpStats,
CategoryHistory,
)
app_logger.info("=" * 60)
app_logger.info("Cleaning up existing database data")
app_logger.info("=" * 60)
session = db_manager.session
try:
# Delete all records from each table
deleted_attack_detections = session.query(AttackDetection).delete()
deleted_access_logs = session.query(AccessLog).delete()
deleted_credentials = session.query(CredentialAttempt).delete()
deleted_category_history = session.query(CategoryHistory).delete()
deleted_ip_stats = session.query(IpStats).delete()
session.commit()
app_logger.info(f"Deleted {deleted_access_logs} access logs")
app_logger.info(f"Deleted {deleted_attack_detections} attack detections")
app_logger.info(f"Deleted {deleted_credentials} credential attempts")
app_logger.info(f"Deleted {deleted_category_history} category history records")
app_logger.info(f"Deleted {deleted_ip_stats} IP statistics")
app_logger.info("✓ Database cleanup complete")
except Exception as e:
session.rollback()
app_logger.error(f"Error during database cleanup: {e}")
raise
finally:
db_manager.close_session()
def fetch_geolocation_from_api(ip: str, app_logger) -> tuple:
"""
Fetch geolocation data using ip-api.com.
Args:
ip: IP address to lookup
app_logger: Logger instance
Returns:
Tuple of (country_code, city, asn, asn_org) or None if failed
"""
try:
geoloc_data = extract_geolocation_from_ip(ip)
if geoloc_data:
country_code = geoloc_data.get("country_code")
city = geoloc_data.get("city")
asn = geoloc_data.get("asn")
asn_org = geoloc_data.get("org")
return (country_code, city, asn, asn_org)
except requests.RequestException as e:
app_logger.warning(f"Failed to fetch geolocation for {ip}: {e}")
except Exception as e:
app_logger.error(f"Error processing geolocation for {ip}: {e}")
return None
def generate_fake_data(
num_ips: int = 20,
logs_per_ip: int = 15,
credentials_per_ip: int = 3,
include_good_crawlers: bool = True,
cleanup: bool = True,
):
"""
Generate and insert fake test data into the database.
Args:
num_ips: Number of unique fake IPs to generate (default: 20)
logs_per_ip: Number of access logs per IP (default: 15)
credentials_per_ip: Number of credential attempts per IP (default: 3)
include_good_crawlers: Whether to add real good crawler IPs with API-fetched geolocation (default: True)
cleanup: Whether to clean up existing database data before generating new data (default: True)
"""
db_manager = get_database()
app_logger = get_app_logger()
# Ensure database is initialized
if not db_manager._initialized:
db_manager.initialize()
# Clean up existing data if requested
if cleanup:
cleanup_database(db_manager, app_logger)
print() # Add blank line for readability
app_logger.info("=" * 60)
app_logger.info("Starting fake IP data generation for testing")
app_logger.info("=" * 60)
total_logs = 0
total_credentials = 0
total_attacks = 0
total_category_changes = 0
# Select random IPs from the pool
selected_ips = random.sample(FAKE_IPS, min(num_ips, len(FAKE_IPS)))
# Create a varied distribution of request counts for better visualization
# Some IPs with very few requests, some with moderate, some with high
request_counts = []
for i in range(len(selected_ips)):
if i < len(selected_ips) // 5: # 20% high-traffic IPs
count = random.randint(100, 1000)
elif i < len(selected_ips) // 2: # 30% medium-traffic IPs
count = random.randint(10, 100)
else: # 50% low-traffic IPs
count = random.randint(5, 100)
request_counts.append(count)
random.shuffle(request_counts) # Randomize the order
for idx, ip in enumerate(selected_ips):
current_logs_count = request_counts[idx]
app_logger.info(
f"\nGenerating data for IP: {ip} ({current_logs_count} requests)"
)
# Generate access logs for this IP
for _ in range(current_logs_count):
path = random.choice(FAKE_PATHS)
user_agent = random.choice(FAKE_USER_AGENTS)
is_suspicious = random.choice(
[True, False, False]
) # 33% chance of suspicious
is_honeypot = random.choice(
[True, False, False, False]
) # 25% chance of honeypot trigger
# Randomly decide if this log has attack detections
attack_types = None
if random.choice([True, False, False]): # 33% chance
num_attacks = random.randint(1, 3)
attack_types = random.sample(ATTACK_TYPES, num_attacks)
log_id = db_manager.persist_access(
ip=ip,
path=path,
user_agent=user_agent,
method=random.choice(["GET", "POST"]),
is_suspicious=is_suspicious,
is_honeypot_trigger=is_honeypot,
attack_types=attack_types,
)
if log_id:
total_logs += 1
if attack_types:
total_attacks += len(attack_types)
# Generate credential attempts for this IP
for _ in range(credentials_per_ip):
username, password = random.choice(FAKE_CREDENTIALS)
path = random.choice(["/login", "/admin/login", "/api/auth"])
cred_id = db_manager.persist_credential(
ip=ip,
path=path,
username=username,
password=password,
)
if cred_id:
total_credentials += 1
app_logger.info(f" ✓ Generated {current_logs_count} access logs")
app_logger.info(f" ✓ Generated {credentials_per_ip} credential attempts")
# Fetch geolocation data from API
app_logger.info(f" 🌍 Fetching geolocation from API...")
geo_data = fetch_geolocation_from_api(ip, app_logger)
if geo_data:
country_code, city, asn, asn_org = geo_data
db_manager.update_ip_rep_infos(
ip=ip,
country_code=country_code,
asn=asn if asn else 12345,
asn_org=asn_org or "Unknown",
list_on={},
city=city,
)
location_display = (
f"{city}, {country_code}" if city else country_code or "Unknown"
)
app_logger.info(
f" 📍 API-fetched geolocation: {location_display} ({asn_org or 'Unknown'})"
)
else:
app_logger.warning(f" ⚠ Could not fetch geolocation for {ip}")
# Small delay to be nice to the API
time.sleep(0.5)
# Trigger behavior/category changes to demonstrate timeline feature
# First analysis
initial_category = random.choice(CATEGORIES)
app_logger.info(
f" ⟳ Analyzing behavior - Initial category: {initial_category}"
)
db_manager.update_ip_stats_analysis(
ip=ip,
analyzed_metrics=generate_analyzed_metrics(),
category=initial_category,
category_scores=generate_category_scores(),
last_analysis=datetime.now(tz=ZoneInfo("UTC")),
)
total_category_changes += 1
# Small delay to ensure timestamps are different
time.sleep(0.1)
# Second analysis with potential category change (70% chance)
if random.random() < 0.7:
new_category = random.choice(
[c for c in CATEGORIES if c != initial_category]
)
app_logger.info(
f" ⟳ Behavior change detected: {initial_category}{new_category}"
)
db_manager.update_ip_stats_analysis(
ip=ip,
analyzed_metrics=generate_analyzed_metrics(),
category=new_category,
category_scores=generate_category_scores(),
last_analysis=datetime.now(tz=ZoneInfo("UTC")),
)
total_category_changes += 1
# Optional third change (40% chance)
if random.random() < 0.4:
final_category = random.choice(
[c for c in CATEGORIES if c != new_category]
)
app_logger.info(
f" ⟳ Another behavior change: {new_category}{final_category}"
)
time.sleep(0.1)
db_manager.update_ip_stats_analysis(
ip=ip,
analyzed_metrics=generate_analyzed_metrics(),
category=final_category,
category_scores=generate_category_scores(),
last_analysis=datetime.now(tz=ZoneInfo("UTC")),
)
total_category_changes += 1
# Add good crawler IPs with real geolocation from API
total_good_crawlers = 0
if include_good_crawlers:
app_logger.info("\n" + "=" * 60)
app_logger.info("Adding Good Crawler IPs with API-fetched geolocation")
app_logger.info("=" * 60)
for crawler_ip in GOOD_CRAWLER_IPS:
app_logger.info(f"\nProcessing Good Crawler: {crawler_ip}")
# Fetch real geolocation from API
geo_data = fetch_geolocation_from_api(crawler_ip, app_logger)
# Don't generate access logs for good crawlers to prevent re-categorization
# We'll just create the IP stats entry with the category set
app_logger.info(
f" ✓ Adding as good crawler (no logs to prevent re-categorization)"
)
# First, we need to create the IP in the database via persist_access
# (but we'll only create one minimal log entry)
db_manager.persist_access(
ip=crawler_ip,
path="/robots.txt", # Minimal, normal crawler behavior
user_agent="Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)",
method="GET",
is_suspicious=False,
is_honeypot_trigger=False,
attack_types=None,
)
# Add geolocation if API fetch was successful
if geo_data:
country_code, city, asn, asn_org = geo_data
db_manager.update_ip_rep_infos(
ip=crawler_ip,
country_code=country_code,
asn=asn if asn else 12345,
asn_org=asn_org,
list_on={},
city=city,
)
app_logger.info(
f" 📍 API-fetched geolocation: {city}, {country_code} ({asn_org})"
)
else:
app_logger.warning(f" ⚠ Could not fetch geolocation for {crawler_ip}")
# Set category to good_crawler - this sets manual_category=True to prevent re-analysis
db_manager.update_ip_stats_analysis(
ip=crawler_ip,
analyzed_metrics={
"request_frequency": 0.1, # Very low frequency
"suspicious_patterns": 0,
"credential_attempts": 0,
"attack_diversity": 0.0,
},
category="good_crawler",
category_scores={
"attacker": 0,
"good_crawler": 100,
"bad_crawler": 0,
"regular_user": 0,
"unknown": 0,
},
last_analysis=datetime.now(tz=ZoneInfo("UTC")),
)
total_good_crawlers += 1
time.sleep(0.5) # Small delay between API calls
# Print summary
app_logger.info("\n" + "=" * 60)
app_logger.info("Test Data Generation Complete!")
app_logger.info("=" * 60)
app_logger.info(f"Total IPs created: {len(selected_ips) + total_good_crawlers}")
app_logger.info(f" - Attackers/Mixed: {len(selected_ips)}")
app_logger.info(f" - Good Crawlers: {total_good_crawlers}")
app_logger.info(f"Total access logs: {total_logs}")
app_logger.info(f"Total attack detections: {total_attacks}")
app_logger.info(f"Total credential attempts: {total_credentials}")
app_logger.info(f"Total category changes: {total_category_changes}")
app_logger.info("=" * 60)
app_logger.info("\nYou can now view the dashboard with this test data.")
app_logger.info(
"The 'Behavior Timeline' will show category transitions for each IP."
)
app_logger.info(
"All IPs have API-fetched geolocation with reverse geocoded city names."
)
app_logger.info("Run: uvicorn app:app --app-dir src")
app_logger.info("=" * 60)
if __name__ == "__main__":
import sys
# Allow command-line arguments for customization
num_ips = int(sys.argv[1]) if len(sys.argv) > 1 else 20
logs_per_ip = int(sys.argv[2]) if len(sys.argv) > 2 else 15
credentials_per_ip = int(sys.argv[3]) if len(sys.argv) > 3 else 3
# Add --no-cleanup flag to skip database cleanup
cleanup = "--no-cleanup" not in sys.argv
generate_fake_data(
num_ips,
logs_per_ip,
credentials_per_ip,
include_good_crawlers=True,
cleanup=cleanup,
)