Files
DomGod/app/db.py
Malin 788252e14f feat: assessed filter, 5000 per-page limit, auto-advance on empty Not-checked page
Assessed/Not assessed filter:
- 'yes' → beauty_lead_quality IS NOT NULL (has been B2B assessed)
- 'no'  → beauty_lead_quality IS NULL (never assessed)
- wired through /api/enriched → get_enriched(beauty_assessed=)

Per-page limit:
- options: 100 / 500 / 1000 / 2000 / 5000
- backend cap raised from le=1000 to le=5000

Auto-advance on empty Not-checked page:
- after bulk validate/prescreen, loadDomains reloads the same DuckDB page
- if every domain on that page is now processed (client-side filter → 0 rows)
  but the page still returned results, automatically increment page and retry
- prevents "No domains found" after successfully processing a batch
- capped at page 500 to avoid infinite loop

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 09:19:51 +02:00

621 lines
24 KiB
Python

import asyncio
import logging
import os
import sqlite3
from pathlib import Path

import aiosqlite
import duckdb
logger = logging.getLogger(__name__)

# All persistent state lives under DATA_DIR (defaults to /data in the container).
DATA_DIR = Path(os.getenv("DATA_DIR", "/data"))
PARQUET_PATH = DATA_DIR / "domains.parquet"  # raw domain dump (read-only input)
DUCKDB_PATH = DATA_DIR / "domains.duckdb"    # persistent DuckDB index over the parquet
SQLITE_PATH = DATA_DIR / "enrichment.db"     # mutable enrichment/queue state
# Base SQLite schema, applied with executescript on every startup (all tables
# use IF NOT EXISTS).  enriched_domains holds one row per domain: HTTP/SSL/CMS
# probe results, scoring, Kit Digital signals and AI-assessment output.
# job_queue / ai_queue back the enrichment and AI pipelines; scores is a
# standalone scoring table.  Columns added after release live in _MIGRATIONS,
# not here — keep this string append-only.
SCHEMA = """
CREATE TABLE IF NOT EXISTS enriched_domains (
domain TEXT PRIMARY KEY,
is_live INTEGER DEFAULT 0,
status_code INTEGER,
ssl_valid INTEGER DEFAULT 0,
ssl_expiry_days INTEGER,
cms TEXT,
has_mx INTEGER DEFAULT 0,
ip_country TEXT,
page_title TEXT,
server TEXT,
enriched_at TEXT,
error TEXT,
score INTEGER DEFAULT 0,
kit_digital INTEGER DEFAULT 0,
kit_digital_signals TEXT,
contact_info TEXT,
ai_assessment TEXT,
ai_lead_quality TEXT,
ai_pitch TEXT,
ai_contact_channel TEXT,
ai_contact_value TEXT,
ai_assessed_at TEXT,
site_analysis TEXT,
prescreen_status TEXT,
niche TEXT,
site_type TEXT,
prescreen_at TEXT,
ip TEXT,
load_time_ms INTEGER
);
CREATE TABLE IF NOT EXISTS job_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
domain TEXT UNIQUE NOT NULL,
status TEXT DEFAULT 'pending',
created_at TEXT DEFAULT (datetime('now')),
started_at TEXT,
completed_at TEXT,
error TEXT
);
CREATE TABLE IF NOT EXISTS ai_queue (
domain TEXT PRIMARY KEY,
status TEXT DEFAULT 'pending',
created_at TEXT DEFAULT (datetime('now')),
completed_at TEXT,
error TEXT,
language TEXT DEFAULT 'ES'
);
CREATE TABLE IF NOT EXISTS scores (
domain TEXT PRIMARY KEY,
score INTEGER NOT NULL,
scored_at TEXT DEFAULT (datetime('now'))
);
"""
# Columns added after initial release — applied as migrations on existing DBs.
# Every statement is attempted on each startup; "duplicate column" failures are
# expected and skipped by init_db, so the list is effectively append-only and
# idempotent.  Order only matters for a first run on an old database.
_MIGRATIONS = [
    "ALTER TABLE enriched_domains ADD COLUMN kit_digital INTEGER DEFAULT 0",
    "ALTER TABLE enriched_domains ADD COLUMN kit_digital_signals TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN contact_info TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN ai_assessment TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN ai_lead_quality TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN ai_pitch TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN ai_contact_channel TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN ai_contact_value TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN ai_assessed_at TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN site_analysis TEXT",
    "CREATE TABLE IF NOT EXISTS ai_queue (domain TEXT PRIMARY KEY, status TEXT DEFAULT 'pending', created_at TEXT DEFAULT (datetime('now')), completed_at TEXT, error TEXT)",
    "ALTER TABLE ai_queue ADD COLUMN language TEXT DEFAULT 'ES'",
    "ALTER TABLE enriched_domains ADD COLUMN prescreen_status TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN niche TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN site_type TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN prescreen_at TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN ip TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN load_time_ms INTEGER",
    "ALTER TABLE enriched_domains ADD COLUMN beauty_lead_quality TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN beauty_assessment TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN beauty_assessed_at TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN page_snippet TEXT",
    """CREATE TABLE IF NOT EXISTS beauty_queue (
domain TEXT PRIMARY KEY,
status TEXT DEFAULT 'pending',
created_at TEXT DEFAULT (datetime('now')),
completed_at TEXT,
error TEXT
)""",
]
# Index build state — written only by _build_index_sync (runs in a worker
# thread) and read by the query helpers and index_status().
_index_ready = False
_index_building = False
_index_total = 0
# Cached stats (TLD breakdown is a full-table scan — compute once per process)
_tld_cache: list = []
_total_cache: int = 0
async def init_db():
    """Create the SQLite schema and apply additive column migrations.

    Safe to call on every startup: the base schema uses IF NOT EXISTS and each
    migration tolerates the "duplicate column name" error a re-run produces.
    """
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        # WAL mode: concurrent reads don't block on writes; write lock held briefly
        await db.execute("PRAGMA journal_mode=WAL")
        await db.execute("PRAGMA busy_timeout=30000")
        await db.executescript(SCHEMA)
        # Run migrations (safe to re-run).  Only sqlite operational errors are
        # swallowed — previously a bare `except Exception: pass` also hid
        # genuine bugs such as malformed migration SQL.  Unexpected
        # operational errors are still tolerated but logged for visibility.
        for sql in _MIGRATIONS:
            try:
                await db.execute(sql)
            except sqlite3.OperationalError as exc:
                if "duplicate column" not in str(exc).lower():
                    logger.debug("migration skipped (%s): %s", exc, sql)
        await db.commit()
