diff --git a/app/db.py b/app/db.py index 2f903a6..432db74 100644 --- a/app/db.py +++ b/app/db.py @@ -26,7 +26,16 @@ CREATE TABLE IF NOT EXISTS enriched_domains ( server TEXT, enriched_at TEXT, error TEXT, - score INTEGER DEFAULT 0 + score INTEGER DEFAULT 0, + kit_digital INTEGER DEFAULT 0, + kit_digital_signals TEXT, + contact_info TEXT, + ai_assessment TEXT, + ai_lead_quality TEXT, + ai_pitch TEXT, + ai_contact_channel TEXT, + ai_contact_value TEXT, + ai_assessed_at TEXT ); CREATE TABLE IF NOT EXISTS job_queue ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -37,6 +46,13 @@ CREATE TABLE IF NOT EXISTS job_queue ( completed_at TEXT, error TEXT ); +CREATE TABLE IF NOT EXISTS ai_queue ( + domain TEXT PRIMARY KEY, + status TEXT DEFAULT 'pending', + created_at TEXT DEFAULT (datetime('now')), + completed_at TEXT, + error TEXT +); CREATE TABLE IF NOT EXISTS scores ( domain TEXT PRIMARY KEY, score INTEGER NOT NULL, @@ -44,6 +60,20 @@ CREATE TABLE IF NOT EXISTS scores ( ); """ +# Columns added after initial release — applied as migrations on existing DBs +_MIGRATIONS = [ + "ALTER TABLE enriched_domains ADD COLUMN kit_digital INTEGER DEFAULT 0", + "ALTER TABLE enriched_domains ADD COLUMN kit_digital_signals TEXT", + "ALTER TABLE enriched_domains ADD COLUMN contact_info TEXT", + "ALTER TABLE enriched_domains ADD COLUMN ai_assessment TEXT", + "ALTER TABLE enriched_domains ADD COLUMN ai_lead_quality TEXT", + "ALTER TABLE enriched_domains ADD COLUMN ai_pitch TEXT", + "ALTER TABLE enriched_domains ADD COLUMN ai_contact_channel TEXT", + "ALTER TABLE enriched_domains ADD COLUMN ai_contact_value TEXT", + "ALTER TABLE enriched_domains ADD COLUMN ai_assessed_at TEXT", + "CREATE TABLE IF NOT EXISTS ai_queue (domain TEXT PRIMARY KEY, status TEXT DEFAULT 'pending', created_at TEXT DEFAULT (datetime('now')), completed_at TEXT, error TEXT)", +] + # Index build state _index_ready = False _index_building = False @@ -57,6 +87,12 @@ _total_cache: int = 0 async def init_db(): async with 
aiosqlite.connect(SQLITE_PATH) as db: await db.executescript(SCHEMA) + # Run migrations (safe to re-run — silently skips existing columns) + for sql in _MIGRATIONS: + try: + await db.execute(sql) + except Exception: + pass await db.commit() @@ -243,6 +279,8 @@ async def get_stats(): threshold = int(os.getenv("SCORE_THRESHOLD", "60")) async with db.execute("SELECT COUNT(*) FROM enriched_domains WHERE score >= ?", (threshold,)) as cur: hot_leads = (await cur.fetchone())[0] + async with db.execute("SELECT COUNT(*) FROM enriched_domains WHERE kit_digital=1") as cur: + kit_digital_count = (await cur.fetchone())[0] async with db.execute("SELECT status, COUNT(*) FROM job_queue GROUP BY status") as cur: q = {r[0]: r[1] async for r in cur} @@ -250,6 +288,7 @@ async def get_stats(): "total_domains": _total_cache, "enriched": enriched, "hot_leads": hot_leads, + "kit_digital_count": kit_digital_count, "tld_breakdown": _tld_cache, "index_status": index_status(), "queue": { @@ -263,7 +302,7 @@ async def get_stats(): # ── Enrichment helpers ─────────────────────────────────────────────────────── -async def get_enriched(min_score=0, cms=None, country=None, page=1, limit=100): +async def get_enriched(min_score=0, cms=None, country=None, kit_digital=None, page=1, limit=100): offset = (page - 1) * limit conditions = ["score >= ?"] params: list = [min_score] @@ -273,6 +312,9 @@ async def get_enriched(min_score=0, cms=None, country=None, page=1, limit=100): if country: conditions.append("ip_country = ?") params.append(country) + if kit_digital is not None: + conditions.append("kit_digital = ?") + params.append(1 if kit_digital else 0) where = "WHERE " + " AND ".join(conditions) async with aiosqlite.connect(SQLITE_PATH) as db: db.row_factory = aiosqlite.Row @@ -288,6 +330,52 @@ async def get_enriched(min_score=0, cms=None, country=None, page=1, limit=100): return total, rows +async def queue_ai(domains: list[str]): + async with aiosqlite.connect(SQLITE_PATH) as db: + await 
db.executemany( + "INSERT OR IGNORE INTO ai_queue (domain) VALUES (?)", + [(d,) for d in domains], + ) + await db.commit() + + +async def get_ai_queue_status(): + async with aiosqlite.connect(SQLITE_PATH) as db: + async with db.execute("SELECT status, COUNT(*) FROM ai_queue GROUP BY status") as cur: + rows = {r[0]: r[1] async for r in cur} + return { + "pending": rows.get("pending", 0), + "running": rows.get("running", 0), + "done": rows.get("done", 0), + "failed": rows.get("failed", 0), + "total": sum(rows.values()), + } + + +async def save_ai_assessment(domain: str, assessment: dict): + import json as _json + async with aiosqlite.connect(SQLITE_PATH) as db: + await db.execute( + """UPDATE enriched_domains SET + ai_assessment=?, ai_lead_quality=?, ai_pitch=?, + ai_contact_channel=?, ai_contact_value=?, ai_assessed_at=datetime('now') + WHERE domain=?""", + ( + _json.dumps(assessment), + assessment.get("lead_quality"), + assessment.get("pitch_angle"), + assessment.get("best_contact_channel"), + assessment.get("best_contact_value"), + domain, + ), + ) + await db.execute( + "UPDATE ai_queue SET status='done', completed_at=datetime('now') WHERE domain=?", + (domain,), + ) + await db.commit() + + async def queue_domains(domains: list[str]): async with aiosqlite.connect(SQLITE_PATH) as db: await db.executemany( diff --git a/app/enricher.py b/app/enricher.py index d2074a6..3ac91bb 100644 --- a/app/enricher.py +++ b/app/enricher.py @@ -1,5 +1,7 @@ import asyncio +import json import os +import re import ssl import socket import datetime @@ -11,49 +13,149 @@ import dns.resolver import aiosqlite from bs4 import BeautifulSoup -from app.db import SQLITE_PATH +from app.db import SQLITE_PATH, queue_ai, save_ai_assessment, get_ai_queue_status from app.scorer import score logger = logging.getLogger(__name__) CONCURRENCY_LIMIT = int(os.getenv("CONCURRENCY_LIMIT", "50")) -# ip-api.com free tier: 45 req/min → ~1.33/s. We use a separate slower semaphore. 
-IP_API_SEMAPHORE: Optional[asyncio.Semaphore] = None -IP_API_RATE = 45 # per minute +IP_API_RATE = 45 # req/min free tier _worker_task: Optional[asyncio.Task] = None +_ai_worker_task: Optional[asyncio.Task] = None _paused = False +_ip_sem: Optional[asyncio.Semaphore] = None +_ip_last_call = 0.0 -def get_ip_semaphore(): - global IP_API_SEMAPHORE - if IP_API_SEMAPHORE is None: - IP_API_SEMAPHORE = asyncio.Semaphore(1) - return IP_API_SEMAPHORE +def _get_ip_sem(): + global _ip_sem + if _ip_sem is None: + _ip_sem = asyncio.Semaphore(1) + return _ip_sem +# ── CMS detection ──────────────────────────────────────────────────────────── + CMS_SIGNATURES = { - "wordpress": ["/wp-content/", "/wp-includes/", 'name="generator" content="WordPress'], - "joomla": ["/components/com_", "Joomla!", 'name="generator" content="Joomla'], - "drupal": ["/sites/default/files/", "Drupal.settings", 'name="generator" content="Drupal'], - "wix": ["wix.com", "X-Wix-"], + "wordpress": ["/wp-content/", "/wp-includes/", 'content="WordPress'], + "joomla": ["/components/com_", "Joomla!", 'content="Joomla'], + "drupal": ["/sites/default/files/", "Drupal.settings", 'content="Drupal'], + "wix": ["static.wixstatic.com", "X-Wix-"], "squarespace": ["squarespace.com", "X-Squarespace-"], - "shopify": ["cdn.shopify.com", "Shopify.theme"], - "prestashop": ["PrestaShop", "/modules/"], - "magento": ["Mage.Cookies", "X-Magento-"], - "typo3": ["typo3", "TYPO3 CMS"], - "opencart": ["route=common/home", "OpenCart"], + "shopify": ["cdn.shopify.com", "Shopify.theme"], + "prestashop": ["PrestaShop", "/modules/prestashop"], + "magento": ["Mage.Cookies", "X-Magento-"], + "typo3": ["typo3temp", "TYPO3 CMS"], + "opencart": ["route=common/home", "OpenCart"], } def detect_cms(html: str, headers: dict) -> Optional[str]: - combined = html[:50000] + " ".join(f"{k}:{v}" for k, v in headers.items()) + combined = html[:60000] + " ".join(f"{k}:{v}" for k, v in headers.items()) + cl = combined.lower() for cms, sigs in 
CMS_SIGNATURES.items(): - if any(sig.lower() in combined.lower() for sig in sigs): + if any(s.lower() in cl for s in sigs): return cms return None +# ── Kit Digital detection ──────────────────────────────────────────────────── + +KIT_IMG_PATS = [ + "digitalizadores", "kit-digital", "kitdigital", "kit_digital", + "fondos-europeos", "fondos_europeos", "nextgeneration", "next-generation", + "prtr", "plan-recuperacion", "planderecuperacion", + "acelerapyme", "logo-ue", "recovery-eu", "cofinanciado", +] +KIT_TEXT_PATS = [ + "kit digital", "agente digitalizador", "agentes digitalizadores", + "fondos europeos", "next generation eu", "nextgenerationeu", + "plan de recuperación", "plan de recuperacion", + "plan de digitalización", "digitalización pymes", + "prtr", "financiado por la unión europea", + "red.es/kit-digital", "acelerapyme.es", +] +KIT_LINK_PATS = ["acelerapyme", "red.es", "kit-digital", "kitdigital"] + + +def detect_kit_digital(soup, html: str) -> tuple[bool, list]: + signals = [] + hl = html.lower() + + for img in soup.find_all("img"): + combined = ((img.get("src") or "") + (img.get("alt") or "") + (img.get("srcset") or "")).lower() + for p in KIT_IMG_PATS: + if p in combined: + signals.append(f"img:{p}") + break + + for p in KIT_TEXT_PATS: + if p in hl: + signals.append(f"text:{p}") + + for a in soup.find_all("a", href=True): + href = a["href"].lower() + if any(p in href for p in KIT_LINK_PATS): + signals.append(f"link:{href[:60]}") + + signals = list(dict.fromkeys(signals))[:15] + return len(signals) > 0, signals + + +# ── Contact extraction ──────────────────────────────────────────────────────── + +EMAIL_RE = re.compile(r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}") +PHONE_RE = re.compile(r"(?:\+34[\s\-]?)?(?:6|7|8|9)\d{2}[\s\-]?\d{3}[\s\-]?\d{3}") +SOCIAL_DOMAINS = ["facebook.com", "instagram.com", "linkedin.com", "twitter.com", "x.com", "tiktok.com"] + + +def extract_contacts(soup, html: str) -> dict: + contacts: dict = {"emails": [], "phones": 
[], "whatsapp": [], "social": []} + + # mailto links + for a in soup.find_all("a", href=True): + href = a["href"] + if href.startswith("mailto:"): + em = href[7:].split("?")[0].strip() + if em and em not in contacts["emails"]: + contacts["emails"].append(em) + elif href.startswith("tel:"): + ph = re.sub(r"[^\d+]", "", href[4:]) + if ph and ph not in contacts["phones"]: + contacts["phones"].append(ph) + elif "wa.me" in href or "api.whatsapp.com" in href: + if href not in contacts["whatsapp"]: + contacts["whatsapp"].append(href[:80]) + else: + for sd in SOCIAL_DOMAINS: + if sd in href.lower(): + clean = href.split("?")[0].rstrip("/") + if clean not in contacts["social"]: + contacts["social"].append(clean) + break + + # Email regex in raw HTML (catches obfuscated ones) + for em in EMAIL_RE.findall(html[:100000]): + em = em.lower() + if em not in contacts["emails"] and not em.endswith((".png", ".jpg", ".css", ".js")): + contacts["emails"].append(em) + + # Phone numbers in visible text + for ph in PHONE_RE.findall(soup.get_text()): + ph_clean = re.sub(r"[\s\-]", "", ph) + if ph_clean not in contacts["phones"]: + contacts["phones"].append(ph_clean) + + # Dedupe + cap + for k in contacts: + contacts[k] = list(dict.fromkeys(contacts[k]))[:5] + + return contacts + + +# ── SSL / MX / IP ───────────────────────────────────────────────────────────── + async def check_ssl(domain: str) -> tuple[bool, Optional[int]]: try: ctx = ssl.create_default_context() @@ -63,11 +165,8 @@ async def check_ssl(domain: str) -> tuple[bool, Optional[int]]: with socket.create_connection((domain, 443), timeout=5) as sock: with ctx.wrap_socket(sock, server_hostname=domain) as ssock: cert = ssock.getpeercert() - expiry_str = cert.get("notAfter", "") - expiry = datetime.datetime.strptime(expiry_str, "%b %d %H:%M:%S %Y %Z") - days = (expiry - datetime.datetime.utcnow()).days - return True, days - + expiry = datetime.datetime.strptime(cert["notAfter"], "%b %d %H:%M:%S %Y %Z") + return True, (expiry - 
datetime.datetime.utcnow()).days return await loop.run_in_executor(None, _check) except Exception: return False, None @@ -76,64 +175,53 @@ async def check_ssl(domain: str) -> tuple[bool, Optional[int]]: async def check_mx(domain: str) -> bool: try: loop = asyncio.get_event_loop() - def _check(): try: - answers = dns.resolver.resolve(domain, "MX", lifetime=5) - return len(answers) > 0 + return len(dns.resolver.resolve(domain, "MX", lifetime=5)) > 0 except Exception: return False - return await loop.run_in_executor(None, _check) except Exception: return False -_ip_last_call = 0.0 -_ip_lock = asyncio.Lock() if False else None # initialized lazily - - async def get_ip_country(ip: str) -> Optional[str]: global _ip_last_call - # Enforce 45 req/min = 1 req per 1.33s - async with get_ip_semaphore(): + async with _get_ip_sem(): now = asyncio.get_event_loop().time() - wait = (1 / (IP_API_RATE / 60)) - (now - _ip_last_call) + wait = (60 / IP_API_RATE) - (now - _ip_last_call) if wait > 0: await asyncio.sleep(wait) _ip_last_call = asyncio.get_event_loop().time() - try: async with httpx.AsyncClient(timeout=5) as client: - resp = await client.get(f"http://ip-api.com/json/{ip}?fields=countryCode") - if resp.status_code == 200: - return resp.json().get("countryCode") + r = await client.get(f"http://ip-api.com/json/{ip}?fields=countryCode") + if r.status_code == 200: + return r.json().get("countryCode") except Exception: pass return None +# ── Main enrichment ─────────────────────────────────────────────────────────── + async def enrich_domain(domain: str) -> dict: result = { "domain": domain, - "is_live": False, - "status_code": None, - "ssl_valid": False, - "ssl_expiry_days": None, - "cms": None, - "has_mx": False, - "ip_country": None, - "page_title": None, + "is_live": False, "status_code": None, + "ssl_valid": False, "ssl_expiry_days": None, + "cms": None, "has_mx": False, + "ip_country": None, "page_title": None, "server": None, + "kit_digital": False, "kit_digital_signals": 
"[]", + "contact_info": "{}", "enriched_at": datetime.datetime.utcnow().isoformat(), "error": None, } try: async with httpx.AsyncClient( - timeout=10, - follow_redirects=True, - verify=False, + timeout=12, follow_redirects=True, verify=False, headers={"User-Agent": "Mozilla/5.0 (compatible; DomGod/1.0)"}, ) as client: resp = await client.get(f"http://{domain}") @@ -143,11 +231,16 @@ async def enrich_domain(domain: str) -> dict: html = resp.text soup = BeautifulSoup(html, "html.parser") - title_tag = soup.find("title") - result["page_title"] = title_tag.get_text(strip=True)[:500] if title_tag else None + title = soup.find("title") + result["page_title"] = title.get_text(strip=True)[:500] if title else None result["cms"] = detect_cms(html, dict(resp.headers)) - # Resolve IP for country lookup + kit, signals = detect_kit_digital(soup, html) + result["kit_digital"] = kit + result["kit_digital_signals"] = json.dumps(signals) + + result["contact_info"] = json.dumps(extract_contacts(soup, html)) + try: loop = asyncio.get_event_loop() ip = await loop.run_in_executor(None, socket.gethostbyname, domain) @@ -158,15 +251,10 @@ async def enrich_domain(domain: str) -> dict: except Exception as e: result["error"] = str(e)[:500] - # SSL check (independent of HTTP) ssl_valid, ssl_days = await check_ssl(domain) result["ssl_valid"] = ssl_valid result["ssl_expiry_days"] = ssl_days - - # MX check result["has_mx"] = await check_mx(domain) - - # Score result["score"] = score(result) return result @@ -177,19 +265,24 @@ async def save_enriched(data: dict): await db.execute( """INSERT INTO enriched_domains (domain, is_live, status_code, ssl_valid, ssl_expiry_days, cms, - has_mx, ip_country, page_title, server, enriched_at, error, score) - VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?) + has_mx, ip_country, page_title, server, enriched_at, error, score, + kit_digital, kit_digital_signals, contact_info) + VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) 
ON CONFLICT(domain) DO UPDATE SET is_live=excluded.is_live, status_code=excluded.status_code, ssl_valid=excluded.ssl_valid, ssl_expiry_days=excluded.ssl_expiry_days, cms=excluded.cms, has_mx=excluded.has_mx, ip_country=excluded.ip_country, page_title=excluded.page_title, server=excluded.server, - enriched_at=excluded.enriched_at, error=excluded.error, score=excluded.score""", + enriched_at=excluded.enriched_at, error=excluded.error, score=excluded.score, + kit_digital=excluded.kit_digital, + kit_digital_signals=excluded.kit_digital_signals, + contact_info=excluded.contact_info""", ( data["domain"], data["is_live"], data["status_code"], data["ssl_valid"], data["ssl_expiry_days"], data["cms"], data["has_mx"], data["ip_country"], data["page_title"], data["server"], data["enriched_at"], data["error"], data["score"], + int(data["kit_digital"]), data["kit_digital_signals"], data["contact_info"], ), ) await db.execute( @@ -205,18 +298,17 @@ async def mark_job(domain: str, status: str, error: str = None): if status == "running": await db.execute( "UPDATE job_queue SET status=?, started_at=datetime('now') WHERE domain=?", - (status, domain), - ) + (status, domain)) elif status in ("done", "failed"): await db.execute( "UPDATE job_queue SET status=?, completed_at=datetime('now'), error=? 
WHERE domain=?", - (status, error, domain), - ) + (status, error, domain)) await db.commit() +# ── Enrichment worker ───────────────────────────────────────────────────────── + async def worker_loop(): - global _paused sem = asyncio.Semaphore(CONCURRENCY_LIMIT) async def process(domain: str): @@ -233,26 +325,70 @@ async def worker_loop(): if _paused: await asyncio.sleep(1) continue - async with aiosqlite.connect(SQLITE_PATH) as db: async with db.execute( "SELECT domain FROM job_queue WHERE status='pending' LIMIT 100" ) as cur: rows = await cur.fetchall() - if not rows: await asyncio.sleep(2) continue + await asyncio.gather(*[asyncio.create_task(process(r[0])) for r in rows], return_exceptions=True) - tasks = [asyncio.create_task(process(r[0])) for r in rows] - await asyncio.gather(*tasks, return_exceptions=True) + +# ── AI assessment worker ────────────────────────────────────────────────────── + +async def ai_worker_loop(): + from app.replicate_ai import assess_domain as gemini_assess + + while True: + async with aiosqlite.connect(SQLITE_PATH) as db: + async with db.execute( + "SELECT domain FROM ai_queue WHERE status='pending' LIMIT 20" + ) as cur: + rows = await cur.fetchall() + # Mark as running + if rows: + await db.executemany( + "UPDATE ai_queue SET status='running', created_at=created_at WHERE domain=?", + [(r[0],) for r in rows], + ) + await db.commit() + + if not rows: + await asyncio.sleep(3) + continue + + async def assess_one(domain: str): + try: + async with aiosqlite.connect(SQLITE_PATH) as db: + db.row_factory = aiosqlite.Row + async with db.execute( + "SELECT * FROM enriched_domains WHERE domain=?", (domain,) + ) as cur: + row = await cur.fetchone() + if not row: + return + assessment = await gemini_assess(dict(row)) + await save_ai_assessment(domain, assessment) + except Exception as e: + async with aiosqlite.connect(SQLITE_PATH) as db: + await db.execute( + "UPDATE ai_queue SET status='failed', completed_at=datetime('now') WHERE domain=?", + 
(domain,), + ) + await db.commit() + logger.error("AI worker error %s: %s", domain, e) + + await asyncio.gather(*[asyncio.create_task(assess_one(r[0])) for r in rows], return_exceptions=True) def start_worker(): - global _worker_task + global _worker_task, _ai_worker_task if _worker_task is None or _worker_task.done(): _worker_task = asyncio.create_task(worker_loop()) - _paused = False + if _ai_worker_task is None or _ai_worker_task.done(): + _ai_worker_task = asyncio.create_task(ai_worker_loop()) def pause_worker(): diff --git a/app/main.py b/app/main.py index 57db25f..3f611b2 100644 --- a/app/main.py +++ b/app/main.py @@ -6,6 +6,7 @@ from contextlib import asynccontextmanager import httpx import aiosqlite +from typing import Optional from fastapi import FastAPI, Query from fastapi.responses import StreamingResponse, JSONResponse from fastapi.staticfiles import StaticFiles @@ -17,6 +18,7 @@ from app.db import ( DATA_DIR, PARQUET_PATH, SQLITE_PATH, init_db, get_stats, get_domains, get_enriched, queue_domains, get_queue_status, build_duckdb_index, index_status, + queue_ai, get_ai_queue_status, save_ai_assessment, ) from app.enricher import start_worker, pause_worker, resume_worker, is_running from app.scorer import run_scoring @@ -146,13 +148,53 @@ async def enriched( min_score: int = Query(0, ge=0, le=100), cms: str = Query(None), country: str = Query(None), + kit_digital: Optional[bool] = Query(None), page: int = Query(1, ge=1), limit: int = Query(100, ge=1, le=1000), ): - total, rows = await get_enriched(min_score=min_score, cms=cms, country=country, page=page, limit=limit) + total, rows = await get_enriched( + min_score=min_score, cms=cms, country=country, + kit_digital=kit_digital, page=page, limit=limit, + ) return {"page": page, "limit": limit, "total": total, "results": rows} +# ── AI assessment endpoints ─────────────────────────────────────────────────── + +@app.post("/api/ai/assess/batch") +async def ai_assess_batch(body: dict): + domains_list = 
body.get("domains", []) + if not domains_list: + return JSONResponse({"error": "no domains provided"}, status_code=400) + await queue_ai(domains_list) + return {"queued": len(domains_list)} + + +@app.get("/api/ai/status") +async def ai_status(): + return await get_ai_queue_status() + + +@app.post("/api/ai/assess/single") +async def ai_assess_single(body: dict): + """Immediate (blocking) AI assessment of a single domain.""" + domain = body.get("domain") + if not domain: + return JSONResponse({"error": "no domain"}, status_code=400) + from app.replicate_ai import assess_domain as gemini_assess + async with aiosqlite.connect(SQLITE_PATH) as db: + db.row_factory = aiosqlite.Row + async with db.execute( + "SELECT * FROM enriched_domains WHERE domain=?", (domain,) + ) as cur: + row = await cur.fetchone() + if not row: + return JSONResponse({"error": "domain not yet enriched"}, status_code=404) + assessment = await gemini_assess(dict(row)) + await save_ai_assessment(domain, assessment) + return assessment + + @app.get("/api/export") async def export_csv( min_score: int = Query(0), diff --git a/app/replicate_ai.py b/app/replicate_ai.py new file mode 100644 index 0000000..a680d0e --- /dev/null +++ b/app/replicate_ai.py @@ -0,0 +1,142 @@ +"""Replicate / Gemini integration for domain lead assessment.""" +import asyncio +import json +import logging +import os +import re +from typing import Optional + +import httpx + +logger = logging.getLogger(__name__) + +REPLICATE_TOKEN = os.getenv("REPLICATE_API_TOKEN", "") +REPLICATE_MODEL = "https://api.replicate.com/v1/models/google/gemini-3-pro/predictions" +AI_CONCURRENCY = int(os.getenv("AI_CONCURRENCY", "3")) + +_ai_sem: Optional[asyncio.Semaphore] = None + + +def _sem() -> asyncio.Semaphore: + global _ai_sem + if _ai_sem is None: + _ai_sem = asyncio.Semaphore(AI_CONCURRENCY) + return _ai_sem + + +def _build_prompt(row: dict) -> str: + kit_signals = row.get("kit_digital_signals") or "[]" + try: 
+ sigs = json.loads(kit_signals) + kit_block = "\n".join(f" - {s}" for s in sigs) if sigs else " None detected" + except Exception: + kit_block = f" {kit_signals}" + + contact_raw = row.get("contact_info") or "{}" + try: + contacts = json.loads(contact_raw) + except Exception: + contacts = {} + + contact_block = [] + if contacts.get("emails"): + contact_block.append(f" Emails: {', '.join(contacts['emails'][:3])}") + if contacts.get("phones"): + contact_block.append(f" Phones: {', '.join(contacts['phones'][:3])}") + if contacts.get("whatsapp"): + contact_block.append(f" WhatsApp: {', '.join(contacts['whatsapp'][:2])}") + if contacts.get("social"): + contact_block.append(f" Social: {', '.join(contacts['social'][:4])}") + contact_str = "\n".join(contact_block) if contact_block else " None found" + + return f"""You are a sales intelligence analyst evaluating Spanish SME websites for IT services upsell. + +DOMAIN DATA: +- Domain: {row.get("domain")} +- Page title: {row.get("page_title") or "N/A"} +- CMS: {row.get("cms") or "unknown"} +- Server: {row.get("server") or "unknown"} +- Country: {row.get("ip_country") or "unknown"} +- SSL valid: {row.get("ssl_valid")}, expires in {row.get("ssl_expiry_days") or "?"} days +- Has email (MX): {bool(row.get("has_mx"))} +- Is live: {bool(row.get("is_live"))} +- Kit Digital signals found on page: +{kit_block} +- Contact channels found on page: +{contact_str} + +Kit Digital is a Spanish government program (up to €12k grants for SME digitalization). Sites that received it MUST display EU/digitalizadores logos. These businesses have proven they invest in IT services and may need follow-up: new website, SEO, hosting migration, security, maintenance contracts. 
+ +Assess this lead and respond ONLY with valid JSON (no markdown, no explanation outside the JSON): +{{ + "is_local_sme": true/false, + "kit_digital_confirmed": true/false, + "kit_digital_reasoning": "1 sentence explaining why or why not", + "lead_quality": "HOT|WARM|COLD", + "lead_reasoning": "1-2 sentences on why this is a good/bad lead for IT services sales", + "best_contact_channel": "email|phone|whatsapp|social|web_form|unknown", + "best_contact_value": "the actual email/phone/URL to use, or empty string", + "pitch_angle": "One concrete opening sentence for a cold email or call in Spanish", + "services_likely_needed": ["service1", "service2"], + "outreach_notes": "Any useful context for the sales rep (language, business type, urgency)" +}}""" + + +def _parse_output(raw: str) -> dict: + """Extract JSON from Gemini text output.""" + text = re.sub(r"```(?:json)?", "", raw).strip().rstrip("`").strip() + m = re.search(r"\{[\s\S]+\}", text) + if m: + try: + return json.loads(m.group(0)) + except json.JSONDecodeError: + pass + return { + "raw": raw[:500], + "lead_quality": "COLD", + "best_contact_channel": "unknown", + "best_contact_value": "", + "parse_error": True, + } + + +async def assess_domain(row: dict) -> dict: + """Call Gemini via Replicate to assess a domain. 
Returns parsed assessment dict.""" + async with _sem(): + payload = { + "input": { + "prompt": _build_prompt(row), + "images": [], + "videos": [], + "top_p": 0.9, + "temperature": 0.2, + "thinking_level": "low", + "max_output_tokens": 1024, + } + } + try: + async with httpx.AsyncClient(timeout=90) as client: + resp = await client.post( + REPLICATE_MODEL, + headers={ + "Authorization": f"Bearer {REPLICATE_TOKEN}", + "Content-Type": "application/json", + "Prefer": "wait", + }, + json=payload, + ) + resp.raise_for_status() + data = resp.json() + + output = data.get("output", "") + if isinstance(output, list): + output = "".join(output) + + result = _parse_output(output) + logger.info("AI %s → %s / contact: %s", + row.get("domain"), result.get("lead_quality"), result.get("best_contact_channel")) + return result + + except Exception as e: + logger.error("Replicate error %s: %s", row.get("domain"), e) + return {"error": str(e)[:300], "lead_quality": "COLD", "best_contact_channel": "unknown", "best_contact_value": ""} diff --git a/app/scorer.py b/app/scorer.py index 126a4dd..92f591d 100644 --- a/app/scorer.py +++ b/app/scorer.py @@ -41,6 +41,9 @@ def score(domain_row: dict) -> int: s += 10 if local_biz_keywords(domain_row.get("page_title")): s += 5 + # Kit Digital: proven buyer of IT services + if domain_row.get("kit_digital"): + s += 20 return min(s, 100) diff --git a/app/static/index.html b/app/static/index.html index 92714d9..64b2e6c 100644 --- a/app/static/index.html +++ b/app/static/index.html @@ -7,169 +7,203 @@ -
+
+ + +

DomGod

- - - + + - +
-
Overview
-
-
Total Domains
in dataset
-
Enriched
-
Hot Leads
score ≥ 60
-
Queue Pending
-
Done / Failed
+
Overview
+
+
Total Domains
+
Enriched
+
Hot Leads
score ≥ 60
+
Kit Digital
detected
+
Queue
+
AI Queue
Browse & Filter
-
Enrichment Queue
-
Lead Pipeline
-
TLD Chart
+
Enrichment
+
Lead Pipeline
+
TLD Chart
-
-
- - -
-
- - -
-
- - -
-
- +
+
+
+
+
-
- - +
- - - + + + +
-
- - - + + - - - - matches - + + + +
-
+
- - - - - - - - - + + + -
DomainScoreCMSSSL daysCountryLiveServerStatusDomainScoreKDAIContactCMSSSL daysCountryLive