Files
DomGod/app/db.py
Malin 22eae3f9b7 feat: add EN/ES/RO language selector for AI pitch generation
- db.py: add `language` column to ai_queue; migration; queue_ai() accepts
  language param and re-queues with ON CONFLICT UPDATE so changing language works
- main.py: batch and single assess endpoints accept `language` from request body
- enricher.py: ai_worker_loop reads language column, passes to _assess_one()
- replicate_ai.py: assess_domain() and _build_prompt() accept language param;
  OUTPUT LANGUAGE section injected into prompt so Gemini writes pitch/email in
  the requested language (EN/ES/RO)
- index.html: flag dropdown (🇪🇸/🇬🇧/🇷🇴) next to AI Assess button; aiLang
  state default ES; language sent in all batch assessment requests

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 08:39:27 +02:00

436 lines
16 KiB
Python

import os
import asyncio
import logging
import aiosqlite
import duckdb
from pathlib import Path
logger = logging.getLogger(__name__)

# All persistent artifacts live under DATA_DIR (env override; defaults to /data).
DATA_DIR = Path(os.getenv("DATA_DIR", "/data"))
PARQUET_PATH = DATA_DIR / "domains.parquet"  # raw domain corpus (read-only input)
DUCKDB_PATH = DATA_DIR / "domains.duckdb"    # persistent DuckDB index built from the parquet
SQLITE_PATH = DATA_DIR / "enrichment.db"     # enrichment results + work queues
# SQLite schema, applied idempotently on startup by init_db():
#   enriched_domains — one row per probed domain: liveness, SSL, CMS, score,
#                      Kit Digital signals, contact info and AI assessment fields.
#   job_queue        — enrichment work queue (status: pending/running/done/failed).
#   ai_queue         — AI-assessment queue; `language` selects the pitch language.
#   scores           — latest score per domain.
SCHEMA = """
CREATE TABLE IF NOT EXISTS enriched_domains (
domain TEXT PRIMARY KEY,
is_live INTEGER DEFAULT 0,
status_code INTEGER,
ssl_valid INTEGER DEFAULT 0,
ssl_expiry_days INTEGER,
cms TEXT,
has_mx INTEGER DEFAULT 0,
ip_country TEXT,
page_title TEXT,
server TEXT,
enriched_at TEXT,
error TEXT,
score INTEGER DEFAULT 0,
kit_digital INTEGER DEFAULT 0,
kit_digital_signals TEXT,
contact_info TEXT,
ai_assessment TEXT,
ai_lead_quality TEXT,
ai_pitch TEXT,
ai_contact_channel TEXT,
ai_contact_value TEXT,
ai_assessed_at TEXT,
site_analysis TEXT
);
CREATE TABLE IF NOT EXISTS job_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
domain TEXT UNIQUE NOT NULL,
status TEXT DEFAULT 'pending',
created_at TEXT DEFAULT (datetime('now')),
started_at TEXT,
completed_at TEXT,
error TEXT
);
CREATE TABLE IF NOT EXISTS ai_queue (
domain TEXT PRIMARY KEY,
status TEXT DEFAULT 'pending',
created_at TEXT DEFAULT (datetime('now')),
completed_at TEXT,
error TEXT,
language TEXT DEFAULT 'ES'
);
CREATE TABLE IF NOT EXISTS scores (
domain TEXT PRIMARY KEY,
score INTEGER NOT NULL,
scored_at TEXT DEFAULT (datetime('now'))
);
"""
# Columns added after initial release — applied as migrations on existing DBs.
# Each statement is attempted individually in init_db(); "already exists"
# failures are skipped, so this list is append-only and safe to re-run.
_MIGRATIONS = [
"ALTER TABLE enriched_domains ADD COLUMN kit_digital INTEGER DEFAULT 0",
"ALTER TABLE enriched_domains ADD COLUMN kit_digital_signals TEXT",
"ALTER TABLE enriched_domains ADD COLUMN contact_info TEXT",
"ALTER TABLE enriched_domains ADD COLUMN ai_assessment TEXT",
"ALTER TABLE enriched_domains ADD COLUMN ai_lead_quality TEXT",
"ALTER TABLE enriched_domains ADD COLUMN ai_pitch TEXT",
"ALTER TABLE enriched_domains ADD COLUMN ai_contact_channel TEXT",
"ALTER TABLE enriched_domains ADD COLUMN ai_contact_value TEXT",
"ALTER TABLE enriched_domains ADD COLUMN ai_assessed_at TEXT",
"ALTER TABLE enriched_domains ADD COLUMN site_analysis TEXT",
# ai_queue table itself post-dates the first release, hence CREATE here too.
"CREATE TABLE IF NOT EXISTS ai_queue (domain TEXT PRIMARY KEY, status TEXT DEFAULT 'pending', created_at TEXT DEFAULT (datetime('now')), completed_at TEXT, error TEXT)",
"ALTER TABLE ai_queue ADD COLUMN language TEXT DEFAULT 'ES'",
]
# Index build state (mutated from an executor thread by _build_index_sync)
_index_ready = False
_index_building = False
_index_total = 0
# Cached stats (TLD breakdown is expensive — compute once per process)
_tld_cache: list = []
_total_cache: int = 0
async def init_db():
    """Create the enrichment SQLite schema and apply column migrations.

    Safe to call on every startup: the base schema uses CREATE TABLE IF NOT
    EXISTS, and each migration is attempted individually so that already-
    migrated databases simply skip the duplicate-column errors.
    """
    async with aiosqlite.connect(SQLITE_PATH) as db:
        await db.executescript(SCHEMA)
        # Run migrations (safe to re-run — silently skips existing columns).
        for sql in _MIGRATIONS:
            try:
                await db.execute(sql)
            except aiosqlite.OperationalError:
                # "duplicate column name" / "table already exists" — expected
                # on any DB that has already been migrated. Narrower than the
                # previous bare Exception so real failures still surface.
                pass
        await db.commit()
