Merge pull request #36 from BlessedRebuS/bug/db-locked-fix

Bug/db locked fix
This commit is contained in:
Patrick Di Fazio
2026-01-10 22:31:03 +01:00
committed by GitHub
3 changed files with 76 additions and 50 deletions

View File

@@ -10,5 +10,4 @@ SQLAlchemy>=2.0.0,<3.0.0
# Scheduling
APScheduler>=3.11.2
requests>=2.32.5

View File

@@ -11,8 +11,18 @@ from datetime import datetime
from typing import Optional, List, Dict, Any from typing import Optional, List, Dict, Any
from zoneinfo import ZoneInfo from zoneinfo import ZoneInfo
from sqlalchemy import create_engine, func, distinct, case from sqlalchemy import create_engine, func, distinct, case, event
from sqlalchemy.orm import sessionmaker, scoped_session, Session from sqlalchemy.orm import sessionmaker, scoped_session, Session
from sqlalchemy.engine import Engine
@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
    """Enable WAL mode and set a busy timeout on new SQLite connections.

    WAL journaling lets readers and a single writer proceed concurrently,
    and the 30 s busy timeout makes writers wait instead of immediately
    raising "database is locked".

    Args:
        dbapi_connection: Raw DBAPI connection handed over by SQLAlchemy.
        connection_record: Pool bookkeeping record (unused).
    """
    # NOTE(review): listening on the Engine *class* fires for every engine
    # in the process, not just the SQLite one. Skip non-sqlite3 connections
    # so these pragmas can't be sent to another backend — assumes the
    # stdlib sqlite3 driver is in use; confirm if a different driver is.
    if type(dbapi_connection).__module__.split(".")[0] != "sqlite3":
        return
    cursor = dbapi_connection.cursor()
    try:
        cursor.execute("PRAGMA journal_mode=WAL")
        cursor.execute("PRAGMA busy_timeout=30000")
    finally:
        # Always release the cursor, even if a PRAGMA fails.
        cursor.close()
from models import Base, AccessLog, CredentialAttempt, AttackDetection, IpStats, CategoryHistory from models import Base, AccessLog, CredentialAttempt, AttackDetection, IpStats, CategoryHistory
from sanitizer import ( from sanitizer import (
@@ -362,15 +372,39 @@ class DatabaseManager:
""" """
session = self.session session = self.session
try:
sanitized_ip = sanitize_ip(ip)
ip_stats = session.query(IpStats).filter(IpStats.ip == sanitized_ip).first()
if ip_stats:
ip_stats.country_code = country_code
ip_stats.asn = asn
ip_stats.asn_org = asn_org
ip_stats.list_on = list_on
session.commit()
except Exception as e:
session.rollback()
raise
finally:
self.close_session()
def get_unenriched_ips(self, limit: int = 100) -> List[str]:
    """
    Get IPs that don't have reputation data yet.

    An IP counts as unenriched when its country_code is still NULL,
    i.e. update_ip_rep_infos() has never stored reputation data for it.

    Args:
        limit: Maximum number of IPs to return

    Returns:
        List of IP addresses without reputation data
    """
    session = self.session
    try:
        # Query only the ip column; rows come back as 1-tuples.
        rows = session.query(IpStats.ip).filter(
            IpStats.country_code.is_(None)
        ).limit(limit).all()
        return [row[0] for row in rows]
    finally:
        # Always release the session so other workers aren't blocked.
        self.close_session()
def get_access_logs( def get_access_logs(
self, self,

View File

@@ -1,13 +1,4 @@
from sqlalchemy import select from database import get_database
from typing import Optional
from database import get_database, DatabaseManager
from zoneinfo import ZoneInfo
from pathlib import Path
from datetime import datetime, timedelta
import re
import urllib.parse
from wordlists import get_wordlists
from config import get_config
from logger import get_app_logger from logger import get_app_logger
import requests import requests
from sanitizer import sanitize_for_storage, sanitize_dict from sanitizer import sanitize_for_storage, sanitize_dict
@@ -18,42 +9,44 @@ from sanitizer import sanitize_for_storage, sanitize_dict
# Scheduler registration for this task: runs every 5 minutes and also
# once immediately when the task module is loaded.
TASK_CONFIG = {
    "name": "fetch-ip-rep",
    "cron": "*/5 * * * *",
    "enabled": True,
    "run_when_loaded": True
}
def main():
    """Enrich stored IPs with reputation data from the iprep API.

    Fetches only IPs that have no reputation data yet (bounded batch),
    queries the external API for each, sanitizes the response fields and
    persists them via the database manager. Per-IP failures are logged
    and do not abort the rest of the batch.
    """
    db_manager = get_database()
    app_logger = get_app_logger()
    # Only get IPs that haven't been enriched yet; small batch keeps each
    # scheduled run short and limits write pressure on the database.
    unenriched_ips = db_manager.get_unenriched_ips(limit=50)
    for ip in unenriched_ips:
        try:
            api_url = "https://iprep.lcrawl.com/api/iprep/"
            params = {"cidr": ip}
            headers = {"Content-Type": "application/json"}
            # Timeout so one stalled request can't hang the whole task.
            response = requests.get(api_url, headers=headers, params=params, timeout=10)
            payload = response.json()
            if payload.get("results"):
                data = payload["results"][0]
                country_iso_code = data["geoip_data"]["country_iso_code"]
                asn = data["geoip_data"]["asn_autonomous_system_number"]
                asn_org = data["geoip_data"]["asn_autonomous_system_organization"]
                list_on = data["list_on"]
                # Clamp/clean values before they hit storage.
                sanitized_country_iso_code = sanitize_for_storage(country_iso_code, 3)
                sanitized_asn = sanitize_for_storage(asn, 100)
                sanitized_asn_org = sanitize_for_storage(asn_org, 100)
                sanitized_list_on = sanitize_dict(list_on, 100000)
                db_manager.update_ip_rep_infos(
                    ip, sanitized_country_iso_code, sanitized_asn,
                    sanitized_asn_org, sanitized_list_on
                )
        except requests.RequestException as e:
            # Network/HTTP problems are expected occasionally: warn and move on.
            app_logger.warning(f"Failed to fetch IP rep for {ip}: {e}")
        except Exception as e:
            # Unexpected payload shape or DB error: log but keep processing
            # the remaining IPs in the batch.
            app_logger.error(f"Error processing IP {ip}: {e}")