From c730ba2bfda741cddee9135f738ae3af9b7dc98b Mon Sep 17 00:00:00 2001 From: carnivuth Date: Tue, 27 Jan 2026 17:53:11 +0100 Subject: [PATCH] linted code --- src/firewall/fwtype.py | 12 +++++++----- src/firewall/iptables.py | 9 +++------ src/firewall/raw.py | 3 ++- src/handler.py | 25 ++++++++++++++----------- 4 files changed, 26 insertions(+), 23 deletions(-) diff --git a/src/firewall/fwtype.py b/src/firewall/fwtype.py index 2995489..0e0e421 100644 --- a/src/firewall/fwtype.py +++ b/src/firewall/fwtype.py @@ -6,7 +6,7 @@ class FWType(ABC): """Abstract base class for firewall types.""" # Registry to store child classes - _registry: Dict[str, Type['FWType']] = {} + _registry: Dict[str, Type["FWType"]] = {} def __init_subclass__(cls, **kwargs): """Automatically register subclasses with their class name.""" @@ -14,7 +14,7 @@ class FWType(ABC): cls._registry[cls.__name__.lower()] = cls @classmethod - def create(cls, fw_type: str, **kwargs) -> 'FWType': + def create(cls, fw_type: str, **kwargs) -> "FWType": """ Factory method to create instances of child classes. @@ -30,11 +30,13 @@ class FWType(ABC): """ fw_type = fw_type.lower() if fw_type not in cls._registry: - available = ', '.join(cls._registry.keys()) - raise ValueError(f"Unknown firewall type: '{fw_type}'. Available: {available}") + available = ", ".join(cls._registry.keys()) + raise ValueError( + f"Unknown firewall type: '{fw_type}'. Available: {available}" + ) return cls._registry[fw_type](**kwargs) @abstractmethod - def getBanlist(self,ips): + def getBanlist(self, ips): """Return the ruleset for the specific server""" diff --git a/src/firewall/iptables.py b/src/firewall/iptables.py index 73ac623..159171e 100644 --- a/src/firewall/iptables.py +++ b/src/firewall/iptables.py @@ -1,10 +1,11 @@ from typing_extensions import override from firewall.fwtype import FWType + class Iptables(FWType): @override - def getBanlist(self,ips) -> str: + def getBanlist(self, ips) -> str: """ Generate iptables ban rules from an array of IP addresses. @@ -29,11 +30,7 @@ class Iptables(FWType): ip = ip.strip() # Build the iptables command - rule_parts = [ - "iptables", - "-A", chain, - "-s", ip - ] + rule_parts = ["iptables", "-A", chain, "-s", ip] # Add target rule_parts.extend(["-j", target]) diff --git a/src/firewall/raw.py b/src/firewall/raw.py index 1e29e75..e0c82fe 100644 --- a/src/firewall/raw.py +++ b/src/firewall/raw.py @@ -1,10 +1,11 @@ from typing_extensions import override from firewall.fwtype import FWType + class Raw(FWType): @override - def getBanlist(self,ips) -> str: + def getBanlist(self, ips) -> str: """ Generate raw list of bad IP addresses. diff --git a/src/handler.py b/src/handler.py index 3c9f6f6..38a15d1 100644 --- a/src/handler.py +++ b/src/handler.py @@ -11,7 +11,7 @@ import json import os from database import get_database -from config import Config,get_config +from config import Config, get_config from firewall.fwtype import FWType # imports for the __init_subclass__ method, do not remove pls @@ -100,7 +100,6 @@ class Handler(BaseHTTPRequestHandler): error_codes = [400, 401, 403, 404, 500, 502, 503] return random.choice(error_codes) - def _handle_sql_endpoint(self, path: str) -> bool: """ Handle SQL injection honeypot endpoints. @@ -245,7 +244,6 @@ class Handler(BaseHTTPRequestHandler): user_agent = self.headers.get("User-Agent", "") post_data = "" - base_path = urlparse(self.path).path if base_path in ["/api/search", "/api/sql", "/api/database"]: @@ -516,7 +514,11 @@ class Handler(BaseHTTPRequestHandler): self.end_headers() try: stats = self.tracker.get_stats() - self.wfile.write(generate_dashboard(stats, self.config.dashboard_secret_path).encode()) + self.wfile.write( + generate_dashboard( + stats, self.config.dashboard_secret_path + ).encode() + ) except BrokenPipeError: pass except Exception as e: @@ -563,7 +565,6 @@ class Handler(BaseHTTPRequestHandler): self.end_headers() try: - page = int(query_params.get("page", ["1"])[0]) page_size = int(query_params.get("page_size", ["25"])[0]) sort_by = query_params.get("sort_by", ["total_requests"])[0] @@ -602,7 +603,6 @@ class Handler(BaseHTTPRequestHandler): self.end_headers() try: - # Parse query parameters parsed_url = urlparse(self.path) query_params = parse_qs(parsed_url.query) @@ -766,7 +766,7 @@ class Handler(BaseHTTPRequestHandler): result = db.get_top_ips_paginated( page=page, page_size=page_size, -pathsort_by=sort_by, + pathsort_by=sort_by, sort_order=sort_order, ) self.wfile.write(json.dumps(result).encode()) @@ -896,12 +896,12 @@ pathsort_by=sort_by, # API endpoint for downloading malicious IPs blocklist file if ( - self.config.dashboard_secret_path and - request_path == f"{self.config.dashboard_secret_path}/api/get_banlist" + self.config.dashboard_secret_path + and request_path == f"{self.config.dashboard_secret_path}/api/get_banlist" ): # get fwtype from request params - fwtype = query_params.get("fwtype",["iptables"])[0] + fwtype = query_params.get("fwtype", ["iptables"])[0] # Query distinct suspicious IPs results = ( @@ -921,7 +921,10 @@ pathsort_by=sort_by, self.send_response(200) self.send_header("Content-type", "text/plain") - self.send_header("Content-Disposition", f'attachment; filename="{fwtype}.txt"',) + self.send_header( + "Content-Disposition", + f'attachment; filename="{fwtype}.txt"', + ) self.send_header("Content-Length", str(len(banlist))) self.end_headers() self.wfile.write(banlist.encode())