feat: implement FastAPI authentication with password protection for admin panel

This commit is contained in:
Lorenzo Venerandi
2026-03-06 22:19:59 +01:00
parent 755de7f231
commit 18536f0706
8 changed files with 188 additions and 4 deletions

View File

@@ -7,13 +7,19 @@ All endpoints are prefixed with the secret dashboard path.
"""
import os
import secrets
import hmac
from fastapi import APIRouter, Request, Response, Query
from fastapi import APIRouter, Request, Response, Query, Cookie
from fastapi.responses import JSONResponse, PlainTextResponse
from pydantic import BaseModel
from dependencies import get_db
from logger import get_app_logger
# Server-side session token store (valid tokens for authenticated sessions)
_auth_tokens: set = set()
router = APIRouter()
@@ -26,6 +32,54 @@ def _no_cache_headers() -> dict:
}
class AuthRequest(BaseModel):
password: str
def verify_auth(request: Request) -> bool:
"""Check if the request has a valid auth session cookie."""
token = request.cookies.get("krawl_auth")
return token is not None and token in _auth_tokens
@router.post("/api/auth")
async def authenticate(request: Request, body: AuthRequest):
config = request.app.state.config
if hmac.compare_digest(body.password, config.dashboard_password):
token = secrets.token_hex(32)
_auth_tokens.add(token)
response = JSONResponse(content={"authenticated": True})
response.set_cookie(
key="krawl_auth",
value=token,
httponly=True,
samesite="strict",
)
return response
return JSONResponse(
content={"authenticated": False, "error": "Invalid password"},
status_code=401,
)
@router.post("/api/auth/logout")
async def logout(request: Request):
token = request.cookies.get("krawl_auth")
if token and token in _auth_tokens:
_auth_tokens.discard(token)
response = JSONResponse(content={"authenticated": False})
response.delete_cookie(key="krawl_auth")
return response
@router.get("/api/auth/check")
async def auth_check(request: Request):
"""Check if the current session is authenticated."""
if verify_auth(request):
return JSONResponse(content={"authenticated": True})
return JSONResponse(content={"authenticated": False}, status_code=401)
@router.get("/api/all-ip-stats")
async def all_ip_stats(request: Request):
db = get_db()