"""Deep site analysis: content quality, SEO, hosting, GDPR, accessibility."""
import asyncio
import datetime
import logging
import re
import socket
import ssl
import time
import urllib.robotparser
from typing import Mapping, Optional

import httpx
from bs4 import BeautifulSoup

logger = logging.getLogger(__name__)

# Desktop Chrome user agent shared by the httpx client and the headless browser.
_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)

# ── Cloudflare challenge detection ───────────────────────────────────────────
_CF_TITLES = {"un momento", "checking your browser", "just a moment",
              "please wait", "verifying you are human", "espere mientras"}


def _is_cf_challenge(html: str) -> bool:
    """Return True if the page looks like a Cloudflare JS challenge."""
    hl = html.lower()
    # Interstitial phrases only count on small pages; a large page that merely
    # contains "please wait" somewhere is treated as real content.
    if len(html) < 20_000 and any(t in hl for t in _CF_TITLES):
        return True
    return "cf-browser-verification" in hl or "cf_chl_opt" in html


async def _playwright_fetch(domain: str) -> Optional[str]:
    """Fetch ``https://{domain}`` via headless Chromium.

    Used to get past Cloudflare JS challenges. Returns the rendered HTML, or
    ``None`` if Playwright is not importable or anything fails on the way.
    """
    try:
        from playwright.async_api import async_playwright  # type: ignore

        async with async_playwright() as p:
            browser = await p.chromium.launch(
                headless=True,
                args=["--no-sandbox", "--disable-setuid-sandbox"],
            )
            try:
                ctx = await browser.new_context(user_agent=_USER_AGENT, locale="es-ES")
                page = await ctx.new_page()
                await page.goto(f"https://{domain}", timeout=25_000)
                await asyncio.sleep(3)  # let the CF challenge JS execute & redirect
                return await page.content()
            finally:
                # Close the browser even when navigation raises.
                await browser.close()
    except Exception as exc:
        logger.debug("Playwright fetch failed for %s: %s", domain, exc)
        return None


# ── EU countries (hosting check) ─────────────────────────────────────────────
EU_COUNTRIES = {
    'AT', 'BE', 'BG', 'HR', 'CY', 'CZ', 'DK', 'EE', 'FI', 'FR', 'DE', 'GR',
    'HU', 'IE', 'IT', 'LV', 'LT', 'LU', 'MT', 'NL', 'PL', 'PT', 'RO', 'SK',
    'SI', 'ES', 'SE',
    'NO', 'IS', 'LI',   # EEA
    'CH', 'GB', 'AD',   # adequacy / adjacent
}

# ── Content quality ───────────────────────────────────────────────────────────
LOREM_PHRASES = [
    "lorem ipsum", "sed ut perspiciatis", "nunc sem sapien", "nulla id nibh",
    "aenean dignissim", "aliquam tincidunt", "vestibulum commodo",
    "fusce nunc lacus", "consectetuer", "cras ornare tristique",
    # NOTE(review): "ntulla" looks like a typo but is presumably the verbatim
    # spelling found in common filler-text generators — keep as is.
    "ntulla nec ante",
    "risus id metus", "praesent placerat", "fusce pellentesque",
    "suscipit nibh", "integer vitae libero", "felis quis tortor",
    "dolor sit amet",
]
PLACEHOLDER_PHRASES = [
    "under construction", "coming soon", "sample page", "this is a demo",
    "hello world", "test content", "default post", "uncategorized",
    "demo content",
]

