From f7b9ee54e3c56100785f127b80b860307c9e9de0 Mon Sep 17 00:00:00 2001
From: Phillip Tarrant
Date: Sat, 10 Jan 2026 14:59:15 -0600
Subject: [PATCH 1/2] Fix SQLite "database is locked" errors in fetch_ip_rep
 task

- Add missing session.commit() and cleanup to update_ip_rep_infos() **bugfix**
- Enable SQLite WAL mode and 30s busy timeout for better concurrency **race condition prevention**
- Add get_unenriched_ips() method to only fetch IPs needing enrichment **don't enrich what's already done**
- Rewrite fetch_ip_rep task to process only unenriched IPs (limit 50) **API kindness**
- Change task frequency from every 1 minute to every 5 minutes **API kindness**
- Add request timeout (10s) and proper error handling **can update to longer if needed**
---
 src/database.py           | 52 +++++++++++++++++++++++-----
 src/tasks/fetch_ip_rep.py | 71 ++++++++++++++++++---------------------
 2 files changed, 75 insertions(+), 48 deletions(-)

diff --git a/src/database.py b/src/database.py
index 59d7072..5d41e2c 100644
--- a/src/database.py
+++ b/src/database.py
@@ -11,8 +11,18 @@
 from datetime import datetime
 from typing import Optional, List, Dict, Any
 from zoneinfo import ZoneInfo
-from sqlalchemy import create_engine, func, distinct, case
+from sqlalchemy import create_engine, func, distinct, case, event
 from sqlalchemy.orm import sessionmaker, scoped_session, Session
