"""Replicate / Gemini integration — deep site assessment."""

import asyncio
import json
import logging
import os
import re
from typing import Optional

import httpx
from bs4 import BeautifulSoup

logger = logging.getLogger(__name__)

# SECURITY: the API token must come from the environment. A previous revision
# shipped a real-looking token as the fallback default; that token has been
# removed from source and should be revoked/rotated.
REPLICATE_TOKEN = os.getenv("REPLICATE_API_TOKEN", "")
REPLICATE_MODEL = "https://api.replicate.com/v1/models/google/gemini-3-pro/predictions"

# Clamp to >= 1: asyncio.Semaphore(0) would block every caller forever.
AI_CONCURRENCY = max(1, int(os.getenv("AI_CONCURRENCY", "3")))

_ai_sem: Optional[asyncio.Semaphore] = None


def _sem() -> asyncio.Semaphore:
    """Return the module-wide semaphore, creating it on first use."""
    global _ai_sem
    if _ai_sem is None:
        _ai_sem = asyncio.Semaphore(AI_CONCURRENCY)
    return _ai_sem


def _error_result(message: str) -> dict:
    """Build the fallback assessment returned when the AI call cannot complete.

    The message is truncated to 300 characters, matching the size limit used
    for error text elsewhere in this module.
    """
    return {
        "error": message[:300],
        "lead_quality": "COLD",
        "best_contact_channel": "unknown",
        "best_contact_value": "",
    }


async def _ddg_search(query: str) -> str:
    """DuckDuckGo HTML search — returns top snippet text, empty string on failure.

    At most the first four ``.result`` elements are considered, and only those
    with a snippet contribute a line of the form ``[url] title — snippet``.
    """
    try:
        async with httpx.AsyncClient(
            timeout=10,
            follow_redirects=True,
            headers={"User-Agent": "Mozilla/5.0 (compatible; DomGod/1.0)"},
        ) as client:
            r = await client.get(
                "https://html.duckduckgo.com/html/",
                params={"q": query, "kl": "es-es"},
            )
        if r.status_code != 200:
            return ""
        soup = BeautifulSoup(r.text, "html.parser")
        parts = []
        for res in soup.select(".result")[:4]:
            title = res.select_one(".result__a")
            snip = res.select_one(".result__snippet")
            url = res.select_one(".result__url")
            if snip:
                t = title.get_text(strip=True) if title else ""
                u = url.get_text(strip=True) if url else ""
                parts.append(f"[{u}] {t} — {snip.get_text(strip=True)}")
        return "\n".join(parts)
    except Exception as e:
        # Best-effort enrichment: a failed search must never fail the assessment.
        logger.debug("DDG search failed: %s", e)
        return ""


