From 152682de2cfd41b6b43d064601b82e93d91abb25 Mon Sep 17 00:00:00 2001 From: Lorenzo Venerandi Date: Wed, 18 Feb 2026 00:02:44 +0100 Subject: [PATCH] code linted --- src/database.py | 10 ++++++---- src/routes/honeypot.py | 6 ++++-- src/tasks/db_retention.py | 4 +--- src/tasks/memory_cleanup.py | 4 +--- src/tracker.py | 4 +++- tests/test_insert_fake_ips.py | 4 ++-- 6 files changed, 17 insertions(+), 15 deletions(-) diff --git a/src/database.py b/src/database.py index 41d2200..3e04de5 100644 --- a/src/database.py +++ b/src/database.py @@ -1061,9 +1061,9 @@ class DatabaseManager: row = base.one() # Honeypot unique IPs (separate query for distinct on filtered subset) - hp_query = session.query( - func.count(distinct(AccessLog.ip)) - ).filter(AccessLog.is_honeypot_trigger == True) + hp_query = session.query(func.count(distinct(AccessLog.ip))).filter( + AccessLog.is_honeypot_trigger == True + ) hp_query = self._public_ip_filter(hp_query, AccessLog.ip, server_ip) honeypot_ips = hp_query.scalar() or 0 @@ -1653,7 +1653,9 @@ class DatabaseManager: ) # Get paginated access logs with attack detections - query = session.query(AccessLog).join(AttackDetection).distinct(AccessLog.id) + query = ( + session.query(AccessLog).join(AttackDetection).distinct(AccessLog.id) + ) if sort_by == "timestamp": query = query.order_by( diff --git a/src/routes/honeypot.py b/src/routes/honeypot.py index 381594b..6db1c65 100644 --- a/src/routes/honeypot.py +++ b/src/routes/honeypot.py @@ -41,7 +41,6 @@ from deception_responses import ( from wordlists import get_wordlists from logger import get_app_logger, get_access_logger, get_credential_logger - # --- Auto-tracking dependency --- # Records requests that match attack patterns or honeypot trap paths. @@ -63,6 +62,7 @@ async def _track_honeypot_request(request: Request): if body: import urllib.parse + decoded_body = urllib.parse.unquote(body) attack_findings.extend(tracker.detect_attack_type(decoded_body)) @@ -401,7 +401,9 @@ async def trap_page(request: Request, path: str): # Record access unless the router dependency already handled it # (attack pattern or honeypot path → already recorded by _track_honeypot_request) - if not tracker.detect_attack_type(full_path) and not tracker.is_honeypot_path(full_path): + if not tracker.detect_attack_type(full_path) and not tracker.is_honeypot_path( + full_path + ): tracker.record_access( ip=client_ip, path=full_path, diff --git a/src/tasks/db_retention.py b/src/tasks/db_retention.py index bcfe1df..b4feaa7 100644 --- a/src/tasks/db_retention.py +++ b/src/tasks/db_retention.py @@ -42,9 +42,7 @@ def main(): cutoff = datetime.now() - timedelta(days=retention_days) # Delete attack detections linked to old access logs first (FK constraint) - old_log_ids = session.query(AccessLog.id).filter( - AccessLog.timestamp < cutoff - ) + old_log_ids = session.query(AccessLog.id).filter(AccessLog.timestamp < cutoff) detections_deleted = ( session.query(AttackDetection) .filter(AttackDetection.access_log_id.in_(old_log_ids)) diff --git a/src/tasks/memory_cleanup.py b/src/tasks/memory_cleanup.py index 49474d5..ac9af92 100644 --- a/src/tasks/memory_cleanup.py +++ b/src/tasks/memory_cleanup.py @@ -40,9 +40,7 @@ def main(): stats_after = tracker.get_memory_stats() - visits_reduced = ( - stats_before["ip_page_visits"] - stats_after["ip_page_visits"] - ) + visits_reduced = stats_before["ip_page_visits"] - stats_after["ip_page_visits"] if visits_reduced > 0: app_logger.info( diff --git a/src/tracker.py b/src/tracker.py index aae0d08..292ebba 100644 --- a/src/tracker.py +++ b/src/tracker.py @@ -573,7 +573,9 @@ class AccessTracker: try: ban_time = datetime.fromisoformat(ban_timestamp) time_diff = (current_time - ban_time).total_seconds() - effective_duration = self.ban_duration_seconds * data.get("ban_multiplier", 1) + effective_duration = self.ban_duration_seconds * data.get( + "ban_multiplier", 1 + ) if time_diff > effective_duration: data["count"] = 0 data["ban_timestamp"] = None diff --git a/tests/test_insert_fake_ips.py b/tests/test_insert_fake_ips.py index 3f3735f..1eba765 100644 --- a/tests/test_insert_fake_ips.py +++ b/tests/test_insert_fake_ips.py @@ -243,13 +243,13 @@ def fetch_geolocation_from_api(ip: str, app_logger) -> tuple: """ try: geoloc_data = extract_geolocation_from_ip(ip) - + if geoloc_data: country_code = geoloc_data.get("country_code") city = geoloc_data.get("city") asn = geoloc_data.get("asn") asn_org = geoloc_data.get("org") - + return (country_code, city, asn, asn_org) except requests.RequestException as e: app_logger.warning(f"Failed to fetch geolocation for {ip}: {e}")