Files
DomGod/app/beauty_main.py

509 lines
20 KiB
Python

"""BeautyLeads — Cosmetics B2B intelligence dashboard (port 7788).
Shares the same /data volume as the main DomGod service.
Does NOT re-download parquet or rebuild DuckDB index (those run in main service).
Runs its own beauty AI assessment worker against the shared enriched_domains table.
"""
import asyncio
import logging
import os
from pathlib import Path
from contextlib import asynccontextmanager
import aiosqlite
from typing import Optional
from fastapi import FastAPI, Query, Request, Response
from fastapi.responses import StreamingResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from starlette.middleware.base import BaseHTTPMiddleware
from dotenv import load_dotenv
load_dotenv()
from app.db import (
SQLITE_PATH, init_db, get_stats, get_domains, get_enriched,
build_duckdb_index, index_status,
queue_beauty, requeue_beauty, get_beauty_queue_status, save_beauty_assessment, get_beauty_leads,
save_prescreen_results,
init_beauty_auth, verify_beauty_user, create_beauty_session, validate_beauty_session,
delete_beauty_session, change_beauty_password, list_beauty_users, add_beauty_user,
delete_beauty_user,
)
from app.validator import start_validator, stop_validator, get_validator_status
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# ── Beauty AI worker ──────────────────────────────────────────────────────────
# The single background assessment loop for this process. It stays None until
# _start_beauty_worker() first creates it, and is replaced once it has finished.
_beauty_worker_task: Optional[asyncio.Task] = None
async def _assess_one_beauty(domain: str) -> None:
    """Analyse one queued domain and persist its beauty-lead assessment.

    The site analysis and the AI assessment share a 180-second budget. On
    success the result is stored with ``save_beauty_assessment``. On any
    ordinary error the domain's ``beauty_queue`` row is marked ``'failed'``
    with a truncated error message. That includes the timeout and a failure to
    import the analysis modules.

    Ordinary exceptions are logged, not raised. ``asyncio.CancelledError`` is a
    ``BaseException``, so it still propagates and the task can be cancelled.
    """
    logger.info("Beauty AI: starting %s", domain)
    try:
        # Lazy imports, kept inside the try. If they raised outside it, the
        # worker's gather(return_exceptions=True) would swallow the error and
        # the row would stay in 'running'.
        from app.beauty_ai import assess_beauty_domain
        from app.site_analyzer import analyze_site

        async with asyncio.timeout(180):
            analysis = await analyze_site(domain)
            assessment = await assess_beauty_domain(analysis)
        await save_beauty_assessment(domain, assessment)
        logger.info("Beauty AI: saved %s -> %s", domain, assessment.get("lead_quality"))
    except Exception as e:
        # asyncio.timeout raises a TimeoutError whose str() is empty. Fall back
        # to the exception type so the stored error is never blank.
        message = str(e) or type(e).__name__
        logger.error("Beauty AI: failed %s -> %s", domain, message)
        try:
            async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
                await db.execute(
                    "UPDATE beauty_queue SET status='failed', completed_at=datetime('now'), error=? WHERE domain=?",
                    (message[:400], domain),
                )
                await db.commit()
        except Exception as db_err:
            # Best effort: do not let bookkeeping errors escape. Do report them,
            # though, because the row may now remain in 'running'.
            logger.error("Beauty AI: could not mark %s as failed: %s", domain, db_err)
async def _beauty_worker_loop():
    """Poll ``beauty_queue`` forever, assessing pending domains in batches.

    Startup step:
    - Rows left in ``'running'`` are put back to ``'pending'``. Presumably these
      come from a batch that never finished.

    Each iteration:
    - Claim up to five ``'pending'`` rows by flipping them to ``'running'``.
    - Assess the claimed rows concurrently.
    - Wait for the whole batch before claiming more.

    Idle polls sleep 3 s; database errors sleep 5 s before retrying.
    """
    logger.info("Beauty AI worker starting")

    # Return orphaned 'running' jobs to the pending pool.
    try:
        async with aiosqlite.connect(SQLITE_PATH, timeout=30) as conn:
            await conn.execute("UPDATE beauty_queue SET status='pending' WHERE status='running'")
            await conn.commit()
    except Exception as exc:
        logger.error("Beauty worker: stale reset failed: %s", exc)

    while True:
        batch = []
        try:
            async with aiosqlite.connect(SQLITE_PATH, timeout=30) as conn:
                async with conn.execute(
                    "SELECT domain FROM beauty_queue WHERE status='pending' LIMIT 5"
                ) as cursor:
                    batch = await cursor.fetchall()
                if batch:
                    claims = [(row[0],) for row in batch]
                    await conn.executemany(
                        "UPDATE beauty_queue SET status='running' WHERE domain=?",
                        claims,
                    )
                    await conn.commit()
        except Exception as exc:
            logger.error("Beauty worker DB error: %s", exc)
            await asyncio.sleep(5)
            continue

        if not batch:
            await asyncio.sleep(3)
            continue

        jobs = [_assess_one_beauty(row[0]) for row in batch]
        await asyncio.gather(*jobs, return_exceptions=True)
