Files
DomGod/app/main.py
Malin 7fc510f903 feat: two-phase pre-screening with HTTP check + DeepSeek batch classification
Phase 1 (no AI credits): httpx checks every selected domain concurrently
(30 parallel) with real browser UA — detects live/dead/parked/redirect.
Parked: keyword scan in body/title + known parking host redirect check.
Results saved to DB immediately; dead/parked never reach DeepSeek.

Phase 2 (single DeepSeek call): all live-site titles + snippets bundled
into ONE Replicate/DeepSeek-R1 request → returns niche + type for every
domain in batch (up to 80 per call, parallelised if more).

- app/prescreener.py (new): _check_one(), prescreen_domains(),
  classify_with_deepseek(), parking signal lists, same-domain redirect logic
- app/db.py: prescreen_status/niche/site_type/prescreen_at columns +
  migrations; save_prescreen_results() upsert helper
- app/main.py: POST /api/prescreen/batch endpoint
- app/static/index.html:
  - 🔍 Pre-screen button (disabled while running, shows spinner)
  - Niche + Type columns in Browse and Leads tables (.pni/.pty pills)
  - Prescreen status colour dot (●) when niche not yet set
  - prescreening state flag; result toast shows per-status counts

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 21:22:45 +02:00

400 lines
14 KiB
Python

