refactor: remove unused tracking variables and streamline memory stats logging

This commit is contained in:
Lorenzo Venerandi
2026-02-17 17:59:03 +01:00
parent 98a8b3deca
commit 507e3f2667
2 changed files with 1 additions and 51 deletions

View File

@@ -63,8 +63,7 @@ def main():
app_logger.debug( app_logger.debug(
f"Memory stats after cleanup: " f"Memory stats after cleanup: "
f"access_logs={stats_after['access_log_size']}, " f"access_logs={stats_after['access_log_size']}, "
f"credentials={stats_after['credential_attempts_size']}, " f"credentials={stats_after['credential_attempts_size']}"
f"unique_ips={stats_after['unique_ips_tracked']}"
) )
except Exception as e: except Exception as e:

View File

@@ -49,16 +49,12 @@ class AccessTracker:
""" """
self.max_pages_limit = max_pages_limit self.max_pages_limit = max_pages_limit
self.ban_duration_seconds = ban_duration_seconds self.ban_duration_seconds = ban_duration_seconds
self.ip_counts: Dict[str, int] = defaultdict(int)
self.path_counts: Dict[str, int] = defaultdict(int)
self.user_agent_counts: Dict[str, int] = defaultdict(int)
self.access_log: List[Dict] = [] self.access_log: List[Dict] = []
self.credential_attempts: List[Dict] = [] self.credential_attempts: List[Dict] = []
# Memory limits for in-memory lists (prevents unbounded growth) # Memory limits for in-memory lists (prevents unbounded growth)
self.max_access_log_size = 10_000 # Keep only recent 10k accesses self.max_access_log_size = 10_000 # Keep only recent 10k accesses
self.max_credential_log_size = 5_000 # Keep only recent 5k attempts self.max_credential_log_size = 5_000 # Keep only recent 5k attempts
self.max_counter_keys = 100_000 # Max unique IPs/paths/user agents
# Track pages visited by each IP (for good crawler limiting) # Track pages visited by each IP (for good crawler limiting)
self.ip_page_visits: Dict[str, Dict[str, object]] = defaultdict(dict) self.ip_page_visits: Dict[str, Dict[str, object]] = defaultdict(dict)
@@ -106,9 +102,6 @@ class AccessTracker:
"command_injection": r"(\||;|`|\$\(|&&)", "command_injection": r"(\||;|`|\$\(|&&)",
} }
# Track IPs that accessed honeypot paths from robots.txt
self.honeypot_triggered: Dict[str, List[str]] = defaultdict(list)
# Database manager for persistence (lazily initialized) # Database manager for persistence (lazily initialized)
self._db_manager = db_manager self._db_manager = db_manager
@@ -278,11 +271,6 @@ class AccessTracker:
if server_ip and ip == server_ip: if server_ip and ip == server_ip:
return return
self.ip_counts[ip] += 1
self.path_counts[path] += 1
if user_agent:
self.user_agent_counts[user_agent] += 1
# Path attack type detection # Path attack type detection
attack_findings = self.detect_attack_type(path) attack_findings = self.detect_attack_type(path)
@@ -299,10 +287,6 @@ class AccessTracker:
) )
is_honeypot = self.is_honeypot_path(path) is_honeypot = self.is_honeypot_path(path)
# Track if this IP accessed a honeypot path
if is_honeypot:
self.honeypot_triggered[ip].append(path)
# In-memory storage for dashboard # In-memory storage for dashboard
self.access_log.append( self.access_log.append(
{ {
@@ -597,27 +581,6 @@ class AccessTracker:
except Exception: except Exception:
return 0 return 0
def get_top_ips(self, limit: int = 10) -> List[Tuple[str, int]]:
"""Get top N IP addresses by access count (excludes local/private IPs)"""
filtered = [
(ip, count)
for ip, count in self.ip_counts.items()
if not is_local_or_private_ip(ip)
]
return sorted(filtered, key=lambda x: x[1], reverse=True)[:limit]
def get_top_paths(self, limit: int = 10) -> List[Tuple[str, int]]:
"""Get top N paths by access count"""
return sorted(self.path_counts.items(), key=lambda x: x[1], reverse=True)[
:limit
]
def get_top_user_agents(self, limit: int = 10) -> List[Tuple[str, int]]:
"""Get top N user agents by access count"""
return sorted(self.user_agent_counts.items(), key=lambda x: x[1], reverse=True)[
:limit
]
def get_suspicious_accesses(self, limit: int = 20) -> List[Dict]: def get_suspicious_accesses(self, limit: int = 20) -> List[Dict]:
"""Get recent suspicious accesses (excludes local/private IPs)""" """Get recent suspicious accesses (excludes local/private IPs)"""
suspicious = [ suspicious = [
@@ -637,14 +600,6 @@ class AccessTracker:
] ]
return attacks[-limit:] return attacks[-limit:]
def get_honeypot_triggered_ips(self) -> List[Tuple[str, List[str]]]:
"""Get IPs that accessed honeypot paths (excludes local/private IPs)"""
return [
(ip, paths)
for ip, paths in self.honeypot_triggered.items()
if not is_local_or_private_ip(ip)
]
def get_stats(self) -> Dict: def get_stats(self) -> Dict:
"""Get statistics summary from database.""" """Get statistics summary from database."""
if not self.db: if not self.db:
@@ -720,9 +675,5 @@ class AccessTracker:
return { return {
"access_log_size": len(self.access_log), "access_log_size": len(self.access_log),
"credential_attempts_size": len(self.credential_attempts), "credential_attempts_size": len(self.credential_attempts),
"unique_ips_tracked": len(self.ip_counts),
"unique_paths_tracked": len(self.path_counts),
"unique_user_agents": len(self.user_agent_counts),
"unique_ip_page_visits": len(self.ip_page_visits), "unique_ip_page_visits": len(self.ip_page_visits),
"honeypot_triggered_ips": len(self.honeypot_triggered),
} }