def _log_beauty_worker_exit(task: asyncio.Task) -> None:
    """Done-callback: log why the beauty worker task stopped (cancel or crash)."""
    if task.cancelled():
        logger.info("Beauty AI worker cancelled")
        return
    exc = task.exception()
    if exc is not None:
        logger.error("Beauty AI worker crashed: %s", exc, exc_info=exc)


def _start_beauty_worker():
    """Ensure the beauty AI worker loop is running, starting it if needed.

    This is idempotent. A task that is still alive is left untouched, so calling
    this after every enqueue never spawns a second loop. A task that has
    finished is replaced; since the loop never returns on its own, that means
    it was cancelled or crashed.

    Must be called while an event loop is running (uses ``asyncio.create_task``).
    """
    global _beauty_worker_task
    if _beauty_worker_task is not None and not _beauty_worker_task.done():
        return
    _beauty_worker_task = asyncio.create_task(_beauty_worker_loop())
    # The loop is meant to run forever. Without this callback, a crash would
    # sit unobserved on the stored task until it was replaced.
    _beauty_worker_task.add_done_callback(_log_beauty_worker_exit)
    logger.info("Beauty AI worker started")
# ── App lifespan ──────────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application startup/shutdown hook.

    Startup:
    - Run ``init_db()`` and ``init_beauty_auth()``.
    - Schedule ``build_duckdb_index()`` in the background.
    - Start the beauty AI worker.

    Shutdown:
    - Cancel whichever of those two background tasks is still running.
    - Wait up to 10 s for them to finish.
    """
    await init_db()
    await init_beauty_auth()
    # NOTE(review): the module docstring says this service does not rebuild the
    # DuckDB index, yet build_duckdb_index() is scheduled here. Confirm it is a
    # cheap no-op when the main service already built the index.
    # Keep a strong reference. The event loop holds only weak references to
    # tasks, so an unreferenced task can be garbage-collected mid-run.
    index_task = asyncio.create_task(build_duckdb_index())
    _start_beauty_worker()
    logger.info("BeautyLeads ready on port 7788")
    try:
        yield
    finally:
        pending = {
            task
            for task in (index_task, _beauty_worker_task)
            if task is not None and not task.done()
        }
        for task in pending:
            task.cancel()
        if pending:  # asyncio.wait() rejects an empty collection
            await asyncio.wait(pending, timeout=10)


app = FastAPI(title="BeautyLeads", lifespan=lifespan)
# ── Auth middleware ───────────────────────────────────────────────────────────
class AuthMiddleware(BaseHTTPMiddleware):
    """Require a valid ``beauty_session`` cookie on every non-exempt request.

    - A valid session's user is stored on ``request.state.user`` for handlers.
    - Requests without a valid session are rejected:
      - ``/api/*`` paths receive a JSON 401;
      - every other path is redirected (302) to ``/login.html``.
    """

    # Paths served without a session:
    #   /api/auth/login   - the caller has no session yet
    #   /api/auth/logout  - must accept an expired or missing session
    #   /login.html, /favicon.ico - the login page itself and the favicon
    # No prefix-based exemptions are configured at present.
    _EXEMPT_PREFIXES = ()
    _EXEMPT_EXACT = {"/api/auth/login", "/api/auth/logout", "/login.html", "/favicon.ico"}

    def _is_exempt(self, path: str) -> bool:
        """Return True when *path* may be served without a session."""
        if path in self._EXEMPT_EXACT:
            return True
        return any(path.startswith(prefix) for prefix in self._EXEMPT_PREFIXES)

    async def dispatch(self, request: Request, call_next):
        path = request.url.path
        if self._is_exempt(path):
            return await call_next(request)

        session_token = request.cookies.get("beauty_session")
        user = await validate_beauty_session(session_token) if session_token else None
        if user:
            request.state.user = user
            return await call_next(request)

        # No valid session: pages get a redirect, APIs a machine-readable 401.
        if not path.startswith("/api/"):
            return RedirectResponse(url="/login.html", status_code=302)
        return JSONResponse({"detail": "Not authenticated"}, status_code=401)


app.add_middleware(AuthMiddleware)
# ── Auth routes ───────────────────────────────────────────────────────────────
@app.post("/api/auth/login")
async def auth_login(body: dict, response: Response):
    """Authenticate with username/password and issue a session cookie.

    Expects a JSON object with ``username`` and ``password``.

    On success:
    - sets an HttpOnly, SameSite=Lax ``beauty_session`` cookie with a 30-day
      max-age;
    - returns the user's name and admin flag.

    Any failure, including non-string credentials, gets a 401.
    """
    raw_username = body.get("username") or ""
    password = body.get("password") or ""
    # Reject non-string credentials up front. Calling .strip() on e.g. a number
    # would otherwise raise and surface as a 500.
    if not isinstance(raw_username, str) or not isinstance(password, str):
        return JSONResponse({"detail": "Invalid credentials"}, status_code=401)
    username = raw_username.strip()

    user = await verify_beauty_user(username, password)
    if not user:
        return JSONResponse({"detail": "Invalid credentials"}, status_code=401)

    token = await create_beauty_session(user["username"])
    response.set_cookie(
        "beauty_session", token,
        max_age=30 * 24 * 3600,
        httponly=True,
        samesite="lax",
        path="/",
    )
    return {"username": user["username"], "is_admin": user["is_admin"]}
@app.post("/api/auth/logout")
async def auth_logout(request: Request, response: Response):
    """End the caller's session (if a cookie is present) and clear the cookie.

    This path is on the auth middleware's exempt list. It therefore succeeds
    even with a missing or expired cookie, and always returns ``{"ok": True}``.
    """
    if session_token := request.cookies.get("beauty_session"):
        await delete_beauty_session(session_token)
    response.delete_cookie("beauty_session", path="/")
    return {"ok": True}
@app.get("/api/auth/me")
async def auth_me(request: Request):
    """Return the user object the auth middleware attached to this request."""
    # This route is not exempt. The middleware has already rejected callers
    # without a session and stored whatever validate_beauty_session() returned.
    current_user = request.state.user
    return current_user
@app.post("/api/auth/change-password")
async def auth_change_password(request: Request, body: dict, response: Response):
    """Change the logged-in user's password, then end the current session.

    Expects ``current_password`` and ``new_password`` (at least 8 characters).
    Validation failures, including non-string values, get a 400.

    On success:
    - the session used for this request is deleted server-side;
    - its cookie is cleared, so the user must log in again.
    """
    user = request.state.user
    current = body.get("current_password") or ""
    new_pw = body.get("new_password") or ""
    if not isinstance(new_pw, str) or len(new_pw) < 8:
        return JSONResponse({"detail": "Password must be at least 8 characters"}, status_code=400)
    if not isinstance(current, str) or not await verify_beauty_user(user["username"], current):
        return JSONResponse({"detail": "Current password is incorrect"}, status_code=400)

    await change_beauty_password(user["username"], new_pw)

    # Clearing only the cookie would leave the old token valid server-side for
    # anyone who already holds it, so revoke it as well.
    # NOTE(review): the user's *other* sessions stay valid unless
    # change_beauty_password revokes them. Confirm.
    token = request.cookies.get("beauty_session")
    if token:
        await delete_beauty_session(token)
    response.delete_cookie("beauty_session", path="/")
    return {"ok": True}
@app.get("/api/auth/users")
async def auth_list_users(request: Request):
    """List all BeautyLeads users. Admins only; everyone else gets a 403."""
    caller_is_admin = request.state.user.get("is_admin")
    if caller_is_admin:
        return await list_beauty_users()
    return JSONResponse({"detail": "Admin only"}, status_code=403)
def _parse_admin_flag(value) -> bool:
    """Interpret a JSON ``is_admin`` value.

    Booleans and numbers keep their usual truthiness. Strings count as true
    only for "1", "true", "yes" or "on" (case-insensitive). Previously
    ``bool("false")`` evaluated to True and silently granted admin.
    """
    if isinstance(value, str):
        return value.strip().lower() in {"1", "true", "yes", "on"}
    return bool(value)


@app.post("/api/auth/users")
async def auth_add_user(request: Request, body: dict):
    """Create a user (admins only).

    Expects ``username``, ``password`` (at least 8 characters) and an optional
    ``is_admin`` flag.
    - Non-admin callers get a 403.
    - Invalid input gets a 400.
    - If ``add_beauty_user`` raises (e.g. the user already exists), the
      exception text is returned with a 400.
    """
    if not request.state.user.get("is_admin"):
        return JSONResponse({"detail": "Admin only"}, status_code=403)
    username = body.get("username") or ""
    password = body.get("password") or ""
    # Non-string values would make .strip() / len() misbehave or raise.
    if not isinstance(username, str) or not isinstance(password, str):
        return JSONResponse({"detail": "username and password required"}, status_code=400)
    username = username.strip()
    is_admin = _parse_admin_flag(body.get("is_admin", False))
    if not username or not password:
        return JSONResponse({"detail": "username and password required"}, status_code=400)
    if len(password) < 8:
        return JSONResponse({"detail": "Password must be at least 8 characters"}, status_code=400)
    try:
        await add_beauty_user(username, password, is_admin)
    except Exception as e:
        return JSONResponse({"detail": f"Could not create user: {e}"}, status_code=400)
    return {"ok": True}
@app.delete("/api/auth/users/{username}")
async def auth_delete_user(username: str, request: Request):
    """Delete *username* (admins only). Admins may not delete their own account."""
    acting_user = request.state.user
    if not acting_user.get("is_admin"):
        return JSONResponse({"detail": "Admin only"}, status_code=403)
    if acting_user["username"] == username:
        return JSONResponse({"detail": "Cannot delete yourself"}, status_code=400)
    # NOTE(review): whether the deleted user's live sessions are revoked
    # depends on delete_beauty_user. Confirm.
    await delete_beauty_user(username)
    return {"ok": True}
# ── Shared read endpoints (same DB) ──────────────────────────────────────────
@app.get("/api/stats")
async def stats():
    """Return the shared database statistics produced by ``get_stats()``."""
    summary = await get_stats()
    return summary
@app.get("/api/index/status")
async def get_index_status():
    """Report the DuckDB index status exactly as ``index_status()`` gives it."""
    status = index_status()
    return status
@app.get("/api/domains")
async def domains(
    tld: str = Query(None),
    page: int = Query(1, ge=1),
    limit: int = Query(100, ge=1, le=5000),
    live_only: bool = Query(False),
    alpha_only: bool = Query(False),
    no_sld: bool = Query(False),
    keyword: str = Query(None),
):
    """Page through raw domains with optional TLD, keyword and shape filters.

    Filtering is delegated to ``get_domains``. The response echoes the paging
    parameters together with the total match count and this page's rows.
    """
    filters = dict(
        tld=tld,
        keyword=keyword,
        live_only=live_only,
        alpha_only=alpha_only,
        no_sld=no_sld,
    )
    total, rows = await get_domains(page=page, limit=limit, **filters)
    return {"page": page, "limit": limit, "total": total, "results": rows}
@app.get("/api/enriched")
async def enriched(
    min_score: int = Query(0),
    country: str = Query(None),
    prescreen_status: str = Query(None),
    niche: str = Query(None),
    site_type: str = Query(None),
    keyword: str = Query(None),
    tld: str = Query(None),
    alpha_only: bool = Query(False),
    no_sld: bool = Query(False),
    assessed: str = Query(None),
    page: int = Query(1, ge=1),
    limit: int = Query(100, ge=1, le=5000),
):
    """Page through enriched domains filtered by score, geography and classification.

    The public ``assessed`` query parameter is forwarded to ``get_enriched`` as
    ``beauty_assessed``. Every other filter is passed through under its own name.
    """
    total, rows = await get_enriched(
        page=page,
        limit=limit,
        min_score=min_score,
        country=country,
        prescreen_status=prescreen_status,
        niche=niche,
        site_type=site_type,
        keyword=keyword,
        tld=tld,
        alpha_only=alpha_only,
        no_sld=no_sld,
        beauty_assessed=assessed,
    )
    payload = {"page": page, "limit": limit, "total": total, "results": rows}
    return payload
# ── Validator (shared) ────────────────────────────────────────────────────────
@app.post("/api/validator/start")
async def validator_start(tld: str = Query(None), rescan_dead: bool = Query(False)):
    """Start the shared validator and return its status.

    An empty ``tld`` is treated the same as no TLD filter.
    """
    tld_filter = tld if tld else None
    start_validator(tld_filter=tld_filter, rescan_dead=rescan_dead)
    return get_validator_status()
@app.post("/api/validator/stop")
async def validator_stop():
    """Ask the shared validator to stop. Always answers ``{"status": "stopped"}``."""
    stop_validator()
    reply = {"status": "stopped"}
    return reply
@app.get("/api/validator/status")
async def validator_status():
    """Return the shared validator's current status."""
    current = get_validator_status()
    return current
# ── Pre-screen (shared) ───────────────────────────────────────────────────────
@app.post("/api/validate/batch")
async def validate_batch(body: dict):
    """HTTP-check only — no DeepSeek classification. Fast live/dead check for bulk selection.

    Expects ``{"domains": [...]}`` with at most 500 entries. The results are
    saved, and per-status counts are returned.

    A result without a ``prescreen_status`` is counted as "dead".
    """
    domains_list = body.get("domains", [])
    if not domains_list:
        return JSONResponse({"error": "no domains provided"}, status_code=400)
    # Without this check a bare string would pass through, with len() counting
    # characters rather than domains.
    if not isinstance(domains_list, list):
        return JSONResponse({"error": "domains must be a list"}, status_code=400)
    if len(domains_list) > 500:
        return JSONResponse({"error": "max 500 per batch"}, status_code=400)

    from collections import Counter
    from app.prescreener import prescreen_domains

    results = await prescreen_domains(domains_list)
    await save_prescreen_results(results)
    counts = Counter(r.get("prescreen_status", "dead") for r in results)
    return {"total": len(domains_list), "live": counts["live"],
            "dead": counts["dead"], "parked": counts["parked"],
            "redirect": counts["redirect"], "error": counts["error"]}
# Strong references to in-flight classification tasks. The event loop keeps
# only weak references to tasks, so an unreferenced fire-and-forget task can be
# garbage-collected before it finishes. Tasks remove themselves when done.
_prescreen_bg_tasks: set[asyncio.Task] = set()


async def _classify_live_in_background(items: list) -> None:
    """Classify live prescreen results with DeepSeek and save what comes back.

    Items are sent in ``DEEPSEEK_BATCH_SIZE`` chunks with a 3 s pause between
    chunks. This runs detached from any request, so errors are logged rather
    than raised.
    """
    try:
        from app.prescreener import classify_with_deepseek, DEEPSEEK_BATCH_SIZE

        for start in range(0, len(items), DEEPSEEK_BATCH_SIZE):
            if start:
                await asyncio.sleep(3)
            cls = await classify_with_deepseek(items[start:start + DEEPSEEK_BATCH_SIZE])
            if cls:
                await save_prescreen_results(cls)
                logger.info("Prescreen BG: classified %d domains", len(cls))
    except Exception as e:
        logger.error("Prescreen BG classification failed: %s", e)


@app.post("/api/prescreen/batch")
async def prescreen_batch(body: dict):
    """HTTP-check up to 200 domains now; classify the live ones in the background.

    Phase 1 (awaited): HTTP check of every domain; results are saved at once.

    Phase 2 (background): DeepSeek niche/type classification of the "live"
    results. The response does not wait for it. ``classifying`` reports how
    many domains were handed to this phase.

    A result without a ``prescreen_status`` is counted as "dead".
    """
    domains_list = body.get("domains", [])
    if not domains_list:
        return JSONResponse({"error": "no domains provided"}, status_code=400)
    # Without this check a bare string would pass through, with len() counting
    # characters rather than domains.
    if not isinstance(domains_list, list):
        return JSONResponse({"error": "domains must be a list"}, status_code=400)
    if len(domains_list) > 200:
        return JSONResponse({"error": "max 200 domains per batch"}, status_code=400)

    from collections import Counter
    from app.prescreener import prescreen_domains

    # Phase 1: HTTP check, saved immediately.
    results = await prescreen_domains(domains_list)
    await save_prescreen_results(results)
    counts = Counter(r.get("prescreen_status", "dead") for r in results)
    live = [r for r in results if r.get("prescreen_status") == "live"]

    # Phase 2: classification runs in the background. The Browse table shows
    # niche/type once it completes.
    if live:
        task = asyncio.create_task(_classify_live_in_background(live))
        _prescreen_bg_tasks.add(task)
        task.add_done_callback(_prescreen_bg_tasks.discard)

    return {
        "total": len(domains_list),
        "live": counts["live"],
        "parked": counts["parked"],
        "redirect": counts["redirect"],
        "dead": counts["dead"],
        "error": counts["error"],
        "classifying": len(live),  # niche/type arrives shortly via background task
    }
# ── Beauty AI endpoints ───────────────────────────────────────────────────────
@app.post("/api/beauty/assess/batch")
async def beauty_assess_batch(body: dict):
    """Queue domains for beauty AI assessment and make sure the worker runs.

    Expects ``{"domains": [...]}``. Responds 400 when ``domains`` is missing,
    empty or not a list. The ``queued`` count is the number of domains
    submitted; ``queue_beauty`` may skip some of them.
    """
    domains_list = body.get("domains", [])
    if not domains_list:
        return JSONResponse({"error": "no domains provided"}, status_code=400)
    # Without this check a bare string would be handed to queue_beauty, and its
    # character count reported as "queued".
    if not isinstance(domains_list, list):
        return JSONResponse({"error": "domains must be a list"}, status_code=400)
    await queue_beauty(domains_list)
    _start_beauty_worker()
    return {"queued": len(domains_list)}
@app.post("/api/beauty/reassess/batch")
async def beauty_reassess_batch(body: dict):
    """Re-queue domains for fresh assessment, resetting any existing result.

    Expects ``{"domains": [...]}``. Responds 400 when ``domains`` is missing,
    empty or not a list. Otherwise it:
    - hands the list to ``requeue_beauty``;
    - makes sure the worker is running;
    - reports how many domains were submitted.
    """
    domains_list = body.get("domains", [])
    if not domains_list:
        return JSONResponse({"error": "no domains provided"}, status_code=400)
    # Without this check a bare string would be handed to requeue_beauty, and
    # its character count reported as "requeued".
    if not isinstance(domains_list, list):
        return JSONResponse({"error": "domains must be a list"}, status_code=400)
    await requeue_beauty(domains_list)
    _start_beauty_worker()
    return {"requeued": len(domains_list)}
@app.post("/api/beauty/worker/restart")
async def beauty_worker_restart():
    """Start the beauty AI worker if it is not currently running.

    ``_start_beauty_worker`` leaves a live worker untouched, so this does not
    restart a healthy worker. ``already_running`` tells the caller which case
    applied. ``status`` keeps its old value for backward compatibility.
    """
    already_running = _beauty_worker_task is not None and not _beauty_worker_task.done()
    _start_beauty_worker()
    return {"status": "restarted", "already_running": already_running}
@app.post("/api/beauty/reset")
async def beauty_reset():
    """Reset stale running jobs back to pending.

    Every ``beauty_queue`` row in ``'running'`` becomes ``'pending'`` again.
    Then the worker is (re)started, and the number of rows changed is returned.
    """
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as conn:
        cursor = await conn.execute("UPDATE beauty_queue SET status='pending' WHERE status='running'")
        reset_count = cursor.rowcount
        await conn.commit()
    _start_beauty_worker()
    return {"reset": reset_count}
@app.get("/api/beauty/status")
async def beauty_status():
    """Return the beauty queue summary from ``get_beauty_queue_status()``."""
    queue_summary = await get_beauty_queue_status()
    return queue_summary
@app.get("/api/beauty/leads")
async def beauty_leads(
    quality: str = Query(None),
    country: str = Query(None),
    page: int = Query(1, ge=1),
    limit: int = Query(100, ge=1, le=500),
):
    """Page through assessed beauty leads, optionally filtered by quality and country."""
    total, rows = await get_beauty_leads(
        page=page, limit=limit, quality=quality, country=country,
    )
    return dict(page=page, limit=limit, total=total, results=rows)
@app.post("/api/beauty/assess/single")
async def beauty_assess_single(body: dict):
    """Analyse and assess one domain right now, bypassing the queue.

    Expects ``{"domain": "<name>"}``.
    - Analysis plus assessment share the same 180 s budget as the background
      worker; exceeding it returns 504.
    - On success the assessment is saved and returned together with the raw
      site analysis.
    - A missing domain gets a 400; a non-string domain also gets a 400.
    """
    domain = body.get("domain")
    if not domain:
        return JSONResponse({"error": "no domain"}, status_code=400)
    if not isinstance(domain, str):
        return JSONResponse({"error": "domain must be a string"}, status_code=400)
    from app.beauty_ai import assess_beauty_domain
    from app.site_analyzer import analyze_site
    try:
        # Without a bound, a slow site could hold the request open indefinitely.
        async with asyncio.timeout(180):
            analysis = await analyze_site(domain)
            assessment = await assess_beauty_domain(analysis)
    except TimeoutError:
        return JSONResponse({"error": "assessment timed out"}, status_code=504)
    await save_beauty_assessment(domain, assessment)
    return {**assessment, "site_analysis": analysis}
# ── Export ────────────────────────────────────────────────────────────────────
@app.get("/api/beauty/export")
async def export_beauty_csv(quality: str = Query(None), country: str = Query(None)):
    """Stream assessed beauty leads as a downloadable CSV file.

    Rows come from ``get_beauty_leads``, using the same ``quality``/``country``
    filters as the leads endpoint. They are fetched 500 per page until an empty
    page is returned.

    Output format:
    - Every data field is quoted by the ``csv`` module, so embedded quotes are
      escaped by doubling and kept intact.
    - List-valued fields are joined with ", ".
    - Cells beginning with =, +, -, @, tab or CR get a leading apostrophe. The
      data is scraped / model-generated text, and this stops spreadsheet apps
      from evaluating it as a formula (CSV injection).
    - ``quality`` is reduced to ``[a-z0-9_-]`` before it goes into the download
      filename, so it cannot break the Content-Disposition header.
    """
    import csv
    import io
    import re

    page_size = 500
    header = (
        "domain,quality,business_name,country_fiscal,countries_active,categories,"
        "detected_brands,portfolio_matches,contact_email,contact_phone,proposal,"
        "outreach_subject,outreach_email\n"
    )
    formula_triggers = ("=", "+", "-", "@", "\t", "\r")

    def cell(value) -> str:
        """Render one field: falsy values become "", formula triggers are neutralised."""
        text = str(value or "")
        if text.startswith(formula_triggers):
            text = "'" + text
        return text

    def joined(values) -> str:
        """Join a list-valued field with ", " (missing/None -> "")."""
        return ", ".join(str(v) for v in values or [])

    async def generate():
        yield header
        buffer = io.StringIO()
        writer = csv.writer(buffer, quoting=csv.QUOTE_ALL, lineterminator="\n")
        page = 1
        while True:
            _, rows = await get_beauty_leads(
                quality=quality, country=country, page=page, limit=page_size
            )
            if not rows:
                break
            for r in rows:
                b = r.get("_beauty") or {}
                writer.writerow([
                    cell(r.get("domain")),
                    cell(r.get("beauty_lead_quality")),
                    cell(b.get("business_name")),
                    cell(b.get("country_fiscal")),
                    cell(joined(b.get("countries_active"))),
                    cell(joined(b.get("categories"))),
                    cell(joined(b.get("detected_brands"))),
                    # The "portfolio_matches" column is sourced from dist_matches.
                    cell(joined(b.get("dist_matches"))),
                    cell(b.get("contact_email")),
                    cell(b.get("contact_phone")),
                    cell(b.get("b2b_proposal")),
                    cell(b.get("outreach_subject")),
                    cell(b.get("outreach_email")),
                ])
            # Emit one chunk per page; the buffer never holds more than one page.
            yield buffer.getvalue()
            buffer.seek(0)
            buffer.truncate(0)
            page += 1

    safe_quality = re.sub(r"[^a-z0-9_-]", "", quality.lower()) if quality else ""
    qual = f"_{safe_quality}" if safe_quality else ""
    return StreamingResponse(
        generate(), media_type="text/csv",
        headers={"Content-Disposition": f'attachment; filename="beautyleads{qual}.csv"'},
    )
# ── Static UI ─────────────────────────────────────────────────────────────────
static_dir = Path(__file__).parent.joinpath("static", "beauty")
# Mounted after every API route so those routes are matched first.
# html=True makes StaticFiles serve index.html for directory paths.
app.mount("/", StaticFiles(html=True, directory=str(static_dir)), name="static")
if __name__ == "__main__":
    # Direct-run entry point. uvicorn imports the app from the dotted path itself.
    import uvicorn

    uvicorn.run(
        "app.beauty_main:app",
        host="0.0.0.0",
        port=7788,
        log_level="info",
    )