"""Replicate / Gemini integration — deep site assessment.""" import asyncio import json import logging import os import re from typing import Optional import httpx from bs4 import BeautifulSoup logger = logging.getLogger(__name__) REPLICATE_TOKEN = os.getenv("REPLICATE_API_TOKEN", "r8_7I7Feai78f9PzMOs20y5GVFKiLkgUWP463vZO") # override via env REPLICATE_MODEL = "https://api.replicate.com/v1/models/google/gemini-3-pro/predictions" AI_CONCURRENCY = int(os.getenv("AI_CONCURRENCY", "3")) _ai_sem: Optional[asyncio.Semaphore] = None def _sem() -> asyncio.Semaphore: global _ai_sem if _ai_sem is None: _ai_sem = asyncio.Semaphore(AI_CONCURRENCY) return _ai_sem async def _ddg_search(query: str) -> str: """DuckDuckGo HTML search — returns top snippet text, empty string on failure.""" try: async with httpx.AsyncClient( timeout=10, follow_redirects=True, headers={"User-Agent": "Mozilla/5.0 (compatible; DomGod/1.0)"}, ) as client: r = await client.get( "https://html.duckduckgo.com/html/", params={"q": query, "kl": "es-es"}, ) if r.status_code != 200: return "" soup = BeautifulSoup(r.text, "html.parser") parts = [] for res in soup.select(".result")[:4]: title = res.select_one(".result__a") snip = res.select_one(".result__snippet") url = res.select_one(".result__url") if snip: t = title.get_text(strip=True) if title else "" u = url.get_text(strip=True) if url else "" parts.append(f"[{u}] {t} — {snip.get_text(strip=True)}") return "\n".join(parts) except Exception as e: logger.debug("DDG search failed: %s", e) return "" def _build_prompt(a: dict, search_results: str = "", language: str = "ES") -> str: contacts_block = [] if a.get("emails"): contacts_block.append(f" Emails: {', '.join(a['emails'][:3])}") if a.get("phones"): contacts_block.append(f" Phones: {', '.join(a['phones'][:3])}") if a.get("whatsapp"): contacts_block.append(f" WhatsApp: {', '.join(a['whatsapp'][:2])}") if a.get("social_links"):contacts_block.append(f" Social: {', '.join(a['social_links'][:4])}") contacts_str = "\n".join(contacts_block) or " None found" kd_str = "\n".join(f" - {s}" for s in (a.get("kit_digital_signals") or [])) or " None" analytics = ", ".join(a.get("analytics_present") or []) or "none" webmaster = ", ".join(a.get("webmaster_verified") or []) or "none" lorem_str = ", ".join(a.get("lorem_matches") or []) or "none" ph_str = ", ".join(a.get("placeholder_matches") or []) or "none" snippet = (a.get("visible_text_snippet") or "")[:800] social_str = ", ".join(a.get("social_links") or []) or "none detected" gmb_str = f"✅ Found — {a.get('gmb_url','')}" if a.get("has_gmb") else "❌ Not detected" copyright_yr = a.get("copyright_year") or "not found" last_mod = a.get("last_modified") or "not found" eu_hosted = a.get("eu_hosted") hosting_flag = "✅ EU" if eu_hosted else ("❌ Non-EU" if eu_hosted is False else "unknown") return f"""You are a senior web consultant and IT sales analyst evaluating a Spanish/European SME website for IT services upsell. === TECHNICAL === Domain: {a.get("domain")} Reachable: {a.get("reachable")} | Status: {a.get("status_code")} | Load time: {a.get("load_time_ms")} ms Page size: {a.get("page_size_kb")} KB | Server: {a.get("server")} | CMS: {a.get("cms") or "unknown"} SSL: valid={a.get("ssl_valid")} expires_in={a.get("ssl_expiry_days")} days Mobile: viewport={a.get("has_mobile_viewport")} Words: {a.get("word_count")} | Images: {a.get("image_count")} | Scripts: {a.get("script_count")} === HOSTING & INFRASTRUCTURE === IP: {a.get("ip") or "unknown"} ASN: {a.get("asn") or "unknown"} Organisation: {a.get("org") or "unknown"} ISP: {a.get("isp") or "unknown"} Host country: {a.get("ip_country") or "unknown"} / {a.get("ip_region") or ""} EU hosted: {hosting_flag} === SEO & INDEXING === Title: {a.get("page_title") or "MISSING"} H1: {a.get("h1_text") or "MISSING"} Meta desc: {a.get("meta_description") or "MISSING"} Canonical: {a.get("canonical_url") or "not set"} Sitemap: {a.get("has_sitemap")} | Robots: {a.get("has_robots")} | Blocks Google: {a.get("robots_disallows_google")} Analytics: {analytics} Webmaster: {webmaster} === GDPR & LEGAL COMPLIANCE === Cookie tool: {a.get("cookie_tool") or "none detected"} Cookie notice: {a.get("has_cookie_notice")} Privacy policy: {a.get("has_privacy_policy")} GDPR text: {a.get("has_gdpr_text")} === ACCESSIBILITY (quick scan) === HTML lang attr: {a.get("html_lang") or "MISSING"} Images missing alt: {a.get("images_missing_alt")} Skip navigation link: {a.get("has_skip_nav")} Empty links: {a.get("empty_links")} Inputs without labels: {a.get("inputs_without_labels")} === CONTENT QUALITY & FRESHNESS === Lorem ipsum: {a.get("has_lorem_ipsum")} → {lorem_str} Placeholder: {a.get("has_placeholder")} → {ph_str} Copyright year: {copyright_yr} Last-Modified: {last_mod} === KIT DIGITAL (Spanish gov €12k SME grants — ONLY confirm if you see explicit Kit Digital / agente digitalizador branding) === Heuristic detected: {a.get("kit_digital")} {kd_str} === GOOGLE MY BUSINESS === GMB/Business Profile: {gmb_str} === SOCIAL MEDIA === Profiles found on site: {social_str} === CONTACT CHANNELS === {contacts_str} === PAGE TEXT SAMPLE === {snippet} === WEB SEARCH RESULTS (use to find contacts, verify business identity) === {(search_results or "No results.")[:600]} === OUTPUT LANGUAGE === Write pitch_angle, outreach_email, email_subject, and all human-readable text fields in: {language} (EN = English | ES = Spanish | RO = Romanian) === WHO WE ARE === We are a full-service digital agency. We handle EVERYTHING web-related for SMEs: new website builds, redesigns, landing pages, e-commerce, CMS migrations, speed optimisation, mobile responsiveness, SSL/security, SEO (on-page + technical + local), Google Ads, Google My Business setup & optimisation, social media management (Instagram, Facebook, LinkedIn, TikTok), GDPR compliance, cookie banners, accessibility fixes, hosting migrations, email setup, maintenance contracts, and AI-powered tools. No job is too small or too large. === ASSESSMENT RULES === Look at EVERY aspect of the site — quality, age, CMS, performance, SEO, GDPR, social presence, GMB, contacts, hosting — and identify ALL the problems AND opportunities. Then build the pitch around the most compelling angle for THIS specific business. Lead scoring guide: • HOT — blank/placeholder site, or ≥3 serious issues (expired SSL, no SEO, no mobile, lorem ipsum, non-EU hosting, no GDPR, no social, site >3 yrs old) • WARM — functional but clearly outdated or missing 1-2 key services • COLD — modern, well-maintained site with few obvious gaps MANDATORY for EVERY assessment — no exceptions: 1. pitch_angle: A single, compelling cold-outreach sentence in Spanish, personalised to this specific business name/type and its biggest weakness. Reference the actual problem. Examples of good pitches: - "Hola Salom Manacor, su web lleva sin actualizarse desde 2019 — en 3 semanas le entregamos una nueva web con ficha en Google, redes sociales y posicionamiento incluidos." - "Detectamos que la web de [Negocio] no aparece en Google Maps ni tiene perfil en Instagram — podemos solucionarlo esta semana." - "Su certificado SSL vence en 12 días y su web no tiene aviso de cookies legal — evite multas y pérdida de visitas con nuestro plan de mantenimiento." 2. outreach_email: A 3-4 sentence ready-to-send email in Spanish. First sentence names the business and the most urgent problem. Second sentence explains the impact (losing clients, Google ranking, legal risk). Third sentence introduces us as the solution. Close with a call to action (llamada de 15 min, presupuesto gratuito). Sign off: "Un saludo, [Agencia Digital]". 3. email_subject: A short, specific Spanish email subject line referencing the business name and main issue (e.g. "Web de Salom Manacor — propuesta de mejora"). 4. services_needed: At least 2 specific services from our catalogue. 5. Use WEB SEARCH RESULTS to find real phone/email for best_contact_value. 6. Use copyright_year + Last-Modified to estimate site_last_updated. 7. Keep all string values concise (≤ 20 words each). Arrays: max 4 items. Respond ONLY with valid JSON, no markdown fences, no text outside the JSON: {{ "lead_quality": "HOT|WARM|COLD", "lead_reasoning": "1-2 sentences", "pitch_angle": "1 punchy sentence in Spanish referencing the specific business problem", "outreach_email": "ready-to-send 3-4 sentence email in Spanish", "email_subject": "specific Spanish subject line", "services_needed": ["service1","service2"], "best_contact_channel": "email|phone|whatsapp|social|web_form|unknown", "best_contact_value": "real email/phone from page or search results", "all_contacts": {{"emails":[],"phones":[],"whatsapp":[],"social":[]}}, "summary": "2-3 sentence executive summary", "site_quality_score": <0-10>, "cms_detected": "wordpress|wix|custom|unknown", "site_last_updated": "year or estimate", "kit_digital_confirmed": true/false, "kit_digital_reasoning": "1 sentence", "has_gmb": true/false, "has_social_media": true/false, "is_local_sme": true/false, "urgency_signals": ["issue1","issue2"], "content_issues": ["issue1"], "accessibility_issues": ["issue1"], "performance_notes": "brief", "seo_status": "brief", "hosting_notes": "brief", "gdpr_compliance": "brief", "outreach_notes": "sales rep context" }}""" def _parse_output(raw: str) -> dict: text = re.sub(r"```(?:json)?", "", raw).strip().rstrip("`").strip() m = re.search(r"\{[\s\S]+\}", text) if m: candidate = m.group(0) try: return json.loads(candidate) except json.JSONDecodeError: # Truncated JSON: close any open arrays/objects and retry fixed = candidate # Count unclosed brackets depth_obj = fixed.count("{") - fixed.count("}") depth_arr = fixed.count("[") - fixed.count("]") # Strip trailing incomplete key-value (e.g. `,"foo": "bar` with no closing `"`) fixed = re.sub(r',\s*"[^"]*"?\s*:\s*[^,\}\]]*$', '', fixed) fixed += "]" * max(0, depth_arr) + "}" * max(0, depth_obj) try: return json.loads(fixed) except json.JSONDecodeError: pass logger.warning("Could not parse Gemini JSON output, raw: %s", raw[:300]) return { "summary": raw[:400] if raw.strip() else "AI assessment failed — no output.", "lead_quality": "COLD", "best_contact_channel": "unknown", "best_contact_value": "", "parse_error": True, } async def assess_domain(analysis: dict, language: str = "ES") -> dict: """Call Gemini with the full site analysis. Returns parsed assessment.""" async with _sem(): # Build search query from domain / page title for contact lookup domain = analysis.get("domain", "") title = analysis.get("page_title") or "" biz_name = title.split("|")[0].split("-")[0].strip() or domain search_query = f'"{biz_name}" {domain} contacto telefono email' search_results = await _ddg_search(search_query) logger.info("DDG search for %s → %d chars", domain, len(search_results)) payload = { "input": { "prompt": _build_prompt(analysis, search_results, language), "images": [], "videos": [], "top_p": 0.9, "temperature": 0.2, "thinking_level": "low", "max_output_tokens": 6000, } } try: async with httpx.AsyncClient(timeout=120) as client: resp = await client.post( REPLICATE_MODEL, headers={ "Authorization": f"Bearer {REPLICATE_TOKEN}", "Content-Type": "application/json", "Prefer": "wait", }, json=payload, ) resp.raise_for_status() data = resp.json() output = data.get("output", "") if isinstance(output, list): output = "".join(output) result = _parse_output(output) logger.info("AI %s → %s (quality %s)", analysis.get("domain"), result.get("lead_quality"), result.get("site_quality_score")) return result except Exception as e: logger.error("Replicate error %s: %s", analysis.get("domain"), e) return { "error": str(e)[:300], "lead_quality": "COLD", "best_contact_channel": "unknown", "best_contact_value": "", }