def _build_prompt(a: dict, search_results: str = "") -> str:
    """Render the assessment prompt from a site-analysis dict.

    Args:
        a: Analysis dict; every field is read with ``.get`` so missing keys
            render as ``None`` or a textual placeholder.
        search_results: Optional web-search snippets to embed in the prompt.

    Returns:
        The full prompt text, ending with the JSON schema the model must fill.
    """
    contacts_block = []
    if a.get("emails"):
        contacts_block.append(f" Emails: {', '.join(a['emails'][:3])}")
    if a.get("phones"):
        contacts_block.append(f" Phones: {', '.join(a['phones'][:3])}")
    if a.get("whatsapp"):
        contacts_block.append(f" WhatsApp: {', '.join(a['whatsapp'][:2])}")
    if a.get("social_links"):
        contacts_block.append(f" Social: {', '.join(a['social_links'][:4])}")
    contacts_str = "\n".join(contacts_block) or " None found"

    kd_str = "\n".join(f" - {s}" for s in (a.get("kit_digital_signals") or [])) or " None"
    analytics = ", ".join(a.get("analytics_present") or []) or "none"
    webmaster = ", ".join(a.get("webmaster_verified") or []) or "none"
    lorem_str = ", ".join(a.get("lorem_matches") or []) or "none"
    ph_str = ", ".join(a.get("placeholder_matches") or []) or "none"
    snippet = (a.get("visible_text_snippet") or "")[:2000]
    social_str = ", ".join(a.get("social_links") or []) or "none detected"
    gmb_str = f"✅ Found — {a.get('gmb_url','')}" if a.get("has_gmb") else "❌ Not detected"
    copyright_yr = a.get("copyright_year") or "not found"
    last_mod = a.get("last_modified") or "not found"
    eu_hosted = a.get("eu_hosted")
    # Three-state flag: truthy -> EU, explicit False -> Non-EU, anything else -> unknown.
    hosting_flag = "✅ EU" if eu_hosted else ("❌ Non-EU" if eu_hosted is False else "unknown")

    return f"""You are a senior web consultant and IT sales analyst evaluating a Spanish/European SME website for IT services upsell.

=== TECHNICAL ===
Domain: {a.get("domain")}
Reachable: {a.get("reachable")} | Status: {a.get("status_code")} | Load time: {a.get("load_time_ms")} ms
Page size: {a.get("page_size_kb")} KB | Server: {a.get("server")} | CMS: {a.get("cms") or "unknown"}
SSL: valid={a.get("ssl_valid")} expires_in={a.get("ssl_expiry_days")} days
Mobile: viewport={a.get("has_mobile_viewport")}
Words: {a.get("word_count")} | Images: {a.get("image_count")} | Scripts: {a.get("script_count")}

=== HOSTING & INFRASTRUCTURE ===
IP: {a.get("ip") or "unknown"}
ASN: {a.get("asn") or "unknown"}
Organisation: {a.get("org") or "unknown"}
ISP: {a.get("isp") or "unknown"}
Host country: {a.get("ip_country") or "unknown"} / {a.get("ip_region") or ""}
EU hosted: {hosting_flag}

=== SEO & INDEXING ===
Title: {a.get("page_title") or "MISSING"}
H1: {a.get("h1_text") or "MISSING"}
Meta desc: {a.get("meta_description") or "MISSING"}
Canonical: {a.get("canonical_url") or "not set"}
Sitemap: {a.get("has_sitemap")} | Robots: {a.get("has_robots")} | Blocks Google: {a.get("robots_disallows_google")}
Analytics: {analytics}
Webmaster: {webmaster}

=== GDPR & LEGAL COMPLIANCE ===
Cookie tool: {a.get("cookie_tool") or "none detected"}
Cookie notice: {a.get("has_cookie_notice")}
Privacy policy: {a.get("has_privacy_policy")}
GDPR text: {a.get("has_gdpr_text")}

=== ACCESSIBILITY (quick scan) ===
HTML lang attr: {a.get("html_lang") or "MISSING"}
Images missing alt: {a.get("images_missing_alt")}
Skip navigation link: {a.get("has_skip_nav")}
Empty links: {a.get("empty_links")}
Inputs without labels: {a.get("inputs_without_labels")}

=== CONTENT QUALITY & FRESHNESS ===
Lorem ipsum: {a.get("has_lorem_ipsum")} → {lorem_str}
Placeholder: {a.get("has_placeholder")} → {ph_str}
Copyright year: {copyright_yr}
Last-Modified: {last_mod}

=== KIT DIGITAL (Spanish gov €12k SME grants — ONLY confirm if you see explicit Kit Digital / agente digitalizador branding) ===
Heuristic detected: {a.get("kit_digital")}
{kd_str}

=== GOOGLE MY BUSINESS ===
GMB/Business Profile: {gmb_str}

=== SOCIAL MEDIA ===
Profiles found on site: {social_str}

=== CONTACT CHANNELS ===
{contacts_str}

=== PAGE TEXT SAMPLE ===
{snippet}

=== WEB SEARCH RESULTS (use these to find contact info, verify business details) ===
{search_results if search_results else "No search results available."}

=== INSTRUCTIONS ===
The client sells: web redesign, SEO, hosting migration, SSL renewal, security audits, GDPR compliance, accessibility fixes, Google Ads, maintenance contracts, AI tools for SMEs, Google My Business setup/optimisation, social media management (Instagram, Facebook, LinkedIn, TikTok).

IMPORTANT — use the WEB SEARCH RESULTS above to:
1. Find any phone numbers, emails, or WhatsApp not visible on the homepage.
2. Identify the business owner name if available.
3. Populate best_contact_value with a real phone/email you found.
4. Use the copyright year and Last-Modified date to estimate when the site was last updated.
5. Determine the actual CMS from code signals and visible text (not just the heuristic).

Respond ONLY with valid JSON, no markdown fences, no text outside the JSON:
{{
  "summary": "2-3 sentence executive summary of the site's state",
  "site_quality_score": <0-10>,
  "content_issues": ["specific issues found in page content"],
  "performance_notes": "load time, size, mobile assessment",
  "seo_status": "SEO health — what's missing or broken",
  "hosting_notes": "ASN/ISP name, EU vs non-EU, any concerns",
  "gdpr_compliance": "cookie banner status, privacy policy, GDPR gaps",
  "accessibility_issues": ["specific a11y problems found"],
  "cms_detected": "wordpress|wix|squarespace|custom|unknown",
  "site_last_updated": "year or estimate, e.g. '2019' or 'likely 2021'",
  "kit_digital_confirmed": true/false,
  "has_gmb": true/false,
  "has_social_media": true/false,
  "kit_digital_reasoning": "1 sentence",
  "is_local_sme": true/false,
  "lead_quality": "HOT|WARM|COLD",
  "lead_reasoning": "1-2 sentences",
  "best_contact_channel": "email|phone|whatsapp|social|web_form|unknown",
  "best_contact_value": "actual email/phone/URL or empty string",
  "all_contacts": {{"emails":[],"phones":[],"whatsapp":[],"social":[]}},
  "pitch_angle": "1 cold-outreach sentence in Spanish",
  "services_needed": ["service1","service2"],
  "urgency_signals": ["specific urgent issues like 'SSL expires in 12 days', 'lorem ipsum on homepage', 'no cookie banner'"],
  "outreach_notes": "sales rep context"
}}"""