# ── DuckDB persistent index ──────────────────────────────────────────────────
def _build_index_sync():
    """Blocking build of the DuckDB `domains` table from the parquet dump.

    Intended to run in a worker thread (see build_duckdb_index).  Progress is
    published through the module-level _index_ready/_index_building/
    _index_total flags; on failure the index simply stays unavailable and
    query helpers fall back to scanning the parquet directly.
    """
    global _index_ready, _index_building, _index_total
    _index_building = True
    try:
        conn = duckdb.connect(str(DUCKDB_PATH))
        conn.execute("SET threads=4")
        conn.execute("SET memory_limit='2GB'")
        # Fast path: table already exists and is populated from a previous run.
        try:
            n = conn.execute("SELECT COUNT(*) FROM domains").fetchone()[0]
            if n > 0:
                _index_total = n
                _index_ready = True
                _index_building = False
                logger.info("DuckDB index already ready (%d rows)", n)
                conn.close()
                return
        except Exception:
            pass  # table missing → fall through to the one-time build
        logger.info("Building DuckDB index from parquet (one-time ~2-3 min)...")
        # tld = last dot-separated label, lowercased; parts = label count
        # (parts == 2 ⇒ bare name.tld, no subdomain).
        conn.execute("""
CREATE OR REPLACE TABLE domains AS
SELECT
domain,
lower(regexp_extract(domain, '\\.([^.]+)$', 1)) AS tld,
len(string_split(domain, '.')) AS parts
FROM read_parquet(?)
""", [str(PARQUET_PATH)])
        conn.execute("CREATE INDEX IF NOT EXISTS idx_tld ON domains(tld)")
        _index_total = conn.execute("SELECT COUNT(*) FROM domains").fetchone()[0]
        conn.close()
        # _index_ready flips only after the count succeeds, so readers never
        # observe a half-built table.
        _index_ready = True
        logger.info("DuckDB index built: %d rows", _index_total)
    except Exception as e:
        logger.error("DuckDB index build failed: %s", e)
    finally:
        _index_building = False