# ── DuckDB persistent index ──────────────────────────────────────────────────
def _build_index_sync():
    """Blocking one-time build of the DuckDB `domains` index from the parquet dump.

    Populates the module-level build state (_index_ready/_index_building/
    _index_total). If the `domains` table already contains rows, the build is
    skipped and the existing count is reused. Meant to run in an executor
    thread — see build_duckdb_index().
    """
    global _index_ready, _index_building, _index_total
    _index_building = True
    try:
        conn = duckdb.connect(str(DUCKDB_PATH))
        try:
            conn.execute("SET threads=4")
            conn.execute("SET memory_limit='2GB'")
            # Fast path: check whether a previous run already built the table.
            try:
                n = conn.execute("SELECT COUNT(*) FROM domains").fetchone()[0]
            except Exception:
                n = 0  # table does not exist yet — fall through to the build
            if n > 0:
                _index_total = n
                _index_ready = True
                logger.info("DuckDB index already ready (%d rows)", n)
                return
            logger.info("Building DuckDB index from parquet (one-time ~2-3 min)...")
            # tld = lowercased last dot-separated label; parts = label count.
            conn.execute("""
                CREATE OR REPLACE TABLE domains AS
                SELECT
                    domain,
                    lower(regexp_extract(domain, '\\.([^.]+)$', 1)) AS tld,
                    len(string_split(domain, '.')) AS parts
                FROM read_parquet(?)
            """, [str(PARQUET_PATH)])
            conn.execute("CREATE INDEX IF NOT EXISTS idx_tld ON domains(tld)")
            _index_total = conn.execute("SELECT COUNT(*) FROM domains").fetchone()[0]
            _index_ready = True
            logger.info("DuckDB index built: %d rows", _index_total)
        finally:
            # Previously the handle leaked if any statement raised; always
            # release the DuckDB file lock.
            conn.close()
    except Exception as e:
        logger.error("DuckDB index build failed: %s", e)
    finally:
        _index_building = False