def _parse_output(raw: str) -> dict:
    """Extract the JSON object from the model's text output.

    Markdown code fences are stripped, then the first JSON object in the text
    is decoded; anything after that object is ignored.

    Args:
        raw: Model output. ``None`` and non-string values are tolerated and
            treated as unparseable text.

    Returns:
        The decoded object, or a COLD fallback dict carrying
        ``parse_error: True`` and the first 400 characters of the raw text as
        ``summary`` when no non-empty JSON object can be decoded.
    """
    if raw is None:
        raw = ""
    elif not isinstance(raw, str):
        raw = str(raw)

    text = re.sub(r"```(?:json)?", "", raw).strip().rstrip("`").strip()
    start = text.find("{")
    if start != -1:
        try:
            # raw_decode stops at the end of the first complete JSON value, so
            # trailing commentary from the model does not break parsing.
            parsed, _ = json.JSONDecoder().raw_decode(text, start)
        except json.JSONDecodeError:
            parsed = None
        # Only the first "{" is tried: falling back to a later one could
        # return a nested object from a truncated response as if it were the
        # whole assessment. An empty object is treated as a parse failure.
        if isinstance(parsed, dict) and parsed:
            return parsed

    logger.warning("Could not parse Gemini JSON output, raw: %s", raw[:300])
    return {
        "summary": raw[:400],
        "lead_quality": "COLD",
        "best_contact_channel": "unknown",
        "best_contact_value": "",
        "parse_error": True,
    }


async def assess_domain(analysis: dict) -> dict:
    """Call Gemini with the full site analysis. Returns parsed assessment.

    A DuckDuckGo lookup built from the domain and page title is embedded in
    the prompt before the model is called. The whole operation runs under the
    module semaphore, so at most ``AI_CONCURRENCY`` assessments are in flight.

    Args:
        analysis: Site-analysis dict; ``domain`` and ``page_title`` are read
            here, the rest is consumed by ``_build_prompt``.

    Returns:
        The parsed model output, or a dict with ``error`` and
        ``lead_quality == "COLD"`` when the token is missing, the request
        fails, or the prediction returns no output. Never raises for those.
    """
    if not REPLICATE_TOKEN:
        # Fail fast, before any network traffic, with an actionable message.
        logger.error(
            "Replicate error %s: REPLICATE_API_TOKEN is not set",
            analysis.get("domain"),
        )
        return _error_result("REPLICATE_API_TOKEN environment variable is not set")

    async with _sem():
        # Build search query from domain / page title for contact lookup
        domain = analysis.get("domain", "")
        title = analysis.get("page_title") or ""
        biz_name = title.split("|")[0].split("-")[0].strip() or domain
        search_query = f'"{biz_name}" {domain} contacto telefono email'
        search_results = await _ddg_search(search_query)
        logger.info("DDG search for %s → %d chars", domain, len(search_results))

        payload = {
            "input": {
                "prompt": _build_prompt(analysis, search_results),
                "images": [],
                "videos": [],
                "top_p": 0.9,
                "temperature": 0.2,
                "thinking_level": "low",
                "max_output_tokens": 2048,
            }
        }
        try:
            async with httpx.AsyncClient(timeout=120) as client:
                resp = await client.post(
                    REPLICATE_MODEL,
                    headers={
                        "Authorization": f"Bearer {REPLICATE_TOKEN}",
                        "Content-Type": "application/json",
                        "Prefer": "wait",
                    },
                    json=payload,
                )
            resp.raise_for_status()
            data = resp.json()

            output = data.get("output") if isinstance(data, dict) else None
            if output is None:
                # The response carried no output (the key is absent or null).
                # Report the prediction's own status/error fields instead of
                # failing later with an opaque TypeError inside the parser.
                status = data.get("status") if isinstance(data, dict) else None
                detail = data.get("error") if isinstance(data, dict) else None
                message = f"Replicate returned no output (status={status!r}, error={detail!r})"
                logger.error("Replicate error %s: %s", analysis.get("domain"), message)
                return _error_result(message)
            if isinstance(output, list):
                # A list output is concatenated into a single string.
                output = "".join(str(part) for part in output)

            result = _parse_output(output)
            logger.info(
                "AI %s → %s (quality %s)",
                analysis.get("domain"),
                result.get("lead_quality"),
                result.get("site_quality_score"),
            )
            return result
        except Exception as e:
            # Boundary handler: callers rely on always getting a dict back.
            logger.error("Replicate error %s: %s", analysis.get("domain"), e)
            return _error_result(str(e))