diff --git a/src/analyzer.py b/src/analyzer.py index c0ff515..860a206 100644 --- a/src/analyzer.py +++ b/src/analyzer.py @@ -19,10 +19,12 @@ Functions for user activity analysis app_logger = get_app_logger() + class Analyzer: """ Analyzes users activity and produces aggregated insights """ + def __init__(self, db_manager: Optional[DatabaseManager] = None): """ Initialize the access tracker. @@ -102,7 +104,6 @@ class Analyzer: # } # } - # accesses = self.db.get_access_logs(ip_filter = ip, limit=1000) # total_accesses_count = len(accesses) # if total_accesses_count <= 0: @@ -119,7 +120,6 @@ class Analyzer: # #--------------------- HTTP Methods --------------------- - # get_accesses_count = len([item for item in accesses if item["method"] == "GET"]) # post_accesses_count = len([item for item in accesses if item["method"] == "POST"]) # put_accesses_count = len([item for item in accesses if item["method"] == "PUT"]) @@ -214,7 +214,6 @@ class Analyzer: # score["bad_crawler"]["uneven_request_timing"] = False # score["regular_user"]["uneven_request_timing"] = False - # #--------------------- Different User Agents --------------------- # #Header Quality and Consistency: Crawlers tend to use complete and consistent headers, attackers might miss, fake, or change headers # user_agents_used = [item["user_agent"] for item in accesses] @@ -317,8 +316,6 @@ class Analyzer: # return 0 - - # def update_ip_rep_infos(self, ip: str) -> list[str]: # api_url = "https://iprep.lcrawl.com/api/iprep/" # params = { diff --git a/src/config.py b/src/config.py index 771e8c2..629c18c 100644 --- a/src/config.py +++ b/src/config.py @@ -14,12 +14,13 @@ import yaml @dataclass class Config: """Configuration class for the deception server""" + port: int = 5000 delay: int = 100 # milliseconds server_header: str = "" links_length_range: Tuple[int, int] = (5, 15) links_per_page_range: Tuple[int, int] = (10, 15) - char_space: str = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' + char_space: str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" max_counter: int = 10 canary_token_url: Optional[str] = None canary_token_tries: int = 10 @@ -30,7 +31,9 @@ class Config: probability_error_codes: int = 0 # Percentage (0-100) # Crawl limiting settings - for legitimate vs malicious crawlers - max_pages_limit: int = 100 # Max pages limit for good crawlers and regular users (and bad crawlers/attackers if infinite_pages_for_malicious is False) + max_pages_limit: int = ( + 100 # Max pages limit for good crawlers and regular users (and bad crawlers/attackers if infinite_pages_for_malicious is False) + ) infinite_pages_for_malicious: bool = True # Infinite pages for malicious crawlers ban_duration_seconds: int = 600 # Ban duration in seconds for IPs exceeding limits @@ -47,90 +50,111 @@ class Config: attack_urls_threshold: float = None @classmethod - def from_yaml(cls) -> 'Config': + def from_yaml(cls) -> "Config": """Create configuration from YAML file""" - config_location = os.getenv('CONFIG_LOCATION', 'config.yaml') + config_location = os.getenv("CONFIG_LOCATION", "config.yaml") config_path = Path(__file__).parent.parent / config_location try: - with open(config_path, 'r') as f: + with open(config_path, "r") as f: data = yaml.safe_load(f) except FileNotFoundError: - print(f"Error: Configuration file '{config_path}' not found.", file=sys.stderr) - print(f"Please create a config.yaml file or set CONFIG_LOCATION environment variable.", file=sys.stderr) + print( + f"Error: Configuration file '{config_path}' not found.", file=sys.stderr + ) + print( + f"Please create a config.yaml file or set CONFIG_LOCATION environment variable.", + file=sys.stderr, + ) sys.exit(1) except yaml.YAMLError as e: - print(f"Error: Invalid YAML in configuration file '{config_path}': {e}", file=sys.stderr) + print( + f"Error: Invalid YAML in configuration file '{config_path}': {e}", + file=sys.stderr, + ) sys.exit(1) if data is None: data = {} # Extract nested values with defaults - server = data.get('server', {}) - links = data.get('links', {}) - canary = data.get('canary', {}) - dashboard = data.get('dashboard', {}) - api = data.get('api', {}) - database = data.get('database', {}) - behavior = data.get('behavior', {}) - analyzer = data.get('analyzer') or {} - crawl = data.get('crawl', {}) + server = data.get("server", {}) + links = data.get("links", {}) + canary = data.get("canary", {}) + dashboard = data.get("dashboard", {}) + api = data.get("api", {}) + database = data.get("database", {}) + behavior = data.get("behavior", {}) + analyzer = data.get("analyzer") or {} + crawl = data.get("crawl", {}) # Handle dashboard_secret_path - auto-generate if null/not set - dashboard_path = dashboard.get('secret_path') + dashboard_path = dashboard.get("secret_path") if dashboard_path is None: - dashboard_path = f'/{os.urandom(16).hex()}' + dashboard_path = f"/{os.urandom(16).hex()}" else: # ensure the dashboard path starts with a / if dashboard_path[:1] != "/": dashboard_path = f"/{dashboard_path}" return cls( - port=server.get('port', 5000), - delay=server.get('delay', 100), - server_header=server.get('server_header',""), + port=server.get("port", 5000), + delay=server.get("delay", 100), + server_header=server.get("server_header", ""), links_length_range=( - links.get('min_length', 5), - links.get('max_length', 15) + links.get("min_length", 5), + links.get("max_length", 15), ), links_per_page_range=( - links.get('min_per_page', 10), - links.get('max_per_page', 15) + links.get("min_per_page", 10), + links.get("max_per_page", 15), ), - char_space=links.get('char_space', 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'), - max_counter=links.get('max_counter', 10), - canary_token_url=canary.get('token_url'), - canary_token_tries=canary.get('token_tries', 10), + char_space=links.get( + "char_space", + "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789", + ), + max_counter=links.get("max_counter", 10), + canary_token_url=canary.get("token_url"), + canary_token_tries=canary.get("token_tries", 10), dashboard_secret_path=dashboard_path, - api_server_url=api.get('server_url'), - api_server_port=api.get('server_port', 8080), - api_server_path=api.get('server_path', '/api/v2/users'), - probability_error_codes=behavior.get('probability_error_codes', 0), - database_path=database.get('path', 'data/krawl.db'), - database_retention_days=database.get('retention_days', 30), - http_risky_methods_threshold=analyzer.get('http_risky_methods_threshold', 0.1), - violated_robots_threshold=analyzer.get('violated_robots_threshold', 0.1), - uneven_request_timing_threshold=analyzer.get('uneven_request_timing_threshold', 0.5), # coefficient of variation - uneven_request_timing_time_window_seconds=analyzer.get('uneven_request_timing_time_window_seconds', 300), - user_agents_used_threshold=analyzer.get('user_agents_used_threshold', 2), - attack_urls_threshold=analyzer.get('attack_urls_threshold', 1), - infinite_pages_for_malicious=crawl.get('infinite_pages_for_malicious', True), - max_pages_limit=crawl.get('max_pages_limit', 200), - ban_duration_seconds=crawl.get('ban_duration_seconds', 60) + api_server_url=api.get("server_url"), + api_server_port=api.get("server_port", 8080), + api_server_path=api.get("server_path", "/api/v2/users"), + probability_error_codes=behavior.get("probability_error_codes", 0), + database_path=database.get("path", "data/krawl.db"), + database_retention_days=database.get("retention_days", 30), + http_risky_methods_threshold=analyzer.get( + "http_risky_methods_threshold", 0.1 + ), + violated_robots_threshold=analyzer.get("violated_robots_threshold", 0.1), + uneven_request_timing_threshold=analyzer.get( + "uneven_request_timing_threshold", 0.5 + ), # coefficient of variation + uneven_request_timing_time_window_seconds=analyzer.get( + "uneven_request_timing_time_window_seconds", 300 + ), + user_agents_used_threshold=analyzer.get("user_agents_used_threshold", 2), + attack_urls_threshold=analyzer.get("attack_urls_threshold", 1), + infinite_pages_for_malicious=crawl.get( + "infinite_pages_for_malicious", True + ), + max_pages_limit=crawl.get("max_pages_limit", 200), + ban_duration_seconds=crawl.get("ban_duration_seconds", 60), ) + def __get_env_from_config(config: str) -> str: - - env = config.upper().replace('.', '_').replace('-', '__').replace(' ', '_') - - return f'KRAWL_{env}' + + env = config.upper().replace(".", "_").replace("-", "__").replace(" ", "_") + + return f"KRAWL_{env}" + def override_config_from_env(config: Config = None): """Initialize configuration from environment variables""" - + for field in config.__dataclass_fields__: - + env_var = __get_env_from_config(field) if env_var in os.environ: field_type = config.__dataclass_fields__[field].type @@ -140,20 +164,22 @@ def override_config_from_env(config: Config = None): elif field_type == float: setattr(config, field, float(env_value)) elif field_type == Tuple[int, int]: - parts = env_value.split(',') + parts = env_value.split(",") if len(parts) == 2: setattr(config, field, (int(parts[0]), int(parts[1]))) else: setattr(config, field, env_value) + _config_instance = None + def get_config() -> Config: """Get the singleton Config instance""" global _config_instance if _config_instance is None: _config_instance = Config.from_yaml() - + override_config_from_env(_config_instance) - - return _config_instance \ No newline at end of file + + return _config_instance diff --git a/src/database.py b/src/database.py index bfe2725..6f21d91 100644 --- a/src/database.py +++ b/src/database.py @@ -24,7 +24,15 @@ def set_sqlite_pragma(dbapi_connection, connection_record): cursor.execute("PRAGMA busy_timeout=30000") cursor.close() -from models import Base, AccessLog, CredentialAttempt, AttackDetection, IpStats, CategoryHistory + +from models import ( + Base, + AccessLog, + CredentialAttempt, + AttackDetection, + IpStats, + CategoryHistory, +) from sanitizer import ( sanitize_ip, sanitize_path, @@ -37,6 +45,7 @@ from logger import get_app_logger applogger = get_app_logger() + class DatabaseManager: """ Singleton database manager for the Krawl honeypot. @@ -44,6 +53,7 @@ class DatabaseManager: Handles database initialization, session management, and provides methods for persisting access logs, credentials, and attack detections. """ + _instance: Optional["DatabaseManager"] = None def __new__(cls) -> "DatabaseManager": @@ -72,7 +82,7 @@ class DatabaseManager: self._engine = create_engine( database_url, connect_args={"check_same_thread": False}, - echo=False # Set to True for SQL debugging + echo=False, # Set to True for SQL debugging ) # Create session factory with scoped_session for thread safety @@ -96,7 +106,9 @@ class DatabaseManager: def session(self) -> Session: """Get a thread-local database session.""" if not self._initialized: - raise RuntimeError("DatabaseManager not initialized. Call initialize() first.") + raise RuntimeError( + "DatabaseManager not initialized. Call initialize() first." + ) return self._Session() def close_session(self) -> None: @@ -113,7 +125,7 @@ class DatabaseManager: is_suspicious: bool = False, is_honeypot_trigger: bool = False, attack_types: Optional[List[str]] = None, - matched_patterns: Optional[Dict[str, str]] = None + matched_patterns: Optional[Dict[str, str]] = None, ) -> Optional[int]: """ Persist an access log entry to the database. @@ -141,7 +153,7 @@ class DatabaseManager: method=method[:10], is_suspicious=is_suspicious, is_honeypot_trigger=is_honeypot_trigger, - timestamp=datetime.now() + timestamp=datetime.now(), ) session.add(access_log) session.flush() # Get the ID before committing @@ -155,7 +167,7 @@ class DatabaseManager: attack_type=attack_type[:50], matched_pattern=sanitize_attack_pattern( matched_patterns.get(attack_type, "") - ) + ), ) session.add(detection) @@ -178,7 +190,7 @@ class DatabaseManager: ip: str, path: str, username: Optional[str] = None, - password: Optional[str] = None + password: Optional[str] = None, ) -> Optional[int]: """ Persist a credential attempt to the database. @@ -199,7 +211,7 @@ class DatabaseManager: path=sanitize_path(path), username=sanitize_credential(username), password=sanitize_credential(password), - timestamp=datetime.now() + timestamp=datetime.now(), ) session.add(credential) session.commit() @@ -230,14 +242,18 @@ class DatabaseManager: ip_stats.last_seen = now else: ip_stats = IpStats( - ip=sanitized_ip, - total_requests=1, - first_seen=now, - last_seen=now + ip=sanitized_ip, total_requests=1, first_seen=now, last_seen=now ) session.add(ip_stats) - def update_ip_stats_analysis(self, ip: str, analyzed_metrics: Dict[str, object], category: str, category_scores: Dict[str, int], last_analysis: datetime) -> None: + def update_ip_stats_analysis( + self, + ip: str, + analyzed_metrics: Dict[str, object], + category: str, + category_scores: Dict[str, int], + last_analysis: datetime, + ) -> None: """ Update IP statistics (ip is already persisted). Records category change in history if category has changed. @@ -250,7 +266,9 @@ class DatabaseManager: last_analysis: timestamp of last analysis """ - applogger.debug(f"Analyzed metrics {analyzed_metrics}, category {category}, category scores {category_scores}, last analysis {last_analysis}") + applogger.debug( + f"Analyzed metrics {analyzed_metrics}, category {category}, category scores {category_scores}, last analysis {last_analysis}" + ) applogger.info(f"IP: {ip} category has been updated to {category}") session = self.session @@ -260,7 +278,9 @@ class DatabaseManager: # Check if category has changed and record it old_category = ip_stats.category if old_category != category: - self._record_category_change(sanitized_ip, old_category, category, last_analysis) + self._record_category_change( + sanitized_ip, old_category, category, last_analysis + ) ip_stats.analyzed_metrics = analyzed_metrics ip_stats.category = category @@ -286,11 +306,12 @@ class DatabaseManager: sanitized_ip = sanitize_ip(ip) ip_stats = session.query(IpStats).filter(IpStats.ip == sanitized_ip).first() - # Record the manual category change old_category = ip_stats.category if old_category != category: - self._record_category_change(sanitized_ip, old_category, category, datetime.now()) + self._record_category_change( + sanitized_ip, old_category, category, datetime.now() + ) ip_stats.category = category ip_stats.manual_category = True @@ -301,7 +322,13 @@ class DatabaseManager: session.rollback() print(f"Error updating manual category: {e}") - def _record_category_change(self, ip: str, old_category: Optional[str], new_category: str, timestamp: datetime) -> None: + def _record_category_change( + self, + ip: str, + old_category: Optional[str], + new_category: str, + timestamp: datetime, + ) -> None: """ Internal method to record category changes in history. Only records if there's an actual change from a previous category. @@ -323,7 +350,7 @@ class DatabaseManager: ip=ip, old_category=old_category, new_category=new_category, - timestamp=timestamp + timestamp=timestamp, ) session.add(history_entry) session.commit() @@ -344,22 +371,32 @@ class DatabaseManager: session = self.session try: sanitized_ip = sanitize_ip(ip) - history = session.query(CategoryHistory).filter( - CategoryHistory.ip == sanitized_ip - ).order_by(CategoryHistory.timestamp.asc()).all() + history = ( + session.query(CategoryHistory) + .filter(CategoryHistory.ip == sanitized_ip) + .order_by(CategoryHistory.timestamp.asc()) + .all() + ) return [ { - 'old_category': h.old_category, - 'new_category': h.new_category, - 'timestamp': h.timestamp.isoformat() + "old_category": h.old_category, + "new_category": h.new_category, + "timestamp": h.timestamp.isoformat(), } for h in history ] finally: self.close_session() - def update_ip_rep_infos(self, ip: str, country_code: str, asn: str, asn_org: str, list_on: Dict[str,str]) -> None: + def update_ip_rep_infos( + self, + ip: str, + country_code: str, + asn: str, + asn_org: str, + list_on: Dict[str, str], + ) -> None: """ Update IP rep stats @@ -400,20 +437,25 @@ class DatabaseManager: """ session = self.session try: - ips = session.query(IpStats.ip).filter( - IpStats.country_code.is_(None), - ~IpStats.ip.like('10.%'), - ~IpStats.ip.like('172.16.%'), - ~IpStats.ip.like('172.17.%'), - ~IpStats.ip.like('172.18.%'), - ~IpStats.ip.like('172.19.%'), - ~IpStats.ip.like('172.2_.%'), - ~IpStats.ip.like('172.30.%'), - ~IpStats.ip.like('172.31.%'), - ~IpStats.ip.like('192.168.%'), - ~IpStats.ip.like('127.%'), - ~IpStats.ip.like('169.254.%') - ).limit(limit).all() + ips = ( + session.query(IpStats.ip) + .filter( + IpStats.country_code.is_(None), + ~IpStats.ip.like("10.%"), + ~IpStats.ip.like("172.16.%"), + ~IpStats.ip.like("172.17.%"), + ~IpStats.ip.like("172.18.%"), + ~IpStats.ip.like("172.19.%"), + ~IpStats.ip.like("172.2_.%"), + ~IpStats.ip.like("172.30.%"), + ~IpStats.ip.like("172.31.%"), + ~IpStats.ip.like("192.168.%"), + ~IpStats.ip.like("127.%"), + ~IpStats.ip.like("169.254.%"), + ) + .limit(limit) + .all() + ) return [ip[0] for ip in ips] finally: self.close_session() @@ -424,7 +466,7 @@ class DatabaseManager: offset: int = 0, ip_filter: Optional[str] = None, suspicious_only: bool = False, - since_minutes: Optional[int] = None + since_minutes: Optional[int] = None, ) -> List[Dict[str, Any]]: """ Retrieve access logs with optional filtering. @@ -455,15 +497,15 @@ class DatabaseManager: return [ { - 'id': log.id, - 'ip': log.ip, - 'path': log.path, - 'user_agent': log.user_agent, - 'method': log.method, - 'is_suspicious': log.is_suspicious, - 'is_honeypot_trigger': log.is_honeypot_trigger, - 'timestamp': log.timestamp.isoformat(), - 'attack_types': [d.attack_type for d in log.attack_detections] + "id": log.id, + "ip": log.ip, + "path": log.path, + "user_agent": log.user_agent, + "method": log.method, + "is_suspicious": log.is_suspicious, + "is_honeypot_trigger": log.is_honeypot_trigger, + "timestamp": log.timestamp.isoformat(), + "attack_types": [d.attack_type for d in log.attack_detections], } for log in logs ] @@ -521,10 +563,7 @@ class DatabaseManager: # self.close_session() def get_credential_attempts( - self, - limit: int = 100, - offset: int = 0, - ip_filter: Optional[str] = None + self, limit: int = 100, offset: int = 0, ip_filter: Optional[str] = None ) -> List[Dict[str, Any]]: """ Retrieve credential attempts with optional filtering. @@ -550,12 +589,12 @@ class DatabaseManager: return [ { - 'id': attempt.id, - 'ip': attempt.ip, - 'path': attempt.path, - 'username': attempt.username, - 'password': attempt.password, - 'timestamp': attempt.timestamp.isoformat() + "id": attempt.id, + "ip": attempt.ip, + "path": attempt.path, + "username": attempt.username, + "password": attempt.password, + "timestamp": attempt.timestamp.isoformat(), } for attempt in attempts ] @@ -574,26 +613,29 @@ class DatabaseManager: """ session = self.session try: - stats = session.query(IpStats).order_by( - IpStats.total_requests.desc() - ).limit(limit).all() + stats = ( + session.query(IpStats) + .order_by(IpStats.total_requests.desc()) + .limit(limit) + .all() + ) return [ { - 'ip': s.ip, - 'total_requests': s.total_requests, - 'first_seen': s.first_seen.isoformat(), - 'last_seen': s.last_seen.isoformat(), - 'country_code': s.country_code, - 'city': s.city, - 'asn': s.asn, - 'asn_org': s.asn_org, - 'reputation_score': s.reputation_score, - 'reputation_source': s.reputation_source, - 'analyzed_metrics': s.analyzed_metrics, - 'category': s.category, - 'manual_category': s.manual_category, - 'last_analysis': s.last_analysis + "ip": s.ip, + "total_requests": s.total_requests, + "first_seen": s.first_seen.isoformat(), + "last_seen": s.last_seen.isoformat(), + "country_code": s.country_code, + "city": s.city, + "asn": s.asn, + "asn_org": s.asn_org, + "reputation_score": s.reputation_score, + "reputation_source": s.reputation_source, + "analyzed_metrics": s.analyzed_metrics, + "category": s.category, + "manual_category": s.manual_category, + "last_analysis": s.last_analysis, } for s in stats ] @@ -621,23 +663,25 @@ class DatabaseManager: category_history = self.get_category_history(ip) return { - 'ip': stat.ip, - 'total_requests': stat.total_requests, - 'first_seen': stat.first_seen.isoformat() if stat.first_seen else None, - 'last_seen': stat.last_seen.isoformat() if stat.last_seen else None, - 'country_code': stat.country_code, - 'city': stat.city, - 'asn': stat.asn, - 'asn_org': stat.asn_org, - 'list_on': stat.list_on or {}, - 'reputation_score': stat.reputation_score, - 'reputation_source': stat.reputation_source, - 'analyzed_metrics': stat.analyzed_metrics or {}, - 'category': stat.category, - 'category_scores': stat.category_scores or {}, - 'manual_category': stat.manual_category, - 'last_analysis': stat.last_analysis.isoformat() if stat.last_analysis else None, - 'category_history': category_history + "ip": stat.ip, + "total_requests": stat.total_requests, + "first_seen": stat.first_seen.isoformat() if stat.first_seen else None, + "last_seen": stat.last_seen.isoformat() if stat.last_seen else None, + "country_code": stat.country_code, + "city": stat.city, + "asn": stat.asn, + "asn_org": stat.asn_org, + "list_on": stat.list_on or {}, + "reputation_score": stat.reputation_score, + "reputation_source": stat.reputation_source, + "analyzed_metrics": stat.analyzed_metrics or {}, + "category": stat.category, + "category_scores": stat.category_scores or {}, + "manual_category": stat.manual_category, + "last_analysis": ( + stat.last_analysis.isoformat() if stat.last_analysis else None + ), + "category_history": category_history, } finally: self.close_session() @@ -654,25 +698,32 @@ class DatabaseManager: try: # Get main aggregate counts in one query result = session.query( - func.count(AccessLog.id).label('total_accesses'), - func.count(distinct(AccessLog.ip)).label('unique_ips'), - func.count(distinct(AccessLog.path)).label('unique_paths'), - func.sum(case((AccessLog.is_suspicious == True, 1), else_=0)).label('suspicious_accesses'), - func.sum(case((AccessLog.is_honeypot_trigger == True, 1), else_=0)).label('honeypot_triggered') + func.count(AccessLog.id).label("total_accesses"), + func.count(distinct(AccessLog.ip)).label("unique_ips"), + func.count(distinct(AccessLog.path)).label("unique_paths"), + func.sum(case((AccessLog.is_suspicious == True, 1), else_=0)).label( + "suspicious_accesses" + ), + func.sum( + case((AccessLog.is_honeypot_trigger == True, 1), else_=0) + ).label("honeypot_triggered"), ).first() # Get unique IPs that triggered honeypots - honeypot_ips = session.query( - func.count(distinct(AccessLog.ip)) - ).filter(AccessLog.is_honeypot_trigger == True).scalar() or 0 + honeypot_ips = ( + session.query(func.count(distinct(AccessLog.ip))) + .filter(AccessLog.is_honeypot_trigger == True) + .scalar() + or 0 + ) return { - 'total_accesses': result.total_accesses or 0, - 'unique_ips': result.unique_ips or 0, - 'unique_paths': result.unique_paths or 0, - 'suspicious_accesses': int(result.suspicious_accesses or 0), - 'honeypot_triggered': int(result.honeypot_triggered or 0), - 'honeypot_ips': honeypot_ips + "total_accesses": result.total_accesses or 0, + "unique_ips": result.unique_ips or 0, + "unique_paths": result.unique_paths or 0, + "suspicious_accesses": int(result.suspicious_accesses or 0), + "honeypot_triggered": int(result.honeypot_triggered or 0), + "honeypot_ips": honeypot_ips, } finally: self.close_session() @@ -689,12 +740,13 @@ class DatabaseManager: """ session = self.session try: - results = session.query( - AccessLog.ip, - func.count(AccessLog.id).label('count') - ).group_by(AccessLog.ip).order_by( - func.count(AccessLog.id).desc() - ).limit(limit).all() + results = ( + session.query(AccessLog.ip, func.count(AccessLog.id).label("count")) + .group_by(AccessLog.ip) + .order_by(func.count(AccessLog.id).desc()) + .limit(limit) + .all() + ) return [(row.ip, row.count) for row in results] finally: @@ -712,12 +764,13 @@ class DatabaseManager: """ session = self.session try: - results = session.query( - AccessLog.path, - func.count(AccessLog.id).label('count') - ).group_by(AccessLog.path).order_by( - func.count(AccessLog.id).desc() - ).limit(limit).all() + results = ( + session.query(AccessLog.path, func.count(AccessLog.id).label("count")) + .group_by(AccessLog.path) + .order_by(func.count(AccessLog.id).desc()) + .limit(limit) + .all() + ) return [(row.path, row.count) for row in results] finally: @@ -735,15 +788,16 @@ class DatabaseManager: """ session = self.session try: - results = session.query( - AccessLog.user_agent, - func.count(AccessLog.id).label('count') - ).filter( - AccessLog.user_agent.isnot(None), - AccessLog.user_agent != '' - ).group_by(AccessLog.user_agent).order_by( - func.count(AccessLog.id).desc() - ).limit(limit).all() + results = ( + session.query( + AccessLog.user_agent, func.count(AccessLog.id).label("count") + ) + .filter(AccessLog.user_agent.isnot(None), AccessLog.user_agent != "") + .group_by(AccessLog.user_agent) + .order_by(func.count(AccessLog.id).desc()) + .limit(limit) + .all() + ) return [(row.user_agent, row.count) for row in results] finally: @@ -761,16 +815,20 @@ class DatabaseManager: """ session = self.session try: - logs = session.query(AccessLog).filter( - AccessLog.is_suspicious == True - ).order_by(AccessLog.timestamp.desc()).limit(limit).all() + logs = ( + session.query(AccessLog) + .filter(AccessLog.is_suspicious == True) + .order_by(AccessLog.timestamp.desc()) + .limit(limit) + .all() + ) return [ { - 'ip': log.ip, - 'path': log.path, - 'user_agent': log.user_agent, - 'timestamp': log.timestamp.isoformat() + "ip": log.ip, + "path": log.path, + "user_agent": log.user_agent, + "timestamp": log.timestamp.isoformat(), } for log in logs ] @@ -787,12 +845,11 @@ class DatabaseManager: session = self.session try: # Get all honeypot triggers grouped by IP - results = session.query( - AccessLog.ip, - AccessLog.path - ).filter( - AccessLog.is_honeypot_trigger == True - ).all() + results = ( + session.query(AccessLog.ip, AccessLog.path) + .filter(AccessLog.is_honeypot_trigger == True) + .all() + ) # Group paths by IP ip_paths: Dict[str, List[str]] = {} @@ -819,17 +876,21 @@ class DatabaseManager: session = self.session try: # Get access logs that have attack detections - logs = session.query(AccessLog).join( - AttackDetection - ).order_by(AccessLog.timestamp.desc()).limit(limit).all() + logs = ( + session.query(AccessLog) + .join(AttackDetection) + .order_by(AccessLog.timestamp.desc()) + .limit(limit) + .all() + ) return [ { - 'ip': log.ip, - 'path': log.path, - 'user_agent': log.user_agent, - 'timestamp': log.timestamp.isoformat(), - 'attack_types': [d.attack_type for d in log.attack_detections] + "ip": log.ip, + "path": log.path, + "user_agent": log.user_agent, + "timestamp": log.timestamp.isoformat(), + "attack_types": [d.attack_type for d in log.attack_detections], } for log in logs ] diff --git a/src/generators.py b/src/generators.py index 92eb590..fd29f38 100644 --- a/src/generators.py +++ b/src/generators.py @@ -11,6 +11,7 @@ from templates import html_templates from wordlists import get_wordlists from config import get_config + def random_username() -> str: """Generate random username""" wl = get_wordlists() @@ -21,10 +22,10 @@ def random_password() -> str: """Generate random password""" wl = get_wordlists() templates = [ - lambda: ''.join(random.choices(string.ascii_letters + string.digits, k=12)), + lambda: "".join(random.choices(string.ascii_letters + string.digits, k=12)), lambda: f"{random.choice(wl.password_prefixes)}{random.randint(100, 999)}!", lambda: f"{random.choice(wl.simple_passwords)}{random.randint(1000, 9999)}", - lambda: ''.join(random.choices(string.ascii_lowercase, k=8)), + lambda: "".join(random.choices(string.ascii_lowercase, k=8)), ] return random.choice(templates)() @@ -36,6 +37,7 @@ def random_email(username: str = None) -> str: username = random_username() return f"{username}@{random.choice(wl.email_domains)}" + def random_server_header() -> str: """Generate random server header from wordlists""" config = get_config() @@ -44,10 +46,11 @@ def random_server_header() -> str: wl = get_wordlists() return random.choice(wl.server_headers) + def random_api_key() -> str: """Generate random API key""" wl = get_wordlists() - key = ''.join(random.choices(string.ascii_letters + string.digits, k=32)) + key = "".join(random.choices(string.ascii_letters + string.digits, k=32)) return random.choice(wl.api_key_prefixes) + key @@ -87,14 +90,16 @@ def users_json() -> str: users = [] for i in range(random.randint(3, 8)): username = random_username() - users.append({ - "id": i + 1, - "username": username, - "email": random_email(username), - "password": random_password(), - "role": random.choice(wl.user_roles), - "api_token": random_api_key() - }) + users.append( + { + "id": i + 1, + "username": username, + "email": random_email(username), + "password": random_password(), + "role": random.choice(wl.user_roles), + "api_token": random_api_key(), + } + ) return json.dumps({"users": users}, indent=2) @@ -102,20 +107,28 @@ def api_keys_json() -> str: """Generate fake api_keys.json with random data""" keys = { "stripe": { - "public_key": "pk_live_" + ''.join(random.choices(string.ascii_letters + string.digits, k=24)), - "secret_key": random_api_key() + "public_key": "pk_live_" + + "".join(random.choices(string.ascii_letters + string.digits, k=24)), + "secret_key": random_api_key(), }, "aws": { - "access_key_id": "AKIA" + ''.join(random.choices(string.ascii_uppercase + string.digits, k=16)), - "secret_access_key": ''.join(random.choices(string.ascii_letters + string.digits + '+/', k=40)) + "access_key_id": "AKIA" + + "".join(random.choices(string.ascii_uppercase + string.digits, k=16)), + "secret_access_key": "".join( + random.choices(string.ascii_letters + string.digits + "+/", k=40) + ), }, "sendgrid": { - "api_key": "SG." + ''.join(random.choices(string.ascii_letters + string.digits, k=48)) + "api_key": "SG." + + "".join(random.choices(string.ascii_letters + string.digits, k=48)) }, "twilio": { - "account_sid": "AC" + ''.join(random.choices(string.ascii_lowercase + string.digits, k=32)), - "auth_token": ''.join(random.choices(string.ascii_lowercase + string.digits, k=32)) - } + "account_sid": "AC" + + "".join(random.choices(string.ascii_lowercase + string.digits, k=32)), + "auth_token": "".join( + random.choices(string.ascii_lowercase + string.digits, k=32) + ), + }, } return json.dumps(keys, indent=2) @@ -123,51 +136,70 @@ def api_keys_json() -> str: def api_response(path: str) -> str: """Generate fake API JSON responses with random data""" wl = get_wordlists() - + def random_users(count: int = 3): users = [] for i in range(count): username = random_username() - users.append({ - "id": i + 1, - "username": username, - "email": random_email(username), - "role": random.choice(wl.user_roles) - }) + users.append( + { + "id": i + 1, + "username": username, + "email": random_email(username), + "role": random.choice(wl.user_roles), + } + ) return users - + responses = { - '/api/users': json.dumps({ - "users": random_users(random.randint(2, 5)), - "total": random.randint(50, 500) - }, indent=2), - '/api/v1/users': json.dumps({ - "status": "success", - "data": [{ - "id": random.randint(1, 100), - "name": random_username(), - "api_key": random_api_key() - }] - }, indent=2), - '/api/v2/secrets': json.dumps({ - "database": { - "host": random.choice(wl.database_hosts), - "username": random_username(), - "password": random_password(), - "database": random_database_name() + "/api/users": json.dumps( + { + "users": random_users(random.randint(2, 5)), + "total": random.randint(50, 500), }, - "api_keys": { - "stripe": random_api_key(), - "aws": 'AKIA' + ''.join(random.choices(string.ascii_uppercase + string.digits, k=16)) - } - }, indent=2), - '/api/config': json.dumps({ - "app_name": random.choice(wl.application_names), - "debug": random.choice([True, False]), - "secret_key": random_api_key(), - "database_url": f"postgresql://{random_username()}:{random_password()}@localhost/{random_database_name()}" - }, indent=2), - '/.env': f"""APP_NAME={random.choice(wl.application_names)} + indent=2, + ), + "/api/v1/users": json.dumps( + { + "status": "success", + "data": [ + { + "id": random.randint(1, 100), + "name": random_username(), + "api_key": random_api_key(), + } + ], + }, + indent=2, + ), + "/api/v2/secrets": json.dumps( + { + "database": { + "host": random.choice(wl.database_hosts), + "username": random_username(), + "password": random_password(), + "database": random_database_name(), + }, + "api_keys": { + "stripe": random_api_key(), + "aws": "AKIA" + + "".join( + random.choices(string.ascii_uppercase + string.digits, k=16) + ), + }, + }, + indent=2, + ), + "/api/config": json.dumps( + { + "app_name": random.choice(wl.application_names), + "debug": random.choice([True, False]), + "secret_key": random_api_key(), + "database_url": f"postgresql://{random_username()}:{random_password()}@localhost/{random_database_name()}", + }, + indent=2, + ), + "/.env": f"""APP_NAME={random.choice(wl.application_names)} DEBUG={random.choice(['true', 'false'])} APP_KEY=base64:{''.join(random.choices(string.ascii_letters + string.digits, k=32))}= DB_CONNECTION=mysql @@ -179,7 +211,7 @@ DB_PASSWORD={random_password()} AWS_ACCESS_KEY_ID=AKIA{''.join(random.choices(string.ascii_uppercase + string.digits, k=16))} AWS_SECRET_ACCESS_KEY={''.join(random.choices(string.ascii_letters + string.digits + '+/', k=40))} STRIPE_SECRET={random_api_key()} -""" +""", } return responses.get(path, json.dumps({"error": "Not found"}, indent=2)) @@ -187,11 +219,13 @@ STRIPE_SECRET={random_api_key()} def directory_listing(path: str) -> str: """Generate fake directory listing using wordlists""" wl = get_wordlists() - + files = wl.directory_files dirs = wl.directory_dirs - - selected_files = [(f, random.randint(1024, 1024*1024)) - for f in random.sample(files, min(6, len(files)))] - + + selected_files = [ + (f, random.randint(1024, 1024 * 1024)) + for f in random.sample(files, min(6, len(files))) + ] + return html_templates.directory_listing(path, dirs, selected_files) diff --git a/src/handler.py b/src/handler.py index 9cae1ce..1be7c2c 100644 --- a/src/handler.py +++ b/src/handler.py @@ -14,8 +14,13 @@ from analyzer import Analyzer from templates import html_templates from templates.dashboard_template import generate_dashboard from generators import ( - credentials_txt, passwords_txt, users_json, api_keys_json, - api_response, directory_listing, random_server_header + credentials_txt, + passwords_txt, + users_json, + api_keys_json, + api_response, + directory_listing, + random_server_header, ) from wordlists import get_wordlists from sql_errors import generate_sql_error_response, get_sql_response_with_data @@ -25,6 +30,7 @@ from server_errors import generate_server_error class Handler(BaseHTTPRequestHandler): """HTTP request handler for the deception server""" + webpages: Optional[List[str]] = None config: Config = None tracker: AccessTracker = None @@ -37,15 +43,15 @@ class Handler(BaseHTTPRequestHandler): def _get_client_ip(self) -> str: """Extract client IP address from request, checking proxy headers first""" # Headers might not be available during early error logging - if hasattr(self, 'headers') and self.headers: + if hasattr(self, "headers") and self.headers: # Check X-Forwarded-For header (set by load balancers/proxies) - forwarded_for = self.headers.get('X-Forwarded-For') + forwarded_for = self.headers.get("X-Forwarded-For") if forwarded_for: # X-Forwarded-For can contain multiple IPs, get the first (original client) - return forwarded_for.split(',')[0].strip() + return forwarded_for.split(",")[0].strip() # Check X-Real-IP header (set by nginx and other proxies) - real_ip = self.headers.get('X-Real-IP') + real_ip = self.headers.get("X-Real-IP") if real_ip: return real_ip.strip() @@ -54,7 +60,7 @@ class Handler(BaseHTTPRequestHandler): def _get_user_agent(self) -> str: """Extract user agent from request""" - return self.headers.get('User-Agent', '') + return self.headers.get("User-Agent", "") def _get_category_by_ip(self, client_ip: str) -> str: """Get the category of an IP from the database""" @@ -97,7 +103,7 @@ class Handler(BaseHTTPRequestHandler): Returns True if the path was handled, False otherwise. """ # SQL-vulnerable endpoints - sql_endpoints = ['/api/search', '/api/sql', '/api/database'] + sql_endpoints = ["/api/search", "/api/sql", "/api/database"] base_path = urlparse(path).path if base_path not in sql_endpoints: @@ -112,22 +118,30 @@ class Handler(BaseHTTPRequestHandler): user_agent = self._get_user_agent() # Always check for SQL injection patterns - error_msg, content_type, status_code = generate_sql_error_response(query_string or "") + error_msg, content_type, status_code = generate_sql_error_response( + query_string or "" + ) if error_msg: # SQL injection detected - log and return error - self.access_logger.warning(f"[SQL INJECTION DETECTED] {client_ip} - {base_path} - Query: {query_string[:100] if query_string else 'empty'}") + self.access_logger.warning( + f"[SQL INJECTION DETECTED] {client_ip} - {base_path} - Query: {query_string[:100] if query_string else 'empty'}" + ) self.send_response(status_code) - self.send_header('Content-type', content_type) + self.send_header("Content-type", content_type) self.end_headers() self.wfile.write(error_msg.encode()) else: # No injection detected - return fake data - self.access_logger.info(f"[SQL ENDPOINT] {client_ip} - {base_path} - Query: {query_string[:100] if query_string else 'empty'}") + self.access_logger.info( + f"[SQL ENDPOINT] {client_ip} - {base_path} - Query: {query_string[:100] if query_string else 'empty'}" + ) self.send_response(200) - self.send_header('Content-type', 'application/json') + self.send_header("Content-type", "application/json") self.end_headers() - response_data = get_sql_response_with_data(base_path, query_string or "") + response_data = get_sql_response_with_data( + base_path, query_string or "" + ) self.wfile.write(response_data.encode()) return True @@ -140,7 +154,7 @@ class Handler(BaseHTTPRequestHandler): # Still send a response even on error try: self.send_response(500) - self.send_header('Content-type', 'application/json') + self.send_header("Content-type", "application/json") self.end_headers() self.wfile.write(b'{"error": "Internal server error"}') except: @@ -148,31 +162,35 @@ class Handler(BaseHTTPRequestHandler): return True def generate_page(self, seed: str, page_visit_count: int) -> str: - """Generate a webpage containing random links or canary token""" + """Generate a webpage containing random links or canary token""" random.seed(seed) num_pages = random.randint(*self.config.links_per_page_range) - + # Check if this is a good crawler by IP category from database ip_category = self._get_category_by_ip(self._get_client_ip()) - + # Determine if we should apply crawler page limit based on config and IP category should_apply_crawler_limit = False if self.config.infinite_pages_for_malicious: - if (ip_category == "good_crawler" or ip_category == "regular_user") and page_visit_count >= self.config.max_pages_limit: + if ( + ip_category == "good_crawler" or ip_category == "regular_user" + ) and page_visit_count >= self.config.max_pages_limit: should_apply_crawler_limit = True else: - if (ip_category == "good_crawler" or ip_category == "bad_crawler" or ip_category == "attacker") and page_visit_count >= self.config.max_pages_limit: + if ( + ip_category == "good_crawler" + or ip_category == "bad_crawler" + or ip_category == "attacker" + ) and page_visit_count >= self.config.max_pages_limit: should_apply_crawler_limit = True - # If good crawler reached max pages, return a simple page with no links if should_apply_crawler_limit: return html_templates.main_page( - Handler.counter, - '