async def build_duckdb_index():
    """Run the blocking DuckDB index build in a worker thread.

    Uses asyncio.get_running_loop() — get_event_loop() inside a coroutine is
    deprecated since Python 3.10 and this function is only ever awaited.
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _build_index_sync)
def index_status() -> dict:
    """Snapshot of the DuckDB index build state for the stats endpoint."""
    return dict(ready=_index_ready, building=_index_building, total=_index_total)
# ── Domain queries ───────────────────────────────────────────────────────────
def _domains_sync(tld, page, limit, alpha_only, no_sld, keyword):
    """Blocking DuckDB query for one page of the domain corpus.

    Uses the persistent `domains` index table when it is ready (pre-computed
    tld/parts columns); otherwise falls back to scanning the parquet file and
    deriving them on the fly. Returns (total_matching, [domain, ...]).
    """
    # Snapshot the flag once: the index build may complete mid-call, and the
    # chosen source must stay consistent with the condition forms below.
    use_index = _index_ready
    conditions: list = []
    params: list = []

    def _add(clause, val=None):
        # One shared helper (previously duplicated identically per branch);
        # params feed both the COUNT and the page query.
        conditions.append(clause)
        if val is not None:
            params.append(val)

    source = "domains" if use_index else f"read_parquet('{PARQUET_PATH}')"
    if tld:
        wanted = tld.lower().lstrip(".")
        if use_index:
            _add("tld = ?", wanted)
        else:
            _add("lower(regexp_extract(domain, '\\.([^.]+)$', 1)) = ?", wanted)
    if no_sld:
        # "no sub-level domain" == exactly two labels (example.com)
        _add("parts = 2" if use_index else "len(string_split(domain, '.')) = 2")
    if alpha_only:
        _add("NOT regexp_matches(domain, '[^a-zA-Z.]')")
    if keyword:
        _add("domain LIKE ?", f"%{keyword.lower()}%")

    where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
    offset = (page - 1) * limit
    conn = duckdb.connect(str(DUCKDB_PATH), read_only=True) if use_index else duckdb.connect(":memory:")
    try:
        conn.execute("SET threads=4")
        total = conn.execute(f"SELECT COUNT(*) FROM {source} {where}", params).fetchone()[0]
        rows = conn.execute(
            f"SELECT domain FROM {source} {where} LIMIT {limit} OFFSET {offset}", params
        ).fetchall()
    finally:
        # Always release the read handle, even when a query raises.
        conn.close()
    return total, [r[0] for r in rows]
async def get_domains(tld=None, page=1, limit=100, alpha_only=False, no_sld=False, keyword=None, live_only=False):
    """Page through the corpus, merging in enrichment rows from SQLite.

    Returns (total, rows): total counts matches of the DuckDB-side filters
    only. NOTE(review): live_only is applied AFTER pagination, so a filtered
    page may hold fewer than `limit` rows and `total` still includes non-live
    domains — looks intentional (cheap filter), confirm against the UI.
    """
    # get_event_loop() is deprecated inside coroutines; use the running loop.
    loop = asyncio.get_running_loop()
    total, domain_list = await loop.run_in_executor(
        None, _domains_sync, tld, page, limit, alpha_only, no_sld, keyword
    )
    if not domain_list:
        return total, []
    placeholders = ",".join("?" * len(domain_list))
    async with aiosqlite.connect(SQLITE_PATH) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            f"SELECT * FROM enriched_domains WHERE domain IN ({placeholders})",
            domain_list,
        ) as cur:
            enriched_map = {r["domain"]: dict(r) async for r in cur}
    results = []
    for d in domain_list:
        # Domains not yet enriched still appear, as a bare {"domain": ...} row.
        row = enriched_map.get(d, {"domain": d})
        if live_only and not row.get("is_live"):
            continue
        results.append(row)
    return total, results
# ── Stats ────────────────────────────────────────────────────────────────────
def _tld_stats_sync() -> tuple[int, list]:
    """Blocking computation of (total domain count, top-20 TLD breakdown).

    Prefers the persistent DuckDB index; falls back to a direct parquet scan
    when the index has not been built yet.
    """
    if _index_ready:
        conn = duckdb.connect(str(DUCKDB_PATH), read_only=True)
        try:
            total = conn.execute("SELECT COUNT(*) FROM domains").fetchone()[0]
            rows = conn.execute("""
                SELECT tld, COUNT(*) AS cnt FROM domains
                WHERE tld != ''
                GROUP BY tld ORDER BY cnt DESC LIMIT 20
            """).fetchall()
        finally:
            conn.close()  # previously leaked on query failure
    else:
        p = str(PARQUET_PATH)
        conn = duckdb.connect(":memory:")
        try:
            conn.execute("SET threads=4")
            total = conn.execute(f"SELECT COUNT(*) FROM read_parquet('{p}')").fetchone()[0]
            rows = conn.execute(f"""
                SELECT lower(regexp_extract(domain, '\\.([^.]+)$', 1)) AS tld, COUNT(*) AS cnt
                FROM read_parquet('{p}')
                GROUP BY tld ORDER BY cnt DESC LIMIT 20
            """).fetchall()
        finally:
            conn.close()
    return total, [{"tld": r[0], "count": r[1]} for r in rows]
async def get_stats():
    """Aggregate dashboard stats: corpus totals, enrichment counts, queue state.

    The TLD breakdown requires a full DuckDB scan, so it is computed once per
    process and cached in module globals (_tld_cache/_total_cache).
    """
    global _tld_cache, _total_cache
    if not _tld_cache:
        # get_event_loop() is deprecated inside coroutines; use the running loop.
        loop = asyncio.get_running_loop()
        _total_cache, _tld_cache = await loop.run_in_executor(None, _tld_stats_sync)
    async with aiosqlite.connect(SQLITE_PATH) as db:
        async with db.execute("SELECT COUNT(*) FROM enriched_domains") as cur:
            enriched = (await cur.fetchone())[0]
        # "Hot lead" = score at or above the configurable threshold.
        threshold = int(os.getenv("SCORE_THRESHOLD", "60"))
        async with db.execute("SELECT COUNT(*) FROM enriched_domains WHERE score >= ?", (threshold,)) as cur:
            hot_leads = (await cur.fetchone())[0]
        async with db.execute("SELECT COUNT(*) FROM enriched_domains WHERE kit_digital=1") as cur:
            kit_digital_count = (await cur.fetchone())[0]
        async with db.execute("SELECT status, COUNT(*) FROM job_queue GROUP BY status") as cur:
            q = {r[0]: r[1] async for r in cur}
    return {
        "total_domains": _total_cache,
        "enriched": enriched,
        "hot_leads": hot_leads,
        "kit_digital_count": kit_digital_count,
        "tld_breakdown": _tld_cache,
        "index_status": index_status(),
        "queue": {
            "pending": q.get("pending", 0),
            "running": q.get("running", 0),
            "done": q.get("done", 0),
            "failed": q.get("failed", 0),
        },
    }
# ── Enrichment helpers ───────────────────────────────────────────────────────
async def get_enriched(min_score=0, cms=None, country=None, kit_digital=None, page=1, limit=100):
    """Return (total, rows) of enriched domains matching the filters,
    ordered by score descending and paginated."""
    clauses = ["score >= ?"]
    args: list = [min_score]
    # Optional equality filters — only applied when a value is supplied.
    for clause, value in (("cms = ?", cms), ("ip_country = ?", country)):
        if value:
            clauses.append(clause)
            args.append(value)
    if kit_digital is not None:
        clauses.append("kit_digital = ?")
        args.append(1 if kit_digital else 0)
    where = "WHERE " + " AND ".join(clauses)
    offset = (page - 1) * limit
    async with aiosqlite.connect(SQLITE_PATH) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            f"SELECT * FROM enriched_domains {where} ORDER BY score DESC LIMIT ? OFFSET ?",
            args + [limit, offset],
        ) as cur:
            rows = [dict(r) async for r in cur]
        async with db.execute(
            f"SELECT COUNT(*) FROM enriched_domains {where}", args
        ) as cur:
            total = (await cur.fetchone())[0]
    return total, rows
async def queue_ai(domains: list[str], language: str = "ES"):
    """Queue *domains* for AI assessment in the given output language.

    Re-queuing an already-present domain updates its language and flips its
    status back to 'pending', so switching language re-runs the assessment.
    """
    sql = """INSERT INTO ai_queue (domain, language) VALUES (?, ?)
