linted code

This commit is contained in:
Lorenzo Venerandi
2026-02-17 13:13:06 +01:00
parent 2c7b612587
commit c023d808c6
8 changed files with 65 additions and 34 deletions

View File

@@ -58,7 +58,9 @@ async def lifespan(app: FastAPI):
) )
webpages = None webpages = None
except IOError: except IOError:
app_logger.warning("Can't read webpages file. Using randomly generated links.") app_logger.warning(
"Can't read webpages file. Using randomly generated links."
)
app.state.webpages = webpages app.state.webpages = webpages
# Initialize canary counter # Initialize canary counter
@@ -82,9 +84,7 @@ DASHBOARD AVAILABLE AT
f"Canary token will appear after {config.canary_token_tries} tries" f"Canary token will appear after {config.canary_token_tries} tries"
) )
else: else:
app_logger.info( app_logger.info("No canary token configured (set CANARY_TOKEN_URL to enable)")
"No canary token configured (set CANARY_TOKEN_URL to enable)"
)
yield yield

View File

@@ -24,9 +24,7 @@ def get_templates() -> Jinja2Templates:
"""Get shared Jinja2Templates instance with custom filters.""" """Get shared Jinja2Templates instance with custom filters."""
global _templates global _templates
if _templates is None: if _templates is None:
templates_dir = os.path.join( templates_dir = os.path.join(os.path.dirname(__file__), "templates", "jinja2")
os.path.dirname(__file__), "templates", "jinja2"
)
_templates = Jinja2Templates(directory=templates_dir) _templates = Jinja2Templates(directory=templates_dir)
_templates.env.filters["format_ts"] = _format_ts _templates.env.filters["format_ts"] = _format_ts
return _templates return _templates

View File

@@ -59,8 +59,20 @@ class DeceptionMiddleware(BaseHTTPMiddleware):
elif any( elif any(
pattern in full_input pattern in full_input
for pattern in [ for pattern in [
"cmd=", "exec=", "command=", "execute=", "system=", "cmd=",
";", "|", "&&", "whoami", "id", "uname", "cat", "ls", "pwd", "exec=",
"command=",
"execute=",
"system=",
";",
"|",
"&&",
"whoami",
"id",
"uname",
"cat",
"ls",
"pwd",
] ]
): ):
attack_type_log = "COMMAND_INJECTION" attack_type_log = "COMMAND_INJECTION"

View File

@@ -258,9 +258,7 @@ async def raw_request(log_id: int, request: Request):
return JSONResponse( return JSONResponse(
content={"error": "Raw request not found"}, status_code=404 content={"error": "Raw request not found"}, status_code=404
) )
return JSONResponse( return JSONResponse(content={"raw_request": raw}, headers=_no_cache_headers())
content={"raw_request": raw}, headers=_no_cache_headers()
)
except Exception as e: except Exception as e:
get_app_logger().error(f"Error fetching raw request: {e}") get_app_logger().error(f"Error fetching raw request: {e}")
return JSONResponse(content={"error": str(e)}, status_code=500) return JSONResponse(content={"error": str(e)}, status_code=500)

View File

