Feat/attack map improvement (#58)
* Enhance geolocation functionality and improve unenriched IP retrieval logic
* Refactor test_insert_fake_ips.py to enhance geolocation data handling and improve IP data structure
* Refactor code for improved readability and consistency in database and geolocation utilities
This commit is contained in:
committed by
GitHub
parent
5aca684df9
commit
39d9d62247
@@ -11,7 +11,7 @@ from datetime import datetime, timedelta
|
||||
from typing import Optional, List, Dict, Any
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
from sqlalchemy import create_engine, func, distinct, case, event
|
||||
from sqlalchemy import create_engine, func, distinct, case, event, or_
|
||||
from sqlalchemy.orm import sessionmaker, scoped_session, Session
|
||||
from sqlalchemy.engine import Engine
|
||||
|
||||
@@ -432,21 +432,22 @@ class DatabaseManager:
|
||||
|
||||
def get_unenriched_ips(self, limit: int = 100) -> List[str]:
|
||||
"""
|
||||
Get IPs that don't have reputation data yet.
|
||||
Get IPs that don't have complete reputation data yet.
|
||||
Returns IPs without country_code OR without city data.
|
||||
Excludes RFC1918 private addresses and other non-routable IPs.
|
||||
|
||||
Args:
|
||||
limit: Maximum number of IPs to return
|
||||
|
||||
Returns:
|
||||
List of IP addresses without reputation data
|
||||
List of IP addresses without complete reputation data
|
||||
"""
|
||||
session = self.session
|
||||
try:
|
||||
ips = (
|
||||
session.query(IpStats.ip)
|
||||
.filter(
|
||||
IpStats.country_code.is_(None),
|
||||
or_(IpStats.country_code.is_(None), IpStats.city.is_(None)),
|
||||
~IpStats.ip.like("10.%"),
|
||||
~IpStats.ip.like("172.16.%"),
|
||||
~IpStats.ip.like("172.17.%"),
|
||||
|
||||
113
src/geo_utils.py
Normal file
113
src/geo_utils.py
Normal file
@@ -0,0 +1,113 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Geolocation utilities for reverse geocoding and city lookups.
|
||||
"""
|
||||
|
||||
import requests
|
||||
from typing import Optional, Tuple
|
||||
from logger import get_app_logger
|
||||
|
||||
app_logger = get_app_logger()
|
||||
|
||||
# Simple city name cache to avoid repeated API calls
|
||||
_city_cache = {}
|
||||
|
||||
|
||||
def reverse_geocode_city(latitude: float, longitude: float) -> Optional[str]:
    """
    Look up the city name for a coordinate pair via Nominatim (OpenStreetMap).

    Completed lookups (including those that found no city) are stored in the
    module-level ``_city_cache`` under a "latitude,longitude" string key, so
    a repeated coordinate pair is answered without another HTTP request.
    Lookups that end in an exception are not cached.

    Args:
        latitude: Latitude coordinate
        longitude: Longitude coordinate

    Returns:
        City name or None if not found
    """
    lookup_key = f"{latitude},{longitude}"

    # Answer repeated coordinates straight from the cache
    try:
        return _city_cache[lookup_key]
    except KeyError:
        pass

    try:
        # Nominatim reverse geocoding API (free, no API key required)
        response = requests.get(
            "https://nominatim.openstreetmap.org/reverse",
            params={
                "lat": latitude,
                "lon": longitude,
                "format": "json",
                "zoom": 10,  # City level
                "addressdetails": 1,
            },
            headers={"User-Agent": "Krawl-Honeypot/1.0"},  # Required by Nominatim ToS
            timeout=5,
        )
        response.raise_for_status()

        address = response.json().get("address", {})

        # Check the address fields in priority order; keep the first non-empty one
        city = None
        for field in ("city", "town", "village", "municipality", "county"):
            city = address.get(field)
            if city:
                break

        # Remember the outcome, even when no city was found
        _city_cache[lookup_key] = city

        if city:
            app_logger.debug(f"Reverse geocoded {latitude},{longitude} to {city}")

        return city

    except requests.RequestException as err:
        app_logger.warning(f"Reverse geocoding failed for {latitude},{longitude}: {err}")
        return None
    except Exception as err:
        app_logger.error(f"Error in reverse geocoding: {err}")
        return None
|
||||
|
||||
|
||||
def get_most_recent_geoip_data(results: list) -> Optional[dict]:
    """
    Extract the most recent geoip_data from API results.

    Results are assumed to be sorted by record_added (most recent first),
    so only the first entry is inspected.

    Args:
        results: List of result dictionaries from IP reputation API

    Returns:
        The first entry's geoip_data dict, or None when results is empty,
        the first entry is not a dictionary, or it has no geoip_data dict
    """
    if not results:
        return None

    # The first result is the most recent (sorted by record_added)
    most_recent = results[0]

    # A malformed payload may carry a null or non-dict entry; treat it as
    # "no data" instead of failing with AttributeError on .get()
    if not isinstance(most_recent, dict):
        return None

    geoip_data = most_recent.get("geoip_data")

    # Only hand back a dict, as the return annotation promises
    return geoip_data if isinstance(geoip_data, dict) else None
|
||||
|
||||
|
||||
def extract_city_from_coordinates(geoip_data: dict) -> Optional[str]:
    """
    Extract city name from geoip_data using reverse geocoding.

    The coordinates are validated before the lookup so that unusable values
    never trigger a request to the external geocoding service.

    Args:
        geoip_data: Dictionary containing location_latitude and location_longitude

    Returns:
        City name, or None when geoip_data is empty, a coordinate is missing,
        non-numeric, or outside the valid latitude/longitude range, or the
        reverse geocoding lookup yields nothing
    """
    if not geoip_data:
        return None

    latitude = geoip_data.get("location_latitude")
    longitude = geoip_data.get("location_longitude")

    if latitude is None or longitude is None:
        return None

    # Accept numeric strings as well as numbers; anything else is unusable
    try:
        latitude = float(latitude)
        longitude = float(longitude)
    except (TypeError, ValueError):
        return None

    # Range check also rejects NaN and infinities (all comparisons are False)
    if not (-90.0 <= latitude <= 90.0 and -180.0 <= longitude <= 180.0):
        return None

    return reverse_geocode_city(latitude, longitude)
|
||||
@@ -2,6 +2,7 @@ from database import get_database
|
||||
from logger import get_app_logger
|
||||
import requests
|
||||
from sanitizer import sanitize_for_storage, sanitize_dict
|
||||
from geo_utils import get_most_recent_geoip_data, extract_city_from_coordinates
|
||||
|
||||
# ----------------------
|
||||
# TASK CONFIG
|
||||
@@ -33,13 +34,20 @@ def main():
|
||||
payload = response.json()
|
||||
|
||||
if payload.get("results"):
|
||||
data = payload["results"][0]
|
||||
geoip_data = data["geoip_data"]
|
||||
results = payload["results"]
|
||||
|
||||
# Get the most recent result (first in list, sorted by record_added)
|
||||
most_recent = results[0]
|
||||
geoip_data = most_recent.get("geoip_data", {})
|
||||
list_on = most_recent.get("list_on", {})
|
||||
|
||||
# Extract standard fields
|
||||
country_iso_code = geoip_data.get("country_iso_code")
|
||||
asn = geoip_data.get("asn_autonomous_system_number")
|
||||
asn_org = geoip_data.get("asn_autonomous_system_organization")
|
||||
city = geoip_data.get("city_name") # Extract city name from API
|
||||
list_on = data["list_on"]
|
||||
|
||||
# Extract city from coordinates using reverse geocoding
|
||||
city = extract_city_from_coordinates(geoip_data)
|
||||
|
||||
sanitized_country_iso_code = sanitize_for_storage(country_iso_code, 3)
|
||||
sanitized_asn = sanitize_for_storage(asn, 100)
|
||||
@@ -53,7 +61,7 @@ def main():
|
||||
sanitized_asn,
|
||||
sanitized_asn_org,
|
||||
sanitized_list_on,
|
||||
sanitized_city, # Pass city to database
|
||||
sanitized_city,
|
||||
)
|
||||
except requests.RequestException as e:
|
||||
app_logger.warning(f"Failed to fetch IP rep for {ip}: {e}")
|
||||
|
||||
Reference in New Issue
Block a user