Files
DomGod/app/site_analyzer.py
Malin 3a7ef19746 feat: Cloudflare JS challenge bypass via playwright fallback
When httpx gets a CF challenge page (detected by title + small page size),
site_analyzer retries the fetch with headless Chromium via playwright,
waits 3s for the challenge to resolve, then proceeds with normal extraction.
Tested on productospeluqueriabellezaaura.com.es — now extracts real title,
email, phone, and Instagram/Facebook/TikTok links that were previously blocked.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-13 10:49:45 +02:00

564 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Deep site analysis: content quality, SEO, hosting, GDPR, accessibility."""
import asyncio
import re
import time
import logging
import socket
from typing import Optional
import httpx
from bs4 import BeautifulSoup
# ── Cloudflare challenge detection ───────────────────────────────────────────
_CF_TITLES = {"un momento", "checking your browser", "just a moment",
"please wait", "verifying you are human", "espere mientras"}
def _is_cf_challenge(html: str) -> bool:
"""Return True if the page looks like a Cloudflare JS challenge."""
hl = html.lower()
if len(html) < 20_000 and any(t in hl for t in _CF_TITLES):
return True
return "cf-browser-verification" in hl or "cf_chl_opt" in html
async def _playwright_fetch(domain: str) -> Optional[str]:
"""Fetch via headless Chromium, bypassing Cloudflare JS challenges."""
try:
from playwright.async_api import async_playwright # type: ignore
async with async_playwright() as p:
browser = await p.chromium.launch(
headless=True,
args=["--no-sandbox", "--disable-setuid-sandbox"],
)
ctx = await browser.new_context(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
locale="es-ES",
)
page = await ctx.new_page()
await page.goto(f"https://{domain}", timeout=25_000)
await asyncio.sleep(3) # let the CF challenge JS execute & redirect
html = await page.content()
await browser.close()
return html
except Exception:
return None
logger = logging.getLogger(__name__)

# ── EU countries (hosting check) ─────────────────────────────────────────────
# Two-letter country codes treated as "EU hosted".
EU_COUNTRIES = set(
    # EU member states
    "AT BE BG HR CY CZ DK EE FI FR DE GR "
    "HU IE IT LV LT LU MT NL PL PT RO SK "
    "SI ES SE "
    # EEA
    "NO IS LI "
    # adequacy / adjacent
    "CH GB AD".split()
)
# ── Content quality ───────────────────────────────────────────────────────────
# Lower-case fragments of filler text; a hit means template copy was left in.
# NOTE(review): "ntulla nec ante" looks like a typo but may deliberately mirror
# a widely copied filler snippet; confirm before "correcting" it.
LOREM_PHRASES = [
    "lorem ipsum",
    "sed ut perspiciatis",
    "nunc sem sapien",
    "nulla id nibh",
    "aenean dignissim",
    "aliquam tincidunt",
    "vestibulum commodo",
    "fusce nunc lacus",
    "consectetuer",
    "cras ornare tristique",
    "ntulla nec ante",
    "risus id metus",
    "praesent placerat",
    "fusce pellentesque",
    "suscipit nibh",
    "integer vitae libero",
    "felis quis tortor",
    "dolor sit amet",
]

# Lower-case phrases typical of an unfinished or default-install site.
PLACEHOLDER_PHRASES = [
    "under construction",
    "coming soon",
    "sample page",
    "this is a demo",
    "hello world",
    "test content",
    "default post",
    "uncategorized",
    "demo content",
]
# ── Cookie / GDPR consent tools ───────────────────────────────────────────────
# Consent-management products mapped to markup fragments that identify them.
# Insertion order matters to consumers that stop at the first matching tool.
COOKIE_TOOLS = dict(
    cookiebot=["cookiebot.com", "CookieConsent", "CybotCookiebot"],
    onetrust=["onetrust", "otBannerSdk"],
    cookiepro=["cookiepro.com"],
    osano=["osano.com"],
    iubenda=["iubenda.com"],
    borlabs=["borlabs-cookie"],
    complianz=["complianz"],
    cookieyes=["cookieyes.com", "cookie-law-info"],
    usercentrics=["usercentrics.com"],
    quantcast=["quantcast.com/cmp"],
)

# Lower-case banner wording (English / Spanish) that signals a cookie notice.
COOKIE_TEXT_SIGNALS = [
    "accept cookies",
    "acepta las cookies",
    "we use cookies",
    "usamos cookies",
    "cookie policy",
    "política de cookies",
    "cookie settings",
    "manage cookies",
    "aceptar todas",
    "rechazar cookies",
]

# Lower-case wording that signals a privacy / legal notice.
PRIVACY_SIGNALS = [
    "privacy policy",
    "política de privacidad",
    "aviso legal",
    "privacy notice",
    "data protection",
]

# Lower-case wording that signals explicit GDPR / data-protection text.
GDPR_TEXT_SIGNALS = [
    "rgpd",
    "gdpr",
    "reglamento general de protección",
    "lopd",
    "protección de datos",
    "responsable del tratamiento",
]
# ── Analytics / webmaster ─────────────────────────────────────────────────────
# Analytics products mapped to markup fragments that identify them.
# Consumers lower-case each fragment and look for it in the lower-cased page
# source, so every fragment must stay distinctive after lower-casing. A bare
# "G-" would become "g-" and match unrelated markup such as "img-fluid" or
# "bg-1.png"; the GA4 loader URL is matched instead.
ANALYTICS = {
    "google_analytics": [
        "gtag('config'",
        "google-analytics.com/analytics.js",
        "gtag/js?id=G-",
    ],
    "google_tag_manager": ["googletagmanager.com/gtm.js", "GTM-"],
    "facebook_pixel": ["fbq('init'", "connect.facebook.net/en_US/fbevents"],
    "hotjar": ["static.hotjar.com"],
    "clarity": ["clarity.ms/tag"],
}

# Search-engine ownership verification: service -> meta-tag name fragment.
WEBMASTER = {
    "google_search_console": ["google-site-verification"],
    "bing_webmaster": ["msvalidate.01"],
    "yandex": ["yandex-verification"],
}
# ── Kit Digital — require SPECIFIC signals, not generic EU logos ───────────────
# These patterns are unambiguously Kit Digital programme markers
# Lower-case fragments looked for in image src / alt / srcset values.
KIT_STRONG_IMG = [
    "kit-digital",
    "kitdigital",
    "kit_digital",
    "agente-digitalizador",
    "agente_digitalizador",
]
# Lower-case wording looked for in the visible page text.
KIT_STRONG_TEXT = [
    "kit digital",
    "agente digitalizador",
    "agentes digitalizadores",
]
# Lower-case fragments looked for in link targets.
KIT_STRONG_LINK = [
    "acelerapyme.es",
    "red.es/kit-digital",
    "kit-digital.red.es",
]
# ── Google My Business / Business Profile ────────────────────────────────────
# URL fragments that point at a Google Maps / Business Profile presence.
GMB_URL_SIGNALS = [
    # embedded Google Map widget
    "maps.googleapis.com/maps/api",
    # link to GMB Place page
    "google.com/maps/place",
    "maps.google.com",
    "g.page/",
    "maps.app.goo.gl",
    "goo.gl/maps",
    "business.google.com",
]

# Structured-data fragments declaring a LocalBusiness entity; the JSON form is
# listed both with and without a space after the colon.
GMB_SCHEMA_SIGNALS = [
    '"@type":"LocalBusiness"',
    '"@type": "LocalBusiness"',
    "schema.org/LocalBusiness",
]
# Email-shaped tokens: local part, "@", dotted host, alphabetic TLD (2+ chars).
EMAIL_RE = re.compile(r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}")

# Nine-digit numbers starting with 6-9, optionally grouped 3-3-3 with spaces or
# hyphens and optionally prefixed by the 34 country code ("+34", "0034", "34").
# The digit look-arounds stop the pattern from carving a "phone number" out of
# the middle of a longer digit run (order ids, timestamps, bank accounts).
# Only non-capturing groups are used, so findall() yields the full match.
PHONE_RE = re.compile(
    r"(?<!\d)(?:(?:\+|00)?34[\s\-]?)?[6-9]\d{2}[\s\-]?\d{3}[\s\-]?\d{3}(?!\d)"
)

# Host names treated as social-media profiles.
SOCIAL_DOM = [
    "facebook.com",
    "instagram.com",
    "linkedin.com",
    "twitter.com",
    "x.com",
    "tiktok.com",
    "youtube.com",
]
async def _get_hosting_info(domain: str) -> dict:
    """Resolve IP, then look up ASN / org / country via ip-api.com.

    Args:
        domain: Host name to resolve.

    Returns:
        A dict with the keys ``ip``, ``asn``, ``org``, ``isp``, ``ip_country``,
        ``ip_region`` and ``eu_hosted``. Keys that could not be determined stay
        ``None``; the function never raises (failures are logged at debug level).
    """
    info = {"ip": None, "asn": None, "org": None, "isp": None,
            "ip_country": None, "ip_region": None, "eu_hosted": None}
    try:
        # We are inside a coroutine, so ask for the running loop explicitly.
        loop = asyncio.get_running_loop()
        # gethostbyname blocks, so it runs in the default executor with a cap.
        ip = await asyncio.wait_for(
            loop.run_in_executor(None, socket.gethostbyname, domain), timeout=6
        )
        info["ip"] = ip
        async with httpx.AsyncClient(timeout=6) as client:
            r = await client.get(
                f"http://ip-api.com/json/{ip}",
                params={"fields": "status,country,countryCode,regionName,org,as,isp"},
            )
        if r.status_code == 200:
            d = r.json()
            # The service reports lookup failures in the body, not the status.
            if d.get("status") == "success":
                country_code = d.get("countryCode")
                info.update({
                    "asn": d.get("as"),
                    "org": d.get("org"),
                    "isp": d.get("isp"),
                    "ip_country": country_code,
                    "ip_region": d.get("regionName"),
                    "eu_hosted": country_code in EU_COUNTRIES,
                })
    except Exception as e:
        logger.debug("Hosting lookup failed for %s: %s", domain, e)
    return info
async def _analyze_site_inner(domain: str) -> dict:
    """Analyse *domain* and return a flat dict of findings.

    Steps, in order:

    1. Fetch the home page (HTTPS, then HTTP when the status is >= 400) in
       parallel with the hosting lookup. A response that looks like a
       Cloudflare challenge is retried through ``_playwright_fetch``.
    2. When a page was obtained: extract metadata, content-quality flags,
       analytics / consent tooling, accessibility counters, Kit Digital and
       Google Business signals, contact details, CMS and freshness hints.
    3. Probe ``/sitemap.xml``, ``/robots.txt`` and a few contact-page paths,
       merging any extra contact details found there.
    4. Open a verified TLS connection to port 443 to read the certificate
       expiry.

    Args:
        domain: Bare host name (no scheme).

    Returns:
        The result dict initialised below. Every step is best-effort: a failure
        leaves the corresponding keys at their defaults, and a failed home-page
        fetch additionally sets ``error``.
    """
    result = {
        "domain": domain,
        "reachable": False, "load_time_ms": None, "status_code": None,
        "final_url": None, "page_size_kb": None, "server": None, "cms": None,
        # Hosting
        "ip": None, "asn": None, "org": None, "isp": None,
        "ip_country": None, "ip_region": None, "eu_hosted": None,
        # SSL
        "ssl_valid": False, "ssl_expiry_days": None,
        # Content quality
        "has_lorem_ipsum": False, "lorem_matches": [],
        "has_placeholder": False, "placeholder_matches": [],
        "word_count": 0, "image_count": 0, "script_count": 0,
        "has_mobile_viewport": False,
        "page_title": None, "meta_description": None, "h1_text": None,
        "visible_text_snippet": "",
        # SEO
        "has_sitemap": False, "has_robots": False, "robots_disallows_google": False,
        "analytics_present": [], "webmaster_verified": [],
        "canonical_url": None, "og_title": None,
        # GDPR / cookies
        "cookie_tool": None, "has_cookie_notice": False,
        "has_privacy_policy": False, "has_gdpr_text": False,
        # Accessibility
        "html_lang": None, "images_missing_alt": 0,
        "has_skip_nav": False, "empty_links": 0,
        "inputs_without_labels": 0,
        # Kit Digital
        "kit_digital": False, "kit_digital_signals": [],
        # Google My Business
        "has_gmb": False, "gmb_url": None,
        # Contacts
        "emails": [], "phones": [], "whatsapp": [], "social_links": [],
        # Age / freshness
        "copyright_year": None, "last_modified": None,
        "error": None,
    }

    # One cap for every contact list, applied after both the home-page pass
    # and the contact-page merge.
    max_contacts = 8
    # EMAIL_RE also matches asset names such as "logo@2x.png"; drop those.
    asset_suffixes = (".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg", ".css", ".js")
    # Social domains must start at a host-label boundary so that "x.com" does
    # not match "wix.com" or "dropbox.com".
    social_re = re.compile(
        r"(?<![a-z0-9\-])(?:" + "|".join(re.escape(d) for d in SOCIAL_DOM) + r")(?![a-z0-9\-])",
        re.I,
    )

    # ── Fetch + hosting (parallel) ────────────────────────────────────────────
    async def _fetch():
        """Return ``(response, elapsed_ms)``.

        ``response`` is ``None`` on failure, an ``httpx`` response, or the
        tuple ``("playwright", html)`` when the browser fallback succeeded.
        """
        t0 = time.monotonic()
        try:
            async with httpx.AsyncClient(
                timeout=15, follow_redirects=True, verify=False,
                headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"},
            ) as client:
                resp = await client.get(f"https://{domain}")
                if resp.status_code >= 400:
                    resp = await client.get(f"http://{domain}")
                # Cloudflare JS challenge — retry with headless browser.
                # Challenge pages are commonly served with 403 or 503 rather
                # than 200, so those statuses are inspected too.
                if resp.status_code in (200, 403, 503) and _is_cf_challenge(resp.text):
                    html_pw = await _playwright_fetch(domain)
                    if html_pw and not _is_cf_challenge(html_pw):
                        return ("playwright", html_pw), int((time.monotonic() - t0) * 1000)
                return resp, int((time.monotonic() - t0) * 1000)
        except Exception:
            return None, int((time.monotonic() - t0) * 1000)

    (resp, load_ms), hosting = await asyncio.gather(_fetch(), _get_hosting_info(domain))
    result.update(hosting)
    result["load_time_ms"] = load_ms

    if resp is None:
        result["error"] = "Failed to fetch site"
    else:
        # Handle playwright fallback tuple ("playwright", html_string)
        if isinstance(resp, tuple) and resp[0] == "playwright":
            html = resp[1]
            # The browser path yields markup only; there are no response
            # headers to inspect further down.
            headers = {}
            result.update({
                "reachable": True,
                "status_code": 200,
                "final_url": f"https://{domain}/",
                "page_size_kb": round(len(html.encode()) / 1024, 1),
                "server": "cloudflare",
            })
        else:
            html = resp.text
            headers = resp.headers
            result.update({
                "reachable": resp.status_code < 400,
                "status_code": resp.status_code,
                "final_url": str(resp.url),
                "page_size_kb": round(len(resp.content) / 1024, 1),
                "server": resp.headers.get("server"),
            })

        soup = BeautifulSoup(html, "html.parser")
        hl = html.lower()

        # ── Basic metadata ────────────────────────────────────────────────────
        result["html_lang"] = (soup.find("html") or {}).get("lang")
        t = soup.find("title")
        result["page_title"] = t.get_text(strip=True)[:200] if t else None
        md = soup.find("meta", attrs={"name": "description"})
        result["meta_description"] = (md.get("content") or "")[:300] if md else None
        h1 = soup.find("h1")
        result["h1_text"] = h1.get_text(strip=True)[:200] if h1 else None
        result["has_mobile_viewport"] = bool(soup.find("meta", attrs={"name": "viewport"}))
        c = soup.find("link", rel="canonical")
        result["canonical_url"] = c.get("href") if c else None
        og = soup.find("meta", property="og:title")
        result["og_title"] = og.get("content") if og else None

        # Count external scripts now: the visible-text step below removes every
        # <script> element from the tree, after which the count would be 0.
        result["script_count"] = len(soup.find_all("script", src=True))

        # ── Visible text ──────────────────────────────────────────────────────
        for tag in soup(["script", "style", "noscript"]):
            tag.decompose()
        visible = soup.get_text(separator=" ", strip=True)
        vl = visible.lower()
        words = visible.split()
        result["word_count"] = len(words)
        result["visible_text_snippet"] = " ".join(words[:600])

        # ── Content quality ───────────────────────────────────────────────────
        lorem_hits = [p for p in LOREM_PHRASES if p in vl]
        result["has_lorem_ipsum"] = len(lorem_hits) > 0
        result["lorem_matches"] = lorem_hits[:6]
        ph_hits = [p for p in PLACEHOLDER_PHRASES if p in vl]
        result["has_placeholder"] = len(ph_hits) > 0
        result["placeholder_matches"] = ph_hits[:3]
        imgs = soup.find_all("img")
        result["image_count"] = len(imgs)

        # ── Analytics / webmaster ─────────────────────────────────────────────
        for name, sigs in ANALYTICS.items():
            if any(s.lower() in hl for s in sigs):
                result["analytics_present"].append(name)
        for name, sigs in WEBMASTER.items():
            if any(s.lower() in hl for s in sigs):
                result["webmaster_verified"].append(name)

        # ── GDPR / cookies ────────────────────────────────────────────────────
        for tool, sigs in COOKIE_TOOLS.items():
            if any(s.lower() in hl for s in sigs):
                result["cookie_tool"] = tool
                result["has_cookie_notice"] = True
                break
        if not result["has_cookie_notice"]:
            result["has_cookie_notice"] = any(s in vl for s in COOKIE_TEXT_SIGNALS)
        result["has_privacy_policy"] = any(s in vl for s in PRIVACY_SIGNALS) or bool(
            soup.find("a", href=lambda h: h and ("privacidad" in h.lower() or "privacy" in h.lower()))
        )
        result["has_gdpr_text"] = any(s in vl for s in GDPR_TEXT_SIGNALS)

        # ── Accessibility ─────────────────────────────────────────────────────
        # Only a missing alt attribute counts; alt="" marks a decorative image.
        result["images_missing_alt"] = sum(
            1 for img in imgs if not img.get("alt") and img.get("alt") != ""
        )
        result["has_skip_nav"] = bool(
            soup.find("a", href=lambda h: h and h.lower() in ("#main", "#content", "#maincontent", "#skip"))
        )
        result["empty_links"] = sum(
            1 for a in soup.find_all("a") if not a.get_text(strip=True) and not a.find("img")
        )
        all_inputs = soup.find_all("input", type=lambda t: t not in ("hidden", "submit", "button", None) or t is None)
        labeled_ids = {lbl.get("for") for lbl in soup.find_all("label") if lbl.get("for")}
        result["inputs_without_labels"] = sum(
            1 for inp in all_inputs
            if inp.get("id") not in labeled_ids and not inp.get("aria-label") and not inp.get("aria-labelledby")
        )

        # ── Kit Digital (specific signals only — generic EU logos excluded) ──────
        kd_signals = []
        for img in imgs:
            comb = ((img.get("src") or "") + (img.get("alt") or "") + (img.get("srcset") or "")).lower()
            for p in KIT_STRONG_IMG:
                if p in comb:
                    kd_signals.append(f"img:{p}")
                    break
        for p in KIT_STRONG_TEXT:
            if p in vl:
                kd_signals.append(f"text:{p}")
        for a in soup.find_all("a", href=True):
            href = a["href"].lower()
            for p in KIT_STRONG_LINK:
                if p in href:
                    kd_signals.append(f"link:{href[:60]}")
                    break
        kd_signals = list(dict.fromkeys(kd_signals))[:10]
        result["kit_digital"] = len(kd_signals) > 0
        result["kit_digital_signals"] = kd_signals

        # ── Google My Business ────────────────────────────────────────────────
        for a in soup.find_all("a", href=True):
            href_g = a["href"]
            for sig in GMB_URL_SIGNALS:
                if sig in href_g:
                    result["has_gmb"] = True
                    result["gmb_url"] = href_g[:120]
                    break
            if result["has_gmb"]:
                break
        if not result["has_gmb"]:
            result["has_gmb"] = any(sig.lower() in hl for sig in GMB_SCHEMA_SIGNALS)

        # ── Contacts ──────────────────────────────────────────────────────────
        # Pattern for WhatsApp links that appear inside onclick/data-* attrs
        _WA_ATTR_RE = re.compile(
            r'(https?://(?:wa\.me|api\.whatsapp\.com/send|web\.whatsapp\.com/send'
            r'|wa\.link)[^\s\'"\\>]{0,80})',
            re.I,
        )

        def _add_whatsapp(raw: str):
            """Record the WhatsApp URL found in *raw* (or its first 80 chars)."""
            m = _WA_ATTR_RE.search(raw)
            url = m.group(1) if m else raw[:80]
            url = url.rstrip("'\"\\)")
            if url and url not in result["whatsapp"]:
                result["whatsapp"].append(url)

        for tag in soup.find_all("a", href=True):
            href = tag["href"]
            if href.startswith("mailto:"):
                em = href[7:].split("?")[0].strip().lower()
                if em and em not in result["emails"]:
                    result["emails"].append(em)
            elif href.startswith("tel:"):
                ph = re.sub(r"[^\d+]", "", href[4:])
                if ph and ph not in result["phones"]:
                    result["phones"].append(ph)
            elif any(x in href for x in ("wa.me", "api.whatsapp", "wa.link", "web.whatsapp")):
                _add_whatsapp(href)
            elif social_re.search(href):
                clean = href.split("?")[0].rstrip("/")
                if clean not in result["social_links"]:
                    result["social_links"].append(clean)

        # Broader scan: WhatsApp / tel links hidden in onclick, data-href, data-url, etc.
        for tag in soup.find_all(True):
            for attr in ("onclick", "data-href", "data-url", "data-link", "data-action"):
                val = tag.get(attr) or ""
                if not val:
                    continue
                # WhatsApp in attribute value
                if any(x in val for x in ("wa.me", "api.whatsapp", "wa.link", "web.whatsapp")):
                    _add_whatsapp(val)
                # tel: in attribute value
                m_tel = re.search(r"tel:([\d\s\+\-\(\)]{6,20})", val)
                if m_tel:
                    ph = re.sub(r"[^\d+]", "", m_tel.group(1))
                    if ph and ph not in result["phones"]:
                        result["phones"].append(ph)
                # Social media links in attribute value
                if social_re.search(val):
                    url_m = re.search(r"https?://[^\s'\"\\)]{10,120}", val)
                    if url_m:
                        clean = url_m.group(0).split("?")[0].rstrip("/")
                        if clean not in result["social_links"]:
                            result["social_links"].append(clean)

        for em in EMAIL_RE.findall(html[:80000]):
            em = em.lower()
            if em not in result["emails"] and not em.endswith(asset_suffixes):
                result["emails"].append(em)
        for ph in PHONE_RE.findall(visible):
            ph_c = re.sub(r"[\s\-]", "", ph)
            if ph_c not in result["phones"]:
                result["phones"].append(ph_c)
        for k in ["emails", "phones", "whatsapp", "social_links"]:
            result[k] = list(dict.fromkeys(result[k]))[:max_contacts]

        # ── CMS ───────────────────────────────────────────────────────────────
        CMS_SIGS = {
            "wordpress": ["/wp-content/", "/wp-includes/", 'content="WordPress'],
            "joomla": ["/components/com_", "Joomla!", 'content="Joomla'],
            "drupal": ["/sites/default/files/", "Drupal.settings"],
            "wix": ["static.wixstatic.com", "X-Wix-"],
            "squarespace": ["squarespace.com", "X-Squarespace-"],
            "shopify": ["cdn.shopify.com", "Shopify.theme"],
            "prestashop": ["PrestaShop", "/modules/prestashop"],
            "magento": ["Mage.Cookies", "X-Magento-"],
            "typo3": ["typo3temp", "TYPO3 CMS"],
            "opencart": ["route=common/home", "OpenCart"],
        }
        # Markup prefix plus "name:value" header pairs, lower-cased once.
        combined_check = (
            html[:60000] + " ".join(f"{k}:{v}" for k, v in headers.items())
        ).lower()
        for cms, sigs in CMS_SIGS.items():
            if any(s.lower() in combined_check for s in sigs):
                result["cms"] = cms
                break

        # ── Last-Modified / copyright year ────────────────────────────────────
        lm = (headers.get("last-modified") or
              (soup.find("meta", attrs={"name": "last-modified"}) or {}).get("content") or
              (soup.find("meta", property="article:modified_time") or {}).get("content"))
        if lm:
            result["last_modified"] = str(lm)[:30]
        footer_el = (soup.find("footer") or
                     soup.find(id=re.compile(r"footer", re.I)) or
                     soup.find(class_=re.compile(r"footer", re.I)))
        # Without a footer element, fall back to the tail of the visible text.
        search_text = footer_el.get_text() if footer_el else visible[-600:]
        cp = re.search(r"(?:©|&copy;|copyright)\s*[\d\-]*\s*(20\d{2})", search_text, re.I)
        if not cp:
            cp = re.search(r"(20\d{2})\s*[-]\s*20\d{2}|(?:©|copyright)\D{0,10}(20\d{2})", search_text, re.I)
        if cp:
            result["copyright_year"] = cp.group(1) or cp.group(2)

    # ── Sitemap & robots (parallel) ───────────────────────────────────────────
    async def _get(url):
        """Return the body of *url* on HTTP 200, otherwise ``None``."""
        try:
            async with httpx.AsyncClient(timeout=6, follow_redirects=True, verify=False) as c:
                r = await c.get(url)
                return r.text if r.status_code == 200 else None
        except Exception:
            return None

    async def _get_contact_page():
        """Return the first contact-style page that answers with HTTP 200."""
        for path in ("/contacto", "/contact", "/contactanos", "/sobre-nosotros"):
            txt = await _get(f"https://{domain}{path}")
            if txt:
                return txt
        return None

    sitemap_txt, robots_txt, contact_html = await asyncio.gather(
        _get(f"https://{domain}/sitemap.xml"),
        _get(f"https://{domain}/robots.txt"),
        _get_contact_page(),
    )
    result["has_sitemap"] = sitemap_txt is not None
    result["has_robots"] = robots_txt is not None
    if robots_txt:
        # Evaluate the rules properly: a bare substring test for "disallow: /"
        # also fires on partial rules such as "Disallow: /wp-admin/".
        from urllib.robotparser import RobotFileParser
        robots = RobotFileParser()
        robots.parse(robots_txt.splitlines())
        result["robots_disallows_google"] = not robots.can_fetch(
            "Googlebot", f"https://{domain}/"
        )

    # Merge contacts from /contacto page
    if contact_html:
        try:
            csoup = BeautifulSoup(contact_html, "html.parser")
            for a in csoup.find_all("a", href=True):
                href = a["href"]
                if href.startswith("mailto:"):
                    em = href[7:].split("?")[0].strip().lower()
                    if em and em not in result["emails"]:
                        result["emails"].append(em)
                elif href.startswith("tel:"):
                    ph = re.sub(r"[^\d+]", "", href[4:])
                    if ph and ph not in result["phones"]:
                        result["phones"].append(ph)
                elif "wa.me" in href or "api.whatsapp.com" in href:
                    if href not in result["whatsapp"]:
                        result["whatsapp"].append(href[:80])
            ctext = csoup.get_text()
            for em in EMAIL_RE.findall(contact_html[:60000]):
                em = em.lower()
                if em not in result["emails"] and not em.endswith(asset_suffixes):
                    result["emails"].append(em)
            for ph in PHONE_RE.findall(ctext):
                ph_c = re.sub(r"[\s\-]", "", ph)
                if ph_c not in result["phones"]:
                    result["phones"].append(ph_c)
            for k in ["emails", "phones", "whatsapp"]:
                result[k] = list(dict.fromkeys(result[k]))[:max_contacts]
        except Exception as e:
            # Best-effort enrichment: keep what the home page already gave us.
            logger.debug("Contact page parsing failed for %s: %s", domain, e)

    # ── SSL ───────────────────────────────────────────────────────────────────
    import ssl as _ssl
    try:
        def _ssl_check():
            """Blocking handshake; returns ``(True, whole days until expiry)``."""
            ctx = _ssl.create_default_context()
            with socket.create_connection((domain, 443), timeout=5) as s:
                s.settimeout(5)  # SSL handshake timeout (wrap_socket has no timeout arg)
                with ctx.wrap_socket(s, server_hostname=domain) as ss:
                    cert = ss.getpeercert()
            # notAfter -> epoch seconds, then floor to whole days remaining.
            expires_at = _ssl.cert_time_to_seconds(cert["notAfter"])
            return True, int((expires_at - time.time()) // 86400)

        loop = asyncio.get_running_loop()
        result["ssl_valid"], result["ssl_expiry_days"] = await asyncio.wait_for(
            loop.run_in_executor(None, _ssl_check), timeout=12
        )
    except Exception as e:
        # Any handshake / verification / timeout failure leaves ssl_valid False.
        logger.debug("SSL check failed for %s: %s", domain, e)
    return result
async def analyze_site(domain: str) -> dict:
    """Public entry point: run the analysis under a hard 90-second limit.

    On timeout a warning is logged and a reduced result is returned that marks
    the site as unreachable and carries empty contact / Kit Digital fields.
    """
    analysis = _analyze_site_inner(domain)
    try:
        outcome = await asyncio.wait_for(analysis, timeout=90)
    except asyncio.TimeoutError:
        logger.warning("analyze_site timed out for %s", domain)
        outcome = {
            "domain": domain,
            "reachable": False,
            "error": "analyze_site timeout",
            "emails": [],
            "phones": [],
            "whatsapp": [],
            "social_links": [],
            "kit_digital": False,
            "kit_digital_signals": [],
        }
    return outcome