feat: deep site analysis engine + fix AI assessment for any domain

site_analyzer.py (new):
- Fresh scrape with timing, page size, server, CMS detection
- Lorem ipsum detection (17 phrases incl. user's example)
- Placeholder content detection (hello world, sample page, etc.)
- Analytics: GA4, GTM, Facebook Pixel, Hotjar, Clarity
- Webmaster: Google Search Console, Bing, Yandex verification tags
- sitemap.xml and robots.txt check + Googlebot block detection
- Mobile viewport check, word count, image/script count
- Full contact extraction: emails, phones, WhatsApp, social links
- Kit Digital signal detection

AI worker fix:
- No longer requires pre-enrichment; works on ANY selected domain
- Runs a fresh site_analyzer scrape, then calls Gemini with full context
- Stores site_analysis JSON alongside the AI assessment
- Upserts into enriched_domains even if the domain was never enriched
  (sketched below)
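
A minimal sketch of the upsert, assuming SQLite and placeholder column names
(the real enriched_domains schema may differ):

    # Hypothetical columns; shown only to illustrate the insert-or-update path.
    cur.execute(
        """INSERT INTO enriched_domains (domain, site_analysis, ai_assessment)
           VALUES (?, ?, ?)
           ON CONFLICT(domain) DO UPDATE SET
             site_analysis = excluded.site_analysis,
             ai_assessment = excluded.ai_assessment""",
        (domain, json.dumps(analysis), json.dumps(assessment)),
    )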

Gemini prompt now includes:
- Complete technical snapshot (load time, size, server, SSL)
- Full SEO signals (sitemap, robots, analytics, webmaster verified)
- Content quality (lorem ipsum matches, placeholder matches)
- Kit Digital signals
- All extracted contacts
- 500-word page text sample
- Outputs (shape sketched below): summary, site_quality_score/10, content_issues[],
  urgency_signals[], performance_notes, seo_status,
  best_contact_channel+value, all_contacts, ES pitch,
  services_needed, outreach_notes
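
Illustrative shape of the JSON the prompt asks Gemini to return (key names
approximate, values invented):

    {
        "summary": "One-page WordPress site still full of template text.",
        "site_quality_score": 3,
        "content_issues": ["lorem ipsum on homepage"],
        "urgency_signals": ["placeholder 'hello world' post"],
        "performance_notes": "2.8s load, 1.4 MB page",
        "seo_status": "no sitemap, no analytics, unverified",
        "best_contact_channel": "email",
        "best_contact_value": "info@example.es",
        "all_contacts": {"emails": ["info@example.es"], "phones": ["+34912345678"]},
        "pitch_es": "Hola, he visto que vuestra web...",
        "services_needed": ["web redesign", "basic SEO"],
        "outreach_notes": "Mention Kit Digital eligibility."
    }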

UI: rich AI modal with summary banner, quality grid, content issues,
    urgency signals, full contact list, technical snapshot

Fixes: use the correct Replicate token; fix the ai_queue status='running' bug

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-13 17:46:01 +02:00
parent faca4b6e1a
commit 5ad8259c75
7 changed files with 530 additions and 111 deletions

app/site_analyzer.py (new file, +277 lines)

@@ -0,0 +1,277 @@
"""Deep site analysis: content quality, SEO signals, performance, indexing hints."""
import asyncio
import re
import time
import logging
from typing import Optional
import httpx
from bs4 import BeautifulSoup
logger = logging.getLogger(__name__)
# ── Content quality ───────────────────────────────────────────────────────────
LOREM_PHRASES = [
"lorem ipsum", "sed ut perspiciatis", "nunc sem sapien",
"nulla id nibh", "aenean dignissim", "aliquam tincidunt",
"vestibulum commodo", "fusce nunc lacus", "consectetuer",
"cras ornare tristique", "ntulla nec ante", "risus id metus",
"praesent placerat", "fusce pellentesque", "suscipit nibh",
"integer vitae libero", "felis quis tortor",
]
PLACEHOLDER_PHRASES = [
    "under construction", "coming soon", "sample page",
    "this is a demo", "default post", "hello world",
    "test post", "uncategorized",
]

# ── Analytics & webmaster tags ────────────────────────────────────────────────
ANALYTICS = {
    # A bare "G-" signal matched almost every page, so the GA4 check relies on
    # gtag calls and the gtag.js loader URL instead.
    "google_analytics": ["gtag('config'", "google-analytics.com/analytics.js", "googletagmanager.com/gtag/js"],
    "google_tag_manager": ["googletagmanager.com/gtm.js", "GTM-"],
    "facebook_pixel": ["fbq('init'", "connect.facebook.net/en_US/fbevents"],
    "hotjar": ["static.hotjar.com"],
    "clarity": ["clarity.ms/tag"],
}
WEBMASTER = {
    "google_search_console": ["google-site-verification"],
    "bing_webmaster": ["msvalidate.01"],
    "yandex": ["yandex-verification"],
}
KIT_IMG_PATS = [
    "digitalizadores", "kit-digital", "kitdigital", "kit_digital",
    "fondos-europeos", "fondos_europeos", "nextgeneration", "next-generation",
    "prtr", "plan-recuperacion", "acelerapyme", "cofinanciado",
]
KIT_TEXT_PATS = [
    "kit digital", "agente digitalizador", "fondos europeos",
    "next generation eu", "nextgenerationeu", "plan de recuperación",
    "prtr", "financiado por la unión europea", "red.es/kit-digital", "acelerapyme",
]

EMAIL_RE = re.compile(r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}")
PHONE_RE = re.compile(r"(?:\+34[\s\-]?)?(?:6|7|8|9)\d{2}[\s\-]?\d{3}[\s\-]?\d{3}")
SOCIAL_DOM = ["facebook.com", "instagram.com", "linkedin.com", "twitter.com", "x.com", "tiktok.com"]
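# Illustrative matches: PHONE_RE catches Spanish numbers written as
# "+34 612 345 678", "912 345 678", or "612-345-678"; EMAIL_RE catches plain
# "info@example.es"-style addresses (obfuscated ones are not in scope).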


async def analyze_site(domain: str) -> dict:
    """Fetch and deeply analyse a site. Returns a rich dict for the AI prompt."""
    result = {
        "domain": domain,
        "reachable": False,
        "load_time_ms": None,
        "status_code": None,
        "final_url": None,
        "page_size_kb": None,
        "server": None,
        "cms": None,
        "ssl_valid": False,
        "ssl_expiry_days": None,
        # Content quality
        "has_lorem_ipsum": False,
        "lorem_matches": [],
        "has_placeholder": False,
        "placeholder_matches": [],
        "word_count": 0,
        "image_count": 0,
        "broken_images": 0,
        "script_count": 0,
        "has_mobile_viewport": False,
        "page_title": None,
        "meta_description": None,
        "h1_text": None,
        "visible_text_snippet": "",
        # SEO / webmaster
        "has_sitemap": False,
        "has_robots": False,
        "robots_disallows_google": False,
        "analytics_present": [],
        "webmaster_verified": [],
        "canonical_url": None,
        "og_title": None,
        # Kit Digital
        "kit_digital": False,
        "kit_digital_signals": [],
        # Contacts
        "emails": [],
        "phones": [],
        "whatsapp": [],
        "social_links": [],
        # Errors
        "error": None,
    }
    # ── Fetch main page ───────────────────────────────────────────────────────
    try:
        t0 = time.monotonic()
        async with httpx.AsyncClient(
            timeout=15, follow_redirects=True, verify=False,
            headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"},
        ) as client:
            resp = await client.get(f"https://{domain}")
            if resp.status_code >= 400:
                resp = await client.get(f"http://{domain}")
            load_ms = int((time.monotonic() - t0) * 1000)
        html = resp.text
        result.update({
            "reachable": resp.status_code < 400,
            "load_time_ms": load_ms,
            "status_code": resp.status_code,
            "final_url": str(resp.url),
            "page_size_kb": round(len(resp.content) / 1024, 1),
            "server": resp.headers.get("server"),
        })

        soup = BeautifulSoup(html, "html.parser")
        hl = html.lower()

        # Title, meta
        title_tag = soup.find("title")
        result["page_title"] = title_tag.get_text(strip=True)[:200] if title_tag else None
        meta_desc = soup.find("meta", attrs={"name": "description"})
        result["meta_description"] = (meta_desc.get("content") or "")[:300] if meta_desc else None
        h1 = soup.find("h1")
        result["h1_text"] = h1.get_text(strip=True)[:200] if h1 else None

        # Mobile viewport
        result["has_mobile_viewport"] = bool(soup.find("meta", attrs={"name": "viewport"}))

        # Canonical + OG
        canon = soup.find("link", rel="canonical")
        result["canonical_url"] = canon.get("href") if canon else None
        og = soup.find("meta", property="og:title")
        result["og_title"] = og.get("content") if og else None

        # Images & scripts: counted before the script/style tags are stripped
        # for text extraction below, otherwise script_count would always be 0.
        imgs = soup.find_all("img")
        result["image_count"] = len(imgs)
        result["script_count"] = len(soup.find_all("script", src=True))

        # Visible text
        for tag in soup(["script", "style", "noscript"]):
            tag.decompose()
        visible_text = soup.get_text(separator=" ", strip=True)
        words = visible_text.split()
        result["word_count"] = len(words)
        result["visible_text_snippet"] = " ".join(words[:500])

        # Lorem ipsum / placeholder detection
        vl = visible_text.lower()
        lorem_hits = [p for p in LOREM_PHRASES if p in vl]
        result["has_lorem_ipsum"] = len(lorem_hits) > 0
        result["lorem_matches"] = lorem_hits[:5]
        ph_hits = [p for p in PLACEHOLDER_PHRASES if p in vl]
        result["has_placeholder"] = len(ph_hits) > 0
        result["placeholder_matches"] = ph_hits[:3]

        # Analytics / webmaster tags
        for name, sigs in ANALYTICS.items():
            if any(s.lower() in hl for s in sigs):
                result["analytics_present"].append(name)
        for name, sigs in WEBMASTER.items():
            if any(s.lower() in hl for s in sigs):
                result["webmaster_verified"].append(name)

        # Kit Digital
        kd_signals = []
        for img in imgs:
            combined = ((img.get("src") or "") + (img.get("alt") or "") + (img.get("srcset") or "")).lower()
            for p in KIT_IMG_PATS:
                if p in combined:
                    kd_signals.append(f"img:{p}")
                    break
        for p in KIT_TEXT_PATS:
            if p in hl:
                kd_signals.append(f"text:{p}")
        for a in soup.find_all("a", href=True):
            href = a["href"].lower()
            if "acelerapyme" in href or "red.es" in href or "kit-digital" in href:
                kd_signals.append(f"link:{href[:50]}")
        kd_signals = list(dict.fromkeys(kd_signals))[:10]
        result["kit_digital"] = len(kd_signals) > 0
        result["kit_digital_signals"] = kd_signals

        # Contacts
        for a in soup.find_all("a", href=True):
            href = a["href"]
            if href.startswith("mailto:"):
                em = href[7:].split("?")[0].strip().lower()
                if em and em not in result["emails"]:
                    result["emails"].append(em)
            elif href.startswith("tel:"):
                ph = re.sub(r"[^\d+]", "", href[4:])
                if ph and ph not in result["phones"]:
                    result["phones"].append(ph)
            elif "wa.me" in href or "api.whatsapp.com" in href:
                if href not in result["whatsapp"]:
                    result["whatsapp"].append(href[:80])
            else:
                for sd in SOCIAL_DOM:
                    if sd in href.lower():
                        clean = href.split("?")[0].rstrip("/")
                        if clean not in result["social_links"]:
                            result["social_links"].append(clean)
                        break
        for em in EMAIL_RE.findall(html[:80000]):
            em = em.lower()
            if em not in result["emails"] and not any(em.endswith(x) for x in [".png", ".jpg", ".css", ".js", ".svg"]):
                result["emails"].append(em)
        for ph in PHONE_RE.findall(visible_text):
            ph_c = re.sub(r"[\s\-]", "", ph)
            if ph_c not in result["phones"]:
                result["phones"].append(ph_c)

        # Cap
        for k in ["emails", "phones", "whatsapp", "social_links"]:
            result[k] = list(dict.fromkeys(result[k]))[:5]

        # CMS
        from app.enricher import detect_cms
        result["cms"] = detect_cms(html, dict(resp.headers))
    except Exception as e:
        result["error"] = str(e)[:300]

    # ── Sitemap & robots (parallel) ───────────────────────────────────────────
    async def _check_url(url: str) -> Optional[str]:
        try:
            async with httpx.AsyncClient(timeout=6, follow_redirects=True, verify=False) as c:
                r = await c.get(url)
                return r.text if r.status_code == 200 else None
        except Exception:
            return None

    sitemap_txt, robots_txt = await asyncio.gather(
        _check_url(f"https://{domain}/sitemap.xml"),
        _check_url(f"https://{domain}/robots.txt"),
    )
    result["has_sitemap"] = sitemap_txt is not None
    result["has_robots"] = robots_txt is not None
    if robots_txt:
        robots_lower = robots_txt.lower()
        # Coarse heuristic: flags any robots.txt that both mentions Googlebot
        # and contains a blanket "Disallow: /" anywhere in the file.
        result["robots_disallows_google"] = (
            "disallow: /" in robots_lower and "googlebot" in robots_lower
        )

    # ── SSL ───────────────────────────────────────────────────────────────────
    import ssl as _ssl, socket as _socket
    try:
        def _ssl_check():
            import datetime as _dt
            ctx = _ssl.create_default_context()
            with _socket.create_connection((domain, 443), timeout=5) as s:
                with ctx.wrap_socket(s, server_hostname=domain) as ss:
                    cert = ss.getpeercert()
            exp = _dt.datetime.strptime(cert["notAfter"], "%b %d %H:%M:%S %Y %Z")
            # Days until expiry: positive while valid, negative once expired.
            return True, (exp - _dt.datetime.utcnow()).days

        loop = asyncio.get_running_loop()
        result["ssl_valid"], result["ssl_expiry_days"] = await loop.run_in_executor(None, _ssl_check)
    except Exception:
        pass
    return result
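

# Ad-hoc local check (a usage sketch, assuming the app package is importable):
#   python -m app.site_analyzer example.com
if __name__ == "__main__":
    import json
    import sys

    print(json.dumps(asyncio.run(analyze_site(sys.argv[1])), indent=2, ensure_ascii=False))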