async def build_duckdb_index():
    """Build (or verify) the DuckDB index without blocking the event loop.

    The heavy parquet scan runs in the default thread-pool executor; progress
    is observable via index_status().
    """
    # get_running_loop() is the supported call inside a coroutine;
    # get_event_loop() is deprecated in that context since Python 3.10.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _build_index_sync)
def index_status() -> dict:
    """Snapshot of the DuckDB index build state for the stats/API endpoints."""
    ready, building, total = _index_ready, _index_building, _index_total
    return {"ready": ready, "building": building, "total": total}
# ── Domain queries ───────────────────────────────────────────────────────────
def _domains_sync(tld, page, limit, alpha_only, no_sld, keyword):
    """Blocking DuckDB query for one page of raw domains.

    Uses the persistent `domains` index when ready, otherwise scans the
    parquet file directly with equivalent (slower) expressions.
    Returns (total_matching, [domain, ...]) for the requested page.

    Fixes vs. the previous version: the `_add` helper and the parameter list
    were duplicated verbatim in both branches, and `_index_ready` was re-read
    at several points so a mid-call flip could mix index and parquet clauses —
    the flag is now snapshotted once.
    """
    indexed = _index_ready  # snapshot: the build thread may flip it mid-call
    source = "domains" if indexed else f"read_parquet('{PARQUET_PATH}')"
    conditions = []
    params = []  # same params serve both COUNT and page queries

    def _add(clause, val=None):
        conditions.append(clause)
        if val is not None:
            params.append(val)

    if tld:
        clean = tld.lower().lstrip(".")
        if indexed:
            _add("tld = ?", clean)
        else:
            _add("lower(regexp_extract(domain, '\\.([^.]+)$', 1)) = ?", clean)
    if no_sld:
        # exactly name.tld — no subdomain / second-level labels
        _add("parts = 2" if indexed else "len(string_split(domain, '.')) = 2")
    if alpha_only:
        # letters and dots only (no digits, hyphens, punycode etc.)
        _add("NOT regexp_matches(domain, '[^a-zA-Z.]')")
    if keyword:
        _add("domain LIKE ?", f"%{keyword.lower()}%")
    where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
    offset = (page - 1) * limit
    if indexed:
        conn = duckdb.connect(str(DUCKDB_PATH), read_only=True)
    else:
        conn = duckdb.connect(":memory:")
    conn.execute("SET threads=4")
    total = conn.execute(f"SELECT COUNT(*) FROM {source} {where}", params).fetchone()[0]
    rows = conn.execute(
        f"SELECT domain FROM {source} {where} LIMIT {limit} OFFSET {offset}", params
    ).fetchall()
    conn.close()
    return total, [r[0] for r in rows]
async def get_domains(tld=None, page=1, limit=100, alpha_only=False, no_sld=False, keyword=None, live_only=False):
    """One page of raw domains joined with any enrichment rows from SQLite.

    The DuckDB/parquet page query runs in a worker thread; enrichment rows are
    then fetched in one bulk IN(...) query for exactly the domains on the
    page.  Note that `live_only` filters AFTER pagination, so a page may
    return fewer than `limit` rows (the UI auto-advances to compensate).
    Returns (total_matching, [row_dict, ...]).
    """
    loop = asyncio.get_running_loop()  # get_event_loop() is deprecated in coroutines
    total, domain_list = await loop.run_in_executor(
        None, _domains_sync, tld, page, limit, alpha_only, no_sld, keyword
    )
    if not domain_list:
        return total, []
    # One placeholder per domain.  Fine for the 5000-row page cap: modern
    # SQLite allows 32766 bound variables per statement.
    placeholders = ",".join("?" * len(domain_list))
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            f"SELECT * FROM enriched_domains WHERE domain IN ({placeholders})",
            domain_list,
        ) as cur:
            enriched_map = {r["domain"]: dict(r) async for r in cur}
    results = []
    for d in domain_list:
        row = enriched_map.get(d, {"domain": d})  # never-enriched → bare stub
        if live_only and not row.get("is_live"):
            continue
        results.append(row)
    return total, results