@@ -46,6 +46,7 @@ router = APIRouter()
# --- Helper functions --- # --- Helper functions ---
def _should_return_error(config: Config) -> bool: def _should_return_error(config: Config) -> bool:
if config.probability_error_codes <= 0: if config.probability_error_codes <= 0:
return False return False
@@ -62,6 +63,7 @@ def _get_random_error_code() -> int:
# --- HEAD --- # --- HEAD ---
@router.head("/{path:path}") @router.head("/{path:path}")
async def handle_head(path: str): async def handle_head(path: str):
return Response(status_code=200, headers={"Content-Type": "text/html"}) return Response(status_code=200, headers={"Content-Type": "text/html"})
@@ -69,6 +71,7 @@ async def handle_head(path: str):
# --- POST routes --- # --- POST routes ---
@router.post("/api/search") @router.post("/api/search")
@router.post("/api/sql") @router.post("/api/sql")
@router.post("/api/database") @router.post("/api/database")
@@ -90,10 +93,14 @@ async def sql_endpoint_post(request: Request):
access_logger.warning( access_logger.warning(
f"[SQL INJECTION DETECTED POST] {client_ip} - {base_path}" f"[SQL INJECTION DETECTED POST] {client_ip} - {base_path}"
) )
return Response(content=error_msg, status_code=status_code, media_type=content_type) return Response(
content=error_msg, status_code=status_code, media_type=content_type
)
else: else:
response_data = get_sql_response_with_data(base_path, post_data) response_data = get_sql_response_with_data(base_path, post_data)
return Response(content=response_data, status_code=200, media_type="application/json") return Response(
content=response_data, status_code=200, media_type="application/json"
)
@router.post("/api/contact") @router.post("/api/contact")
@@ -119,9 +126,7 @@ async def contact_post(request: Request):
f"[XSS ATTEMPT DETECTED] {client_ip} - {request.url.path} - Data: {post_data[:200]}" f"[XSS ATTEMPT DETECTED] {client_ip} - {request.url.path} - Data: {post_data[:200]}"
) )
else: else:
access_logger.info( access_logger.info(f"[XSS ENDPOINT POST] {client_ip} - {request.url.path}")
f"[XSS ENDPOINT POST] {client_ip} - {request.url.path}"
)
tracker.record_access( tracker.record_access(
ip=client_ip, ip=client_ip,
@@ -186,6 +191,7 @@ async def credential_capture_post(request: Request, path: str):
# --- GET special paths --- # --- GET special paths ---
@router.get("/robots.txt") @router.get("/robots.txt")
async def robots_txt(): async def robots_txt():
return PlainTextResponse(html_templates.robots_txt()) return PlainTextResponse(html_templates.robots_txt())
@@ -209,18 +215,26 @@ async def fake_users_json():
@router.get("/api_keys.json") @router.get("/api_keys.json")
async def fake_api_keys(): async def fake_api_keys():
return Response(content=api_keys_json(), status_code=200, media_type="application/json") return Response(
content=api_keys_json(), status_code=200, media_type="application/json"
)
@router.get("/config.json") @router.get("/config.json")
async def fake_config_json(): async def fake_config_json():
return Response(content=api_response("/api/config"), status_code=200, media_type="application/json") return Response(
content=api_response("/api/config"),
status_code=200,
media_type="application/json",
)
# Override the generic /users.json to return actual content # Override the generic /users.json to return actual content
@router.get("/users.json", include_in_schema=False) @router.get("/users.json", include_in_schema=False)
async def fake_users_json_content(): async def fake_users_json_content():
return Response(content=users_json(), status_code=200, media_type="application/json") return Response(
content=users_json(), status_code=200, media_type="application/json"
)
@router.get("/admin") @router.get("/admin")
@@ -281,7 +295,9 @@ async def fake_phpmyadmin(path: str = ""):
@router.get("/.env") @router.get("/.env")
async def fake_env(): async def fake_env():
return Response(content=api_response("/.env"), status_code=200, media_type="application/json") return Response(
content=api_response("/.env"), status_code=200, media_type="application/json"
)
@router.get("/backup/") @router.get("/backup/")
@@ -295,6 +311,7 @@ async def fake_directory_listing(request: Request):
# --- SQL injection honeypot GET endpoints --- # --- SQL injection honeypot GET endpoints ---
@router.get("/api/search") @router.get("/api/search")
@router.get("/api/sql") @router.get("/api/sql")
@router.get("/api/database") @router.get("/api/database")
@@ -312,26 +329,34 @@ async def sql_endpoint_get(request: Request):
access_logger.warning( access_logger.warning(
f"[SQL INJECTION DETECTED] {client_ip} - {base_path} - Query: {request_query[:100] if request_query else 'empty'}" f"[SQL INJECTION DETECTED] {client_ip} - {base_path} - Query: {request_query[:100] if request_query else 'empty'}"
) )
return Response(content=error_msg, status_code=status_code, media_type=content_type) return Response(
content=error_msg, status_code=status_code, media_type=content_type
)
else: else:
access_logger.info( access_logger.info(
f"[SQL ENDPOINT] {client_ip} - {base_path} - Query: {request_query[:100] if request_query else 'empty'}" f"[SQL ENDPOINT] {client_ip} - {base_path} - Query: {request_query[:100] if request_query else 'empty'}"
) )
response_data = get_sql_response_with_data(base_path, request_query) response_data = get_sql_response_with_data(base_path, request_query)
return Response(content=response_data, status_code=200, media_type="application/json") return Response(
content=response_data, status_code=200, media_type="application/json"
)
# --- Generic /api/* fake endpoints --- # --- Generic /api/* fake endpoints ---
@router.get("/api/{path:path}") @router.get("/api/{path:path}")
async def fake_api_catchall(request: Request, path: str): async def fake_api_catchall(request: Request, path: str):
full_path = f"/api/{path}" full_path = f"/api/{path}"
return Response(content=api_response(full_path), status_code=200, media_type="application/json") return Response(
content=api_response(full_path), status_code=200, media_type="application/json"
)
# --- Catch-all GET (trap pages with random links) --- # --- Catch-all GET (trap pages with random links) ---
# This MUST be registered last in the router # This MUST be registered last in the router
@router.get("/{path:path}") @router.get("/{path:path}")
async def trap_page(request: Request, path: str): async def trap_page(request: Request, path: str):
"""Generate trap page with random links. This is the catch-all route.""" """Generate trap page with random links. This is the catch-all route."""
@@ -365,9 +390,7 @@ async def trap_page(request: Request, path: str):
# Random error response # Random error response
if _should_return_error(config): if _should_return_error(config):
error_code = _get_random_error_code() error_code = _get_random_error_code()
access_logger.info( access_logger.info(f"Returning error {error_code} to {client_ip} - {full_path}")
f"Returning error {error_code} to {client_ip} - {full_path}"
)
return Response(status_code=error_code) return Response(status_code=error_code)
# Response delay # Response delay