import asyncio
import json
import os
import re
import ssl
import socket
import datetime
import logging
from typing import Optional

import httpx
import dns.resolver
import aiosqlite
from bs4 import BeautifulSoup

from app.db import SQLITE_PATH, queue_ai, save_ai_assessment
from app.scorer import score

logger = logging.getLogger(__name__)

# Tunables
CONCURRENCY_LIMIT = int(os.getenv("CONCURRENCY_LIMIT", "50"))
IP_API_RATE = 45  # ip-api.com free tier: ~45 requests/minute

# Module-level worker state (singletons owned by the running event loop)
_worker_task: Optional[asyncio.Task] = None
_ai_worker_task: Optional[asyncio.Task] = None
_paused = False
_ip_sem: Optional[asyncio.Semaphore] = None
_ip_last_call = 0.0  # loop.time() of the last ip-api.com call


def _get_ip_sem() -> asyncio.Semaphore:
    """Lazily create the semaphore that serialises ip-api.com calls.

    Created on first use (inside a coroutine) so it binds to the running
    event loop rather than whichever loop exists at import time.
    """
    global _ip_sem
    if _ip_sem is None:
        _ip_sem = asyncio.Semaphore(1)
    return _ip_sem


# ── CMS detection ────────────────────────────────────────────────────────────

# Substrings that identify a CMS when found in the page HTML or the
# response headers (matched case-insensitively).
CMS_SIGNATURES = {
    "wordpress": ["/wp-content/", "/wp-includes/", 'content="WordPress'],
    "joomla": ["/components/com_", "Joomla!", 'content="Joomla'],
    "drupal": ["/sites/default/files/", "Drupal.settings", 'content="Drupal'],
    "wix": ["static.wixstatic.com", "X-Wix-"],
    "squarespace": ["squarespace.com", "X-Squarespace-"],
    "shopify": ["cdn.shopify.com", "Shopify.theme"],
    "prestashop": ["PrestaShop", "/modules/prestashop"],
    "magento": ["Mage.Cookies", "X-Magento-"],
    "typo3": ["typo3temp", "TYPO3 CMS"],
    "opencart": ["route=common/home", "OpenCart"],
}


def detect_cms(html: str, headers: dict) -> Optional[str]:
    """Best-effort CMS fingerprint from page HTML and response headers.

    Returns the first matching CMS key from ``CMS_SIGNATURES`` (dict order),
    or ``None`` if nothing matches.
    """
    # A separator space is inserted between the HTML slice and the header
    # dump so no signature can falsely match across the concatenation seam.
    combined = html[:60000] + " " + " ".join(f"{k}:{v}" for k, v in headers.items())
    cl = combined.lower()
    for cms, sigs in CMS_SIGNATURES.items():
        if any(s.lower() in cl for s in sigs):
            return cms
    return None


# ── Kit Digital detection ────────────────────────────────────────────────────

# Substrings hinting at Spain's "Kit Digital" grant programme, split by
# where they are searched: image attributes, visible/raw text, link hrefs.
KIT_IMG_PATS = [
    "digitalizadores", "kit-digital", "kitdigital", "kit_digital",
    "fondos-europeos", "fondos_europeos", "nextgeneration", "next-generation",
    "prtr", "plan-recuperacion", "planderecuperacion", "acelerapyme",
    "logo-ue", "recovery-eu", "cofinanciado",
]
KIT_TEXT_PATS = [
    "kit digital", "agente digitalizador", "agentes digitalizadores",
    "fondos europeos", "next generation eu", "nextgenerationeu",
    "plan de recuperación", "plan de recuperacion", "plan de digitalización",
    "digitalización pymes", "prtr", "financiado por la unión europea",
    "red.es/kit-digital", "acelerapyme.es",
]
KIT_LINK_PATS = ["acelerapyme", "red.es", "kit-digital", "kitdigital"]


