New service (app/beauty_main.py) sharing the same /data volume: - Separate FastAPI app running on port 7788 - beauty_ai.py: brand universe scan (~650 brands), portfolio match detection against OUR_BRANDS, Gemini B2B assessment prompt in Spanish returning quality/categories/dist_matches/outreach_email - beauty_queue table + beauty_lead_quality/beauty_assessment columns in enriched_domains (with migrations) - Endpoints: /api/beauty/assess/batch, /api/beauty/leads, /api/beauty/status, /api/beauty/export, /api/beauty/reset - Static frontend: Browse (beauty/ecommerce pre-filtered, no CMS/SSL/KD columns), Validator, B2B Pipeline (brand chips, expandable outreach), Pre-screen, Export CSV - docker-compose: second 'beauty' service with shared data volume - Dockerfile: expose 7788 alongside 6677 Also: add 'error' prescreen_status handling + UI (orange stat box, filter option) for 4xx/5xx HTTP responses Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
338 lines · 12 KiB · Python
"""BeautyLeads — Cosmetics B2B intelligence dashboard (port 7788).
|
|
|
|
Shares the same /data volume as the main DomGod service.
|
|
Does NOT re-download parquet or rebuild DuckDB index (those run in main service).
|
|
Runs its own beauty AI assessment worker against the shared enriched_domains table.
|
|
"""
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
from contextlib import asynccontextmanager
|
|
|
|
import aiosqlite
|
|
from typing import Optional
|
|
from fastapi import FastAPI, Query
|
|
from fastapi.responses import StreamingResponse, JSONResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv()
|
|
|
|
from app.db import (
|
|
SQLITE_PATH, init_db, get_stats, get_domains, get_enriched,
|
|
build_duckdb_index, index_status,
|
|
queue_beauty, get_beauty_queue_status, save_beauty_assessment, get_beauty_leads,
|
|
save_prescreen_results,
|
|
)
|
|
from app.validator import start_validator, stop_validator, get_validator_status
|
|
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ── Beauty AI worker ──────────────────────────────────────────────────────────
|
|
|
|
_beauty_worker_task: Optional[asyncio.Task] = None
|
|
|
|
|
|
async def _assess_one_beauty(domain: str) -> None:
|
|
from app.beauty_ai import assess_beauty_domain
|
|
from app.site_analyzer import analyze_site
|
|
logger.info("Beauty AI: starting %s", domain)
|
|
try:
|
|
async with asyncio.timeout(180):
|
|
analysis = await analyze_site(domain)
|
|
assessment = await assess_beauty_domain(analysis)
|
|
await save_beauty_assessment(domain, assessment)
|
|
logger.info("Beauty AI: saved %s → %s", domain, assessment.get("lead_quality"))
|
|
except Exception as e:
|
|
logger.error("Beauty AI: failed %s — %s", domain, e)
|
|
try:
|
|
async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
|
|
await db.execute(
|
|
"UPDATE beauty_queue SET status='failed', completed_at=datetime('now'), error=? WHERE domain=?",
|
|
(str(e)[:400], domain),
|
|
)
|
|
await db.commit()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
async def _beauty_worker_loop():
|
|
logger.info("Beauty AI worker starting")
|
|
# Reset stale running jobs
|
|
try:
|
|
async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
|
|
await db.execute("UPDATE beauty_queue SET status='pending' WHERE status='running'")
|
|
await db.commit()
|
|
except Exception as e:
|
|
logger.error("Beauty worker: stale reset failed: %s", e)
|
|
|
|
while True:
|
|
rows = []
|
|
try:
|
|
async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
|
|
async with db.execute(
|
|
"SELECT domain FROM beauty_queue WHERE status='pending' LIMIT 5"
|
|
) as cur:
|
|
rows = await cur.fetchall()
|
|
if rows:
|
|
await db.executemany(
|
|
"UPDATE beauty_queue SET status='running' WHERE domain=?",
|
|
[(r[0],) for r in rows],
|
|
)
|
|
await db.commit()
|
|
except Exception as e:
|
|
logger.error("Beauty worker DB error: %s", e)
|
|
await asyncio.sleep(5)
|
|
continue
|
|
|
|
if not rows:
|
|
await asyncio.sleep(3)
|
|
continue
|
|
|
|
await asyncio.gather(*[_assess_one_beauty(r[0]) for r in rows], return_exceptions=True)
|
|
|
|
|
|
def _start_beauty_worker():
|
|
global _beauty_worker_task
|
|
if _beauty_worker_task is None or _beauty_worker_task.done():
|
|
_beauty_worker_task = asyncio.create_task(_beauty_worker_loop())
|
|
logger.info("Beauty AI worker started")
|
|
|
|
|
|
# ── App lifespan ──────────────────────────────────────────────────────────────
|
|
|
|
@asynccontextmanager
|
|
async def lifespan(app: FastAPI):
|
|
await init_db()
|
|
# Detect existing DuckDB index (built by main service); don't rebuild
|
|
asyncio.create_task(build_duckdb_index())
|
|
_start_beauty_worker()
|
|
logger.info("BeautyLeads ready on port 7788")
|
|
yield
|
|
|
|
|
|
app = FastAPI(title="BeautyLeads", lifespan=lifespan)
|
|
|
|
|
|
# ── Shared read endpoints (same DB) ──────────────────────────────────────────
|
|
|
|
@app.get("/api/stats")
|
|
async def stats():
|
|
return await get_stats()
|
|
|
|
|
|
@app.get("/api/index/status")
|
|
async def get_index_status():
|
|
return index_status()
|
|
|
|
|
|
@app.get("/api/domains")
|
|
async def domains(
|
|
tld: str = Query(None),
|
|
page: int = Query(1, ge=1),
|
|
limit: int = Query(100, ge=1, le=500),
|
|
live_only: bool = Query(False),
|
|
alpha_only: bool = Query(False),
|
|
no_sld: bool = Query(False),
|
|
keyword: str = Query(None),
|
|
):
|
|
total, rows = await get_domains(
|
|
tld=tld, page=page, limit=limit,
|
|
alpha_only=alpha_only, no_sld=no_sld,
|
|
keyword=keyword, live_only=live_only,
|
|
)
|
|
return {"page": page, "limit": limit, "total": total, "results": rows}
|
|
|
|
|
|
@app.get("/api/enriched")
|
|
async def enriched(
|
|
min_score: int = Query(0),
|
|
country: str = Query(None),
|
|
prescreen_status: str = Query(None),
|
|
niche: str = Query(None),
|
|
site_type: str = Query(None),
|
|
page: int = Query(1, ge=1),
|
|
limit: int = Query(100, ge=1, le=1000),
|
|
):
|
|
total, rows = await get_enriched(
|
|
min_score=min_score, country=country,
|
|
prescreen_status=prescreen_status, niche=niche, site_type=site_type,
|
|
page=page, limit=limit,
|
|
)
|
|
return {"page": page, "limit": limit, "total": total, "results": rows}
|
|
|
|
|
|
# ── Validator (shared) ────────────────────────────────────────────────────────
|
|
|
|
@app.post("/api/validator/start")
|
|
async def validator_start(tld: str = Query(None), rescan_dead: bool = Query(False)):
|
|
start_validator(tld_filter=tld or None, rescan_dead=rescan_dead)
|
|
return get_validator_status()
|
|
|
|
|
|
@app.post("/api/validator/stop")
|
|
async def validator_stop():
|
|
stop_validator()
|
|
return {"status": "stopped"}
|
|
|
|
|
|
@app.get("/api/validator/status")
|
|
async def validator_status():
|
|
return get_validator_status()
|
|
|
|
|
|
# ── Pre-screen (shared) ───────────────────────────────────────────────────────
|
|
|
|
@app.post("/api/prescreen/batch")
|
|
async def prescreen_batch(body: dict):
|
|
domains_list = body.get("domains", [])
|
|
if not domains_list:
|
|
return JSONResponse({"error": "no domains provided"}, status_code=400)
|
|
if len(domains_list) > 200:
|
|
return JSONResponse({"error": "max 200 domains per batch"}, status_code=400)
|
|
|
|
from app.prescreener import prescreen_domains, classify_with_deepseek, DEEPSEEK_BATCH_SIZE
|
|
|
|
results = await prescreen_domains(domains_list)
|
|
await save_prescreen_results(results)
|
|
|
|
counts: dict = {}
|
|
for r in results:
|
|
s = r.get("prescreen_status", "dead")
|
|
counts[s] = counts.get(s, 0) + 1
|
|
|
|
live = [r for r in results if r.get("prescreen_status") == "live"]
|
|
classified = 0
|
|
if live:
|
|
batches = [live[i:i + DEEPSEEK_BATCH_SIZE] for i in range(0, len(live), DEEPSEEK_BATCH_SIZE)]
|
|
all_cls: list = []
|
|
for i, batch in enumerate(batches):
|
|
if i > 0:
|
|
await asyncio.sleep(3)
|
|
cls = await classify_with_deepseek(batch)
|
|
all_cls.extend(cls)
|
|
if all_cls:
|
|
await save_prescreen_results(all_cls)
|
|
classified = len(all_cls)
|
|
|
|
return {
|
|
"total": len(domains_list),
|
|
"live": counts.get("live", 0),
|
|
"parked": counts.get("parked", 0),
|
|
"redirect": counts.get("redirect", 0),
|
|
"dead": counts.get("dead", 0),
|
|
"classified": classified,
|
|
}
|
|
|
|
|
|
# ── Beauty AI endpoints ───────────────────────────────────────────────────────
|
|
|
|
@app.post("/api/beauty/assess/batch")
|
|
async def beauty_assess_batch(body: dict):
|
|
domains_list = body.get("domains", [])
|
|
if not domains_list:
|
|
return JSONResponse({"error": "no domains provided"}, status_code=400)
|
|
await queue_beauty(domains_list)
|
|
_start_beauty_worker()
|
|
return {"queued": len(domains_list)}
|
|
|
|
|
|
@app.post("/api/beauty/worker/restart")
|
|
async def beauty_worker_restart():
|
|
_start_beauty_worker()
|
|
return {"status": "restarted"}
|
|
|
|
|
|
@app.post("/api/beauty/reset")
|
|
async def beauty_reset():
|
|
"""Reset stale running jobs back to pending."""
|
|
async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
|
|
r = await db.execute("UPDATE beauty_queue SET status='pending' WHERE status='running'")
|
|
count = r.rowcount
|
|
await db.commit()
|
|
_start_beauty_worker()
|
|
return {"reset": count}
|
|
|
|
|
|
@app.get("/api/beauty/status")
|
|
async def beauty_status():
|
|
return await get_beauty_queue_status()
|
|
|
|
|
|
@app.get("/api/beauty/leads")
|
|
async def beauty_leads(
|
|
quality: str = Query(None),
|
|
country: str = Query(None),
|
|
page: int = Query(1, ge=1),
|
|
limit: int = Query(100, ge=1, le=500),
|
|
):
|
|
total, rows = await get_beauty_leads(quality=quality, country=country, page=page, limit=limit)
|
|
return {"page": page, "limit": limit, "total": total, "results": rows}
|
|
|
|
|
|
@app.post("/api/beauty/assess/single")
|
|
async def beauty_assess_single(body: dict):
|
|
domain = body.get("domain")
|
|
if not domain:
|
|
return JSONResponse({"error": "no domain"}, status_code=400)
|
|
from app.beauty_ai import assess_beauty_domain
|
|
from app.site_analyzer import analyze_site
|
|
analysis = await analyze_site(domain)
|
|
assessment = await assess_beauty_domain(analysis)
|
|
await save_beauty_assessment(domain, assessment)
|
|
return {**assessment, "site_analysis": analysis}
|
|
|
|
|
|
# ── Export ────────────────────────────────────────────────────────────────────
|
|
|
|
@app.get("/api/beauty/export")
|
|
async def export_beauty_csv(quality: str = Query(None), country: str = Query(None)):
|
|
import json as _json
|
|
|
|
async def generate():
|
|
yield "domain,quality,business_name,country_fiscal,countries_active,categories,detected_brands,portfolio_matches,contact_email,contact_phone,proposal,outreach_subject,outreach_email\n"
|
|
p = 1
|
|
while True:
|
|
_, rows = await get_beauty_leads(quality=quality, country=country, page=p, limit=500)
|
|
if not rows:
|
|
break
|
|
for r in rows:
|
|
b = r.get("_beauty") or {}
|
|
def esc(v):
|
|
return f'"{str(v or "").replace(chr(34), chr(39))}"'
|
|
line = ",".join([
|
|
esc(r.get("domain")),
|
|
esc(r.get("beauty_lead_quality")),
|
|
esc(b.get("business_name")),
|
|
esc(b.get("country_fiscal")),
|
|
esc(", ".join(b.get("countries_active") or [])),
|
|
esc(", ".join(b.get("categories") or [])),
|
|
esc(", ".join(b.get("detected_brands") or [])),
|
|
esc(", ".join(b.get("dist_matches") or [])),
|
|
esc(b.get("contact_email")),
|
|
esc(b.get("contact_phone")),
|
|
esc(b.get("b2b_proposal")),
|
|
esc(b.get("outreach_subject")),
|
|
esc(b.get("outreach_email")),
|
|
])
|
|
yield line + "\n"
|
|
p += 1
|
|
|
|
qual = f"_{quality.lower()}" if quality else ""
|
|
return StreamingResponse(
|
|
generate(), media_type="text/csv",
|
|
headers={"Content-Disposition": f'attachment; filename="beautyleads{qual}.csv"'},
|
|
)
|
|
|
|
|
|
# ── Static UI ─────────────────────────────────────────────────────────────────
|
|
|
|
static_dir = Path(__file__).parent / "static" / "beauty"
|
|
app.mount("/", StaticFiles(directory=str(static_dir), html=True), name="static")
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
uvicorn.run("app.beauty_main:app", host="0.0.0.0", port=7788, log_level="info")
|