# ── Cookie / GDPR consent tools ───────────────────────────────────────────────
COOKIE_TOOLS = {
    "cookiebot":    ["cookiebot.com", "CookieConsent", "CybotCookiebot"],
    "onetrust":     ["onetrust", "otBannerSdk"],
    "cookiepro":    ["cookiepro.com"],
    "osano":        ["osano.com"],
    "iubenda":      ["iubenda.com"],
    "borlabs":      ["borlabs-cookie"],
    "complianz":    ["complianz"],
    "cookieyes":    ["cookieyes.com", "cookie-law-info"],
    "usercentrics": ["usercentrics.com"],
    "quantcast":    ["quantcast.com/cmp"],
}
COOKIE_TEXT_SIGNALS = [
    "accept cookies", "acepta las cookies", "we use cookies", "usamos cookies",
    "cookie policy", "política de cookies", "cookie settings", "manage cookies",
    "aceptar todas", "rechazar cookies",
]
PRIVACY_SIGNALS = [
    "privacy policy", "política de privacidad", "aviso legal",
    "privacy notice", "data protection",
]
GDPR_TEXT_SIGNALS = [
    "rgpd", "gdpr", "reglamento general de protección", "lopd",
    "protección de datos", "responsable del tratamiento",
]

# ── Analytics / webmaster ─────────────────────────────────────────────────────
# Substring signals, matched case-insensitively against the raw HTML.
ANALYTICS = {
    "google_analytics":   ["gtag('config'", 'gtag("config"',
                           "google-analytics.com/analytics.js"],
    "google_tag_manager": ["googletagmanager.com/gtm.js"],
    "facebook_pixel":     ["fbq('init'", "connect.facebook.net/en_US/fbevents"],
    "hotjar":             ["static.hotjar.com"],
    "clarity":            ["clarity.ms/tag"],
}
# Tracking IDs are matched case-sensitively with word boundaries. A lower-cased
# substring test for "G-" / "GTM-" would hit "img-", "bg-", "og-"… on almost
# every page and report analytics that are not there.
ANALYTICS_ID_PATTERNS = {
    "google_analytics":   re.compile(r"\b(?:G-[A-Z0-9]{6,12}|UA-\d{4,10}-\d{1,4})\b"),
    "google_tag_manager": re.compile(r"\bGTM-[A-Z0-9]{4,10}\b"),
}
WEBMASTER = {
    "google_search_console": ["google-site-verification"],
    "bing_webmaster":        ["msvalidate.01"],
    "yandex":                ["yandex-verification"],
}

# ── Kit Digital — require SPECIFIC signals, not generic EU logos ───────────────
# These patterns are unambiguously Kit Digital programme markers
KIT_STRONG_IMG = ["kit-digital", "kitdigital", "kit_digital",
                  "agente-digitalizador", "agente_digitalizador"]
KIT_STRONG_TEXT = ["kit digital", "agente digitalizador", "agentes digitalizadores"]
KIT_STRONG_LINK = ["acelerapyme.es", "red.es/kit-digital", "kit-digital.red.es"]

# ── Google My Business / Business Profile ────────────────────────────────────
# NOTE(review): these are only tested against <a href> values below, so the
# embedded-map signal is unlikely to ever fire for <iframe>/<script> embeds —
# confirm whether that is intended.
GMB_URL_SIGNALS = [
    "maps.googleapis.com/maps/api",   # embedded Google Map widget
    "google.com/maps/place",          # link to GMB Place page
    "maps.google.com",
    "g.page/",
    "maps.app.goo.gl",
    "goo.gl/maps",
    "business.google.com",
]
GMB_SCHEMA_SIGNALS = [
    '"@type":"LocalBusiness"',
    '"@type": "LocalBusiness"',
    "schema.org/LocalBusiness",
]

EMAIL_RE = re.compile(r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}")
PHONE_RE = re.compile(r"(?:\+34[\s\-]?)?(?:6|7|8|9)\d{2}[\s\-]?\d{3}[\s\-]?\d{3}")
SOCIAL_DOM = ["facebook.com", "instagram.com", "linkedin.com", "twitter.com",
              "x.com", "tiktok.com", "youtube.com"]

