1. prescreener.py: classify_with_deepseek now retries on 429 with exponential back-off (5s → 10s → 20s → 40s, up to 4 attempts); the same back-off also covers other transient errors.
2. main.py: prescreen batches now run sequentially with a 3s gap instead of in parallel via asyncio.gather. Parallel batches caused the second batch to always hit the 429 rate limit, leaving most domains unclassified (only the smaller last batch succeeded).
3. index.html: prescreenSelected() now clears this.domains before calling _fetch() so Alpine re-renders the full table with the updated niche/type values; also updates the notify hint to mention the expected 1-2 min wait.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
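
The retry logic from point 1 lives in prescreener.py, which is not part of this file. A minimal sketch of what that back-off might look like, assuming an httpx-based client: the delay schedule and attempt count come from the commit message, while the function name, signature, and error handling are illustrative assumptions.

# Hypothetical sketch — not the actual prescreener.py source.
import asyncio
from typing import Optional

import httpx

async def post_with_backoff(client: httpx.AsyncClient, url: str, payload: dict) -> httpx.Response:
    """Retry on 429 and transient transport errors: 5s -> 10s -> 20s -> 40s, up to 4 attempts."""
    delay = 5.0
    last_exc: Optional[Exception] = None
    for attempt in range(1, 5):  # up to 4 attempts, per the commit message
        try:
            resp = await client.post(url, json=payload)
            if resp.status_code != 429:  # 429 = rate limited, retry below
                resp.raise_for_status()
                return resp
            last_exc = httpx.HTTPStatusError("rate limited", request=resp.request, response=resp)
        except httpx.TransportError as exc:  # timeouts, connection drops, etc.
            last_exc = exc
        if attempt < 4:
            # Exact pairing of delays to attempts is an assumption; the commit
            # only states the 5s -> 10s -> 20s -> 40s schedule.
            await asyncio.sleep(delay)
            delay *= 2
    raise last_exc

Point 2 (sequential batches with a 3s gap) is visible in prescreen_batch in the main.py listing below; point 3 touches index.html, which is not shown here.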
400 lines · 14 KiB · Python
import os
import asyncio
import logging
from pathlib import Path
from contextlib import asynccontextmanager

import httpx
import aiosqlite
from typing import Optional
from fastapi import FastAPI, Query
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from dotenv import load_dotenv

load_dotenv()

from app.db import (
    DATA_DIR, PARQUET_PATH, SQLITE_PATH,
    init_db, get_stats, get_domains, get_enriched,
    queue_domains, get_queue_status, build_duckdb_index, index_status,
    queue_ai, get_ai_queue_status, save_ai_assessment, save_prescreen_results,
)
from app.enricher import start_worker, pause_worker, resume_worker, is_running, ensure_workers_alive
from app.scorer import run_scoring

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)

PARQUET_URL = os.getenv("PARQUET_URL", "")


async def download_parquet():
    if PARQUET_PATH.exists():
        logger.info("Using cached parquet at %s", PARQUET_PATH)
        return

    DATA_DIR.mkdir(parents=True, exist_ok=True)
    tmp_path = PARQUET_PATH.with_suffix(".tmp")

    # Resume a partial download if a previous attempt left a .tmp file behind.
    downloaded = tmp_path.stat().st_size if tmp_path.exists() else 0
    headers = {"Range": f"bytes={downloaded}-"} if downloaded > 0 else {}

    logger.info("Downloading parquet from %s (offset=%d)...", PARQUET_URL, downloaded)
    async with httpx.AsyncClient(follow_redirects=True, timeout=None) as client:
        async with client.stream("GET", PARQUET_URL, headers=headers) as resp:
            if resp.status_code == 416:  # Range Not Satisfiable: file is already complete
                tmp_path.rename(PARQUET_PATH)
                return
            resp.raise_for_status()
            total = int(resp.headers.get("content-length", 0)) + downloaded
            mode = "ab" if downloaded > 0 else "wb"
            with open(tmp_path, mode) as f:
                received = downloaded
                async for chunk in resp.aiter_bytes(chunk_size=1024 * 1024):
                    f.write(chunk)
                    received += len(chunk)
                    if total:
                        logger.info("Download: %.1f%% (%d/%d)", received / total * 100, received, total)

    tmp_path.rename(PARQUET_PATH)
    logger.info("Parquet download complete")


async def _watchdog():
    """Check every 10 seconds and restart any workers that have died."""
    while True:
        await asyncio.sleep(10)
        ensure_workers_alive()


@asynccontextmanager
async def lifespan(app: FastAPI):
    await download_parquet()
    await init_db()
    asyncio.create_task(build_duckdb_index())
    start_worker()
    asyncio.create_task(_watchdog())
    logger.info("DomGod ready on port 6677")
    yield


app = FastAPI(title="DomGod", lifespan=lifespan)


# ── API ──────────────────────────────────────────────────────────────────────

@app.get("/api/stats")
|
|
async def stats():
|
|
return await get_stats()
|
|
|
|
|
|
@app.get("/api/index/status")
|
|
async def get_index_status():
|
|
return index_status()
|
|
|
|
|
|
@app.get("/api/domains")
|
|
async def domains(
|
|
tld: str = Query(None),
|
|
page: int = Query(1, ge=1),
|
|
limit: int = Query(100, ge=1, le=500),
|
|
live_only: bool = Query(False),
|
|
alpha_only: bool = Query(False),
|
|
no_sld: bool = Query(False),
|
|
keyword: str = Query(None),
|
|
):
|
|
total, rows = await get_domains(
|
|
tld=tld, page=page, limit=limit,
|
|
alpha_only=alpha_only, no_sld=no_sld,
|
|
keyword=keyword, live_only=live_only,
|
|
)
|
|
return {"page": page, "limit": limit, "total": total, "results": rows}
|
|
|
|
|
|
@app.post("/api/enrich/batch")
|
|
async def enrich_batch(body: dict):
|
|
domains_list = body.get("domains", [])
|
|
if not domains_list:
|
|
return JSONResponse({"error": "no domains provided"}, status_code=400)
|
|
await queue_domains(domains_list)
|
|
resume_worker()
|
|
return {"queued": len(domains_list)}
|
|
|
|
|
|
@app.get("/api/enrich/status")
|
|
async def enrich_status():
|
|
status = await get_queue_status()
|
|
status["worker_running"] = is_running()
|
|
return status
|
|
|
|
|
|
@app.post("/api/enrich/retry")
|
|
async def enrich_retry():
|
|
async with aiosqlite.connect(SQLITE_PATH) as db:
|
|
await db.execute("UPDATE job_queue SET status='pending', error=NULL WHERE status='failed'")
|
|
await db.commit()
|
|
resume_worker()
|
|
return {"status": "retrying"}
|
|
|
|
|
|
@app.post("/api/enrich/pause")
|
|
async def enrich_pause():
|
|
pause_worker()
|
|
return {"status": "paused"}
|
|
|
|
|
|
@app.post("/api/enrich/resume")
|
|
async def enrich_resume():
|
|
resume_worker()
|
|
return {"status": "resumed"}
|
|
|
|
|
|
@app.get("/api/enriched")
|
|
async def enriched(
|
|
min_score: int = Query(0, ge=0, le=100),
|
|
cms: str = Query(None),
|
|
country: str = Query(None),
|
|
kit_digital: Optional[bool] = Query(None),
|
|
ai_only: bool = Query(False),
|
|
lead_quality: str = Query(None),
|
|
page: int = Query(1, ge=1),
|
|
limit: int = Query(100, ge=1, le=1000),
|
|
):
|
|
total, rows = await get_enriched(
|
|
min_score=min_score, cms=cms, country=country,
|
|
kit_digital=kit_digital, ai_only=ai_only, lead_quality=lead_quality,
|
|
page=page, limit=limit,
|
|
)
|
|
return {"page": page, "limit": limit, "total": total, "results": rows}
|
|
|
|
|
|
# ── AI assessment endpoints ───────────────────────────────────────────────────
|
|
|
|
@app.post("/api/prescreen/batch")
|
|
async def prescreen_batch(body: dict):
|
|
"""
|
|
Phase 1 — HTTP check every domain (no AI). Marks live/dead/parked/redirect.
|
|
Phase 2 — Single DeepSeek call for all live domains → niche + type.
|
|
Max 200 domains per call.
|
|
"""
|
|
domains = body.get("domains", [])
|
|
if not domains:
|
|
return JSONResponse({"error": "no domains provided"}, status_code=400)
|
|
if len(domains) > 200:
|
|
return JSONResponse({"error": "max 200 domains per batch"}, status_code=400)
|
|
|
|
from app.prescreener import prescreen_domains, classify_with_deepseek, DEEPSEEK_BATCH_SIZE
|
|
|
|
# Phase 1: HTTP checks (concurrent, no AI)
|
|
results = await prescreen_domains(domains)
|
|
await save_prescreen_results(results)
|
|
|
|
counts: dict = {}
|
|
for r in results:
|
|
s = r.get("prescreen_status", "dead")
|
|
counts[s] = counts.get(s, 0) + 1
|
|
|
|
# Phase 2: DeepSeek classification for live sites only
|
|
# Run batches sequentially — parallel requests cause 429 rate-limit errors.
|
|
live = [r for r in results if r.get("prescreen_status") == "live"]
|
|
classified = 0
|
|
if live:
|
|
batches = [live[i:i + DEEPSEEK_BATCH_SIZE] for i in range(0, len(live), DEEPSEEK_BATCH_SIZE)]
|
|
all_cls: list = []
|
|
for i, batch in enumerate(batches):
|
|
if i > 0:
|
|
await asyncio.sleep(3) # brief gap between batches
|
|
cls = await classify_with_deepseek(batch)
|
|
all_cls.extend(cls)
|
|
if all_cls:
|
|
await save_prescreen_results(all_cls)
|
|
classified = len(all_cls)
|
|
|
|
return {
|
|
"total": len(domains),
|
|
"live": counts.get("live", 0),
|
|
"parked": counts.get("parked", 0),
|
|
"redirect": counts.get("redirect", 0),
|
|
"dead": counts.get("dead", 0),
|
|
"classified": classified,
|
|
}
|
|
|
|
|
|
@app.post("/api/ai/assess/batch")
|
|
async def ai_assess_batch(body: dict):
|
|
domains_list = body.get("domains", [])
|
|
if not domains_list:
|
|
return JSONResponse({"error": "no domains provided"}, status_code=400)
|
|
language = body.get("language", "ES").upper()
|
|
if language not in ("EN", "ES", "RO"):
|
|
language = "ES"
|
|
await queue_ai(domains_list, language=language)
|
|
ensure_workers_alive() # ensure AI worker is alive when jobs are queued
|
|
return {"queued": len(domains_list)}
|
|
|
|
|
|
@app.post("/api/ai/worker/restart")
|
|
async def ai_worker_restart():
|
|
ensure_workers_alive()
|
|
return {"status": "restarted"}
|
|
|
|
|
|
@app.post("/api/ai/reset")
|
|
async def ai_reset():
|
|
"""Reset all 'running' AI queue jobs back to 'pending' (unstick hung jobs)."""
|
|
async with aiosqlite.connect(SQLITE_PATH) as db:
|
|
r = await db.execute("UPDATE ai_queue SET status='pending' WHERE status='running'")
|
|
count = r.rowcount
|
|
await db.commit()
|
|
ensure_workers_alive()
|
|
return {"reset": count}
|
|
|
|
|
|
@app.get("/api/ai/debug")
|
|
async def ai_debug():
|
|
"""Returns worker state + last 10 queue entries for troubleshooting."""
|
|
from app.enricher import _ai_worker_task
|
|
task_alive = _ai_worker_task is not None and not _ai_worker_task.done()
|
|
task_exc = None
|
|
if _ai_worker_task and _ai_worker_task.done() and not _ai_worker_task.cancelled():
|
|
try:
|
|
task_exc = str(_ai_worker_task.exception())
|
|
except Exception:
|
|
pass
|
|
|
|
async with aiosqlite.connect(SQLITE_PATH) as db:
|
|
db.row_factory = aiosqlite.Row
|
|
async with db.execute(
|
|
"SELECT domain, status, created_at, completed_at, error FROM ai_queue ORDER BY created_at DESC LIMIT 10"
|
|
) as cur:
|
|
recent = [dict(r) async for r in cur]
|
|
|
|
return {
|
|
"ai_worker_alive": task_alive,
|
|
"ai_worker_exception": task_exc,
|
|
"recent_queue": recent,
|
|
"queue_status": await get_ai_queue_status(),
|
|
}
|
|
|
|
|
|
@app.get("/api/ai/status")
|
|
async def ai_status():
|
|
return await get_ai_queue_status()
|
|
|
|
|
|
@app.post("/api/ai/assess/single")
|
|
async def ai_assess_single(body: dict):
|
|
"""Immediate (blocking) AI assessment — does fresh scrape, no pre-enrichment needed."""
|
|
domain = body.get("domain")
|
|
if not domain:
|
|
return JSONResponse({"error": "no domain"}, status_code=400)
|
|
language = body.get("language", "ES").upper()
|
|
if language not in ("EN", "ES", "RO"):
|
|
language = "ES"
|
|
from app.site_analyzer import analyze_site
|
|
from app.replicate_ai import assess_domain as gemini_assess
|
|
analysis = await analyze_site(domain)
|
|
assessment = await gemini_assess(analysis, language=language)
|
|
await save_ai_assessment(domain, assessment, site_analysis=analysis)
|
|
return {**assessment, "site_analysis": analysis}
|
|
|
|
|
|
@app.get("/api/export")
|
|
async def export_csv(
|
|
min_score: int = Query(0),
|
|
cms: str = Query(None),
|
|
country: str = Query(None),
|
|
tier: str = Query(None),
|
|
):
|
|
if tier == "hot":
|
|
min_score = 80
|
|
elif tier == "warm":
|
|
min_score = 50
|
|
|
|
max_score = 79 if tier == "warm" else 100
|
|
|
|
async def generate():
|
|
yield "domain,score,cms,ssl_expiry_days,ip_country,is_live,status_code,has_mx,server,page_title,enriched_at\n"
|
|
p = 1
|
|
while True:
|
|
_, rows = await get_enriched(min_score=min_score, cms=cms, country=country, page=p, limit=500)
|
|
if not rows:
|
|
break
|
|
for r in rows:
|
|
if r.get("score", 0) > max_score:
|
|
continue
|
|
line = ",".join(
|
|
f'"{str(r.get(col) or "").replace(chr(34), chr(39))}"'
|
|
for col in ["domain", "score", "cms", "ssl_expiry_days", "ip_country",
|
|
"is_live", "status_code", "has_mx", "server", "page_title", "enriched_at"]
|
|
)
|
|
yield line + "\n"
|
|
p += 1
|
|
|
|
fname = f"domgod_{tier or 'export'}_score{min_score}.csv"
|
|
return StreamingResponse(
|
|
generate(), media_type="text/csv",
|
|
headers={"Content-Disposition": f'attachment; filename="{fname}"'},
|
|
)
|
|
|
|
|
|
@app.get("/api/export/leads")
|
|
async def export_leads_csv(lead_quality: str = Query(None), country: str = Query(None)):
|
|
import json as _json
|
|
|
|
async def generate():
|
|
yield "domain,lead_quality,score,best_contact_channel,best_contact_value,emails,phones,whatsapp,social,cms,ip_country,page_title,ai_pitch\n"
|
|
p = 1
|
|
while True:
|
|
_, rows = await get_enriched(ai_only=True, lead_quality=lead_quality,
|
|
country=country, page=p, limit=500)
|
|
if not rows:
|
|
break
|
|
for r in rows:
|
|
contacts = {}
|
|
try:
|
|
contacts = _json.loads(r.get("contact_info") or "{}")
|
|
except Exception:
|
|
pass
|
|
def esc(v):
|
|
return f'"{str(v or "").replace(chr(34), chr(39))}"'
|
|
emails = esc(";".join(contacts.get("emails", [])[:3]))
|
|
phones = esc(";".join(contacts.get("phones", [])[:3]))
|
|
whatsapp = esc(";".join(contacts.get("whatsapp",[])[:2]))
|
|
social = esc(";".join(contacts.get("social", [])[:4]))
|
|
line = ",".join([
|
|
esc(r.get("domain")),
|
|
esc(r.get("ai_lead_quality")),
|
|
esc(r.get("score")),
|
|
esc(r.get("ai_contact_channel")),
|
|
esc(r.get("ai_contact_value")),
|
|
emails, phones, whatsapp, social,
|
|
esc(r.get("cms")),
|
|
esc(r.get("ip_country")),
|
|
esc(r.get("page_title")),
|
|
esc(r.get("ai_pitch")),
|
|
])
|
|
yield line + "\n"
|
|
p += 1
|
|
|
|
qual = f"_{lead_quality.lower()}" if lead_quality else ""
|
|
return StreamingResponse(
|
|
generate(), media_type="text/csv",
|
|
headers={"Content-Disposition": f'attachment; filename="domgod_leads{qual}.csv"'},
|
|
)
|
|
|
|
|
|
@app.post("/api/score/run")
|
|
async def score_run():
|
|
return await run_scoring()
|
|
|
|
|
|
# ── Static UI ────────────────────────────────────────────────────────────────
|
|
static_dir = Path(__file__).parent / "static"
|
|
app.mount("/", StaticFiles(directory=str(static_dir), html=True), name="static")
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
uvicorn.run("app.main:app", host="0.0.0.0", port=6677, log_level="info")
|
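
For reference, a minimal client-side sketch for exercising the prescreen endpoint defined above. It assumes the server is running locally on port 6677 (as configured in the __main__ block); the request body and response keys mirror what prescreen_batch accepts and returns.

# Hypothetical usage sketch — not part of the application itself.
import asyncio

import httpx

async def main() -> None:
    domains = ["example.com", "example.org"]  # placeholder list, max 200 per call
    # Generous timeout: phase 2 classification can take a minute or two.
    async with httpx.AsyncClient(timeout=300) as client:
        resp = await client.post(
            "http://localhost:6677/api/prescreen/batch",
            json={"domains": domains},
        )
        resp.raise_for_status()
        summary = resp.json()
        # Response keys come from prescreen_batch's return dict.
        print(f"{summary['live']} live, {summary['classified']} classified, "
              f"{summary['parked']} parked, {summary['dead']} dead")

if __name__ == "__main__":
    asyncio.run(main())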