"""Replicate / Gemini integration — deep site assessment."""
import asyncio
import json
import logging
import os
import re
from typing import Optional

import httpx

logger = logging.getLogger(__name__)

# SECURITY: the API token must come from the environment. No fallback value is
# kept in source control; when the variable is unset, assess_domain() returns
# an error result instead of calling the API.
REPLICATE_TOKEN = os.getenv("REPLICATE_API_TOKEN", "")
REPLICATE_MODEL = "https://api.replicate.com/v1/models/google/gemini-3-pro/predictions"
# Clamp to >= 1: a value of 0 would create Semaphore(0) and block every call forever.
AI_CONCURRENCY = max(1, int(os.getenv("AI_CONCURRENCY", "3")))
_ai_sem: Optional[asyncio.Semaphore] = None


def _sem() -> asyncio.Semaphore:
    """Return the module-wide semaphore limiting concurrent AI calls.

    The semaphore is created on first use rather than at import time.
    """
    global _ai_sem
    if _ai_sem is None:
        _ai_sem = asyncio.Semaphore(AI_CONCURRENCY)
    return _ai_sem


def _failure(message: str) -> dict:
    """Build the fallback result returned when the AI call cannot complete."""
    return {
        "error": message[:300],
        "lead_quality": "COLD",
        "best_contact_channel": "unknown",
        "best_contact_value": "",
    }


def _build_prompt(a: dict) -> str:
    """Render the site-analysis dict ``a`` into the prompt sent to the model.

    Missing keys are tolerated: every field is read with ``dict.get`` and
    falls back to a placeholder such as "unknown", "MISSING" or "none".
    """
    # (label, key in ``a``, maximum number of values to include)
    contact_fields = (
        (" Emails: ", "emails", 3),
        (" Phones: ", "phones", 3),
        (" WhatsApp: ", "whatsapp", 2),
        (" Social: ", "social_links", 4),
    )
    contacts_block = []
    for label, key, limit in contact_fields:
        values = a.get(key)
        if values:
            contacts_block.append(label + ", ".join(str(v) for v in values[:limit]))
    contacts_str = "\n".join(contacts_block) or " None found"

    kd_str = "\n".join(f" - {s}" for s in (a.get("kit_digital_signals") or [])) or " None"
    analytics = ", ".join(a.get("analytics_present") or []) or "none"
    webmaster = ", ".join(a.get("webmaster_verified") or []) or "none"
    lorem_str = ", ".join(a.get("lorem_matches") or []) or "none"
    ph_str = ", ".join(a.get("placeholder_matches") or []) or "none"
    # Cap the page sample so the prompt stays bounded.
    snippet = (a.get("visible_text_snippet") or "")[:2000]

    # Tri-state: truthy -> EU, explicit False -> Non-EU, anything else -> unknown.
    eu_hosted = a.get("eu_hosted")
    hosting_flag = "✅ EU" if eu_hosted else ("❌ Non-EU" if eu_hosted is False else "unknown")

    return f"""You are a senior web consultant and IT sales analyst evaluating a Spanish/European SME website for IT services upsell.

=== TECHNICAL ===
Domain: {a.get("domain")}
Reachable: {a.get("reachable")} | Status: {a.get("status_code")} | Load time: {a.get("load_time_ms")} ms
Page size: {a.get("page_size_kb")} KB | Server: {a.get("server")} | CMS: {a.get("cms") or "unknown"}
SSL: valid={a.get("ssl_valid")} expires_in={a.get("ssl_expiry_days")} days
Mobile: viewport={a.get("has_mobile_viewport")}
Words: {a.get("word_count")} | Images: {a.get("image_count")} | Scripts: {a.get("script_count")}

=== HOSTING & INFRASTRUCTURE ===
IP: {a.get("ip") or "unknown"}
ASN: {a.get("asn") or "unknown"}
Organisation: {a.get("org") or "unknown"}
ISP: {a.get("isp") or "unknown"}
Host country: {a.get("ip_country") or "unknown"} / {a.get("ip_region") or ""}
EU hosted: {hosting_flag}

=== SEO & INDEXING ===
Title: {a.get("page_title") or "MISSING"}
H1: {a.get("h1_text") or "MISSING"}
Meta desc: {a.get("meta_description") or "MISSING"}
Canonical: {a.get("canonical_url") or "not set"}
Sitemap: {a.get("has_sitemap")} | Robots: {a.get("has_robots")} | Blocks Google: {a.get("robots_disallows_google")}
Analytics: {analytics}
Webmaster: {webmaster}

=== GDPR & LEGAL COMPLIANCE ===
Cookie tool: {a.get("cookie_tool") or "none detected"}
Cookie notice: {a.get("has_cookie_notice")}
Privacy policy: {a.get("has_privacy_policy")}
GDPR text: {a.get("has_gdpr_text")}

=== ACCESSIBILITY (quick scan) ===
HTML lang attr: {a.get("html_lang") or "MISSING"}
Images missing alt: {a.get("images_missing_alt")}
Skip navigation link: {a.get("has_skip_nav")}
Empty links: {a.get("empty_links")}
Inputs without labels: {a.get("inputs_without_labels")}

=== CONTENT QUALITY ===
Lorem ipsum: {a.get("has_lorem_ipsum")} → {lorem_str}
Placeholder: {a.get("has_placeholder")} → {ph_str}

=== KIT DIGITAL (Spanish gov €12k SME grants — sites must show EU logos) ===
Detected: {a.get("kit_digital")}
{kd_str}

=== CONTACT CHANNELS ===
{contacts_str}

=== PAGE TEXT SAMPLE ===
{snippet}

=== INSTRUCTIONS ===
The client sells: web redesign, SEO, hosting migration, SSL renewal, security audits, GDPR compliance, accessibility fixes, Google Ads, maintenance contracts, AI tools for SMEs.

Respond ONLY with valid JSON, no markdown fences, no text outside the JSON:
{{
  "summary": "2-3 sentence executive summary of the site's state",
  "site_quality_score": <0-10>,
  "content_issues": ["specific issues found in page content"],
  "performance_notes": "load time, size, mobile assessment",
  "seo_status": "SEO health — what's missing or broken",
  "hosting_notes": "ASN/ISP name, EU vs non-EU, any concerns",
  "gdpr_compliance": "cookie banner status, privacy policy, GDPR gaps",
  "accessibility_issues": ["specific a11y problems found"],
  "kit_digital_confirmed": true/false,
  "kit_digital_reasoning": "1 sentence",
  "is_local_sme": true/false,
  "lead_quality": "HOT|WARM|COLD",
  "lead_reasoning": "1-2 sentences",
  "best_contact_channel": "email|phone|whatsapp|social|web_form|unknown",
  "best_contact_value": "actual email/phone/URL or empty string",
  "all_contacts": {{"emails":[],"phones":[],"whatsapp":[],"social":[]}},
  "pitch_angle": "1 cold-outreach sentence in Spanish",
  "services_needed": ["service1","service2"],
  "urgency_signals": ["specific urgent issues like 'SSL expires in 12 days', 'lorem ipsum on homepage', 'no cookie banner'"],
  "outreach_notes": "sales rep context"
}}"""


