#!/usr/bin/env python3 from typing import Dict, Tuple, Optional import re import urllib.parse from wordlists import get_wordlists from database import get_database, DatabaseManager # Module-level singleton for background task access _tracker_instance: "AccessTracker | None" = None def get_tracker() -> "AccessTracker | None": """Get the global AccessTracker singleton (set during app startup).""" return _tracker_instance def set_tracker(tracker: "AccessTracker"): """Store the AccessTracker singleton for background task access.""" global _tracker_instance _tracker_instance = tracker class AccessTracker: """ Track IP addresses and paths accessed. Maintains in-memory structures for fast dashboard access and persists data to SQLite for long-term storage and analysis. """ def __init__( self, max_pages_limit, ban_duration_seconds, db_manager: Optional[DatabaseManager] = None, ): """ Initialize the access tracker. Args: db_manager: Optional DatabaseManager for persistence. If None, will use the global singleton. """ self.max_pages_limit = max_pages_limit self.ban_duration_seconds = ban_duration_seconds # Load suspicious patterns from wordlists wl = get_wordlists() self.suspicious_patterns = wl.suspicious_patterns # Fallback if wordlists not loaded if not self.suspicious_patterns: self.suspicious_patterns = [ "bot", "crawler", "spider", "scraper", "curl", "wget", "python-requests", "scanner", "nikto", "sqlmap", "nmap", "masscan", "nessus", "acunetix", "burp", "zap", "w3af", "metasploit", "nuclei", "gobuster", "dirbuster", ] # Load attack patterns from wordlists self.attack_types = wl.attack_patterns # Fallback if wordlists not loaded if not self.attack_types: self.attack_types = { "path_traversal": r"\.\.", "sql_injection": r"('|--|;|\bOR\b|\bUNION\b|\bSELECT\b|\bDROP\b)", "xss_attempt": r"( Optional[DatabaseManager]: """ Get the database manager, lazily initializing if needed. Returns: DatabaseManager instance or None if not available """ if self._db_manager is None: try: self._db_manager = get_database() except Exception: # Database not initialized, persistence disabled pass return self._db_manager def parse_credentials(self, post_data: str) -> Tuple[str, str]: """ Parse username and password from POST data. Returns tuple (username, password) or (None, None) if not found. """ if not post_data: return None, None username = None password = None try: # Parse URL-encoded form data parsed = urllib.parse.parse_qs(post_data) # Get credential field names from wordlists wl = get_wordlists() username_fields = wl.username_fields password_fields = wl.password_fields # Fallback if wordlists not loaded if not username_fields: username_fields = [ "username", "user", "login", "email", "log", "userid", "account", ] if not password_fields: password_fields = ["password", "pass", "passwd", "pwd", "passphrase"] for field in username_fields: if field in parsed and parsed[field]: username = parsed[field][0] break for field in password_fields: if field in parsed and parsed[field]: password = parsed[field][0] break except Exception: # If parsing fails, try simple regex patterns wl = get_wordlists() username_fields = wl.username_fields or [ "username", "user", "login", "email", "log", ] password_fields = wl.password_fields or [ "password", "pass", "passwd", "pwd", ] # Build regex pattern from wordlist fields username_pattern = "(?:" + "|".join(username_fields) + ")=([^&\\s]+)" password_pattern = "(?:" + "|".join(password_fields) + ")=([^&\\s]+)" username_match = re.search(username_pattern, post_data, re.IGNORECASE) password_match = re.search(password_pattern, post_data, re.IGNORECASE) if username_match: username = urllib.parse.unquote_plus(username_match.group(1)) if password_match: password = urllib.parse.unquote_plus(password_match.group(1)) return username, password def record_credential_attempt( self, ip: str, path: str, username: str, password: str ): """ Record a credential login attempt. Stores in both in-memory list and SQLite database. Skips recording if the IP is the server's own public IP. """ # Skip if this is the server's own IP from config import get_config config = get_config() server_ip = config.get_server_ip() if server_ip and ip == server_ip: return # Persist to database if self.db: try: self.db.persist_credential( ip=ip, path=path, username=username, password=password ) except Exception: # Don't crash if database persistence fails pass def record_access( self, ip: str, path: str, user_agent: str = "", body: str = "", method: str = "GET", raw_request: str = "", ): """ Record an access attempt. Stores in both in-memory structures and SQLite database. Skips recording if the IP is the server's own public IP. Args: ip: Client IP address path: Requested path user_agent: Client user agent string body: Request body (for POST/PUT) method: HTTP method raw_request: Full raw HTTP request for forensic analysis """ # Skip if this is the server's own IP from config import get_config config = get_config() server_ip = config.get_server_ip() if server_ip and ip == server_ip: return # Path attack type detection attack_findings = self.detect_attack_type(path) # POST/PUT body attack detection if len(body) > 0: # Decode URL-encoded body so patterns can match (e.g., %3Cscript%3E ->