ON CONFLICT(domain) DO UPDATE SET language=excluded.language, status='pending'"""
    payload = [(domain, language) for domain in domains]
    async with aiosqlite.connect(SQLITE_PATH) as db:
        await db.executemany(sql, payload)
        await db.commit()
async def get_ai_queue_status():
    """Return AI-queue row counts grouped by status, plus a grand total."""
    counts: dict = {}
    async with aiosqlite.connect(SQLITE_PATH) as db:
        async with db.execute("SELECT status, COUNT(*) FROM ai_queue GROUP BY status") as cur:
            async for status, n in cur:
                counts[status] = n
    summary = {key: counts.get(key, 0) for key in ("pending", "running", "done", "failed")}
    summary["total"] = sum(counts.values())
    return summary
async def save_ai_assessment(domain: str, assessment: dict, site_analysis: dict = None):
    """Persist one AI assessment (and optional site crawl analysis) for *domain*.

    Upserts the domain into enriched_domains, stores the full assessment JSON
    plus its headline fields, optionally refreshes contact/kit-digital data
    from *site_analysis*, and marks the domain done in ai_queue — all within
    one connection, committed at the end.

    Keys read from *assessment*: lead_quality, pitch_angle,
    best_contact_channel, best_contact_value, kit_digital_confirmed.
    Keys read from *site_analysis*: emails, phones, whatsapp, social_links,
    kit_digital, kit_digital_signals.
    """
    import json as _json
    async with aiosqlite.connect(SQLITE_PATH) as db:
        # Upsert into enriched_domains (domain may not exist yet if assessed before full enrichment)
        await db.execute(
            """INSERT INTO enriched_domains (domain) VALUES (?) ON CONFLICT(domain) DO NOTHING""",
            (domain,),
        )
        await db.execute(
            """UPDATE enriched_domains SET
            ai_assessment=?, ai_lead_quality=?, ai_pitch=?,
            ai_contact_channel=?, ai_contact_value=?, ai_assessed_at=datetime('now'),
            site_analysis=?
            WHERE domain=?""",
            (
                _json.dumps(assessment),
                assessment.get("lead_quality"),
                assessment.get("pitch_angle"),
                assessment.get("best_contact_channel"),
                assessment.get("best_contact_value"),
                _json.dumps(site_analysis) if site_analysis else None,
                domain,
            ),
        )
        # Update contact_info + kit_digital from site_analysis if available.
        # Gemini's kit_digital_confirmed is the authoritative verdict — it can
        # override a false-positive from the heuristic scanner.
        if site_analysis:
            contacts = {
                "emails": site_analysis.get("emails", []),
                "phones": site_analysis.get("phones", []),
                "whatsapp": site_analysis.get("whatsapp", []),
                "social": site_analysis.get("social_links", []),
            }
            # Prefer Gemini's explicit verdict; fall back to heuristic if null
            ai_kit = assessment.get("kit_digital_confirmed")
            kit_val = int(ai_kit) if ai_kit is not None else int(site_analysis.get("kit_digital", False))
            await db.execute(
                """UPDATE enriched_domains SET
                kit_digital=?, kit_digital_signals=?, contact_info=?
                WHERE domain=?""",
                (
                    kit_val,
                    _json.dumps(site_analysis.get("kit_digital_signals", [])),
                    _json.dumps(contacts),
                    domain,
                ),
            )
        # Mark the queue row done even when no site_analysis was supplied;
        # a no-op if the domain was assessed without ever being queued.
        await db.execute(
            "UPDATE ai_queue SET status='done', completed_at=datetime('now') WHERE domain=?",
            (domain,),
        )
        await db.commit()
async def queue_domains(domains: list[str]):
    """Enqueue domains for enrichment; already-queued domains are skipped."""
    payload = [(domain,) for domain in domains]
    async with aiosqlite.connect(SQLITE_PATH) as db:
        await db.executemany("INSERT OR IGNORE INTO job_queue (domain) VALUES (?)", payload)
        await db.commit()
async def get_queue_status():
    """Return job-queue counts plus a rough ETA in seconds.

    ETA heuristic: remaining (pending + running) work divided by an assumed
    throughput of CONCURRENCY_LIMIT/10 domains per second, floored at 1/sec;
    None when nothing is outstanding.
    """
    async with aiosqlite.connect(SQLITE_PATH) as db:
        async with db.execute("SELECT status, COUNT(*) FROM job_queue GROUP BY status") as cur:
            counts = {status: n async for status, n in cur}
    summary = {
        "total": sum(counts.values()),
        "pending": counts.get("pending", 0),
        "running": counts.get("running", 0),
        "done": counts.get("done", 0),
        "failed": counts.get("failed", 0),
    }
    remaining = summary["pending"] + summary["running"]
    rate = int(os.getenv("CONCURRENCY_LIMIT", "50"))
    summary["eta_seconds"] = remaining / max(rate / 10, 1) if remaining > 0 else None
    return summary