Merge pull request #59 from BlessedRebuS/feat/blocklist-api

blocklist download api endpoint implementation
This commit is contained in:
Patrick Di Fazio
2026-02-02 23:01:55 +01:00
committed by GitHub
17 changed files with 456 additions and 139 deletions

3
.gitignore vendored
View File

@@ -81,3 +81,6 @@ personal-values.yaml
#exports dir (keeping .gitkeep so we have the dir)
/exports/*
/src/exports/*
# tmux config
.tmux.conf

6
.tmux.conf Normal file
View File

@@ -0,0 +1,6 @@
splitw -v -p 10
neww -n worker
select-window -t 1
select-pane -t 0
send-keys -t 0 "nvim" C-m
send-keys -t 1 "docker compose watch" C-m

View File

@@ -112,6 +112,8 @@ services:
- TZ="Europe/Rome"
volumes:
- ./config.yaml:/app/config.yaml:ro
# bind mount for firewall exporters
- ./exports:/app/exports
- krawl-data:/app/data
restart: unless-stopped
@@ -208,6 +210,7 @@ Krawl uses a **configuration hierarchy** in which **environment variables take p
| `KRAWL_DASHBOARD_SECRET_PATH` | Custom dashboard path | Auto-generated |
| `KRAWL_PROBABILITY_ERROR_CODES` | Error response probability (0-100%) | `0` |
| `KRAWL_DATABASE_PATH` | Database file location | `data/krawl.db` |
| `KRAWL_EXPORTS_PATH` | Path where firewalls rule sets are exported | `exports` |
| `KRAWL_DATABASE_RETENTION_DAYS` | Days to retain data in database | `30` |
| `KRAWL_HTTP_RISKY_METHODS_THRESHOLD` | Threshold for risky HTTP methods detection | `0.1` |
| `KRAWL_VIOLATED_ROBOTS_THRESHOLD` | Threshold for robots.txt violations | `0.1` |

View File

@@ -25,6 +25,9 @@ dashboard:
# secret_path: super-secret-dashboard-path
secret_path: test
exports:
path: "exports"
database:
path: "data/krawl.db"
retention_days: 30

View File

@@ -1,4 +1,5 @@
---
# THIS IS FOR DEVELOPMENT PURPOSES
services:
krawl:
build:
@@ -16,17 +17,13 @@ services:
- ./config.yaml:/app/config.yaml:ro
- ./logs:/app/logs
- ./exports:/app/exports
- data:/app/data
- ./data:/app/data
restart: unless-stopped
develop:
watch:
- path: ./Dockerfile
action: rebuild
- path: ./src/
action: sync+restart
target: /app/src
action: rebuild
- path: ./docker-compose.yaml
action: rebuild
volumes:
data:

View File

@@ -0,0 +1,50 @@
# Firewall exporters documentation
Firewall export feature is implemented trough a strategy pattern with an abstract class and a series of subclasses that implement the specific export logic for each firewall specific system:
```mermaid
classDiagram
class FWType{
+getBanlist()
}
FWType <|-- Raw
class Raw{ }
FWType <|-- Iptables
class Iptables{ }
note for Iptables "implements the getBanlist method for iptables rules"
```
Rule sets are generated trough the `top_attacking_ips__export-malicious-ips` that writes down the files in the `exports_path` configuration path. Files are named after the specific firewall that they implement as `[firewall]_banlist.txt` except for raw file that is called `malicious_ips.txt` to support legacy
## Adding firewalls exporters
To add a firewall exporter create a new python class in `src/firewall` that implements `FWType` class
> example with `Yourfirewall` class in the `yourfirewall.py` file
```python
from typing_extensions import override
from firewall.fwtype import FWType
class Yourfirewall(FWType):
@override
def getBanlist(self, ips) -> str:
"""
Generate raw list of bad IP addresses.
Args:
ips: List of IP addresses to ban
Returns:
String containing raw ips, one per line
"""
if not ips:
return ""
# Add here code implementation
```
Then add the following to the `src/server.py` and `src/tasks/top_attacking_ips.py`
```python
from firewall.yourfirewall import Yourfirewall
```

View File

@@ -3,7 +3,7 @@ name: krawl-chart
description: A Helm chart for Krawl honeypot server
type: application
version: 1.0.0
appVersion: 1.0.1
appVersion: 1.0.2
keywords:
- honeypot
- security

View File

@@ -37,9 +37,12 @@ class Config:
infinite_pages_for_malicious: bool = True # Infinite pages for malicious crawlers
ban_duration_seconds: int = 600 # Ban duration in seconds for IPs exceeding limits
# exporter settings
exports_path: str = "exports"
# Database settings
database_path: str = "data/krawl.db"
database_retention_days: int = 30
exports_path: str = "data/exports"
# Analyzer settings
http_risky_methods_threshold: float = None
@@ -150,6 +153,7 @@ class Config:
canary = data.get("canary", {})
dashboard = data.get("dashboard", {})
api = data.get("api", {})
exports = data.get("exports", {})
database = data.get("database", {})
behavior = data.get("behavior", {})
analyzer = data.get("analyzer") or {}
@@ -185,6 +189,7 @@ class Config:
canary_token_tries=canary.get("token_tries", 10),
dashboard_secret_path=dashboard_path,
probability_error_codes=behavior.get("probability_error_codes", 0),
exports_path=exports.get("path"),
database_path=database.get("path", "data/krawl.db"),
database_retention_days=database.get("retention_days", 30),
http_risky_methods_threshold=analyzer.get(

View File

@@ -147,7 +147,9 @@ class DatabaseManager:
migrations_run.append("region")
if "region_name" not in columns:
cursor.execute("ALTER TABLE ip_stats ADD COLUMN region_name VARCHAR(100)")
cursor.execute(
"ALTER TABLE ip_stats ADD COLUMN region_name VARCHAR(100)"
)
migrations_run.append("region_name")
if "timezone" not in columns:

42
src/firewall/fwtype.py Normal file
View File

@@ -0,0 +1,42 @@
from abc import ABC, abstractmethod
from typing import Dict, Type
class FWType(ABC):
"""Abstract base class for firewall types."""
# Registry to store child classes
_registry: Dict[str, Type["FWType"]] = {}
def __init_subclass__(cls, **kwargs):
"""Automatically register subclasses with their class name."""
super().__init_subclass__(**kwargs)
cls._registry[cls.__name__.lower()] = cls
@classmethod
def create(cls, fw_type: str, **kwargs) -> "FWType":
"""
Factory method to create instances of child classes.
Args:
fw_type: String name of the firewall type class to instantiate
**kwargs: Arguments to pass to the child class constructor
Returns:
Instance of the requested child class
Raises:
ValueError: If fw_type is not registered
"""
fw_type = fw_type.lower()
if fw_type not in cls._registry:
available = ", ".join(cls._registry.keys())
raise ValueError(
f"Unknown firewall type: '{fw_type}'. Available: {available}"
)
return cls._registry[fw_type](**kwargs)
@abstractmethod
def getBanlist(self, ips):
"""Return the ruleset for the specific server"""

40
src/firewall/iptables.py Normal file
View File

@@ -0,0 +1,40 @@
from typing_extensions import override
from firewall.fwtype import FWType
class Iptables(FWType):
@override
def getBanlist(self, ips) -> str:
"""
Generate iptables ban rules from an array of IP addresses.
Args:
ips: List of IP addresses to ban
Returns:
String containing iptables commands, one per line
"""
if not ips:
return ""
rules = []
chain = "INPUT"
target = "DROP"
rules.append("#!/bin/bash")
rules.append("# iptables ban rules")
rules.append("")
for ip in ips:
ip = ip.strip()
# Build the iptables command
rule_parts = ["iptables", "-A", chain, "-s", ip]
# Add target
rule_parts.extend(["-j", target])
rules.append(" ".join(rule_parts))
return "\n".join(rules)

21
src/firewall/raw.py Normal file
View File

@@ -0,0 +1,21 @@
from typing_extensions import override
from firewall.fwtype import FWType
class Raw(FWType):
@override
def getBanlist(self, ips) -> str:
"""
Generate raw list of bad IP addresses.
Args:
ips: List of IP addresses to ban
Returns:
String containing raw ips, one per line
"""
if not ips:
return ""
return "\n".join(ips)

View File

@@ -41,7 +41,9 @@ def fetch_ip_geolocation(ip_address: str) -> Optional[Dict[str, Any]]:
# Check if the API call was successful
if data.get("status") != "success":
app_logger.warning(f"IP lookup failed for {ip_address}: {data.get('message')}")
app_logger.warning(
f"IP lookup failed for {ip_address}: {data.get('message')}"
)
return None
# Cache the result

View File

@@ -7,8 +7,17 @@ from datetime import datetime
from http.server import BaseHTTPRequestHandler
from typing import Optional, List
from urllib.parse import urlparse, parse_qs
import json
import os
from database import get_database
from config import Config, get_config
# imports for the __init_subclass__ method, do not remove pls
from firewall.fwtype import FWType
from firewall.iptables import Iptables
from firewall.raw import Raw
from config import Config
from tracker import AccessTracker
from analyzer import Analyzer
from templates import html_templates
@@ -26,6 +35,9 @@ from wordlists import get_wordlists
from sql_errors import generate_sql_error_response, get_sql_response_with_data
from xss_detector import detect_xss_pattern, generate_xss_response
from server_errors import generate_server_error
from models import AccessLog
from ip_utils import is_valid_public_ip
from sqlalchemy import distinct
class Handler(BaseHTTPRequestHandler):
@@ -58,10 +70,6 @@ class Handler(BaseHTTPRequestHandler):
# Fallback to direct connection IP
return self.client_address[0]
def _get_user_agent(self) -> str:
"""Extract user agent from request"""
return self.headers.get("User-Agent", "")
def _get_category_by_ip(self, client_ip: str) -> str:
"""Get the category of an IP from the database"""
return self.tracker.get_category_by_ip(client_ip)
@@ -92,11 +100,6 @@ class Handler(BaseHTTPRequestHandler):
error_codes = [400, 401, 403, 404, 500, 502, 503]
return random.choice(error_codes)
def _parse_query_string(self) -> str:
"""Extract query string from the request path"""
parsed = urlparse(self.path)
return parsed.query
def _handle_sql_endpoint(self, path: str) -> bool:
"""
Handle SQL injection honeypot endpoints.
@@ -111,21 +114,20 @@ class Handler(BaseHTTPRequestHandler):
try:
# Get query parameters
query_string = self._parse_query_string()
# Log SQL injection attempt
client_ip = self._get_client_ip()
user_agent = self._get_user_agent()
user_agent = self.headers.get("User-Agent", "")
# Always check for SQL injection patterns
error_msg, content_type, status_code = generate_sql_error_response(
query_string or ""
request_query or ""
)
if error_msg:
# SQL injection detected - log and return error
self.access_logger.warning(
f"[SQL INJECTION DETECTED] {client_ip} - {base_path} - Query: {query_string[:100] if query_string else 'empty'}"
f"[SQL INJECTION DETECTED] {client_ip} - {base_path} - Query: {request_query[:100] if request_query else 'empty'}"
)
self.send_response(status_code)
self.send_header("Content-type", content_type)
@@ -134,13 +136,13 @@ class Handler(BaseHTTPRequestHandler):
else:
# No injection detected - return fake data
self.access_logger.info(
f"[SQL ENDPOINT] {client_ip} - {base_path} - Query: {query_string[:100] if query_string else 'empty'}"
f"[SQL ENDPOINT] {client_ip} - {base_path} - Query: {request_query[:100] if request_query else 'empty'}"
)
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
response_data = get_sql_response_with_data(
base_path, query_string or ""
base_path, request_query or ""
)
self.wfile.write(response_data.encode())
@@ -239,11 +241,9 @@ class Handler(BaseHTTPRequestHandler):
def do_POST(self):
"""Handle POST requests (mainly login attempts)"""
client_ip = self._get_client_ip()
user_agent = self._get_user_agent()
user_agent = self.headers.get("User-Agent", "")
post_data = ""
from urllib.parse import urlparse
base_path = urlparse(self.path).path
if base_path in ["/api/search", "/api/sql", "/api/database"]:
@@ -293,7 +293,6 @@ class Handler(BaseHTTPRequestHandler):
for pair in post_data.split("&"):
if "=" in pair:
key, value = pair.split("=", 1)
from urllib.parse import unquote_plus
parsed_data[unquote_plus(key)] = unquote_plus(value)
@@ -486,18 +485,30 @@ class Handler(BaseHTTPRequestHandler):
def do_GET(self):
"""Responds to webpage requests"""
client_ip = self._get_client_ip()
# respond with HTTP error code if client is banned
if self.tracker.is_banned_ip(client_ip):
self.send_response(500)
self.end_headers()
return
user_agent = self._get_user_agent()
# get request data
user_agent = self.headers.get("User-Agent", "")
request_path = urlparse(self.path).path
self.app_logger.info(f"request_query: {request_path}")
query_params = parse_qs(urlparse(self.path).query)
self.app_logger.info(f"query_params: {query_params}")
# get database reference
db = get_database()
session = db.session
# Handle static files for dashboard
if self.config.dashboard_secret_path and self.path.startswith(
f"{self.config.dashboard_secret_path}/static/"
):
import os
file_path = self.path.replace(
f"{self.config.dashboard_secret_path}/static/", ""
@@ -543,8 +554,11 @@ class Handler(BaseHTTPRequestHandler):
self.end_headers()
try:
stats = self.tracker.get_stats()
dashboard_path = self.config.dashboard_secret_path
self.wfile.write(generate_dashboard(stats, dashboard_path).encode())
self.wfile.write(
generate_dashboard(
stats, self.config.dashboard_secret_path
).encode()
)
except BrokenPipeError:
pass
except Exception as e:
@@ -566,10 +580,7 @@ class Handler(BaseHTTPRequestHandler):
self.send_header("Expires", "0")
self.end_headers()
try:
from database import get_database
import json
db = get_database()
ip_stats_list = db.get_ip_stats(limit=500)
self.wfile.write(json.dumps({"ips": ip_stats_list}).encode())
except BrokenPipeError:
@@ -593,15 +604,7 @@ class Handler(BaseHTTPRequestHandler):
self.send_header("Expires", "0")
self.end_headers()
try:
from database import get_database
import json
from urllib.parse import urlparse, parse_qs
db = get_database()
# Parse query parameters
parsed_url = urlparse(self.path)
query_params = parse_qs(parsed_url.query)
page = int(query_params.get("page", ["1"])[0])
page_size = int(query_params.get("page_size", ["25"])[0])
sort_by = query_params.get("sort_by", ["total_requests"])[0]
@@ -639,11 +642,6 @@ class Handler(BaseHTTPRequestHandler):
self.send_header("Expires", "0")
self.end_headers()
try:
from database import get_database
import json
from urllib.parse import urlparse, parse_qs
db = get_database()
# Parse query parameters
parsed_url = urlparse(self.path)
@@ -689,10 +687,7 @@ class Handler(BaseHTTPRequestHandler):
self.send_header("Expires", "0")
self.end_headers()
try:
from database import get_database
import json
db = get_database()
ip_stats = db.get_ip_stats_by_ip(ip_address)
if ip_stats:
self.wfile.write(json.dumps(ip_stats).encode())
@@ -719,11 +714,7 @@ class Handler(BaseHTTPRequestHandler):
self.send_header("Expires", "0")
self.end_headers()
try:
from database import get_database
import json
from urllib.parse import urlparse, parse_qs
db = get_database()
parsed_url = urlparse(self.path)
query_params = parse_qs(parsed_url.query)
page = int(query_params.get("page", ["1"])[0])
@@ -762,11 +753,7 @@ class Handler(BaseHTTPRequestHandler):
self.send_header("Expires", "0")
self.end_headers()
try:
from database import get_database
import json
from urllib.parse import urlparse, parse_qs
db = get_database()
parsed_url = urlparse(self.path)
query_params = parse_qs(parsed_url.query)
page = int(query_params.get("page", ["1"])[0])
@@ -805,11 +792,7 @@ class Handler(BaseHTTPRequestHandler):
self.send_header("Expires", "0")
self.end_headers()
try:
from database import get_database
import json
from urllib.parse import urlparse, parse_qs
db = get_database()
parsed_url = urlparse(self.path)
query_params = parse_qs(parsed_url.query)
page = int(query_params.get("page", ["1"])[0])
@@ -848,11 +831,7 @@ class Handler(BaseHTTPRequestHandler):
self.send_header("Expires", "0")
self.end_headers()
try:
from database import get_database
import json
from urllib.parse import urlparse, parse_qs
db = get_database()
parsed_url = urlparse(self.path)
query_params = parse_qs(parsed_url.query)
page = int(query_params.get("page", ["1"])[0])
@@ -891,11 +870,7 @@ class Handler(BaseHTTPRequestHandler):
self.send_header("Expires", "0")
self.end_headers()
try:
from database import get_database
import json
from urllib.parse import urlparse, parse_qs
db = get_database()
parsed_url = urlparse(self.path)
query_params = parse_qs(parsed_url.query)
page = int(query_params.get("page", ["1"])[0])
@@ -934,11 +909,7 @@ class Handler(BaseHTTPRequestHandler):
self.send_header("Expires", "0")
self.end_headers()
try:
from database import get_database
import json
from urllib.parse import urlparse, parse_qs
db = get_database()
parsed_url = urlparse(self.path)
query_params = parse_qs(parsed_url.query)
page = int(query_params.get("page", ["1"])[0])
@@ -963,13 +934,54 @@ class Handler(BaseHTTPRequestHandler):
self.wfile.write(json.dumps({"error": str(e)}).encode())
return
# API endpoint for downloading malicious IPs blocklist file
if (
self.config.dashboard_secret_path
and request_path == f"{self.config.dashboard_secret_path}/api/get_banlist"
):
# get fwtype from request params
fwtype = query_params.get("fwtype", ["iptables"])[0]
filename = f"{fwtype}_banlist.txt"
if fwtype == "raw":
filename = f"malicious_ips.txt"
file_path = os.path.join(self.config.exports_path, f"{filename}")
try:
if os.path.exists(file_path):
with open(file_path, "rb") as f:
content = f.read()
self.send_response(200)
self.send_header("Content-type", "text/plain")
self.send_header(
"Content-Disposition",
f'attachment; filename="{filename}"',
)
self.send_header("Content-Length", str(len(content)))
self.end_headers()
self.wfile.write(content)
else:
self.send_response(404)
self.send_header("Content-type", "text/plain")
self.end_headers()
self.wfile.write(b"File not found")
except BrokenPipeError:
pass
except Exception as e:
self.app_logger.error(f"Error serving malicious IPs file: {e}")
self.send_response(500)
self.send_header("Content-type", "text/plain")
self.end_headers()
self.wfile.write(b"Internal server error")
return
# API endpoint for downloading malicious IPs file
if (
self.config.dashboard_secret_path
and self.path
== f"{self.config.dashboard_secret_path}/api/download/malicious_ips.txt"
):
import os
file_path = os.path.join(
os.path.dirname(__file__), "exports", "malicious_ips.txt"

View File

@@ -69,7 +69,9 @@ def main():
sanitized_city = sanitize_for_storage(city, 100) if city else None
sanitized_timezone = sanitize_for_storage(timezone, 50)
sanitized_isp = sanitize_for_storage(isp, 100)
sanitized_reverse = sanitize_for_storage(reverse, 255) if reverse else None
sanitized_reverse = (
sanitize_for_storage(reverse, 255) if reverse else None
)
sanitized_list_on = sanitize_dict(list_on, 100000)
db_manager.update_ip_rep_infos(

View File

@@ -4,9 +4,14 @@ import os
from logger import get_app_logger
from database import get_database
from config import get_config
from models import IpStats
from models import IpStats, AccessLog
from ip_utils import is_valid_public_ip
from sqlalchemy import distinct
from firewall.fwtype import FWType
from firewall.iptables import Iptables
from firewall.raw import Raw
config = get_config()
app_logger = get_app_logger()
# ----------------------
@@ -20,7 +25,7 @@ TASK_CONFIG = {
}
EXPORTS_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "exports")
OUTPUT_FILE = os.path.join(EXPORTS_DIR, "malicious_ips.txt")
EXPORTS_DIR = config.exports_path
# ----------------------
@@ -48,7 +53,6 @@ def main():
)
# Filter out local/private IPs and the server's own IP
config = get_config()
server_ip = config.get_server_ip()
public_ips = [
@@ -61,14 +65,24 @@ def main():
os.makedirs(EXPORTS_DIR, exist_ok=True)
# Write IPs to file (one per line)
with open(OUTPUT_FILE, "w") as f:
for ip in public_ips:
f.write(f"{ip}\n")
for fwname in FWType._registry:
app_logger.info(
f"[Background Task] {task_name} exported {len(public_ips)} attacker IPs "
f"(filtered {len(attackers) - len(public_ips)} local/private IPs) to {OUTPUT_FILE}"
)
# get banlist for specific ip
fw = FWType.create(fwname)
banlist = fw.getBanlist(public_ips)
output_file = os.path.join(EXPORTS_DIR, f"{fwname}_banlist.txt")
if fwname == "raw":
output_file = os.path.join(EXPORTS_DIR, f"malicious_ips.txt")
with open(output_file, "w") as f:
f.write(f"{banlist}\n")
app_logger.info(
f"[Background Task] {task_name} exported {len(public_ips)} in {fwname} public IPs"
f"(filtered {len(attackers) - len(public_ips)} local/private IPs) to {output_file}"
)
except Exception as e:
app_logger.error(f"[Background Task] {task_name} failed: {e}")

View File

@@ -9,6 +9,9 @@ import html
from datetime import datetime
from zoneinfo import ZoneInfo
# imports for the __init_subclass__ method, do not remove pls
from firewall import fwtype
def _escape(value) -> str:
"""Escape HTML special characters to prevent XSS attacks."""
@@ -47,7 +50,9 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str:
# Generate suspicious accesses rows with clickable IPs
suspicious_rows = (
"\n".join([f"""<tr class="ip-row" data-ip="{_escape(log["ip"])}">
"\n".join(
[
f"""<tr class="ip-row" data-ip="{_escape(log["ip"])}">
<td class="ip-clickable">{_escape(log["ip"])}</td>
<td>{_escape(log["path"])}</td>
<td style="word-break: break-all;">{_escape(log["user_agent"][:60])}</td>
@@ -59,7 +64,10 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str:
<div class="loading">Loading stats...</div>
</div>
</td>
</tr>""" for log in stats["recent_suspicious"][-10:]])
</tr>"""
for log in stats["recent_suspicious"][-10:]
]
)
or '<tr><td colspan="4" style="text-align:center;">No suspicious activity detected</td></tr>'
)
@@ -137,6 +145,68 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str:
.download-btn:active {{
background: #1f7a2f;
}}
.banlist-dropdown {{
position: relative;
display: inline-block;
width: 100%;
}}
.banlist-dropdown-btn {{
display: block;
width: 100%;
padding: 8px 14px;
background: #238636;
color: #ffffff;
text-decoration: none;
border-radius: 6px;
font-weight: 500;
font-size: 13px;
transition: background 0.2s;
border: 1px solid #2ea043;
cursor: pointer;
text-align: left;
box-sizing: border-box;
}}
.banlist-dropdown-btn:hover {{
background: #2ea043;
}}
.banlist-dropdown-menu {{
display: none;
position: absolute;
right: 0;
left: 0;
background-color: #161b22;
box-shadow: 0px 8px 16px 0px rgba(0,0,0,0.3);
z-index: 1;
border: 1px solid #30363d;
border-radius: 6px;
margin-top: 4px;
overflow: hidden;
}}
.banlist-dropdown-menu.show {{
display: block;
}}
.banlist-dropdown-menu a {{
color: #c9d1d9;
padding: 6px 12px;
text-decoration: none;
display: flex;
align-items: center;
gap: 6px;
transition: background 0.2s;
font-size: 12px;
}}
.banlist-dropdown-menu a:hover {{
background-color: #1c2128;
color: #58a6ff;
}}
.banlist-dropdown-menu a.disabled {{
color: #6e7681;
cursor: not-allowed;
pointer-events: none;
}}
.banlist-icon {{
font-size: 14px;
}}
.stats-grid {{
display: grid;
grid-template-columns: repeat(auto-fit, minmax(150px, 1fr));
@@ -978,9 +1048,17 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str:
<span class="github-logo-text">BlessedRebuS/Krawl</span>
</a>
<div class="download-section">
<a href="{dashboard_path}/api/download/malicious_ips.txt" class="download-btn" download>
Export Malicious IPs
</a>
<div class="banlist-dropdown">
<button class="banlist-dropdown-btn" onclick="toggleBanlistDropdown()">Export IPs Banlist</button>
<div id="banlistDropdown" class="banlist-dropdown-menu">
<a href="javascript:void(0)" onclick="downloadBanlist('raw')">
<span>Raw IPs</span>
</a>
<a href="javascript:void(0)" onclick="downloadBanlist('iptables')">
<span>IPTables Rules</span>
</a>
</div>
</div>
</div>
<h1>Krawl Dashboard</h1>
@@ -1269,6 +1347,43 @@ def generate_dashboard(stats: dict, dashboard_path: str = "") -> str:
<script>
const DASHBOARD_PATH = '{dashboard_path}';
// Dropdown menu functions
function toggleBanlistDropdown() {{
const dropdown = document.getElementById('banlistDropdown');
dropdown.classList.toggle('show');
}}
// Close dropdown when clicking outside
document.addEventListener('click', function(event) {{
const dropdown = document.querySelector('.banlist-dropdown');
if (!dropdown.contains(event.target)) {{
const menu = document.getElementById('banlistDropdown');
menu.classList.remove('show');
}}
}});
// Download banlist function
function downloadBanlist(fwtype) {{
const url = DASHBOARD_PATH + '/api/get_banlist?fwtype=' + encodeURIComponent(fwtype);
// Create a temporary link and trigger download
const link = document.createElement('a');
link.href = url;
// Set filename based on type
const filename = fwtype === 'raw' ? 'banlist_raw.txt' : 'banlist_iptables.sh';
link.setAttribute('download', filename);
// Append to body, click, and remove
document.body.appendChild(link);
link.click();
document.body.removeChild(link);
// Close dropdown after download
const menu = document.getElementById('banlistDropdown');
menu.classList.remove('show');
}}
function formatTimestamp(isoTimestamp) {{
if (!isoTimestamp) return 'N/A';
try {{