# ── Stats ────────────────────────────────────────────────────────────────────
def _tld_stats_sync() -> tuple[int, list]:
if _index_ready:
conn = duckdb.connect(str(DUCKDB_PATH), read_only=True)
total = conn.execute("SELECT COUNT(*) FROM domains").fetchone()[0]
rows = conn.execute("""
SELECT tld, COUNT(*) AS cnt FROM domains
WHERE tld != ''
GROUP BY tld ORDER BY cnt DESC LIMIT 20
""").fetchall()
conn.close()
else:
p = str(PARQUET_PATH)
conn = duckdb.connect(":memory:")
conn.execute("SET threads=4")
total = conn.execute(f"SELECT COUNT(*) FROM read_parquet('{p}')").fetchone()[0]
rows = conn.execute(f"""
SELECT lower(regexp_extract(domain, '\\.([^.]+)$', 1)) AS tld, COUNT(*) AS cnt
FROM read_parquet('{p}')
GROUP BY tld ORDER BY cnt DESC LIMIT 20
""").fetchall()
conn.close()
return total, [{"tld": r[0], "count": r[1]} for r in rows]
async def get_stats():
    """Dashboard aggregate: cached parquet totals + live SQLite counters.

    Returns total/enriched/hot-lead/kit-digital counts, the cached TLD
    breakdown, index build state and the job-queue status histogram.
    """
    global _tld_cache, _total_cache
    # TLD breakdown is a full-table scan — compute once and memoize for the
    # life of the process.
    if not _tld_cache:
        loop = asyncio.get_running_loop()  # get_event_loop() deprecated in coroutines
        _total_cache, _tld_cache = await loop.run_in_executor(None, _tld_stats_sync)
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        async with db.execute("SELECT COUNT(*) FROM enriched_domains") as cur:
            enriched = (await cur.fetchone())[0]
        # "hot lead" = score at or above SCORE_THRESHOLD (default 60)
        threshold = int(os.getenv("SCORE_THRESHOLD", "60"))
        async with db.execute("SELECT COUNT(*) FROM enriched_domains WHERE score >= ?", (threshold,)) as cur:
            hot_leads = (await cur.fetchone())[0]
        async with db.execute("SELECT COUNT(*) FROM enriched_domains WHERE kit_digital=1") as cur:
            kit_digital_count = (await cur.fetchone())[0]
        async with db.execute("SELECT status, COUNT(*) FROM job_queue GROUP BY status") as cur:
            q = {r[0]: r[1] async for r in cur}
    return {
        "total_domains": _total_cache,
        "enriched": enriched,
        "hot_leads": hot_leads,
        "kit_digital_count": kit_digital_count,
        "tld_breakdown": _tld_cache,
        "index_status": index_status(),
        "queue": {
            "pending": q.get("pending", 0),
            "running": q.get("running", 0),
            "done": q.get("done", 0),
            "failed": q.get("failed", 0),
        },
    }
