Merge branch 'dev' into feat/blocklist-api

This commit is contained in:
Patrick Di Fazio
2026-02-02 22:38:44 +01:00
committed by GitHub
8 changed files with 607 additions and 132 deletions

View File

@@ -145,7 +145,10 @@ This file can either be mounted from the Docker container into another system or
curl https://your-krawl-instance/<DASHBOARD-PATH>/api/download/malicious_ips.txt
```
This file can be used to [update a set of firewall rules](https://www.allthingstech.ch/using-opnsense-and-ip-blocklists-to-block-malicious-traffic), for example on OPNsense and pfSense, enabling automatic blocking of malicious IPs or using IPtables
This file enables automatic blocking of malicious traffic across various platforms. You can use it to update firewall rules on:
* [OPNsense and pfSense](https://www.allthingstech.ch/using-opnsense-and-ip-blocklists-to-block-malicious-traffic)
* [RouterOS](https://rentry.co/krawl-routeros)
* IPtables
## IP Reputation
Krawl [uses tasks that analyze recent traffic to build and continuously update an IP reputation score](src/tasks/analyze_ips.py). These tasks run periodically and evaluate each active IP address based on multiple behavioral indicators to classify it as an attacker, crawler, or regular user. Thresholds are fully customizable.
@@ -180,6 +183,7 @@ location / {
## API
Krawl uses the following APIs
- http://ip-api.com (IP Data)
- https://iprep.lcrawl.com (IP Reputation)
- https://nominatim.openstreetmap.org/reverse (Reverse IP Lookup)
- https://api.ipify.org (Public IP discovery)
@@ -361,3 +365,4 @@ Use responsibly and in compliance with applicable laws and regulations.
## Star History
<img src="https://api.star-history.com/svg?repos=BlessedRebuS/Krawl&type=Date" width="600" alt="Star History Chart" />

View File

@@ -3,7 +3,7 @@ name: krawl-chart
description: A Helm chart for Krawl honeypot server
type: application
version: 1.0.0
appVersion: 1.0.0
appVersion: 1.0.1
keywords:
- honeypot
- security

View File

@@ -137,6 +137,39 @@ class DatabaseManager:
cursor.execute("ALTER TABLE ip_stats ADD COLUMN longitude REAL")
migrations_run.append("longitude")
# Add new geolocation columns
if "country" not in columns:
cursor.execute("ALTER TABLE ip_stats ADD COLUMN country VARCHAR(100)")
migrations_run.append("country")
if "region" not in columns:
cursor.execute("ALTER TABLE ip_stats ADD COLUMN region VARCHAR(2)")
migrations_run.append("region")
if "region_name" not in columns:
cursor.execute("ALTER TABLE ip_stats ADD COLUMN region_name VARCHAR(100)")
migrations_run.append("region_name")
if "timezone" not in columns:
cursor.execute("ALTER TABLE ip_stats ADD COLUMN timezone VARCHAR(50)")
migrations_run.append("timezone")
if "isp" not in columns:
cursor.execute("ALTER TABLE ip_stats ADD COLUMN isp VARCHAR(100)")
migrations_run.append("isp")
if "is_proxy" not in columns:
cursor.execute("ALTER TABLE ip_stats ADD COLUMN is_proxy BOOLEAN")
migrations_run.append("is_proxy")
if "is_hosting" not in columns:
cursor.execute("ALTER TABLE ip_stats ADD COLUMN is_hosting BOOLEAN")
migrations_run.append("is_hosting")
if "reverse" not in columns:
cursor.execute("ALTER TABLE ip_stats ADD COLUMN reverse VARCHAR(255)")
migrations_run.append("reverse")
if migrations_run:
conn.commit()
applogger.info(
@@ -377,7 +410,7 @@ class DatabaseManager:
) -> None:
"""
Internal method to record category changes in history.
Only records if there's an actual change from a previous category.
Records all category changes including initial categorization.
Args:
ip: IP address
@@ -385,11 +418,6 @@ class DatabaseManager:
new_category: New category
timestamp: When the change occurred
"""
# Don't record initial categorization (when old_category is None)
# Only record actual category changes
if old_category is None:
return
session = self.session
try:
history_entry = CategoryHistory(
@@ -445,6 +473,14 @@ class DatabaseManager:
city: Optional[str] = None,
latitude: Optional[float] = None,
longitude: Optional[float] = None,
country: Optional[str] = None,
region: Optional[str] = None,
region_name: Optional[str] = None,
timezone: Optional[str] = None,
isp: Optional[str] = None,
reverse: Optional[str] = None,
is_proxy: Optional[bool] = None,
is_hosting: Optional[bool] = None,
) -> None:
"""
Update IP rep stats
@@ -458,6 +494,14 @@ class DatabaseManager:
city: City name (optional)
latitude: Latitude coordinate (optional)
longitude: Longitude coordinate (optional)
country: Full country name (optional)
region: Region code (optional)
region_name: Region name (optional)
timezone: Timezone (optional)
isp: Internet Service Provider (optional)
reverse: Reverse DNS lookup (optional)
is_proxy: Whether IP is a proxy (optional)
is_hosting: Whether IP is a hosting provider (optional)
"""
session = self.session
@@ -475,6 +519,22 @@ class DatabaseManager:
ip_stats.latitude = latitude
if longitude is not None:
ip_stats.longitude = longitude
if country:
ip_stats.country = country
if region:
ip_stats.region = region
if region_name:
ip_stats.region_name = region_name
if timezone:
ip_stats.timezone = timezone
if isp:
ip_stats.isp = isp
if reverse:
ip_stats.reverse = reverse
if is_proxy is not None:
ip_stats.is_proxy = is_proxy
if is_hosting is not None:
ip_stats.is_hosting = is_hosting
session.commit()
except Exception as e:
session.rollback()
@@ -714,8 +774,16 @@ class DatabaseManager:
"last_seen": stat.last_seen.isoformat() if stat.last_seen else None,
"country_code": stat.country_code,
"city": stat.city,
"country": stat.country,
"region": stat.region,
"region_name": stat.region_name,
"timezone": stat.timezone,
"isp": stat.isp,
"reverse": stat.reverse,
"asn": stat.asn,
"asn_org": stat.asn_org,
"is_proxy": stat.is_proxy,
"is_hosting": stat.is_hosting,
"list_on": stat.list_on or {},
"reputation_score": stat.reputation_score,
"reputation_source": stat.reputation_source,

View File

@@ -1,113 +1,124 @@
#!/usr/bin/env python3
"""
Geolocation utilities for reverse geocoding and city lookups.
Geolocation utilities for IP lookups using ip-api.com.
"""
import requests
from typing import Optional, Tuple
from typing import Optional, Dict, Any
from logger import get_app_logger
app_logger = get_app_logger()
# Simple city name cache to avoid repeated API calls
_city_cache = {}
# Cache for IP geolocation data to avoid repeated API calls
_geoloc_cache = {}
def reverse_geocode_city(latitude: float, longitude: float) -> Optional[str]:
def fetch_ip_geolocation(ip_address: str) -> Optional[Dict[str, Any]]:
"""
Reverse geocode coordinates to get city name using Nominatim (OpenStreetMap).
Fetch geolocation data for an IP address using ip-api.com.
Args:
latitude: Latitude coordinate
longitude: Longitude coordinate
ip_address: IP address to lookup
Returns:
City name or None if not found
Dictionary containing geolocation data or None if lookup fails
"""
# Check cache first
cache_key = f"{latitude},{longitude}"
if cache_key in _city_cache:
return _city_cache[cache_key]
if ip_address in _geoloc_cache:
return _geoloc_cache[ip_address]
# This is now replacing lcrawl to fetch IP data like latitude/longitude, city, etc...
try:
# Use Nominatim reverse geocoding API (free, no API key required)
url = "https://nominatim.openstreetmap.org/reverse"
# Use ip-api.com API for geolocation
url = f"http://ip-api.com/json/{ip_address}"
params = {
"lat": latitude,
"lon": longitude,
"format": "json",
"zoom": 10, # City level
"addressdetails": 1,
"fields": "status,message,country,countryCode,region,regionName,city,zip,lat,lon,timezone,isp,org,as,reverse,mobile,proxy,hosting,query"
}
headers = {"User-Agent": "Krawl-Honeypot/1.0"} # Required by Nominatim ToS
response = requests.get(url, params=params, headers=headers, timeout=5)
response = requests.get(url, params=params, timeout=5)
response.raise_for_status()
data = response.json()
address = data.get("address", {})
# Try to get city from various possible fields
city = (
address.get("city")
or address.get("town")
or address.get("village")
or address.get("municipality")
or address.get("county")
)
# Check if the API call was successful
if data.get("status") != "success":
app_logger.warning(f"IP lookup failed for {ip_address}: {data.get('message')}")
return None
# Cache the result
_city_cache[cache_key] = city
_geoloc_cache[ip_address] = data
if city:
app_logger.debug(f"Reverse geocoded {latitude},{longitude} to {city}")
return city
app_logger.debug(f"Fetched geolocation for {ip_address}")
return data
except requests.RequestException as e:
app_logger.warning(f"Reverse geocoding failed for {latitude},{longitude}: {e}")
app_logger.warning(f"Geolocation API call failed for {ip_address}: {e}")
return None
except Exception as e:
app_logger.error(f"Error in reverse geocoding: {e}")
app_logger.error(f"Error fetching geolocation for {ip_address}: {e}")
return None
def get_most_recent_geoip_data(results: list) -> Optional[dict]:
def extract_geolocation_from_ip(ip_address: str) -> Optional[Dict[str, Any]]:
"""
Extract the most recent geoip_data from API results.
Results are assumed to be sorted by record_added (most recent first).
Extract geolocation data for an IP address.
Args:
results: List of result dictionaries from IP reputation API
ip_address: IP address to lookup
Returns:
Most recent geoip_data dict or None
Dictionary with city, country, lat, lon, and other geolocation data or None
"""
if not results:
geoloc_data = fetch_ip_geolocation(ip_address)
if not geoloc_data:
return None
# The first result is the most recent (sorted by record_added)
most_recent = results[0]
return most_recent.get("geoip_data")
return {
"city": geoloc_data.get("city"),
"country": geoloc_data.get("country"),
"country_code": geoloc_data.get("countryCode"),
"region": geoloc_data.get("region"),
"region_name": geoloc_data.get("regionName"),
"latitude": geoloc_data.get("lat"),
"longitude": geoloc_data.get("lon"),
"timezone": geoloc_data.get("timezone"),
"isp": geoloc_data.get("isp"),
"org": geoloc_data.get("org"),
"reverse": geoloc_data.get("reverse"),
"is_proxy": geoloc_data.get("proxy"),
"is_hosting": geoloc_data.get("hosting"),
}
def extract_city_from_coordinates(geoip_data: dict) -> Optional[str]:
def fetch_blocklist_data(ip_address: str) -> Optional[Dict[str, Any]]:
"""
Extract city name from geoip_data using reverse geocoding.
Fetch blocklist data for an IP address using lcrawl API.
Args:
geoip_data: Dictionary containing location_latitude and location_longitude
ip_address: IP address to lookup
Returns:
City name or None
Dictionary containing blocklist information or None if lookup fails
"""
if not geoip_data:
return None
# This is now used only for ip reputation
try:
api_url = "https://iprep.lcrawl.com/api/iprep/"
params = {"cidr": ip_address}
headers = {"Content-Type": "application/json"}
response = requests.get(api_url, headers=headers, params=params, timeout=10)
latitude = geoip_data.get("location_latitude")
longitude = geoip_data.get("location_longitude")
if response.status_code == 200:
payload = response.json()
if payload.get("results"):
results = payload["results"]
# Get the most recent result (first in list, sorted by record_added)
most_recent = results[0]
list_on = most_recent.get("list_on", {})
if latitude is None or longitude is None:
return None
app_logger.debug(f"Fetched blocklist data for {ip_address}")
return list_on
except requests.RequestException as e:
app_logger.warning(f"Failed to fetch blocklist data for {ip_address}: {e}")
except Exception as e:
app_logger.error(f"Error processing blocklist data for {ip_address}: {e}")
return reverse_geocode_city(latitude, longitude)
return None

View File

@@ -162,12 +162,20 @@ class IpStats(Base):
# GeoIP fields (populated by future enrichment)
country_code: Mapped[Optional[str]] = mapped_column(String(2), nullable=True)
city: Mapped[Optional[str]] = mapped_column(String(MAX_CITY_LENGTH), nullable=True)
country: Mapped[Optional[str]] = mapped_column(String(100), nullable=True)
region: Mapped[Optional[str]] = mapped_column(String(2), nullable=True)
region_name: Mapped[Optional[str]] = mapped_column(String(100), nullable=True)
timezone: Mapped[Optional[str]] = mapped_column(String(50), nullable=True)
isp: Mapped[Optional[str]] = mapped_column(String(100), nullable=True)
reverse: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
latitude: Mapped[Optional[float]] = mapped_column(Float, nullable=True)
longitude: Mapped[Optional[float]] = mapped_column(Float, nullable=True)
asn: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
asn_org: Mapped[Optional[str]] = mapped_column(
String(MAX_ASN_ORG_LENGTH), nullable=True
)
is_proxy: Mapped[Optional[bool]] = mapped_column(Boolean, nullable=True)
is_hosting: Mapped[Optional[bool]] = mapped_column(Boolean, nullable=True)
list_on: Mapped[Optional[Dict[str, str]]] = mapped_column(JSON, nullable=True)
# Reputation fields (populated by future enrichment)

View File

@@ -2,7 +2,7 @@ from database import get_database
from logger import get_app_logger
import requests
from sanitizer import sanitize_for_storage, sanitize_dict
from geo_utils import get_most_recent_geoip_data, extract_city_from_coordinates
from geo_utils import extract_geolocation_from_ip, fetch_blocklist_data
# ----------------------
# TASK CONFIG
@@ -27,34 +27,49 @@ def main():
)
for ip in unenriched_ips:
try:
api_url = "https://iprep.lcrawl.com/api/iprep/"
params = {"cidr": ip}
headers = {"Content-Type": "application/json"}
response = requests.get(api_url, headers=headers, params=params, timeout=10)
payload = response.json()
# Fetch geolocation data using ip-api.com
geoloc_data = extract_geolocation_from_ip(ip)
if payload.get("results"):
results = payload["results"]
# Fetch blocklist data from lcrawl API
blocklist_data = fetch_blocklist_data(ip)
# Get the most recent result (first in list, sorted by record_added)
most_recent = results[0]
geoip_data = most_recent.get("geoip_data", {})
list_on = most_recent.get("list_on", {})
if geoloc_data:
# Extract fields from the new API response
country_iso_code = geoloc_data.get("country_code")
country = geoloc_data.get("country")
region = geoloc_data.get("region")
region_name = geoloc_data.get("region_name")
city = geoloc_data.get("city")
timezone = geoloc_data.get("timezone")
isp = geoloc_data.get("isp")
reverse = geoloc_data.get("reverse")
asn = geoloc_data.get("asn")
asn_org = geoloc_data.get("org")
latitude = geoloc_data.get("latitude")
longitude = geoloc_data.get("longitude")
is_proxy = geoloc_data.get("is_proxy", False)
is_hosting = geoloc_data.get("is_hosting", False)
# Extract standard fields
country_iso_code = geoip_data.get("country_iso_code")
asn = geoip_data.get("asn_autonomous_system_number")
asn_org = geoip_data.get("asn_autonomous_system_organization")
latitude = geoip_data.get("location_latitude")
longitude = geoip_data.get("location_longitude")
# Use blocklist data if available, otherwise create default with flags
if blocklist_data:
list_on = blocklist_data
else:
list_on = {}
# Extract city from coordinates using reverse geocoding
city = extract_city_from_coordinates(geoip_data)
# Add flags to list_on
list_on["is_proxy"] = is_proxy
list_on["is_hosting"] = is_hosting
sanitized_country_iso_code = sanitize_for_storage(country_iso_code, 3)
sanitized_country = sanitize_for_storage(country, 100)
sanitized_region = sanitize_for_storage(region, 2)
sanitized_region_name = sanitize_for_storage(region_name, 100)
sanitized_asn = sanitize_for_storage(asn, 100)
sanitized_asn_org = sanitize_for_storage(asn_org, 100)
sanitized_city = sanitize_for_storage(city, 100) if city else None
sanitized_timezone = sanitize_for_storage(timezone, 50)
sanitized_isp = sanitize_for_storage(isp, 100)
sanitized_reverse = sanitize_for_storage(reverse, 255) if reverse else None
sanitized_list_on = sanitize_dict(list_on, 100000)
db_manager.update_ip_rep_infos(
@@ -63,11 +78,19 @@ def main():
sanitized_asn,
sanitized_asn_org,
sanitized_list_on,
sanitized_city,
latitude,
longitude,
city=sanitized_city,
latitude=latitude,
longitude=longitude,
country=sanitized_country,
region=sanitized_region,
region_name=sanitized_region_name,
timezone=sanitized_timezone,
isp=sanitized_isp,
reverse=sanitized_reverse,
is_proxy=is_proxy,
is_hosting=is_hosting,
)
except requests.RequestException as e:
app_logger.warning(f"Failed to fetch IP rep for {ip}: {e}")
app_logger.warning(f"Failed to fetch geolocation for {ip}: {e}")
except Exception as e:
app_logger.error(f"Error processing IP {ip}: {e}")

View File

@@ -708,6 +708,330 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str:
max-height: 400px;
}}
/* Mobile Optimization - Tablets (768px and down) */
@media (max-width: 768px) {{
body {{
padding: 12px;
}}
.container {{
max-width: 100%;
}}
h1 {{
font-size: 24px;
margin-bottom: 20px;
}}
.github-logo {{
position: relative;
top: auto;
left: auto;
margin-bottom: 15px;
}}
.download-section {{
position: relative;
top: auto;
right: auto;
margin-bottom: 20px;
}}
.stats-grid {{
grid-template-columns: repeat(2, 1fr);
gap: 12px;
margin-bottom: 20px;
}}
.stat-value {{
font-size: 28px;
}}
.stat-card {{
padding: 15px;
}}
.table-container {{
padding: 12px;
margin-bottom: 15px;
overflow-x: auto;
}}
table {{
font-size: 13px;
}}
th, td {{
padding: 10px 6px;
}}
h2 {{
font-size: 18px;
}}
.tabs-container {{
gap: 0;
overflow-x: auto;
-webkit-overflow-scrolling: touch;
}}
.tab-button {{
padding: 10px 16px;
font-size: 12px;
}}
.ip-stats-dropdown {{
flex-direction: column;
gap: 15px;
}}
.stats-right {{
flex: 0 0 auto;
width: 100%;
}}
.radar-chart {{
width: 160px;
height: 160px;
}}
.timeline-container {{
flex-direction: column;
gap: 15px;
min-height: auto;
}}
.timeline-column {{
flex: 1 !important;
max-height: 300px;
}}
#attacker-map {{
height: 350px !important;
}}
.leaflet-popup-content {{
min-width: 200px !important;
}}
.ip-marker {{
font-size: 8px;
}}
.ip-detail-content {{
padding: 20px;
max-width: 95%;
max-height: 85vh;
}}
.download-btn {{
padding: 6px 12px;
font-size: 12px;
}}
}}
/* Mobile Optimization - Small phones (480px and down) */
@media (max-width: 480px) {{
body {{
padding: 8px;
}}
h1 {{
font-size: 20px;
margin-bottom: 15px;
}}
.stats-grid {{
grid-template-columns: 1fr;
gap: 10px;
margin-bottom: 15px;
}}
.stat-value {{
font-size: 24px;
}}
.stat-card {{
padding: 12px;
}}
.stat-label {{
font-size: 12px;
}}
.table-container {{
padding: 10px;
margin-bottom: 12px;
border-radius: 4px;
}}
table {{
font-size: 12px;
}}
th, td {{
padding: 8px 4px;
}}
th {{
position: relative;
}}
th.sortable::after {{
right: 4px;
font-size: 10px;
}}
h2 {{
font-size: 16px;
margin-bottom: 12px;
}}
.tabs-container {{
gap: 0;
}}
.tab-button {{
padding: 10px 12px;
font-size: 11px;
flex: 1;
}}
.ip-row {{
display: block;
margin-bottom: 10px;
background: #1c2128;
padding: 10px;
border-radius: 4px;
}}
.ip-row td {{
display: block;
padding: 4px 0;
border: none;
}}
.ip-row td::before {{
content: attr(data-label);
font-weight: bold;
color: #8b949e;
margin-right: 8px;
}}
.ip-clickable {{
display: inline-block;
}}
.ip-stats-dropdown {{
flex-direction: column;
gap: 12px;
font-size: 12px;
}}
.stats-left {{
flex: 1;
}}
.stats-right {{
flex: 0 0 auto;
width: 100%;
}}
.radar-chart {{
width: 140px;
height: 140px;
}}
.radar-legend {{
margin-top: 8px;
font-size: 10px;
}}
.stat-row {{
padding: 4px 0;
}}
.stat-label-sm {{
font-size: 12px;
}}
.stat-value-sm {{
font-size: 13px;
}}
.category-badge {{
padding: 3px 6px;
font-size: 10px;
}}
.timeline-container {{
flex-direction: column;
gap: 12px;
min-height: auto;
}}
.timeline-column {{
flex: 1 !important;
max-height: 250px;
font-size: 11px;
}}
.timeline-header {{
font-size: 12px;
margin-bottom: 8px;
}}
.timeline-item {{
padding-bottom: 10px;
font-size: 11px;
}}
.timeline-marker {{
left: -19px;
width: 12px;
height: 12px;
}}
.reputation-badge {{
display: block;
margin-bottom: 6px;
margin-right: 0;
font-size: 10px;
}}
#attacker-map {{
height: 300px !important;
}}
.leaflet-popup-content {{
min-width: 150px !important;
}}
.ip-marker {{
font-size: 7px;
}}
.ip-detail-modal {{
justify-content: flex-end;
align-items: flex-end;
}}
.ip-detail-content {{
padding: 15px;
max-width: 100%;
max-height: 90vh;
border-radius: 8px 8px 0 0;
width: 100%;
}}
.download-btn {{
padding: 6px 10px;
font-size: 11px;
}}
.github-logo {{
font-size: 12px;
}}
.github-logo svg {{
width: 24px;
height: 24px;
}}
}}
/* Landscape mode optimization */
@media (max-height: 600px) and (orientation: landscape) {{
body {{
padding: 8px;
}}
h1 {{
margin-bottom: 10px;
font-size: 18px;
}}
.stats-grid {{
margin-bottom: 10px;
gap: 8px;
}}
.stat-value {{
font-size: 20px;
}}
.stat-card {{
padding: 8px;
}}
#attacker-map {{
height: 250px !important;
}}
.ip-stats-dropdown {{
gap: 10px;
}}
.radar-chart {{
width: 120px;
height: 120px;
}}
}}
/* Touch-friendly optimizations */
@media (hover: none) and (pointer: coarse) {{
.ip-clickable {{
-webkit-user-select: none;
user-select: none;
-webkit-tap-highlight-color: rgba(88, 166, 255, 0.2);
}}
.tab-button {{
-webkit-user-select: none;
user-select: none;
-webkit-tap-highlight-color: rgba(88, 166, 255, 0.2);
padding: 14px 18px;
}}
.download-btn {{
-webkit-user-select: none;
user-select: none;
-webkit-tap-highlight-color: rgba(36, 134, 54, 0.3);
}}
input[type="checkbox"] {{
width: 18px;
height: 18px;
cursor: pointer;
}}
}}
</style>
</head>
<body>
@@ -1191,6 +1515,20 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str:
html += '</div>';
}}
if (stats.country) {{
html += '<div class="stat-row">';
html += '<span class="stat-label-sm">Country:</span>';
html += `<span class="stat-value-sm">${{stats.country}}</span>`;
html += '</div>';
}}
if (stats.reverse) {{
html += '<div class="stat-row">';
html += '<span class="stat-label-sm">Reverse DNS:</span>';
html += `<span class="stat-value-sm">${{stats.reverse}}</span>`;
html += '</div>';
}}
if (stats.asn_org) {{
html += '<div class="stat-row">';
html += '<span class="stat-label-sm">ASN Org:</span>';
@@ -1198,6 +1536,19 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str:
html += '</div>';
}}
if (stats.is_proxy !== undefined || stats.is_hosting !== undefined) {{
const flags = [];
if (stats.is_proxy) flags.push('Proxy');
if (stats.is_hosting) flags.push('Hosting');
if (flags.length > 0) {{
html += '<div class="stat-row">';
html += '<span class="stat-label-sm">Flags: <span title="Proxy: IP is using a proxy service. Hosting: IP is from a hosting/cloud provider" style="cursor: help; color: #58a6ff; font-weight: bold;">ⓘ</span></span>';
html += `<span class="stat-value-sm">${{flags.join(', ')}}</span>`;
html += '</div>';
}}
}}
if (stats.reputation_score !== null && stats.reputation_score !== undefined) {{
html += '<div class="stat-row">';
html += '<span class="stat-label-sm">Reputation Score:</span>';
@@ -1235,8 +1586,6 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str:
if (change.old_category) {{
html += `<span class="category-badge ${{oldClass}}">${{change.old_category}}</span>`;
html += '<span style="color: #8b949e; margin: 0 4px;">→</span>';
}} else {{
html += '<span style="color: #8b949e;">Initial:</span>';
}}
html += `<span class="category-badge ${{newClass}}">${{change.new_category}}</span>`;
@@ -1252,16 +1601,35 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str:
html += '<div class="timeline-column">';
if (stats.list_on && Object.keys(stats.list_on).length > 0) {{
html += '<div class="timeline-header">Listed On</div>';
const sortedSources = Object.entries(stats.list_on).sort((a, b) => a[0].localeCompare(b[0]));
// Filter out is_hosting and is_proxy from the displayed list
const filteredList = Object.entries(stats.list_on).filter(([source, data]) =>
source !== 'is_hosting' && source !== 'is_proxy'
);
sortedSources.forEach(([source, url]) => {{
if (url && url !== 'N/A') {{
html += `<a href="${{url}}" target="_blank" rel="noopener noreferrer" class="reputation-badge" title="${{source}}">${{source}}</a>`;
}} else {{
html += `<span class="reputation-badge">${{source}}</span>`;
}}
}});
if (filteredList.length > 0) {{
html += '<div class="timeline-header">Listed On</div>';
const sortedSources = filteredList.sort((a, b) => a[0].localeCompare(b[0]));
sortedSources.forEach(([source, data]) => {{
// Handle both string URLs and nested object data
if (typeof data === 'string' && data !== 'N/A') {{
html += `<a href="${{data}}" target="_blank" rel="noopener noreferrer" class="reputation-badge" title="${{source}}">${{source}}</a>`;
}} else if (typeof data === 'object' && data !== null) {{
// For nested blocklist data, extract source_link if available
const sourceLink = data['__source_link'] || data.source_link;
if (sourceLink) {{
html += `<a href="${{sourceLink}}" target="_blank" rel="noopener noreferrer" class="reputation-badge" title="${{source}}">${{source}}</a>`;
}} else {{
html += `<span class="reputation-badge">${{source}}</span>`;
}}
}} else {{
html += `<span class="reputation-badge">${{source}}</span>`;
}}
}});
}} else if (stats.country_code || stats.asn) {{
html += '<div class="timeline-header">Reputation</div>';
html += '<span class="reputation-clean" title="Not found on public blacklists">✓ Clean</span>';
}}
}} else if (stats.country_code || stats.asn) {{
html += '<div class="timeline-header">Reputation</div>';
html += '<span class="reputation-clean" title="Not found on public blacklists">✓ Clean</span>';

View File

@@ -35,7 +35,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from database import get_database
from logger import get_app_logger
from geo_utils import extract_city_from_coordinates
from geo_utils import extract_geolocation_from_ip
# ----------------------
# TEST DATA GENERATORS
@@ -44,6 +44,12 @@ from geo_utils import extract_city_from_coordinates
# Fake IPs for testing - geolocation data will be fetched from API
# These are real public IPs from various locations around the world
FAKE_IPS = [
# VPN
"31.13.189.236",
"37.120.215.246",
"37.120.215.247",
"37.120.215.248",
"37.120.215.249",
# United States
"45.142.120.10",
"107.189.10.143",
@@ -226,8 +232,7 @@ def cleanup_database(db_manager, app_logger):
def fetch_geolocation_from_api(ip: str, app_logger) -> tuple:
"""
Fetch geolocation data from the IP reputation API.
Uses the most recent result and extracts city from coordinates.
Fetch geolocation data using ip-api.com.
Args:
ip: IP address to lookup
@@ -237,28 +242,15 @@ def fetch_geolocation_from_api(ip: str, app_logger) -> tuple:
Tuple of (country_code, city, asn, asn_org) or None if failed
"""
try:
api_url = "https://iprep.lcrawl.com/api/iprep/"
params = {"cidr": ip}
headers = {"Content-Type": "application/json"}
response = requests.get(api_url, headers=headers, params=params, timeout=10)
geoloc_data = extract_geolocation_from_ip(ip)
if response.status_code == 200:
payload = response.json()
if payload.get("results"):
results = payload["results"]
if geoloc_data:
country_code = geoloc_data.get("country_code")
city = geoloc_data.get("city")
asn = geoloc_data.get("asn")
asn_org = geoloc_data.get("org")
# Get the most recent result (first in list, sorted by record_added)
most_recent = results[0]
geoip_data = most_recent.get("geoip_data", {})
country_code = geoip_data.get("country_iso_code")
asn = geoip_data.get("asn_autonomous_system_number")
asn_org = geoip_data.get("asn_autonomous_system_organization")
# Extract city from coordinates using reverse geocoding
city = extract_city_from_coordinates(geoip_data)
return (country_code, city, asn, asn_org)
return (country_code, city, asn, asn_org)
except requests.RequestException as e:
app_logger.warning(f"Failed to fetch geolocation for {ip}: {e}")
except Exception as e:
@@ -313,9 +305,9 @@ def generate_fake_data(
request_counts = []
for i in range(len(selected_ips)):
if i < len(selected_ips) // 5: # 20% high-traffic IPs
count = random.randint(1000, 10000)
elif i < len(selected_ips) // 2: # 30% medium-traffic IPs
count = random.randint(100, 1000)
elif i < len(selected_ips) // 2: # 30% medium-traffic IPs
count = random.randint(10, 100)
else: # 50% low-traffic IPs
count = random.randint(5, 100)
request_counts.append(count)