Feat/dashboard improvements (#55)

* fixed external ip resoultion

* added dashboard logic division, filtering capabilities, geoip map, attacker stats

* refactor: replace print statements with applogger for error logging in DatabaseManager

* feat: add click listeners for IP cells in dashboard tables to fetch and display stats

---------

Co-authored-by: BlessedRebuS <patrick.difa@gmail.com>
This commit is contained in:
Lorenzo Venerandi
2026-01-25 22:50:27 +01:00
committed by GitHub
parent c7fe588bc4
commit 130e81ad64
8 changed files with 2101 additions and 259 deletions

View File

@@ -510,6 +510,72 @@ class Handler(BaseHTTPRequestHandler):
self.app_logger.error(f"Error generating dashboard: {e}")
return
# API endpoint for fetching all IP statistics
if self.config.dashboard_secret_path and self.path == f"{self.config.dashboard_secret_path}/api/all-ip-stats":
self.send_response(200)
self.send_header("Content-type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header(
"Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
)
self.send_header("Pragma", "no-cache")
self.send_header("Expires", "0")
self.end_headers()
try:
from database import get_database
import json
db = get_database()
ip_stats_list = db.get_ip_stats(limit=500)
self.wfile.write(json.dumps({"ips": ip_stats_list}).encode())
except BrokenPipeError:
pass
except Exception as e:
self.app_logger.error(f"Error fetching all IP stats: {e}")
self.wfile.write(json.dumps({"error": str(e)}).encode())
return
# API endpoint for fetching paginated attackers
if self.config.dashboard_secret_path and self.path.startswith(
f"{self.config.dashboard_secret_path}/api/attackers"
):
self.send_response(200)
self.send_header("Content-type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header(
"Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
)
self.send_header("Pragma", "no-cache")
self.send_header("Expires", "0")
self.end_headers()
try:
from database import get_database
import json
from urllib.parse import urlparse, parse_qs
db = get_database()
# Parse query parameters
parsed_url = urlparse(self.path)
query_params = parse_qs(parsed_url.query)
page = int(query_params.get("page", ["1"])[0])
page_size = int(query_params.get("page_size", ["25"])[0])
sort_by = query_params.get("sort_by", ["total_requests"])[0]
sort_order = query_params.get("sort_order", ["desc"])[0]
# Ensure valid parameters
page = max(1, page)
page_size = min(max(1, page_size), 100) # Max 100 per page
result = db.get_attackers_paginated(page=page, page_size=page_size, sort_by=sort_by, sort_order=sort_order)
self.wfile.write(json.dumps(result).encode())
except BrokenPipeError:
pass
except Exception as e:
self.app_logger.error(f"Error fetching attackers: {e}")
self.wfile.write(json.dumps({"error": str(e)}).encode())
return
# API endpoint for fetching IP stats
if self.config.dashboard_secret_path and self.path.startswith(
f"{self.config.dashboard_secret_path}/api/ip-stats/"
@@ -544,6 +610,234 @@ class Handler(BaseHTTPRequestHandler):
self.wfile.write(json.dumps({"error": str(e)}).encode())
return
# API endpoint for paginated honeypot triggers
if self.config.dashboard_secret_path and self.path.startswith(
f"{self.config.dashboard_secret_path}/api/honeypot"
):
self.send_response(200)
self.send_header("Content-type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header(
"Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
)
self.send_header("Pragma", "no-cache")
self.send_header("Expires", "0")
self.end_headers()
try:
from database import get_database
import json
from urllib.parse import urlparse, parse_qs
db = get_database()
parsed_url = urlparse(self.path)
query_params = parse_qs(parsed_url.query)
page = int(query_params.get("page", ["1"])[0])
page_size = int(query_params.get("page_size", ["5"])[0])
sort_by = query_params.get("sort_by", ["count"])[0]
sort_order = query_params.get("sort_order", ["desc"])[0]
page = max(1, page)
page_size = min(max(1, page_size), 100)
result = db.get_honeypot_paginated(page=page, page_size=page_size, sort_by=sort_by, sort_order=sort_order)
self.wfile.write(json.dumps(result).encode())
except BrokenPipeError:
pass
except Exception as e:
self.app_logger.error(f"Error fetching honeypot data: {e}")
self.wfile.write(json.dumps({"error": str(e)}).encode())
return
# API endpoint for paginated credentials
if self.config.dashboard_secret_path and self.path.startswith(
f"{self.config.dashboard_secret_path}/api/credentials"
):
self.send_response(200)
self.send_header("Content-type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header(
"Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
)
self.send_header("Pragma", "no-cache")
self.send_header("Expires", "0")
self.end_headers()
try:
from database import get_database
import json
from urllib.parse import urlparse, parse_qs
db = get_database()
parsed_url = urlparse(self.path)
query_params = parse_qs(parsed_url.query)
page = int(query_params.get("page", ["1"])[0])
page_size = int(query_params.get("page_size", ["5"])[0])
sort_by = query_params.get("sort_by", ["timestamp"])[0]
sort_order = query_params.get("sort_order", ["desc"])[0]
page = max(1, page)
page_size = min(max(1, page_size), 100)
result = db.get_credentials_paginated(page=page, page_size=page_size, sort_by=sort_by, sort_order=sort_order)
self.wfile.write(json.dumps(result).encode())
except BrokenPipeError:
pass
except Exception as e:
self.app_logger.error(f"Error fetching credentials: {e}")
self.wfile.write(json.dumps({"error": str(e)}).encode())
return
# API endpoint for paginated top IPs
if self.config.dashboard_secret_path and self.path.startswith(
f"{self.config.dashboard_secret_path}/api/top-ips"
):
self.send_response(200)
self.send_header("Content-type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header(
"Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
)
self.send_header("Pragma", "no-cache")
self.send_header("Expires", "0")
self.end_headers()
try:
from database import get_database
import json
from urllib.parse import urlparse, parse_qs
db = get_database()
parsed_url = urlparse(self.path)
query_params = parse_qs(parsed_url.query)
page = int(query_params.get("page", ["1"])[0])
page_size = int(query_params.get("page_size", ["5"])[0])
sort_by = query_params.get("sort_by", ["count"])[0]
sort_order = query_params.get("sort_order", ["desc"])[0]
page = max(1, page)
page_size = min(max(1, page_size), 100)
result = db.get_top_ips_paginated(page=page, page_size=page_size, sort_by=sort_by, sort_order=sort_order)
self.wfile.write(json.dumps(result).encode())
except BrokenPipeError:
pass
except Exception as e:
self.app_logger.error(f"Error fetching top IPs: {e}")
self.wfile.write(json.dumps({"error": str(e)}).encode())
return
# API endpoint for paginated top paths
if self.config.dashboard_secret_path and self.path.startswith(
f"{self.config.dashboard_secret_path}/api/top-paths"
):
self.send_response(200)
self.send_header("Content-type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header(
"Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
)
self.send_header("Pragma", "no-cache")
self.send_header("Expires", "0")
self.end_headers()
try:
from database import get_database
import json
from urllib.parse import urlparse, parse_qs
db = get_database()
parsed_url = urlparse(self.path)
query_params = parse_qs(parsed_url.query)
page = int(query_params.get("page", ["1"])[0])
page_size = int(query_params.get("page_size", ["5"])[0])
sort_by = query_params.get("sort_by", ["count"])[0]
sort_order = query_params.get("sort_order", ["desc"])[0]
page = max(1, page)
page_size = min(max(1, page_size), 100)
result = db.get_top_paths_paginated(page=page, page_size=page_size, sort_by=sort_by, sort_order=sort_order)
self.wfile.write(json.dumps(result).encode())
except BrokenPipeError:
pass
except Exception as e:
self.app_logger.error(f"Error fetching top paths: {e}")
self.wfile.write(json.dumps({"error": str(e)}).encode())
return
# API endpoint for paginated top user agents
if self.config.dashboard_secret_path and self.path.startswith(
f"{self.config.dashboard_secret_path}/api/top-user-agents"
):
self.send_response(200)
self.send_header("Content-type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header(
"Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
)
self.send_header("Pragma", "no-cache")
self.send_header("Expires", "0")
self.end_headers()
try:
from database import get_database
import json
from urllib.parse import urlparse, parse_qs
db = get_database()
parsed_url = urlparse(self.path)
query_params = parse_qs(parsed_url.query)
page = int(query_params.get("page", ["1"])[0])
page_size = int(query_params.get("page_size", ["5"])[0])
sort_by = query_params.get("sort_by", ["count"])[0]
sort_order = query_params.get("sort_order", ["desc"])[0]
page = max(1, page)
page_size = min(max(1, page_size), 100)
result = db.get_top_user_agents_paginated(page=page, page_size=page_size, sort_by=sort_by, sort_order=sort_order)
self.wfile.write(json.dumps(result).encode())
except BrokenPipeError:
pass
except Exception as e:
self.app_logger.error(f"Error fetching top user agents: {e}")
self.wfile.write(json.dumps({"error": str(e)}).encode())
return
# API endpoint for paginated attack types
if self.config.dashboard_secret_path and self.path.startswith(
f"{self.config.dashboard_secret_path}/api/attack-types"
):
self.send_response(200)
self.send_header("Content-type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header(
"Cache-Control", "no-store, no-cache, must-revalidate, max-age=0"
)
self.send_header("Pragma", "no-cache")
self.send_header("Expires", "0")
self.end_headers()
try:
from database import get_database
import json
from urllib.parse import urlparse, parse_qs
db = get_database()
parsed_url = urlparse(self.path)
query_params = parse_qs(parsed_url.query)
page = int(query_params.get("page", ["1"])[0])
page_size = int(query_params.get("page_size", ["5"])[0])
sort_by = query_params.get("sort_by", ["timestamp"])[0]
sort_order = query_params.get("sort_order", ["desc"])[0]
page = max(1, page)
page_size = min(max(1, page_size), 100)
result = db.get_attack_types_paginated(page=page, page_size=page_size, sort_by=sort_by, sort_order=sort_order)
self.wfile.write(json.dumps(result).encode())
except BrokenPipeError:
pass
except Exception as e:
self.app_logger.error(f"Error fetching attack types: {e}")
self.wfile.write(json.dumps({"error": str(e)}).encode())
return
# API endpoint for downloading malicious IPs file
if (
self.config.dashboard_secret_path