# ── CMS fingerprints (matched case-insensitively in HTML + response headers) ──
CMS_SIGS = {
    "wordpress":   ["/wp-content/", "/wp-includes/", 'content="WordPress'],
    "joomla":      ["/components/com_", "Joomla!", 'content="Joomla'],
    "drupal":      ["/sites/default/files/", "Drupal.settings"],
    "wix":         ["static.wixstatic.com", "X-Wix-"],
    "squarespace": ["squarespace.com", "X-Squarespace-"],
    "shopify":     ["cdn.shopify.com", "Shopify.theme"],
    "prestashop":  ["PrestaShop", "/modules/prestashop"],
    "magento":     ["Mage.Cookies", "X-Magento-"],
    "typo3":       ["typo3temp", "TYPO3 CMS"],
    "opencart":    ["route=common/home", "OpenCart"],
}

# ── Contact extraction helpers ────────────────────────────────────────────────
# Pattern for WhatsApp links that appear inside onclick/data-* attrs
_WA_ATTR_RE = re.compile(
    r'(https?://(?:wa\.me|api\.whatsapp\.com/send|web\.whatsapp\.com/send'
    r'|wa\.link)[^\s\'"\\>]{0,80})',
    re.I,
)
_WA_HINTS = ("wa.me", "api.whatsapp", "wa.link", "web.whatsapp")
_TEL_IN_ATTR_RE = re.compile(r"tel:([\d\s\+\-\(\)]{6,20})")
_URL_IN_ATTR_RE = re.compile(r"https?://[^\s'\"\\)]{10,120}")
_SCANNED_ATTRS = ("onclick", "data-href", "data-url", "data-link", "data-action")
# EMAIL_RE also matches retina asset names such as "logo@2x.png"; drop those.
_ASSET_SUFFIXES = (".png", ".jpg", ".jpeg", ".gif", ".webp", ".css", ".js", ".svg")
_MAX_CONTACTS_HOMEPAGE = 8
# NOTE(review): merging a contact page re-caps emails/phones/whatsapp at 5, so
# it can shrink lists collected from the homepage (cap 8). Kept as found —
# confirm whether the two caps are meant to differ.
_MAX_CONTACTS_MERGED = 5


def _add_unique(bucket: list, value: str) -> None:
    """Append *value* to *bucket* unless it is empty or already present."""
    if value and value not in bucket:
        bucket.append(value)


def _add_whatsapp(result: dict, raw: str) -> None:
    """Record the WhatsApp URL found in *raw* (an href or attribute value)."""
    m = _WA_ATTR_RE.search(raw)
    url = m.group(1) if m else raw[:80]
    # Strip quote/paren residue left over from inline JS such as onclick="...".
    _add_unique(result["whatsapp"], url.rstrip("'\"\\)"))


async def _get_hosting_info(domain: str) -> dict:
    """Resolve IP, then look up ASN / org / country via ip-api.com.

    Always returns the full set of hosting keys; values stay ``None`` when the
    DNS resolution or the lookup fails (failures are logged at debug level).
    """
    info = {"ip": None, "asn": None, "org": None, "isp": None,
            "ip_country": None, "ip_region": None, "eu_hosted": None}
    try:
        loop = asyncio.get_running_loop()
        # gethostbyname blocks, so run it in the default executor.
        ip = await asyncio.wait_for(
            loop.run_in_executor(None, socket.gethostbyname, domain), timeout=6
        )
        info["ip"] = ip
        async with httpx.AsyncClient(timeout=6) as client:
            r = await client.get(
                f"http://ip-api.com/json/{ip}",
                params={"fields": "status,country,countryCode,regionName,org,as,isp"},
            )
        if r.status_code == 200:
            d = r.json()
            if d.get("status") == "success":
                info.update({
                    "asn": d.get("as"),
                    "org": d.get("org"),
                    "isp": d.get("isp"),
                    "ip_country": d.get("countryCode"),
                    "ip_region": d.get("regionName"),
                    "eu_hosted": d.get("countryCode") in EU_COUNTRIES,
                })
    except Exception as e:
        logger.debug("Hosting lookup failed for %s: %s", domain, e)
    return info


