added ip logging memory improvements, added local ip and public ip exlusion
This commit is contained in:
258
src/tracker.py
258
src/tracker.py
@@ -6,8 +6,10 @@ from datetime import datetime
|
||||
from zoneinfo import ZoneInfo
|
||||
import re
|
||||
import urllib.parse
|
||||
|
||||
from wordlists import get_wordlists
|
||||
from database import get_database, DatabaseManager
|
||||
from ip_utils import is_local_or_private_ip, is_valid_public_ip
|
||||
|
||||
|
||||
class AccessTracker:
|
||||
@@ -39,6 +41,11 @@ class AccessTracker:
|
||||
self.access_log: List[Dict] = []
|
||||
self.credential_attempts: List[Dict] = []
|
||||
|
||||
# Memory limits for in-memory lists (prevents unbounded growth)
|
||||
self.max_access_log_size = 10_000 # Keep only recent 10k accesses
|
||||
self.max_credential_log_size = 5_000 # Keep only recent 5k attempts
|
||||
self.max_counter_keys = 100_000 # Max unique IPs/paths/user agents
|
||||
|
||||
# Track pages visited by each IP (for good crawler limiting)
|
||||
self.ip_page_visits: Dict[str, Dict[str, object]] = defaultdict(dict)
|
||||
|
||||
@@ -162,7 +169,15 @@ class AccessTracker:
|
||||
Record a credential login attempt.
|
||||
|
||||
Stores in both in-memory list and SQLite database.
|
||||
Skips recording if the IP is the server's own public IP.
|
||||
"""
|
||||
# Skip if this is the server's own IP
|
||||
from config import get_config
|
||||
config = get_config()
|
||||
server_ip = config.get_server_ip()
|
||||
if server_ip and ip == server_ip:
|
||||
return
|
||||
|
||||
# In-memory storage for dashboard
|
||||
self.credential_attempts.append(
|
||||
{
|
||||
@@ -174,6 +189,12 @@ class AccessTracker:
|
||||
}
|
||||
)
|
||||
|
||||
# Trim if exceeding max size (prevent unbounded growth)
|
||||
if len(self.credential_attempts) > self.max_credential_log_size:
|
||||
self.credential_attempts = self.credential_attempts[
|
||||
-self.max_credential_log_size :
|
||||
]
|
||||
|
||||
# Persist to database
|
||||
if self.db:
|
||||
try:
|
||||
@@ -196,6 +217,7 @@ class AccessTracker:
|
||||
Record an access attempt.
|
||||
|
||||
Stores in both in-memory structures and SQLite database.
|
||||
Skips recording if the IP is the server's own public IP.
|
||||
|
||||
Args:
|
||||
ip: Client IP address
|
||||
@@ -204,6 +226,13 @@ class AccessTracker:
|
||||
body: Request body (for POST/PUT)
|
||||
method: HTTP method
|
||||
"""
|
||||
# Skip if this is the server's own IP
|
||||
from config import get_config
|
||||
config = get_config()
|
||||
server_ip = config.get_server_ip()
|
||||
if server_ip and ip == server_ip:
|
||||
return
|
||||
|
||||
self.ip_counts[ip] += 1
|
||||
self.path_counts[path] += 1
|
||||
if user_agent:
|
||||
@@ -240,6 +269,10 @@ class AccessTracker:
|
||||
}
|
||||
)
|
||||
|
||||
# Trim if exceeding max size (prevent unbounded growth)
|
||||
if len(self.access_log) > self.max_access_log_size:
|
||||
self.access_log = self.access_log[-self.max_access_log_size :]
|
||||
|
||||
# Persist to database
|
||||
if self.db:
|
||||
try:
|
||||
@@ -348,7 +381,13 @@ class AccessTracker:
|
||||
def increment_page_visit(self, client_ip: str) -> int:
|
||||
"""
|
||||
Increment page visit counter for an IP and return the new count.
|
||||
If ban timestamp exists and 60+ seconds have passed, reset the counter.
|
||||
Implements incremental bans: each violation increases ban duration exponentially.
|
||||
|
||||
Ban duration formula: base_duration * (2 ^ violation_count)
|
||||
- 1st violation: base_duration (e.g., 60 seconds)
|
||||
- 2nd violation: base_duration * 2 (120 seconds)
|
||||
- 3rd violation: base_duration * 4 (240 seconds)
|
||||
- Nth violation: base_duration * 2^(N-1)
|
||||
|
||||
Args:
|
||||
client_ip: The client IP address
|
||||
@@ -356,19 +395,41 @@ class AccessTracker:
|
||||
Returns:
|
||||
The updated page visit count for this IP
|
||||
"""
|
||||
# Skip if this is the server's own IP
|
||||
from config import get_config
|
||||
config = get_config()
|
||||
server_ip = config.get_server_ip()
|
||||
if server_ip and client_ip == server_ip:
|
||||
return 0
|
||||
|
||||
try:
|
||||
# Initialize if not exists
|
||||
if client_ip not in self.ip_page_visits:
|
||||
self.ip_page_visits[client_ip] = {"count": 0, "ban_timestamp": None}
|
||||
self.ip_page_visits[client_ip] = {
|
||||
"count": 0,
|
||||
"ban_timestamp": None,
|
||||
"total_violations": 0,
|
||||
"ban_multiplier": 1,
|
||||
}
|
||||
|
||||
# Increment count
|
||||
self.ip_page_visits[client_ip]["count"] += 1
|
||||
|
||||
# Set ban if reached limit
|
||||
if self.ip_page_visits[client_ip]["count"] >= self.max_pages_limit:
|
||||
self.ip_page_visits[client_ip][
|
||||
"ban_timestamp"
|
||||
] = datetime.now().isoformat()
|
||||
# Increment violation counter
|
||||
self.ip_page_visits[client_ip]["total_violations"] += 1
|
||||
violations = self.ip_page_visits[client_ip]["total_violations"]
|
||||
|
||||
# Calculate exponential ban multiplier: 2^(violations - 1)
|
||||
# Violation 1: 2^0 = 1x
|
||||
# Violation 2: 2^1 = 2x
|
||||
# Violation 3: 2^2 = 4x
|
||||
# Violation 4: 2^3 = 8x, etc.
|
||||
self.ip_page_visits[client_ip]["ban_multiplier"] = 2 ** (violations - 1)
|
||||
|
||||
# Set ban timestamp
|
||||
self.ip_page_visits[client_ip]["ban_timestamp"] = datetime.now().isoformat()
|
||||
|
||||
return self.ip_page_visits[client_ip]["count"]
|
||||
|
||||
@@ -378,6 +439,10 @@ class AccessTracker:
|
||||
def is_banned_ip(self, client_ip: str) -> bool:
|
||||
"""
|
||||
Check if an IP is currently banned due to exceeding page visit limits.
|
||||
Uses incremental ban duration based on violation count.
|
||||
|
||||
Ban duration = base_duration * (2 ^ (violations - 1))
|
||||
Each time an IP is banned again, duration doubles.
|
||||
|
||||
Args:
|
||||
client_ip: The client IP address
|
||||
@@ -386,26 +451,87 @@ class AccessTracker:
|
||||
"""
|
||||
try:
|
||||
if client_ip in self.ip_page_visits:
|
||||
ban_timestamp = self.ip_page_visits[client_ip]["ban_timestamp"]
|
||||
ban_timestamp = self.ip_page_visits[client_ip].get("ban_timestamp")
|
||||
if ban_timestamp is not None:
|
||||
banned = True
|
||||
# Get the ban multiplier for this violation
|
||||
ban_multiplier = self.ip_page_visits[client_ip].get(
|
||||
"ban_multiplier", 1
|
||||
)
|
||||
|
||||
# Check if ban period has expired (> 60 seconds)
|
||||
ban_time = datetime.fromisoformat(
|
||||
self.ip_page_visits[client_ip]["ban_timestamp"]
|
||||
)
|
||||
time_diff = datetime.now() - ban_time
|
||||
if time_diff.total_seconds() > self.ban_duration_seconds:
|
||||
self.ip_page_visits[client_ip]["count"] = 0
|
||||
self.ip_page_visits[client_ip]["ban_timestamp"] = None
|
||||
banned = False
|
||||
# Calculate effective ban duration based on violations
|
||||
effective_ban_duration = self.ban_duration_seconds * ban_multiplier
|
||||
|
||||
return banned
|
||||
# Check if ban period has expired
|
||||
ban_time = datetime.fromisoformat(ban_timestamp)
|
||||
time_diff = datetime.now() - ban_time
|
||||
|
||||
if time_diff.total_seconds() > effective_ban_duration:
|
||||
# Ban expired, reset for next cycle
|
||||
# Keep violation count for next offense
|
||||
self.ip_page_visits[client_ip]["count"] = 0
|
||||
self.ip_page_visits[client_ip]["ban_timestamp"] = None
|
||||
return False
|
||||
else:
|
||||
# Still banned
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def get_page_visit_count(self, client_ip: str) -> int:
|
||||
def get_ban_info(self, client_ip: str) -> dict:
|
||||
"""
|
||||
Get detailed ban information for an IP.
|
||||
|
||||
Returns:
|
||||
Dictionary with ban status, violations, and remaining ban time
|
||||
"""
|
||||
try:
|
||||
if client_ip not in self.ip_page_visits:
|
||||
return {
|
||||
"is_banned": False,
|
||||
"violations": 0,
|
||||
"ban_multiplier": 1,
|
||||
"remaining_ban_seconds": 0,
|
||||
}
|
||||
|
||||
ip_data = self.ip_page_visits[client_ip]
|
||||
ban_timestamp = ip_data.get("ban_timestamp")
|
||||
|
||||
if ban_timestamp is None:
|
||||
return {
|
||||
"is_banned": False,
|
||||
"violations": ip_data.get("total_violations", 0),
|
||||
"ban_multiplier": ip_data.get("ban_multiplier", 1),
|
||||
"remaining_ban_seconds": 0,
|
||||
}
|
||||
|
||||
# Ban is active, calculate remaining time
|
||||
ban_multiplier = ip_data.get("ban_multiplier", 1)
|
||||
effective_ban_duration = self.ban_duration_seconds * ban_multiplier
|
||||
|
||||
ban_time = datetime.fromisoformat(ban_timestamp)
|
||||
time_diff = datetime.now() - ban_time
|
||||
remaining_seconds = max(
|
||||
0, effective_ban_duration - time_diff.total_seconds()
|
||||
)
|
||||
|
||||
return {
|
||||
"is_banned": remaining_seconds > 0,
|
||||
"violations": ip_data.get("total_violations", 0),
|
||||
"ban_multiplier": ban_multiplier,
|
||||
"effective_ban_duration_seconds": effective_ban_duration,
|
||||
"remaining_ban_seconds": remaining_seconds,
|
||||
}
|
||||
|
||||
except Exception:
|
||||
return {
|
||||
"is_banned": False,
|
||||
"violations": 0,
|
||||
"ban_multiplier": 1,
|
||||
"remaining_ban_seconds": 0,
|
||||
}
|
||||
"""
|
||||
Get the current page visit count for an IP.
|
||||
|
||||
@@ -421,8 +547,13 @@ class AccessTracker:
|
||||
return 0
|
||||
|
||||
def get_top_ips(self, limit: int = 10) -> List[Tuple[str, int]]:
|
||||
"""Get top N IP addresses by access count"""
|
||||
return sorted(self.ip_counts.items(), key=lambda x: x[1], reverse=True)[:limit]
|
||||
"""Get top N IP addresses by access count (excludes local/private IPs)"""
|
||||
filtered = [
|
||||
(ip, count)
|
||||
for ip, count in self.ip_counts.items()
|
||||
if not is_local_or_private_ip(ip)
|
||||
]
|
||||
return sorted(filtered, key=lambda x: x[1], reverse=True)[:limit]
|
||||
|
||||
def get_top_paths(self, limit: int = 10) -> List[Tuple[str, int]]:
|
||||
"""Get top N paths by access count"""
|
||||
@@ -437,18 +568,30 @@ class AccessTracker:
|
||||
]
|
||||
|
||||
def get_suspicious_accesses(self, limit: int = 20) -> List[Dict]:
|
||||
"""Get recent suspicious accesses"""
|
||||
suspicious = [log for log in self.access_log if log.get("suspicious", False)]
|
||||
"""Get recent suspicious accesses (excludes local/private IPs)"""
|
||||
suspicious = [
|
||||
log
|
||||
for log in self.access_log
|
||||
if log.get("suspicious", False) and not is_local_or_private_ip(log.get("ip", ""))
|
||||
]
|
||||
return suspicious[-limit:]
|
||||
|
||||
def get_attack_type_accesses(self, limit: int = 20) -> List[Dict]:
|
||||
"""Get recent accesses with detected attack types"""
|
||||
attacks = [log for log in self.access_log if log.get("attack_types")]
|
||||
"""Get recent accesses with detected attack types (excludes local/private IPs)"""
|
||||
attacks = [
|
||||
log
|
||||
for log in self.access_log
|
||||
if log.get("attack_types") and not is_local_or_private_ip(log.get("ip", ""))
|
||||
]
|
||||
return attacks[-limit:]
|
||||
|
||||
def get_honeypot_triggered_ips(self) -> List[Tuple[str, List[str]]]:
|
||||
"""Get IPs that accessed honeypot paths"""
|
||||
return [(ip, paths) for ip, paths in self.honeypot_triggered.items()]
|
||||
"""Get IPs that accessed honeypot paths (excludes local/private IPs)"""
|
||||
return [
|
||||
(ip, paths)
|
||||
for ip, paths in self.honeypot_triggered.items()
|
||||
if not is_local_or_private_ip(ip)
|
||||
]
|
||||
|
||||
def get_stats(self) -> Dict:
|
||||
"""Get statistics summary from database."""
|
||||
@@ -468,3 +611,66 @@ class AccessTracker:
|
||||
stats["credential_attempts"] = self.db.get_credential_attempts(limit=50)
|
||||
|
||||
return stats
|
||||
|
||||
def cleanup_memory(self) -> None:
|
||||
"""
|
||||
Clean up in-memory structures to prevent unbounded growth.
|
||||
Should be called periodically (e.g., every 5 minutes).
|
||||
|
||||
Trimming strategy:
|
||||
- Keep most recent N entries in logs
|
||||
- Remove oldest entries when limit exceeded
|
||||
- Clean expired ban entries from ip_page_visits
|
||||
"""
|
||||
# Trim access_log to max size (keep most recent)
|
||||
if len(self.access_log) > self.max_access_log_size:
|
||||
self.access_log = self.access_log[-self.max_access_log_size:]
|
||||
|
||||
# Trim credential_attempts to max size (keep most recent)
|
||||
if len(self.credential_attempts) > self.max_credential_log_size:
|
||||
self.credential_attempts = self.credential_attempts[
|
||||
-self.max_credential_log_size :
|
||||
]
|
||||
|
||||
# Clean expired ban entries from ip_page_visits
|
||||
current_time = datetime.now()
|
||||
ips_to_clean = []
|
||||
for ip, data in self.ip_page_visits.items():
|
||||
ban_timestamp = data.get("ban_timestamp")
|
||||
if ban_timestamp is not None:
|
||||
try:
|
||||
ban_time = datetime.fromisoformat(ban_timestamp)
|
||||
time_diff = (current_time - ban_time).total_seconds()
|
||||
if time_diff > self.ban_duration_seconds:
|
||||
# Ban expired, reset the entry
|
||||
data["count"] = 0
|
||||
data["ban_timestamp"] = None
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
|
||||
# Optional: Remove IPs with zero activity (advanced cleanup)
|
||||
# Comment out to keep indefinite history of zero-activity IPs
|
||||
# ips_to_remove = [
|
||||
# ip
|
||||
# for ip, data in self.ip_page_visits.items()
|
||||
# if data.get("count", 0) == 0 and data.get("ban_timestamp") is None
|
||||
# ]
|
||||
# for ip in ips_to_remove:
|
||||
# del self.ip_page_visits[ip]
|
||||
|
||||
def get_memory_stats(self) -> Dict[str, int]:
|
||||
"""
|
||||
Get current memory usage statistics for monitoring.
|
||||
|
||||
Returns:
|
||||
Dictionary with counts of in-memory items
|
||||
"""
|
||||
return {
|
||||
"access_log_size": len(self.access_log),
|
||||
"credential_attempts_size": len(self.credential_attempts),
|
||||
"unique_ips_tracked": len(self.ip_counts),
|
||||
"unique_paths_tracked": len(self.path_counts),
|
||||
"unique_user_agents": len(self.user_agent_counts),
|
||||
"unique_ip_page_visits": len(self.ip_page_visits),
|
||||
"honeypot_triggered_ips": len(self.honeypot_triggered),
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user