+from sqlalchemy.engine import Engine
+
+
+@event.listens_for(Engine, "connect")
+def set_sqlite_pragma(dbapi_connection, connection_record):
+    """Enable WAL mode and set busy timeout for SQLite connections."""
+    cursor = dbapi_connection.cursor()
+    cursor.execute("PRAGMA journal_mode=WAL")
+    cursor.execute("PRAGMA busy_timeout=30000")
+    cursor.close()
 
 from models import Base, AccessLog, CredentialAttempt, AttackDetection, IpStats, CategoryHistory
 from sanitizer import (
@@ -359,18 +369,42 @@ class DatabaseManager:
             asn: IP address ASN
             asn_org: IP address ASN ORG
             list_on: public lists containing the IP address
-        
+
         """
         session = self.session
-
-        sanitized_ip = sanitize_ip(ip)
-        ip_stats = session.query(IpStats).filter(IpStats.ip == sanitized_ip).first()
+        try:
+            sanitized_ip = sanitize_ip(ip)
+            ip_stats = session.query(IpStats).filter(IpStats.ip == sanitized_ip).first()
+            if ip_stats:
+                ip_stats.country_code = country_code
+                ip_stats.asn = asn
+                ip_stats.asn_org = asn_org
+                ip_stats.list_on = list_on
+            session.commit()
+        except Exception as e:
+            session.rollback()
+            raise
+        finally:
+            self.close_session()
 
-        ip_stats.country_code = country_code
-        ip_stats.asn = asn
-        ip_stats.asn_org = asn_org
-        ip_stats.list_on = list_on
+    def get_unenriched_ips(self, limit: int = 100) -> List[str]:
+        """
+        Get IPs that don't have reputation data yet.
+        Args:
+            limit: Maximum number of IPs to return
+
+        Returns:
+            List of IP addresses without reputation data
+        """
+        session = self.session
+        try:
+            ips = session.query(IpStats.ip).filter(
+                IpStats.country_code.is_(None)
+            ).limit(limit).all()
+            return [ip[0] for ip in ips]
+        finally:
+            self.close_session()
 
 
     def get_access_logs(
         self,
diff --git a/src/tasks/fetch_ip_rep.py b/src/tasks/fetch_ip_rep.py
index 8171ae6..9a78ee6 100644
--- a/src/tasks/fetch_ip_rep.py
+++ b/src/tasks/fetch_ip_rep.py
@@ -1,13 +1,4 @@
-from sqlalchemy import select
-from typing import Optional
-from database import get_database, DatabaseManager
-from zoneinfo import ZoneInfo
-from pathlib import Path
-from datetime import datetime, timedelta
-import re
-import urllib.parse
-from wordlists import get_wordlists
-from config import get_config
+from database import get_database
 from logger import get_app_logger
 import requests
 from sanitizer import sanitize_for_storage, sanitize_dict
@@ -18,42 +9,44 @@
 TASK_CONFIG = {
     "name": "fetch-ip-rep",
-    "cron": "*/1 * * * *",
+    "cron": "*/5 * * * *",
     "enabled": True,
     "run_when_loaded": True
 }
 
 
 def main():
-
-    config = get_config()
 
     db_manager = get_database()
     app_logger = get_app_logger()
-
-    accesses = db_manager.get_access_logs(limit=999999999)
-    ips = {item['ip'] for item in accesses}
+
+    # Only get IPs that haven't been enriched yet
+    unenriched_ips = db_manager.get_unenriched_ips(limit=50)
 
-    for ip in ips:
-        api_url = "https://iprep.lcrawl.com/api/iprep/"
-        params = {
-            "cidr": ip
-        }
-        headers = {
-            "Content-Type": "application/json"
-        }
-        response = requests.get(api_url, headers=headers, params=params)
-        payload = response.json()
-        if payload["results"]:
-            data = payload["results"][0]
-            country_iso_code = data["geoip_data"]["country_iso_code"]
-            asn = data["geoip_data"]["asn_autonomous_system_number"]
-            asn_org = data["geoip_data"]["asn_autonomous_system_organization"]
-            list_on = data["list_on"]
-            sanitized_country_iso_code = sanitize_for_storage(country_iso_code, 3)
-            sanitized_asn = sanitize_for_storage(asn, 100)
-            sanitized_asn_org = sanitize_for_storage(asn_org, 100)
-            sanitized_list_on = sanitize_dict(list_on, 100000)
-
-            db_manager.update_ip_rep_infos(ip, sanitized_country_iso_code, sanitized_asn, sanitized_asn_org, sanitized_list_on)
-
-    return
\ No newline at end of file
+    for ip in unenriched_ips:
+        try:
+            api_url = "https://iprep.lcrawl.com/api/iprep/"
+            params = {"cidr": ip}
+            headers = {"Content-Type": "application/json"}
+            response = requests.get(api_url, headers=headers, params=params, timeout=10)
+            payload = response.json()
+
+            if payload.get("results"):
+                data = payload["results"][0]
+                country_iso_code = data["geoip_data"]["country_iso_code"]
+                asn = data["geoip_data"]["asn_autonomous_system_number"]
+                asn_org = data["geoip_data"]["asn_autonomous_system_organization"]
+                list_on = data["list_on"]
+
+                sanitized_country_iso_code = sanitize_for_storage(country_iso_code, 3)
+                sanitized_asn = sanitize_for_storage(asn, 100)
+                sanitized_asn_org = sanitize_for_storage(asn_org, 100)
+                sanitized_list_on = sanitize_dict(list_on, 100000)
+
+                db_manager.update_ip_rep_infos(
+                    ip, sanitized_country_iso_code, sanitized_asn,
+                    sanitized_asn_org, sanitized_list_on
+                )
+        except requests.RequestException as e:
+            app_logger.warning(f"Failed to fetch IP rep for {ip}: {e}")
+        except Exception as e:
+            app_logger.error(f"Error processing IP {ip}: {e}")

From 0e0639ba8b40d8cf6c934f334259a01f994083b6 Mon Sep 17 00:00:00 2001
From: Phillip Tarrant
Date: Sat, 10 Jan 2026 15:14:17 -0600
Subject: [PATCH 2/2] adding requests to requirements

---
 requirements.txt | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/requirements.txt b/requirements.txt
index cafbb7d..9ffdbc9 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,4 +8,6 @@ PyYAML>=6.0
 SQLAlchemy>=2.0.0,<3.0.0
 
 # Scheduling
-APScheduler>=3.11.2
\ No newline at end of file
+APScheduler>=3.11.2
+
+requests>=2.32.5
\ No newline at end of file