"""SQLite + DuckDB storage layer: enrichment DB, job/AI queues, and a
persistent DuckDB index over the domains parquet file."""

import asyncio
import json
import logging
import os
import sqlite3
from pathlib import Path

import aiosqlite
import duckdb

logger = logging.getLogger(__name__)

DATA_DIR = Path(os.getenv("DATA_DIR", "/data"))
PARQUET_PATH = DATA_DIR / "domains.parquet"
DUCKDB_PATH = DATA_DIR / "domains.duckdb"
SQLITE_PATH = DATA_DIR / "enrichment.db"

SCHEMA = """
CREATE TABLE IF NOT EXISTS enriched_domains (
    domain TEXT PRIMARY KEY,
    is_live INTEGER DEFAULT 0,
    status_code INTEGER,
    ssl_valid INTEGER DEFAULT 0,
    ssl_expiry_days INTEGER,
    cms TEXT,
    has_mx INTEGER DEFAULT 0,
    ip_country TEXT,
    page_title TEXT,
    server TEXT,
    enriched_at TEXT,
    error TEXT,
    score INTEGER DEFAULT 0,
    kit_digital INTEGER DEFAULT 0,
    kit_digital_signals TEXT,
    contact_info TEXT,
    ai_assessment TEXT,
    ai_lead_quality TEXT,
    ai_pitch TEXT,
    ai_contact_channel TEXT,
    ai_contact_value TEXT,
    ai_assessed_at TEXT,
    site_analysis TEXT,
    prescreen_status TEXT,
    niche TEXT,
    site_type TEXT,
    prescreen_at TEXT,
    ip TEXT,
    load_time_ms INTEGER
);

CREATE TABLE IF NOT EXISTS job_queue (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    domain TEXT UNIQUE NOT NULL,
    status TEXT DEFAULT 'pending',
    created_at TEXT DEFAULT (datetime('now')),
    started_at TEXT,
    completed_at TEXT,
    error TEXT
);

CREATE TABLE IF NOT EXISTS ai_queue (
    domain TEXT PRIMARY KEY,
    status TEXT DEFAULT 'pending',
    created_at TEXT DEFAULT (datetime('now')),
    completed_at TEXT,
    error TEXT,
    language TEXT DEFAULT 'ES'
);

CREATE TABLE IF NOT EXISTS scores (
    domain TEXT PRIMARY KEY,
    score INTEGER NOT NULL,
    scored_at TEXT DEFAULT (datetime('now'))
);
"""

# Columns added after initial release — applied as migrations on existing DBs
_MIGRATIONS = [
    "ALTER TABLE enriched_domains ADD COLUMN kit_digital INTEGER DEFAULT 0",
    "ALTER TABLE enriched_domains ADD COLUMN kit_digital_signals TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN contact_info TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN ai_assessment TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN ai_lead_quality TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN ai_pitch TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN ai_contact_channel TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN ai_contact_value TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN ai_assessed_at TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN site_analysis TEXT",
    "CREATE TABLE IF NOT EXISTS ai_queue (domain TEXT PRIMARY KEY, status TEXT DEFAULT 'pending', created_at TEXT DEFAULT (datetime('now')), completed_at TEXT, error TEXT)",
    "ALTER TABLE ai_queue ADD COLUMN language TEXT DEFAULT 'ES'",
    "ALTER TABLE enriched_domains ADD COLUMN prescreen_status TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN niche TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN site_type TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN prescreen_at TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN ip TEXT",
    "ALTER TABLE enriched_domains ADD COLUMN load_time_ms INTEGER",
]

# Index build state
_index_ready = False
_index_building = False
_index_total = 0

# Cached stats (TLD breakdown is expensive — compute once)
_tld_cache: list = []
_total_cache: int = 0


async def init_db():
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        # WAL mode: concurrent reads don't block on writes; write lock held briefly
        await db.execute("PRAGMA journal_mode=WAL")
        await db.execute("PRAGMA busy_timeout=30000")
        await db.executescript(SCHEMA)
        # Run migrations (safe to re-run — silently skips existing columns)
        for sql in _MIGRATIONS:
            try:
                await db.execute(sql)
            except sqlite3.OperationalError:
                # "duplicate column name" / "table already exists" on re-runs
                pass
        await db.commit()
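# Example startup wiring (a sketch; assumes a FastAPI lifespan hook, but any
# asyncio entry point works the same way). The index build runs in the
# background so the API can serve parquet-backed queries immediately:
#
#     from contextlib import asynccontextmanager
#
#     @asynccontextmanager
#     async def lifespan(app):
#         await init_db()
#         asyncio.create_task(build_duckdb_index())  # don't block startup
#         yield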
# ── DuckDB persistent index ──────────────────────────────────────────────────

def _build_index_sync():
    global _index_ready, _index_building, _index_total
    _index_building = True
    conn = None
    try:
        conn = duckdb.connect(str(DUCKDB_PATH))
        conn.execute("SET threads=4")
        conn.execute("SET memory_limit='2GB'")
        # Check if already built
        try:
            n = conn.execute("SELECT COUNT(*) FROM domains").fetchone()[0]
            if n > 0:
                _index_total = n
                _index_ready = True
                logger.info("DuckDB index already ready (%d rows)", n)
                return
        except Exception:
            pass
        logger.info("Building DuckDB index from parquet (one-time ~2-3 min)...")
        conn.execute("""
            CREATE OR REPLACE TABLE domains AS
            SELECT domain,
                   lower(regexp_extract(domain, '\\.([^.]+)$', 1)) AS tld,
                   len(string_split(domain, '.')) AS parts
            FROM read_parquet(?)
        """, [str(PARQUET_PATH)])
        conn.execute("CREATE INDEX IF NOT EXISTS idx_tld ON domains(tld)")
        _index_total = conn.execute("SELECT COUNT(*) FROM domains").fetchone()[0]
        _index_ready = True
        logger.info("DuckDB index built: %d rows", _index_total)
    except Exception as e:
        logger.error("DuckDB index build failed: %s", e)
    finally:
        # Always release the build flag and the connection, even on failure
        _index_building = False
        if conn is not None:
            conn.close()


async def build_duckdb_index():
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _build_index_sync)


def index_status() -> dict:
    return {
        "ready": _index_ready,
        "building": _index_building,
        "total": _index_total,
    }


# ── Domain queries ───────────────────────────────────────────────────────────

def _domains_sync(tld, page, limit, alpha_only, no_sld, keyword):
    conditions = []
    params_count = []
    params_data = []

    # The same bind values feed both the COUNT query and the data query
    def _add(clause, val=None):
        conditions.append(clause)
        if val is not None:
            params_count.append(val)
            params_data.append(val)

    # Query the prebuilt index when ready; otherwise scan the parquet directly
    if _index_ready:
        source = "domains"
    else:
        source = f"read_parquet('{PARQUET_PATH}')"

    if tld:
        if _index_ready:
            _add("tld = ?", tld.lower().lstrip("."))
        else:
            _add("lower(regexp_extract(domain, '\\.([^.]+)$', 1)) = ?",
                 tld.lower().lstrip("."))
    if no_sld:
        if _index_ready:
            _add("parts = 2")
        else:
            _add("len(string_split(domain, '.')) = 2")
    if alpha_only:
        _add("NOT regexp_matches(domain, '[^a-zA-Z.]')")
    if keyword:
        _add("domain LIKE ?", f"%{keyword.lower()}%")

    where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
    offset = (page - 1) * limit

    if _index_ready:
        conn = duckdb.connect(str(DUCKDB_PATH), read_only=True)
    else:
        conn = duckdb.connect(":memory:")
    conn.execute("SET threads=4")
    total = conn.execute(f"SELECT COUNT(*) FROM {source} {where}", params_count).fetchone()[0]
    rows = conn.execute(
        f"SELECT domain FROM {source} {where} LIMIT {limit} OFFSET {offset}",
        params_data,
    ).fetchall()
    conn.close()
    return total, [r[0] for r in rows]


async def get_domains(tld=None, page=1, limit=100, alpha_only=False,
                      no_sld=False, keyword=None, live_only=False):
    loop = asyncio.get_running_loop()
    total, domain_list = await loop.run_in_executor(
        None, _domains_sync, tld, page, limit, alpha_only, no_sld, keyword
    )
    if not domain_list:
        return total, []
    placeholders = ",".join("?" * len(domain_list))
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            f"SELECT * FROM enriched_domains WHERE domain IN ({placeholders})",
            domain_list,
        ) as cur:
            enriched_map = {r["domain"]: dict(r) async for r in cur}
    results = []
    for d in domain_list:
        row = enriched_map.get(d, {"domain": d})
        # live_only filters after pagination, so a page may come back short;
        # `total` still counts all matching domains regardless of liveness.
        if live_only and not row.get("is_live"):
            continue
        results.append(row)
    return total, results
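# Example call (a sketch; parameter values are illustrative): page 1 of
# second-level .com domains made only of letters, with any enrichment rows
# merged in from SQLite:
#
#     total, rows = await get_domains(tld="com", no_sld=True, alpha_only=True,
#                                     page=1, limit=50)
#     print(total, rows[0]["domain"])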
# ── Stats ────────────────────────────────────────────────────────────────────

def _tld_stats_sync() -> tuple[int, list]:
    if _index_ready:
        conn = duckdb.connect(str(DUCKDB_PATH), read_only=True)
        total = conn.execute("SELECT COUNT(*) FROM domains").fetchone()[0]
        rows = conn.execute("""
            SELECT tld, COUNT(*) AS cnt FROM domains
            WHERE tld != ''
            GROUP BY tld ORDER BY cnt DESC LIMIT 20
        """).fetchall()
        conn.close()
    else:
        p = str(PARQUET_PATH)
        conn = duckdb.connect(":memory:")
        conn.execute("SET threads=4")
        total = conn.execute(f"SELECT COUNT(*) FROM read_parquet('{p}')").fetchone()[0]
        rows = conn.execute(f"""
            SELECT lower(regexp_extract(domain, '\\.([^.]+)$', 1)) AS tld,
                   COUNT(*) AS cnt
            FROM read_parquet('{p}')
            GROUP BY tld ORDER BY cnt DESC LIMIT 20
        """).fetchall()
        conn.close()
    return total, [{"tld": r[0], "count": r[1]} for r in rows]


async def get_stats():
    global _tld_cache, _total_cache
    # Compute TLD breakdown once and cache it for the process lifetime
    if not _tld_cache:
        loop = asyncio.get_running_loop()
        _total_cache, _tld_cache = await loop.run_in_executor(None, _tld_stats_sync)
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        async with db.execute("SELECT COUNT(*) FROM enriched_domains") as cur:
            enriched = (await cur.fetchone())[0]
        threshold = int(os.getenv("SCORE_THRESHOLD", "60"))
        async with db.execute(
            "SELECT COUNT(*) FROM enriched_domains WHERE score >= ?", (threshold,)
        ) as cur:
            hot_leads = (await cur.fetchone())[0]
        async with db.execute(
            "SELECT COUNT(*) FROM enriched_domains WHERE kit_digital=1"
        ) as cur:
            kit_digital_count = (await cur.fetchone())[0]
        async with db.execute("SELECT status, COUNT(*) FROM job_queue GROUP BY status") as cur:
            q = {r[0]: r[1] async for r in cur}
    return {
        "total_domains": _total_cache,
        "enriched": enriched,
        "hot_leads": hot_leads,
        "kit_digital_count": kit_digital_count,
        "tld_breakdown": _tld_cache,
        "index_status": index_status(),
        "queue": {
            "pending": q.get("pending", 0),
            "running": q.get("running", 0),
            "done": q.get("done", 0),
            "failed": q.get("failed", 0),
        },
    }
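# Usage note (sketch): the TLD breakdown has no invalidation, so clearing
# `_tld_cache` (or restarting the process) is the only way to refresh it;
# the SQLite counters in the same payload are always live:
#
#     stats = await get_stats()
#     print(stats["total_domains"], stats["queue"]["pending"])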
# ── Enrichment helpers ───────────────────────────────────────────────────────

async def get_enriched(min_score=0, cms=None, country=None, kit_digital=None,
                       ai_only=False, lead_quality=None, prescreen_status=None,
                       niche=None, site_type=None, page=1, limit=100):
    offset = (page - 1) * limit
    conditions = ["score >= ?"]
    params: list = [min_score]
    if cms:
        conditions.append("cms = ?")
        params.append(cms)
    if country:
        conditions.append("ip_country = ?")
        params.append(country)
    if kit_digital is not None:
        conditions.append("kit_digital = ?")
        params.append(1 if kit_digital else 0)
    if ai_only:
        conditions.append("ai_lead_quality IS NOT NULL")
    if lead_quality:
        conditions.append("ai_lead_quality = ?")
        params.append(lead_quality.upper())
    if prescreen_status == "none":
        conditions.append("prescreen_status IS NULL")
    elif prescreen_status:
        conditions.append("prescreen_status = ?")
        params.append(prescreen_status)
    if niche:
        conditions.append("niche = ?")
        params.append(niche)
    if site_type:
        conditions.append("site_type = ?")
        params.append(site_type)
    where = "WHERE " + " AND ".join(conditions)
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            f"SELECT * FROM enriched_domains {where} ORDER BY score DESC LIMIT ? OFFSET ?",
            params + [limit, offset],
        ) as cur:
            rows = [dict(r) async for r in cur]
        async with db.execute(
            f"SELECT COUNT(*) FROM enriched_domains {where}", params
        ) as cur:
            total = (await cur.fetchone())[0]
    return total, rows


async def queue_ai(domains: list[str], language: str = "ES"):
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        await db.executemany(
            """INSERT INTO ai_queue (domain, language) VALUES (?, ?)
               ON CONFLICT(domain) DO UPDATE SET language=excluded.language, status='pending'""",
            [(d, language) for d in domains],
        )
        await db.commit()


async def get_ai_queue_status():
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        async with db.execute("SELECT status, COUNT(*) FROM ai_queue GROUP BY status") as cur:
            rows = {r[0]: r[1] async for r in cur}
    return {
        "pending": rows.get("pending", 0),
        "running": rows.get("running", 0),
        "done": rows.get("done", 0),
        "failed": rows.get("failed", 0),
        "total": sum(rows.values()),
    }


async def save_ai_assessment(domain: str, assessment: dict, site_analysis: dict | None = None):
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        # Upsert into enriched_domains (domain may not exist yet if assessed
        # before full enrichment)
        await db.execute(
            """INSERT INTO enriched_domains (domain) VALUES (?)
               ON CONFLICT(domain) DO NOTHING""",
            (domain,),
        )
        await db.execute(
            """UPDATE enriched_domains SET
                   ai_assessment=?, ai_lead_quality=?, ai_pitch=?,
                   ai_contact_channel=?, ai_contact_value=?,
                   ai_assessed_at=datetime('now'), site_analysis=?
               WHERE domain=?""",
            (
                json.dumps(assessment),
                assessment.get("lead_quality"),
                assessment.get("pitch_angle"),
                assessment.get("best_contact_channel"),
                assessment.get("best_contact_value"),
                json.dumps(site_analysis) if site_analysis else None,
                domain,
            ),
        )
        # Update contact_info + kit_digital from site_analysis if available.
        # Gemini's kit_digital_confirmed is the authoritative verdict — it can
        # override a false positive from the heuristic scanner.
        if site_analysis:
            contacts = {
                "emails": site_analysis.get("emails", []),
                "phones": site_analysis.get("phones", []),
                "whatsapp": site_analysis.get("whatsapp", []),
                "social": site_analysis.get("social_links", []),
            }
            # Prefer Gemini's explicit verdict; fall back to heuristic if null
            ai_kit = assessment.get("kit_digital_confirmed")
            kit_val = int(ai_kit) if ai_kit is not None else int(site_analysis.get("kit_digital", False))
            await db.execute(
                """UPDATE enriched_domains SET
                       kit_digital=?, kit_digital_signals=?, contact_info=?
                   WHERE domain=?""",
                (
                    kit_val,
                    json.dumps(site_analysis.get("kit_digital_signals", [])),
                    json.dumps(contacts),
                    domain,
                ),
            )
        await db.execute(
            "UPDATE ai_queue SET status='done', completed_at=datetime('now') WHERE domain=?",
            (domain,),
        )
        await db.commit()
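# Example call (a sketch): the keys read from `assessment` above are the ones
# this function expects; the values here are illustrative, not a fixed schema:
#
#     await save_ai_assessment("example.com", {
#         "lead_quality": "HIGH",
#         "pitch_angle": "Outdated site, expiring SSL",
#         "best_contact_channel": "email",
#         "best_contact_value": "info@example.com",
#         "kit_digital_confirmed": True,
#     })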
async def save_prescreen_results(results: list[dict]):
    """Upsert prescreen HTTP results and/or DeepSeek niche/type classifications."""
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        for r in results:
            domain = r.get("domain")
            if not domain:
                continue
            niche = r.get("niche")
            site_type = r.get("type")  # DeepSeek returns "type" key
            if niche or site_type:
                # Upsert niche/type — works even if the row was never enriched
                await db.execute(
                    """INSERT INTO enriched_domains (domain, niche, site_type)
                       VALUES (?, ?, ?)
                       ON CONFLICT(domain) DO UPDATE SET
                           niche=excluded.niche, site_type=excluded.site_type""",
                    (domain, niche, site_type),
                )
            else:
                # Prescreen status upsert — create row if it doesn't exist yet
                await db.execute(
                    """INSERT INTO enriched_domains
                           (domain, prescreen_status, prescreen_at, page_title)
                       VALUES (?, ?, datetime('now'), ?)
                       ON CONFLICT(domain) DO UPDATE SET
                           prescreen_status = excluded.prescreen_status,
                           prescreen_at = excluded.prescreen_at,
                           page_title = COALESCE(page_title, excluded.page_title)""",
                    (domain, r.get("prescreen_status"), r.get("title")),
                )
        await db.commit()


async def queue_domains(domains: list[str]):
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        await db.executemany(
            "INSERT OR IGNORE INTO job_queue (domain) VALUES (?)",
            [(d,) for d in domains],
        )
        await db.commit()


async def get_queue_status():
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        async with db.execute("SELECT status, COUNT(*) FROM job_queue GROUP BY status") as cur:
            rows = {r[0]: r[1] async for r in cur}
    pending = rows.get("pending", 0)
    running = rows.get("running", 0)
    done = rows.get("done", 0)
    failed = rows.get("failed", 0)
    total = sum(rows.values())
    rate = int(os.getenv("CONCURRENCY_LIMIT", "50"))
    # Rough ETA: the formula assumes throughput of about CONCURRENCY_LIMIT/10
    # domains per second, i.e. each slot clears a domain roughly every 10s.
    eta_seconds = (pending + running) / max(rate / 10, 1) if (pending + running) > 0 else None
    return {"total": total, "pending": pending, "running": running,
            "done": done, "failed": failed, "eta_seconds": eta_seconds}
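if __name__ == "__main__":
    # Minimal smoke test (a sketch, not part of the service): initialize the
    # SQLite schema, queue one hypothetical domain, and print queue status.
    async def _smoke():
        await init_db()
        await queue_domains(["example.com"])
        print(await get_queue_status())

    asyncio.run(_smoke())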