def detect_kit_digital(soup, html: str) -> tuple[bool, list]:
    """Look for evidence that the site advertises the "Kit Digital" grant.

    Scans image attributes (src/alt/srcset), the raw HTML text and outbound
    link hrefs against the pattern lists above.

    Returns ``(found, signals)`` where *signals* is an ordered, deduplicated
    list of at most 15 human-readable matches (``img:``/``text:``/``link:``).
    """
    signals = []
    hl = html.lower()
    for img in soup.find_all("img"):
        combined = (
            (img.get("src") or "") + (img.get("alt") or "") + (img.get("srcset") or "")
        ).lower()
        for p in KIT_IMG_PATS:
            if p in combined:
                signals.append(f"img:{p}")
                break  # one signal per image is enough
    for p in KIT_TEXT_PATS:
        if p in hl:
            signals.append(f"text:{p}")
    for a in soup.find_all("a", href=True):
        href = a["href"].lower()
        if any(p in href for p in KIT_LINK_PATS):
            signals.append(f"link:{href[:60]}")
    # Dedupe preserving order, then cap.
    signals = list(dict.fromkeys(signals))[:15]
    return len(signals) > 0, signals


# ── Contact extraction ───────────────────────────────────────────────────────

EMAIL_RE = re.compile(r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}")
# Spanish numbers: optional +34 prefix, then 6/7/8/9 and 8 more digits,
# allowing spaces or hyphens as group separators.
PHONE_RE = re.compile(r"(?:\+34[\s\-]?)?(?:6|7|8|9)\d{2}[\s\-]?\d{3}[\s\-]?\d{3}")
SOCIAL_DOMAINS = ["facebook.com", "instagram.com", "linkedin.com",
                  "twitter.com", "x.com", "tiktok.com"]


def extract_contacts(soup, html: str) -> dict:
    """Extract emails, phone numbers, WhatsApp links and social profiles.

    Returns a dict with keys ``emails``/``phones``/``whatsapp``/``social``,
    each an ordered, deduplicated list capped at 5 entries.
    """
    contacts: dict = {"emails": [], "phones": [], "whatsapp": [], "social": []}
    # Anchor tags: mailto:, tel:, WhatsApp and social-network links.
    for a in soup.find_all("a", href=True):
        href = a["href"]
        if href.startswith("mailto:"):
            # Lowercase so deduplication is consistent with the regex pass
            # below (which lowercases); otherwise the same address could be
            # kept twice differing only in case.
            em = href[7:].split("?")[0].strip().lower()
            if em and em not in contacts["emails"]:
                contacts["emails"].append(em)
        elif href.startswith("tel:"):
            ph = re.sub(r"[^\d+]", "", href[4:])
            if ph and ph not in contacts["phones"]:
                contacts["phones"].append(ph)
        elif "wa.me" in href or "api.whatsapp.com" in href:
            if href not in contacts["whatsapp"]:
                contacts["whatsapp"].append(href[:80])
        else:
            for sd in SOCIAL_DOMAINS:
                if sd in href.lower():
                    clean = href.split("?")[0].rstrip("/")
                    if clean not in contacts["social"]:
                        contacts["social"].append(clean)
                    break
    # Email regex over raw HTML (catches addresses outside mailto: links);
    # skip matches that are really asset filenames.
    for em in EMAIL_RE.findall(html[:100000]):
        em = em.lower()
        if em not in contacts["emails"] and not em.endswith((".png", ".jpg", ".css", ".js")):
            contacts["emails"].append(em)
    # Phone numbers in the visible text.
    for ph in PHONE_RE.findall(soup.get_text()):
        ph_clean = re.sub(r"[\s\-]", "", ph)
        if ph_clean not in contacts["phones"]:
            contacts["phones"].append(ph_clean)
    # Dedupe (order-preserving) + cap each bucket.
    for k in contacts:
        contacts[k] = list(dict.fromkeys(contacts[k]))[:5]
    return contacts
