Merge pull request #53 from BlessedRebuS/feat/ip-log-improvements

Added IP logging memory improvements; added local IP and public IP ex…
This commit is contained in:
Lorenzo Venerandi
2026-01-24 23:38:52 +01:00
committed by GitHub
8 changed files with 504 additions and 75 deletions

View File

@@ -11,7 +11,6 @@ from wordlists import get_wordlists
from config import get_config
from logger import get_app_logger
import requests
from sanitizer import sanitize_for_storage, sanitize_dict
"""
Functions for user activity analysis
@@ -27,14 +26,12 @@ class Analyzer:
def __init__(self, db_manager: Optional[DatabaseManager] = None):
"""
Initialize the access tracker.
Initialize the analyzer.
Args:
db_manager: Optional DatabaseManager for persistence.
If None, will use the global singleton.
"""
# Database manager for persistence (lazily initialized)
self._db_manager = db_manager
@property
@@ -49,7 +46,6 @@ class Analyzer:
try:
self._db_manager = get_database()
except Exception:
# Database not initialized, persistence disabled
pass
return self._db_manager

View File

@@ -8,6 +8,7 @@ from typing import Optional, Tuple
from zoneinfo import ZoneInfo
import time
from logger import get_app_logger
import socket
import yaml
@@ -50,6 +51,67 @@ class Config:
user_agents_used_threshold: float = None
attack_urls_threshold: float = None
_server_ip: Optional[str] = None
_server_ip_cache_time: float = 0
_ip_cache_ttl: int = 300
def get_server_ip(self, refresh: bool = False) -> Optional[str]:
"""
Get the server's own public IP address.
Excludes requests from the server itself from being tracked.
Caches the IP for 5 minutes to avoid repeated lookups.
Automatically refreshes if cache is stale.
Args:
refresh: Force refresh the IP cache (bypass TTL)
Returns:
Server IP address or None if unable to determine
"""
import time
current_time = time.time()
# Check if cache is valid and not forced refresh
if (
self._server_ip is not None
and not refresh
and (current_time - self._server_ip_cache_time) < self._ip_cache_ttl
):
return self._server_ip
try:
hostname = socket.gethostname()
# Try to get public IP by connecting to an external server
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
self._server_ip = ip
self._server_ip_cache_time = current_time
return ip
except Exception as e:
get_app_logger().warning(
f"Could not determine server IP address: {e}. "
"All IPs will be tracked (including potential server IP)."
)
return None
def refresh_server_ip(self) -> Optional[str]:
    """
    Force refresh the cached server IP.

    Bypasses the TTL cache in get_server_ip(). Use this if you suspect
    the server's IP has changed (e.g. after a network reconfiguration).

    Returns:
        New server IP address or None if unable to determine
    """
    return self.get_server_ip(refresh=True)
@classmethod
def from_yaml(cls) -> "Config":
"""Create configuration from YAML file"""
@@ -139,8 +201,8 @@ class Config:
infinite_pages_for_malicious=crawl.get(
"infinite_pages_for_malicious", True
),
max_pages_limit=crawl.get("max_pages_limit", 200),
ban_duration_seconds=crawl.get("ban_duration_seconds", 60),
max_pages_limit=crawl.get("max_pages_limit", 500),
ban_duration_seconds=crawl.get("ban_duration_seconds", 10),
)

View File

@@ -15,6 +15,8 @@ from sqlalchemy import create_engine, func, distinct, case, event
from sqlalchemy.orm import sessionmaker, scoped_session, Session
from sqlalchemy.engine import Engine
from ip_utils import is_local_or_private_ip, is_valid_public_ip
@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
@@ -688,7 +690,7 @@ class DatabaseManager:
def get_dashboard_counts(self) -> Dict[str, int]:
"""
Get aggregate statistics for the dashboard.
Get aggregate statistics for the dashboard (excludes local/private IPs and server IP).
Returns:
Dictionary with total_accesses, unique_ips, unique_paths,
@@ -696,33 +698,34 @@ class DatabaseManager:
"""
session = self.session
try:
# Get main aggregate counts in one query
result = session.query(
func.count(AccessLog.id).label("total_accesses"),
func.count(distinct(AccessLog.ip)).label("unique_ips"),
func.count(distinct(AccessLog.path)).label("unique_paths"),
func.sum(case((AccessLog.is_suspicious == True, 1), else_=0)).label(
"suspicious_accesses"
),
func.sum(
case((AccessLog.is_honeypot_trigger == True, 1), else_=0)
).label("honeypot_triggered"),
).first()
# Get unique IPs that triggered honeypots
honeypot_ips = (
session.query(func.count(distinct(AccessLog.ip)))
.filter(AccessLog.is_honeypot_trigger == True)
.scalar()
or 0
)
# Get server IP to filter it out
from config import get_config
config = get_config()
server_ip = config.get_server_ip()
# Get all accesses first, then filter out local IPs and server IP
all_accesses = session.query(AccessLog).all()
# Filter out local/private IPs and server IP
public_accesses = [
log for log in all_accesses
if is_valid_public_ip(log.ip, server_ip)
]
# Calculate counts from filtered data
total_accesses = len(public_accesses)
unique_ips = len(set(log.ip for log in public_accesses))
unique_paths = len(set(log.path for log in public_accesses))
suspicious_accesses = sum(1 for log in public_accesses if log.is_suspicious)
honeypot_triggered = sum(1 for log in public_accesses if log.is_honeypot_trigger)
honeypot_ips = len(set(log.ip for log in public_accesses if log.is_honeypot_trigger))
return {
"total_accesses": result.total_accesses or 0,
"unique_ips": result.unique_ips or 0,
"unique_paths": result.unique_paths or 0,
"suspicious_accesses": int(result.suspicious_accesses or 0),
"honeypot_triggered": int(result.honeypot_triggered or 0),
"total_accesses": total_accesses,
"unique_ips": unique_ips,
"unique_paths": unique_paths,
"suspicious_accesses": suspicious_accesses,
"honeypot_triggered": honeypot_triggered,
"honeypot_ips": honeypot_ips,
}
finally:
@@ -730,7 +733,7 @@ class DatabaseManager:
def get_top_ips(self, limit: int = 10) -> List[tuple]:
"""
Get top IP addresses by access count.
Get top IP addresses by access count (excludes local/private IPs and server IP).
Args:
limit: Maximum number of results
@@ -740,15 +743,25 @@ class DatabaseManager:
"""
session = self.session
try:
# Get server IP to filter it out
from config import get_config
config = get_config()
server_ip = config.get_server_ip()
results = (
session.query(AccessLog.ip, func.count(AccessLog.id).label("count"))
.group_by(AccessLog.ip)
.order_by(func.count(AccessLog.id).desc())
.limit(limit)
.all()
)
return [(row.ip, row.count) for row in results]
# Filter out local/private IPs and server IP, then limit results
filtered = [
(row.ip, row.count)
for row in results
if is_valid_public_ip(row.ip, server_ip)
]
return filtered[:limit]
finally:
self.close_session()
@@ -805,7 +818,7 @@ class DatabaseManager:
def get_recent_suspicious(self, limit: int = 20) -> List[Dict[str, Any]]:
"""
Get recent suspicious access attempts.
Get recent suspicious access attempts (excludes local/private IPs and server IP).
Args:
limit: Maximum number of results
@@ -815,14 +828,24 @@ class DatabaseManager:
"""
session = self.session
try:
# Get server IP to filter it out
from config import get_config
config = get_config()
server_ip = config.get_server_ip()
logs = (
session.query(AccessLog)
.filter(AccessLog.is_suspicious == True)
.order_by(AccessLog.timestamp.desc())
.limit(limit)
.all()
)
# Filter out local/private IPs and server IP
filtered_logs = [
log for log in logs
if is_valid_public_ip(log.ip, server_ip)
]
return [
{
"ip": log.ip,
@@ -830,20 +853,26 @@ class DatabaseManager:
"user_agent": log.user_agent,
"timestamp": log.timestamp.isoformat(),
}
for log in logs
for log in filtered_logs[:limit]
]
finally:
self.close_session()
def get_honeypot_triggered_ips(self) -> List[tuple]:
"""
Get IPs that triggered honeypot paths with the paths they accessed.
Get IPs that triggered honeypot paths with the paths they accessed
(excludes local/private IPs and server IP).
Returns:
List of (ip, [paths]) tuples
"""
session = self.session
try:
# Get server IP to filter it out
from config import get_config
config = get_config()
server_ip = config.get_server_ip()
# Get all honeypot triggers grouped by IP
results = (
session.query(AccessLog.ip, AccessLog.path)
@@ -851,9 +880,12 @@ class DatabaseManager:
.all()
)
# Group paths by IP
# Group paths by IP, filtering out local/private IPs and server IP
ip_paths: Dict[str, List[str]] = {}
for row in results:
# Skip invalid IPs
if not is_valid_public_ip(row.ip, server_ip):
continue
if row.ip not in ip_paths:
ip_paths[row.ip] = []
if row.path not in ip_paths[row.ip]:

View File

@@ -1,6 +0,0 @@
127.0.0.1
175.23.45.67
205.32.180.65
198.51.100.89
210.45.67.89
203.0.113.45

61
src/ip_utils.py Normal file
View File

@@ -0,0 +1,61 @@
#!/usr/bin/env python3
"""
IP utility functions for filtering and validating IP addresses.
Provides common IP filtering logic used across the Krawl honeypot.
"""
import ipaddress
from typing import Optional
def is_local_or_private_ip(ip_str: str) -> bool:
    """
    Check whether an IP address is local, private, or otherwise non-public.

    Treated as non-public:
    - loopback: 127.0.0.0/8, ::1 (and IPv6-mapped loopback)
    - private networks: 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16, IPv6 ULAs
    - link-local: 169.254.0.0/16, fe80::/10
    - reserved ranges
    - multicast: 224.0.0.0/4, ff00::/8 (never a valid client source)
    - unspecified: 0.0.0.0, ::
    - anything that does not parse as an IP address at all

    Args:
        ip_str: IP address string

    Returns:
        True if IP is local/private/reserved/invalid, False if it's public
    """
    try:
        ip = ipaddress.ip_address(ip_str)
    except ValueError:
        # Unparseable input can never be a trackable public IP.
        return True
    return (
        ip.is_private
        or ip.is_loopback
        or ip.is_reserved
        or ip.is_link_local
        or ip.is_multicast
        or ip.is_unspecified
    )


def is_valid_public_ip(ip: str, server_ip: Optional[str] = None) -> bool:
    """
    Check if an IP is public and not the server's own IP.

    Returns True only if:
    - IP is not in local/private/reserved ranges AND
    - IP is not the server's own public IP (if server_ip provided)

    Args:
        ip: IP address string to check
        server_ip: Server's public IP (optional). If provided, filters out this IP too.

    Returns:
        True if IP is a valid public IP to track, False otherwise
    """
    return not is_local_or_private_ip(ip) and (server_ip is None or ip != server_ip)

View File

@@ -0,0 +1,66 @@
#!/usr/bin/env python3
"""
Memory cleanup task for Krawl honeypot.
Periodically trims unbounded in-memory structures to prevent OOM.
"""
from database import get_database
from logger import get_app_logger
# ----------------------
# TASK CONFIG
# ----------------------
# Scheduler metadata for this background task — presumably read by the
# task loader that discovers modules like this one (confirm against the
# scheduler implementation).
TASK_CONFIG = {
    "name": "memory-cleanup",  # task identifier used in logs/scheduling
    "cron": "*/5 * * * *",  # Run every 5 minutes
    "enabled": True,  # task is active
    "run_when_loaded": False,  # wait for the first cron tick instead of firing at load
}

# Module-level logger shared by this task.
app_logger = get_app_logger()
def main():
    """
    Trim the tracker's in-memory structures.

    Invoked periodically by the task scheduler to keep memory bounded.
    Logs how much was trimmed and the post-cleanup memory footprint.
    """
    try:
        # Imported lazily to avoid a circular import at module load time.
        from handler import Handler

        tracker = Handler.tracker
        if not tracker:
            app_logger.warning("Tracker not initialized, skipping memory cleanup")
            return

        # Snapshot sizes around the cleanup so the delta can be reported.
        before = tracker.get_memory_stats()
        tracker.cleanup_memory()
        after = tracker.get_memory_stats()

        access_log_reduced = before["access_log_size"] - after["access_log_size"]
        cred_reduced = (
            before["credential_attempts_size"] - after["credential_attempts_size"]
        )

        if access_log_reduced > 0 or cred_reduced > 0:
            app_logger.info(
                f"Memory cleanup: Trimmed {access_log_reduced} access logs, "
                f"{cred_reduced} credential attempts"
            )

        # Always emit the current footprint for monitoring, even when
        # nothing was trimmed this cycle.
        app_logger.debug(
            f"Memory stats after cleanup: "
            f"access_logs={after['access_log_size']}, "
            f"credentials={after['credential_attempts_size']}, "
            f"unique_ips={after['unique_ips_tracked']}"
        )
    except Exception as e:
        app_logger.error(f"Error during memory cleanup: {e}")

View File

@@ -5,7 +5,9 @@ from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from logger import get_app_logger
from database import get_database
from config import get_config
from models import AccessLog
from ip_utils import is_local_or_private_ip, is_valid_public_ip
from sqlalchemy import distinct
app_logger = get_app_logger()
@@ -66,16 +68,26 @@ def main():
.all()
)
# Filter out local/private IPs and the server's own IP
config = get_config()
server_ip = config.get_server_ip()
public_ips = [
ip for (ip,) in results
if is_valid_public_ip(ip, server_ip)
]
# Ensure exports directory exists
os.makedirs(EXPORTS_DIR, exist_ok=True)
# Write IPs to file (one per line)
with open(OUTPUT_FILE, "w") as f:
for (ip,) in results:
for ip in public_ips:
f.write(f"{ip}\n")
app_logger.info(
f"[Background Task] {task_name} exported {len(results)} IPs to {OUTPUT_FILE}"
f"[Background Task] {task_name} exported {len(public_ips)} public IPs "
f"(filtered {len(results) - len(public_ips)} local/private IPs) to {OUTPUT_FILE}"
)
except Exception as e:

View File

@@ -6,8 +6,10 @@ from datetime import datetime
from zoneinfo import ZoneInfo
import re
import urllib.parse
from wordlists import get_wordlists
from database import get_database, DatabaseManager
from ip_utils import is_local_or_private_ip, is_valid_public_ip
class AccessTracker:
@@ -39,6 +41,11 @@ class AccessTracker:
self.access_log: List[Dict] = []
self.credential_attempts: List[Dict] = []
# Memory limits for in-memory lists (prevents unbounded growth)
self.max_access_log_size = 10_000 # Keep only recent 10k accesses
self.max_credential_log_size = 5_000 # Keep only recent 5k attempts
self.max_counter_keys = 100_000 # Max unique IPs/paths/user agents
# Track pages visited by each IP (for good crawler limiting)
self.ip_page_visits: Dict[str, Dict[str, object]] = defaultdict(dict)
@@ -162,7 +169,15 @@ class AccessTracker:
Record a credential login attempt.
Stores in both in-memory list and SQLite database.
Skips recording if the IP is the server's own public IP.
"""
# Skip if this is the server's own IP
from config import get_config
config = get_config()
server_ip = config.get_server_ip()
if server_ip and ip == server_ip:
return
# In-memory storage for dashboard
self.credential_attempts.append(
{
@@ -174,6 +189,12 @@ class AccessTracker:
}
)
# Trim if exceeding max size (prevent unbounded growth)
if len(self.credential_attempts) > self.max_credential_log_size:
self.credential_attempts = self.credential_attempts[
-self.max_credential_log_size :
]
# Persist to database
if self.db:
try:
@@ -196,6 +217,7 @@ class AccessTracker:
Record an access attempt.
Stores in both in-memory structures and SQLite database.
Skips recording if the IP is the server's own public IP.
Args:
ip: Client IP address
@@ -204,6 +226,13 @@ class AccessTracker:
body: Request body (for POST/PUT)
method: HTTP method
"""
# Skip if this is the server's own IP
from config import get_config
config = get_config()
server_ip = config.get_server_ip()
if server_ip and ip == server_ip:
return
self.ip_counts[ip] += 1
self.path_counts[path] += 1
if user_agent:
@@ -240,6 +269,10 @@ class AccessTracker:
}
)
# Trim if exceeding max size (prevent unbounded growth)
if len(self.access_log) > self.max_access_log_size:
self.access_log = self.access_log[-self.max_access_log_size :]
# Persist to database
if self.db:
try:
@@ -348,7 +381,13 @@ class AccessTracker:
def increment_page_visit(self, client_ip: str) -> int:
"""
Increment page visit counter for an IP and return the new count.
If ban timestamp exists and 60+ seconds have passed, reset the counter.
Implements incremental bans: each violation increases ban duration exponentially.
Ban duration formula: base_duration * (2 ^ violation_count)
- 1st violation: base_duration (e.g., 60 seconds)
- 2nd violation: base_duration * 2 (120 seconds)
- 3rd violation: base_duration * 4 (240 seconds)
- Nth violation: base_duration * 2^(N-1)
Args:
client_ip: The client IP address
@@ -356,19 +395,41 @@ class AccessTracker:
Returns:
The updated page visit count for this IP
"""
# Skip if this is the server's own IP
from config import get_config
config = get_config()
server_ip = config.get_server_ip()
if server_ip and client_ip == server_ip:
return 0
try:
# Initialize if not exists
if client_ip not in self.ip_page_visits:
self.ip_page_visits[client_ip] = {"count": 0, "ban_timestamp": None}
self.ip_page_visits[client_ip] = {
"count": 0,
"ban_timestamp": None,
"total_violations": 0,
"ban_multiplier": 1,
}
# Increment count
self.ip_page_visits[client_ip]["count"] += 1
# Set ban if reached limit
if self.ip_page_visits[client_ip]["count"] >= self.max_pages_limit:
self.ip_page_visits[client_ip][
"ban_timestamp"
] = datetime.now().isoformat()
# Increment violation counter
self.ip_page_visits[client_ip]["total_violations"] += 1
violations = self.ip_page_visits[client_ip]["total_violations"]
# Calculate exponential ban multiplier: 2^(violations - 1)
# Violation 1: 2^0 = 1x
# Violation 2: 2^1 = 2x
# Violation 3: 2^2 = 4x
# Violation 4: 2^3 = 8x, etc.
self.ip_page_visits[client_ip]["ban_multiplier"] = 2 ** (violations - 1)
# Set ban timestamp
self.ip_page_visits[client_ip]["ban_timestamp"] = datetime.now().isoformat()
return self.ip_page_visits[client_ip]["count"]
@@ -378,6 +439,10 @@ class AccessTracker:
def is_banned_ip(self, client_ip: str) -> bool:
"""
Check if an IP is currently banned due to exceeding page visit limits.
Uses incremental ban duration based on violation count.
Ban duration = base_duration * (2 ^ (violations - 1))
Each time an IP is banned again, duration doubles.
Args:
client_ip: The client IP address
@@ -386,26 +451,87 @@ class AccessTracker:
"""
try:
if client_ip in self.ip_page_visits:
ban_timestamp = self.ip_page_visits[client_ip]["ban_timestamp"]
ban_timestamp = self.ip_page_visits[client_ip].get("ban_timestamp")
if ban_timestamp is not None:
banned = True
# Get the ban multiplier for this violation
ban_multiplier = self.ip_page_visits[client_ip].get(
"ban_multiplier", 1
)
# Check if ban period has expired (> 60 seconds)
ban_time = datetime.fromisoformat(
self.ip_page_visits[client_ip]["ban_timestamp"]
)
time_diff = datetime.now() - ban_time
if time_diff.total_seconds() > self.ban_duration_seconds:
self.ip_page_visits[client_ip]["count"] = 0
self.ip_page_visits[client_ip]["ban_timestamp"] = None
banned = False
# Calculate effective ban duration based on violations
effective_ban_duration = self.ban_duration_seconds * ban_multiplier
return banned
# Check if ban period has expired
ban_time = datetime.fromisoformat(ban_timestamp)
time_diff = datetime.now() - ban_time
if time_diff.total_seconds() > effective_ban_duration:
# Ban expired, reset for next cycle
# Keep violation count for next offense
self.ip_page_visits[client_ip]["count"] = 0
self.ip_page_visits[client_ip]["ban_timestamp"] = None
return False
else:
# Still banned
return True
return False
except Exception:
return False
def get_page_visit_count(self, client_ip: str) -> int:
def get_ban_info(self, client_ip: str) -> dict:
    """
    Get detailed ban information for an IP.

    Args:
        client_ip: The client IP address to look up.

    Returns:
        Dictionary with ban status, violations, and remaining ban time
    """
    try:
        # Unknown IP: never tracked, so no violations and no active ban.
        if client_ip not in self.ip_page_visits:
            return {
                "is_banned": False,
                "violations": 0,
                "ban_multiplier": 1,
                "remaining_ban_seconds": 0,
            }
        ip_data = self.ip_page_visits[client_ip]
        ban_timestamp = ip_data.get("ban_timestamp")
        # Known IP without an active ban: report violation history only.
        if ban_timestamp is None:
            return {
                "is_banned": False,
                "violations": ip_data.get("total_violations", 0),
                "ban_multiplier": ip_data.get("ban_multiplier", 1),
                "remaining_ban_seconds": 0,
            }
        # Ban is active, calculate remaining time.
        # Effective duration scales with the stored multiplier
        # (2^(violations-1), set by increment_page_visit).
        ban_multiplier = ip_data.get("ban_multiplier", 1)
        effective_ban_duration = self.ban_duration_seconds * ban_multiplier
        ban_time = datetime.fromisoformat(ban_timestamp)
        time_diff = datetime.now() - ban_time
        remaining_seconds = max(
            0, effective_ban_duration - time_diff.total_seconds()
        )
        return {
            "is_banned": remaining_seconds > 0,
            "violations": ip_data.get("total_violations", 0),
            "ban_multiplier": ban_multiplier,
            "effective_ban_duration_seconds": effective_ban_duration,
            "remaining_ban_seconds": remaining_seconds,
        }
    except Exception:
        # Defensive fallback: on any malformed entry, report "not banned"
        # rather than failing the request path.
        return {
            "is_banned": False,
            "violations": 0,
            "ban_multiplier": 1,
            "remaining_ban_seconds": 0,
        }
"""
Get the current page visit count for an IP.
@@ -421,8 +547,13 @@ class AccessTracker:
return 0
def get_top_ips(self, limit: int = 10) -> List[Tuple[str, int]]:
"""Get top N IP addresses by access count"""
return sorted(self.ip_counts.items(), key=lambda x: x[1], reverse=True)[:limit]
"""Get top N IP addresses by access count (excludes local/private IPs)"""
filtered = [
(ip, count)
for ip, count in self.ip_counts.items()
if not is_local_or_private_ip(ip)
]
return sorted(filtered, key=lambda x: x[1], reverse=True)[:limit]
def get_top_paths(self, limit: int = 10) -> List[Tuple[str, int]]:
"""Get top N paths by access count"""
@@ -437,18 +568,30 @@ class AccessTracker:
]
def get_suspicious_accesses(self, limit: int = 20) -> List[Dict]:
"""Get recent suspicious accesses"""
suspicious = [log for log in self.access_log if log.get("suspicious", False)]
"""Get recent suspicious accesses (excludes local/private IPs)"""
suspicious = [
log
for log in self.access_log
if log.get("suspicious", False) and not is_local_or_private_ip(log.get("ip", ""))
]
return suspicious[-limit:]
def get_attack_type_accesses(self, limit: int = 20) -> List[Dict]:
"""Get recent accesses with detected attack types"""
attacks = [log for log in self.access_log if log.get("attack_types")]
"""Get recent accesses with detected attack types (excludes local/private IPs)"""
attacks = [
log
for log in self.access_log
if log.get("attack_types") and not is_local_or_private_ip(log.get("ip", ""))
]
return attacks[-limit:]
def get_honeypot_triggered_ips(self) -> List[Tuple[str, List[str]]]:
"""Get IPs that accessed honeypot paths"""
return [(ip, paths) for ip, paths in self.honeypot_triggered.items()]
"""Get IPs that accessed honeypot paths (excludes local/private IPs)"""
return [
(ip, paths)
for ip, paths in self.honeypot_triggered.items()
if not is_local_or_private_ip(ip)
]
def get_stats(self) -> Dict:
"""Get statistics summary from database."""
@@ -468,3 +611,66 @@ class AccessTracker:
stats["credential_attempts"] = self.db.get_credential_attempts(limit=50)
return stats
def cleanup_memory(self) -> None:
"""
Clean up in-memory structures to prevent unbounded growth.
Should be called periodically (e.g., every 5 minutes).
Trimming strategy:
- Keep most recent N entries in logs
- Remove oldest entries when limit exceeded
- Clean expired ban entries from ip_page_visits
"""
# Trim access_log to max size (keep most recent)
if len(self.access_log) > self.max_access_log_size:
self.access_log = self.access_log[-self.max_access_log_size:]
# Trim credential_attempts to max size (keep most recent)
if len(self.credential_attempts) > self.max_credential_log_size:
self.credential_attempts = self.credential_attempts[
-self.max_credential_log_size :
]
# Clean expired ban entries from ip_page_visits
current_time = datetime.now()
ips_to_clean = []
for ip, data in self.ip_page_visits.items():
ban_timestamp = data.get("ban_timestamp")
if ban_timestamp is not None:
try:
ban_time = datetime.fromisoformat(ban_timestamp)
time_diff = (current_time - ban_time).total_seconds()
if time_diff > self.ban_duration_seconds:
# Ban expired, reset the entry
data["count"] = 0
data["ban_timestamp"] = None
except (ValueError, TypeError):
pass
# Optional: Remove IPs with zero activity (advanced cleanup)
# Comment out to keep indefinite history of zero-activity IPs
# ips_to_remove = [
# ip
# for ip, data in self.ip_page_visits.items()
# if data.get("count", 0) == 0 and data.get("ban_timestamp") is None
# ]
# for ip in ips_to_remove:
# del self.ip_page_visits[ip]
def get_memory_stats(self) -> Dict[str, int]:
"""
Get current memory usage statistics for monitoring.
Returns:
Dictionary with counts of in-memory items
"""
return {
"access_log_size": len(self.access_log),
"credential_attempts_size": len(self.credential_attempts),
"unique_ips_tracked": len(self.ip_counts),
"unique_paths_tracked": len(self.path_counts),
"unique_user_agents": len(self.user_agent_counts),
"unique_ip_page_visits": len(self.ip_page_visits),
"honeypot_triggered_ips": len(self.honeypot_triggered),
}