code linted

This commit is contained in:
Lorenzo Venerandi
2026-02-18 00:02:44 +01:00
parent cccc9cde35
commit 152682de2c
6 changed files with 17 additions and 15 deletions

View File

@@ -1061,9 +1061,9 @@ class DatabaseManager:
row = base.one() row = base.one()
# Honeypot unique IPs (separate query for distinct on filtered subset) # Honeypot unique IPs (separate query for distinct on filtered subset)
hp_query = session.query( hp_query = session.query(func.count(distinct(AccessLog.ip))).filter(
func.count(distinct(AccessLog.ip)) AccessLog.is_honeypot_trigger == True
).filter(AccessLog.is_honeypot_trigger == True) )
hp_query = self._public_ip_filter(hp_query, AccessLog.ip, server_ip) hp_query = self._public_ip_filter(hp_query, AccessLog.ip, server_ip)
honeypot_ips = hp_query.scalar() or 0 honeypot_ips = hp_query.scalar() or 0
@@ -1653,7 +1653,9 @@ class DatabaseManager:
) )
# Get paginated access logs with attack detections # Get paginated access logs with attack detections
query = session.query(AccessLog).join(AttackDetection).distinct(AccessLog.id) query = (
session.query(AccessLog).join(AttackDetection).distinct(AccessLog.id)
)
if sort_by == "timestamp": if sort_by == "timestamp":
query = query.order_by( query = query.order_by(

View File

@@ -41,7 +41,6 @@ from deception_responses import (
from wordlists import get_wordlists from wordlists import get_wordlists
from logger import get_app_logger, get_access_logger, get_credential_logger from logger import get_app_logger, get_access_logger, get_credential_logger
# --- Auto-tracking dependency --- # --- Auto-tracking dependency ---
# Records requests that match attack patterns or honeypot trap paths. # Records requests that match attack patterns or honeypot trap paths.
@@ -63,6 +62,7 @@ async def _track_honeypot_request(request: Request):
if body: if body:
import urllib.parse import urllib.parse
decoded_body = urllib.parse.unquote(body) decoded_body = urllib.parse.unquote(body)
attack_findings.extend(tracker.detect_attack_type(decoded_body)) attack_findings.extend(tracker.detect_attack_type(decoded_body))
@@ -401,7 +401,9 @@ async def trap_page(request: Request, path: str):
# Record access unless the router dependency already handled it # Record access unless the router dependency already handled it
# (attack pattern or honeypot path → already recorded by _track_honeypot_request) # (attack pattern or honeypot path → already recorded by _track_honeypot_request)
if not tracker.detect_attack_type(full_path) and not tracker.is_honeypot_path(full_path): if not tracker.detect_attack_type(full_path) and not tracker.is_honeypot_path(
full_path
):
tracker.record_access( tracker.record_access(
ip=client_ip, ip=client_ip,
path=full_path, path=full_path,

View File

@@ -42,9 +42,7 @@ def main():
cutoff = datetime.now() - timedelta(days=retention_days) cutoff = datetime.now() - timedelta(days=retention_days)
# Delete attack detections linked to old access logs first (FK constraint) # Delete attack detections linked to old access logs first (FK constraint)
old_log_ids = session.query(AccessLog.id).filter( old_log_ids = session.query(AccessLog.id).filter(AccessLog.timestamp < cutoff)
AccessLog.timestamp < cutoff
)
detections_deleted = ( detections_deleted = (
session.query(AttackDetection) session.query(AttackDetection)
.filter(AttackDetection.access_log_id.in_(old_log_ids)) .filter(AttackDetection.access_log_id.in_(old_log_ids))

View File

@@ -40,9 +40,7 @@ def main():
stats_after = tracker.get_memory_stats() stats_after = tracker.get_memory_stats()
visits_reduced = ( visits_reduced = stats_before["ip_page_visits"] - stats_after["ip_page_visits"]
stats_before["ip_page_visits"] - stats_after["ip_page_visits"]
)
if visits_reduced > 0: if visits_reduced > 0:
app_logger.info( app_logger.info(

View File

@@ -573,7 +573,9 @@ class AccessTracker:
try: try:
ban_time = datetime.fromisoformat(ban_timestamp) ban_time = datetime.fromisoformat(ban_timestamp)
time_diff = (current_time - ban_time).total_seconds() time_diff = (current_time - ban_time).total_seconds()
effective_duration = self.ban_duration_seconds * data.get("ban_multiplier", 1) effective_duration = self.ban_duration_seconds * data.get(
"ban_multiplier", 1
)
if time_diff > effective_duration: if time_diff > effective_duration:
data["count"] = 0 data["count"] = 0
data["ban_timestamp"] = None data["ban_timestamp"] = None

View File

@@ -243,13 +243,13 @@ def fetch_geolocation_from_api(ip: str, app_logger) -> tuple:
""" """
try: try:
geoloc_data = extract_geolocation_from_ip(ip) geoloc_data = extract_geolocation_from_ip(ip)
if geoloc_data: if geoloc_data:
country_code = geoloc_data.get("country_code") country_code = geoloc_data.get("country_code")
city = geoloc_data.get("city") city = geoloc_data.get("city")
asn = geoloc_data.get("asn") asn = geoloc_data.get("asn")
asn_org = geoloc_data.get("org") asn_org = geoloc_data.get("org")
return (country_code, city, asn, asn_org) return (country_code, city, asn, asn_org)
except requests.RequestException as e: except requests.RequestException as e:
app_logger.warning(f"Failed to fetch geolocation for {ip}: {e}") app_logger.warning(f"Failed to fetch geolocation for {ip}: {e}")