contacts["social"]: contacts["social"].append(clean) break # Email regex in raw HTML (catches obfuscated ones) for em in EMAIL_RE.findall(html[:100000]): em = em.lower() if em not in contacts["emails"] and not em.endswith((".png", ".jpg", ".css", ".js")): contacts["emails"].append(em) # Phone numbers in visible text for ph in PHONE_RE.findall(soup.get_text()): ph_clean = re.sub(r"[\s\-]", "", ph) if ph_clean not in contacts["phones"]: contacts["phones"].append(ph_clean) # Dedupe + cap for k in contacts: contacts[k] = list(dict.fromkeys(contacts[k]))[:5] return contacts # ── SSL / MX / IP ───────────────────────────────────────────────────────────── async def check_ssl(domain: str) -> tuple[bool, Optional[int]]: try: ctx = ssl.create_default_context() loop = asyncio.get_event_loop() def _check(): with socket.create_connection((domain, 443), timeout=5) as sock: with ctx.wrap_socket(sock, server_hostname=domain) as ssock: cert = ssock.getpeercert() expiry = datetime.datetime.strptime(cert["notAfter"], "%b %d %H:%M:%S %Y %Z") return True, (expiry - datetime.datetime.utcnow()).days return await loop.run_in_executor(None, _check) except Exception: return False, None async def check_mx(domain: str) -> bool: try: loop = asyncio.get_event_loop() def _check(): try: return len(dns.resolver.resolve(domain, "MX", lifetime=5)) > 0 except Exception: return False return await loop.run_in_executor(None, _check) except Exception: return False async def get_ip_country(ip: str) -> Optional[str]: global _ip_last_call async with _get_ip_sem(): now = asyncio.get_event_loop().time() wait = (60 / IP_API_RATE) - (now - _ip_last_call) if wait > 0: await asyncio.sleep(wait) _ip_last_call = asyncio.get_event_loop().time() try: async with httpx.AsyncClient(timeout=5) as client: r = await client.get(f"http://ip-api.com/json/{ip}?fields=countryCode") if r.status_code == 200: return r.json().get("countryCode") except Exception: pass return None # ── Main enrichment 
# ── Main enrichment ──────────────────────────────────────────────────────────

async def enrich_domain(domain: str) -> dict:
    """Probe one domain and build its full enrichment record.

    Fetches the homepage over plain HTTP (following redirects), fingerprints
    the CMS, detects Kit Digital signals, extracts contact info, then runs
    the independent SSL / MX / IP-geolocation checks and scores the result.

    Never raises: fetch failures land in ``result["error"]`` and the record
    is still scored and returned.
    """
    result = {
        "domain": domain,
        "is_live": False,
        "status_code": None,
        "ssl_valid": False,
        "ssl_expiry_days": None,
        "cms": None,
        "has_mx": False,
        "ip_country": None,
        "page_title": None,
        "server": None,
        "kit_digital": False,
        "kit_digital_signals": "[]",
        "contact_info": "{}",
        # Naive UTC ISO string — identical to the historical utcnow() output
        # (utcnow() is deprecated since Python 3.12).
        "enriched_at": datetime.datetime.now(datetime.timezone.utc)
        .replace(tzinfo=None)
        .isoformat(),
        "error": None,
    }
    try:
        async with httpx.AsyncClient(
            timeout=12,
            follow_redirects=True,
            # TLS validity is assessed separately by check_ssl(); here we
            # want the content even from sites with broken certificates.
            verify=False,
            headers={"User-Agent": "Mozilla/5.0 (compatible; DomGod/1.0)"},
        ) as client:
            resp = await client.get(f"http://{domain}")
            # 3xx codes kept for safety, although with follow_redirects=True
            # the final response is normally not a redirect.
            result["is_live"] = resp.status_code in (200, 301, 302, 303, 307, 308)
            result["status_code"] = resp.status_code
            result["server"] = resp.headers.get("server")
            html = resp.text
            soup = BeautifulSoup(html, "html.parser")
            title = soup.find("title")
            result["page_title"] = title.get_text(strip=True)[:500] if title else None
            result["cms"] = detect_cms(html, dict(resp.headers))
            kit, signals = detect_kit_digital(soup, html)
            result["kit_digital"] = kit
            result["kit_digital_signals"] = json.dumps(signals)
            result["contact_info"] = json.dumps(extract_contacts(soup, html))
            try:
                # get_event_loop() is deprecated in coroutines.
                loop = asyncio.get_running_loop()
                ip = await loop.run_in_executor(None, socket.gethostbyname, domain)
                result["ip_country"] = await get_ip_country(ip)
            except Exception:
                pass  # best-effort: geolocation is optional
    except Exception as e:
        result["error"] = str(e)[:500]

    # These checks are independent of the homepage fetch succeeding.
    ssl_valid, ssl_days = await check_ssl(domain)
    result["ssl_valid"] = ssl_valid
    result["ssl_expiry_days"] = ssl_days
    result["has_mx"] = await check_mx(domain)
    result["score"] = score(result)
    return result