def _parse_output(raw: str) -> dict:
    """Extract the JSON object from the model's reply.

    Markdown code fences are stripped first. If no JSON object can be
    decoded, a fallback dict flagged with ``"parse_error": True`` is returned
    whose ``summary`` is the first 400 characters of the raw reply.
    """
    if not isinstance(raw, str):
        # Tolerate a missing or non-text reply instead of raising in re.sub.
        raw = "" if raw is None else str(raw)

    text = re.sub(r"```(?:json)?", "", raw).strip().rstrip("`").strip()
    m = re.search(r"\{[\s\S]+\}", text)
    if m:
        try:
            return json.loads(m.group(0))
        except json.JSONDecodeError:
            pass
        # The greedy match runs to the LAST "}", so prose containing braces
        # after the JSON breaks it. Retry by decoding exactly one object from
        # the FIRST "{". Later "{" positions are deliberately not tried: on a
        # truncated reply they would yield a nested object, not the assessment.
        try:
            parsed, _end = json.JSONDecoder().raw_decode(text, m.start())
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, dict):
            return parsed

    logger.warning("Could not parse Gemini JSON output, raw: %s", raw[:300])
    return {
        "summary": raw[:400],
        "lead_quality": "COLD",
        "best_contact_channel": "unknown",
        "best_contact_value": "",
        "parse_error": True,
    }


async def assess_domain(analysis: dict) -> dict:
    """Call Gemini with the full site analysis. Returns parsed assessment.

    Failures while calling the API or handling its response are never raised:
    they are logged and reported as a dict with an ``"error"`` message and
    ``"lead_quality": "COLD"``. This covers a missing API token, transport
    and HTTP errors, and a prediction that returns no output.
    """
    domain = analysis.get("domain")

    if not REPLICATE_TOKEN:
        message = "REPLICATE_API_TOKEN is not set"
        logger.error("Replicate error %s: %s", domain, message)
        return _failure(message)

    async with _sem():
        payload = {
            "input": {
                "prompt": _build_prompt(analysis),
                "images": [],
                "videos": [],
                "top_p": 0.9,
                "temperature": 0.2,
                "thinking_level": "low",
                "max_output_tokens": 2048,
            }
        }
        try:
            async with httpx.AsyncClient(timeout=120) as client:
                resp = await client.post(
                    REPLICATE_MODEL,
                    headers={
                        "Authorization": f"Bearer {REPLICATE_TOKEN}",
                        "Content-Type": "application/json",
                        # Ask Replicate to hold the request open until the
                        # prediction finishes (synchronous mode).
                        "Prefer": "wait",
                    },
                    json=payload,
                )
                resp.raise_for_status()
                data = resp.json()

            output = data.get("output")
            if output is None:
                # The wait can end before the prediction does, or the
                # prediction can fail; either way there is nothing to parse.
                message = (
                    f"prediction returned no output "
                    f"(status={data.get('status')}, error={data.get('error')})"
                )
                logger.error("Replicate error %s: %s", domain, message)
                return _failure(message)
            if isinstance(output, list):
                # Output may arrive as a list of chunks; join them into one string.
                output = "".join(str(part) for part in output)

            result = _parse_output(output)
            logger.info(
                "AI %s → %s (quality %s)",
                domain,
                result.get("lead_quality"),
                result.get("site_quality_score"),
            )
            return result
        except Exception as e:
            # Deliberate best-effort boundary: one failed assessment must not
            # abort the caller's batch, so report it as a COLD result.
            logger.error("Replicate error %s: %s", domain, e)
            return _failure(str(e))