feat: implement brute force protection and error handling for authentication
This commit is contained in:
@@ -10,6 +10,7 @@ import hashlib
|
||||
import hmac
|
||||
import os
|
||||
import secrets
|
||||
import time
|
||||
|
||||
from fastapi import APIRouter, Request, Response, Query, Cookie
|
||||
from fastapi.responses import JSONResponse, PlainTextResponse
|
||||
@@ -21,6 +22,12 @@ from logger import get_app_logger
|
||||
# Server-side session token store (valid tokens for authenticated sessions)
|
||||
_auth_tokens: set = set()
|
||||
|
||||
# Bruteforce protection: tracks failed attempts per IP
|
||||
# { ip: { "attempts": int, "locked_until": float } }
|
||||
_auth_attempts: dict = {}
|
||||
_AUTH_MAX_ATTEMPTS = 5
|
||||
_AUTH_BASE_LOCKOUT = 30 # seconds, doubles on each lockout
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@@ -45,9 +52,27 @@ def verify_auth(request: Request) -> bool:
|
||||
|
||||
@router.post("/api/auth")
|
||||
async def authenticate(request: Request, body: AuthRequest):
|
||||
ip = request.client.host
|
||||
|
||||
# Check if IP is currently locked out
|
||||
record = _auth_attempts.get(ip)
|
||||
if record and record["locked_until"] > time.time():
|
||||
remaining = int(record["locked_until"] - time.time())
|
||||
return JSONResponse(
|
||||
content={
|
||||
"authenticated": False,
|
||||
"error": f"Too many attempts. Try again in {remaining}s",
|
||||
"locked": True,
|
||||
"retry_after": remaining,
|
||||
},
|
||||
status_code=429,
|
||||
)
|
||||
|
||||
config = request.app.state.config
|
||||
expected = hashlib.sha256(config.dashboard_password.encode()).hexdigest()
|
||||
if hmac.compare_digest(body.fingerprint, expected):
|
||||
# Success — clear failed attempts
|
||||
_auth_attempts.pop(ip, None)
|
||||
token = secrets.token_hex(32)
|
||||
_auth_tokens.add(token)
|
||||
response = JSONResponse(content={"authenticated": True})
|
||||
@@ -58,8 +83,38 @@ async def authenticate(request: Request, body: AuthRequest):
|
||||
samesite="strict",
|
||||
)
|
||||
return response
|
||||
|
||||
# Failed attempt — track and possibly lock out
|
||||
if not record:
|
||||
record = {"attempts": 0, "locked_until": 0, "lockouts": 0}
|
||||
_auth_attempts[ip] = record
|
||||
record["attempts"] += 1
|
||||
|
||||
if record["attempts"] >= _AUTH_MAX_ATTEMPTS:
|
||||
lockout = _AUTH_BASE_LOCKOUT * (2 ** record["lockouts"])
|
||||
record["locked_until"] = time.time() + lockout
|
||||
record["lockouts"] += 1
|
||||
record["attempts"] = 0
|
||||
get_app_logger().warning(
|
||||
f"Auth bruteforce: IP {ip} locked out for {lockout}s "
|
||||
f"(lockout #{record['lockouts']})"
|
||||
)
|
||||
return JSONResponse(
|
||||
content={
|
||||
"authenticated": False,
|
||||
"error": f"Too many attempts. Locked for {lockout}s",
|
||||
"locked": True,
|
||||
"retry_after": lockout,
|
||||
},
|
||||
status_code=429,
|
||||
)
|
||||
|
||||
remaining_attempts = _AUTH_MAX_ATTEMPTS - record["attempts"]
|
||||
return JSONResponse(
|
||||
content={"authenticated": False, "error": "Invalid password"},
|
||||
content={
|
||||
"authenticated": False,
|
||||
"error": f"Invalid password. {remaining_attempts} attempt{'s' if remaining_attempts != 1 else ''} remaining",
|
||||
},
|
||||
status_code=401,
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user