async def save_enriched(data: dict):
    """Upsert one enrichment record into ``enriched_domains`` and ``scores``.

    Expects the dict shape produced by :func:`enrich_domain`.
    """
    async with aiosqlite.connect(SQLITE_PATH) as db:
        await db.execute(
            """INSERT INTO enriched_domains
            (domain, is_live, status_code, ssl_valid, ssl_expiry_days, cms,
            has_mx, ip_country, page_title, server, enriched_at, error, score,
            kit_digital, kit_digital_signals, contact_info)
            VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
            ON CONFLICT(domain) DO UPDATE SET
            is_live=excluded.is_live, status_code=excluded.status_code,
            ssl_valid=excluded.ssl_valid, ssl_expiry_days=excluded.ssl_expiry_days,
            cms=excluded.cms, has_mx=excluded.has_mx,
            ip_country=excluded.ip_country, page_title=excluded.page_title,
            server=excluded.server, enriched_at=excluded.enriched_at,
            error=excluded.error, score=excluded.score,
            kit_digital=excluded.kit_digital,
            kit_digital_signals=excluded.kit_digital_signals,
            contact_info=excluded.contact_info""",
            (
                data["domain"], data["is_live"], data["status_code"],
                data["ssl_valid"], data["ssl_expiry_days"], data["cms"],
                data["has_mx"], data["ip_country"], data["page_title"],
                data["server"], data["enriched_at"], data["error"],
                data["score"], int(data["kit_digital"]),
                data["kit_digital_signals"], data["contact_info"],
            ),
        )
        await db.execute(
            """INSERT INTO scores (domain, score) VALUES (?,?)
            ON CONFLICT(domain) DO UPDATE SET score=excluded.score,
            scored_at=datetime('now')""",
            (data["domain"], data["score"]),
        )
        await db.commit()


async def mark_job(domain: str, status: str, error: Optional[str] = None):
    """Advance a ``job_queue`` row to *status*, stamping the matching timestamp.

    ``running`` sets ``started_at``; ``done``/``failed`` set ``completed_at``
    and record *error*. Any other status is silently ignored.
    """
    async with aiosqlite.connect(SQLITE_PATH) as db:
        if status == "running":
            await db.execute(
                "UPDATE job_queue SET status=?, started_at=datetime('now') WHERE domain=?",
                (status, domain))
        elif status in ("done", "failed"):
            await db.execute(
                "UPDATE job_queue SET status=?, completed_at=datetime('now'), error=? WHERE domain=?",
                (status, error, domain))
        await db.commit()
