DomGod/app/main.py
Malin 8f387cada2 feat: bulk validator tab + status/niche/type browse filters
- New app/validator.py: background HTTP checker for entire dataset
  - 50 concurrent checks, skips already-validated domains
  - Extracts prescreen_status, server, IP, load_time_ms
  - start/stop/status API at /api/validator/start|stop|status (see the sketch below)

- New dedicated "Validator 🔬" tab with stats grid, TLD filter,
  Start/Stop controls, live progress indicator

- Browse tab: "Live" column replaced with "Status" dot (color-coded
  ● from prescreen_status, falls back to is_live)
- Browse tab: new Status / Niche / Type filter dropdowns

- db.py: added ip TEXT + load_time_ms INTEGER columns + migrations;
  get_enriched() supports prescreen_status/niche/site_type filters

- main.py: /api/enriched extended with prescreen_status/niche/site_type

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-18 08:27:24 +02:00
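
A minimal client sketch for the new validator endpoints (not part of the commit; it assumes the app is reachable on localhost:6677, the port lifespan logs below, and that the status payload's fields are whatever get_validator_status() returns):

    import time
    import httpx

    BASE = "http://localhost:6677"

    # Start a full-dataset validation pass, optionally limited to one TLD.
    httpx.post(f"{BASE}/api/validator/start", params={"tld": "com"})

    # Poll the live progress a few times; the payload's exact fields come
    # from get_validator_status() in app/validator.py, not shown here.
    for _ in range(10):
        print(httpx.get(f"{BASE}/api/validator/status").json())
        time.sleep(5)

    # Abort the run early if needed:
    # httpx.post(f"{BASE}/api/validator/stop")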

import os
import asyncio
import logging
from pathlib import Path
from contextlib import asynccontextmanager
import httpx
import aiosqlite
from typing import Optional
from fastapi import FastAPI, Query
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from dotenv import load_dotenv
load_dotenv()
from app.db import (
    DATA_DIR, PARQUET_PATH, SQLITE_PATH,
    init_db, get_stats, get_domains, get_enriched,
    queue_domains, get_queue_status, build_duckdb_index, index_status,
    queue_ai, get_ai_queue_status, save_ai_assessment, save_prescreen_results,
)
from app.enricher import start_worker, pause_worker, resume_worker, is_running, ensure_workers_alive
from app.scorer import run_scoring
from app.validator import start_validator, stop_validator, get_validator_status
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
PARQUET_URL = os.getenv("PARQUET_URL", "")
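
# download_parquet resumes interrupted downloads: partial data lives in a .tmp
# file next to the target, a Range header asks the server to continue from the
# saved offset, and HTTP 416 (range not satisfiable) means the .tmp file is
# already complete, so it is simply renamed into place.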
async def download_parquet():
    if PARQUET_PATH.exists():
        logger.info("Using cached parquet at %s", PARQUET_PATH)
        return
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    tmp_path = PARQUET_PATH.with_suffix(".tmp")
    downloaded = tmp_path.stat().st_size if tmp_path.exists() else 0
    headers = {"Range": f"bytes={downloaded}-"} if downloaded > 0 else {}
    logger.info("Downloading parquet from %s (offset=%d)...", PARQUET_URL, downloaded)
    async with httpx.AsyncClient(follow_redirects=True, timeout=None) as client:
        async with client.stream("GET", PARQUET_URL, headers=headers) as resp:
            if resp.status_code == 416:
                tmp_path.rename(PARQUET_PATH)
                return
            resp.raise_for_status()
            total = int(resp.headers.get("content-length", 0)) + downloaded
            mode = "ab" if downloaded > 0 else "wb"
            with open(tmp_path, mode) as f:
                received = downloaded
                async for chunk in resp.aiter_bytes(chunk_size=1024 * 1024):
                    f.write(chunk)
                    received += len(chunk)
                    if total:
                        logger.info("Download: %.1f%% (%d/%d)", received / total * 100, received, total)
    tmp_path.rename(PARQUET_PATH)
    logger.info("Parquet download complete")

async def _watchdog():
    """Check every 10 seconds and restart the workers if they have died."""
    while True:
        await asyncio.sleep(10)
        ensure_workers_alive()
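
# Startup sequence: fetch the dataset, prepare SQLite, kick off the DuckDB
# index build in the background, then start the enrichment worker and the
# watchdog as fire-and-forget tasks.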
@asynccontextmanager
async def lifespan(app: FastAPI):
    await download_parquet()
    await init_db()
    asyncio.create_task(build_duckdb_index())
    start_worker()
    asyncio.create_task(_watchdog())
    logger.info("DomGod ready on port 6677")
    yield

app = FastAPI(title="DomGod", lifespan=lifespan)

# ── API ──────────────────────────────────────────────────────────────────────
@app.get("/api/stats")
async def stats():
    return await get_stats()

@app.get("/api/index/status")
async def get_index_status():
    return index_status()

@app.get("/api/domains")
async def domains(
    tld: str = Query(None),
    page: int = Query(1, ge=1),
    limit: int = Query(100, ge=1, le=500),
    live_only: bool = Query(False),
    alpha_only: bool = Query(False),
    no_sld: bool = Query(False),
    keyword: str = Query(None),
):
    total, rows = await get_domains(
        tld=tld, page=page, limit=limit,
        alpha_only=alpha_only, no_sld=no_sld,
        keyword=keyword, live_only=live_only,
    )
    return {"page": page, "limit": limit, "total": total, "results": rows}
@app.post("/api/enrich/batch")
async def enrich_batch(body: dict):
domains_list = body.get("domains", [])
if not domains_list:
return JSONResponse({"error": "no domains provided"}, status_code=400)
await queue_domains(domains_list)
resume_worker()
return {"queued": len(domains_list)}
@app.get("/api/enrich/status")
async def enrich_status():
status = await get_queue_status()
status["worker_running"] = is_running()
return status
@app.post("/api/enrich/retry")
async def enrich_retry():
async with aiosqlite.connect(SQLITE_PATH) as db:
await db.execute("UPDATE job_queue SET status='pending', error=NULL WHERE status='failed'")
await db.commit()
resume_worker()
return {"status": "retrying"}
@app.post("/api/enrich/pause")
async def enrich_pause():
pause_worker()
return {"status": "paused"}
@app.post("/api/enrich/resume")
async def enrich_resume():
resume_worker()
return {"status": "resumed"}
@app.get("/api/enriched")
async def enriched(
min_score: int = Query(0, ge=0, le=100),
cms: str = Query(None),
country: str = Query(None),
kit_digital: Optional[bool] = Query(None),
ai_only: bool = Query(False),
lead_quality: str = Query(None),
prescreen_status: str = Query(None),
niche: str = Query(None),
site_type: str = Query(None),
page: int = Query(1, ge=1),
limit: int = Query(100, ge=1, le=1000),
):
total, rows = await get_enriched(
min_score=min_score, cms=cms, country=country,
kit_digital=kit_digital, ai_only=ai_only, lead_quality=lead_quality,
prescreen_status=prescreen_status, niche=niche, site_type=site_type,
page=page, limit=limit,
)
return {"page": page, "limit": limit, "total": total, "results": rows}

# ── Bulk Validator endpoints ──────────────────────────────────────────────────
@app.post("/api/validator/start")
async def validator_start(tld: str = Query(None)):
    start_validator(tld_filter=tld or None)
    return get_validator_status()

@app.post("/api/validator/stop")
async def validator_stop():
    stop_validator()
    return {"status": "stopped"}

@app.get("/api/validator/status")
async def validator_status():
    return get_validator_status()

# ── AI assessment endpoints ───────────────────────────────────────────────────
@app.post("/api/prescreen/batch")
async def prescreen_batch(body: dict):
    """
    Phase 1 — HTTP check every domain (no AI). Marks live/dead/parked/redirect.
    Phase 2 — DeepSeek classification of the live domains (batched) → niche + type.
    Max 200 domains per call.
    """
    domains = body.get("domains", [])
    if not domains:
        return JSONResponse({"error": "no domains provided"}, status_code=400)
    if len(domains) > 200:
        return JSONResponse({"error": "max 200 domains per batch"}, status_code=400)
    from app.prescreener import prescreen_domains, classify_with_deepseek, DEEPSEEK_BATCH_SIZE

    # Phase 1: HTTP checks (concurrent, no AI)
    results = await prescreen_domains(domains)
    await save_prescreen_results(results)
    counts: dict = {}
    for r in results:
        s = r.get("prescreen_status", "dead")
        counts[s] = counts.get(s, 0) + 1

    # Phase 2: DeepSeek classification for live sites only.
    # Run batches sequentially — parallel requests cause 429 rate-limit errors.
    live = [r for r in results if r.get("prescreen_status") == "live"]
    classified = 0
    if live:
        batches = [live[i:i + DEEPSEEK_BATCH_SIZE] for i in range(0, len(live), DEEPSEEK_BATCH_SIZE)]
        all_cls: list = []
        for i, batch in enumerate(batches):
            if i > 0:
                await asyncio.sleep(3)  # brief gap between batches
            cls = await classify_with_deepseek(batch)
            all_cls.extend(cls)
        if all_cls:
            await save_prescreen_results(all_cls)
            classified = len(all_cls)
    return {
        "total": len(domains),
        "live": counts.get("live", 0),
        "parked": counts.get("parked", 0),
        "redirect": counts.get("redirect", 0),
        "dead": counts.get("dead", 0),
        "classified": classified,
    }
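
# e.g. POST /api/prescreen/batch with body {"domains": ["example.com", "example.org"]}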
@app.post("/api/ai/assess/batch")
async def ai_assess_batch(body: dict):
domains_list = body.get("domains", [])
if not domains_list:
return JSONResponse({"error": "no domains provided"}, status_code=400)
language = body.get("language", "ES").upper()
if language not in ("EN", "ES", "RO"):
language = "ES"
await queue_ai(domains_list, language=language)
ensure_workers_alive() # ensure AI worker is alive when jobs are queued
return {"queued": len(domains_list)}
@app.post("/api/ai/worker/restart")
async def ai_worker_restart():
ensure_workers_alive()
return {"status": "restarted"}
@app.post("/api/ai/reset")
async def ai_reset():
"""Reset all 'running' AI queue jobs back to 'pending' (unstick hung jobs)."""
async with aiosqlite.connect(SQLITE_PATH) as db:
r = await db.execute("UPDATE ai_queue SET status='pending' WHERE status='running'")
count = r.rowcount
await db.commit()
ensure_workers_alive()
return {"reset": count}
@app.get("/api/ai/debug")
async def ai_debug():
"""Returns worker state + last 10 queue entries for troubleshooting."""
from app.enricher import _ai_worker_task
task_alive = _ai_worker_task is not None and not _ai_worker_task.done()
task_exc = None
if _ai_worker_task and _ai_worker_task.done() and not _ai_worker_task.cancelled():
try:
task_exc = str(_ai_worker_task.exception())
except Exception:
pass
async with aiosqlite.connect(SQLITE_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"SELECT domain, status, created_at, completed_at, error FROM ai_queue ORDER BY created_at DESC LIMIT 10"
) as cur:
recent = [dict(r) async for r in cur]
return {
"ai_worker_alive": task_alive,
"ai_worker_exception": task_exc,
"recent_queue": recent,
"queue_status": await get_ai_queue_status(),
}
@app.get("/api/ai/status")
async def ai_status():
return await get_ai_queue_status()
@app.post("/api/ai/assess/single")
async def ai_assess_single(body: dict):
"""Immediate (blocking) AI assessment — does fresh scrape, no pre-enrichment needed."""
domain = body.get("domain")
if not domain:
return JSONResponse({"error": "no domain"}, status_code=400)
language = body.get("language", "ES").upper()
if language not in ("EN", "ES", "RO"):
language = "ES"
from app.site_analyzer import analyze_site
from app.replicate_ai import assess_domain as gemini_assess
analysis = await analyze_site(domain)
assessment = await gemini_assess(analysis, language=language)
await save_ai_assessment(domain, assessment, site_analysis=analysis)
return {**assessment, "site_analysis": analysis}
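
# e.g. POST /api/ai/assess/single with body {"domain": "example.com", "language": "EN"}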
@app.get("/api/export")
async def export_csv(
min_score: int = Query(0),
cms: str = Query(None),
country: str = Query(None),
tier: str = Query(None),
):
if tier == "hot":
min_score = 80
elif tier == "warm":
min_score = 50
max_score = 79 if tier == "warm" else 100
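
    # Stream the CSV in pages of 500 rows; any double quote inside a value is
    # swapped for a single quote (chr(34) -> chr(39)) so a field can never
    # break out of its CSV quoting.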
    async def generate():
        yield "domain,score,cms,ssl_expiry_days,ip_country,is_live,status_code,has_mx,server,page_title,enriched_at\n"
        p = 1
        while True:
            _, rows = await get_enriched(min_score=min_score, cms=cms, country=country, page=p, limit=500)
            if not rows:
                break
            for r in rows:
                if r.get("score", 0) > max_score:
                    continue
                line = ",".join(
                    f'"{str(r.get(col) or "").replace(chr(34), chr(39))}"'
                    for col in ["domain", "score", "cms", "ssl_expiry_days", "ip_country",
                                "is_live", "status_code", "has_mx", "server", "page_title", "enriched_at"]
                )
                yield line + "\n"
            p += 1

    fname = f"domgod_{tier or 'export'}_score{min_score}.csv"
    return StreamingResponse(
        generate(), media_type="text/csv",
        headers={"Content-Disposition": f'attachment; filename="{fname}"'},
    )
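
# e.g. GET /api/export?tier=hot   -> score 80-100
#      GET /api/export?tier=warm  -> score 50-79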
@app.get("/api/export/leads")
async def export_leads_csv(lead_quality: str = Query(None), country: str = Query(None)):
import json as _json
async def generate():
yield "domain,lead_quality,score,best_contact_channel,best_contact_value,emails,phones,whatsapp,social,cms,ip_country,page_title,ai_pitch\n"
p = 1
while True:
_, rows = await get_enriched(ai_only=True, lead_quality=lead_quality,
country=country, page=p, limit=500)
if not rows:
break
for r in rows:
contacts = {}
try:
contacts = _json.loads(r.get("contact_info") or "{}")
except Exception:
pass
def esc(v):
return f'"{str(v or "").replace(chr(34), chr(39))}"'
emails = esc(";".join(contacts.get("emails", [])[:3]))
phones = esc(";".join(contacts.get("phones", [])[:3]))
whatsapp = esc(";".join(contacts.get("whatsapp",[])[:2]))
social = esc(";".join(contacts.get("social", [])[:4]))
line = ",".join([
esc(r.get("domain")),
esc(r.get("ai_lead_quality")),
esc(r.get("score")),
esc(r.get("ai_contact_channel")),
esc(r.get("ai_contact_value")),
emails, phones, whatsapp, social,
esc(r.get("cms")),
esc(r.get("ip_country")),
esc(r.get("page_title")),
esc(r.get("ai_pitch")),
])
yield line + "\n"
p += 1
qual = f"_{lead_quality.lower()}" if lead_quality else ""
return StreamingResponse(
generate(), media_type="text/csv",
headers={"Content-Disposition": f'attachment; filename="domgod_leads{qual}.csv"'},
)
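
# e.g. GET /api/export/leads?country=ES
# (lead_quality values come from the AI assessor and are not enumerated here)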
@app.post("/api/score/run")
async def score_run():
return await run_scoring()
# ── Static UI ────────────────────────────────────────────────────────────────
static_dir = Path(__file__).parent / "static"
app.mount("/", StaticFiles(directory=str(static_dir), html=True), name="static")
if __name__ == "__main__":
import uvicorn
uvicorn.run("app.main:app", host="0.0.0.0", port=6677, log_level="info")