def _empty_result(domain: str) -> dict:
    """Return the result dict with every key present and set to its default."""
    return {
        "domain": domain,
        "reachable": False,
        "load_time_ms": None,
        "status_code": None,
        "final_url": None,
        "page_size_kb": None,
        "server": None,
        "cms": None,
        # Hosting
        "ip": None, "asn": None, "org": None, "isp": None,
        "ip_country": None, "ip_region": None, "eu_hosted": None,
        # SSL
        "ssl_valid": False,
        "ssl_expiry_days": None,
        # Content quality
        "has_lorem_ipsum": False,
        "lorem_matches": [],
        "has_placeholder": False,
        "placeholder_matches": [],
        "word_count": 0,
        "image_count": 0,
        "script_count": 0,
        "has_mobile_viewport": False,
        "page_title": None,
        "meta_description": None,
        "h1_text": None,
        "visible_text_snippet": "",
        # SEO
        "has_sitemap": False,
        "has_robots": False,
        "robots_disallows_google": False,
        "analytics_present": [],
        "webmaster_verified": [],
        "canonical_url": None,
        "og_title": None,
        # GDPR / cookies
        "cookie_tool": None,
        "has_cookie_notice": False,
        "has_privacy_policy": False,
        "has_gdpr_text": False,
        # Accessibility
        "html_lang": None,
        "images_missing_alt": 0,
        "has_skip_nav": False,
        "empty_links": 0,
        "inputs_without_labels": 0,
        # Kit Digital
        "kit_digital": False,
        "kit_digital_signals": [],
        # Google My Business
        "has_gmb": False,
        "gmb_url": None,
        # Contacts
        "emails": [],
        "phones": [],
        "whatsapp": [],
        "social_links": [],
        # Age / freshness
        "copyright_year": None,
        "last_modified": None,
        "error": None,
    }


async def _fetch_homepage(domain: str) -> tuple:
    """Fetch the homepage and time it.

    Returns ``(response, rendered_html, elapsed_ms)``:

    * ``response`` is the ``httpx.Response`` or ``None`` if the fetch failed;
    * ``rendered_html`` is the Playwright-rendered HTML when the plain response
      was a Cloudflare challenge that the headless browser got past, else
      ``None``.
    """
    t0 = time.monotonic()

    def elapsed_ms() -> int:
        return int((time.monotonic() - t0) * 1000)

    try:
        # verify=False: sites with broken certificates are still analysed; the
        # certificate itself is checked separately by the SSL step.
        async with httpx.AsyncClient(
            timeout=15, follow_redirects=True, verify=False,
            headers={"User-Agent": _USER_AGENT},
        ) as client:
            resp = None
            try:
                resp = await client.get(f"https://{domain}")
            except httpx.HTTPError as exc:
                # HTTPS unreachable (refused, timeout, …): still try plain HTTP.
                logger.debug("HTTPS fetch failed for %s: %s", domain, exc)
            if resp is None or resp.status_code >= 400:
                resp = await client.get(f"http://{domain}")

            # Cloudflare JS challenge — retry with headless browser
            if resp.status_code == 200 and _is_cf_challenge(resp.text):
                html_pw = await _playwright_fetch(domain)
                if html_pw and not _is_cf_challenge(html_pw):
                    return resp, html_pw, elapsed_ms()
            return resp, None, elapsed_ms()
    except Exception as exc:
        logger.debug("Fetch failed for %s: %s", domain, exc)
        return None, None, elapsed_ms()