# ── Enrichment helpers ───────────────────────────────────────────────────────
async def get_enriched(min_score=0, cms=None, country=None, kit_digital=None,
                       ai_only=False, lead_quality=None,
                       prescreen_status=None, niche=None, site_type=None,
                       keyword=None, tld=None,
                       alpha_only=False, no_sld=False,
                       beauty_assessed=None,
                       page=1, limit=100):
    """Paged, filtered listing of enriched_domains ordered by score DESC.

    Every filter is optional; active filters are ANDed together.  Returns
    (total, rows): total is the unpaged match count, rows one page of dicts.

    Filter notes:
    - prescreen_status == "none" selects rows never prescreened (IS NULL).
    - keyword is a case-insensitive substring match over domain, page_title,
      page_snippet and the raw beauty_assessment JSON.
    - beauty_assessed: "yes" → beauty_lead_quality IS NOT NULL (B2B assessed),
      "no" → IS NULL (never assessed), anything else → no filter.
    """
    offset = (page - 1) * limit
    conditions = ["score >= ?"]
    params: list = [min_score]
    if cms:
        conditions.append("cms = ?")
        params.append(cms)
    if country:
        conditions.append("ip_country = ?")
        params.append(country.upper())
    if kit_digital is not None:
        # tri-state: None = no filter; True/False = exact match against 1/0
        conditions.append("kit_digital = ?")
        params.append(1 if kit_digital else 0)
    if ai_only:
        conditions.append("ai_lead_quality IS NOT NULL")
    if lead_quality:
        conditions.append("ai_lead_quality = ?")
        params.append(lead_quality.upper())
    if prescreen_status == "none":
        conditions.append("prescreen_status IS NULL")
    elif prescreen_status:
        conditions.append("prescreen_status = ?")
        params.append(prescreen_status)
    if niche:
        conditions.append("niche = ?")
        params.append(niche)
    if site_type:
        conditions.append("site_type = ?")
        params.append(site_type)
    if keyword:
        kw = f"%{keyword.lower()}%"
        conditions.append(
            "(LOWER(domain) LIKE ?"
            " OR LOWER(COALESCE(page_title,'')) LIKE ?"
            " OR LOWER(COALESCE(page_snippet,'')) LIKE ?"
            " OR LOWER(COALESCE(beauty_assessment,'')) LIKE ?)"
        )
        params.extend([kw, kw, kw, kw])
    if tld:
        # suffix match on ".tld" — also matches multi-level names ending in it
        tld_clean = tld.lower().lstrip(".")
        conditions.append("LOWER(domain) LIKE ?")
        params.append(f"%.{tld_clean}")
    if beauty_assessed == "yes":
        conditions.append("beauty_lead_quality IS NOT NULL")
    elif beauty_assessed == "no":
        conditions.append("beauty_lead_quality IS NULL")
    if alpha_only:
        # No hyphens, no digits anywhere in the domain name
        conditions.append(
            "domain NOT LIKE '%-%' AND domain NOT LIKE '%0%' AND domain NOT LIKE '%1%'"
            " AND domain NOT LIKE '%2%' AND domain NOT LIKE '%3%' AND domain NOT LIKE '%4%'"
            " AND domain NOT LIKE '%5%' AND domain NOT LIKE '%6%' AND domain NOT LIKE '%7%'"
            " AND domain NOT LIKE '%8%' AND domain NOT LIKE '%9%'"
        )
    if no_sld:
        # Exactly one dot → only name.tld, excludes shop.com.es style
        conditions.append("(LENGTH(domain) - LENGTH(REPLACE(domain, '.', ''))) = 1")
    where = "WHERE " + " AND ".join(conditions)
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            f"SELECT * FROM enriched_domains {where} ORDER BY score DESC LIMIT ? OFFSET ?",
            params + [limit, offset],
        ) as cur:
            rows = [dict(r) async for r in cur]
        # Same WHERE without LIMIT/OFFSET gives the total match count.
        async with db.execute(
            f"SELECT COUNT(*) FROM enriched_domains {where}", params
        ) as cur:
            total = (await cur.fetchone())[0]
    return total, rows
async def queue_ai(domains: list[str], language: str = "ES"):
    """Enqueue domains for AI assessment.

    Re-queuing an existing domain updates its language and resets its status
    to 'pending' so it gets picked up again.
    """
    entries = [(d, language) for d in domains]
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        await db.executemany(
            """INSERT INTO ai_queue (domain, language) VALUES (?, ?)
ON CONFLICT(domain) DO UPDATE SET language=excluded.language, status='pending'""",
            entries,
        )
        await db.commit()
async def get_ai_queue_status():
    """Status histogram for the AI assessment queue (pending/running/done/failed/total)."""
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        async with db.execute("SELECT status, COUNT(*) FROM ai_queue GROUP BY status") as cur:
            tally = {status: count async for status, count in cur}
    summary = {name: tally.get(name, 0) for name in ("pending", "running", "done", "failed")}
    summary["total"] = sum(tally.values())
    return summary