import os
import asyncio
import logging
from pathlib import Path
from contextlib import asynccontextmanager
import httpx
import aiosqlite
from typing import Optional
from fastapi import FastAPI, Query
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from dotenv import load_dotenv
load_dotenv()
from app.db import (
DATA_DIR, PARQUET_PATH, SQLITE_PATH,
init_db, get_stats, get_domains, get_enriched,
queue_domains, get_queue_status, build_duckdb_index, index_status,
queue_ai, get_ai_queue_status, save_ai_assessment, save_prescreen_results,
)
from app.enricher import start_worker, pause_worker, resume_worker, is_running, ensure_workers_alive
from app.scorer import run_scoring
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
PARQUET_URL = os.getenv("PARQUET_URL", "")
async def download_parquet():
if PARQUET_PATH.exists():
logger.info("Using cached parquet at %s", PARQUET_PATH)
return
DATA_DIR.mkdir(parents=True, exist_ok=True)
tmp_path = PARQUET_PATH.with_suffix(".tmp")
downloaded = tmp_path.stat().st_size if tmp_path.exists() else 0
headers = {"Range": f"bytes={downloaded}-"} if downloaded > 0 else {}
logger.info("Downloading parquet from %s (offset=%d)...", PARQUET_URL, downloaded)
async with httpx.AsyncClient(follow_redirects=True, timeout=None) as client:
async with client.stream("GET", PARQUET_URL, headers=headers) as resp:
if resp.status_code == 416:
tmp_path.rename(PARQUET_PATH)
return
resp.raise_for_status()
total = int(resp.headers.get("content-length", 0)) + downloaded
mode = "ab" if downloaded > 0 else "wb"
with open(tmp_path, mode) as f:
received = downloaded
async for chunk in resp.aiter_bytes(chunk_size=1024 * 1024):
f.write(chunk)
received += len(chunk)
if total:
logger.info("Download: %.1f%% (%d/%d)", received / total * 100, received, total)
tmp_path.rename(PARQUET_PATH)
logger.info("Parquet download complete")
async def _watchdog():
"""Restart workers if they die every 10 seconds."""
while True:
await asyncio.sleep(10)
ensure_workers_alive()
@asynccontextmanager
async def lifespan(app: FastAPI):
await download_parquet()
await init_db()
asyncio.create_task(build_duckdb_index())
start_worker()
asyncio.create_task(_watchdog())
logger.info("DomGod ready on port 6677")
yield
app = FastAPI(title="DomGod", lifespan=lifespan)
# ── API ──────────────────────────────────────────────────────────────────────
@app.get("/api/stats")
async def stats():
return await get_stats()
@app.get("/api/index/status")
async def get_index_status():
return index_status()
@app.get("/api/domains")
async def domains(
tld: str = Query(None),
page: int = Query(1, ge=1),
limit: int = Query(100, ge=1, le=500),
live_only: bool = Query(False),
alpha_only: bool = Query(False),
no_sld: bool = Query(False),
keyword: str = Query(None),
):
total, rows = await get_domains(
tld=tld, page=page, limit=limit,
alpha_only=alpha_only, no_sld=no_sld,
keyword=keyword, live_only=live_only,
)
return {"page": page, "limit": limit, "total": total, "results": rows}
@app.post("/api/enrich/batch")
async def enrich_batch(body: dict):
domains_list = body.get("domains", [])
if not domains_list:
return JSONResponse({"error": "no domains provided"}, status_code=400)
await queue_domains(domains_list)
resume_worker()
return {"queued": len(domains_list)}
@app.get("/api/enrich/status")
async def enrich_status():
status = await get_queue_status()
status["worker_running"] = is_running()
return status
@app.post("/api/enrich/retry")
async def enrich_retry():
async with aiosqlite.connect(SQLITE_PATH) as db:
await db.execute("UPDATE job_queue SET status='pending', error=NULL WHERE status='failed'")
await db.commit()
resume_worker()
return {"status": "retrying"}
@app.post("/api/enrich/pause")
async def enrich_pause():
pause_worker()
return {"status": "paused"}
@app.post("/api/enrich/resume")
async def enrich_resume():
resume_worker()
return {"status": "resumed"}
@app.get("/api/enriched")
async def enriched(
min_score: int = Query(0, ge=0, le=100),
cms: str = Query(None),
country: str = Query(None),
kit_digital: Optional[bool] = Query(None),
ai_only: bool = Query(False),
lead_quality: str = Query(None),
page: int = Query(1, ge=1),
limit: int = Query(100, ge=1, le=1000),
):
total, rows = await get_enriched(
min_score=min_score, cms=cms, country=country,
kit_digital=kit_digital, ai_only=ai_only, lead_quality=lead_quality,
page=page, limit=limit,
)
return {"page": page, "limit": limit, "total": total, "results": rows}
# ── AI assessment endpoints ───────────────────────────────────────────────────
@app.post("/api/prescreen/batch")
async def prescreen_batch(body: dict):
"""
Phase 1 — HTTP check every domain (no AI). Marks live/dead/parked/redirect.
Phase 2 — Single DeepSeek call for all live domains → niche + type.
Max 200 domains per call.
"""
domains = body.get("domains", [])
if not domains:
return JSONResponse({"error": "no domains provided"}, status_code=400)
if len(domains) > 200:
return JSONResponse({"error": "max 200 domains per batch"}, status_code=400)
from app.prescreener import prescreen_domains, classify_with_deepseek, DEEPSEEK_BATCH_SIZE
# Phase 1: HTTP checks (concurrent, no AI)
results = await prescreen_domains(domains)
await save_prescreen_results(results)
counts: dict = {}
for r in results:
s = r.get("prescreen_status", "dead")
counts[s] = counts.get(s, 0) + 1
# Phase 2: DeepSeek classification for live sites only
live = [r for r in results if r.get("prescreen_status") == "live"]
classified = 0
if live:
batches = [live[i:i + DEEPSEEK_BATCH_SIZE] for i in range(0, len(live), DEEPSEEK_BATCH_SIZE)]
batch_cls = await asyncio.gather(
*[classify_with_deepseek(b) for b in batches], return_exceptions=True
)
all_cls: list = []
for bc in batch_cls:
if isinstance(bc, list):
all_cls.extend(bc)
if all_cls:
await save_prescreen_results(all_cls)
classified = len(all_cls)
return {
"total": len(domains),
"live": counts.get("live", 0),
"parked": counts.get("parked", 0),
"redirect": counts.get("redirect", 0),
"dead": counts.get("dead", 0),
"classified": classified,
}
@app.post("/api/ai/assess/batch")
async def ai_assess_batch(body: dict):
domains_list = body.get("domains", [])
if not domains_list:
return JSONResponse({"error": "no domains provided"}, status_code=400)
language = body.get("language", "ES").upper()
if language not in ("EN", "ES", "RO"):
language = "ES"
await queue_ai(domains_list, language=language)
ensure_workers_alive() # ensure AI worker is alive when jobs are queued
return {"queued": len(domains_list)}
@app.post("/api/ai/worker/restart")
async def ai_worker_restart():
ensure_workers_alive()
return {"status": "restarted"}
@app.post("/api/ai/reset")
async def ai_reset():
"""Reset all 'running' AI queue jobs back to 'pending' (unstick hung jobs)."""
async with aiosqlite.connect(SQLITE_PATH) as db:
r = await db.execute("UPDATE ai_queue SET status='pending' WHERE status='running'")
count = r.rowcount
await db.commit()
ensure_workers_alive()
return {"reset": count}
@app.get("/api/ai/debug")
async def ai_debug():
"""Returns worker state + last 10 queue entries for troubleshooting."""
from app.enricher import _ai_worker_task
task_alive = _ai_worker_task is not None and not _ai_worker_task.done()
task_exc = None
if _ai_worker_task and _ai_worker_task.done() and not _ai_worker_task.cancelled():
try:
task_exc = str(_ai_worker_task.exception())
except Exception:
pass
async with aiosqlite.connect(SQLITE_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"SELECT domain, status, created_at, completed_at, error FROM ai_queue ORDER BY created_at DESC LIMIT 10"
) as cur:
recent = [dict(r) async for r in cur]
return {
"ai_worker_alive": task_alive,
"ai_worker_exception": task_exc,
"recent_queue": recent,
"queue_status": await get_ai_queue_status(),
}
@app.get("/api/ai/status")
async def ai_status():
return await get_ai_queue_status()
@app.post("/api/ai/assess/single")
async def ai_assess_single(body: dict):
"""Immediate (blocking) AI assessment — does fresh scrape, no pre-enrichment needed."""
domain = body.get("domain")
if not domain:
return JSONResponse({"error": "no domain"}, status_code=400)
language = body.get("language", "ES").upper()
if language not in ("EN", "ES", "RO"):
language = "ES"
from app.site_analyzer import analyze_site
from app.replicate_ai import assess_domain as gemini_assess
analysis = await analyze_site(domain)
assessment = await gemini_assess(analysis, language=language)
await save_ai_assessment(domain, assessment, site_analysis=analysis)
return {**assessment, "site_analysis": analysis}
@app.get("/api/export")
async def export_csv(
min_score: int = Query(0),
cms: str = Query(None),
country: str = Query(None),
tier: str = Query(None),
):
if tier == "hot":
min_score = 80
elif tier == "warm":
min_score = 50
max_score = 79 if tier == "warm" else 100
async def generate():
yield "domain,score,cms,ssl_expiry_days,ip_country,is_live,status_code,has_mx,server,page_title,enriched_at\n"
p = 1
while True:
_, rows = await get_enriched(min_score=min_score, cms=cms, country=country, page=p, limit=500)
if not rows:
break
for r in rows:
if r.get("score", 0) > max_score:
continue
line = ",".join(
f'"{str(r.get(col) or "").replace(chr(34), chr(39))}"'
for col in ["domain", "score", "cms", "ssl_expiry_days", "ip_country",
"is_live", "status_code", "has_mx", "server", "page_title", "enriched_at"]
)
yield line + "\n"
p += 1
fname = f"domgod_{tier or 'export'}_score{min_score}.csv"
return StreamingResponse(
generate(), media_type="text/csv",
headers={"Content-Disposition": f'attachment; filename="{fname}"'},
)
@app.get("/api/export/leads")
async def export_leads_csv(lead_quality: str = Query(None), country: str = Query(None)):
import json as _json
async def generate():
yield "domain,lead_quality,score,best_contact_channel,best_contact_value,emails,phones,whatsapp,social,cms,ip_country,page_title,ai_pitch\n"
p = 1
while True:
_, rows = await get_enriched(ai_only=True, lead_quality=lead_quality,
country=country, page=p, limit=500)
if not rows:
break
for r in rows:
contacts = {}
try:
contacts = _json.loads(r.get("contact_info") or "{}")
except Exception:
pass
def esc(v):
return f'"{str(v or "").replace(chr(34), chr(39))}"'
emails = esc(";".join(contacts.get("emails", [])[:3]))
phones = esc(";".join(contacts.get("phones", [])[:3]))
whatsapp = esc(";".join(contacts.get("whatsapp",[])[:2]))
social = esc(";".join(contacts.get("social", [])[:4]))
line = ",".join([
esc(r.get("domain")),
esc(r.get("ai_lead_quality")),
esc(r.get("score")),
esc(r.get("ai_contact_channel")),
esc(r.get("ai_contact_value")),
emails, phones, whatsapp, social,
esc(r.get("cms")),
esc(r.get("ip_country")),
esc(r.get("page_title")),
esc(r.get("ai_pitch")),
])
yield line + "\n"
p += 1
qual = f"_{lead_quality.lower()}" if lead_quality else ""
return StreamingResponse(
generate(), media_type="text/csv",
headers={"Content-Disposition": f'attachment; filename="domgod_leads{qual}.csv"'},
)
@app.post("/api/score/run")
async def score_run():
return await run_scoring()
# ── Static UI ────────────────────────────────────────────────────────────────
static_dir = Path(__file__).parent / "static"
app.mount("/", StaticFiles(directory=str(static_dir), html=True), name="static")
if __name__ == "__main__":
import uvicorn
uvicorn.run("app.main:app", host="0.0.0.0", port=6677, log_level="info")