def _extract_contacts(result: dict, soup: BeautifulSoup, html: str, visible: str) -> None:
    """Collect emails, phones, WhatsApp and social links from the homepage."""
    for tag in soup.find_all("a", href=True):
        href = tag["href"]
        if href.startswith("mailto:"):
            _add_unique(result["emails"], href[7:].split("?")[0].strip().lower())
        elif href.startswith("tel:"):
            _add_unique(result["phones"], re.sub(r"[^\d+]", "", href[4:]))
        elif any(x in href for x in _WA_HINTS):
            _add_whatsapp(result, href)
        elif any(sd in href.lower() for sd in SOCIAL_DOM):
            _add_unique(result["social_links"], href.split("?")[0].rstrip("/"))

    # Broader scan: WhatsApp / tel links hidden in onclick, data-href, data-url, etc.
    for tag in soup.find_all(True):
        for attr in _SCANNED_ATTRS:
            val = tag.get(attr) or ""
            if not val:
                continue
            if any(x in val for x in _WA_HINTS):
                _add_whatsapp(result, val)
            m_tel = _TEL_IN_ATTR_RE.search(val)
            if m_tel:
                _add_unique(result["phones"], re.sub(r"[^\d+]", "", m_tel.group(1)))
            if any(sd in val.lower() for sd in SOCIAL_DOM):
                url_m = _URL_IN_ATTR_RE.search(val)
                if url_m:
                    _add_unique(result["social_links"],
                                url_m.group(0).split("?")[0].rstrip("/"))

    # Plain-text fallbacks: addresses in the first 80k of raw HTML, phone
    # numbers in the visible text.
    for em in EMAIL_RE.findall(html[:80000]):
        em = em.lower()
        if not em.endswith(_ASSET_SUFFIXES):
            _add_unique(result["emails"], em)
    for ph in PHONE_RE.findall(visible):
        _add_unique(result["phones"], re.sub(r"[\s\-]", "", ph))

    for k in ["emails", "phones", "whatsapp", "social_links"]:
        result[k] = list(dict.fromkeys(result[k]))[:_MAX_CONTACTS_HOMEPAGE]


