Files
DomGod/app/site_analyzer.py
Malin 60c9b495ae fix: AI worker crash-proof + GDPR/hosting/accessibility analysis
AI worker fixes (root cause of "nothing reaches Replicate"):
- Worker task died silently — no exception handler around while loop
- Added try/except around entire loop body with exc_info logging
- Added watchdog task that restarts dead workers every 10 seconds
- ensure_workers_alive() called on every /api/ai/assess/batch POST
- _assess_one() is now a top-level function (not closure) — avoids
  subtle scoping bugs with async inner functions in while loops
- /api/ai/debug endpoint: shows worker alive status, task exception,
  last 10 queue entries — browse to /api/ai/debug to diagnose
- /api/ai/worker/restart endpoint + UI button
- "Restart AI worker" button + "Debug AI queue" link in enrichment tab

site_analyzer.py — new signals:
- IP resolution + ip-api.com for ASN, org, ISP, host country
- EU hosting detection (27 EU + EEA + adequacy countries)
- GDPR: detects Cookiebot, OneTrust, CookiePro, Osano, Iubenda,
  Borlabs, CookieYes, Complianz, Usercentrics + text signals
- Privacy policy and GDPR text presence
- Accessibility: html lang missing, images without alt count,
  skip nav link, empty links, inputs without labels

Gemini prompt additions:
- Hosting section: IP, ASN, org/ISP, EU vs non-EU flag
- GDPR section: cookie tool, notice, privacy policy
- Accessibility section: all quick-scan results
- New output fields: hosting_notes, gdpr_compliance,
  accessibility_issues[]

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-13 18:01:34 +02:00

374 lines
18 KiB
Python

"""Deep site analysis: content quality, SEO, hosting, GDPR, accessibility."""
import asyncio
import re
import time
import logging
import socket
from typing import Optional
import httpx
from bs4 import BeautifulSoup
logger = logging.getLogger(__name__)
# ── EU countries (hosting check) ─────────────────────────────────────────────
# ISO 3166-1 alpha-2 codes treated as "EU hosted". Despite the name, the set
# deliberately extends beyond the 27 member states (see the groups below).
EU_COUNTRIES = {
    # EU member states
    "AT", "BE", "BG", "HR", "CY", "CZ", "DK", "EE", "FI", "FR", "DE", "GR",
    "HU", "IE", "IT", "LV", "LT", "LU", "MT", "NL", "PL", "PT", "RO", "SK",
    "SI", "ES", "SE",
    "NO", "IS", "LI",  # EEA
    "CH", "GB", "AD",  # adequacy / adjacent
}

# ── Content quality ───────────────────────────────────────────────────────────
# Lower-case fragments of common filler text; matched as substrings of the
# lower-cased visible page text. "ntulla nec ante" is spelled that way on
# purpose: widely copied HTML filler snippets contain that exact typo.
LOREM_PHRASES = [
    "lorem ipsum", "sed ut perspiciatis", "nunc sem sapien",
    "nulla id nibh", "aenean dignissim", "aliquam tincidunt",
    "vestibulum commodo", "fusce nunc lacus", "consectetuer",
    "cras ornare tristique", "ntulla nec ante", "risus id metus",
    "praesent placerat", "fusce pellentesque", "suscipit nibh",
    "integer vitae libero", "felis quis tortor", "dolor sit amet",
]
# Lower-case phrases typical of unfinished or default-install pages.
PLACEHOLDER_PHRASES = [
    "under construction", "coming soon", "sample page",
    "this is a demo", "hello world", "test content",
    "default post", "uncategorized", "demo content",
]

