added site depth limit mechanism (#48)

* added site depth limit mechanism

* modified max pages limit and ban duration seconds

---------

Co-authored-by: Leonardo Bambini <lbambini@Leonardos-MacBook-Air.local>
Co-authored-by: BlessedRebuS <patrick.difa@gmail.com>
This commit is contained in:
leonardobambini
2026-01-23 21:33:32 +01:00
committed by GitHub
parent 223883a781
commit 4e4c370b72
5 changed files with 175 additions and 6 deletions

View File

@@ -17,7 +17,7 @@ class AccessTracker:
Maintains in-memory structures for fast dashboard access and
persists data to SQLite for long-term storage and analysis.
"""
def __init__(self, db_manager: Optional[DatabaseManager] = None):
def __init__(self, max_pages_limit, ban_duration_seconds, db_manager: Optional[DatabaseManager] = None):
"""
Initialize the access tracker.
@@ -25,11 +25,17 @@ class AccessTracker:
db_manager: Optional DatabaseManager for persistence.
If None, will use the global singleton.
"""
self.max_pages_limit = max_pages_limit
self.ban_duration_seconds = ban_duration_seconds
self.ip_counts: Dict[str, int] = defaultdict(int)
self.path_counts: Dict[str, int] = defaultdict(int)
self.user_agent_counts: Dict[str, int] = defaultdict(int)
self.access_log: List[Dict] = []
self.credential_attempts: List[Dict] = []
# Track pages visited by each IP (for good crawler limiting)
self.ip_page_visits: Dict[str, Dict[str, object]] = defaultdict(dict)
self.suspicious_patterns = [
'bot', 'crawler', 'spider', 'scraper', 'curl', 'wget', 'python-requests',
'scanner', 'nikto', 'sqlmap', 'nmap', 'masscan', 'nessus', 'acunetix',
@@ -253,6 +259,113 @@ class AccessTracker:
ua_lower = user_agent.lower()
return any(pattern in ua_lower for pattern in self.suspicious_patterns)
def get_category_by_ip(self, client_ip: str) -> str:
"""
Check if an IP has been categorized as a 'good crawler' in the database.
Uses the IP category from IpStats table.
Args:
client_ip: The client IP address (will be sanitized)
Returns:
True if the IP is categorized as 'good crawler', False otherwise
"""
try:
from sanitizer import sanitize_ip
# Sanitize the IP address
safe_ip = sanitize_ip(client_ip)
# Query the database for this IP's category
db = self.db
if not db:
return False
ip_stats = db.get_ip_stats_by_ip(safe_ip)
if not ip_stats or not ip_stats.get('category'):
return False
# Check if category matches "good crawler"
category = ip_stats.get('category', '').lower().strip()
return category
except Exception as e:
# Log but don't crash on database errors
import logging
logging.error(f"Error checking IP category for {client_ip}: {str(e)}")
return False
def increment_page_visit(self, client_ip: str) -> int:
"""
Increment page visit counter for an IP and return the new count.
If ban timestamp exists and 60+ seconds have passed, reset the counter.
Args:
client_ip: The client IP address
Returns:
The updated page visit count for this IP
"""
try:
# Initialize if not exists
if client_ip not in self.ip_page_visits:
self.ip_page_visits[client_ip] = {"count": 0, "ban_timestamp": None}
# Increment count
self.ip_page_visits[client_ip]["count"] += 1
# Set ban if reached limit
if self.ip_page_visits[client_ip]["count"] >= self.max_pages_limit:
self.ip_page_visits[client_ip]["ban_timestamp"] = datetime.now().isoformat()
return self.ip_page_visits[client_ip]["count"]
except Exception:
return 0
def is_banned_ip(self, client_ip: str) -> bool:
"""
Check if an IP is currently banned due to exceeding page visit limits.
Args:
client_ip: The client IP address
Returns:
True if the IP is banned, False otherwise
"""
try:
if client_ip in self.ip_page_visits:
ban_timestamp = self.ip_page_visits[client_ip]["ban_timestamp"]
if ban_timestamp is not None:
banned = True
#Check if ban period has expired (> 60 seconds)
ban_time = datetime.fromisoformat(self.ip_page_visits[client_ip]["ban_timestamp"])
time_diff = datetime.now() - ban_time
if time_diff.total_seconds() > self.ban_duration_seconds:
self.ip_page_visits[client_ip]["count"] = 0
self.ip_page_visits[client_ip]["ban_timestamp"] = None
banned = False
return banned
except Exception:
return False
def get_page_visit_count(self, client_ip: str) -> int:
"""
Get the current page visit count for an IP.
Args:
client_ip: The client IP address
Returns:
The page visit count for this IP
"""
try:
return self.ip_page_visits.get(client_ip, 0)
except Exception:
return 0
def get_top_ips(self, limit: int = 10) -> List[Tuple[str, int]]:
"""Get top N IP addresses by access count"""
return sorted(self.ip_counts.items(), key=lambda x: x[1], reverse=True)[:limit]