refactor: streamline memory management by removing unused variables and enhancing cleanup logic

This commit is contained in:
Lorenzo Venerandi
2026-02-17 18:09:35 +01:00
parent b94cd38b61
commit 846fba631f
3 changed files with 8 additions and 100 deletions

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python3
from typing import Dict, List, Tuple, Optional
from typing import Dict, Tuple, Optional
from collections import defaultdict
from datetime import datetime
from zoneinfo import ZoneInfo
@@ -9,7 +9,6 @@ import urllib.parse
from wordlists import get_wordlists
from database import get_database, DatabaseManager
from ip_utils import is_local_or_private_ip, is_valid_public_ip
# Module-level singleton for background task access
_tracker_instance: "AccessTracker | None" = None
@@ -49,12 +48,6 @@ class AccessTracker:
"""
self.max_pages_limit = max_pages_limit
self.ban_duration_seconds = ban_duration_seconds
self.access_log: List[Dict] = []
self.credential_attempts: List[Dict] = []
# Memory limits for in-memory lists (prevents unbounded growth)
self.max_access_log_size = 10_000 # Keep only recent 10k accesses
self.max_credential_log_size = 5_000 # Keep only recent 5k attempts
# Track pages visited by each IP (for good crawler limiting)
self.ip_page_visits: Dict[str, Dict[str, object]] = defaultdict(dict)
@@ -213,23 +206,6 @@ class AccessTracker:
if server_ip and ip == server_ip:
return
# In-memory storage for dashboard
self.credential_attempts.append(
{
"ip": ip,
"path": path,
"username": username,
"password": password,
"timestamp": datetime.now().isoformat(),
}
)
# Trim if exceeding max size (prevent unbounded growth)
if len(self.credential_attempts) > self.max_credential_log_size:
self.credential_attempts = self.credential_attempts[
-self.max_credential_log_size :
]
# Persist to database
if self.db:
try:
@@ -288,22 +264,6 @@ class AccessTracker:
is_honeypot = self.is_honeypot_path(path)
# In-memory storage for dashboard
self.access_log.append(
{
"ip": ip,
"path": path,
"user_agent": user_agent,
"suspicious": is_suspicious,
"honeypot_triggered": self.is_honeypot_path(path),
"attack_types": attack_findings,
"timestamp": datetime.now().isoformat(),
}
)
# Trim if exceeding max size (prevent unbounded growth)
if len(self.access_log) > self.max_access_log_size:
self.access_log = self.access_log[-self.max_access_log_size :]
# Persist to database
if self.db:
try:
@@ -581,25 +541,6 @@ class AccessTracker:
except Exception:
return 0
def get_suspicious_accesses(self, limit: int = 20) -> List[Dict]:
"""Get recent suspicious accesses (excludes local/private IPs)"""
suspicious = [
log
for log in self.access_log
if log.get("suspicious", False)
and not is_local_or_private_ip(log.get("ip", ""))
]
return suspicious[-limit:]
def get_attack_type_accesses(self, limit: int = 20) -> List[Dict]:
"""Get recent accesses with detected attack types (excludes local/private IPs)"""
attacks = [
log
for log in self.access_log
if log.get("attack_types") and not is_local_or_private_ip(log.get("ip", ""))
]
return attacks[-limit:]
def get_stats(self) -> Dict:
"""Get statistics summary from database."""
if not self.db:
@@ -623,22 +564,7 @@ class AccessTracker:
"""
Clean up in-memory structures to prevent unbounded growth.
Should be called periodically (e.g., every 5 minutes).
Trimming strategy:
- Keep most recent N entries in logs
- Remove oldest entries when limit exceeded
- Clean expired ban entries from ip_page_visits
"""
# Trim access_log to max size (keep most recent)
if len(self.access_log) > self.max_access_log_size:
self.access_log = self.access_log[-self.max_access_log_size :]
# Trim credential_attempts to max size (keep most recent)
if len(self.credential_attempts) > self.max_credential_log_size:
self.credential_attempts = self.credential_attempts[
-self.max_credential_log_size :
]
# Clean expired ban entries from ip_page_visits
current_time = datetime.now()
for ip, data in self.ip_page_visits.items():
@@ -671,7 +597,5 @@ class AccessTracker:
Dictionary with counts of in-memory items
"""
return {
"access_log_size": len(self.access_log),
"credential_attempts_size": len(self.credential_attempts),
"unique_ip_page_visits": len(self.ip_page_visits),
"ip_page_visits": len(self.ip_page_visits),
}