# ── Cookie / GDPR consent tools ───────────────────────────────────────────────
# Tool name -> markup signatures. Signatures are lower-cased by the caller
# and searched for in the lower-cased raw HTML; the first tool that matches
# (in insertion order) wins.
COOKIE_TOOLS = {
    "cookiebot": ["cookiebot.com", "CookieConsent", "CybotCookiebot"],
    "onetrust": ["onetrust", "otBannerSdk"],
    "cookiepro": ["cookiepro.com"],
    "osano": ["osano.com"],
    "iubenda": ["iubenda.com"],
    "borlabs": ["borlabs-cookie"],
    "complianz": ["complianz"],
    "cookieyes": ["cookieyes.com", "cookie-law-info"],
    "usercentrics": ["usercentrics.com"],
    "quantcast": ["quantcast.com/cmp"],
}
# Lower-case English/Spanish phrases searched for in the visible page text.
COOKIE_TEXT_SIGNALS = [
    "accept cookies", "acepta las cookies", "we use cookies", "usamos cookies",
    "cookie policy", "política de cookies", "cookie settings", "manage cookies",
    "aceptar todas", "rechazar cookies",
]
PRIVACY_SIGNALS = [
    "privacy policy", "política de privacidad", "aviso legal",
    "privacy notice", "data protection",
]
GDPR_TEXT_SIGNALS = [
    "rgpd", "gdpr", "reglamento general de protección",
    "lopd", "protección de datos", "responsable del tratamiento",
]

# ── Analytics / webmaster ─────────────────────────────────────────────────────
# Product name -> markup signatures (lower-cased and substring-matched against
# the lower-cased raw HTML by the caller).
ANALYTICS = {
    # The gtag.js loader URL stands in for a bare "G-" measurement-id prefix:
    # once lower-cased, "g-" is a substring of everyday class names such as
    # "bg-dark" or "img-fluid" and flagged Google Analytics on almost any page.
    "google_analytics": [
        "gtag('config'",
        "google-analytics.com/analytics.js",
        "googletagmanager.com/gtag/js",
    ],
    "google_tag_manager": ["googletagmanager.com/gtm.js", "GTM-"],
    "facebook_pixel": ["fbq('init'", "connect.facebook.net/en_US/fbevents"],
    "hotjar": ["static.hotjar.com"],
    "clarity": ["clarity.ms/tag"],
}
# Search-engine ownership verification markers (meta tag names).
WEBMASTER = {
    "google_search_console": ["google-site-verification"],
    "bing_webmaster": ["msvalidate.01"],
    "yandex": ["yandex-verification"],
}

# ── Kit Digital ───────────────────────────────────────────────────────────────
# Lower-case fragments looked for in <img> src/alt/srcset values.
KIT_IMG_PATS = [
    "digitalizadores", "kit-digital", "kitdigital", "kit_digital",
    "fondos-europeos", "fondos_europeos", "nextgeneration", "next-generation",
    "prtr", "plan-recuperacion", "acelerapyme", "cofinanciado",
]
# Lower-case fragments looked for in the lower-cased raw HTML.
KIT_TEXT_PATS = [
    "kit digital", "agente digitalizador", "fondos europeos",
    "next generation eu", "nextgenerationeu", "plan de recuperación",
    "prtr", "financiado por la unión europea", "red.es/kit-digital", "acelerapyme",
]

# Loose e-mail matcher; callers filter out asset-name look-alikes.
EMAIL_RE = re.compile(r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}")
# Nine-digit number starting with 6-9, optionally prefixed with +34, with
# optional space/hyphen separators between three-digit groups.
PHONE_RE = re.compile(r"(?:\+34[\s\-]?)?(?:6|7|8|9)\d{2}[\s\-]?\d{3}[\s\-]?\d{3}")
# Hosts whose links are reported as social profiles.
SOCIAL_DOM = ["facebook.com", "instagram.com", "linkedin.com",
              "twitter.com", "x.com", "tiktok.com", "youtube.com"]