Crawl limit reached.

' + Handler.counter, "

Crawl limit reached.

" ) - + num_pages = random.randint(*self.config.links_per_page_range) # Build the content HTML @@ -189,10 +207,12 @@ class Handler(BaseHTTPRequestHandler): # Add links if self.webpages is None: for _ in range(num_pages): - address = ''.join([ - random.choice(self.config.char_space) - for _ in range(random.randint(*self.config.links_length_range)) - ]) + address = "".join( + [ + random.choice(self.config.char_space) + for _ in range(random.randint(*self.config.links_length_range)) + ] + ) content += f""" - ''' - for i, (ip, count) in enumerate(stats['top_ips']) - ]) or 'No data' + """ for i, (ip, count) in enumerate(stats["top_ips"])]) + or 'No data' + ) # Generate paths rows (CRITICAL: paths can contain XSS payloads) - top_paths_rows = '\n'.join([ - f'{i+1}{_escape(path)}{count}' - for i, (path, count) in enumerate(stats['top_paths']) - ]) or 'No data' + top_paths_rows = ( + "\n".join( + [ + f'{i+1}{_escape(path)}{count}' + for i, (path, count) in enumerate(stats["top_paths"]) + ] + ) + or 'No data' + ) # Generate User-Agent rows (CRITICAL: user agents can contain XSS payloads) - top_ua_rows = '\n'.join([ - f'{i+1}{_escape(ua[:80])}{count}' - for i, (ua, count) in enumerate(stats['top_user_agents']) - ]) or 'No data' + top_ua_rows = ( + "\n".join( + [ + f'{i+1}{_escape(ua[:80])}{count}' + for i, (ua, count) in enumerate(stats["top_user_agents"]) + ] + ) + or 'No data' + ) # Generate suspicious accesses rows with clickable IPs - suspicious_rows = '\n'.join([ - f''' + suspicious_rows = ( + "\n".join([f""" {_escape(log["ip"])} {_escape(log["path"])} {_escape(log["user_agent"][:60])} @@ -84,13 +98,13 @@ def generate_dashboard(stats: dict, dashboard_path: str = '') -> str:
Loading stats...
- ''' - for log in stats['recent_suspicious'][-10:] - ]) or 'No suspicious activity detected' + """ for log in stats["recent_suspicious"][-10:]]) + or 'No suspicious activity detected' + ) # Generate honeypot triggered IPs rows with clickable IPs - honeypot_rows = '\n'.join([ - f''' + honeypot_rows = ( + "\n".join([f""" {_escape(ip)} {_escape(", ".join(paths))} {len(paths)} @@ -101,13 +115,13 @@ def generate_dashboard(stats: dict, dashboard_path: str = '') -> str:
Loading stats...
- ''' - for ip, paths in stats.get('honeypot_triggered_ips', []) - ]) or 'No honeypot triggers yet' + """ for ip, paths in stats.get("honeypot_triggered_ips", [])]) + or 'No honeypot triggers yet' + ) # Generate attack types rows with clickable IPs - attack_type_rows = '\n'.join([ - f''' + attack_type_rows = ( + "\n".join([f""" {_escape(log["ip"])} {_escape(log["path"])} {_escape(", ".join(log["attack_types"]))} @@ -120,13 +134,13 @@ def generate_dashboard(stats: dict, dashboard_path: str = '') -> str:
Loading stats...
- ''' - for log in stats.get('attack_types', [])[-10:] - ]) or 'No attacks detected' + """ for log in stats.get("attack_types", [])[-10:]]) + or 'No attacks detected' + ) # Generate credential attempts rows with clickable IPs - credential_rows = '\n'.join([ - f''' + credential_rows = ( + "\n".join([f""" {_escape(log["ip"])} {_escape(log["username"])} {_escape(log["password"])} @@ -139,9 +153,9 @@ def generate_dashboard(stats: dict, dashboard_path: str = '') -> str:
Loading stats...
- ''' - for log in stats.get('credential_attempts', [])[-20:] - ]) or 'No credentials captured yet' + """ for log in stats.get("credential_attempts", [])[-20:]]) + or 'No credentials captured yet' + ) return f""" diff --git a/src/templates/template_loader.py b/src/templates/template_loader.py index fd1febc..fe53bf5 100644 --- a/src/templates/template_loader.py +++ b/src/templates/template_loader.py @@ -11,6 +11,7 @@ from typing import Dict class TemplateNotFoundError(Exception): """Raised when a template file cannot be found.""" + pass @@ -42,11 +43,11 @@ def load_template(name: str, **kwargs) -> str: """ # debug # print(f"Loading Template: {name}") - + # Check cache first if name not in _template_cache: # Determine file path based on whether name has an extension - if '.' in name: + if "." in name: file_path = _TEMPLATE_DIR / name else: file_path = _TEMPLATE_DIR / f"{name}.html" @@ -54,7 +55,7 @@ def load_template(name: str, **kwargs) -> str: if not file_path.exists(): raise TemplateNotFoundError(f"Template '{name}' not found at {file_path}") - _template_cache[name] = file_path.read_text(encoding='utf-8') + _template_cache[name] = file_path.read_text(encoding="utf-8") template = _template_cache[name] diff --git a/src/tracker.py b/src/tracker.py index da07569..f7024ac 100644 --- a/src/tracker.py +++ b/src/tracker.py @@ -17,7 +17,13 @@ class AccessTracker: Maintains in-memory structures for fast dashboard access and persists data to SQLite for long-term storage and analysis. """ - def __init__(self, max_pages_limit, ban_duration_seconds, db_manager: Optional[DatabaseManager] = None): + + def __init__( + self, + max_pages_limit, + ban_duration_seconds, + db_manager: Optional[DatabaseManager] = None, + ): """ Initialize the access tracker. @@ -32,14 +38,32 @@ class AccessTracker: self.user_agent_counts: Dict[str, int] = defaultdict(int) self.access_log: List[Dict] = [] self.credential_attempts: List[Dict] = [] - + # Track pages visited by each IP (for good crawler limiting) self.ip_page_visits: Dict[str, Dict[str, object]] = defaultdict(dict) - + self.suspicious_patterns = [ - 'bot', 'crawler', 'spider', 'scraper', 'curl', 'wget', 'python-requests', - 'scanner', 'nikto', 'sqlmap', 'nmap', 'masscan', 'nessus', 'acunetix', - 'burp', 'zap', 'w3af', 'metasploit', 'nuclei', 'gobuster', 'dirbuster' + "bot", + "crawler", + "spider", + "scraper", + "curl", + "wget", + "python-requests", + "scanner", + "nikto", + "sqlmap", + "nmap", + "masscan", + "nessus", + "acunetix", + "burp", + "zap", + "w3af", + "metasploit", + "nuclei", + "gobuster", + "dirbuster", ] # Load attack patterns from wordlists @@ -49,11 +73,11 @@ class AccessTracker: # Fallback if wordlists not loaded if not self.attack_types: self.attack_types = { - 'path_traversal': r'\.\.', - 'sql_injection': r"('|--|;|\bOR\b|\bUNION\b|\bSELECT\b|\bDROP\b)", - 'xss_attempt': r'( 0 + self.is_suspicious_user_agent(user_agent) + or self.is_honeypot_path(path) + or len(attack_findings) > 0 ) is_honeypot = self.is_honeypot_path(path) @@ -191,15 +228,17 @@ class AccessTracker: self.honeypot_triggered[ip].append(path) # In-memory storage for dashboard - self.access_log.append({ - 'ip': ip, - 'path': path, - 'user_agent': user_agent, - 'suspicious': is_suspicious, - 'honeypot_triggered': self.is_honeypot_path(path), - 'attack_types':attack_findings, - 'timestamp': datetime.now().isoformat() - }) + self.access_log.append( + { + "ip": ip, + "path": path, + "user_agent": user_agent, + "suspicious": is_suspicious, + "honeypot_triggered": self.is_honeypot_path(path), + "attack_types": attack_findings, + "timestamp": datetime.now().isoformat(), + } + ) # Persist to database if self.db: @@ -211,13 +250,13 @@ class AccessTracker: method=method, is_suspicious=is_suspicious, is_honeypot_trigger=is_honeypot, - attack_types=attack_findings if attack_findings else None + attack_types=attack_findings if attack_findings else None, ) except Exception: # Don't crash if database persistence fails pass - def detect_attack_type(self, data:str) -> list[str]: + def detect_attack_type(self, data: str) -> list[str]: """ Returns a list of all attack types found in path data """ @@ -230,27 +269,37 @@ class AccessTracker: def is_honeypot_path(self, path: str) -> bool: """Check if path is one of the honeypot traps from robots.txt""" honeypot_paths = [ - '/admin', - '/admin/', - '/backup', - '/backup/', - '/config', - '/config/', - '/private', - '/private/', - '/database', - '/database/', - '/credentials.txt', - '/passwords.txt', - '/admin_notes.txt', - '/api_keys.json', - '/.env', - '/wp-admin', - '/wp-admin/', - '/phpmyadmin', - '/phpMyAdmin/' + "/admin", + "/admin/", + "/backup", + "/backup/", + "/config", + "/config/", + "/private", + "/private/", + "/database", + "/database/", + "/credentials.txt", + "/passwords.txt", + "/admin_notes.txt", + "/api_keys.json", + "/.env", + "/wp-admin", + "/wp-admin/", + "/phpmyadmin", + "/phpMyAdmin/", ] - return path in honeypot_paths or any(hp in path.lower() for hp in ['/backup', '/admin', '/config', '/private', '/database', 'phpmyadmin']) + return path in honeypot_paths or any( + hp in path.lower() + for hp in [ + "/backup", + "/admin", + "/config", + "/private", + "/database", + "phpmyadmin", + ] + ) def is_suspicious_user_agent(self, user_agent: str) -> bool: """Check if user agent matches suspicious patterns""" @@ -263,34 +312,36 @@ class AccessTracker: """ Check if an IP has been categorized as a 'good crawler' in the database. Uses the IP category from IpStats table. - + Args: client_ip: The client IP address (will be sanitized) - + Returns: True if the IP is categorized as 'good crawler', False otherwise """ try: from sanitizer import sanitize_ip + # Sanitize the IP address safe_ip = sanitize_ip(client_ip) - + # Query the database for this IP's category db = self.db if not db: return False - + ip_stats = db.get_ip_stats_by_ip(safe_ip) - if not ip_stats or not ip_stats.get('category'): + if not ip_stats or not ip_stats.get("category"): return False - + # Check if category matches "good crawler" - category = ip_stats.get('category', '').lower().strip() + category = ip_stats.get("category", "").lower().strip() return category - + except Exception as e: # Log but don't crash on database errors import logging + logging.error(f"Error checking IP category for {client_ip}: {str(e)}") return False @@ -298,10 +349,10 @@ class AccessTracker: """ Increment page visit counter for an IP and return the new count. If ban timestamp exists and 60+ seconds have passed, reset the counter. - + Args: client_ip: The client IP address - + Returns: The updated page visit count for this IP """ @@ -309,55 +360,58 @@ class AccessTracker: # Initialize if not exists if client_ip not in self.ip_page_visits: self.ip_page_visits[client_ip] = {"count": 0, "ban_timestamp": None} - + # Increment count self.ip_page_visits[client_ip]["count"] += 1 - + # Set ban if reached limit if self.ip_page_visits[client_ip]["count"] >= self.max_pages_limit: - self.ip_page_visits[client_ip]["ban_timestamp"] = datetime.now().isoformat() - + self.ip_page_visits[client_ip][ + "ban_timestamp" + ] = datetime.now().isoformat() + return self.ip_page_visits[client_ip]["count"] - + except Exception: return 0 - + def is_banned_ip(self, client_ip: str) -> bool: """ Check if an IP is currently banned due to exceeding page visit limits. - + Args: client_ip: The client IP address Returns: True if the IP is banned, False otherwise - """ + """ try: if client_ip in self.ip_page_visits: ban_timestamp = self.ip_page_visits[client_ip]["ban_timestamp"] if ban_timestamp is not None: banned = True - - #Check if ban period has expired (> 60 seconds) - ban_time = datetime.fromisoformat(self.ip_page_visits[client_ip]["ban_timestamp"]) + + # Check if ban period has expired (> 60 seconds) + ban_time = datetime.fromisoformat( + self.ip_page_visits[client_ip]["ban_timestamp"] + ) time_diff = datetime.now() - ban_time if time_diff.total_seconds() > self.ban_duration_seconds: self.ip_page_visits[client_ip]["count"] = 0 self.ip_page_visits[client_ip]["ban_timestamp"] = None banned = False - + return banned except Exception: return False - def get_page_visit_count(self, client_ip: str) -> int: """ Get the current page visit count for an IP. - + Args: client_ip: The client IP address - + Returns: The page visit count for this IP """ @@ -372,20 +426,24 @@ class AccessTracker: def get_top_paths(self, limit: int = 10) -> List[Tuple[str, int]]: """Get top N paths by access count""" - return sorted(self.path_counts.items(), key=lambda x: x[1], reverse=True)[:limit] + return sorted(self.path_counts.items(), key=lambda x: x[1], reverse=True)[ + :limit + ] def get_top_user_agents(self, limit: int = 10) -> List[Tuple[str, int]]: """Get top N user agents by access count""" - return sorted(self.user_agent_counts.items(), key=lambda x: x[1], reverse=True)[:limit] + return sorted(self.user_agent_counts.items(), key=lambda x: x[1], reverse=True)[ + :limit + ] def get_suspicious_accesses(self, limit: int = 20) -> List[Dict]: """Get recent suspicious accesses""" - suspicious = [log for log in self.access_log if log.get('suspicious', False)] + suspicious = [log for log in self.access_log if log.get("suspicious", False)] return suspicious[-limit:] def get_attack_type_accesses(self, limit: int = 20) -> List[Dict]: """Get recent accesses with detected attack types""" - attacks = [log for log in self.access_log if log.get('attack_types')] + attacks = [log for log in self.access_log if log.get("attack_types")] return attacks[-limit:] def get_honeypot_triggered_ips(self) -> List[Tuple[str, List[str]]]: @@ -401,12 +459,12 @@ class AccessTracker: stats = self.db.get_dashboard_counts() # Add detailed lists from database - stats['top_ips'] = self.db.get_top_ips(10) - stats['top_paths'] = self.db.get_top_paths(10) - stats['top_user_agents'] = self.db.get_top_user_agents(10) - stats['recent_suspicious'] = self.db.get_recent_suspicious(20) - stats['honeypot_triggered_ips'] = self.db.get_honeypot_triggered_ips() - stats['attack_types'] = self.db.get_recent_attacks(20) - stats['credential_attempts'] = self.db.get_credential_attempts(limit=50) + stats["top_ips"] = self.db.get_top_ips(10) + stats["top_paths"] = self.db.get_top_paths(10) + stats["top_user_agents"] = self.db.get_top_user_agents(10) + stats["recent_suspicious"] = self.db.get_recent_suspicious(20) + stats["honeypot_triggered_ips"] = self.db.get_honeypot_triggered_ips() + stats["attack_types"] = self.db.get_recent_attacks(20) + stats["credential_attempts"] = self.db.get_credential_attempts(limit=50) return stats diff --git a/src/wordlists.py b/src/wordlists.py index 81f2022..1910fc7 100644 --- a/src/wordlists.py +++ b/src/wordlists.py @@ -13,122 +13,116 @@ from logger import get_app_logger class Wordlists: """Loads and provides access to wordlists from wordlists.json""" - + def __init__(self): self._data = self._load_config() - + def _load_config(self): """Load wordlists from JSON file""" - config_path = Path(__file__).parent.parent / 'wordlists.json' + config_path = Path(__file__).parent.parent / "wordlists.json" try: - with open(config_path, 'r') as f: + with open(config_path, "r") as f: return json.load(f) except FileNotFoundError: - get_app_logger().warning(f"Wordlists file {config_path} not found, using default values") + get_app_logger().warning( + f"Wordlists file {config_path} not found, using default values" + ) return self._get_defaults() except json.JSONDecodeError as e: get_app_logger().warning(f"Invalid JSON in {config_path}: {e}") return self._get_defaults() - + def _get_defaults(self): """Fallback default wordlists if JSON file is missing or invalid""" return { "usernames": { "prefixes": ["admin", "user", "root"], - "suffixes": ["", "_prod", "_dev"] + "suffixes": ["", "_prod", "_dev"], }, "passwords": { "prefixes": ["P@ssw0rd", "Admin"], - "simple": ["test", "demo", "password"] - }, - "emails": { - "domains": ["example.com", "test.com"] - }, - "api_keys": { - "prefixes": ["sk_live_", "api_", ""] + "simple": ["test", "demo", "password"], }, + "emails": {"domains": ["example.com", "test.com"]}, + "api_keys": {"prefixes": ["sk_live_", "api_", ""]}, "databases": { "names": ["production", "main_db"], - "hosts": ["localhost", "db.internal"] + "hosts": ["localhost", "db.internal"], }, - "applications": { - "names": ["WebApp", "Dashboard"] - }, - "users": { - "roles": ["Administrator", "User"] - }, - "server_headers": ["Apache/2.4.41 (Ubuntu)", "nginx/1.18.0"] + "applications": {"names": ["WebApp", "Dashboard"]}, + "users": {"roles": ["Administrator", "User"]}, + "server_headers": ["Apache/2.4.41 (Ubuntu)", "nginx/1.18.0"], } - + @property def username_prefixes(self): return self._data.get("usernames", {}).get("prefixes", []) - + @property def username_suffixes(self): return self._data.get("usernames", {}).get("suffixes", []) - + @property def password_prefixes(self): return self._data.get("passwords", {}).get("prefixes", []) - + @property def simple_passwords(self): return self._data.get("passwords", {}).get("simple", []) - + @property def email_domains(self): return self._data.get("emails", {}).get("domains", []) - + @property def api_key_prefixes(self): return self._data.get("api_keys", {}).get("prefixes", []) - + @property def database_names(self): return self._data.get("databases", {}).get("names", []) - + @property def database_hosts(self): return self._data.get("databases", {}).get("hosts", []) - + @property def application_names(self): return self._data.get("applications", {}).get("names", []) - + @property def user_roles(self): return self._data.get("users", {}).get("roles", []) - + @property def directory_files(self): return self._data.get("directory_listing", {}).get("files", []) - + @property def directory_dirs(self): return self._data.get("directory_listing", {}).get("directories", []) - + @property def error_codes(self): return self._data.get("error_codes", []) - + @property def sql_errors(self): return self._data.get("sql_errors", {}) - + @property def attack_patterns(self): return self._data.get("attack_patterns", {}) - + @property def server_errors(self): return self._data.get("server_errors", {}) - + @property def server_headers(self): return self._data.get("server_headers", []) - + @property def attack_urls(self): """Deprecated: use attack_patterns instead. Returns attack_patterns for backward compatibility.""" @@ -137,10 +131,10 @@ class Wordlists: _wordlists_instance = None + def get_wordlists(): """Get the singleton Wordlists instance""" global _wordlists_instance if _wordlists_instance is None: _wordlists_instance = Wordlists() return _wordlists_instance - diff --git a/src/xss_detector.py b/src/xss_detector.py index 0f3da14..618ccb2 100644 --- a/src/xss_detector.py +++ b/src/xss_detector.py @@ -8,25 +8,25 @@ from wordlists import get_wordlists def detect_xss_pattern(input_string: str) -> bool: if not input_string: return False - + wl = get_wordlists() - xss_pattern = wl.attack_patterns.get('xss_attempt', '') - + xss_pattern = wl.attack_patterns.get("xss_attempt", "") + if not xss_pattern: - xss_pattern = r'( str: xss_detected = False reflected_content = [] - + for key, value in input_data.items(): if detect_xss_pattern(value): xss_detected = True reflected_content.append(f"

{key}: {value}

") - + if xss_detected: html = f""" @@ -51,7 +51,7 @@ def generate_xss_response(input_data: dict) -> str: """ return html - + return """