async def save_ai_assessment(domain: str, assessment: dict, site_analysis: dict = None):
    """Persist an AI assessment (and optional site analysis) for one domain,
    then mark its ai_queue entry done.

    assessment keys read here: lead_quality, pitch_angle, best_contact_channel,
    best_contact_value, kit_digital_confirmed.  Both dicts are also stored
    whole as JSON blobs (ai_assessment / site_analysis columns).
    """
    import json as _json
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        # Upsert into enriched_domains (domain may not exist yet if assessed before full enrichment)
        await db.execute(
            """INSERT INTO enriched_domains (domain) VALUES (?) ON CONFLICT(domain) DO NOTHING""",
            (domain,),
        )
        await db.execute(
            """UPDATE enriched_domains SET
ai_assessment=?, ai_lead_quality=?, ai_pitch=?,
ai_contact_channel=?, ai_contact_value=?, ai_assessed_at=datetime('now'),
site_analysis=?
WHERE domain=?""",
            (
                _json.dumps(assessment),
                assessment.get("lead_quality"),
                assessment.get("pitch_angle"),
                assessment.get("best_contact_channel"),
                assessment.get("best_contact_value"),
                _json.dumps(site_analysis) if site_analysis else None,
                domain,
            ),
        )
        # Update contact_info + kit_digital from site_analysis if available.
        # The AI's kit_digital_confirmed is the authoritative verdict — it can
        # override a false-positive from the heuristic scanner.
        if site_analysis:
            contacts = {
                "emails": site_analysis.get("emails", []),
                "phones": site_analysis.get("phones", []),
                "whatsapp": site_analysis.get("whatsapp", []),
                "social": site_analysis.get("social_links", []),
            }
            # Prefer the AI's explicit verdict; fall back to heuristic if null
            ai_kit = assessment.get("kit_digital_confirmed")
            kit_val = int(ai_kit) if ai_kit is not None else int(site_analysis.get("kit_digital", False))
            await db.execute(
                """UPDATE enriched_domains SET
kit_digital=?, kit_digital_signals=?, contact_info=?
WHERE domain=?""",
                (
                    kit_val,
                    _json.dumps(site_analysis.get("kit_digital_signals", [])),
                    _json.dumps(contacts),
                    domain,
                ),
            )
        await db.execute(
            "UPDATE ai_queue SET status='done', completed_at=datetime('now') WHERE domain=?",
            (domain,),
        )
        await db.commit()
async def save_prescreen_results(results: list[dict]):
    """Upsert prescreen HTTP results and/or DeepSeek niche/type classifications.

    Each result dict carries "domain" plus either classification keys
    ("niche"/"type") or prescreen keys ("prescreen_status", "title",
    "snippet").  Rows are created on the fly for never-enriched domains; the
    whole batch commits in one transaction.
    """
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        for r in results:
            domain = r.get("domain")
            if not domain:
                continue  # malformed entry — skip it rather than abort the batch
            niche = r.get("niche")
            site_type = r.get("type")  # DeepSeek returns "type" key
            if niche or site_type:
                # Upsert niche/type — works even if the row was never enriched.
                # NOTE(review): when only one of niche/site_type is present the
                # other column is overwritten with NULL — confirm that is
                # intended rather than COALESCE-preserving the old value.
                await db.execute(
                    """INSERT INTO enriched_domains (domain, niche, site_type)
VALUES (?, ?, ?)
ON CONFLICT(domain) DO UPDATE SET
niche=excluded.niche,
site_type=excluded.site_type""",
                    (domain, niche, site_type),
                )
            else:
                # Prescreen status upsert — create row if it doesn't exist yet.
                # COALESCE keeps an already-stored title/snippet over the new one.
                await db.execute(
                    """INSERT INTO enriched_domains (domain, prescreen_status, prescreen_at, page_title, page_snippet)
VALUES (?, ?, datetime('now'), ?, ?)
ON CONFLICT(domain) DO UPDATE SET
prescreen_status = excluded.prescreen_status,
prescreen_at = excluded.prescreen_at,
page_title = COALESCE(page_title, excluded.page_title),
page_snippet = COALESCE(page_snippet, excluded.page_snippet)""",
                    (domain, r.get("prescreen_status"), r.get("title"), r.get("snippet")),
                )
        await db.commit()