async def _get_hosting_info(domain: str) -> dict:
    """Look up where *domain* is hosted.

    The domain is resolved to an IPv4 address in the default executor, then
    ip-api.com is queried for that address.

    Returns:
        A dict with the keys ``ip``, ``asn``, ``org``, ``isp``,
        ``ip_country``, ``ip_region`` and ``eu_hosted``. Every value starts
        as ``None`` and stays ``None`` for whatever could not be determined;
        failures are logged at debug level and never raised.
    """
    hosting = dict.fromkeys(
        ("ip", "asn", "org", "isp", "ip_country", "ip_region", "eu_hosted")
    )
    try:
        # gethostbyname blocks, so it is pushed off the event loop.
        address = await asyncio.get_event_loop().run_in_executor(
            None, socket.gethostbyname, domain
        )
        hosting["ip"] = address

        async with httpx.AsyncClient(timeout=6) as client:
            response = await client.get(
                f"http://ip-api.com/json/{address}",
                params={"fields": "status,country,countryCode,regionName,org,as,isp"},
            )

        payload = response.json() if response.status_code == 200 else {}
        if payload.get("status") == "success":
            country_code = payload.get("countryCode")
            in_eu = country_code in EU_COUNTRIES
            hosting["asn"] = payload.get("as")
            hosting["org"] = payload.get("org")
            hosting["isp"] = payload.get("isp")
            hosting["ip_country"] = country_code
            hosting["ip_region"] = payload.get("regionName")
            hosting["eu_hosted"] = in_eu
    except Exception as exc:
        # Best effort: hosting data is optional, so no failure propagates.
        logger.debug("Hosting lookup failed for %s: %s", domain, exc)
    return hosting
def _href_host(href: str) -> str:
    """Return the lower-cased hostname of *href*, or ``""`` if it has none."""
    from urllib.parse import urlparse

    # A scheme-less href such as "facebook.com/acme" only exposes its host to
    # urlparse once it is turned into a network-path reference.
    has_authority = href.startswith("//") or "://" in href
    try:
        return urlparse(href if has_authority else f"//{href}").hostname or ""
    except ValueError:
        # urlparse rejects malformed authorities (e.g. a broken IPv6 literal).
        return ""


def _is_social_href(href: str) -> bool:
    """Tell whether *href* points at one of the ``SOCIAL_DOM`` hosts."""
    host = _href_host(href)
    return any(host == dom or host.endswith(f".{dom}") for dom in SOCIAL_DOM)


def _robots_blocks_googlebot(robots_txt: str, domain: str) -> bool:
    """Tell whether *robots_txt* forbids Googlebot from fetching the homepage."""
    from urllib.robotparser import RobotFileParser

    parser = RobotFileParser()
    parser.parse(robots_txt.splitlines())
    return not parser.can_fetch("Googlebot", f"https://{domain}/")


