From 130e81ad64c7740798ca59db256ccb48018b7fd2 Mon Sep 17 00:00:00 2001 From: Lorenzo Venerandi <68255980+Lore09@users.noreply.github.com> Date: Sun, 25 Jan 2026 22:50:27 +0100 Subject: [PATCH] Feat/dashboard improvements (#55) * fixed external IP resolution * added dashboard logic division, filtering capabilities, geoip map, attacker stats * refactor: replace print statements with applogger for error logging in DatabaseManager * feat: add click listeners for IP cells in dashboard tables to fetch and display stats --------- Co-authored-by: BlessedRebuS --- exports/.gitkeep | 0 helm/Chart.yaml | 2 +- src/config.py | 2 - src/database.py | 538 ++++++++-- src/exports/malicious_ips.txt | 2 + src/handler.py | 294 ++++++ src/templates/dashboard_template.py | 1520 ++++++++++++++++++++++++--- tests/test_insert_fake_ips.py | 2 +- 8 files changed, 2101 insertions(+), 259 deletions(-) delete mode 100644 exports/.gitkeep create mode 100644 src/exports/malicious_ips.txt diff --git a/exports/.gitkeep b/exports/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/helm/Chart.yaml b/helm/Chart.yaml index 928d0f8..938bfa3 100644 --- a/helm/Chart.yaml +++ b/helm/Chart.yaml @@ -3,7 +3,7 @@ name: krawl-chart description: A Helm chart for Krawl honeypot server type: application version: 0.1.5 -appVersion: 0.1.7 +appVersion: 0.1.8 keywords: - honeypot - security diff --git a/src/config.py b/src/config.py index 1f1d122..1e96e09 100644 --- a/src/config.py +++ b/src/config.py @@ -100,8 +100,6 @@ class Config: self._server_ip = ip self._server_ip_cache_time = current_time - - get_app_logger().info(f"Server external IP detected: {ip}") return ip except Exception as e: diff --git a/src/database.py b/src/database.py index 88d72d7..80eb194 100644 --- a/src/database.py +++ b/src/database.py @@ -293,7 +293,7 @@ class DatabaseManager: session.commit() except Exception as e: session.rollback() - print(f"Error updating IP stats analysis: {e}") + applogger.error(f"Error updating IP stats analysis: {e}") def manual_update_category(self, ip: str, category: str) -> None: """ @@ -322,7 +322,7 @@ class DatabaseManager: session.commit() except Exception as e: session.rollback() - print(f"Error updating manual category: {e}") + applogger.error(f"Error updating manual category: {e}") def _record_category_change( self, @@ -514,56 +514,6 @@ class DatabaseManager: finally: self.close_session() - # def persist_ip( - # self, - # ip: str - # ) -> Optional[int]: - # """ - # Persist an ip entry to the database.
- - # Args: - # ip: Client IP address - - # Returns: - # The ID of the created IpLog record, or None on error - # """ - # session = self.session - # try: - # # Create access log with sanitized fields - # ip_log = AccessLog( - # ip=sanitize_ip(ip), - # manual_category = False - # ) - # session.add(access_log) - # session.flush() # Get the ID before committing - - # # Add attack detections if any - # if attack_types: - # matched_patterns = matched_patterns or {} - # for attack_type in attack_types: - # detection = AttackDetection( - # access_log_id=access_log.id, - # attack_type=attack_type[:50], - # matched_pattern=sanitize_attack_pattern( - # matched_patterns.get(attack_type, "") - # ) - # ) - # session.add(detection) - - # # Update IP stats - # self._update_ip_stats(session, ip) - - # session.commit() - # return access_log.id - - # except Exception as e: - # session.rollback() - # # Log error but don't crash - database persistence is secondary to honeypot function - # print(f"Database error persisting access: {e}") - # return None - # finally: - # self.close_session() - def get_credential_attempts( self, limit: int = 100, offset: int = 0, ip_filter: Optional[str] = None ) -> List[Dict[str, Any]]: @@ -626,8 +576,8 @@ class DatabaseManager: { "ip": s.ip, "total_requests": s.total_requests, - "first_seen": s.first_seen.isoformat(), - "last_seen": s.last_seen.isoformat(), + "first_seen": s.first_seen.isoformat() if s.first_seen else None, + "last_seen": s.last_seen.isoformat() if s.last_seen else None, "country_code": s.country_code, "city": s.city, "asn": s.asn, @@ -637,7 +587,7 @@ class DatabaseManager: "analyzed_metrics": s.analyzed_metrics, "category": s.category, "manual_category": s.manual_category, - "last_analysis": s.last_analysis, + "last_analysis": s.last_analysis.isoformat() if s.last_analysis else None, } for s in stats ] @@ -688,6 +638,84 @@ class DatabaseManager: finally: self.close_session() + def get_attackers_paginated(self, page: int = 1, page_size: int = 25, sort_by: str = "total_requests", sort_order: str = "desc") -> Dict[str, Any]: + """ + Retrieve paginated list of attacker IPs ordered by specified field. 
+ + Args: + page: Page number (1-indexed) + page_size: Number of results per page + sort_by: Field to sort by (total_requests, first_seen, last_seen) + sort_order: Sort order (asc or desc) + + Returns: + Dictionary with attackers list and pagination info + """ + session = self.session + try: + offset = (page - 1) * page_size + + # Validate sort parameters + valid_sort_fields = {"total_requests", "first_seen", "last_seen"} + sort_by = sort_by if sort_by in valid_sort_fields else "total_requests" + sort_order = sort_order.lower() if sort_order.lower() in {"asc", "desc"} else "desc" + + # Get total count of attackers + total_attackers = ( + session.query(IpStats) + .filter(IpStats.category == "attacker") + .count() + ) + + # Build query with sorting + query = session.query(IpStats).filter(IpStats.category == "attacker") + + if sort_by == "total_requests": + query = query.order_by( + IpStats.total_requests.desc() if sort_order == "desc" else IpStats.total_requests.asc() + ) + elif sort_by == "first_seen": + query = query.order_by( + IpStats.first_seen.desc() if sort_order == "desc" else IpStats.first_seen.asc() + ) + elif sort_by == "last_seen": + query = query.order_by( + IpStats.last_seen.desc() if sort_order == "desc" else IpStats.last_seen.asc() + ) + + # Get paginated attackers + attackers = query.offset(offset).limit(page_size).all() + + total_pages = (total_attackers + page_size - 1) // page_size + + return { + "attackers": [ + { + "ip": a.ip, + "total_requests": a.total_requests, + "first_seen": a.first_seen.isoformat() if a.first_seen else None, + "last_seen": a.last_seen.isoformat() if a.last_seen else None, + "country_code": a.country_code, + "city": a.city, + "asn": a.asn, + "asn_org": a.asn_org, + "reputation_score": a.reputation_score, + "reputation_source": a.reputation_source, + "category": a.category, + "category_scores": a.category_scores or {}, + } + for a in attackers + ], + "pagination": { + "page": page, + "page_size": page_size, + "total_attackers": total_attackers, + "total_pages": total_pages, + }, + } + finally: + self.close_session() + def get_dashboard_counts(self) -> Dict[str, int]: """ Get aggregate statistics for the dashboard (excludes local/private IPs and server IP). @@ -719,6 +747,9 @@ class DatabaseManager: suspicious_accesses = sum(1 for log in public_accesses if log.is_suspicious) honeypot_triggered = sum(1 for log in public_accesses if log.is_honeypot_trigger) honeypot_ips = len(set(log.ip for log in public_accesses if log.is_honeypot_trigger)) + + # Count unique attackers from IpStats (matching the "Attackers by Total Requests" table) + unique_attackers = session.query(IpStats).filter(IpStats.category == "attacker").count() return { "total_accesses": total_accesses, @@ -727,6 +758,7 @@ class DatabaseManager: "suspicious_accesses": suspicious_accesses, "honeypot_triggered": honeypot_triggered, "honeypot_ips": honeypot_ips, + "unique_attackers": unique_attackers, } finally: self.close_session() @@ -772,7 +804,7 @@ class DatabaseManager: Args: limit: Maximum number of results - Returns:data + Returns: List of (path, count) tuples ordered by count descending """ session = self.session @@ -929,46 +961,370 @@ class DatabaseManager: finally: self.close_session() - # def get_ip_logs( - # self, - # limit: int = 100, - # offset: int = 0, - # ip_filter: Optional[str] = None - # ) -> List[Dict[str, Any]]: - # """ - # Retrieve ip logs with optional filtering. 
+ def get_honeypot_paginated(self, page: int = 1, page_size: int = 5, sort_by: str = "count", sort_order: str = "desc") -> Dict[str, Any]: + """ + Retrieve paginated list of honeypot-triggered IPs with their paths. - # Args: - # limit: Maximum number of records to return - # offset: Number of records to skip - # ip_filter: Filter by IP address + Args: + page: Page number (1-indexed) + page_size: Number of results per page + sort_by: Field to sort by (count or ip) + sort_order: Sort order (asc or desc) - # Returns: - # List of ip log dictionaries - # """ - # session = self.session - # try: - # query = session.query(IpLog).order_by(IpLog.last_access.desc()) + Returns: + Dictionary with honeypots list and pagination info + """ + session = self.session + try: + from config import get_config + config = get_config() + server_ip = config.get_server_ip() - # if ip_filter: - # query = query.filter(IpLog.ip == sanitize_ip(ip_filter)) + offset = (page - 1) * page_size - # logs = query.offset(offset).limit(limit).all() + # Get honeypot triggers grouped by IP + results = ( + session.query(AccessLog.ip, AccessLog.path) + .filter(AccessLog.is_honeypot_trigger == True) + .all() + ) - # return [ - # { - # 'id': log.id, - # 'ip': log.ip, - # 'stats': log.stats, - # 'category': log.category, - # 'manual_category': log.manual_category, - # 'last_evaluation': log.last_evaluation, - # 'last_access': log.last_access - # } - # for log in logs - # ] - # finally: - # self.close_session() + # Group paths by IP, filtering out invalid IPs + ip_paths: Dict[str, List[str]] = {} + for row in results: + if not is_valid_public_ip(row.ip, server_ip): + continue + if row.ip not in ip_paths: + ip_paths[row.ip] = [] + if row.path not in ip_paths[row.ip]: + ip_paths[row.ip].append(row.path) + + # Create list and sort + honeypot_list = [ + {"ip": ip, "paths": paths, "count": len(paths)} + for ip, paths in ip_paths.items() + ] + + if sort_by == "count": + honeypot_list.sort( + key=lambda x: x["count"], + reverse=(sort_order == "desc") + ) + else: # sort by ip + honeypot_list.sort( + key=lambda x: x["ip"], + reverse=(sort_order == "desc") + ) + + total_honeypots = len(honeypot_list) + paginated = honeypot_list[offset:offset + page_size] + total_pages = (total_honeypots + page_size - 1) // page_size + + return { + "honeypots": paginated, + "pagination": { + "page": page, + "page_size": page_size, + "total": total_honeypots, + "total_pages": total_pages, + }, + } + finally: + self.close_session() + + def get_credentials_paginated(self, page: int = 1, page_size: int = 5, sort_by: str = "timestamp", sort_order: str = "desc") -> Dict[str, Any]: + """ + Retrieve paginated list of credential attempts. 
+ + Args: + page: Page number (1-indexed) + page_size: Number of results per page + sort_by: Field to sort by (timestamp, ip, username) + sort_order: Sort order (asc or desc) + + Returns: + Dictionary with credentials list and pagination info + """ + session = self.session + try: + offset = (page - 1) * page_size + + # Validate sort parameters + valid_sort_fields = {"timestamp", "ip", "username"} + sort_by = sort_by if sort_by in valid_sort_fields else "timestamp" + sort_order = sort_order.lower() if sort_order.lower() in {"asc", "desc"} else "desc" + + total_credentials = session.query(CredentialAttempt).count() + + # Build query with sorting + query = session.query(CredentialAttempt) + + if sort_by == "timestamp": + query = query.order_by( + CredentialAttempt.timestamp.desc() if sort_order == "desc" else CredentialAttempt.timestamp.asc() + ) + elif sort_by == "ip": + query = query.order_by( + CredentialAttempt.ip.desc() if sort_order == "desc" else CredentialAttempt.ip.asc() + ) + elif sort_by == "username": + query = query.order_by( + CredentialAttempt.username.desc() if sort_order == "desc" else CredentialAttempt.username.asc() + ) + + credentials = query.offset(offset).limit(page_size).all() + total_pages = (total_credentials + page_size - 1) // page_size + + return { + "credentials": [ + { + "ip": c.ip, + "username": c.username, + "password": c.password, + "path": c.path, + "timestamp": c.timestamp.isoformat() if c.timestamp else None, + } + for c in credentials + ], + "pagination": { + "page": page, + "page_size": page_size, + "total": total_credentials, + "total_pages": total_pages, + }, + } + finally: + self.close_session() + + def get_top_ips_paginated(self, page: int = 1, page_size: int = 5, sort_by: str = "count", sort_order: str = "desc") -> Dict[str, Any]: + """ + Retrieve paginated list of top IP addresses by access count. + + Args: + page: Page number (1-indexed) + page_size: Number of results per page + sort_by: Field to sort by (count or ip) + sort_order: Sort order (asc or desc) + + Returns: + Dictionary with IPs list and pagination info + """ + session = self.session + try: + from config import get_config + config = get_config() + server_ip = config.get_server_ip() + + offset = (page - 1) * page_size + + results = ( + session.query(AccessLog.ip, func.count(AccessLog.id).label("count")) + .group_by(AccessLog.ip) + .all() + ) + + # Filter out local/private IPs and server IP, then sort + filtered = [ + {"ip": row.ip, "count": row.count} + for row in results + if is_valid_public_ip(row.ip, server_ip) + ] + + if sort_by == "count": + filtered.sort(key=lambda x: x["count"], reverse=(sort_order == "desc")) + else: # sort by ip + filtered.sort(key=lambda x: x["ip"], reverse=(sort_order == "desc")) + + total_ips = len(filtered) + paginated = filtered[offset:offset + page_size] + total_pages = (total_ips + page_size - 1) // page_size + + return { + "ips": paginated, + "pagination": { + "page": page, + "page_size": page_size, + "total": total_ips, + "total_pages": total_pages, + }, + } + finally: + self.close_session() + + def get_top_paths_paginated(self, page: int = 1, page_size: int = 5, sort_by: str = "count", sort_order: str = "desc") -> Dict[str, Any]: + """ + Retrieve paginated list of top paths by access count. 
+ + Args: + page: Page number (1-indexed) + page_size: Number of results per page + sort_by: Field to sort by (count or path) + sort_order: Sort order (asc or desc) + + Returns: + Dictionary with paths list and pagination info + """ + session = self.session + try: + offset = (page - 1) * page_size + + results = ( + session.query(AccessLog.path, func.count(AccessLog.id).label("count")) + .group_by(AccessLog.path) + .all() + ) + + # Create list and sort + paths_list = [ + {"path": row.path, "count": row.count} + for row in results + ] + + if sort_by == "count": + paths_list.sort(key=lambda x: x["count"], reverse=(sort_order == "desc")) + else: # sort by path + paths_list.sort(key=lambda x: x["path"], reverse=(sort_order == "desc")) + + total_paths = len(paths_list) + paginated = paths_list[offset:offset + page_size] + total_pages = (total_paths + page_size - 1) // page_size + + return { + "paths": paginated, + "pagination": { + "page": page, + "page_size": page_size, + "total": total_paths, + "total_pages": total_pages, + }, + } + finally: + self.close_session() + + def get_top_user_agents_paginated(self, page: int = 1, page_size: int = 5, sort_by: str = "count", sort_order: str = "desc") -> Dict[str, Any]: + """ + Retrieve paginated list of top user agents by access count. + + Args: + page: Page number (1-indexed) + page_size: Number of results per page + sort_by: Field to sort by (count or user_agent) + sort_order: Sort order (asc or desc) + + Returns: + Dictionary with user agents list and pagination info + """ + session = self.session + try: + offset = (page - 1) * page_size + + results = ( + session.query(AccessLog.user_agent, func.count(AccessLog.id).label("count")) + .filter(AccessLog.user_agent.isnot(None), AccessLog.user_agent != "") + .group_by(AccessLog.user_agent) + .all() + ) + + # Create list and sort + ua_list = [ + {"user_agent": row.user_agent, "count": row.count} + for row in results + ] + + if sort_by == "count": + ua_list.sort(key=lambda x: x["count"], reverse=(sort_order == "desc")) + else: # sort by user_agent + ua_list.sort(key=lambda x: x["user_agent"], reverse=(sort_order == "desc")) + + total_uas = len(ua_list) + paginated = ua_list[offset:offset + page_size] + total_pages = (total_uas + page_size - 1) // page_size + + return { + "user_agents": paginated, + "pagination": { + "page": page, + "page_size": page_size, + "total": total_uas, + "total_pages": total_pages, + }, + } + finally: + self.close_session() + + def get_attack_types_paginated(self, page: int = 1, page_size: int = 5, sort_by: str = "timestamp", sort_order: str = "desc") -> Dict[str, Any]: + """ + Retrieve paginated list of detected attack types with access logs. 
+ + Args: + page: Page number (1-indexed) + page_size: Number of results per page + sort_by: Field to sort by (timestamp, ip, attack_type) + sort_order: Sort order (asc or desc) + + Returns: + Dictionary with attacks list and pagination info + """ + session = self.session + try: + offset = (page - 1) * page_size + + # Validate sort parameters + valid_sort_fields = {"timestamp", "ip", "attack_type"} + sort_by = sort_by if sort_by in valid_sort_fields else "timestamp" + sort_order = sort_order.lower() if sort_order.lower() in {"asc", "desc"} else "desc" + + # Get all access logs with attack detections + query = ( + session.query(AccessLog) + .join(AttackDetection) + ) + + if sort_by == "timestamp": + query = query.order_by( + AccessLog.timestamp.desc() if sort_order == "desc" else AccessLog.timestamp.asc() + ) + elif sort_by == "ip": + query = query.order_by( + AccessLog.ip.desc() if sort_order == "desc" else AccessLog.ip.asc() + ) + + logs = query.all() + + # Convert to attack list + attack_list = [ + { + "ip": log.ip, + "path": log.path, + "user_agent": log.user_agent, + "timestamp": log.timestamp.isoformat() if log.timestamp else None, + "attack_types": [d.attack_type for d in log.attack_detections], + } + for log in logs + ] + + # Sort by attack_type if needed (this must be done post-fetch since it's in a related table) + if sort_by == "attack_type": + attack_list.sort( + key=lambda x: x["attack_types"][0] if x["attack_types"] else "", + reverse=(sort_order == "desc") + ) + + total_attacks = len(attack_list) + paginated = attack_list[offset:offset + page_size] + total_pages = (total_attacks + page_size - 1) // page_size + + return { + "attacks": paginated, + "pagination": { + "page": page, + "page_size": page_size, + "total": total_attacks, + "total_pages": total_pages, + }, + } + finally: + self.close_session() # Module-level singleton instance diff --git a/src/exports/malicious_ips.txt b/src/exports/malicious_ips.txt new file mode 100644 index 0000000..2541a21 --- /dev/null +++ b/src/exports/malicious_ips.txt @@ -0,0 +1,2 @@ +175.23.45.67 +210.45.67.89 diff --git a/src/handler.py b/src/handler.py index 1be7c2c..df04465 100644 --- a/src/handler.py +++ b/src/handler.py @@ -510,6 +510,72 @@ class Handler(BaseHTTPRequestHandler): self.app_logger.error(f"Error generating dashboard: {e}") return + # API endpoint for fetching all IP statistics + if self.config.dashboard_secret_path and self.path == f"{self.config.dashboard_secret_path}/api/all-ip-stats": + self.send_response(200) + self.send_header("Content-type", "application/json") + self.send_header("Access-Control-Allow-Origin", "*") + self.send_header( + "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0" + ) + self.send_header("Pragma", "no-cache") + self.send_header("Expires", "0") + self.end_headers() + try: + from database import get_database + import json + + db = get_database() + ip_stats_list = db.get_ip_stats(limit=500) + self.wfile.write(json.dumps({"ips": ip_stats_list}).encode()) + except BrokenPipeError: + pass + except Exception as e: + self.app_logger.error(f"Error fetching all IP stats: {e}") + self.wfile.write(json.dumps({"error": str(e)}).encode()) + return + + # API endpoint for fetching paginated attackers + if self.config.dashboard_secret_path and self.path.startswith( + f"{self.config.dashboard_secret_path}/api/attackers" + ): + self.send_response(200) + self.send_header("Content-type", "application/json") + self.send_header("Access-Control-Allow-Origin", "*") + self.send_header( + "Cache-Control", 
"no-store, no-cache, must-revalidate, max-age=0" + ) + self.send_header("Pragma", "no-cache") + self.send_header("Expires", "0") + self.end_headers() + try: + from database import get_database + import json + from urllib.parse import urlparse, parse_qs + + db = get_database() + + # Parse query parameters + parsed_url = urlparse(self.path) + query_params = parse_qs(parsed_url.query) + page = int(query_params.get("page", ["1"])[0]) + page_size = int(query_params.get("page_size", ["25"])[0]) + sort_by = query_params.get("sort_by", ["total_requests"])[0] + sort_order = query_params.get("sort_order", ["desc"])[0] + + # Ensure valid parameters + page = max(1, page) + page_size = min(max(1, page_size), 100) # Max 100 per page + + result = db.get_attackers_paginated(page=page, page_size=page_size, sort_by=sort_by, sort_order=sort_order) + self.wfile.write(json.dumps(result).encode()) + except BrokenPipeError: + pass + except Exception as e: + self.app_logger.error(f"Error fetching attackers: {e}") + self.wfile.write(json.dumps({"error": str(e)}).encode()) + return + # API endpoint for fetching IP stats if self.config.dashboard_secret_path and self.path.startswith( f"{self.config.dashboard_secret_path}/api/ip-stats/" @@ -544,6 +610,234 @@ class Handler(BaseHTTPRequestHandler): self.wfile.write(json.dumps({"error": str(e)}).encode()) return + # API endpoint for paginated honeypot triggers + if self.config.dashboard_secret_path and self.path.startswith( + f"{self.config.dashboard_secret_path}/api/honeypot" + ): + self.send_response(200) + self.send_header("Content-type", "application/json") + self.send_header("Access-Control-Allow-Origin", "*") + self.send_header( + "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0" + ) + self.send_header("Pragma", "no-cache") + self.send_header("Expires", "0") + self.end_headers() + try: + from database import get_database + import json + from urllib.parse import urlparse, parse_qs + + db = get_database() + parsed_url = urlparse(self.path) + query_params = parse_qs(parsed_url.query) + page = int(query_params.get("page", ["1"])[0]) + page_size = int(query_params.get("page_size", ["5"])[0]) + sort_by = query_params.get("sort_by", ["count"])[0] + sort_order = query_params.get("sort_order", ["desc"])[0] + + page = max(1, page) + page_size = min(max(1, page_size), 100) + + result = db.get_honeypot_paginated(page=page, page_size=page_size, sort_by=sort_by, sort_order=sort_order) + self.wfile.write(json.dumps(result).encode()) + except BrokenPipeError: + pass + except Exception as e: + self.app_logger.error(f"Error fetching honeypot data: {e}") + self.wfile.write(json.dumps({"error": str(e)}).encode()) + return + + # API endpoint for paginated credentials + if self.config.dashboard_secret_path and self.path.startswith( + f"{self.config.dashboard_secret_path}/api/credentials" + ): + self.send_response(200) + self.send_header("Content-type", "application/json") + self.send_header("Access-Control-Allow-Origin", "*") + self.send_header( + "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0" + ) + self.send_header("Pragma", "no-cache") + self.send_header("Expires", "0") + self.end_headers() + try: + from database import get_database + import json + from urllib.parse import urlparse, parse_qs + + db = get_database() + parsed_url = urlparse(self.path) + query_params = parse_qs(parsed_url.query) + page = int(query_params.get("page", ["1"])[0]) + page_size = int(query_params.get("page_size", ["5"])[0]) + sort_by = query_params.get("sort_by", 
["timestamp"])[0] + sort_order = query_params.get("sort_order", ["desc"])[0] + + page = max(1, page) + page_size = min(max(1, page_size), 100) + + result = db.get_credentials_paginated(page=page, page_size=page_size, sort_by=sort_by, sort_order=sort_order) + self.wfile.write(json.dumps(result).encode()) + except BrokenPipeError: + pass + except Exception as e: + self.app_logger.error(f"Error fetching credentials: {e}") + self.wfile.write(json.dumps({"error": str(e)}).encode()) + return + + # API endpoint for paginated top IPs + if self.config.dashboard_secret_path and self.path.startswith( + f"{self.config.dashboard_secret_path}/api/top-ips" + ): + self.send_response(200) + self.send_header("Content-type", "application/json") + self.send_header("Access-Control-Allow-Origin", "*") + self.send_header( + "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0" + ) + self.send_header("Pragma", "no-cache") + self.send_header("Expires", "0") + self.end_headers() + try: + from database import get_database + import json + from urllib.parse import urlparse, parse_qs + + db = get_database() + parsed_url = urlparse(self.path) + query_params = parse_qs(parsed_url.query) + page = int(query_params.get("page", ["1"])[0]) + page_size = int(query_params.get("page_size", ["5"])[0]) + sort_by = query_params.get("sort_by", ["count"])[0] + sort_order = query_params.get("sort_order", ["desc"])[0] + + page = max(1, page) + page_size = min(max(1, page_size), 100) + + result = db.get_top_ips_paginated(page=page, page_size=page_size, sort_by=sort_by, sort_order=sort_order) + self.wfile.write(json.dumps(result).encode()) + except BrokenPipeError: + pass + except Exception as e: + self.app_logger.error(f"Error fetching top IPs: {e}") + self.wfile.write(json.dumps({"error": str(e)}).encode()) + return + + # API endpoint for paginated top paths + if self.config.dashboard_secret_path and self.path.startswith( + f"{self.config.dashboard_secret_path}/api/top-paths" + ): + self.send_response(200) + self.send_header("Content-type", "application/json") + self.send_header("Access-Control-Allow-Origin", "*") + self.send_header( + "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0" + ) + self.send_header("Pragma", "no-cache") + self.send_header("Expires", "0") + self.end_headers() + try: + from database import get_database + import json + from urllib.parse import urlparse, parse_qs + + db = get_database() + parsed_url = urlparse(self.path) + query_params = parse_qs(parsed_url.query) + page = int(query_params.get("page", ["1"])[0]) + page_size = int(query_params.get("page_size", ["5"])[0]) + sort_by = query_params.get("sort_by", ["count"])[0] + sort_order = query_params.get("sort_order", ["desc"])[0] + + page = max(1, page) + page_size = min(max(1, page_size), 100) + + result = db.get_top_paths_paginated(page=page, page_size=page_size, sort_by=sort_by, sort_order=sort_order) + self.wfile.write(json.dumps(result).encode()) + except BrokenPipeError: + pass + except Exception as e: + self.app_logger.error(f"Error fetching top paths: {e}") + self.wfile.write(json.dumps({"error": str(e)}).encode()) + return + + # API endpoint for paginated top user agents + if self.config.dashboard_secret_path and self.path.startswith( + f"{self.config.dashboard_secret_path}/api/top-user-agents" + ): + self.send_response(200) + self.send_header("Content-type", "application/json") + self.send_header("Access-Control-Allow-Origin", "*") + self.send_header( + "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0" + ) + 
self.send_header("Pragma", "no-cache") + self.send_header("Expires", "0") + self.end_headers() + try: + from database import get_database + import json + from urllib.parse import urlparse, parse_qs + + db = get_database() + parsed_url = urlparse(self.path) + query_params = parse_qs(parsed_url.query) + page = int(query_params.get("page", ["1"])[0]) + page_size = int(query_params.get("page_size", ["5"])[0]) + sort_by = query_params.get("sort_by", ["count"])[0] + sort_order = query_params.get("sort_order", ["desc"])[0] + + page = max(1, page) + page_size = min(max(1, page_size), 100) + + result = db.get_top_user_agents_paginated(page=page, page_size=page_size, sort_by=sort_by, sort_order=sort_order) + self.wfile.write(json.dumps(result).encode()) + except BrokenPipeError: + pass + except Exception as e: + self.app_logger.error(f"Error fetching top user agents: {e}") + self.wfile.write(json.dumps({"error": str(e)}).encode()) + return + + # API endpoint for paginated attack types + if self.config.dashboard_secret_path and self.path.startswith( + f"{self.config.dashboard_secret_path}/api/attack-types" + ): + self.send_response(200) + self.send_header("Content-type", "application/json") + self.send_header("Access-Control-Allow-Origin", "*") + self.send_header( + "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0" + ) + self.send_header("Pragma", "no-cache") + self.send_header("Expires", "0") + self.end_headers() + try: + from database import get_database + import json + from urllib.parse import urlparse, parse_qs + + db = get_database() + parsed_url = urlparse(self.path) + query_params = parse_qs(parsed_url.query) + page = int(query_params.get("page", ["1"])[0]) + page_size = int(query_params.get("page_size", ["5"])[0]) + sort_by = query_params.get("sort_by", ["timestamp"])[0] + sort_order = query_params.get("sort_order", ["desc"])[0] + + page = max(1, page) + page_size = min(max(1, page_size), 100) + + result = db.get_attack_types_paginated(page=page, page_size=page_size, sort_by=sort_by, sort_order=sort_order) + self.wfile.write(json.dumps(result).encode()) + except BrokenPipeError: + pass + except Exception as e: + self.app_logger.error(f"Error fetching attack types: {e}") + self.wfile.write(json.dumps({"error": str(e)}).encode()) + return + # API endpoint for downloading malicious IPs file if ( self.config.dashboard_secret_path diff --git a/src/templates/dashboard_template.py b/src/templates/dashboard_template.py index 5d31bb8..8babb4d 100644 --- a/src/templates/dashboard_template.py +++ b/src/templates/dashboard_template.py @@ -45,45 +45,6 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str: dashboard_path: The secret dashboard path for generating API URLs """ - # Generate IP rows with clickable functionality for dropdown stats - top_ips_rows = ( - "\n".join([f""" - {i+1} - {_escape(ip)} - {count} - - - -
[The remainder of the src/templates/dashboard_template.py diff is not reproducible here: the template's HTML/JS markup was lost in extraction and only text fragments survive. The recoverable changes in this file are:
* the server-side row generation for Top IP Addresses, Top Paths, Top User-Agents, Honeypot Triggers by IP, Detected Attack Types, and Captured Credentials (rows with clickable IPs and "Loading stats..." dropdowns) is removed from generate_dashboard();
* a "Unique Attackers" stat card showing {stats.get('unique_attackers', 0)} is added next to "Credentials Captured";
* the Honeypot Triggers by IP, Top IP Addresses, Top User-Agents, Attackers by Total Requests (columns: #, IP Address, Total Requests, First Seen, Last Seen, Location), Captured Credentials, and Detected Attack Types sections become paginated tables with "Page x/y" controls, total counters, and "Loading..." placeholder rows that are filled in client-side from the new dashboard API endpoints in src/handler.py; the Recent Suspicious Activity table remains;
* new "Attacker Origins Map" ("Loading map...") and "Most Recurring Attack Types" sections are added.]
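As a usage reference (not part of the patch): a minimal sketch of how a client might call the paginated attackers endpoint added in src/handler.py. The base URL, secret path, and helper name below are placeholders; the query parameters (page, page_size, sort_by, sort_order) and the response shape mirror get_attackers_paginated() and its handler above.

import json
from urllib.parse import urlencode
from urllib.request import urlopen

BASE_URL = "http://localhost:8080"      # placeholder honeypot address
SECRET_PATH = "/dashboard-example"      # placeholder for config.dashboard_secret_path


def fetch_attackers(page=1, page_size=25, sort_by="total_requests", sort_order="desc"):
    # GET <secret>/api/attackers with the same query parameters the handler parses.
    query = urlencode({"page": page, "page_size": page_size,
                       "sort_by": sort_by, "sort_order": sort_order})
    with urlopen(f"{BASE_URL}{SECRET_PATH}/api/attackers?{query}") as resp:
        return json.loads(resp.read())


data = fetch_attackers()
pages = data["pagination"]
print(f"{pages['total_attackers']} attackers across {pages['total_pages']} page(s)")
for row in data["attackers"]:
    print(row["ip"], row["total_requests"], row["country_code"])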
diff --git a/tests/test_insert_fake_ips.py b/tests/test_insert_fake_ips.py index 6279b43..bdde596 100644 --- a/tests/test_insert_fake_ips.py +++ b/tests/test_insert_fake_ips.py @@ -109,7 +109,7 @@ def generate_analyzed_metrics(): } -def generate_fake_data(num_ips: int = 5, logs_per_ip: int = 15, credentials_per_ip: int = 3): +def generate_fake_data(num_ips: int = 45, logs_per_ip: int = 15, credentials_per_ip: int = 3): """ Generate and insert fake test data into the database.
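Also for reference (not part of the patch): the new get_*_paginated() helpers in src/database.py share the same offset / page-count bookkeeping. A standalone sketch of that arithmetic, with illustrative names:

def paginate(total: int, page: int, page_size: int) -> dict:
    # Pages are 1-indexed; total_pages is a ceiling division, as in src/database.py.
    offset = (page - 1) * page_size
    total_pages = (total + page_size - 1) // page_size
    return {"offset": offset, "page": page, "page_size": page_size,
            "total": total, "total_pages": total_pages}


# For example, 45 rows (the new default IP count in tests/test_insert_fake_ips.py)
# with a page_size of 25 yield two pages, the second starting at offset 25:
print(paginate(total=45, page=2, page_size=25))
# {'offset': 25, 'page': 2, 'page_size': 25, 'total': 45, 'total_pages': 2}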