"""Replicate / Gemini integration — deep site assessment.""" import asyncio import json import logging import os import re from typing import Optional import httpx from bs4 import BeautifulSoup logger = logging.getLogger(__name__) REPLICATE_TOKEN = os.getenv("REPLICATE_API_TOKEN", "r8_7I7Feai78f9PzMOs20y5GVFKiLkgUWP463vZO") # override via env REPLICATE_MODEL = "https://api.replicate.com/v1/models/google/gemini-3-pro/predictions" AI_CONCURRENCY = int(os.getenv("AI_CONCURRENCY", "3")) _ai_sem: Optional[asyncio.Semaphore] = None def _sem() -> asyncio.Semaphore: global _ai_sem if _ai_sem is None: _ai_sem = asyncio.Semaphore(AI_CONCURRENCY) return _ai_sem async def _ddg_search(query: str) -> str: """DuckDuckGo HTML search — returns top snippet text, empty string on failure.""" try: async with httpx.AsyncClient( timeout=10, follow_redirects=True, headers={"User-Agent": "Mozilla/5.0 (compatible; DomGod/1.0)"}, ) as client: r = await client.get( "https://html.duckduckgo.com/html/", params={"q": query, "kl": "es-es"}, ) if r.status_code != 200: return "" soup = BeautifulSoup(r.text, "html.parser") parts = [] for res in soup.select(".result")[:4]: title = res.select_one(".result__a") snip = res.select_one(".result__snippet") url = res.select_one(".result__url") if snip: t = title.get_text(strip=True) if title else "" u = url.get_text(strip=True) if url else "" parts.append(f"[{u}] {t} — {snip.get_text(strip=True)}") return "\n".join(parts) except Exception as e: logger.debug("DDG search failed: %s", e) return "" def _build_prompt(a: dict, search_results: str = "") -> str: contacts_block = [] if a.get("emails"): contacts_block.append(f" Emails: {', '.join(a['emails'][:3])}") if a.get("phones"): contacts_block.append(f" Phones: {', '.join(a['phones'][:3])}") if a.get("whatsapp"): contacts_block.append(f" WhatsApp: {', '.join(a['whatsapp'][:2])}") if a.get("social_links"):contacts_block.append(f" Social: {', '.join(a['social_links'][:4])}") contacts_str = "\n".join(contacts_block) or " None found" kd_str = "\n".join(f" - {s}" for s in (a.get("kit_digital_signals") or [])) or " None" analytics = ", ".join(a.get("analytics_present") or []) or "none" webmaster = ", ".join(a.get("webmaster_verified") or []) or "none" lorem_str = ", ".join(a.get("lorem_matches") or []) or "none" ph_str = ", ".join(a.get("placeholder_matches") or []) or "none" snippet = (a.get("visible_text_snippet") or "")[:800] social_str = ", ".join(a.get("social_links") or []) or "none detected" gmb_str = f"✅ Found — {a.get('gmb_url','')}" if a.get("has_gmb") else "❌ Not detected" copyright_yr = a.get("copyright_year") or "not found" last_mod = a.get("last_modified") or "not found" eu_hosted = a.get("eu_hosted") hosting_flag = "✅ EU" if eu_hosted else ("❌ Non-EU" if eu_hosted is False else "unknown") return f"""You are a senior web consultant and IT sales analyst evaluating a Spanish/European SME website for IT services upsell. === TECHNICAL === Domain: {a.get("domain")} Reachable: {a.get("reachable")} | Status: {a.get("status_code")} | Load time: {a.get("load_time_ms")} ms Page size: {a.get("page_size_kb")} KB | Server: {a.get("server")} | CMS: {a.get("cms") or "unknown"} SSL: valid={a.get("ssl_valid")} expires_in={a.get("ssl_expiry_days")} days Mobile: viewport={a.get("has_mobile_viewport")} Words: {a.get("word_count")} | Images: {a.get("image_count")} | Scripts: {a.get("script_count")} === HOSTING & INFRASTRUCTURE === IP: {a.get("ip") or "unknown"} ASN: {a.get("asn") or "unknown"} Organisation: {a.get("org") or "unknown"} ISP: {a.get("isp") or "unknown"} Host country: {a.get("ip_country") or "unknown"} / {a.get("ip_region") or ""} EU hosted: {hosting_flag} === SEO & INDEXING === Title: {a.get("page_title") or "MISSING"} H1: {a.get("h1_text") or "MISSING"} Meta desc: {a.get("meta_description") or "MISSING"} Canonical: {a.get("canonical_url") or "not set"} Sitemap: {a.get("has_sitemap")} | Robots: {a.get("has_robots")} | Blocks Google: {a.get("robots_disallows_google")} Analytics: {analytics} Webmaster: {webmaster} === GDPR & LEGAL COMPLIANCE === Cookie tool: {a.get("cookie_tool") or "none detected"} Cookie notice: {a.get("has_cookie_notice")} Privacy policy: {a.get("has_privacy_policy")} GDPR text: {a.get("has_gdpr_text")} === ACCESSIBILITY (quick scan) === HTML lang attr: {a.get("html_lang") or "MISSING"} Images missing alt: {a.get("images_missing_alt")} Skip navigation link: {a.get("has_skip_nav")} Empty links: {a.get("empty_links")} Inputs without labels: {a.get("inputs_without_labels")} === CONTENT QUALITY & FRESHNESS === Lorem ipsum: {a.get("has_lorem_ipsum")} → {lorem_str} Placeholder: {a.get("has_placeholder")} → {ph_str} Copyright year: {copyright_yr} Last-Modified: {last_mod} === KIT DIGITAL (Spanish gov €12k SME grants — ONLY confirm if you see explicit Kit Digital / agente digitalizador branding) === Heuristic detected: {a.get("kit_digital")} {kd_str} === GOOGLE MY BUSINESS === GMB/Business Profile: {gmb_str} === SOCIAL MEDIA === Profiles found on site: {social_str} === CONTACT CHANNELS === {contacts_str} === PAGE TEXT SAMPLE === {snippet} === WEB SEARCH RESULTS (use to find contacts, verify business identity) === {(search_results or "No results.")[:600]} === INSTRUCTIONS === The client sells: web redesign, SEO, hosting migration, SSL renewal, security audits, GDPR compliance, accessibility fixes, Google Ads, maintenance contracts, AI tools for SMEs, GMB setup, social media management. RULES — you MUST follow all of these: 1. A placeholder / minimal / blank site (few words, no images, no CMS) is one of the BEST leads — they need a complete website build + all digital services. Score it lead_quality=HOT or WARM and write an enthusiastic pitch. 2. pitch_angle is MANDATORY. Never leave it empty. Write 1 punchy Spanish sentence tailored to the business type. Even "Hola, su web necesita una renovación completa — podemos tenerla lista en 2 semanas." is better than nothing. 3. services_needed must list at LEAST 2 services. For a blank/placeholder site always include "diseño web" and "posicionamiento SEO". 4. Use the WEB SEARCH RESULTS to find the real phone/email — put the best one in best_contact_value. 5. Use copyright_year + Last-Modified to estimate site_last_updated. 6. Keep every string value SHORT (≤ 15 words). Arrays: max 4 items. This keeps the JSON small and avoids truncation. Respond ONLY with valid JSON, no markdown fences, no text outside the JSON: {{ "lead_quality": "HOT|WARM|COLD", "lead_reasoning": "1-2 sentences why", "pitch_angle": "1 punchy cold-outreach sentence in Spanish — NEVER empty", "services_needed": ["service1","service2"], "best_contact_channel": "email|phone|whatsapp|social|web_form|unknown", "best_contact_value": "real email/phone from page or search results", "all_contacts": {{"emails":[],"phones":[],"whatsapp":[],"social":[]}}, "summary": "2-3 sentence executive summary", "site_quality_score": <0-10>, "cms_detected": "wordpress|wix|custom|unknown", "site_last_updated": "year or estimate", "kit_digital_confirmed": true/false, "kit_digital_reasoning": "1 sentence", "has_gmb": true/false, "has_social_media": true/false, "is_local_sme": true/false, "urgency_signals": ["issue1","issue2"], "content_issues": ["issue1"], "accessibility_issues": ["issue1"], "performance_notes": "brief", "seo_status": "brief", "hosting_notes": "brief", "gdpr_compliance": "brief", "outreach_notes": "sales rep context" }}""" def _parse_output(raw: str) -> dict: text = re.sub(r"```(?:json)?", "", raw).strip().rstrip("`").strip() m = re.search(r"\{[\s\S]+\}", text) if m: candidate = m.group(0) try: return json.loads(candidate) except json.JSONDecodeError: # Truncated JSON: close any open arrays/objects and retry fixed = candidate # Count unclosed brackets depth_obj = fixed.count("{") - fixed.count("}") depth_arr = fixed.count("[") - fixed.count("]") # Strip trailing incomplete key-value (e.g. `,"foo": "bar` with no closing `"`) fixed = re.sub(r',\s*"[^"]*"?\s*:\s*[^,\}\]]*$', '', fixed) fixed += "]" * max(0, depth_arr) + "}" * max(0, depth_obj) try: return json.loads(fixed) except json.JSONDecodeError: pass logger.warning("Could not parse Gemini JSON output, raw: %s", raw[:300]) return { "summary": raw[:400] if raw.strip() else "AI assessment failed — no output.", "lead_quality": "COLD", "best_contact_channel": "unknown", "best_contact_value": "", "parse_error": True, } async def assess_domain(analysis: dict) -> dict: """Call Gemini with the full site analysis. Returns parsed assessment.""" async with _sem(): # Build search query from domain / page title for contact lookup domain = analysis.get("domain", "") title = analysis.get("page_title") or "" biz_name = title.split("|")[0].split("-")[0].strip() or domain search_query = f'"{biz_name}" {domain} contacto telefono email' search_results = await _ddg_search(search_query) logger.info("DDG search for %s → %d chars", domain, len(search_results)) payload = { "input": { "prompt": _build_prompt(analysis, search_results), "images": [], "videos": [], "top_p": 0.9, "temperature": 0.2, "thinking_level": "low", "max_output_tokens": 4096, } } try: async with httpx.AsyncClient(timeout=120) as client: resp = await client.post( REPLICATE_MODEL, headers={ "Authorization": f"Bearer {REPLICATE_TOKEN}", "Content-Type": "application/json", "Prefer": "wait", }, json=payload, ) resp.raise_for_status() data = resp.json() output = data.get("output", "") if isinstance(output, list): output = "".join(output) result = _parse_output(output) logger.info("AI %s → %s (quality %s)", analysis.get("domain"), result.get("lead_quality"), result.get("site_quality_score")) return result except Exception as e: logger.error("Replicate error %s: %s", analysis.get("domain"), e) return { "error": str(e)[:300], "lead_quality": "COLD", "best_contact_channel": "unknown", "best_contact_value": "", }