def _populate_from_response(result: dict, resp) -> None:
    """Fill *result* in place with every signal derived from the homepage response."""
    html = resp.text
    result.update({
        "reachable": resp.status_code < 400,
        "status_code": resp.status_code,
        "final_url": str(resp.url),
        "page_size_kb": round(len(resp.content) / 1024, 1),
        "server": resp.headers.get("server"),
    })
    soup = BeautifulSoup(html, "html.parser")
    hl = html.lower()

    # ── Basic metadata ────────────────────────────────────────────────────
    result["html_lang"] = (soup.find("html") or {}).get("lang")
    title_tag = soup.find("title")
    result["page_title"] = title_tag.get_text(strip=True)[:200] if title_tag else None
    meta_desc = soup.find("meta", attrs={"name": "description"})
    result["meta_description"] = (meta_desc.get("content") or "")[:300] if meta_desc else None
    h1 = soup.find("h1")
    result["h1_text"] = h1.get_text(strip=True)[:200] if h1 else None
    result["has_mobile_viewport"] = bool(soup.find("meta", attrs={"name": "viewport"}))
    canonical = soup.find("link", rel="canonical")
    result["canonical_url"] = canonical.get("href") if canonical else None
    og = soup.find("meta", property="og:title")
    result["og_title"] = og.get("content") if og else None

    # External scripts must be counted before the <script> tags are removed
    # below, otherwise the count is always zero.
    result["script_count"] = len(soup.find_all("script", src=True))

    # ── Visible text ──────────────────────────────────────────────────────
    for tag in soup(["script", "style", "noscript"]):
        tag.decompose()
    visible = soup.get_text(separator=" ", strip=True)
    vl = visible.lower()
    words = visible.split()
    result["word_count"] = len(words)
    result["visible_text_snippet"] = " ".join(words[:600])

    # ── Content quality ───────────────────────────────────────────────────
    lorem_hits = [p for p in LOREM_PHRASES if p in vl]
    result["has_lorem_ipsum"] = bool(lorem_hits)
    result["lorem_matches"] = lorem_hits[:6]
    ph_hits = [p for p in PLACEHOLDER_PHRASES if p in vl]
    result["has_placeholder"] = bool(ph_hits)
    result["placeholder_matches"] = ph_hits[:3]
    imgs = soup.find_all("img")
    result["image_count"] = len(imgs)

    # ── Analytics / webmaster ─────────────────────────────────────────────
    for name, sigs in ANALYTICS.items():
        if any(s.lower() in hl for s in sigs):
            result["analytics_present"].append(name)
    for name, sigs in WEBMASTER.items():
        if any(s.lower() in hl for s in sigs):
            result["webmaster_verified"].append(name)

    # ── GDPR / cookies ────────────────────────────────────────────────────
    for tool, sigs in COOKIE_TOOLS.items():
        if any(s.lower() in hl for s in sigs):
            result["cookie_tool"] = tool
            result["has_cookie_notice"] = True
            break
    if not result["has_cookie_notice"]:
        result["has_cookie_notice"] = any(s in vl for s in COOKIE_TEXT_SIGNALS)
    result["has_privacy_policy"] = any(s in vl for s in PRIVACY_SIGNALS) or bool(
        soup.find("a", href=lambda h: h and ("privacidad" in h.lower() or "privacy" in h.lower()))
    )
    result["has_gdpr_text"] = any(s in vl for s in GDPR_TEXT_SIGNALS)

    # ── Accessibility ─────────────────────────────────────────────────────
    # alt="" marks a decorative image and is fine; only a missing attribute counts.
    result["images_missing_alt"] = sum(1 for img in imgs if img.get("alt") is None)
    result["has_skip_nav"] = bool(
        soup.find("a", href=lambda h: h and h.lower() in ("#main", "#content", "#maincontent", "#skip"))
    )
    result["empty_links"] = sum(
        1 for a in soup.find_all("a") if not a.get_text(strip=True) and not a.find("img")
    )
    # Inputs with no type attribute are included: the filter receives None for them.
    form_inputs = soup.find_all(
        "input", type=lambda kind: kind not in ("hidden", "submit", "button")
    )
    labeled_ids = {lbl.get("for") for lbl in soup.find_all("label") if lbl.get("for")}
    result["inputs_without_labels"] = sum(
        1 for inp in form_inputs
        if inp.get("id") not in labeled_ids
        and not inp.get("aria-label")
        and not inp.get("aria-labelledby")
        # An input nested inside a <label> is labelled implicitly.
        and inp.find_parent("label") is None
    )

    # ── Kit Digital ───────────────────────────────────────────────────────
    kd_signals = []
    for img in imgs:
        comb = ((img.get("src") or "") + (img.get("alt") or "") + (img.get("srcset") or "")).lower()
        for p in KIT_IMG_PATS:
            if p in comb:
                kd_signals.append(f"img:{p}")
                break
    kd_signals.extend(f"text:{p}" for p in KIT_TEXT_PATS if p in hl)
    anchors = soup.find_all("a", href=True)
    for a in anchors:
        href = a["href"].lower()
        if "acelerapyme" in href or "red.es" in href or "kit-digital" in href:
            kd_signals.append(f"link:{href[:50]}")
    kd_signals = list(dict.fromkeys(kd_signals))[:10]
    result["kit_digital"] = bool(kd_signals)
    result["kit_digital_signals"] = kd_signals

    # ── Contacts ──────────────────────────────────────────────────────────
    for a in anchors:
        href = a["href"]
        if href.startswith("mailto:"):
            em = href[7:].split("?")[0].strip().lower()
            if em and em not in result["emails"]:
                result["emails"].append(em)
        elif href.startswith("tel:"):
            ph = re.sub(r"[^\d+]", "", href[4:])
            if ph and ph not in result["phones"]:
                result["phones"].append(ph)
        elif "wa.me" in href or "api.whatsapp.com" in href:
            # Compare the stored (truncated) form so the dedupe check is
            # consistent with what was appended.
            short = href[:80]
            if short not in result["whatsapp"]:
                result["whatsapp"].append(short)
        elif _is_social_href(href):
            # Matching on the hostname avoids treating e.g. "wix.com" as "x.com".
            clean = href.split("?")[0].rstrip("/")
            if clean not in result["social_links"]:
                result["social_links"].append(clean)
    # Asset names such as "logo@2x.png" look like addresses to EMAIL_RE.
    asset_suffixes = (".png", ".jpg", ".jpeg", ".gif", ".webp", ".css", ".js", ".svg")
    for em in EMAIL_RE.findall(html[:80000]):
        em = em.lower()
        if em not in result["emails"] and not em.endswith(asset_suffixes):
            result["emails"].append(em)
    for ph in PHONE_RE.findall(visible):
        ph_c = re.sub(r"[\s\-]", "", ph)
        if ph_c not in result["phones"]:
            result["phones"].append(ph_c)
    for k in ["emails", "phones", "whatsapp", "social_links"]:
        result[k] = list(dict.fromkeys(result[k]))[:5]

    # ── CMS ───────────────────────────────────────────────────────────────
    cms_sigs = {
        "wordpress": ["/wp-content/", "/wp-includes/", 'content="WordPress'],
        "joomla": ["/components/com_", "Joomla!", 'content="Joomla'],
        "drupal": ["/sites/default/files/", "Drupal.settings"],
        "wix": ["static.wixstatic.com", "X-Wix-"],
        "squarespace": ["squarespace.com", "X-Squarespace-"],
        "shopify": ["cdn.shopify.com", "Shopify.theme"],
        "prestashop": ["PrestaShop", "/modules/prestashop"],
        "magento": ["Mage.Cookies", "X-Magento-"],
        "typo3": ["typo3temp", "TYPO3 CMS"],
        "opencart": ["route=common/home", "OpenCart"],
    }
    # Lower-cased once: the leading HTML plus every response header.
    haystack = (
        html[:60000] + " ".join(f"{k}:{v}" for k, v in resp.headers.items())
    ).lower()
    for cms, sigs in cms_sigs.items():
        if any(s.lower() in haystack for s in sigs):
            result["cms"] = cms
            break


