linted code

This commit is contained in:
carnivuth
2026-01-27 17:53:11 +01:00
parent 7e74896dfd
commit 2e2e4d65d0
4 changed files with 26 additions and 23 deletions

View File

@@ -6,7 +6,7 @@ class FWType(ABC):
"""Abstract base class for firewall types.""" """Abstract base class for firewall types."""
# Registry to store child classes # Registry to store child classes
_registry: Dict[str, Type['FWType']] = {} _registry: Dict[str, Type["FWType"]] = {}
def __init_subclass__(cls, **kwargs): def __init_subclass__(cls, **kwargs):
"""Automatically register subclasses with their class name.""" """Automatically register subclasses with their class name."""
@@ -14,7 +14,7 @@ class FWType(ABC):
cls._registry[cls.__name__.lower()] = cls cls._registry[cls.__name__.lower()] = cls
@classmethod @classmethod
def create(cls, fw_type: str, **kwargs) -> 'FWType': def create(cls, fw_type: str, **kwargs) -> "FWType":
""" """
Factory method to create instances of child classes. Factory method to create instances of child classes.
@@ -30,11 +30,13 @@ class FWType(ABC):
""" """
fw_type = fw_type.lower() fw_type = fw_type.lower()
if fw_type not in cls._registry: if fw_type not in cls._registry:
available = ', '.join(cls._registry.keys()) available = ", ".join(cls._registry.keys())
raise ValueError(f"Unknown firewall type: '{fw_type}'. Available: {available}") raise ValueError(
f"Unknown firewall type: '{fw_type}'. Available: {available}"
)
return cls._registry[fw_type](**kwargs) return cls._registry[fw_type](**kwargs)
@abstractmethod @abstractmethod
def getBanlist(self,ips): def getBanlist(self, ips):
"""Return the ruleset for the specific server""" """Return the ruleset for the specific server"""

View File

@@ -1,10 +1,11 @@
from typing_extensions import override from typing_extensions import override
from firewall.fwtype import FWType from firewall.fwtype import FWType
class Iptables(FWType): class Iptables(FWType):
@override @override
def getBanlist(self,ips) -> str: def getBanlist(self, ips) -> str:
""" """
Generate iptables ban rules from an array of IP addresses. Generate iptables ban rules from an array of IP addresses.
@@ -29,11 +30,7 @@ class Iptables(FWType):
ip = ip.strip() ip = ip.strip()
# Build the iptables command # Build the iptables command
rule_parts = [ rule_parts = ["iptables", "-A", chain, "-s", ip]
"iptables",
"-A", chain,
"-s", ip
]
# Add target # Add target
rule_parts.extend(["-j", target]) rule_parts.extend(["-j", target])

View File

@@ -1,10 +1,11 @@
from typing_extensions import override from typing_extensions import override
from firewall.fwtype import FWType from firewall.fwtype import FWType
class Raw(FWType): class Raw(FWType):
@override @override
def getBanlist(self,ips) -> str: def getBanlist(self, ips) -> str:
""" """
Generate raw list of bad IP addresses. Generate raw list of bad IP addresses.

View File

@@ -11,7 +11,7 @@ import json
import os import os
from database import get_database from database import get_database
from config import Config,get_config from config import Config, get_config
from firewall.fwtype import FWType from firewall.fwtype import FWType
# imports for the __init_subclass__ method, do not remove pls # imports for the __init_subclass__ method, do not remove pls
@@ -100,7 +100,6 @@ class Handler(BaseHTTPRequestHandler):
error_codes = [400, 401, 403, 404, 500, 502, 503] error_codes = [400, 401, 403, 404, 500, 502, 503]
return random.choice(error_codes) return random.choice(error_codes)
def _handle_sql_endpoint(self, path: str) -> bool: def _handle_sql_endpoint(self, path: str) -> bool:
""" """
Handle SQL injection honeypot endpoints. Handle SQL injection honeypot endpoints.
@@ -245,7 +244,6 @@ class Handler(BaseHTTPRequestHandler):
user_agent = self.headers.get("User-Agent", "") user_agent = self.headers.get("User-Agent", "")
post_data = "" post_data = ""
base_path = urlparse(self.path).path base_path = urlparse(self.path).path
if base_path in ["/api/search", "/api/sql", "/api/database"]: if base_path in ["/api/search", "/api/sql", "/api/database"]:
@@ -557,7 +555,11 @@ class Handler(BaseHTTPRequestHandler):
self.end_headers() self.end_headers()
try: try:
stats = self.tracker.get_stats() stats = self.tracker.get_stats()
self.wfile.write(generate_dashboard(stats, self.config.dashboard_secret_path).encode()) self.wfile.write(
generate_dashboard(
stats, self.config.dashboard_secret_path
).encode()
)
except BrokenPipeError: except BrokenPipeError:
pass pass
except Exception as e: except Exception as e:
@@ -604,7 +606,6 @@ class Handler(BaseHTTPRequestHandler):
self.end_headers() self.end_headers()
try: try:
page = int(query_params.get("page", ["1"])[0]) page = int(query_params.get("page", ["1"])[0])
page_size = int(query_params.get("page_size", ["25"])[0]) page_size = int(query_params.get("page_size", ["25"])[0])
sort_by = query_params.get("sort_by", ["total_requests"])[0] sort_by = query_params.get("sort_by", ["total_requests"])[0]
@@ -643,7 +644,6 @@ class Handler(BaseHTTPRequestHandler):
self.end_headers() self.end_headers()
try: try:
# Parse query parameters # Parse query parameters
parsed_url = urlparse(self.path) parsed_url = urlparse(self.path)
query_params = parse_qs(parsed_url.query) query_params = parse_qs(parsed_url.query)
@@ -807,7 +807,7 @@ class Handler(BaseHTTPRequestHandler):
result = db.get_top_ips_paginated( result = db.get_top_ips_paginated(
page=page, page=page,
page_size=page_size, page_size=page_size,
pathsort_by=sort_by, pathsort_by=sort_by,
sort_order=sort_order, sort_order=sort_order,
) )
self.wfile.write(json.dumps(result).encode()) self.wfile.write(json.dumps(result).encode())
@@ -937,12 +937,12 @@ pathsort_by=sort_by,
# API endpoint for downloading malicious IPs blocklist file # API endpoint for downloading malicious IPs blocklist file
if ( if (
self.config.dashboard_secret_path and self.config.dashboard_secret_path
request_path == f"{self.config.dashboard_secret_path}/api/get_banlist" and request_path == f"{self.config.dashboard_secret_path}/api/get_banlist"
): ):
# get fwtype from request params # get fwtype from request params
fwtype = query_params.get("fwtype",["iptables"])[0] fwtype = query_params.get("fwtype", ["iptables"])[0]
# Query distinct suspicious IPs # Query distinct suspicious IPs
results = ( results = (
@@ -962,7 +962,10 @@ pathsort_by=sort_by,
self.send_response(200) self.send_response(200)
self.send_header("Content-type", "text/plain") self.send_header("Content-type", "text/plain")
self.send_header("Content-Disposition", f'attachment; filename="{fwtype}.txt"',) self.send_header(
"Content-Disposition",
f'attachment; filename="{fwtype}.txt"',
)
self.send_header("Content-Length", str(len(banlist))) self.send_header("Content-Length", str(len(banlist)))
self.end_headers() self.end_headers()
self.wfile.write(banlist.encode()) self.wfile.write(banlist.encode())