# DomGod/app/enricher.py
import asyncio
import json
import os
import re
import ssl
import socket
import datetime
import logging
from typing import Optional
import httpx
import dns.resolver
import aiosqlite
from bs4 import BeautifulSoup
from app.db import SQLITE_PATH, queue_ai, save_ai_assessment
from app.scorer import score
logger = logging.getLogger(__name__)
CONCURRENCY_LIMIT = int(os.getenv("CONCURRENCY_LIMIT", "50"))
IP_API_RATE = 45 # req/min free tier
_worker_task: Optional[asyncio.Task] = None
_ai_worker_task: Optional[asyncio.Task] = None
_paused = False
_ip_sem: Optional[asyncio.Semaphore] = None
_ip_last_call = 0.0
def _get_ip_sem():
    global _ip_sem
    if _ip_sem is None:
        _ip_sem = asyncio.Semaphore(1)
    return _ip_sem
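# Note: the semaphore is created lazily (on first use) so it is bound to the
# running event loop rather than to whichever loop exists at import time.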
# ── CMS detection ────────────────────────────────────────────────────────────
CMS_SIGNATURES = {
    "wordpress": ["/wp-content/", "/wp-includes/", 'content="WordPress'],
    "joomla": ["/components/com_", "Joomla!", 'content="Joomla'],
    "drupal": ["/sites/default/files/", "Drupal.settings", 'content="Drupal'],
    "wix": ["static.wixstatic.com", "X-Wix-"],
    "squarespace": ["squarespace.com", "X-Squarespace-"],
    "shopify": ["cdn.shopify.com", "Shopify.theme"],
    "prestashop": ["PrestaShop", "/modules/prestashop"],
    "magento": ["Mage.Cookies", "X-Magento-"],
    "typo3": ["typo3temp", "TYPO3 CMS"],
    "opencart": ["route=common/home", "OpenCart"],
}
def detect_cms(html: str, headers: dict) -> Optional[str]:
    combined = html[:60000] + " ".join(f"{k}:{v}" for k, v in headers.items())
    cl = combined.lower()
    for cms, sigs in CMS_SIGNATURES.items():
        if any(s.lower() in cl for s in sigs):
            return cms
    return None
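# Detection scans only the first 60 KB of HTML plus all response headers; the
# first matching signature wins.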
# ── Kit Digital detection ────────────────────────────────────────────────────
KIT_IMG_PATS = [
    "digitalizadores", "kit-digital", "kitdigital", "kit_digital",
    "fondos-europeos", "fondos_europeos", "nextgeneration", "next-generation",
    "prtr", "plan-recuperacion", "planderecuperacion",
    "acelerapyme", "logo-ue", "recovery-eu", "cofinanciado",
]
KIT_TEXT_PATS = [
    "kit digital", "agente digitalizador", "agentes digitalizadores",
    "fondos europeos", "next generation eu", "nextgenerationeu",
    "plan de recuperación", "plan de recuperacion",
    "plan de digitalización", "digitalización pymes",
    "prtr", "financiado por la unión europea",
    "red.es/kit-digital", "acelerapyme.es",
]
KIT_LINK_PATS = ["acelerapyme", "red.es", "kit-digital", "kitdigital"]
def detect_kit_digital(soup, html: str) -> tuple[bool, list]:
    signals = []
    hl = html.lower()
    for img in soup.find_all("img"):
        combined = ((img.get("src") or "") + (img.get("alt") or "") + (img.get("srcset") or "")).lower()
        for p in KIT_IMG_PATS:
            if p in combined:
                signals.append(f"img:{p}")
                break
    for p in KIT_TEXT_PATS:
        if p in hl:
            signals.append(f"text:{p}")
    for a in soup.find_all("a", href=True):
        href = a["href"].lower()
        if any(p in href for p in KIT_LINK_PATS):
            signals.append(f"link:{href[:60]}")
    signals = list(dict.fromkeys(signals))[:15]
    return len(signals) > 0, signals
# ── Contact extraction ────────────────────────────────────────────────────────
EMAIL_RE = re.compile(r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}")
PHONE_RE = re.compile(r"(?:\+34[\s\-]?)?(?:6|7|8|9)\d{2}[\s\-]?\d{3}[\s\-]?\d{3}")
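# PHONE_RE targets Spanish numbers: an optional +34 prefix followed by nine
# digits starting with 6, 7, 8 or 9, with spaces or dashes allowed between groups.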
SOCIAL_DOMAINS = ["facebook.com", "instagram.com", "linkedin.com", "twitter.com", "x.com", "tiktok.com"]
def extract_contacts(soup, html: str) -> dict:
    contacts: dict = {"emails": [], "phones": [], "whatsapp": [], "social": []}
    # mailto links
    for a in soup.find_all("a", href=True):
        href = a["href"]
        if href.startswith("mailto:"):
            em = href[7:].split("?")[0].strip()
            if em and em not in contacts["emails"]:
                contacts["emails"].append(em)
        elif href.startswith("tel:"):
            ph = re.sub(r"[^\d+]", "", href[4:])
            if ph and ph not in contacts["phones"]:
                contacts["phones"].append(ph)
        elif "wa.me" in href or "api.whatsapp.com" in href:
            if href not in contacts["whatsapp"]:
                contacts["whatsapp"].append(href[:80])
        else:
            for sd in SOCIAL_DOMAINS:
                if sd in href.lower():
                    clean = href.split("?")[0].rstrip("/")
                    if clean not in contacts["social"]:
                        contacts["social"].append(clean)
                    break
    # Email regex in raw HTML (catches obfuscated ones)
    for em in EMAIL_RE.findall(html[:100000]):
        em = em.lower()
        if em not in contacts["emails"] and not em.endswith((".png", ".jpg", ".css", ".js")):
            contacts["emails"].append(em)
    # Phone numbers in visible text
    for ph in PHONE_RE.findall(soup.get_text()):
        ph_clean = re.sub(r"[\s\-]", "", ph)
        if ph_clean not in contacts["phones"]:
            contacts["phones"].append(ph_clean)
    # Dedupe + cap
    for k in contacts:
        contacts[k] = list(dict.fromkeys(contacts[k]))[:5]
    return contacts
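# Each list in the returned dict is deduplicated and capped at five entries,
# e.g. {"emails": [...], "phones": [...], "whatsapp": [...], "social": [...]}.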
# ── SSL / MX / IP ─────────────────────────────────────────────────────────────
async def check_ssl(domain: str) -> tuple[bool, Optional[int]]:
    try:
        ctx = ssl.create_default_context()
        loop = asyncio.get_event_loop()
        def _check():
            with socket.create_connection((domain, 443), timeout=5) as sock:
                with ctx.wrap_socket(sock, server_hostname=domain) as ssock:
                    cert = ssock.getpeercert()
                    expiry = datetime.datetime.strptime(cert["notAfter"], "%b %d %H:%M:%S %Y %Z")
                    return True, (expiry - datetime.datetime.utcnow()).days
        return await loop.run_in_executor(None, _check)
    except Exception:
        return False, None
async def check_mx(domain: str) -> bool:
    try:
        loop = asyncio.get_event_loop()
        def _check():
            try:
                return len(dns.resolver.resolve(domain, "MX", lifetime=5)) > 0
            except Exception:
                return False
        return await loop.run_in_executor(None, _check)
    except Exception:
        return False
async def get_ip_country(ip: str) -> Optional[str]:
    global _ip_last_call
    async with _get_ip_sem():
        now = asyncio.get_event_loop().time()
        wait = (60 / IP_API_RATE) - (now - _ip_last_call)
        if wait > 0:
            await asyncio.sleep(wait)
        _ip_last_call = asyncio.get_event_loop().time()
        try:
            async with httpx.AsyncClient(timeout=5) as client:
                r = await client.get(f"http://ip-api.com/json/{ip}?fields=countryCode")
                if r.status_code == 200:
                    return r.json().get("countryCode")
        except Exception:
            pass
    return None
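# The semaphore serializes lookups across all concurrent enrich tasks, and the
# sleep paces them to at most IP_API_RATE (45) requests per minute, matching
# the ip-api.com free-tier limit noted above.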
# ── Main enrichment ───────────────────────────────────────────────────────────
async def enrich_domain(domain: str) -> dict:
    result = {
        "domain": domain,
        "is_live": False, "status_code": None,
        "ssl_valid": False, "ssl_expiry_days": None,
        "cms": None, "has_mx": False,
        "ip_country": None, "page_title": None,
        "server": None,
        "kit_digital": False, "kit_digital_signals": "[]",
        "contact_info": "{}",
        "enriched_at": datetime.datetime.utcnow().isoformat(),
        "error": None,
    }
    try:
        async with httpx.AsyncClient(
            timeout=12, follow_redirects=True, verify=False,
            headers={"User-Agent": "Mozilla/5.0 (compatible; DomGod/1.0)"},
        ) as client:
            resp = await client.get(f"http://{domain}")
            result["is_live"] = resp.status_code in (200, 301, 302, 303, 307, 308)
            result["status_code"] = resp.status_code
            result["server"] = resp.headers.get("server")
            html = resp.text
            soup = BeautifulSoup(html, "html.parser")
            title = soup.find("title")
            result["page_title"] = title.get_text(strip=True)[:500] if title else None
            result["cms"] = detect_cms(html, dict(resp.headers))
            kit, signals = detect_kit_digital(soup, html)
            result["kit_digital"] = kit
            result["kit_digital_signals"] = json.dumps(signals)
            result["contact_info"] = json.dumps(extract_contacts(soup, html))
            try:
                loop = asyncio.get_event_loop()
                ip = await loop.run_in_executor(None, socket.gethostbyname, domain)
                result["ip_country"] = await get_ip_country(ip)
            except Exception:
                pass
    except Exception as e:
        result["error"] = str(e)[:500]
    ssl_valid, ssl_days = await check_ssl(domain)
    result["ssl_valid"] = ssl_valid
    result["ssl_expiry_days"] = ssl_days
    result["has_mx"] = await check_mx(domain)
    result["score"] = score(result)
    return result
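# Standalone usage sketch (outside the worker loop; "example.com" is a
# hypothetical domain):
#   data = asyncio.run(enrich_domain("example.com"))
#   print(data["score"], data["cms"], data["kit_digital"])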
async def save_enriched(data: dict):
    async with aiosqlite.connect(SQLITE_PATH) as db:
        await db.execute(
            """INSERT INTO enriched_domains
               (domain, is_live, status_code, ssl_valid, ssl_expiry_days, cms,
                has_mx, ip_country, page_title, server, enriched_at, error, score,
                kit_digital, kit_digital_signals, contact_info)
               VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
               ON CONFLICT(domain) DO UPDATE SET
                 is_live=excluded.is_live, status_code=excluded.status_code,
                 ssl_valid=excluded.ssl_valid, ssl_expiry_days=excluded.ssl_expiry_days,
                 cms=excluded.cms, has_mx=excluded.has_mx, ip_country=excluded.ip_country,
                 page_title=excluded.page_title, server=excluded.server,
                 enriched_at=excluded.enriched_at, error=excluded.error, score=excluded.score,
                 kit_digital=excluded.kit_digital,
                 kit_digital_signals=excluded.kit_digital_signals,
                 contact_info=excluded.contact_info""",
            (
                data["domain"], data["is_live"], data["status_code"],
                data["ssl_valid"], data["ssl_expiry_days"], data["cms"],
                data["has_mx"], data["ip_country"], data["page_title"],
                data["server"], data["enriched_at"], data["error"], data["score"],
                int(data["kit_digital"]), data["kit_digital_signals"], data["contact_info"],
            ),
        )
        await db.execute(
            """INSERT INTO scores (domain, score) VALUES (?,?)
               ON CONFLICT(domain) DO UPDATE SET score=excluded.score, scored_at=datetime('now')""",
            (data["domain"], data["score"]),
        )
        await db.commit()
async def mark_job(domain: str, status: str, error: Optional[str] = None):
    async with aiosqlite.connect(SQLITE_PATH) as db:
        if status == "running":
            await db.execute(
                "UPDATE job_queue SET status=?, started_at=datetime('now') WHERE domain=?",
                (status, domain))
        elif status in ("done", "failed"):
            await db.execute(
                "UPDATE job_queue SET status=?, completed_at=datetime('now'), error=? WHERE domain=?",
                (status, error, domain))
        await db.commit()
# ── Enrichment worker ─────────────────────────────────────────────────────────
async def worker_loop():
    sem = asyncio.Semaphore(CONCURRENCY_LIMIT)
    async def process(domain: str):
        async with sem:
            await mark_job(domain, "running")
            try:
                data = await enrich_domain(domain)
                await save_enriched(data)
                await mark_job(domain, "done")
            except Exception as e:
                await mark_job(domain, "failed", str(e)[:500])
    while True:
        if _paused:
            await asyncio.sleep(1)
            continue
        async with aiosqlite.connect(SQLITE_PATH) as db:
            async with db.execute(
                "SELECT domain FROM job_queue WHERE status='pending' LIMIT 100"
            ) as cur:
                rows = await cur.fetchall()
        if not rows:
            await asyncio.sleep(2)
            continue
        await asyncio.gather(*[asyncio.create_task(process(r[0])) for r in rows], return_exceptions=True)
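# Polling design: up to 100 pending jobs are fetched per pass and processed
# under the CONCURRENCY_LIMIT semaphore; gather(return_exceptions=True) keeps a
# single failed domain from aborting the whole batch.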
# ── AI assessment worker ──────────────────────────────────────────────────────
async def _assess_one(domain: str) -> None:
    """Process a single AI assessment — safe to call concurrently."""
    from app.replicate_ai import assess_domain as gemini_assess
    from app.site_analyzer import analyze_site
logger.info("AI: starting analysis for %s", domain)
try:
# Hard 3-minute ceiling so stuck jobs never block the worker forever
async with asyncio.timeout(180):
analysis = await analyze_site(domain)
logger.info("AI: site analyzed %s (reachable=%s, words=%s)",
domain, analysis.get("reachable"), analysis.get("word_count"))
assessment = await gemini_assess(analysis)
logger.info("AI: Gemini done %s → quality=%s",
domain, assessment.get("lead_quality"))
await save_ai_assessment(domain, assessment, site_analysis=analysis)
logger.info("AI: saved %s", domain)
    except Exception as e:
        logger.error("AI: failed %s: %s", domain, e, exc_info=True)
        try:
            async with aiosqlite.connect(SQLITE_PATH) as db:
                await db.execute(
                    "UPDATE ai_queue SET status='failed', completed_at=datetime('now'), error=? WHERE domain=?",
                    (str(e)[:400], domain),
                )
                await db.commit()
        except Exception:
            pass
async def ai_worker_loop():
    logger.info("AI worker loop starting")
    # Reset any jobs left in 'running' state from a previous crashed worker
    try:
        async with aiosqlite.connect(SQLITE_PATH) as db:
            result = await db.execute(
                "UPDATE ai_queue SET status='pending' WHERE status='running'"
            )
            count = result.rowcount
            await db.commit()
            if count:
                logger.info("AI worker: reset %d stale 'running' jobs to 'pending'", count)
    except Exception as e:
        logger.error("AI worker: failed to reset stale jobs: %s", e)
    while True:
        rows = []
        try:
            async with aiosqlite.connect(SQLITE_PATH) as db:
                async with db.execute(
                    "SELECT domain FROM ai_queue WHERE status='pending' LIMIT 5"
                ) as cur:
                    rows = await cur.fetchall()
                if rows:
                    await db.executemany(
                        "UPDATE ai_queue SET status='running' WHERE domain=?",
                        [(r[0],) for r in rows],
                    )
                    await db.commit()
logger.info("AI worker: picked up %d jobs: %s",
len(rows), [r[0] for r in rows])
except Exception as e:
logger.error("AI worker DB error: %s", e, exc_info=True)
await asyncio.sleep(5)
continue
if not rows:
await asyncio.sleep(3)
continue
        # Run assessments concurrently (semaphore in replicate_ai enforces AI_CONCURRENCY)
        results = await asyncio.gather(
            *[_assess_one(r[0]) for r in rows],
            return_exceptions=True,
        )
        for r, exc in zip(rows, results):
            if isinstance(exc, Exception):
                logger.error("AI task exception for %s: %s", r[0], exc, exc_info=exc)
def start_worker():
    global _worker_task, _ai_worker_task
    if _worker_task is None or _worker_task.done():
        _worker_task = asyncio.create_task(worker_loop())
logger.info("Enrichment worker started")
if _ai_worker_task is None or _ai_worker_task.done():
        if _ai_worker_task is not None and _ai_worker_task.done():
            exc = _ai_worker_task.exception() if not _ai_worker_task.cancelled() else None
            if exc:
                logger.error("AI worker died with: %s", exc, exc_info=exc)
        _ai_worker_task = asyncio.create_task(ai_worker_loop())
logger.info("AI worker started/restarted")
def ensure_workers_alive():
    """Restart workers if they've died — call periodically."""
    start_worker()
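# Intended to be called periodically (e.g. from request handlers or a watchdog
# task) so that a crashed worker task is restarted on the next call.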
def pause_worker():
    global _paused
    _paused = True
def resume_worker():
    global _paused
    _paused = False
    start_worker()
def is_running() -> bool:
    return _worker_task is not None and not _worker_task.done() and not _paused