async def analyze_site(domain: str) -> dict:
    """Fetch *domain*'s homepage and collect quality, SEO and compliance signals.

    The homepage fetch and the hosting lookup run concurrently. HTTPS is tried
    first; plain HTTP is tried when HTTPS raises or answers with status >= 400.
    ``/sitemap.xml``, ``/robots.txt`` and the TLS certificate are then probed
    regardless of whether the homepage could be fetched.

    Args:
        domain: Bare host name, without scheme or path.

    Returns:
        A flat dict of results. Keys that could not be determined keep their
        defaults (``None``, ``False``, ``0`` or an empty list). ``error`` is
        set when the homepage could not be fetched at all. Network failures
        are not raised.
    """
    result = {
        "domain": domain,
        "reachable": False, "load_time_ms": None, "status_code": None,
        "final_url": None, "page_size_kb": None, "server": None, "cms": None,
        # Hosting
        "ip": None, "asn": None, "org": None, "isp": None,
        "ip_country": None, "ip_region": None, "eu_hosted": None,
        # SSL
        "ssl_valid": False, "ssl_expiry_days": None,
        # Content quality
        "has_lorem_ipsum": False, "lorem_matches": [],
        "has_placeholder": False, "placeholder_matches": [],
        "word_count": 0, "image_count": 0, "script_count": 0,
        "has_mobile_viewport": False,
        "page_title": None, "meta_description": None, "h1_text": None,
        "visible_text_snippet": "",
        # SEO
        "has_sitemap": False, "has_robots": False, "robots_disallows_google": False,
        "analytics_present": [], "webmaster_verified": [],
        "canonical_url": None, "og_title": None,
        # GDPR / cookies
        "cookie_tool": None, "has_cookie_notice": False,
        "has_privacy_policy": False, "has_gdpr_text": False,
        # Accessibility
        "html_lang": None, "images_missing_alt": 0,
        "has_skip_nav": False, "empty_links": 0,
        "inputs_without_labels": 0,
        # Kit Digital
        "kit_digital": False, "kit_digital_signals": [],
        # Contacts
        "emails": [], "phones": [], "whatsapp": [], "social_links": [],
        "error": None,
    }

    # ── Fetch + hosting (parallel) ────────────────────────────────────────────
    async def _fetch():
        """Return ``(response or None, elapsed milliseconds)`` for the homepage."""
        t0 = time.monotonic()
        resp = None
        try:
            # verify=False: sites with a broken certificate are still analysed;
            # certificate validity is reported separately by the SSL check.
            async with httpx.AsyncClient(
                timeout=15, follow_redirects=True, verify=False,
                headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"},
            ) as client:
                for scheme in ("https", "http"):
                    try:
                        candidate = await client.get(f"{scheme}://{domain}")
                    except Exception as exc:
                        # A failed HTTPS attempt must not prevent the HTTP
                        # fallback, and a failed fallback must not discard an
                        # HTTPS response that was already received.
                        logger.debug("Fetch failed for %s://%s: %s", scheme, domain, exc)
                        continue
                    resp = candidate
                    if candidate.status_code < 400:
                        break
        except Exception as exc:
            logger.debug("Fetch failed for %s: %s", domain, exc)
        return resp, int((time.monotonic() - t0) * 1000)

    (resp, load_ms), hosting = await asyncio.gather(_fetch(), _get_hosting_info(domain))
    result.update(hosting)
    result["load_time_ms"] = load_ms
    if resp is None:
        result["error"] = "Failed to fetch site"
    else:
        _populate_from_response(result, resp)

    # ── Sitemap & robots (parallel) ───────────────────────────────────────────
    async def _get(url):
        """Return the body of *url* on HTTP 200, otherwise ``None``."""
        try:
            async with httpx.AsyncClient(timeout=6, follow_redirects=True, verify=False) as c:
                r = await c.get(url)
                return r.text if r.status_code == 200 else None
        except Exception:
            return None

    sitemap_txt, robots_txt = await asyncio.gather(
        _get(f"https://{domain}/sitemap.xml"),
        _get(f"https://{domain}/robots.txt"),
    )
    result["has_sitemap"] = sitemap_txt is not None
    result["has_robots"] = robots_txt is not None
    if robots_txt:
        # Parsed properly: a substring test for "disallow: /" also matches
        # harmless rules such as "Disallow: /admin".
        result["robots_disallows_google"] = _robots_blocks_googlebot(robots_txt, domain)

    # ── SSL ───────────────────────────────────────────────────────────────────
    import ssl as _ssl

    def _ssl_check():
        """Return ``(True, whole days until expiry)``; raise if the handshake fails."""
        ctx = _ssl.create_default_context()
        with socket.create_connection((domain, 443), timeout=5) as raw:
            with ctx.wrap_socket(raw, server_hostname=domain) as tls:
                cert = tls.getpeercert()
        # cert_time_to_seconds parses the certificate date independently of
        # the process locale; flooring the remaining seconds gives whole days
        # left instead of rounding a partial day up.
        expires_at = _ssl.cert_time_to_seconds(cert["notAfter"])
        return True, int((expires_at - time.time()) // 86400)

    try:
        # Blocking socket work runs in the default executor.
        loop = asyncio.get_running_loop()
        result["ssl_valid"], result["ssl_expiry_days"] = await loop.run_in_executor(None, _ssl_check)
    except Exception as exc:
        # Best effort: an unreachable port or invalid certificate leaves the
        # defaults (ssl_valid=False, ssl_expiry_days=None) in place.
        logger.debug("SSL check failed for %s: %s", domain, exc)
    return result