Files
DomGod/app/site_analyzer.py
Malin d62e4e986e feat: web search for contacts, copyright year, contact page scan, CMS/age from Gemini
- replicate_ai: DDG pre-search runs before every Gemini call; top 4 snippets
  injected into prompt so Gemini can find phone/email not on homepage
- replicate_ai: explicit instructions to use search results for contact lookup
- replicate_ai: new output fields cms_detected + site_last_updated
- site_analyzer: copyright year extracted from footer (© / copyright pattern)
- site_analyzer: Last-Modified from HTTP header + OG meta tag
- site_analyzer: scans /contacto /contact /contactanos /sobre-nosotros for
  additional emails/phones (parallel with sitemap/robots fetch)
- index.html: modal shows CMS (AI-detected), Last Updated (red if pre-2021)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 08:22:14 +02:00

475 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Deep site analysis: content quality, SEO, hosting, GDPR, accessibility."""
import asyncio
import re
import time
import logging
import socket
from typing import Optional
import httpx
from bs4 import BeautifulSoup
logger = logging.getLogger(__name__)
# ── EU countries (hosting check) ─────────────────────────────────────────────
# Two-letter country codes that count as "EU hosted" when compared with the
# country code reported for the server's IP address. The set is intentionally
# wider than the EU itself: it also lists the EEA countries and a handful of
# adequacy / adjacent jurisdictions.
EU_COUNTRIES = {
    # EU member states
    "AT", "BE", "BG", "CY", "CZ", "DE", "DK", "EE", "ES",
    "FI", "FR", "GR", "HR", "HU", "IE", "IT", "LT", "LU",
    "LV", "MT", "NL", "PL", "PT", "RO", "SE", "SI", "SK",
    # EEA
    "IS", "LI", "NO",
    # adequacy / adjacent
    "AD", "CH", "GB",
}
# ── Content quality ───────────────────────────────────────────────────────────
# Lower-case fragments of filler text. Each one is looked up as a substring of
# the page's lower-cased visible text; list order is the order in which hits
# are reported.
# NOTE(review): "ntulla nec ante" looks like a typo, but it appears to mirror a
# misspelling present in widely copied HTML filler text — confirm before
# "correcting" it.
LOREM_PHRASES = [
    "lorem ipsum",
    "sed ut perspiciatis",
    "nunc sem sapien",
    "nulla id nibh",
    "aenean dignissim",
    "aliquam tincidunt",
    "vestibulum commodo",
    "fusce nunc lacus",
    "consectetuer",
    "cras ornare tristique",
    "ntulla nec ante",
    "risus id metus",
    "praesent placerat",
    "fusce pellentesque",
    "suscipit nibh",
    "integer vitae libero",
    "felis quis tortor",
    "dolor sit amet",
]

# Lower-case phrases typical of unfinished pages or untouched CMS defaults,
# matched the same way as LOREM_PHRASES.
PLACEHOLDER_PHRASES = [
    "under construction",
    "coming soon",
    "sample page",
    "this is a demo",
    "hello world",
    "test content",
    "default post",
    "uncategorized",
    "demo content",
]
# ── Cookie / GDPR consent tools ───────────────────────────────────────────────
# Consent-tool name -> substrings searched for (case-insensitively) in the raw
# page HTML. Insertion order matters: the first tool with a matching signature
# is the one reported.
COOKIE_TOOLS = {
    "cookiebot": [
        "cookiebot.com",
        "CookieConsent",
        "CybotCookiebot",
    ],
    "onetrust": [
        "onetrust",
        "otBannerSdk",
    ],
    "cookiepro": ["cookiepro.com"],
    "osano": ["osano.com"],
    "iubenda": ["iubenda.com"],
    "borlabs": ["borlabs-cookie"],
    "complianz": ["complianz"],
    "cookieyes": [
        "cookieyes.com",
        "cookie-law-info",
    ],
    "usercentrics": ["usercentrics.com"],
    "quantcast": ["quantcast.com/cmp"],
}

# Lower-case phrases (English and Spanish) that reveal a cookie banner in the
# visible text when no known consent tool was recognised.
COOKIE_TEXT_SIGNALS = [
    "accept cookies",
    "acepta las cookies",
    "we use cookies",
    "usamos cookies",
    "cookie policy",
    "política de cookies",
    "cookie settings",
    "manage cookies",
    "aceptar todas",
    "rechazar cookies",
]

# Lower-case phrases that indicate a privacy policy / legal notice.
PRIVACY_SIGNALS = [
    "privacy policy",
    "política de privacidad",
    "aviso legal",
    "privacy notice",
    "data protection",
]

# Lower-case phrases that indicate explicit GDPR / Spanish data-protection text.
GDPR_TEXT_SIGNALS = [
    "rgpd",
    "gdpr",
    "reglamento general de protección",
    "lopd",
    "protección de datos",
    "responsable del tratamiento",
]
# ── Analytics / webmaster ─────────────────────────────────────────────────────
# Tracker name -> substrings searched for (case-insensitively) in the raw page
# HTML. Signatures must be specific enough to survive lower-casing: a bare
# "G-" prefix becomes "g-", which occurs in almost every page ("bg-", "img-",
# "lang-", ...) and would flag Google Analytics nearly everywhere. GA4 is
# therefore recognised by its gtag.js loader URL and config call instead.
ANALYTICS = {
    "google_analytics": [
        "gtag('config'",
        'gtag("config"',
        "googletagmanager.com/gtag/js",
        "google-analytics.com/analytics.js",
    ],
    "google_tag_manager": ["googletagmanager.com/gtm.js", "GTM-"],
    "facebook_pixel": ["fbq('init'", "connect.facebook.net/en_US/fbevents"],
    "hotjar": ["static.hotjar.com"],
    "clarity": ["clarity.ms/tag"],
}

# Webmaster tool name -> verification marker searched for in the raw page HTML.
WEBMASTER = {
    "google_search_console": ["google-site-verification"],
    "bing_webmaster": ["msvalidate.01"],
    "yandex": ["yandex-verification"],
}
# ── Kit Digital — require SPECIFIC signals, not generic EU logos ───────────────
# Only markers that unambiguously point at the Kit Digital programme are listed.

# Fragments looked for in an image's src / alt / srcset (lower-cased).
KIT_STRONG_IMG = [
    "kit-digital",
    "kitdigital",
    "kit_digital",
    "agente-digitalizador",
    "agente_digitalizador",
]

# Phrases looked for in the lower-cased visible text.
KIT_STRONG_TEXT = [
    "kit digital",
    "agente digitalizador",
    "agentes digitalizadores",
]

# Fragments looked for in lower-cased link targets.
KIT_STRONG_LINK = [
    "acelerapyme.es",
    "red.es/kit-digital",
    "kit-digital.red.es",
]
# ── Google My Business / Business Profile ────────────────────────────────────
# URL fragments that, when found in a link target, suggest the site points at
# a Google Maps / Business Profile listing.
GMB_URL_SIGNALS = [
    # embedded Google Map widget
    "maps.googleapis.com/maps/api",
    # link to GMB Place page
    "google.com/maps/place",
    "maps.google.com",
    "g.page/",
    "maps.app.goo.gl",
    "goo.gl/maps",
    "business.google.com",
]

# Structured-data markers for a LocalBusiness entity, searched for in the raw
# HTML. Both JSON spellings (with and without a space after the colon) are
# listed because the lookup is a plain substring test.
GMB_SCHEMA_SIGNALS = [
    '"@type":"LocalBusiness"',
    '"@type": "LocalBusiness"',
    "schema.org/LocalBusiness",
]
# Loose e-mail matcher used on raw HTML; callers filter out asset-name false
# positives such as "logo@2x.png".
EMAIL_RE = re.compile(r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}")

# Spanish phone numbers: nine digits starting with 6-9, optionally grouped 3-3-3
# with spaces or hyphens and optionally prefixed with +34 / 0034 / 34.
# The digit lookarounds stop the pattern from carving a nine-digit window out of
# a longer digit run (order numbers, timestamps, tax IDs), which previously
# produced bogus phone numbers. All groups are non-capturing so findall()
# returns the full match.
PHONE_RE = re.compile(
    r"(?<!\d)(?:(?:\+|00)?34[\s\-]?)?[6-9]\d{2}[\s\-]?\d{3}[\s\-]?\d{3}(?!\d)"
)

# Domains whose links are recorded as social profiles.
SOCIAL_DOM = ["facebook.com", "instagram.com", "linkedin.com",
              "twitter.com", "x.com", "tiktok.com", "youtube.com"]
async def _get_hosting_info(domain: str) -> dict:
    """Resolve IP, then look up ASN / org / country via ip-api.com.

    The lookup is best-effort: any failure (DNS error, timeout, HTTP error,
    malformed JSON) is logged at debug level and whatever was collected up to
    that point is returned.

    Args:
        domain: Host name to resolve, e.g. ``"example.com"``.

    Returns:
        A dict with the keys ``ip``, ``asn``, ``org``, ``isp``,
        ``ip_country``, ``ip_region`` and ``eu_hosted``. Values stay ``None``
        when they could not be determined.
    """
    info = {"ip": None, "asn": None, "org": None, "isp": None,
            "ip_country": None, "ip_region": None, "eu_hosted": None}
    try:
        # gethostbyname() blocks, so it runs in the default executor; inside a
        # coroutine get_running_loop() is the supported way to reach the loop.
        loop = asyncio.get_running_loop()
        ip = await asyncio.wait_for(
            loop.run_in_executor(None, socket.gethostbyname, domain), timeout=6
        )
        info["ip"] = ip

        async with httpx.AsyncClient(timeout=6) as client:
            r = await client.get(
                f"http://ip-api.com/json/{ip}",
                params={"fields": "status,country,countryCode,regionName,org,as,isp"},
            )
        if r.status_code != 200:
            return info

        d = r.json()
        if d.get("status") != "success":
            return info

        country_code = d.get("countryCode")
        info.update({
            "asn": d.get("as"),
            "org": d.get("org"),
            "isp": d.get("isp"),
            "ip_country": country_code,
            "ip_region": d.get("regionName"),
            "eu_hosted": country_code in EU_COUNTRIES,
        })
    except Exception as e:
        logger.debug("Hosting lookup failed for %s: %s", domain, e)
    return info
# Suffixes that mark an EMAIL_RE hit as an asset file name (e.g. "logo@2x.png")
# rather than a mailbox.
_ASSET_SUFFIXES = (".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg", ".css", ".js")


def _is_social_href(href: str) -> bool:
    """Return True when *href* points at one of the SOCIAL_DOM hosts.

    The domain must start at a host boundary, so "x.com" no longer matches
    "wix.com" or "dropbox.com" as a plain substring test would.
    """
    target = href.lower()
    return any(
        re.search(r"(?:^|[/.@])" + re.escape(dom) + r"(?:[/:?#]|$)", target)
        for dom in SOCIAL_DOM
    )


def _collect_contacts(result: dict, soup, raw_html: str, text: str,
                      *, include_social: bool = True) -> None:
    """Merge e-mails, phones, WhatsApp and social links of one page into *result*.

    *soup* supplies the anchors, *raw_html* is scanned for e-mail addresses and
    *text* for phone numbers. Every contact list is de-duplicated and capped at
    five entries afterwards.
    """
    for a in soup.find_all("a", href=True):
        href = a["href"]
        if href.startswith("mailto:"):
            em = href[7:].split("?")[0].strip().lower()
            if em and em not in result["emails"]:
                result["emails"].append(em)
        elif href.startswith("tel:"):
            ph = re.sub(r"[^\d+]", "", href[4:])
            if ph and ph not in result["phones"]:
                result["phones"].append(ph)
        elif "wa.me" in href or "api.whatsapp.com" in href:
            # Compare the stored (truncated) form so duplicates are recognised.
            wa = href[:80]
            if wa not in result["whatsapp"]:
                result["whatsapp"].append(wa)
        elif include_social and _is_social_href(href):
            clean = href.split("?")[0].rstrip("/")
            if clean not in result["social_links"]:
                result["social_links"].append(clean)

    for em in EMAIL_RE.findall(raw_html):
        em = em.lower()
        if em not in result["emails"] and not em.endswith(_ASSET_SUFFIXES):
            result["emails"].append(em)

    for ph in PHONE_RE.findall(text):
        ph_c = re.sub(r"[\s\-]", "", ph)
        if ph_c not in result["phones"]:
            result["phones"].append(ph_c)

    for k in ("emails", "phones", "whatsapp", "social_links"):
        result[k] = list(dict.fromkeys(result[k]))[:5]


def _extract_copyright_year(text: str) -> Optional[str]:
    """Return the most recent copyright year (20xx) found in *text*, or None.

    For ranges such as "© 2015-2024" or "© 2015 – 2024" the END year is
    returned, whichever dash (hyphen, en dash, em dash) separates the years.
    """
    marked = re.search(
        r"(?:©|&copy;|copyright)\D{0,10}(?:(?:19|20)\d{2}\s*[-–—]\s*)?(20\d{2})",
        text, re.I,
    )
    if marked:
        return marked.group(1)
    # No copyright marker nearby: fall back to a bare year range.
    bare_range = re.search(r"(?:19|20)\d{2}\s*[-–—]\s*(20\d{2})", text)
    return bare_range.group(1) if bare_range else None


def _robots_blocks_google(robots_txt: str) -> bool:
    """Return True when *robots_txt* forbids Googlebot from fetching "/".

    The file is parsed instead of substring-matched, so "Disallow: /admin" is
    not mistaken for a site-wide block, and a blanket "User-agent: *" block
    counts as blocking Google too.
    """
    from urllib.robotparser import RobotFileParser

    parser = RobotFileParser()
    parser.parse(robots_txt.splitlines())
    return not parser.can_fetch("Googlebot", "/")


def _ssl_days_until_expiry(domain: str) -> int:
    """Open a verified TLS connection to *domain*:443 and return days until expiry.

    Blocking; meant to run in an executor. Raises on connection or
    certificate-verification failure.
    """
    import ssl as _ssl

    ctx = _ssl.create_default_context()
    with socket.create_connection((domain, 443), timeout=5) as s:
        s.settimeout(5)  # SSL handshake timeout (wrap_socket has no timeout arg)
        with ctx.wrap_socket(s, server_hostname=domain) as ss:
            cert = ss.getpeercert()
    # cert_time_to_seconds() is locale-independent and avoids the deprecated
    # datetime.utcnow(); the floor-then-negate keeps the original rounding.
    expires_at = _ssl.cert_time_to_seconds(cert["notAfter"])
    return -int((time.time() - expires_at) // 86400)


async def _analyze_site_inner(domain: str) -> dict:
    """Fetch *domain* and collect content, SEO, hosting, GDPR and contact signals.

    Steps: fetch the homepage (HTTPS first, HTTP as fallback) in parallel with
    the hosting lookup, parse the HTML, then fetch sitemap / robots.txt / a
    contact page in parallel, and finally check the TLS certificate.

    Network failures are recorded in the result (``reachable``, ``error``,
    default values) rather than raised.

    Args:
        domain: Bare host name, e.g. ``"example.com"``.

    Returns:
        A dict holding every key initialised below; values keep their defaults
        when the corresponding check could not be performed.
    """
    result = {
        "domain": domain,
        "reachable": False, "load_time_ms": None, "status_code": None,
        "final_url": None, "page_size_kb": None, "server": None, "cms": None,
        # Hosting
        "ip": None, "asn": None, "org": None, "isp": None,
        "ip_country": None, "ip_region": None, "eu_hosted": None,
        # SSL
        "ssl_valid": False, "ssl_expiry_days": None,
        # Content quality
        "has_lorem_ipsum": False, "lorem_matches": [],
        "has_placeholder": False, "placeholder_matches": [],
        "word_count": 0, "image_count": 0, "script_count": 0,
        "has_mobile_viewport": False,
        "page_title": None, "meta_description": None, "h1_text": None,
        "visible_text_snippet": "",
        # SEO
        "has_sitemap": False, "has_robots": False, "robots_disallows_google": False,
        "analytics_present": [], "webmaster_verified": [],
        "canonical_url": None, "og_title": None,
        # GDPR / cookies
        "cookie_tool": None, "has_cookie_notice": False,
        "has_privacy_policy": False, "has_gdpr_text": False,
        # Accessibility
        "html_lang": None, "images_missing_alt": 0,
        "has_skip_nav": False, "empty_links": 0,
        "inputs_without_labels": 0,
        # Kit Digital
        "kit_digital": False, "kit_digital_signals": [],
        # Google My Business
        "has_gmb": False, "gmb_url": None,
        # Contacts
        "emails": [], "phones": [], "whatsapp": [], "social_links": [],
        # Age / freshness
        "copyright_year": None, "last_modified": None,
        "error": None,
    }

    # ── Fetch + hosting (parallel) ────────────────────────────────────────────
    async def _fetch():
        """Return (response or None, elapsed ms); HTTPS first, HTTP as fallback."""
        t0 = time.monotonic()
        resp = None
        try:
            # verify=False: certificate validity is checked separately in the
            # SSL step, so a broken certificate must not stop the page analysis.
            async with httpx.AsyncClient(
                timeout=15, follow_redirects=True, verify=False,
                headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"},
            ) as client:
                try:
                    resp = await client.get(f"https://{domain}")
                except Exception as e:
                    logger.debug("HTTPS fetch failed for %s: %s", domain, e)
                # Fall back to plain HTTP on an error status AND when HTTPS is
                # not reachable at all (e.g. nothing listening on port 443).
                if resp is None or resp.status_code >= 400:
                    try:
                        resp = await client.get(f"http://{domain}")
                    except Exception as e:
                        # Keep the HTTPS error response, if there was one.
                        logger.debug("HTTP fetch failed for %s: %s", domain, e)
        except Exception as e:
            logger.debug("Fetch failed for %s: %s", domain, e)
        return resp, int((time.monotonic() - t0) * 1000)

    (resp, load_ms), hosting = await asyncio.gather(_fetch(), _get_hosting_info(domain))
    result.update(hosting)
    result["load_time_ms"] = load_ms

    if resp is None:
        result["error"] = "Failed to fetch site"
    else:
        html = resp.text
        result.update({
            "reachable": resp.status_code < 400,
            "status_code": resp.status_code,
            "final_url": str(resp.url),
            "page_size_kb": round(len(resp.content) / 1024, 1),
            "server": resp.headers.get("server"),
        })
        soup = BeautifulSoup(html, "html.parser")
        hl = html.lower()

        # ── Basic metadata ────────────────────────────────────────────────────
        result["html_lang"] = (soup.find("html") or {}).get("lang")
        t = soup.find("title")
        result["page_title"] = t.get_text(strip=True)[:200] if t else None
        md = soup.find("meta", attrs={"name": "description"})
        result["meta_description"] = (md.get("content") or "")[:300] if md else None
        h1 = soup.find("h1")
        result["h1_text"] = h1.get_text(strip=True)[:200] if h1 else None
        result["has_mobile_viewport"] = bool(soup.find("meta", attrs={"name": "viewport"}))
        c = soup.find("link", rel="canonical")
        result["canonical_url"] = c.get("href") if c else None
        og = soup.find("meta", property="og:title")
        result["og_title"] = og.get("content") if og else None

        # External scripts must be counted BEFORE the <script> tags are removed
        # below; counting afterwards always yields 0.
        result["script_count"] = len(soup.find_all("script", src=True))

        # ── Visible text ──────────────────────────────────────────────────────
        for tag in soup(["script", "style", "noscript"]):
            tag.decompose()
        visible = soup.get_text(separator=" ", strip=True)
        vl = visible.lower()
        words = visible.split()
        result["word_count"] = len(words)
        result["visible_text_snippet"] = " ".join(words[:600])

        # ── Content quality ───────────────────────────────────────────────────
        lorem_hits = [p for p in LOREM_PHRASES if p in vl]
        result["has_lorem_ipsum"] = len(lorem_hits) > 0
        result["lorem_matches"] = lorem_hits[:6]
        ph_hits = [p for p in PLACEHOLDER_PHRASES if p in vl]
        result["has_placeholder"] = len(ph_hits) > 0
        result["placeholder_matches"] = ph_hits[:3]
        imgs = soup.find_all("img")
        result["image_count"] = len(imgs)

        # ── Analytics / webmaster ─────────────────────────────────────────────
        for name, sigs in ANALYTICS.items():
            if any(s.lower() in hl for s in sigs):
                result["analytics_present"].append(name)
        for name, sigs in WEBMASTER.items():
            if any(s.lower() in hl for s in sigs):
                result["webmaster_verified"].append(name)

        # ── GDPR / cookies ────────────────────────────────────────────────────
        for tool, sigs in COOKIE_TOOLS.items():
            if any(s.lower() in hl for s in sigs):
                result["cookie_tool"] = tool
                result["has_cookie_notice"] = True
                break
        if not result["has_cookie_notice"]:
            result["has_cookie_notice"] = any(s in vl for s in COOKIE_TEXT_SIGNALS)
        result["has_privacy_policy"] = any(s in vl for s in PRIVACY_SIGNALS) or bool(
            soup.find("a", href=lambda h: h and ("privacidad" in h.lower() or "privacy" in h.lower()))
        )
        result["has_gdpr_text"] = any(s in vl for s in GDPR_TEXT_SIGNALS)

        # ── Accessibility ─────────────────────────────────────────────────────
        # Only a MISSING alt attribute counts; alt="" marks a decorative image.
        result["images_missing_alt"] = sum(1 for img in imgs if img.get("alt") is None)
        result["has_skip_nav"] = bool(
            soup.find("a", href=lambda h: h and h.lower() in ("#main", "#content", "#maincontent", "#skip"))
        )
        result["empty_links"] = sum(
            1 for a in soup.find_all("a") if not a.get_text(strip=True) and not a.find("img")
        )
        all_inputs = soup.find_all("input", type=lambda t: t not in ("hidden", "submit", "button"))
        labeled_ids = {lbl.get("for") for lbl in soup.find_all("label") if lbl.get("for")}
        result["inputs_without_labels"] = sum(
            1 for inp in all_inputs
            if inp.get("id") not in labeled_ids
            and not inp.get("aria-label")
            and not inp.get("aria-labelledby")
            # An input nested inside a <label> is labelled implicitly.
            and inp.find_parent("label") is None
        )

        # ── Kit Digital (specific signals only — generic EU logos excluded) ──────
        kd_signals = []
        for img in imgs:
            comb = ((img.get("src") or "") + (img.get("alt") or "") + (img.get("srcset") or "")).lower()
            for p in KIT_STRONG_IMG:
                if p in comb:
                    kd_signals.append(f"img:{p}")
                    break
        for p in KIT_STRONG_TEXT:
            if p in vl:
                kd_signals.append(f"text:{p}")
        for a in soup.find_all("a", href=True):
            href = a["href"].lower()
            for p in KIT_STRONG_LINK:
                if p in href:
                    kd_signals.append(f"link:{href[:60]}")
                    break
        kd_signals = list(dict.fromkeys(kd_signals))[:10]
        result["kit_digital"] = len(kd_signals) > 0
        result["kit_digital_signals"] = kd_signals

        # ── Google My Business ────────────────────────────────────────────────
        for a in soup.find_all("a", href=True):
            href_g = a["href"]
            if any(sig in href_g for sig in GMB_URL_SIGNALS):
                result["has_gmb"] = True
                result["gmb_url"] = href_g[:120]
                break
        if not result["has_gmb"]:
            result["has_gmb"] = any(sig.lower() in hl for sig in GMB_SCHEMA_SIGNALS)

        # ── Contacts ──────────────────────────────────────────────────────────
        _collect_contacts(result, soup, html[:80000], visible, include_social=True)

        # ── CMS ───────────────────────────────────────────────────────────────
        CMS_SIGS = {
            "wordpress": ["/wp-content/", "/wp-includes/", 'content="WordPress'],
            "joomla": ["/components/com_", "Joomla!", 'content="Joomla'],
            "drupal": ["/sites/default/files/", "Drupal.settings"],
            "wix": ["static.wixstatic.com", "X-Wix-"],
            "squarespace": ["squarespace.com", "X-Squarespace-"],
            "shopify": ["cdn.shopify.com", "Shopify.theme"],
            "prestashop": ["PrestaShop", "/modules/prestashop"],
            "magento": ["Mage.Cookies", "X-Magento-"],
            "typo3": ["typo3temp", "TYPO3 CMS"],
            "opencart": ["route=common/home", "OpenCart"],
        }
        # Signatures are matched against the page head AND the response headers.
        combined_check = (
            html[:60000] + " ".join(f"{k}:{v}" for k, v in resp.headers.items())
        ).lower()
        for cms, sigs in CMS_SIGS.items():
            if any(s.lower() in combined_check for s in sigs):
                result["cms"] = cms
                break

        # ── Last-Modified / copyright year ────────────────────────────────────
        lm = (resp.headers.get("last-modified") or
              (soup.find("meta", attrs={"name": "last-modified"}) or {}).get("content") or
              (soup.find("meta", property="article:modified_time") or {}).get("content"))
        if lm:
            result["last_modified"] = str(lm)[:30]
        footer_el = (soup.find("footer") or
                     soup.find(id=re.compile(r"footer", re.I)) or
                     soup.find(class_=re.compile(r"footer", re.I)))
        # Without a footer element, the tail of the visible text is searched.
        search_text = footer_el.get_text() if footer_el else visible[-600:]
        result["copyright_year"] = _extract_copyright_year(search_text)

    # ── Sitemap & robots (parallel) ───────────────────────────────────────────
    # Use the scheme the homepage was actually served over, so http-only sites
    # are not reported as lacking a sitemap / robots.txt.
    scheme = "http" if (resp is not None and resp.url.scheme == "http") else "https"
    base_url = f"{scheme}://{domain}"

    async def _get(url):
        """Return the body of *url* on HTTP 200, otherwise None."""
        try:
            async with httpx.AsyncClient(timeout=6, follow_redirects=True, verify=False) as c:
                r = await c.get(url)
                return r.text if r.status_code == 200 else None
        except Exception:
            return None

    async def _get_contact_page():
        """Return the first contact-like page that answers with content."""
        for path in ("/contacto", "/contact", "/contactanos", "/sobre-nosotros"):
            txt = await _get(f"{base_url}{path}")
            if txt:
                return txt
        return None

    sitemap_txt, robots_txt, contact_html = await asyncio.gather(
        _get(f"{base_url}/sitemap.xml"),
        _get(f"{base_url}/robots.txt"),
        _get_contact_page(),
    )
    result["has_sitemap"] = sitemap_txt is not None
    result["has_robots"] = robots_txt is not None
    if robots_txt:
        result["robots_disallows_google"] = _robots_blocks_google(robots_txt)

    # Merge contacts from /contacto page
    if contact_html:
        try:
            csoup = BeautifulSoup(contact_html, "html.parser")
            _collect_contacts(
                result, csoup, contact_html[:60000], csoup.get_text(),
                include_social=False,
            )
        except Exception as e:
            # Best effort: homepage contacts are kept if this page cannot be parsed.
            logger.debug("Contact page parsing failed for %s: %s", domain, e)

    # ── SSL ───────────────────────────────────────────────────────────────────
    try:
        loop = asyncio.get_running_loop()
        result["ssl_expiry_days"] = await asyncio.wait_for(
            loop.run_in_executor(None, _ssl_days_until_expiry, domain), timeout=12
        )
        result["ssl_valid"] = True
    except Exception as e:
        # Any failure (no TLS, invalid certificate, timeout) leaves ssl_valid False.
        logger.debug("SSL check failed for %s: %s", domain, e)
    return result
async def analyze_site(domain: str) -> dict:
    """Public entry point — hard 90s timeout so workers never hang permanently.

    Runs the full analysis for *domain*. If it does not finish within 90
    seconds, a warning is logged and a reduced result marking the site as
    unreachable (with empty contact lists and no Kit Digital signals) is
    returned instead.
    """
    guarded = asyncio.wait_for(_analyze_site_inner(domain), timeout=90)
    try:
        report = await guarded
    except asyncio.TimeoutError:
        logger.warning("analyze_site timed out for %s", domain)
        fallback = {
            "domain": domain,
            "reachable": False,
            "error": "analyze_site timeout",
            "emails": [],
            "phones": [],
            "whatsapp": [],
            "social_links": [],
            "kit_digital": False,
            "kit_digital_signals": [],
        }
        return fallback
    return report