def _parse_homepage(result: dict, html: str, headers: Mapping[str, str]) -> None:
    """Fill *result* in place with everything derived from the homepage HTML.

    *headers* are the HTTP response headers; pass an empty mapping when the
    HTML came from the headless browser and no headers are available.
    """
    soup = BeautifulSoup(html, "html.parser")
    hl = html.lower()

    # ── Basic metadata ────────────────────────────────────────────────────
    result["html_lang"] = (soup.find("html") or {}).get("lang")
    t = soup.find("title")
    result["page_title"] = t.get_text(strip=True)[:200] if t else None
    md = soup.find("meta", attrs={"name": "description"})
    result["meta_description"] = (md.get("content") or "")[:300] if md else None
    h1 = soup.find("h1")
    result["h1_text"] = h1.get_text(strip=True)[:200] if h1 else None
    result["has_mobile_viewport"] = bool(soup.find("meta", attrs={"name": "viewport"}))
    c = soup.find("link", rel="canonical")
    result["canonical_url"] = c.get("href") if c else None
    og = soup.find("meta", property="og:title")
    result["og_title"] = og.get("content") if og else None

    # Count external scripts BEFORE they are stripped for visible-text
    # extraction; counting afterwards always yields 0.
    result["script_count"] = len(soup.find_all("script", src=True))

    # ── Visible text ──────────────────────────────────────────────────────
    for tag in soup(["script", "style", "noscript"]):
        tag.decompose()
    visible = soup.get_text(separator=" ", strip=True)
    vl = visible.lower()
    words = visible.split()
    result["word_count"] = len(words)
    result["visible_text_snippet"] = " ".join(words[:600])

    # ── Content quality ───────────────────────────────────────────────────
    lorem_hits = [p for p in LOREM_PHRASES if p in vl]
    result["has_lorem_ipsum"] = bool(lorem_hits)
    result["lorem_matches"] = lorem_hits[:6]
    ph_hits = [p for p in PLACEHOLDER_PHRASES if p in vl]
    result["has_placeholder"] = bool(ph_hits)
    result["placeholder_matches"] = ph_hits[:3]
    imgs = soup.find_all("img")
    result["image_count"] = len(imgs)

    # ── Analytics / webmaster ─────────────────────────────────────────────
    for name, sigs in ANALYTICS.items():
        id_re = ANALYTICS_ID_PATTERNS.get(name)
        if any(s.lower() in hl for s in sigs) or (id_re is not None and id_re.search(html)):
            result["analytics_present"].append(name)
    for name, sigs in WEBMASTER.items():
        if any(s.lower() in hl for s in sigs):
            result["webmaster_verified"].append(name)

    # ── GDPR / cookies ────────────────────────────────────────────────────
    for tool, sigs in COOKIE_TOOLS.items():
        if any(s.lower() in hl for s in sigs):
            result["cookie_tool"] = tool
            result["has_cookie_notice"] = True
            break
    if not result["has_cookie_notice"]:
        result["has_cookie_notice"] = any(s in vl for s in COOKIE_TEXT_SIGNALS)
    result["has_privacy_policy"] = any(s in vl for s in PRIVACY_SIGNALS) or bool(
        soup.find("a", href=lambda h: h and ("privacidad" in h.lower() or "privacy" in h.lower()))
    )
    result["has_gdpr_text"] = any(s in vl for s in GDPR_TEXT_SIGNALS)

    # ── Accessibility ─────────────────────────────────────────────────────
    # Only a missing alt attribute counts; alt="" marks a decorative image.
    result["images_missing_alt"] = sum(1 for img in imgs if img.get("alt") is None)
    result["has_skip_nav"] = bool(
        soup.find("a", href=lambda h: h and h.lower() in ("#main", "#content", "#maincontent", "#skip"))
    )
    result["empty_links"] = sum(
        1 for a in soup.find_all("a")
        if not a.get_text(strip=True) and not a.find("img")
    )
    # Inputs with no type attribute are included (the filter receives None).
    all_inputs = soup.find_all(
        "input", type=lambda t: t not in ("hidden", "submit", "button")
    )
    labeled_ids = {lbl.get("for") for lbl in soup.find_all("label") if lbl.get("for")}
    result["inputs_without_labels"] = sum(
        1 for inp in all_inputs
        if inp.get("id") not in labeled_ids
        and not inp.get("aria-label")
        and not inp.get("aria-labelledby")
    )

    # ── Kit Digital (specific signals only — generic EU logos excluded) ──────
    anchors = soup.find_all("a", href=True)
    kd_signals = []
    for img in imgs:
        comb = ((img.get("src") or "") + (img.get("alt") or "") + (img.get("srcset") or "")).lower()
        hit = next((p for p in KIT_STRONG_IMG if p in comb), None)
        if hit:
            kd_signals.append(f"img:{hit}")
    kd_signals.extend(f"text:{p}" for p in KIT_STRONG_TEXT if p in vl)
    for a in anchors:
        href = a["href"].lower()
        if any(p in href for p in KIT_STRONG_LINK):
            kd_signals.append(f"link:{href[:60]}")
    kd_signals = list(dict.fromkeys(kd_signals))[:10]
    result["kit_digital"] = bool(kd_signals)
    result["kit_digital_signals"] = kd_signals

    # ── Google My Business ────────────────────────────────────────────────
    for a in anchors:
        href_g = a["href"]
        if any(sig in href_g for sig in GMB_URL_SIGNALS):
            result["has_gmb"] = True
            result["gmb_url"] = href_g[:120]
            break
    if not result["has_gmb"]:
        result["has_gmb"] = any(sig.lower() in hl for sig in GMB_SCHEMA_SIGNALS)

    # ── Contacts ──────────────────────────────────────────────────────────
    _extract_contacts(result, soup, html, visible)

    # ── CMS ───────────────────────────────────────────────────────────────
    combined_check = (
        html[:60000] + " ".join(f"{k}:{v}" for k, v in headers.items())
    ).lower()
    for cms, sigs in CMS_SIGS.items():
        if any(s.lower() in combined_check for s in sigs):
            result["cms"] = cms
            break

    # ── Last-Modified / copyright year ────────────────────────────────────
    lm = (headers.get("last-modified")
          or (soup.find("meta", attrs={"name": "last-modified"}) or {}).get("content")
          or (soup.find("meta", property="article:modified_time") or {}).get("content"))
    if lm:
        result["last_modified"] = str(lm)[:30]

    footer_el = (soup.find("footer")
                 or soup.find(id=re.compile(r"footer", re.I))
                 or soup.find(class_=re.compile(r"footer", re.I)))
    search_text = footer_el.get_text() if footer_el else visible[-600:]
    # "&copy;" covers a double-escaped entity that survives as literal text.
    cp = re.search(r"(?:©|&copy;|copyright)\s*[\d\-–]*\s*(20\d{2})", search_text, re.I)
    if not cp:
        cp = re.search(
            r"(20\d{2})\s*[-–]\s*20\d{2}|(?:©|&copy;|copyright)\D{0,10}(20\d{2})",
            search_text, re.I,
        )
    if cp:
        result["copyright_year"] = cp.group(1) or cp.group(2)


