"""Deep site analysis: content quality, SEO, hosting, GDPR, accessibility.""" import asyncio import re import time import logging import socket from typing import Optional import httpx from bs4 import BeautifulSoup logger = logging.getLogger(__name__) # ── EU countries (hosting check) ───────────────────────────────────────────── EU_COUNTRIES = { 'AT','BE','BG','HR','CY','CZ','DK','EE','FI','FR','DE','GR', 'HU','IE','IT','LV','LT','LU','MT','NL','PL','PT','RO','SK', 'SI','ES','SE', 'NO','IS','LI', # EEA 'CH','GB','AD', # adequacy / adjacent } # ── Content quality ─────────────────────────────────────────────────────────── LOREM_PHRASES = [ "lorem ipsum", "sed ut perspiciatis", "nunc sem sapien", "nulla id nibh", "aenean dignissim", "aliquam tincidunt", "vestibulum commodo", "fusce nunc lacus", "consectetuer", "cras ornare tristique", "ntulla nec ante", "risus id metus", "praesent placerat", "fusce pellentesque", "suscipit nibh", "integer vitae libero", "felis quis tortor", "dolor sit amet", ] PLACEHOLDER_PHRASES = [ "under construction", "coming soon", "sample page", "this is a demo", "hello world", "test content", "default post", "uncategorized", "demo content", ] # ── Cookie / GDPR consent tools ─────────────────────────────────────────────── COOKIE_TOOLS = { "cookiebot": ["cookiebot.com", "CookieConsent", "CybotCookiebot"], "onetrust": ["onetrust", "otBannerSdk"], "cookiepro": ["cookiepro.com"], "osano": ["osano.com"], "iubenda": ["iubenda.com"], "borlabs": ["borlabs-cookie"], "complianz": ["complianz"], "cookieyes": ["cookieyes.com", "cookie-law-info"], "usercentrics": ["usercentrics.com"], "quantcast": ["quantcast.com/cmp"], } COOKIE_TEXT_SIGNALS = [ "accept cookies", "acepta las cookies", "we use cookies", "usamos cookies", "cookie policy", "política de cookies", "cookie settings", "manage cookies", "aceptar todas", "rechazar cookies", ] PRIVACY_SIGNALS = [ "privacy policy", "política de privacidad", "aviso legal", "privacy notice", "data protection", ] GDPR_TEXT_SIGNALS = [ "rgpd", "gdpr", "reglamento general de protección", "lopd", "protección de datos", "responsable del tratamiento", ] # ── Analytics / webmaster ───────────────────────────────────────────────────── ANALYTICS = { "google_analytics": ["gtag('config'", "google-analytics.com/analytics.js", "G-"], "google_tag_manager": ["googletagmanager.com/gtm.js", "GTM-"], "facebook_pixel": ["fbq('init'", "connect.facebook.net/en_US/fbevents"], "hotjar": ["static.hotjar.com"], "clarity": ["clarity.ms/tag"], } WEBMASTER = { "google_search_console": ["google-site-verification"], "bing_webmaster": ["msvalidate.01"], "yandex": ["yandex-verification"], } # ── Kit Digital — require SPECIFIC signals, not generic EU logos ─────────────── # These patterns are unambiguously Kit Digital programme markers KIT_STRONG_IMG = ["kit-digital", "kitdigital", "kit_digital", "agente-digitalizador", "agente_digitalizador"] KIT_STRONG_TEXT = ["kit digital", "agente digitalizador", "agentes digitalizadores"] KIT_STRONG_LINK = ["acelerapyme.es", "red.es/kit-digital", "kit-digital.red.es"] # ── Google My Business / Business Profile ──────────────────────────────────── GMB_URL_SIGNALS = [ "maps.googleapis.com/maps/api", # embedded Google Map widget "google.com/maps/place", # link to GMB Place page "maps.google.com", "g.page/", "maps.app.goo.gl", "goo.gl/maps", "business.google.com", ] GMB_SCHEMA_SIGNALS = [ '"@type":"LocalBusiness"', '"@type": "LocalBusiness"', "schema.org/LocalBusiness", ] EMAIL_RE = re.compile(r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}") PHONE_RE = re.compile(r"(?:\+34[\s\-]?)?(?:6|7|8|9)\d{2}[\s\-]?\d{3}[\s\-]?\d{3}") SOCIAL_DOM = ["facebook.com", "instagram.com", "linkedin.com", "twitter.com", "x.com", "tiktok.com", "youtube.com"] async def _get_hosting_info(domain: str) -> dict: """Resolve IP, then look up ASN / org / country via ip-api.com.""" info = {"ip": None, "asn": None, "org": None, "isp": None, "ip_country": None, "ip_region": None, "eu_hosted": None} try: loop = asyncio.get_event_loop() ip = await asyncio.wait_for( loop.run_in_executor(None, socket.gethostbyname, domain), timeout=6 ) info["ip"] = ip async with httpx.AsyncClient(timeout=6) as client: r = await client.get( f"http://ip-api.com/json/{ip}", params={"fields": "status,country,countryCode,regionName,org,as,isp"}, ) if r.status_code == 200: d = r.json() if d.get("status") == "success": info.update({ "asn": d.get("as"), "org": d.get("org"), "isp": d.get("isp"), "ip_country": d.get("countryCode"), "ip_region": d.get("regionName"), "eu_hosted": d.get("countryCode") in EU_COUNTRIES, }) except Exception as e: logger.debug("Hosting lookup failed for %s: %s", domain, e) return info async def _analyze_site_inner(domain: str) -> dict: result = { "domain": domain, "reachable": False, "load_time_ms": None, "status_code": None, "final_url": None, "page_size_kb": None, "server": None, "cms": None, # Hosting "ip": None, "asn": None, "org": None, "isp": None, "ip_country": None, "ip_region": None, "eu_hosted": None, # SSL "ssl_valid": False, "ssl_expiry_days": None, # Content quality "has_lorem_ipsum": False, "lorem_matches": [], "has_placeholder": False, "placeholder_matches": [], "word_count": 0, "image_count": 0, "script_count": 0, "has_mobile_viewport": False, "page_title": None, "meta_description": None, "h1_text": None, "visible_text_snippet": "", # SEO "has_sitemap": False, "has_robots": False, "robots_disallows_google": False, "analytics_present": [], "webmaster_verified": [], "canonical_url": None, "og_title": None, # GDPR / cookies "cookie_tool": None, "has_cookie_notice": False, "has_privacy_policy": False, "has_gdpr_text": False, # Accessibility "html_lang": None, "images_missing_alt": 0, "has_skip_nav": False, "empty_links": 0, "inputs_without_labels": 0, # Kit Digital "kit_digital": False, "kit_digital_signals": [], # Google My Business "has_gmb": False, "gmb_url": None, # Contacts "emails": [], "phones": [], "whatsapp": [], "social_links": [], # Age / freshness "copyright_year": None, "last_modified": None, "error": None, } # ── Fetch + hosting (parallel) ──────────────────────────────────────────── async def _fetch(): t0 = time.monotonic() try: async with httpx.AsyncClient( timeout=15, follow_redirects=True, verify=False, headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"}, ) as client: resp = await client.get(f"https://{domain}") if resp.status_code >= 400: resp = await client.get(f"http://{domain}") return resp, int((time.monotonic() - t0) * 1000) except Exception as e: return None, int((time.monotonic() - t0) * 1000) (resp, load_ms), hosting = await asyncio.gather(_fetch(), _get_hosting_info(domain)) result.update(hosting) result["load_time_ms"] = load_ms if resp is None: result["error"] = "Failed to fetch site" else: html = resp.text result.update({ "reachable": resp.status_code < 400, "status_code": resp.status_code, "final_url": str(resp.url), "page_size_kb": round(len(resp.content) / 1024, 1), "server": resp.headers.get("server"), }) soup = BeautifulSoup(html, "html.parser") hl = html.lower() # ── Basic metadata ──────────────────────────────────────────────────── result["html_lang"] = (soup.find("html") or {}).get("lang") t = soup.find("title") result["page_title"] = t.get_text(strip=True)[:200] if t else None md = soup.find("meta", attrs={"name": "description"}) result["meta_description"] = (md.get("content") or "")[:300] if md else None h1 = soup.find("h1") result["h1_text"] = h1.get_text(strip=True)[:200] if h1 else None result["has_mobile_viewport"] = bool(soup.find("meta", attrs={"name": "viewport"})) c = soup.find("link", rel="canonical") result["canonical_url"] = c.get("href") if c else None og = soup.find("meta", property="og:title") result["og_title"] = og.get("content") if og else None # ── Visible text ────────────────────────────────────────────────────── for tag in soup(["script", "style", "noscript"]): tag.decompose() visible = soup.get_text(separator=" ", strip=True) vl = visible.lower() words = visible.split() result["word_count"] = len(words) result["visible_text_snippet"] = " ".join(words[:600]) # ── Content quality ─────────────────────────────────────────────────── lorem_hits = [p for p in LOREM_PHRASES if p in vl] result["has_lorem_ipsum"] = len(lorem_hits) > 0 result["lorem_matches"] = lorem_hits[:6] ph_hits = [p for p in PLACEHOLDER_PHRASES if p in vl] result["has_placeholder"] = len(ph_hits) > 0 result["placeholder_matches"] = ph_hits[:3] imgs = soup.find_all("img") result["image_count"] = len(imgs) result["script_count"] = len(soup.find_all("script", src=True)) # ── Analytics / webmaster ───────────────────────────────────────────── for name, sigs in ANALYTICS.items(): if any(s.lower() in hl for s in sigs): result["analytics_present"].append(name) for name, sigs in WEBMASTER.items(): if any(s.lower() in hl for s in sigs): result["webmaster_verified"].append(name) # ── GDPR / cookies ──────────────────────────────────────────────────── for tool, sigs in COOKIE_TOOLS.items(): if any(s.lower() in hl for s in sigs): result["cookie_tool"] = tool result["has_cookie_notice"] = True break if not result["has_cookie_notice"]: result["has_cookie_notice"] = any(s in vl for s in COOKIE_TEXT_SIGNALS) result["has_privacy_policy"] = any(s in vl for s in PRIVACY_SIGNALS) or bool( soup.find("a", href=lambda h: h and ("privacidad" in h.lower() or "privacy" in h.lower())) ) result["has_gdpr_text"] = any(s in vl for s in GDPR_TEXT_SIGNALS) # ── Accessibility ───────────────────────────────────────────────────── result["images_missing_alt"] = sum( 1 for img in imgs if not img.get("alt") and img.get("alt") != "" ) result["has_skip_nav"] = bool( soup.find("a", href=lambda h: h and h.lower() in ("#main", "#content", "#maincontent", "#skip")) ) result["empty_links"] = sum( 1 for a in soup.find_all("a") if not a.get_text(strip=True) and not a.find("img") ) all_inputs = soup.find_all("input", type=lambda t: t not in ("hidden", "submit", "button", None) or t is None) labeled_ids = {lbl.get("for") for lbl in soup.find_all("label") if lbl.get("for")} result["inputs_without_labels"] = sum( 1 for inp in all_inputs if inp.get("id") not in labeled_ids and not inp.get("aria-label") and not inp.get("aria-labelledby") ) # ── Kit Digital (specific signals only — generic EU logos excluded) ────── kd_signals = [] for img in soup.find_all("img"): comb = ((img.get("src") or "") + (img.get("alt") or "") + (img.get("srcset") or "")).lower() for p in KIT_STRONG_IMG: if p in comb: kd_signals.append(f"img:{p}") break for p in KIT_STRONG_TEXT: if p in vl: kd_signals.append(f"text:{p}") for a in soup.find_all("a", href=True): href = a["href"].lower() for p in KIT_STRONG_LINK: if p in href: kd_signals.append(f"link:{href[:60]}") break kd_signals = list(dict.fromkeys(kd_signals))[:10] result["kit_digital"] = len(kd_signals) > 0 result["kit_digital_signals"] = kd_signals # ── Google My Business ──────────────────────────────────────────────── for a in soup.find_all("a", href=True): href_g = a["href"] for sig in GMB_URL_SIGNALS: if sig in href_g: result["has_gmb"] = True result["gmb_url"] = href_g[:120] break if result["has_gmb"]: break if not result["has_gmb"]: result["has_gmb"] = any(sig.lower() in hl for sig in GMB_SCHEMA_SIGNALS) # ── Contacts ────────────────────────────────────────────────────────── for a in soup.find_all("a", href=True): href = a["href"] if href.startswith("mailto:"): em = href[7:].split("?")[0].strip().lower() if em and em not in result["emails"]: result["emails"].append(em) elif href.startswith("tel:"): ph = re.sub(r"[^\d+]", "", href[4:]) if ph and ph not in result["phones"]: result["phones"].append(ph) elif "wa.me" in href or "api.whatsapp.com" in href: if href not in result["whatsapp"]: result["whatsapp"].append(href[:80]) else: for sd in SOCIAL_DOM: if sd in href.lower(): clean = href.split("?")[0].rstrip("/") if clean not in result["social_links"]: result["social_links"].append(clean) break for em in EMAIL_RE.findall(html[:80000]): em = em.lower() if em not in result["emails"] and not any(em.endswith(x) for x in [".png",".jpg",".css",".js",".svg"]): result["emails"].append(em) for ph in PHONE_RE.findall(visible): ph_c = re.sub(r"[\s\-]", "", ph) if ph_c not in result["phones"]: result["phones"].append(ph_c) for k in ["emails", "phones", "whatsapp", "social_links"]: result[k] = list(dict.fromkeys(result[k]))[:5] # ── CMS ─────────────────────────────────────────────────────────────── CMS_SIGS = { "wordpress": ["/wp-content/", "/wp-includes/", 'content="WordPress'], "joomla": ["/components/com_", "Joomla!", 'content="Joomla'], "drupal": ["/sites/default/files/", "Drupal.settings"], "wix": ["static.wixstatic.com", "X-Wix-"], "squarespace": ["squarespace.com", "X-Squarespace-"], "shopify": ["cdn.shopify.com", "Shopify.theme"], "prestashop": ["PrestaShop", "/modules/prestashop"], "magento": ["Mage.Cookies", "X-Magento-"], "typo3": ["typo3temp", "TYPO3 CMS"], "opencart": ["route=common/home", "OpenCart"], } combined_check = html[:60000] + " ".join(f"{k}:{v}" for k, v in resp.headers.items()) for cms, sigs in CMS_SIGS.items(): if any(s.lower() in combined_check.lower() for s in sigs): result["cms"] = cms break # ── Last-Modified / copyright year ──────────────────────────────────── lm = (resp.headers.get("last-modified") or (soup.find("meta", attrs={"name": "last-modified"}) or {}).get("content") or (soup.find("meta", property="article:modified_time") or {}).get("content")) if lm: result["last_modified"] = str(lm)[:30] footer_el = (soup.find("footer") or soup.find(id=re.compile(r"footer", re.I)) or soup.find(class_=re.compile(r"footer", re.I))) search_text = footer_el.get_text() if footer_el else visible[-600:] cp = re.search(r"(?:©|©|copyright)\s*[\d\-–]*\s*(20\d{2})", search_text, re.I) if not cp: cp = re.search(r"(20\d{2})\s*[-–]\s*20\d{2}|(?:©|copyright)\D{0,10}(20\d{2})", search_text, re.I) if cp: result["copyright_year"] = cp.group(1) or cp.group(2) # ── Sitemap & robots (parallel) ─────────────────────────────────────────── async def _get(url): try: async with httpx.AsyncClient(timeout=6, follow_redirects=True, verify=False) as c: r = await c.get(url) return r.text if r.status_code == 200 else None except Exception: return None async def _get_contact_page(): for path in ("/contacto", "/contact", "/contactanos", "/sobre-nosotros"): txt = await _get(f"https://{domain}{path}") if txt: return txt return None sitemap_txt, robots_txt, contact_html = await asyncio.gather( _get(f"https://{domain}/sitemap.xml"), _get(f"https://{domain}/robots.txt"), _get_contact_page(), ) result["has_sitemap"] = sitemap_txt is not None result["has_robots"] = robots_txt is not None if robots_txt: rl = robots_txt.lower() result["robots_disallows_google"] = "disallow: /" in rl and "googlebot" in rl # Merge contacts from /contacto page if contact_html: try: csoup = BeautifulSoup(contact_html, "html.parser") for a in csoup.find_all("a", href=True): href = a["href"] if href.startswith("mailto:"): em = href[7:].split("?")[0].strip().lower() if em and em not in result["emails"]: result["emails"].append(em) elif href.startswith("tel:"): ph = re.sub(r"[^\d+]", "", href[4:]) if ph and ph not in result["phones"]: result["phones"].append(ph) elif "wa.me" in href or "api.whatsapp.com" in href: if href not in result["whatsapp"]: result["whatsapp"].append(href[:80]) ctext = csoup.get_text() for em in EMAIL_RE.findall(contact_html[:60000]): em = em.lower() if em not in result["emails"] and not any(em.endswith(x) for x in [".png",".jpg",".css",".js"]): result["emails"].append(em) for ph in PHONE_RE.findall(ctext): ph_c = re.sub(r"[\s\-]", "", ph) if ph_c not in result["phones"]: result["phones"].append(ph_c) for k in ["emails", "phones", "whatsapp"]: result[k] = list(dict.fromkeys(result[k]))[:5] except Exception: pass # ── SSL ─────────────────────────────────────────────────────────────────── import ssl as _ssl try: def _ssl_check(): import datetime as _dt ctx = _ssl.create_default_context() with socket.create_connection((domain, 443), timeout=5) as s: s.settimeout(5) # SSL handshake timeout (wrap_socket has no timeout arg) with ctx.wrap_socket(s, server_hostname=domain) as ss: cert = ss.getpeercert() exp = _dt.datetime.strptime(cert["notAfter"], "%b %d %H:%M:%S %Y %Z") return True, (_dt.datetime.utcnow() - exp).days * -1 loop = asyncio.get_event_loop() result["ssl_valid"], result["ssl_expiry_days"] = await asyncio.wait_for( loop.run_in_executor(None, _ssl_check), timeout=12 ) except Exception: pass return result async def analyze_site(domain: str) -> dict: """Public entry point — hard 90s timeout so workers never hang permanently.""" try: return await asyncio.wait_for(_analyze_site_inner(domain), timeout=90) except asyncio.TimeoutError: logger.warning("analyze_site timed out for %s", domain) return {"domain": domain, "reachable": False, "error": "analyze_site timeout", "emails": [], "phones": [], "whatsapp": [], "social_links": [], "kit_digital": False, "kit_digital_signals": []}