"""Replicate / Gemini integration for domain lead assessment."""

import asyncio
import json
import logging
import os
import re
from typing import Optional

import httpx

logger = logging.getLogger(__name__)

# SECURITY: the API token is read from the environment only.  A credential
# must never be embedded in source as a fallback; any token that was ever
# committed to this file should be treated as leaked and rotated.
REPLICATE_TOKEN = os.getenv("REPLICATE_API_TOKEN", "")
REPLICATE_MODEL = "https://api.replicate.com/v1/models/google/gemini-3-pro/predictions"
AI_CONCURRENCY = int(os.getenv("AI_CONCURRENCY", "3"))

_ai_sem: Optional[asyncio.Semaphore] = None

# Matches Markdown code-fence markers (``` or ```json) around model output.
_FENCE_RE = re.compile(r"```(?:json)?")

# (key in contact_info, label shown in the prompt, max entries to include)
_CONTACT_CHANNELS = (
    ("emails", "Emails", 3),
    ("phones", "Phones", 3),
    ("whatsapp", "WhatsApp", 2),
    ("social", "Social", 4),
)


def _sem() -> asyncio.Semaphore:
    """Return the shared semaphore that caps concurrent AI requests.

    The semaphore is created on first use rather than at import time.
    """
    global _ai_sem
    if _ai_sem is None:
        # A limit of 0 would block every caller forever and a negative one
        # raises ValueError, so clamp misconfigured values to 1.
        _ai_sem = asyncio.Semaphore(max(1, AI_CONCURRENCY))
    return _ai_sem


def _format_kit_signals(raw: object) -> str:
    """Render the ``kit_digital_signals`` value as prompt text.

    Accepts a JSON-encoded list or an already-decoded list.  Anything that
    cannot be interpreted as a list is echoed verbatim so the model still
    sees the raw data.
    """
    value = raw or "[]"
    if isinstance(value, (list, tuple)):
        signals = value
    else:
        try:
            signals = json.loads(value)
        except (TypeError, ValueError):
            return f" {value}"
    if not signals:
        return " None detected"
    if not isinstance(signals, (list, tuple)):
        # Valid JSON but not a list (object, string, number): show it raw
        # instead of iterating over dict keys or string characters.
        return f" {value}"
    return "\n".join(f" - {s}" for s in signals)


def _format_contacts(raw: object) -> str:
    """Render the ``contact_info`` value as prompt text.

    Accepts a JSON-encoded object or an already-decoded dict.  Malformed
    input, or JSON that does not decode to an object, is treated as "no
    contacts" rather than raising.
    """
    if isinstance(raw, dict):
        contacts = raw
    else:
        try:
            contacts = json.loads(raw or "{}")
        except (TypeError, ValueError):
            contacts = {}
        if not isinstance(contacts, dict):
            # e.g. the column held "null" or a JSON list.
            contacts = {}

    lines = []
    for key, label, limit in _CONTACT_CHANNELS:
        values = contacts.get(key)
        if isinstance(values, str):
            # A single bare string is one entry, not a sequence of characters.
            values = [values]
        if not isinstance(values, (list, tuple)) or not values:
            continue
        joined = ", ".join(str(v) for v in values[:limit])
        lines.append(f" {label}: {joined}")
    return "\n".join(lines) if lines else " None found"


def _build_prompt(row: dict) -> str:
    """Build the lead-assessment prompt for one scanned domain.

    Args:
        row: Scan result for a domain.  Keys read: ``domain``, ``page_title``,
            ``cms``, ``server``, ``ip_country``, ``ssl_valid``,
            ``ssl_expiry_days``, ``has_mx``, ``is_live``,
            ``kit_digital_signals`` and ``contact_info``.  Missing keys are
            rendered with placeholder text.

    Returns:
        The complete prompt, ending with the JSON schema the model must follow.
    """
    kit_block = _format_kit_signals(row.get("kit_digital_signals"))
    contact_str = _format_contacts(row.get("contact_info"))

    domain = row.get("domain")
    page_title = row.get("page_title") or "N/A"
    cms = row.get("cms") or "unknown"
    server = row.get("server") or "unknown"
    country = row.get("ip_country") or "unknown"
    ssl_valid = row.get("ssl_valid")
    has_mx = bool(row.get("has_mx"))
    is_live = bool(row.get("is_live"))

    # 0 is a meaningful value (certificate expires today), so only a truly
    # missing value is replaced by the placeholder.
    ssl_days = row.get("ssl_expiry_days")
    if ssl_days is None or ssl_days == "":
        ssl_days = "?"

    return f"""You are a sales intelligence analyst evaluating Spanish SME websites for IT services upsell.

DOMAIN DATA:
- Domain: {domain}
- Page title: {page_title}
- CMS: {cms}
- Server: {server}
- Country: {country}
- SSL valid: {ssl_valid}, expires in {ssl_days} days
- Has email (MX): {has_mx}
- Is live: {is_live}
- Kit Digital signals found on page:
{kit_block}
- Contact channels found on page:
{contact_str}

Kit Digital is a Spanish government program (up to €12k grants for SME digitalization).
Sites that received it MUST display EU/digitalizadores logos.
These businesses have proven they invest in IT services and may need follow-up:
new website, SEO, hosting migration, security, maintenance contracts.

Assess this lead and respond ONLY with valid JSON (no markdown, no explanation outside the JSON):
{{
  "is_local_sme": true/false,
  "kit_digital_confirmed": true/false,
  "kit_digital_reasoning": "1 sentence explaining why or why not",
  "lead_quality": "HOT|WARM|COLD",
  "lead_reasoning": "1-2 sentences on why this is a good/bad lead for IT services sales",
  "best_contact_channel": "email|phone|whatsapp|social|web_form|unknown",
  "best_contact_value": "the actual email/phone/URL to use, or empty string",
  "pitch_angle": "One concrete opening sentence for a cold email or call in Spanish",
  "services_likely_needed": ["service1", "service2"],
  "outreach_notes": "Any useful context for the sales rep (language, business type, urgency)"
}}"""


