modified dashboard, added ip-api data fetch

This commit is contained in:
BlessedRebuS
2026-02-01 22:43:12 +01:00
parent c8d0ef3da1
commit 863fac251d
8 changed files with 601 additions and 130 deletions

View File

@@ -1,113 +1,124 @@
#!/usr/bin/env python3
"""
Geolocation utilities for reverse geocoding and city lookups.
Geolocation utilities for IP lookups using ip-api.com.
"""
import requests
from typing import Optional, Tuple
from typing import Optional, Dict, Any
from logger import get_app_logger
app_logger = get_app_logger()
# Simple city name cache to avoid repeated API calls
_city_cache = {}
# Cache for IP geolocation data to avoid repeated API calls
_geoloc_cache = {}
def reverse_geocode_city(latitude: float, longitude: float) -> Optional[str]:
def fetch_ip_geolocation(ip_address: str) -> Optional[Dict[str, Any]]:
"""
Reverse geocode coordinates to get city name using Nominatim (OpenStreetMap).
Fetch geolocation data for an IP address using ip-api.com.
Args:
latitude: Latitude coordinate
longitude: Longitude coordinate
ip_address: IP address to lookup
Returns:
City name or None if not found
Dictionary containing geolocation data or None if lookup fails
"""
# Check cache first
cache_key = f"{latitude},{longitude}"
if cache_key in _city_cache:
return _city_cache[cache_key]
if ip_address in _geoloc_cache:
return _geoloc_cache[ip_address]
# This is now replacing lcrawl to fetch IP data like latitude/longitude, city, etc...
try:
# Use Nominatim reverse geocoding API (free, no API key required)
url = "https://nominatim.openstreetmap.org/reverse"
# Use ip-api.com API for geolocation
url = f"http://ip-api.com/json/{ip_address}"
params = {
"lat": latitude,
"lon": longitude,
"format": "json",
"zoom": 10, # City level
"addressdetails": 1,
"fields": "status,message,country,countryCode,region,regionName,city,zip,lat,lon,timezone,isp,org,as,reverse,mobile,proxy,hosting,query"
}
headers = {"User-Agent": "Krawl-Honeypot/1.0"} # Required by Nominatim ToS
response = requests.get(url, params=params, headers=headers, timeout=5)
response = requests.get(url, params=params, timeout=5)
response.raise_for_status()
data = response.json()
address = data.get("address", {})
# Try to get city from various possible fields
city = (
address.get("city")
or address.get("town")
or address.get("village")
or address.get("municipality")
or address.get("county")
)
# Check if the API call was successful
if data.get("status") != "success":
app_logger.warning(f"IP lookup failed for {ip_address}: {data.get('message')}")
return None
# Cache the result
_city_cache[cache_key] = city
_geoloc_cache[ip_address] = data
if city:
app_logger.debug(f"Reverse geocoded {latitude},{longitude} to {city}")
return city
app_logger.debug(f"Fetched geolocation for {ip_address}")
return data
except requests.RequestException as e:
app_logger.warning(f"Reverse geocoding failed for {latitude},{longitude}: {e}")
app_logger.warning(f"Geolocation API call failed for {ip_address}: {e}")
return None
except Exception as e:
app_logger.error(f"Error in reverse geocoding: {e}")
app_logger.error(f"Error fetching geolocation for {ip_address}: {e}")
return None
def get_most_recent_geoip_data(results: list) -> Optional[dict]:
def extract_geolocation_from_ip(ip_address: str) -> Optional[Dict[str, Any]]:
"""
Extract the most recent geoip_data from API results.
Results are assumed to be sorted by record_added (most recent first).
Extract geolocation data for an IP address.
Args:
results: List of result dictionaries from IP reputation API
ip_address: IP address to lookup
Returns:
Most recent geoip_data dict or None
Dictionary with city, country, lat, lon, and other geolocation data or None
"""
if not results:
geoloc_data = fetch_ip_geolocation(ip_address)
if not geoloc_data:
return None
# The first result is the most recent (sorted by record_added)
most_recent = results[0]
return most_recent.get("geoip_data")
return {
"city": geoloc_data.get("city"),
"country": geoloc_data.get("country"),
"country_code": geoloc_data.get("countryCode"),
"region": geoloc_data.get("region"),
"region_name": geoloc_data.get("regionName"),
"latitude": geoloc_data.get("lat"),
"longitude": geoloc_data.get("lon"),
"timezone": geoloc_data.get("timezone"),
"isp": geoloc_data.get("isp"),
"org": geoloc_data.get("org"),
"reverse": geoloc_data.get("reverse"),
"is_proxy": geoloc_data.get("proxy"),
"is_hosting": geoloc_data.get("hosting"),
}
def extract_city_from_coordinates(geoip_data: dict) -> Optional[str]:
def fetch_blocklist_data(ip_address: str) -> Optional[Dict[str, Any]]:
"""
Extract city name from geoip_data using reverse geocoding.
Fetch blocklist data for an IP address using lcrawl API.
Args:
geoip_data: Dictionary containing location_latitude and location_longitude
ip_address: IP address to lookup
Returns:
City name or None
Dictionary containing blocklist information or None if lookup fails
"""
if not geoip_data:
return None
# This is now used only for ip reputation
try:
api_url = "https://iprep.lcrawl.com/api/iprep/"
params = {"cidr": ip_address}
headers = {"Content-Type": "application/json"}
response = requests.get(api_url, headers=headers, params=params, timeout=10)
latitude = geoip_data.get("location_latitude")
longitude = geoip_data.get("location_longitude")
if response.status_code == 200:
payload = response.json()
if payload.get("results"):
results = payload["results"]
# Get the most recent result (first in list, sorted by record_added)
most_recent = results[0]
list_on = most_recent.get("list_on", {})
app_logger.debug(f"Fetched blocklist data for {ip_address}")
return list_on
except requests.RequestException as e:
app_logger.warning(f"Failed to fetch blocklist data for {ip_address}: {e}")
except Exception as e:
app_logger.error(f"Error processing blocklist data for {ip_address}: {e}")
if latitude is None or longitude is None:
return None
return reverse_geocode_city(latitude, longitude)
return None