WHERE domain=?", (status, error, domain)) await db.commit() # ── Enrichment worker ───────────────────────────────────────────────────────── async def worker_loop(): sem = asyncio.Semaphore(CONCURRENCY_LIMIT) async def process(domain: str): async with sem: await mark_job(domain, "running") try: data = await enrich_domain(domain) await save_enriched(data) await mark_job(domain, "done") except Exception as e: await mark_job(domain, "failed", str(e)[:500]) while True: if _paused: await asyncio.sleep(1) continue async with aiosqlite.connect(SQLITE_PATH) as db: async with db.execute( "SELECT domain FROM job_queue WHERE status='pending' LIMIT 100" ) as cur: rows = await cur.fetchall() if not rows: await asyncio.sleep(2) continue await asyncio.gather(*[asyncio.create_task(process(r[0])) for r in rows], return_exceptions=True) # ── AI assessment worker ────────────────────────────────────────────────────── async def _assess_one(domain: str) -> None: """Process a single AI assessment — safe to call concurrently.""" from app.replicate_ai import assess_domain as gemini_assess from app.site_analyzer import analyze_site logger.info("AI: starting analysis for %s", domain) try: # Hard 3-minute ceiling so stuck jobs never block the worker forever async with asyncio.timeout(180): analysis = await analyze_site(domain) logger.info("AI: site analyzed %s (reachable=%s, words=%s)", domain, analysis.get("reachable"), analysis.get("word_count")) assessment = await gemini_assess(analysis) logger.info("AI: Gemini done %s → quality=%s", domain, assessment.get("lead_quality")) await save_ai_assessment(domain, assessment, site_analysis=analysis) logger.info("AI: saved %s", domain) except Exception as e: logger.error("AI: failed %s — %s", domain, e, exc_info=True) try: async with aiosqlite.connect(SQLITE_PATH) as db: await db.execute( "UPDATE ai_queue SET status='failed', completed_at=datetime('now'), error=? 
WHERE domain=?", (str(e)[:400], domain), ) await db.commit() except Exception: pass async def ai_worker_loop(): logger.info("AI worker loop starting") # Reset any jobs left in 'running' state from a previous crashed worker try: async with aiosqlite.connect(SQLITE_PATH) as db: result = await db.execute( "UPDATE ai_queue SET status='pending' WHERE status='running'" ) count = result.rowcount await db.commit() if count: logger.info("AI worker: reset %d stale 'running' jobs to 'pending'", count) except Exception as e: logger.error("AI worker: failed to reset stale jobs: %s", e) while True: rows = [] try: async with aiosqlite.connect(SQLITE_PATH) as db: async with db.execute( "SELECT domain FROM ai_queue WHERE status='pending' LIMIT 5" ) as cur: rows = await cur.fetchall() if rows: await db.executemany( "UPDATE ai_queue SET status='running' WHERE domain=?", [(r[0],) for r in rows], ) await db.commit() logger.info("AI worker: picked up %d jobs: %s", len(rows), [r[0] for r in rows]) except Exception as e: logger.error("AI worker DB error: %s", e, exc_info=True) await asyncio.sleep(5) continue if not rows: await asyncio.sleep(3) continue # Run assessments concurrently (semaphore in replicate_ai enforces AI_CONCURRENCY) results = await asyncio.gather( *[_assess_one(r[0]) for r in rows], return_exceptions=True, ) for r, exc in zip(rows, results): if isinstance(exc, Exception): logger.error("AI task exception for %s: %s", r[0], exc, exc_info=exc) def start_worker(): global _worker_task, _ai_worker_task if _worker_task is None or _worker_task.done(): _worker_task = asyncio.create_task(worker_loop()) logger.info("Enrichment worker started") if _ai_worker_task is None or _ai_worker_task.done(): if _ai_worker_task is not None and _ai_worker_task.done(): exc = _ai_worker_task.exception() if not _ai_worker_task.cancelled() else None if exc: logger.error("AI worker died with: %s", exc, exc_info=exc) _ai_worker_task = asyncio.create_task(ai_worker_loop()) logger.info("AI worker 
started/restarted") def ensure_workers_alive(): """Restart workers if they've died — call periodically.""" start_worker() def pause_worker(): global _paused _paused = True def resume_worker(): global _paused _paused = False start_worker() def is_running() -> bool: return _worker_task is not None and not _worker_task.done() and not _paused