async def queue_domains(domains: list[str]):
    """Add domains to the enrichment job queue; duplicates are silently ignored."""
    rows = [(d,) for d in domains]
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        await db.executemany(
            "INSERT OR IGNORE INTO job_queue (domain) VALUES (?)", rows
        )
        await db.commit()
async def queue_beauty(domains: list[str]):
    """Add domains to the beauty-assessment queue; duplicates are silently ignored."""
    rows = [(d,) for d in domains]
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        await db.executemany(
            "INSERT OR IGNORE INTO beauty_queue (domain) VALUES (?)", rows
        )
        await db.commit()
async def get_beauty_queue_status():
    """Status histogram for the beauty-assessment queue (mirrors get_ai_queue_status)."""
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        async with db.execute("SELECT status, COUNT(*) FROM beauty_queue GROUP BY status") as cur:
            tally = {status: count async for status, count in cur}
    summary = {name: tally.get(name, 0) for name in ("pending", "running", "done", "failed")}
    summary["total"] = sum(tally.values())
    return summary
async def save_beauty_assessment(domain: str, assessment: dict):
    """Persist a B2B beauty assessment for a domain and mark its queue entry done."""
    import json as _json
    quality = assessment.get("lead_quality")
    payload = _json.dumps(assessment)
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        # Ensure the row exists even if the domain was never enriched.
        await db.execute(
            "INSERT INTO enriched_domains (domain) VALUES (?) ON CONFLICT(domain) DO NOTHING",
            (domain,),
        )
        await db.execute(
            """UPDATE enriched_domains SET
beauty_lead_quality=?, beauty_assessment=?, beauty_assessed_at=datetime('now')
WHERE domain=?""",
            (quality, payload, domain),
        )
        await db.execute(
            "UPDATE beauty_queue SET status='done', completed_at=datetime('now') WHERE domain=?",
            (domain,),
        )
        await db.commit()
async def get_beauty_leads(quality: str = None, country: str = None,
                           page: int = 1, limit: int = 100):
    """Paged list of B2B-assessed domains, ordered HOT → WARM → everything else.

    Returns (total, rows); every row gains a parsed "_beauty" dict decoded
    from the stored beauty_assessment JSON (empty dict if missing/unparsable).
    """
    import json as _json
    offset = (page - 1) * limit
    # Only rows that have actually been B2B assessed
    conditions = ["beauty_lead_quality IS NOT NULL"]
    params: list = []
    if quality:
        conditions.append("beauty_lead_quality = ?")
        params.append(quality.upper())
    if country:
        conditions.append("ip_country = ?")
        params.append(country.upper())
    where = "WHERE " + " AND ".join(conditions)
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            f"SELECT * FROM enriched_domains {where} "
            f"ORDER BY CASE beauty_lead_quality WHEN 'HOT' THEN 1 WHEN 'WARM' THEN 2 ELSE 3 END "
            f"LIMIT ? OFFSET ?",
            params + [limit, offset],
        ) as cur:
            rows = [dict(r) async for r in cur]
        async with db.execute(f"SELECT COUNT(*) FROM enriched_domains {where}", params) as cur:
            total = (await cur.fetchone())[0]
    # Parse beauty_assessment JSON inline
    for r in rows:
        try:
            r["_beauty"] = _json.loads(r.get("beauty_assessment") or "{}")
        except Exception:
            r["_beauty"] = {}  # malformed JSON — degrade to empty, don't fail the page
    return total, rows
async def get_queue_status():
    """Job-queue status counts plus a rough ETA for the remaining work."""
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        async with db.execute("SELECT status, COUNT(*) FROM job_queue GROUP BY status") as cur:
            counts = {status: n async for status, n in cur}
    pending = counts.get("pending", 0)
    running = counts.get("running", 0)
    done = counts.get("done", 0)
    failed = counts.get("failed", 0)
    remaining = pending + running
    # Heuristic throughput: roughly CONCURRENCY_LIMIT/10 domains per second.
    rate = int(os.getenv("CONCURRENCY_LIMIT", "50"))
    eta_seconds = remaining / max(rate / 10, 1) if remaining > 0 else None
    return {
        "total": sum(counts.values()),
        "pending": pending,
        "running": running,
        "done": done,
        "failed": failed,
        "eta_seconds": eta_seconds,
    }