From 4e4c370b72e5504ef7d8786a484708359583e67d Mon Sep 17 00:00:00 2001 From: leonardobambini <91343329+leonardobambini@users.noreply.github.com> Date: Fri, 23 Jan 2026 21:33:32 +0100 Subject: [PATCH] added site depth limit mechanism (#48) * added site depth limit mechanism * modified max pages limit and ban duration seconds --------- Co-authored-by: Leonardo Bambini Co-authored-by: BlessedRebuS --- src/config.py | 11 +++- src/exports/malicious_ips.txt | 6 ++ src/handler.py | 47 +++++++++++++- src/server.py | 2 +- src/tracker.py | 115 +++++++++++++++++++++++++++++++++- 5 files changed, 175 insertions(+), 6 deletions(-) create mode 100644 src/exports/malicious_ips.txt diff --git a/src/config.py b/src/config.py index df83380..771e8c2 100644 --- a/src/config.py +++ b/src/config.py @@ -29,6 +29,11 @@ class Config: api_server_path: str = "/api/v2/users" probability_error_codes: int = 0 # Percentage (0-100) + # Crawl limiting settings - for legitimate vs malicious crawlers + max_pages_limit: int = 100 # Max pages limit for good crawlers and regular users (and bad crawlers/attackers if infinite_pages_for_malicious is False) + infinite_pages_for_malicious: bool = True # Infinite pages for malicious crawlers + ban_duration_seconds: int = 600 # Ban duration in seconds for IPs exceeding limits + # Database settings database_path: str = "data/krawl.db" database_retention_days: int = 30 @@ -70,6 +75,7 @@ class Config: database = data.get('database', {}) behavior = data.get('behavior', {}) analyzer = data.get('analyzer') or {} + crawl = data.get('crawl', {}) # Handle dashboard_secret_path - auto-generate if null/not set dashboard_path = dashboard.get('secret_path') @@ -108,7 +114,10 @@ class Config: uneven_request_timing_threshold=analyzer.get('uneven_request_timing_threshold', 0.5), # coefficient of variation uneven_request_timing_time_window_seconds=analyzer.get('uneven_request_timing_time_window_seconds', 300), user_agents_used_threshold=analyzer.get('user_agents_used_threshold', 2), - attack_urls_threshold=analyzer.get('attack_urls_threshold', 1) + attack_urls_threshold=analyzer.get('attack_urls_threshold', 1), + infinite_pages_for_malicious=crawl.get('infinite_pages_for_malicious', True), + max_pages_limit=crawl.get('max_pages_limit', 200), + ban_duration_seconds=crawl.get('ban_duration_seconds', 60) ) def __get_env_from_config(config: str) -> str: diff --git a/src/exports/malicious_ips.txt b/src/exports/malicious_ips.txt new file mode 100644 index 0000000..34fc01a --- /dev/null +++ b/src/exports/malicious_ips.txt @@ -0,0 +1,6 @@ +127.0.0.1 +175.23.45.67 +205.32.180.65 +198.51.100.89 +210.45.67.89 +203.0.113.45 diff --git a/src/handler.py b/src/handler.py index ef26fb5..9cae1ce 100644 --- a/src/handler.py +++ b/src/handler.py @@ -56,6 +56,18 @@ class Handler(BaseHTTPRequestHandler): """Extract user agent from request""" return self.headers.get('User-Agent', '') + def _get_category_by_ip(self, client_ip: str) -> str: + """Get the category of an IP from the database""" + return self.tracker.get_category_by_ip(client_ip) + + def _get_page_visit_count(self, client_ip: str) -> int: + """Get current page visit count for an IP""" + return self.tracker.get_page_visit_count(client_ip) + + def _increment_page_visit(self, client_ip: str) -> int: + """Increment page visit counter for an IP and return new count""" + return self.tracker.increment_page_visit(client_ip) + def version_string(self) -> str: """Return custom server version for deception.""" return random_server_header() @@ -135,10 +147,33 @@ class Handler(BaseHTTPRequestHandler): pass return True - def generate_page(self, seed: str) -> str: - """Generate a webpage containing random links or canary token""" + def generate_page(self, seed: str, page_visit_count: int) -> str: + """Generate a webpage containing random links or canary token""" + random.seed(seed) num_pages = random.randint(*self.config.links_per_page_range) + + # Check if this is a good crawler by IP category from database + ip_category = self._get_category_by_ip(self._get_client_ip()) + + # Determine if we should apply crawler page limit based on config and IP category + should_apply_crawler_limit = False + if self.config.infinite_pages_for_malicious: + if (ip_category == "good_crawler" or ip_category == "regular_user") and page_visit_count >= self.config.max_pages_limit: + should_apply_crawler_limit = True + else: + if (ip_category == "good_crawler" or ip_category == "bad_crawler" or ip_category == "attacker") and page_visit_count >= self.config.max_pages_limit: + should_apply_crawler_limit = True + + + # If good crawler reached max pages, return a simple page with no links + if should_apply_crawler_limit: + return html_templates.main_page( + Handler.counter, + '

Crawl limit reached.

' + ) + + num_pages = random.randint(*self.config.links_per_page_range) # Build the content HTML content = "" @@ -399,6 +434,10 @@ class Handler(BaseHTTPRequestHandler): def do_GET(self): """Responds to webpage requests""" client_ip = self._get_client_ip() + if self.tracker.is_banned_ip(client_ip): + self.send_response(500) + self.end_headers() + return user_agent = self._get_user_agent() if self.config.dashboard_secret_path and self.path == self.config.dashboard_secret_path: @@ -495,7 +534,9 @@ class Handler(BaseHTTPRequestHandler): self.end_headers() try: - self.wfile.write(self.generate_page(self.path).encode()) + # Increment page visit counter for this IP and get the current count + current_visit_count = self._increment_page_visit(client_ip) + self.wfile.write(self.generate_page(self.path, current_visit_count).encode()) Handler.counter -= 1 diff --git a/src/server.py b/src/server.py index a61a372..05bc006 100644 --- a/src/server.py +++ b/src/server.py @@ -67,7 +67,7 @@ def main(): except Exception as e: app_logger.warning(f'Database initialization failed: {e}. Continuing with in-memory only.') - tracker = AccessTracker() + tracker = AccessTracker(config.max_pages_limit, config.ban_duration_seconds) analyzer = Analyzer() Handler.config = config diff --git a/src/tracker.py b/src/tracker.py index 8bec7ce..da07569 100644 --- a/src/tracker.py +++ b/src/tracker.py @@ -17,7 +17,7 @@ class AccessTracker: Maintains in-memory structures for fast dashboard access and persists data to SQLite for long-term storage and analysis. """ - def __init__(self, db_manager: Optional[DatabaseManager] = None): + def __init__(self, max_pages_limit, ban_duration_seconds, db_manager: Optional[DatabaseManager] = None): """ Initialize the access tracker. @@ -25,11 +25,17 @@ class AccessTracker: db_manager: Optional DatabaseManager for persistence. If None, will use the global singleton. """ + self.max_pages_limit = max_pages_limit + self.ban_duration_seconds = ban_duration_seconds self.ip_counts: Dict[str, int] = defaultdict(int) self.path_counts: Dict[str, int] = defaultdict(int) self.user_agent_counts: Dict[str, int] = defaultdict(int) self.access_log: List[Dict] = [] self.credential_attempts: List[Dict] = [] + + # Track pages visited by each IP (for good crawler limiting) + self.ip_page_visits: Dict[str, Dict[str, object]] = defaultdict(dict) + self.suspicious_patterns = [ 'bot', 'crawler', 'spider', 'scraper', 'curl', 'wget', 'python-requests', 'scanner', 'nikto', 'sqlmap', 'nmap', 'masscan', 'nessus', 'acunetix', @@ -253,6 +259,113 @@ class AccessTracker: ua_lower = user_agent.lower() return any(pattern in ua_lower for pattern in self.suspicious_patterns) + def get_category_by_ip(self, client_ip: str) -> str: + """ + Check if an IP has been categorized as a 'good crawler' in the database. + Uses the IP category from IpStats table. + + Args: + client_ip: The client IP address (will be sanitized) + + Returns: + True if the IP is categorized as 'good crawler', False otherwise + """ + try: + from sanitizer import sanitize_ip + # Sanitize the IP address + safe_ip = sanitize_ip(client_ip) + + # Query the database for this IP's category + db = self.db + if not db: + return False + + ip_stats = db.get_ip_stats_by_ip(safe_ip) + if not ip_stats or not ip_stats.get('category'): + return False + + # Check if category matches "good crawler" + category = ip_stats.get('category', '').lower().strip() + return category + + except Exception as e: + # Log but don't crash on database errors + import logging + logging.error(f"Error checking IP category for {client_ip}: {str(e)}") + return False + + def increment_page_visit(self, client_ip: str) -> int: + """ + Increment page visit counter for an IP and return the new count. + If ban timestamp exists and 60+ seconds have passed, reset the counter. + + Args: + client_ip: The client IP address + + Returns: + The updated page visit count for this IP + """ + try: + # Initialize if not exists + if client_ip not in self.ip_page_visits: + self.ip_page_visits[client_ip] = {"count": 0, "ban_timestamp": None} + + # Increment count + self.ip_page_visits[client_ip]["count"] += 1 + + # Set ban if reached limit + if self.ip_page_visits[client_ip]["count"] >= self.max_pages_limit: + self.ip_page_visits[client_ip]["ban_timestamp"] = datetime.now().isoformat() + + return self.ip_page_visits[client_ip]["count"] + + except Exception: + return 0 + + def is_banned_ip(self, client_ip: str) -> bool: + """ + Check if an IP is currently banned due to exceeding page visit limits. + + Args: + client_ip: The client IP address + Returns: + True if the IP is banned, False otherwise + """ + try: + if client_ip in self.ip_page_visits: + ban_timestamp = self.ip_page_visits[client_ip]["ban_timestamp"] + if ban_timestamp is not None: + banned = True + + #Check if ban period has expired (> 60 seconds) + ban_time = datetime.fromisoformat(self.ip_page_visits[client_ip]["ban_timestamp"]) + time_diff = datetime.now() - ban_time + if time_diff.total_seconds() > self.ban_duration_seconds: + self.ip_page_visits[client_ip]["count"] = 0 + self.ip_page_visits[client_ip]["ban_timestamp"] = None + banned = False + + return banned + + except Exception: + return False + + + def get_page_visit_count(self, client_ip: str) -> int: + """ + Get the current page visit count for an IP. + + Args: + client_ip: The client IP address + + Returns: + The page visit count for this IP + """ + try: + return self.ip_page_visits.get(client_ip, 0) + except Exception: + return 0 + def get_top_ips(self, limit: int = 10) -> List[Tuple[str, int]]: """Get top N IP addresses by access count""" return sorted(self.ip_counts.items(), key=lambda x: x[1], reverse=True)[:limit]