async def _get_text(url: str) -> Optional[str]:
    """GET *url* and return the body text on HTTP 200, otherwise ``None``."""
    try:
        async with httpx.AsyncClient(timeout=6, follow_redirects=True, verify=False) as c:
            r = await c.get(url)
            return r.text if r.status_code == 200 else None
    except Exception:
        return None


async def _get_contact_page(domain: str) -> Optional[str]:
    """Return the HTML of the first contact-like path that answers with 200."""
    for path in ("/contacto", "/contact", "/contactanos", "/sobre-nosotros"):
        txt = await _get_text(f"https://{domain}{path}")
        if txt:
            return txt
    return None


def _robots_blocks_googlebot(robots_txt: str) -> bool:
    """Return True if *robots_txt* forbids Googlebot from fetching ``/``.

    The file is parsed with the standard-library robots parser, so a
    ``Disallow: /some/path`` rule is not mistaken for a site-wide block, and a
    site-wide block under ``User-agent: *`` is recognised as well.
    """
    parser = urllib.robotparser.RobotFileParser()
    parser.parse(robots_txt.splitlines())
    return not parser.can_fetch("Googlebot", "/")


def _merge_contact_page(result: dict, contact_html: str) -> None:
    """Merge emails / phones / WhatsApp links from a contact page into *result*."""
    try:
        csoup = BeautifulSoup(contact_html, "html.parser")
        for a in csoup.find_all("a", href=True):
            href = a["href"]
            if href.startswith("mailto:"):
                _add_unique(result["emails"], href[7:].split("?")[0].strip().lower())
            elif href.startswith("tel:"):
                _add_unique(result["phones"], re.sub(r"[^\d+]", "", href[4:]))
            elif "wa.me" in href or "api.whatsapp.com" in href:
                _add_unique(result["whatsapp"], href[:80])
        ctext = csoup.get_text()
        for em in EMAIL_RE.findall(contact_html[:60000]):
            em = em.lower()
            if not em.endswith(_ASSET_SUFFIXES):
                _add_unique(result["emails"], em)
        for ph in PHONE_RE.findall(ctext):
            _add_unique(result["phones"], re.sub(r"[\s\-]", "", ph))
        for k in ["emails", "phones", "whatsapp"]:
            result[k] = list(dict.fromkeys(result[k]))[:_MAX_CONTACTS_MERGED]
    except Exception as exc:
        # Best effort: a malformed contact page must not fail the analysis.
        logger.debug("Contact page merge failed for %s: %s", result.get("domain"), exc)