def _parse_output(raw: str) -> dict:
    """Extract the JSON assessment object from Gemini text output.

    Args:
        raw: Text returned by the model, possibly wrapped in Markdown fences
            or followed by extra commentary.

    Returns:
        The decoded JSON object, or a COLD fallback dict flagged with
        ``parse_error`` when no object can be decoded.
    """
    if not isinstance(raw, str):
        # Defensive: callers may hand over None when the API produced nothing.
        raw = "" if raw is None else str(raw)

    text = _FENCE_RE.sub("", raw).strip().rstrip("`").strip()
    start = text.find("{")
    if start != -1:
        try:
            # raw_decode stops at the end of the first complete JSON value,
            # so trailing commentary (even with braces) does not break parsing.
            parsed, _end = json.JSONDecoder().raw_decode(text, start)
            return parsed
        except json.JSONDecodeError:
            pass
    return {
        "raw": raw[:500],
        "lead_quality": "COLD",
        "best_contact_channel": "unknown",
        "best_contact_value": "",
        "parse_error": True,
    }


async def assess_domain(row: dict) -> dict:
    """Call Gemini via Replicate to assess a domain.

    Never raises for request, API or parsing failures: those are logged and
    reported as a COLD result carrying an ``error`` key.

    Args:
        row: Scan result for the domain (see ``_build_prompt``).

    Returns:
        The parsed assessment dict, or a fallback dict with ``error`` /
        ``parse_error`` set.
    """
    domain = row.get("domain")

    if not REPLICATE_TOKEN:
        message = "REPLICATE_API_TOKEN is not set"
        logger.error("Replicate error %s: %s", domain, message)
        return {
            "error": message,
            "lead_quality": "COLD",
            "best_contact_channel": "unknown",
            "best_contact_value": "",
        }

    async with _sem():
        try:
            # Built inside the try so malformed row data degrades to an error
            # result like every other failure.
            payload = {
                "input": {
                    "prompt": _build_prompt(row),
                    "images": [],
                    "videos": [],
                    "top_p": 0.9,
                    "temperature": 0.2,
                    "thinking_level": "low",
                    "max_output_tokens": 1024,
                }
            }
            async with httpx.AsyncClient(timeout=90) as client:
                resp = await client.post(
                    REPLICATE_MODEL,
                    headers={
                        "Authorization": f"Bearer {REPLICATE_TOKEN}",
                        "Content-Type": "application/json",
                        # Ask Replicate to hold the request open until the
                        # prediction finishes instead of returning immediately.
                        "Prefer": "wait",
                    },
                    json=payload,
                )
                resp.raise_for_status()
                data = resp.json()

            output = data.get("output")
            if output is None:
                # The wait window elapsed or the prediction failed.
                detail = data.get("error") or f"status={data.get('status')}"
                raise RuntimeError(f"prediction returned no output ({detail})")
            if isinstance(output, list):
                # Output may arrive as a list of text chunks.
                output = "".join(str(part) for part in output)

            result = _parse_output(output)
            logger.info(
                "AI %s → %s / contact: %s",
                domain,
                result.get("lead_quality"),
                result.get("best_contact_channel"),
            )
            return result
        except Exception as e:
            # Deliberate best-effort boundary: one bad domain must not abort
            # the whole batch.
            logger.error("Replicate error %s: %s", domain, e)
            return {
                "error": str(e)[:300],
                "lead_quality": "COLD",
                "best_contact_channel": "unknown",
                "best_contact_value": "",
            }