diff --git a/exports/.gitkeep b/exports/.gitkeep
deleted file mode 100644
index e69de29..0000000
diff --git a/helm/Chart.yaml b/helm/Chart.yaml
index 928d0f8..938bfa3 100644
--- a/helm/Chart.yaml
+++ b/helm/Chart.yaml
@@ -3,7 +3,7 @@ name: krawl-chart
description: A Helm chart for Krawl honeypot server
type: application
version: 0.1.5
-appVersion: 0.1.7
+appVersion: 0.1.8
keywords:
- honeypot
- security
diff --git a/src/config.py b/src/config.py
index 1f1d122..1e96e09 100644
--- a/src/config.py
+++ b/src/config.py
@@ -100,8 +100,6 @@ class Config:
self._server_ip = ip
self._server_ip_cache_time = current_time
-
- get_app_logger().info(f"Server external IP detected: {ip}")
return ip
except Exception as e:
diff --git a/src/database.py b/src/database.py
index 88d72d7..80eb194 100644
--- a/src/database.py
+++ b/src/database.py
@@ -293,7 +293,7 @@ class DatabaseManager:
session.commit()
except Exception as e:
session.rollback()
- print(f"Error updating IP stats analysis: {e}")
+ applogger.error(f"Error updating IP stats analysis: {e}")
def manual_update_category(self, ip: str, category: str) -> None:
"""
@@ -322,7 +322,7 @@ class DatabaseManager:
session.commit()
except Exception as e:
session.rollback()
- print(f"Error updating manual category: {e}")
+ applogger.error(f"Error updating manual category: {e}")
def _record_category_change(
self,
@@ -514,56 +514,6 @@ class DatabaseManager:
finally:
self.close_session()
- # def persist_ip(
- # self,
- # ip: str
- # ) -> Optional[int]:
- # """
- # Persist an ip entry to the database.
-
- # Args:
- # ip: Client IP address
-
- # Returns:
- # The ID of the created IpLog record, or None on error
- # """
- # session = self.session
- # try:
- # # Create access log with sanitized fields
- # ip_log = AccessLog(
- # ip=sanitize_ip(ip),
- # manual_category = False
- # )
- # session.add(access_log)
- # session.flush() # Get the ID before committing
-
- # # Add attack detections if any
- # if attack_types:
- # matched_patterns = matched_patterns or {}
- # for attack_type in attack_types:
- # detection = AttackDetection(
- # access_log_id=access_log.id,
- # attack_type=attack_type[:50],
- # matched_pattern=sanitize_attack_pattern(
- # matched_patterns.get(attack_type, "")
- # )
- # )
- # session.add(detection)
-
- # # Update IP stats
- # self._update_ip_stats(session, ip)
-
- # session.commit()
- # return access_log.id
-
- # except Exception as e:
- # session.rollback()
- # # Log error but don't crash - database persistence is secondary to honeypot function
- # print(f"Database error persisting access: {e}")
- # return None
- # finally:
- # self.close_session()
-
def get_credential_attempts(
self, limit: int = 100, offset: int = 0, ip_filter: Optional[str] = None
) -> List[Dict[str, Any]]:
@@ -626,8 +576,8 @@ class DatabaseManager:
{
"ip": s.ip,
"total_requests": s.total_requests,
- "first_seen": s.first_seen.isoformat(),
- "last_seen": s.last_seen.isoformat(),
+ "first_seen": s.first_seen.isoformat() if s.first_seen else None,
+ "last_seen": s.last_seen.isoformat() if s.last_seen else None,
"country_code": s.country_code,
"city": s.city,
"asn": s.asn,
@@ -637,7 +587,7 @@ class DatabaseManager:
"analyzed_metrics": s.analyzed_metrics,
"category": s.category,
"manual_category": s.manual_category,
- "last_analysis": s.last_analysis,
+ "last_analysis": s.last_analysis.isoformat() if s.last_analysis else None,
}
for s in stats
]
@@ -688,6 +638,84 @@ class DatabaseManager:
finally:
self.close_session()
+ def get_attackers_paginated(self, page: int = 1, page_size: int = 25, sort_by: str = "total_requests", sort_order: str = "desc") -> Dict[str, Any]:
+ """
+ Retrieve paginated list of attacker IPs ordered by specified field.
+
+ Args:
+ page: Page number (1-indexed)
+ page_size: Number of results per page
+ sort_by: Field to sort by (total_requests, first_seen, last_seen)
+ sort_order: Sort order (asc or desc)
+
+ Returns:
+ Dictionary with attackers list and pagination info
+ """
+ session = self.session
+ try:
+ offset = (page - 1) * page_size
+
+ # Validate sort parameters
+ valid_sort_fields = {"total_requests", "first_seen", "last_seen"}
+ sort_by = sort_by if sort_by in valid_sort_fields else "total_requests"
+ sort_order = sort_order.lower() if sort_order.lower() in {"asc", "desc"} else "desc"
+
+ # Get total count of attackers
+ total_attackers = (
+ session.query(IpStats)
+ .filter(IpStats.category == "attacker")
+ .count()
+ )
+
+ # Build query with sorting
+ query = session.query(IpStats).filter(IpStats.category == "attacker")
+
+ if sort_by == "total_requests":
+ query = query.order_by(
+ IpStats.total_requests.desc() if sort_order == "desc" else IpStats.total_requests.asc()
+ )
+ elif sort_by == "first_seen":
+ query = query.order_by(
+ IpStats.first_seen.desc() if sort_order == "desc" else IpStats.first_seen.asc()
+ )
+ elif sort_by == "last_seen":
+ query = query.order_by(
+ IpStats.last_seen.desc() if sort_order == "desc" else IpStats.last_seen.asc()
+ )
+
+ # Get paginated attackers
+ attackers = query.offset(offset).limit(page_size).all()
+
+ total_pages = (total_attackers + page_size - 1) // page_size
+
+ return {
+ "attackers": [
+ {
+ "ip": a.ip,
+ "total_requests": a.total_requests,
+ "first_seen": a.first_seen.isoformat() if a.first_seen else None,
+ "last_seen": a.last_seen.isoformat() if a.last_seen else None,
+ "country_code": a.country_code,
+ "city": a.city,
+ "asn": a.asn,
+ "asn_org": a.asn_org,
+ "reputation_score": a.reputation_score,
+ "reputation_source": a.reputation_source,
+ "category": a.category,
+ "category_scores": a.category_scores or {},
+ }
+ for a in attackers
+ ],
+ "pagination": {
+ "page": page,
+ "page_size": page_size,
+ "total_attackers": total_attackers,
+ "total_pages": total_pages,
+ },
+ }
+ finally:
+ self.close_session()
+
def get_dashboard_counts(self) -> Dict[str, int]:
"""
Get aggregate statistics for the dashboard (excludes local/private IPs and server IP).
@@ -719,6 +747,9 @@ class DatabaseManager:
suspicious_accesses = sum(1 for log in public_accesses if log.is_suspicious)
honeypot_triggered = sum(1 for log in public_accesses if log.is_honeypot_trigger)
honeypot_ips = len(set(log.ip for log in public_accesses if log.is_honeypot_trigger))
+
+ # Count unique attackers from IpStats (matching the "Attackers by Total Requests" table)
+ unique_attackers = session.query(IpStats).filter(IpStats.category == "attacker").count()
return {
"total_accesses": total_accesses,
@@ -727,6 +758,7 @@ class DatabaseManager:
"suspicious_accesses": suspicious_accesses,
"honeypot_triggered": honeypot_triggered,
"honeypot_ips": honeypot_ips,
+ "unique_attackers": unique_attackers,
}
finally:
self.close_session()
@@ -772,7 +804,7 @@ class DatabaseManager:
Args:
limit: Maximum number of results
- Returns:data
+ Returns:
List of (path, count) tuples ordered by count descending
"""
session = self.session
@@ -929,46 +961,370 @@ class DatabaseManager:
finally:
self.close_session()
- # def get_ip_logs(
- # self,
- # limit: int = 100,
- # offset: int = 0,
- # ip_filter: Optional[str] = None
- # ) -> List[Dict[str, Any]]:
- # """
- # Retrieve ip logs with optional filtering.
+ def get_honeypot_paginated(self, page: int = 1, page_size: int = 5, sort_by: str = "count", sort_order: str = "desc") -> Dict[str, Any]:
+ """
+ Retrieve paginated list of honeypot-triggered IPs with their paths.
- # Args:
- # limit: Maximum number of records to return
- # offset: Number of records to skip
- # ip_filter: Filter by IP address
+ Args:
+ page: Page number (1-indexed)
+ page_size: Number of results per page
+ sort_by: Field to sort by (count or ip)
+ sort_order: Sort order (asc or desc)
- # Returns:
- # List of ip log dictionaries
- # """
- # session = self.session
- # try:
- # query = session.query(IpLog).order_by(IpLog.last_access.desc())
+ Returns:
+ Dictionary with honeypots list and pagination info
+ """
+ session = self.session
+ try:
+ from config import get_config
+ config = get_config()
+ server_ip = config.get_server_ip()
- # if ip_filter:
- # query = query.filter(IpLog.ip == sanitize_ip(ip_filter))
+ offset = (page - 1) * page_size
- # logs = query.offset(offset).limit(limit).all()
+ # Get honeypot triggers grouped by IP
+ results = (
+ session.query(AccessLog.ip, AccessLog.path)
+ .filter(AccessLog.is_honeypot_trigger == True)
+ .all()
+ )
- # return [
- # {
- # 'id': log.id,
- # 'ip': log.ip,
- # 'stats': log.stats,
- # 'category': log.category,
- # 'manual_category': log.manual_category,
- # 'last_evaluation': log.last_evaluation,
- # 'last_access': log.last_access
- # }
- # for log in logs
- # ]
- # finally:
- # self.close_session()
+ # Group paths by IP, filtering out invalid IPs
+ ip_paths: Dict[str, List[str]] = {}
+ for row in results:
+ if not is_valid_public_ip(row.ip, server_ip):
+ continue
+ if row.ip not in ip_paths:
+ ip_paths[row.ip] = []
+ if row.path not in ip_paths[row.ip]:
+ ip_paths[row.ip].append(row.path)
+
+ # Create list and sort
+ honeypot_list = [
+ {"ip": ip, "paths": paths, "count": len(paths)}
+ for ip, paths in ip_paths.items()
+ ]
+
+ if sort_by == "count":
+ honeypot_list.sort(
+ key=lambda x: x["count"],
+ reverse=(sort_order == "desc")
+ )
+ else: # sort by ip
+ honeypot_list.sort(
+ key=lambda x: x["ip"],
+ reverse=(sort_order == "desc")
+ )
+
+ total_honeypots = len(honeypot_list)
+ paginated = honeypot_list[offset:offset + page_size]
+ total_pages = (total_honeypots + page_size - 1) // page_size
+
+ return {
+ "honeypots": paginated,
+ "pagination": {
+ "page": page,
+ "page_size": page_size,
+ "total": total_honeypots,
+ "total_pages": total_pages,
+ },
+ }
+ finally:
+ self.close_session()
+
+ def get_credentials_paginated(self, page: int = 1, page_size: int = 5, sort_by: str = "timestamp", sort_order: str = "desc") -> Dict[str, Any]:
+ """
+ Retrieve paginated list of credential attempts.
+
+ Args:
+ page: Page number (1-indexed)
+ page_size: Number of results per page
+ sort_by: Field to sort by (timestamp, ip, username)
+ sort_order: Sort order (asc or desc)
+
+ Returns:
+ Dictionary with credentials list and pagination info
+ """
+ session = self.session
+ try:
+ offset = (page - 1) * page_size
+
+ # Validate sort parameters
+ valid_sort_fields = {"timestamp", "ip", "username"}
+ sort_by = sort_by if sort_by in valid_sort_fields else "timestamp"
+ sort_order = sort_order.lower() if sort_order.lower() in {"asc", "desc"} else "desc"
+
+ total_credentials = session.query(CredentialAttempt).count()
+
+ # Build query with sorting
+ query = session.query(CredentialAttempt)
+
+ if sort_by == "timestamp":
+ query = query.order_by(
+ CredentialAttempt.timestamp.desc() if sort_order == "desc" else CredentialAttempt.timestamp.asc()
+ )
+ elif sort_by == "ip":
+ query = query.order_by(
+ CredentialAttempt.ip.desc() if sort_order == "desc" else CredentialAttempt.ip.asc()
+ )
+ elif sort_by == "username":
+ query = query.order_by(
+ CredentialAttempt.username.desc() if sort_order == "desc" else CredentialAttempt.username.asc()
+ )
+
+ credentials = query.offset(offset).limit(page_size).all()
+ total_pages = (total_credentials + page_size - 1) // page_size
+
+ return {
+ "credentials": [
+ {
+ "ip": c.ip,
+ "username": c.username,
+ "password": c.password,
+ "path": c.path,
+ "timestamp": c.timestamp.isoformat() if c.timestamp else None,
+ }
+ for c in credentials
+ ],
+ "pagination": {
+ "page": page,
+ "page_size": page_size,
+ "total": total_credentials,
+ "total_pages": total_pages,
+ },
+ }
+ finally:
+ self.close_session()
+
+ def get_top_ips_paginated(self, page: int = 1, page_size: int = 5, sort_by: str = "count", sort_order: str = "desc") -> Dict[str, Any]:
+ """
+ Retrieve paginated list of top IP addresses by access count.
+
+ Args:
+ page: Page number (1-indexed)
+ page_size: Number of results per page
+ sort_by: Field to sort by (count or ip)
+ sort_order: Sort order (asc or desc)
+
+ Returns:
+ Dictionary with IPs list and pagination info
+ """
+ session = self.session
+ try:
+ from config import get_config
+ config = get_config()
+ server_ip = config.get_server_ip()
+
+ offset = (page - 1) * page_size
+
+ results = (
+ session.query(AccessLog.ip, func.count(AccessLog.id).label("count"))
+ .group_by(AccessLog.ip)
+ .all()
+ )
+
+ # Filter out local/private IPs and server IP, then sort
+ filtered = [
+ {"ip": row.ip, "count": row.count}
+ for row in results
+ if is_valid_public_ip(row.ip, server_ip)
+ ]
+
+ if sort_by == "count":
+ filtered.sort(key=lambda x: x["count"], reverse=(sort_order == "desc"))
+ else: # sort by ip
+ filtered.sort(key=lambda x: x["ip"], reverse=(sort_order == "desc"))
+
+ total_ips = len(filtered)
+ paginated = filtered[offset:offset + page_size]
+ total_pages = (total_ips + page_size - 1) // page_size
+
+ return {
+ "ips": paginated,
+ "pagination": {
+ "page": page,
+ "page_size": page_size,
+ "total": total_ips,
+ "total_pages": total_pages,
+ },
+ }
+ finally:
+ self.close_session()
+
+ def get_top_paths_paginated(self, page: int = 1, page_size: int = 5, sort_by: str = "count", sort_order: str = "desc") -> Dict[str, Any]:
+ """
+ Retrieve paginated list of top paths by access count.
+
+ Args:
+ page: Page number (1-indexed)
+ page_size: Number of results per page
+ sort_by: Field to sort by (count or path)
+ sort_order: Sort order (asc or desc)
+
+ Returns:
+ Dictionary with paths list and pagination info
+ """
+ session = self.session
+ try:
+ offset = (page - 1) * page_size
+
+ results = (
+ session.query(AccessLog.path, func.count(AccessLog.id).label("count"))
+ .group_by(AccessLog.path)
+ .all()
+ )
+
+ # Create list and sort
+ paths_list = [
+ {"path": row.path, "count": row.count}
+ for row in results
+ ]
+
+ if sort_by == "count":
+ paths_list.sort(key=lambda x: x["count"], reverse=(sort_order == "desc"))
+ else: # sort by path
+ paths_list.sort(key=lambda x: x["path"], reverse=(sort_order == "desc"))
+
+ total_paths = len(paths_list)
+ paginated = paths_list[offset:offset + page_size]
+ total_pages = (total_paths + page_size - 1) // page_size
+
+ return {
+ "paths": paginated,
+ "pagination": {
+ "page": page,
+ "page_size": page_size,
+ "total": total_paths,
+ "total_pages": total_pages,
+ },
+ }
+ finally:
+ self.close_session()
+
+ def get_top_user_agents_paginated(self, page: int = 1, page_size: int = 5, sort_by: str = "count", sort_order: str = "desc") -> Dict[str, Any]:
+ """
+ Retrieve paginated list of top user agents by access count.
+
+ Args:
+ page: Page number (1-indexed)
+ page_size: Number of results per page
+ sort_by: Field to sort by (count or user_agent)
+ sort_order: Sort order (asc or desc)
+
+ Returns:
+ Dictionary with user agents list and pagination info
+ """
+ session = self.session
+ try:
+ offset = (page - 1) * page_size
+
+ results = (
+ session.query(AccessLog.user_agent, func.count(AccessLog.id).label("count"))
+ .filter(AccessLog.user_agent.isnot(None), AccessLog.user_agent != "")
+ .group_by(AccessLog.user_agent)
+ .all()
+ )
+
+ # Create list and sort
+ ua_list = [
+ {"user_agent": row.user_agent, "count": row.count}
+ for row in results
+ ]
+
+ if sort_by == "count":
+ ua_list.sort(key=lambda x: x["count"], reverse=(sort_order == "desc"))
+ else: # sort by user_agent
+ ua_list.sort(key=lambda x: x["user_agent"], reverse=(sort_order == "desc"))
+
+ total_uas = len(ua_list)
+ paginated = ua_list[offset:offset + page_size]
+ total_pages = (total_uas + page_size - 1) // page_size
+
+ return {
+ "user_agents": paginated,
+ "pagination": {
+ "page": page,
+ "page_size": page_size,
+ "total": total_uas,
+ "total_pages": total_pages,
+ },
+ }
+ finally:
+ self.close_session()
+
+ def get_attack_types_paginated(self, page: int = 1, page_size: int = 5, sort_by: str = "timestamp", sort_order: str = "desc") -> Dict[str, Any]:
+ """
+ Retrieve paginated list of detected attack types with access logs.
+
+ Args:
+ page: Page number (1-indexed)
+ page_size: Number of results per page
+ sort_by: Field to sort by (timestamp, ip, attack_type)
+ sort_order: Sort order (asc or desc)
+
+ Returns:
+ Dictionary with attacks list and pagination info
+ """
+ session = self.session
+ try:
+ offset = (page - 1) * page_size
+
+ # Validate sort parameters
+ valid_sort_fields = {"timestamp", "ip", "attack_type"}
+ sort_by = sort_by if sort_by in valid_sort_fields else "timestamp"
+ sort_order = sort_order.lower() if sort_order.lower() in {"asc", "desc"} else "desc"
+
+ # Get all access logs with attack detections
+ query = (
+ session.query(AccessLog)
+ .join(AttackDetection)
+ )
+
+ if sort_by == "timestamp":
+ query = query.order_by(
+ AccessLog.timestamp.desc() if sort_order == "desc" else AccessLog.timestamp.asc()
+ )
+ elif sort_by == "ip":
+ query = query.order_by(
+ AccessLog.ip.desc() if sort_order == "desc" else AccessLog.ip.asc()
+ )
+
+ logs = query.all()
+
+ # Convert to attack list
+ attack_list = [
+ {
+ "ip": log.ip,
+ "path": log.path,
+ "user_agent": log.user_agent,
+ "timestamp": log.timestamp.isoformat() if log.timestamp else None,
+ "attack_types": [d.attack_type for d in log.attack_detections],
+ }
+ for log in logs
+ ]
+
+ # Sort by attack_type if needed (this must be done post-fetch since it's in a related table)
+ if sort_by == "attack_type":
+ attack_list.sort(
+ key=lambda x: x["attack_types"][0] if x["attack_types"] else "",
+ reverse=(sort_order == "desc")
+ )
+
+ total_attacks = len(attack_list)
+ paginated = attack_list[offset:offset + page_size]
+ total_pages = (total_attacks + page_size - 1) // page_size
+
+ return {
+ "attacks": paginated,
+ "pagination": {
+ "page": page,
+ "page_size": page_size,
+ "total": total_attacks,
+ "total_pages": total_pages,
+ },
+ }
+ finally:
+ self.close_session()
# Module-level singleton instance
diff --git a/src/exports/malicious_ips.txt b/src/exports/malicious_ips.txt
new file mode 100644
index 0000000..2541a21
--- /dev/null
+++ b/src/exports/malicious_ips.txt
@@ -0,0 +1,2 @@
+175.23.45.67
+210.45.67.89
diff --git a/src/handler.py b/src/handler.py
index 1be7c2c..df04465 100644
--- a/src/handler.py
+++ b/src/handler.py
@@ -510,6 +510,72 @@ class Handler(BaseHTTPRequestHandler):
self.app_logger.error(f"Error generating dashboard: {e}")
return
+ # API endpoint for fetching all IP statistics
+ if self.config.dashboard_secret_path and self.path == f"{self.config.dashboard_secret_path}/api/all-ip-stats":
+ self.send_response(200)
+ self.send_header("Content-type", "application/json")
+ self.send_header("Access-Control-Allow-Origin", "*")
+ self.send_header(
+ "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
+ )
+ self.send_header("Pragma", "no-cache")
+ self.send_header("Expires", "0")
+ self.end_headers()
+ try:
+ from database import get_database
+ import json
+
+ db = get_database()
+ ip_stats_list = db.get_ip_stats(limit=500)
+ self.wfile.write(json.dumps({"ips": ip_stats_list}).encode())
+ except BrokenPipeError:
+ pass
+ except Exception as e:
+ self.app_logger.error(f"Error fetching all IP stats: {e}")
+ self.wfile.write(json.dumps({"error": str(e)}).encode())
+ return
+
+ # API endpoint for fetching paginated attackers
+ if self.config.dashboard_secret_path and self.path.startswith(
+ f"{self.config.dashboard_secret_path}/api/attackers"
+ ):
+ self.send_response(200)
+ self.send_header("Content-type", "application/json")
+ self.send_header("Access-Control-Allow-Origin", "*")
+ self.send_header(
+ "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
+ )
+ self.send_header("Pragma", "no-cache")
+ self.send_header("Expires", "0")
+ self.end_headers()
+ try:
+ from database import get_database
+ import json
+ from urllib.parse import urlparse, parse_qs
+
+ db = get_database()
+
+ # Parse query parameters
+ parsed_url = urlparse(self.path)
+ query_params = parse_qs(parsed_url.query)
+ page = int(query_params.get("page", ["1"])[0])
+ page_size = int(query_params.get("page_size", ["25"])[0])
+ sort_by = query_params.get("sort_by", ["total_requests"])[0]
+ sort_order = query_params.get("sort_order", ["desc"])[0]
+
+ # Ensure valid parameters
+ page = max(1, page)
+ page_size = min(max(1, page_size), 100) # Max 100 per page
+
+ result = db.get_attackers_paginated(page=page, page_size=page_size, sort_by=sort_by, sort_order=sort_order)
+ self.wfile.write(json.dumps(result).encode())
+ except BrokenPipeError:
+ pass
+ except Exception as e:
+ self.app_logger.error(f"Error fetching attackers: {e}")
+ self.wfile.write(json.dumps({"error": str(e)}).encode())
+ return
+
# API endpoint for fetching IP stats
if self.config.dashboard_secret_path and self.path.startswith(
f"{self.config.dashboard_secret_path}/api/ip-stats/"
@@ -544,6 +610,234 @@ class Handler(BaseHTTPRequestHandler):
self.wfile.write(json.dumps({"error": str(e)}).encode())
return
+ # API endpoint for paginated honeypot triggers
+ if self.config.dashboard_secret_path and self.path.startswith(
+ f"{self.config.dashboard_secret_path}/api/honeypot"
+ ):
+ self.send_response(200)
+ self.send_header("Content-type", "application/json")
+ self.send_header("Access-Control-Allow-Origin", "*")
+ self.send_header(
+ "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
+ )
+ self.send_header("Pragma", "no-cache")
+ self.send_header("Expires", "0")
+ self.end_headers()
+ try:
+ from database import get_database
+ import json
+ from urllib.parse import urlparse, parse_qs
+
+ db = get_database()
+ parsed_url = urlparse(self.path)
+ query_params = parse_qs(parsed_url.query)
+ page = int(query_params.get("page", ["1"])[0])
+ page_size = int(query_params.get("page_size", ["5"])[0])
+ sort_by = query_params.get("sort_by", ["count"])[0]
+ sort_order = query_params.get("sort_order", ["desc"])[0]
+
+ page = max(1, page)
+ page_size = min(max(1, page_size), 100)
+
+ result = db.get_honeypot_paginated(page=page, page_size=page_size, sort_by=sort_by, sort_order=sort_order)
+ self.wfile.write(json.dumps(result).encode())
+ except BrokenPipeError:
+ pass
+ except Exception as e:
+ self.app_logger.error(f"Error fetching honeypot data: {e}")
+ self.wfile.write(json.dumps({"error": str(e)}).encode())
+ return
+
+ # API endpoint for paginated credentials
+ if self.config.dashboard_secret_path and self.path.startswith(
+ f"{self.config.dashboard_secret_path}/api/credentials"
+ ):
+ self.send_response(200)
+ self.send_header("Content-type", "application/json")
+ self.send_header("Access-Control-Allow-Origin", "*")
+ self.send_header(
+ "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
+ )
+ self.send_header("Pragma", "no-cache")
+ self.send_header("Expires", "0")
+ self.end_headers()
+ try:
+ from database import get_database
+ import json
+ from urllib.parse import urlparse, parse_qs
+
+ db = get_database()
+ parsed_url = urlparse(self.path)
+ query_params = parse_qs(parsed_url.query)
+ page = int(query_params.get("page", ["1"])[0])
+ page_size = int(query_params.get("page_size", ["5"])[0])
+ sort_by = query_params.get("sort_by", ["timestamp"])[0]
+ sort_order = query_params.get("sort_order", ["desc"])[0]
+
+ page = max(1, page)
+ page_size = min(max(1, page_size), 100)
+
+ result = db.get_credentials_paginated(page=page, page_size=page_size, sort_by=sort_by, sort_order=sort_order)
+ self.wfile.write(json.dumps(result).encode())
+ except BrokenPipeError:
+ pass
+ except Exception as e:
+ self.app_logger.error(f"Error fetching credentials: {e}")
+ self.wfile.write(json.dumps({"error": str(e)}).encode())
+ return
+
+ # API endpoint for paginated top IPs
+ if self.config.dashboard_secret_path and self.path.startswith(
+ f"{self.config.dashboard_secret_path}/api/top-ips"
+ ):
+ self.send_response(200)
+ self.send_header("Content-type", "application/json")
+ self.send_header("Access-Control-Allow-Origin", "*")
+ self.send_header(
+ "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
+ )
+ self.send_header("Pragma", "no-cache")
+ self.send_header("Expires", "0")
+ self.end_headers()
+ try:
+ from database import get_database
+ import json
+ from urllib.parse import urlparse, parse_qs
+
+ db = get_database()
+ parsed_url = urlparse(self.path)
+ query_params = parse_qs(parsed_url.query)
+ page = int(query_params.get("page", ["1"])[0])
+ page_size = int(query_params.get("page_size", ["5"])[0])
+ sort_by = query_params.get("sort_by", ["count"])[0]
+ sort_order = query_params.get("sort_order", ["desc"])[0]
+
+ page = max(1, page)
+ page_size = min(max(1, page_size), 100)
+
+ result = db.get_top_ips_paginated(page=page, page_size=page_size, sort_by=sort_by, sort_order=sort_order)
+ self.wfile.write(json.dumps(result).encode())
+ except BrokenPipeError:
+ pass
+ except Exception as e:
+ self.app_logger.error(f"Error fetching top IPs: {e}")
+ self.wfile.write(json.dumps({"error": str(e)}).encode())
+ return
+
+ # API endpoint for paginated top paths
+ if self.config.dashboard_secret_path and self.path.startswith(
+ f"{self.config.dashboard_secret_path}/api/top-paths"
+ ):
+ self.send_response(200)
+ self.send_header("Content-type", "application/json")
+ self.send_header("Access-Control-Allow-Origin", "*")
+ self.send_header(
+ "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
+ )
+ self.send_header("Pragma", "no-cache")
+ self.send_header("Expires", "0")
+ self.end_headers()
+ try:
+ from database import get_database
+ import json
+ from urllib.parse import urlparse, parse_qs
+
+ db = get_database()
+ parsed_url = urlparse(self.path)
+ query_params = parse_qs(parsed_url.query)
+ page = int(query_params.get("page", ["1"])[0])
+ page_size = int(query_params.get("page_size", ["5"])[0])
+ sort_by = query_params.get("sort_by", ["count"])[0]
+ sort_order = query_params.get("sort_order", ["desc"])[0]
+
+ page = max(1, page)
+ page_size = min(max(1, page_size), 100)
+
+ result = db.get_top_paths_paginated(page=page, page_size=page_size, sort_by=sort_by, sort_order=sort_order)
+ self.wfile.write(json.dumps(result).encode())
+ except BrokenPipeError:
+ pass
+ except Exception as e:
+ self.app_logger.error(f"Error fetching top paths: {e}")
+ self.wfile.write(json.dumps({"error": str(e)}).encode())
+ return
+
+ # API endpoint for paginated top user agents
+ if self.config.dashboard_secret_path and self.path.startswith(
+ f"{self.config.dashboard_secret_path}/api/top-user-agents"
+ ):
+ self.send_response(200)
+ self.send_header("Content-type", "application/json")
+ self.send_header("Access-Control-Allow-Origin", "*")
+ self.send_header(
+ "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
+ )
+ self.send_header("Pragma", "no-cache")
+ self.send_header("Expires", "0")
+ self.end_headers()
+ try:
+ from database import get_database
+ import json
+ from urllib.parse import urlparse, parse_qs
+
+ db = get_database()
+ parsed_url = urlparse(self.path)
+ query_params = parse_qs(parsed_url.query)
+ page = int(query_params.get("page", ["1"])[0])
+ page_size = int(query_params.get("page_size", ["5"])[0])
+ sort_by = query_params.get("sort_by", ["count"])[0]
+ sort_order = query_params.get("sort_order", ["desc"])[0]
+
+ page = max(1, page)
+ page_size = min(max(1, page_size), 100)
+
+ result = db.get_top_user_agents_paginated(page=page, page_size=page_size, sort_by=sort_by, sort_order=sort_order)
+ self.wfile.write(json.dumps(result).encode())
+ except BrokenPipeError:
+ pass
+ except Exception as e:
+ self.app_logger.error(f"Error fetching top user agents: {e}")
+ self.wfile.write(json.dumps({"error": str(e)}).encode())
+ return
+
+ # API endpoint for paginated attack types
+ if self.config.dashboard_secret_path and self.path.startswith(
+ f"{self.config.dashboard_secret_path}/api/attack-types"
+ ):
+ self.send_response(200)
+ self.send_header("Content-type", "application/json")
+ self.send_header("Access-Control-Allow-Origin", "*")
+ self.send_header(
+ "Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
+ )
+ self.send_header("Pragma", "no-cache")
+ self.send_header("Expires", "0")
+ self.end_headers()
+ try:
+ from database import get_database
+ import json
+ from urllib.parse import urlparse, parse_qs
+
+ db = get_database()
+ parsed_url = urlparse(self.path)
+ query_params = parse_qs(parsed_url.query)
+ page = int(query_params.get("page", ["1"])[0])
+ page_size = int(query_params.get("page_size", ["5"])[0])
+ sort_by = query_params.get("sort_by", ["timestamp"])[0]
+ sort_order = query_params.get("sort_order", ["desc"])[0]
+
+ page = max(1, page)
+ page_size = min(max(1, page_size), 100)
+
+ result = db.get_attack_types_paginated(page=page, page_size=page_size, sort_by=sort_by, sort_order=sort_order)
+ self.wfile.write(json.dumps(result).encode())
+ except BrokenPipeError:
+ pass
+ except Exception as e:
+ self.app_logger.error(f"Error fetching attack types: {e}")
+ self.wfile.write(json.dumps({"error": str(e)}).encode())
+ return
+
# API endpoint for downloading malicious IPs file
if (
self.config.dashboard_secret_path
diff --git a/src/templates/dashboard_template.py b/src/templates/dashboard_template.py
index 5d31bb8..8babb4d 100644
--- a/src/templates/dashboard_template.py
+++ b/src/templates/dashboard_template.py
@@ -45,45 +45,6 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str:
dashboard_path: The secret dashboard path for generating API URLs
"""
- # Generate IP rows with clickable functionality for dropdown stats
- top_ips_rows = (
- "\n".join([f"""
- | {i+1} |
- {_escape(ip)} |
- {count} |
-
-
- |
-
- |
-
""" for i, (ip, count) in enumerate(stats["top_ips"])])
- or '| No data |
'
- )
-
- # Generate paths rows (CRITICAL: paths can contain XSS payloads)
- top_paths_rows = (
- "\n".join(
- [
- f'| {i+1} | {_escape(path)} | {count} |
'
- for i, (path, count) in enumerate(stats["top_paths"])
- ]
- )
- or '| No data |
'
- )
-
- # Generate User-Agent rows (CRITICAL: user agents can contain XSS payloads)
- top_ua_rows = (
- "\n".join(
- [
- f'| {i+1} | {_escape(ua[:80])} | {count} |
'
- for i, (ua, count) in enumerate(stats["top_user_agents"])
- ]
- )
- or '| No data |
'
- )
-
# Generate suspicious accesses rows with clickable IPs
suspicious_rows = (
"\n".join([f"""
@@ -102,66 +63,14 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str:
or '
| No suspicious activity detected |
'
)
- # Generate honeypot triggered IPs rows with clickable IPs
- honeypot_rows = (
- "\n".join([f"""
- | {_escape(ip)} |
- {_escape(", ".join(paths))} |
- {len(paths)} |
-
-
- |
-
- |
-
""" for ip, paths in stats.get("honeypot_triggered_ips", [])])
- or '| No honeypot triggers yet |
'
- )
-
- # Generate attack types rows with clickable IPs
- attack_type_rows = (
- "\n".join([f"""
- | {_escape(log["ip"])} |
- {_escape(log["path"])} |
- {_escape(", ".join(log["attack_types"]))} |
- {_escape(log["user_agent"][:60])} |
- {format_timestamp(log["timestamp"],time_only=True)} |
-
-
- |
-
- |
-
""" for log in stats.get("attack_types", [])[-10:]])
- or '| No attacks detected |
'
- )
-
- # Generate credential attempts rows with clickable IPs
- credential_rows = (
- "\n".join([f"""
- | {_escape(log["ip"])} |
- {_escape(log["username"])} |
- {_escape(log["password"])} |
- {_escape(log["path"])} |
- {format_timestamp(log["timestamp"], time_only=True)} |
-
-
- |
-
- |
-
""" for log in stats.get("credential_attempts", [])[-20:]])
- or '| No credentials captured yet |
'
- )
-
return f"""
Krawl Dashboard
+
+
+
@@ -549,25 +618,19 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str:
{len(stats.get('credential_attempts', []))}
Credentials Captured
+
+
{stats.get('unique_attackers', 0)}
+
Unique Attackers
+
-
-
Honeypot Triggers by IP
-
-
-
- | IP Address |
- Accessed Paths |
- Count |
-
-
-
- {honeypot_rows}
-
-
+
-
+
+
Recent Suspicious Activity
@@ -585,87 +648,204 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str:
-
Captured Credentials
-
+
+
Honeypot Triggers by IP
+
+
+
+ | # |
+ IP Address |
+ Accessed Paths |
+ Count |
+
+
+
+ | Loading... |
+
+
+
+
+
+
+
+
Top IP Addresses
+
+
+
+
+
+ | # |
+ IP Address |
+ Access Count |
+
+
+
+ | Loading... |
+
+
+
+
+
+
+
Top User-Agents
+
+
+
+
+
+ | # |
+ User-Agent |
+ Count |
+
+
+
+ | Loading... |
+
+
+
+
+
+
+
+
+
Attacker Origins Map
+
+
+
+
+
+
Attackers by Total Requests
+
+
+
+
+
+
+ | # |
+ IP Address |
+ Total Requests |
+ First Seen |
+ Last Seen |
+ Location |
+
+
+
+
+
+
+
+
+
+
+
Captured Credentials
+
+
+
+
+
+ | # |
IP Address |
Username |
Password |
Path |
- Time |
+ Time |
-
- {credential_rows}
+
+ | Loading... |
-
Detected Attack Types
-
+
+
Detected Attack Types
+
+
+
+ | # |
IP Address |
Path |
Attack Types |
User-Agent |
- Time |
+ Time |
-
- {attack_type_rows}
+
+ | Loading... |
-
-
Top IP Addresses
-
-
-
- | # |
- IP Address |
- Access Count |
-
-
-
- {top_ips_rows}
-
-
+
+
Most Recurring Attack Types
+
+
+
+
-
-
Top Paths
-
-
-
- | # |
- Path |
- Access Count |
-
-
-
- {top_paths_rows}
-
-
-
-
-
-
Top User-Agents
-
-
-
- | # |
- User-Agent |
- Count |
-
-
-
- {top_ua_rows}
-
-
+