def _ssl_expiry_days(domain: str) -> int:
    """Handshake with ``domain:443`` and return days until the cert expires.

    Uses the default (verifying) SSL context, so an untrusted or mismatched
    certificate raises instead of returning.
    """
    ctx = ssl.create_default_context()
    with socket.create_connection((domain, 443), timeout=5) as s:
        s.settimeout(5)  # SSL handshake timeout (wrap_socket has no timeout arg)
        with ctx.wrap_socket(s, server_hostname=domain) as ss:
            cert = ss.getpeercert()
    exp = datetime.datetime.strptime(cert["notAfter"], "%b %d %H:%M:%S %Y %Z")
    # strptime yields a naive datetime; compare against naive UTC "now".
    now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    return -(now - exp).days


async def _analyze_site_inner(domain: str) -> dict:
    """Run the full analysis for *domain* and return the populated result dict.

    Steps: homepage fetch and hosting lookup (in parallel), HTML parsing,
    sitemap / robots / contact-page probes (in parallel), then an SSL check.
    """
    result = _empty_result(domain)

    # ── Fetch + hosting (parallel) ────────────────────────────────────────────
    (resp, rendered_html, load_ms), hosting = await asyncio.gather(
        _fetch_homepage(domain), _get_hosting_info(domain)
    )
    result.update(hosting)
    result["load_time_ms"] = load_ms

    if resp is None:
        result["error"] = "Failed to fetch site"
    else:
        if rendered_html is not None:
            # Headless-browser fallback: only the HTML is available, so there
            # are no response headers to inspect.
            html = rendered_html
            headers: Mapping[str, str] = {}
            result.update({
                "reachable": True,
                "status_code": 200,
                "final_url": f"https://{domain}/",
                "page_size_kb": round(len(html.encode()) / 1024, 1),
                "server": "cloudflare",
            })
        else:
            html = resp.text
            headers = resp.headers
            result.update({
                "reachable": resp.status_code < 400,
                "status_code": resp.status_code,
                "final_url": str(resp.url),
                "page_size_kb": round(len(resp.content) / 1024, 1),
                "server": resp.headers.get("server"),
            })
        _parse_homepage(result, html, headers)

    # ── Sitemap & robots (parallel) ───────────────────────────────────────────
    sitemap_txt, robots_txt, contact_html = await asyncio.gather(
        _get_text(f"https://{domain}/sitemap.xml"),
        _get_text(f"https://{domain}/robots.txt"),
        _get_contact_page(domain),
    )
    result["has_sitemap"] = sitemap_txt is not None
    result["has_robots"] = robots_txt is not None
    if robots_txt:
        result["robots_disallows_google"] = _robots_blocks_googlebot(robots_txt)

    # Merge contacts from /contacto page
    if contact_html:
        _merge_contact_page(result, contact_html)

    # ── SSL ───────────────────────────────────────────────────────────────────
    try:
        loop = asyncio.get_running_loop()
        result["ssl_expiry_days"] = await asyncio.wait_for(
            loop.run_in_executor(None, _ssl_expiry_days, domain), timeout=12
        )
        result["ssl_valid"] = True
    except Exception as exc:
        # Leaves ssl_valid=False / ssl_expiry_days=None.
        logger.debug("SSL check failed for %s: %s", domain, exc)

    return result


async def analyze_site(domain: str) -> dict:
    """Public entry point — hard 90s timeout so workers never hang permanently.

    On timeout the returned dict has the same keys as a normal result, all at
    their defaults, with ``error`` set to ``"analyze_site timeout"``.
    """
    try:
        return await asyncio.wait_for(_analyze_site_inner(domain), timeout=90)
    except asyncio.TimeoutError:
        logger.warning("analyze_site timed out for %s", domain)
        result = _empty_result(domain)
        result["error"] = "analyze_site timeout"
        return result