- db.py: get_enriched() accepts ai_only + lead_quality params
- main.py: /api/enriched exposes ai_only + lead_quality query params; new /api/export/leads endpoint produces CSV with contacts + pitch
- index.html:
  - New "Leads 🤖" tab shows all AI-assessed domains with contacts (quality/country/limit filters, per-row 📋 copy email, 🔍 modal, CSV export, pagination, auto-refreshes every 3s)
  - Browse: "Hide assessed" checkbox filters out already-processed domains so you can focus on fresh targets
  - Poll cycle refreshes Leads tab when active

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
import os
import asyncio
import logging
from pathlib import Path

import aiosqlite
import duckdb

logger = logging.getLogger(__name__)

DATA_DIR = Path(os.getenv("DATA_DIR", "/data"))
PARQUET_PATH = DATA_DIR / "domains.parquet"
DUCKDB_PATH = DATA_DIR / "domains.duckdb"
SQLITE_PATH = DATA_DIR / "enrichment.db"

SCHEMA = """
CREATE TABLE IF NOT EXISTS enriched_domains (
    domain TEXT PRIMARY KEY,
    is_live INTEGER DEFAULT 0,
    status_code INTEGER,
    ssl_valid INTEGER DEFAULT 0,
    ssl_expiry_days INTEGER,
    cms TEXT,
    has_mx INTEGER DEFAULT 0,
    ip_country TEXT,
    page_title TEXT,
    server TEXT,
    enriched_at TEXT,
    error TEXT,
    score INTEGER DEFAULT 0,
    kit_digital INTEGER DEFAULT 0,
    kit_digital_signals TEXT,
    contact_info TEXT,
    ai_assessment TEXT,
    ai_lead_quality TEXT,
    ai_pitch TEXT,
    ai_contact_channel TEXT,
    ai_contact_value TEXT,
    ai_assessed_at TEXT,
    site_analysis TEXT
);

CREATE TABLE IF NOT EXISTS job_queue (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    domain TEXT UNIQUE NOT NULL,
    status TEXT DEFAULT 'pending',
    created_at TEXT DEFAULT (datetime('now')),
    started_at TEXT,
    completed_at TEXT,
    error TEXT
);

CREATE TABLE IF NOT EXISTS ai_queue (
    domain TEXT PRIMARY KEY,
    status TEXT DEFAULT 'pending',
    created_at TEXT DEFAULT (datetime('now')),
    completed_at TEXT,
    error TEXT,
    language TEXT DEFAULT 'ES'
);

CREATE TABLE IF NOT EXISTS scores (
    domain TEXT PRIMARY KEY,
    score INTEGER NOT NULL,
    scored_at TEXT DEFAULT (datetime('now'))
);
"""

# Columns added after initial release — applied as migrations on existing DBs
_MIGRATIONS = [
    "ALTER TABLE enriched_domains ADD COLUMN kit_digital INTEGER DEFAULT 0",
    "ALTER TABLE enriched_domains ADD COLUMN kit_digital_signals TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN contact_info TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN ai_assessment TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN ai_lead_quality TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN ai_pitch TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN ai_contact_channel TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN ai_contact_value TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN ai_assessed_at TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN site_analysis TEXT",
    "CREATE TABLE IF NOT EXISTS ai_queue (domain TEXT PRIMARY KEY, status TEXT DEFAULT 'pending', created_at TEXT DEFAULT (datetime('now')), completed_at TEXT, error TEXT)",
    "ALTER TABLE ai_queue ADD COLUMN language TEXT DEFAULT 'ES'",
]

# Index build state
_index_ready = False
_index_building = False
_index_total = 0

# Cached stats (TLD breakdown is expensive — compute once)
_tld_cache: list = []
_total_cache: int = 0


async def init_db():
    async with aiosqlite.connect(SQLITE_PATH) as db:
        await db.executescript(SCHEMA)
        # Run migrations (safe to re-run — silently skips existing columns)
        for sql in _MIGRATIONS:
            try:
                await db.execute(sql)
            except Exception:
                pass
        await db.commit()


# ── DuckDB persistent index ──────────────────────────────────────────────────

def _build_index_sync():
    global _index_ready, _index_building, _index_total
    _index_building = True
    try:
        conn = duckdb.connect(str(DUCKDB_PATH))
        conn.execute("SET threads=4")
        conn.execute("SET memory_limit='2GB'")

        # Check if already built
        try:
            n = conn.execute("SELECT COUNT(*) FROM domains").fetchone()[0]
            if n > 0:
                _index_total = n
                _index_ready = True
                _index_building = False
                logger.info("DuckDB index already ready (%d rows)", n)
                conn.close()
                return
        except Exception:
            pass

        logger.info("Building DuckDB index from parquet (one-time ~2-3 min)...")
        conn.execute("""
            CREATE OR REPLACE TABLE domains AS
            SELECT
                domain,
                lower(regexp_extract(domain, '\\.([^.]+)$', 1)) AS tld,
                len(string_split(domain, '.')) AS parts
            FROM read_parquet(?)
        """, [str(PARQUET_PATH)])
        conn.execute("CREATE INDEX IF NOT EXISTS idx_tld ON domains(tld)")
        _index_total = conn.execute("SELECT COUNT(*) FROM domains").fetchone()[0]
        conn.close()
        _index_ready = True
        logger.info("DuckDB index built: %d rows", _index_total)
    except Exception as e:
        logger.error("DuckDB index build failed: %s", e)
    finally:
        _index_building = False


async def build_duckdb_index():
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _build_index_sync)


def index_status() -> dict:
    return {
        "ready": _index_ready,
        "building": _index_building,
        "total": _index_total,
    }
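

# Usage sketch (illustrative, never called): a minimal startup sequence with a
# plain asyncio entrypoint. The real app presumably wires init_db() and
# build_duckdb_index() into main.py's startup hook instead; _startup_example is
# a made-up name.
async def _startup_example():
    await init_db()                                   # create tables + run migrations
    task = asyncio.create_task(build_duckdb_index())  # heavy build stays off the event loop
    logger.info("index: %s", index_status())          # parquet fallback is used until ready
    await task
# To try it standalone:  asyncio.run(_startup_example())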


# ── Domain queries ───────────────────────────────────────────────────────────

def _domains_sync(tld, page, limit, alpha_only, no_sld, keyword):
    conditions = []
    params_count = []
    params_data = []

    # Query the persistent index when it's ready; otherwise scan the parquet directly.
    source = "domains" if _index_ready else f"read_parquet('{PARQUET_PATH}')"

    def _add(clause, val=None):
        conditions.append(clause)
        if val is not None:
            params_count.append(val)
            params_data.append(val)

    if tld:
        if _index_ready:
            _add("tld = ?", tld.lower().lstrip("."))
        else:
            _add("lower(regexp_extract(domain, '\\.([^.]+)$', 1)) = ?", tld.lower().lstrip("."))

    if no_sld:
        if _index_ready:
            _add("parts = 2")
        else:
            _add("len(string_split(domain, '.')) = 2")

    if alpha_only:
        _add("NOT regexp_matches(domain, '[^a-zA-Z.]')")

    if keyword:
        _add("domain LIKE ?", f"%{keyword.lower()}%")

    where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
    offset = (page - 1) * limit

    if _index_ready:
        conn = duckdb.connect(str(DUCKDB_PATH), read_only=True)
    else:
        conn = duckdb.connect(":memory:")
        conn.execute("SET threads=4")

    total = conn.execute(f"SELECT COUNT(*) FROM {source} {where}", params_count).fetchone()[0]
    rows = conn.execute(
        f"SELECT domain FROM {source} {where} LIMIT {limit} OFFSET {offset}", params_data
    ).fetchall()
    conn.close()
    return total, [r[0] for r in rows]


async def get_domains(tld=None, page=1, limit=100, alpha_only=False, no_sld=False, keyword=None, live_only=False):
    loop = asyncio.get_running_loop()
    total, domain_list = await loop.run_in_executor(
        None, _domains_sync, tld, page, limit, alpha_only, no_sld, keyword
    )

    if not domain_list:
        return total, []

    placeholders = ",".join("?" * len(domain_list))
    async with aiosqlite.connect(SQLITE_PATH) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            f"SELECT * FROM enriched_domains WHERE domain IN ({placeholders})",
            domain_list,
        ) as cur:
            enriched_map = {r["domain"]: dict(r) async for r in cur}

    # Note: live_only filters after pagination, so a page may return fewer than
    # `limit` rows; `total` still counts the unfiltered result set.
    results = []
    for d in domain_list:
        row = enriched_map.get(d, {"domain": d})
        if live_only and not row.get("is_live"):
            continue
        results.append(row)

    return total, results
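

# Usage sketch (illustrative, never called): page through bare second-level .es
# domains containing a keyword. The filter values are made up; get_domains()
# merges in whatever enrichment row SQLite already has for each domain.
async def _browse_example():
    total, rows = await get_domains(tld="es", no_sld=True, keyword="clinica",
                                    page=1, limit=50)
    logger.info("%d matches; first page has %d rows", total, len(rows))
    for row in rows:
        # Unenriched domains come back as bare {"domain": ...} dicts.
        logger.info("%s live=%s score=%s", row["domain"],
                    row.get("is_live"), row.get("score"))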


# ── Stats ────────────────────────────────────────────────────────────────────

def _tld_stats_sync() -> tuple[int, list]:
    if _index_ready:
        conn = duckdb.connect(str(DUCKDB_PATH), read_only=True)
        total = conn.execute("SELECT COUNT(*) FROM domains").fetchone()[0]
        rows = conn.execute("""
            SELECT tld, COUNT(*) AS cnt FROM domains
            WHERE tld != ''
            GROUP BY tld ORDER BY cnt DESC LIMIT 20
        """).fetchall()
        conn.close()
    else:
        p = str(PARQUET_PATH)
        conn = duckdb.connect(":memory:")
        conn.execute("SET threads=4")
        total = conn.execute(f"SELECT COUNT(*) FROM read_parquet('{p}')").fetchone()[0]
        rows = conn.execute(f"""
            SELECT lower(regexp_extract(domain, '\\.([^.]+)$', 1)) AS tld, COUNT(*) AS cnt
            FROM read_parquet('{p}')
            GROUP BY tld ORDER BY cnt DESC LIMIT 20
        """).fetchall()
        conn.close()
    return total, [{"tld": r[0], "count": r[1]} for r in rows]


async def get_stats():
    global _tld_cache, _total_cache

    # Compute TLD breakdown once and cache it
    if not _tld_cache:
        loop = asyncio.get_running_loop()
        _total_cache, _tld_cache = await loop.run_in_executor(None, _tld_stats_sync)

    async with aiosqlite.connect(SQLITE_PATH) as db:
        async with db.execute("SELECT COUNT(*) FROM enriched_domains") as cur:
            enriched = (await cur.fetchone())[0]
        threshold = int(os.getenv("SCORE_THRESHOLD", "60"))
        async with db.execute("SELECT COUNT(*) FROM enriched_domains WHERE score >= ?", (threshold,)) as cur:
            hot_leads = (await cur.fetchone())[0]
        async with db.execute("SELECT COUNT(*) FROM enriched_domains WHERE kit_digital=1") as cur:
            kit_digital_count = (await cur.fetchone())[0]
        async with db.execute("SELECT status, COUNT(*) FROM job_queue GROUP BY status") as cur:
            q = {r[0]: r[1] async for r in cur}

    return {
        "total_domains": _total_cache,
        "enriched": enriched,
        "hot_leads": hot_leads,
        "kit_digital_count": kit_digital_count,
        "tld_breakdown": _tld_cache,
        "index_status": index_status(),
        "queue": {
            "pending": q.get("pending", 0),
            "running": q.get("running", 0),
            "done": q.get("done", 0),
            "failed": q.get("failed", 0),
        },
    }


# ── Enrichment helpers ───────────────────────────────────────────────────────

async def get_enriched(min_score=0, cms=None, country=None, kit_digital=None,
                       ai_only=False, lead_quality=None, page=1, limit=100):
    offset = (page - 1) * limit
    conditions = ["score >= ?"]
    params: list = [min_score]
    if cms:
        conditions.append("cms = ?")
        params.append(cms)
    if country:
        conditions.append("ip_country = ?")
        params.append(country)
    if kit_digital is not None:
        conditions.append("kit_digital = ?")
        params.append(1 if kit_digital else 0)
    if ai_only:
        conditions.append("ai_lead_quality IS NOT NULL")
    if lead_quality:
        conditions.append("ai_lead_quality = ?")
        params.append(lead_quality.upper())
    where = "WHERE " + " AND ".join(conditions)
    async with aiosqlite.connect(SQLITE_PATH) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            f"SELECT * FROM enriched_domains {where} ORDER BY score DESC LIMIT ? OFFSET ?",
            params + [limit, offset],
        ) as cur:
            rows = [dict(r) async for r in cur]
        async with db.execute(
            f"SELECT COUNT(*) FROM enriched_domains {where}", params
        ) as cur:
            total = (await cur.fetchone())[0]
    return total, rows
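

# Usage sketch (illustrative, never called): the filters behind the "Leads 🤖"
# tab — AI-assessed rows only, optionally narrowed to one quality tier. "HIGH"
# is an assumption about the labels the assessor writes into ai_lead_quality;
# the column stores whatever string the model returned (uppercased here by the
# filter itself).
async def _leads_example():
    total, rows = await get_enriched(ai_only=True, lead_quality="high",
                                     country="ES", page=1, limit=25)
    logger.info("%d AI-assessed leads", total)
    for row in rows:
        logger.info("%s [%s] pitch=%s", row["domain"],
                    row["ai_lead_quality"], row.get("ai_pitch"))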


async def queue_ai(domains: list[str], language: str = "ES"):
    async with aiosqlite.connect(SQLITE_PATH) as db:
        await db.executemany(
            """INSERT INTO ai_queue (domain, language) VALUES (?, ?)
               ON CONFLICT(domain) DO UPDATE SET language=excluded.language, status='pending'""",
            [(d, language) for d in domains],
        )
        await db.commit()


async def get_ai_queue_status():
    async with aiosqlite.connect(SQLITE_PATH) as db:
        async with db.execute("SELECT status, COUNT(*) FROM ai_queue GROUP BY status") as cur:
            rows = {r[0]: r[1] async for r in cur}
    return {
        "pending": rows.get("pending", 0),
        "running": rows.get("running", 0),
        "done": rows.get("done", 0),
        "failed": rows.get("failed", 0),
        "total": sum(rows.values()),
    }
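

# Usage sketch (illustrative, never called): enqueue a batch for AI assessment
# and poll until the queue drains. The 3 s interval mirrors the UI's
# auto-refresh; the domains are placeholders. Re-queueing an existing domain
# resets it to 'pending' and updates its language (see ON CONFLICT above).
async def _ai_queue_example():
    await queue_ai(["example.es", "ejemplo.es"], language="ES")
    while (status := await get_ai_queue_status())["pending"] + status["running"] > 0:
        logger.info("AI queue: %s", status)
        await asyncio.sleep(3)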


async def save_ai_assessment(domain: str, assessment: dict, site_analysis: dict = None):
    import json as _json
    async with aiosqlite.connect(SQLITE_PATH) as db:
        # Ensure the row exists (the domain may not be in enriched_domains yet
        # if it was assessed before full enrichment)
        await db.execute(
            """INSERT INTO enriched_domains (domain) VALUES (?) ON CONFLICT(domain) DO NOTHING""",
            (domain,),
        )
        await db.execute(
            """UPDATE enriched_domains SET
                   ai_assessment=?, ai_lead_quality=?, ai_pitch=?,
                   ai_contact_channel=?, ai_contact_value=?, ai_assessed_at=datetime('now'),
                   site_analysis=?
               WHERE domain=?""",
            (
                _json.dumps(assessment),
                assessment.get("lead_quality"),
                assessment.get("pitch_angle"),
                assessment.get("best_contact_channel"),
                assessment.get("best_contact_value"),
                _json.dumps(site_analysis) if site_analysis else None,
                domain,
            ),
        )
        # Update contact_info + kit_digital from site_analysis if available.
        # Gemini's kit_digital_confirmed is the authoritative verdict — it can
        # override a false positive from the heuristic scanner.
        if site_analysis:
            contacts = {
                "emails": site_analysis.get("emails", []),
                "phones": site_analysis.get("phones", []),
                "whatsapp": site_analysis.get("whatsapp", []),
                "social": site_analysis.get("social_links", []),
            }
            # Prefer Gemini's explicit verdict; fall back to heuristic if null
            ai_kit = assessment.get("kit_digital_confirmed")
            kit_val = int(ai_kit) if ai_kit is not None else int(site_analysis.get("kit_digital", False))
            await db.execute(
                """UPDATE enriched_domains SET
                       kit_digital=?, kit_digital_signals=?, contact_info=?
                   WHERE domain=?""",
                (
                    kit_val,
                    _json.dumps(site_analysis.get("kit_digital_signals", [])),
                    _json.dumps(contacts),
                    domain,
                ),
            )
        await db.execute(
            "UPDATE ai_queue SET status='done', completed_at=datetime('now') WHERE domain=?",
            (domain,),
        )
        await db.commit()
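

# Shape sketch (illustrative): the keys save_ai_assessment() actually reads.
# Any extra keys in the payloads are preserved verbatim inside the
# ai_assessment / site_analysis JSON blobs. All values below are invented.
_EXAMPLE_ASSESSMENT = {
    "lead_quality": "HIGH",                # stored in ai_lead_quality
    "pitch_angle": "Renew the expiring SSL and modernise the site",
    "best_contact_channel": "email",       # stored in ai_contact_channel
    "best_contact_value": "info@example.es",
    "kit_digital_confirmed": False,        # None ⇒ fall back to heuristic kit_digital flag
}
_EXAMPLE_SITE_ANALYSIS = {
    "emails": ["info@example.es"],
    "phones": ["+34 600 000 000"],
    "whatsapp": [],
    "social_links": ["https://instagram.com/example"],
    "kit_digital": False,
    "kit_digital_signals": [],
}
# await save_ai_assessment("example.es", _EXAMPLE_ASSESSMENT, _EXAMPLE_SITE_ANALYSIS)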


async def queue_domains(domains: list[str]):
    async with aiosqlite.connect(SQLITE_PATH) as db:
        await db.executemany(
            "INSERT OR IGNORE INTO job_queue (domain) VALUES (?)",
            [(d,) for d in domains],
        )
        await db.commit()


async def get_queue_status():
    async with aiosqlite.connect(SQLITE_PATH) as db:
        async with db.execute("SELECT status, COUNT(*) FROM job_queue GROUP BY status") as cur:
            rows = {r[0]: r[1] async for r in cur}
    pending = rows.get("pending", 0)
    running = rows.get("running", 0)
    done = rows.get("done", 0)
    failed = rows.get("failed", 0)
    total = sum(rows.values())
    rate = int(os.getenv("CONCURRENCY_LIMIT", "50"))
    # Rough ETA: treat CONCURRENCY_LIMIT/10 as throughput in domains/sec
    # (i.e. each enrichment is assumed to take ~10 s end to end)
    eta_seconds = (pending + running) / max(rate / 10, 1) if (pending + running) > 0 else None
    return {"total": total, "pending": pending, "running": running, "done": done, "failed": failed, "eta_seconds": eta_seconds}
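

# Usage sketch (illustrative, never called): roughly how a CSV like the one the
# /api/export/leads endpoint produces ("contacts + pitch") could be assembled
# from this module. main.py's real implementation may differ — the helper name
# and column choice are assumptions; only get_enriched() and the contact_info
# JSON blob come from db.py.
async def _export_leads_csv_example(lead_quality=None) -> str:
    import csv, io, json
    _, rows = await get_enriched(ai_only=True, lead_quality=lead_quality, limit=10_000)
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(["domain", "quality", "emails", "phones", "pitch"])
    for row in rows:
        contacts = json.loads(row["contact_info"]) if row.get("contact_info") else {}
        writer.writerow([
            row["domain"],
            row["ai_lead_quality"],
            ";".join(contacts.get("emails", [])),
            ";".join(contacts.get("phones", [])),
            row.get("ai_pitch") or "",
        ])
    return buf.getvalue()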