"""Bulk domain validator — fast HTTP checks for the entire dataset.

Reads domains from DuckDB in batches, skips already-validated ones,
performs concurrent HTTP checks, and saves
prescreen_status + server + ip + load_time_ms to enriched_domains.
"""
import asyncio
import logging
import os
import socket
import time
from typing import Optional
from urllib.parse import urlparse

import httpx
import aiosqlite
import duckdb

from app.db import SQLITE_PATH, DUCKDB_PATH, index_status

logger = logging.getLogger(__name__)

# Number of domains checked concurrently / fetched per DuckDB batch.
VAL_CONCURRENCY = int(os.getenv("VAL_CONCURRENCY", "50"))
VAL_BATCH = int(os.getenv("VAL_BATCH", "200"))

# Lower-cased substrings that mark a response body as a parking page.
PARKING_BODY_SIGNALS = [
    "domain is parked",
    "this domain is for sale",
    "buy this domain",
    "domain parking",
    "parked domain",
    "hugedomains.com",
    "sedo.com",
    "parkingcrew.com",
    "bodis.com",
    "dan.com",
    "afternic.com",
    "sedoparking.com",
    "undeveloped.com",
    "epik.com/domain",
    "this web page is parked",
    "domain has expired",
]

# Hosts that, when a domain redirects to them, mark the domain as parked.
PARKING_REDIRECT_HOSTS = {
    "sedo.com",
    "hugedomains.com",
    "dan.com",
    "afternic.com",
    "parkingcrew.com",
    "bodis.com",
    "undeveloped.com",
    "epik.com",
    "uniregistry.com",
    "sedoparking.com",
}

_UA = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/122.0.0.0 Safari/537.36"
)
_HEADERS = {
    "User-Agent": _UA,
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
}

# Handle of the background validator task (None until first started).
_val_task: Optional[asyncio.Task] = None
# Live counters exposed through get_validator_status().
_val_stats: dict = {
    "running": False,
    "processed": 0,
    "live": 0,
    "dead": 0,
    "parked": 0,
    "redirect": 0,
    "skipped": 0,
    "offset": 0,
    "rate": 0.0,
    "tld_filter": None,
}


def _normalize_host(host: str) -> str:
    """Lower-case *host* and drop a single leading ``www.`` label.

    ``str.removeprefix`` is used on purpose: ``str.lstrip("www.")`` strips
    any leading run of the characters ``w`` and ``.``, which would turn
    ``web.com`` into ``eb.com``.
    """
    return host.lower().removeprefix("www.")


def _is_parking_host(host: str) -> bool:
    """Return True if *host* is a known parking host or a subdomain of one.

    Matching is exact or on a dot-separated suffix, so ``jordan.com`` is not
    mistaken for ``dan.com`` the way a plain substring test would.
    """
    return any(
        host == parking or host.endswith("." + parking)
        for parking in PARKING_REDIRECT_HOSTS
    )


def _same_domain(original: str, final_url: str) -> bool:
    """Tell whether *final_url* still points at *original*.

    Both sides are compared without a leading ``www.`` and without a port.
    The two are considered the same when they are equal or when either one
    is a subdomain of the other.

    Args:
        original: The domain that was requested (optionally ``host:port``).
        final_url: The URL the request ended on after redirects.

    Returns:
        True if the final host is the original domain, a subdomain of it,
        or a parent domain of it; False otherwise (including when the final
        URL has no host).
    """
    orig = _normalize_host(original.split(":")[0])
    # .hostname drops the port and any userinfo from the netloc.
    final = _normalize_host(urlparse(final_url).hostname or "")
    if not orig or not final:
        return False
    return (
        orig == final
        or final.endswith("." + orig)
        or orig.endswith("." + final)
    )


async def _resolve_ip(domain: str) -> Optional[str]:
    """Resolve *domain* to an IPv4 address without blocking the event loop.

    The lookup runs ``socket.gethostbyname`` in the default executor.

    Returns:
        The address as a string, or None if resolution fails for any reason.
    """
    try:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, socket.gethostbyname, domain)
    except Exception:
        # Best-effort: the IP is optional metadata, never fail the check on it.
        return None


def _classify_response(domain: str, resp) -> str:
    """Map a completed HTTP response to a prescreen status string.

    Args:
        domain: The domain that was requested.
        resp: The final ``httpx`` response after redirects.

    Returns:
        ``"parked"``, ``"redirect"``, ``"dead"`` or ``"live"``.
    """
    final_url = str(resp.url)

    # Landed on a different domain: either a parking service or a redirect.
    if not _same_domain(domain, final_url):
        final_host = _normalize_host(urlparse(final_url).hostname or "")
        return "parked" if _is_parking_host(final_host) else "redirect"

    if resp.status_code not in (200, 203):
        return "dead"

    # Only the first 20k characters are scanned for parking-page wording.
    html_lc = resp.text[:20_000].lower()
    if any(sig in html_lc for sig in PARKING_BODY_SIGNALS):
        return "parked"
    return "live"


async def _check_domain(domain: str) -> dict:
    """Probe *domain* over HTTP(S) and classify it.

    Args:
        domain: Bare domain name to request.

    Returns:
        A dict with keys ``domain``, ``prescreen_status``, ``status_code``,
        ``server``, ``ip`` and ``load_time_ms``. ``prescreen_status`` stays
        ``"dead"`` when no usable response was obtained.
    """
    result: dict = {
        "domain": domain,
        "prescreen_status": "dead",
        "status_code": None,
        "server": None,
        "ip": None,
        "load_time_ms": None,
    }
    t0 = time.monotonic()

    # Try http first (follows http→https redirects automatically).
    # Fall back to https directly if port 80 is closed/refused — many modern
    # servers only listen on 443 and would be wrongly marked dead otherwise.
    for scheme in ("http", "https"):
        try:
            # NOTE: verify=False disables TLS certificate validation; the
            # response is only used for liveness classification here.
            async with httpx.AsyncClient(
                timeout=httpx.Timeout(connect=7, read=12, write=5, pool=15),
                follow_redirects=True,
                headers=_HEADERS,
                verify=False,
                max_redirects=5,
            ) as client:
                resp = await client.get(f"{scheme}://{domain}")

            result["load_time_ms"] = int((time.monotonic() - t0) * 1000)
            result["status_code"] = resp.status_code
            result["server"] = (resp.headers.get("server") or "")[:100]
            result["ip"] = await _resolve_ip(domain)
            result["prescreen_status"] = _classify_response(domain, resp)
            return result
        except httpx.ConnectError:
            # Port closed / connection refused — try the other scheme
            logger.debug(
                "Validator %s: ConnectError on %s, trying next scheme",
                domain,
                scheme,
            )
            continue
        except Exception as e:
            logger.debug("Validator %s (%s): %s", domain, scheme, e)
            break  # timeout or other error — don't retry

    result["load_time_ms"] = int((time.monotonic() - t0) * 1000)
    return result
# Maximum number of bound parameters sent in one SQLite statement. Kept well
# below SQLite's historical default limit of 999 host parameters.
_SQLITE_MAX_PARAMS = 500


def _get_domains_batch(offset: int, limit: int, tld: Optional[str]) -> list[str]:
    """Fetch one page of domain names from DuckDB.

    Args:
        offset: Number of rows to skip.
        limit: Maximum number of rows to return.
        tld: Optional TLD filter; case and leading dots are ignored.

    Returns:
        The domain names of the page. An empty list is returned both at the
        end of the data and when DuckDB raises, so callers cannot tell the
        two apart; the error itself is logged.
    """
    conn = None
    try:
        conn = duckdb.connect(str(DUCKDB_PATH), read_only=True)
        conn.execute("SET threads=2")
        # NOTE(review): LIMIT/OFFSET without ORDER BY relies on DuckDB
        # returning rows in a stable order between calls — confirm.
        if tld:
            rows = conn.execute(
                "SELECT domain FROM domains WHERE tld=? LIMIT ? OFFSET ?",
                [tld.lower().lstrip("."), limit, offset],
            ).fetchall()
        else:
            rows = conn.execute(
                "SELECT domain FROM domains LIMIT ? OFFSET ?",
                [limit, offset],
            ).fetchall()
        return [r[0] for r in rows]
    except Exception as e:
        logger.error("Validator DuckDB error: %s", e)
        return []
    finally:
        # Close even when a query raised, so the connection is not leaked.
        if conn is not None:
            conn.close()


async def _filter_unvalidated(domains: list[str]) -> list[str]:
    """Return only domains that don't have a prescreen_status set yet.

    The lookup is split into chunks so that a large batch never exceeds
    SQLite's limit on bound parameters per statement. Input order is kept.
    """
    if not domains:
        return []

    already: set[str] = set()
    async with aiosqlite.connect(SQLITE_PATH) as db:
        for start in range(0, len(domains), _SQLITE_MAX_PARAMS):
            chunk = domains[start:start + _SQLITE_MAX_PARAMS]
            placeholders = ",".join("?" * len(chunk))
            async with db.execute(
                f"SELECT domain FROM enriched_domains "
                f"WHERE domain IN ({placeholders}) AND prescreen_status IS NOT NULL",
                chunk,
            ) as cur:
                already |= {r[0] async for r in cur}

    return [d for d in domains if d not in already]


async def _save_batch(results: list[dict]) -> None:
    """Upsert a batch of check results into ``enriched_domains``.

    On conflict the status, load time and timestamp are overwritten, while
    status_code, server and ip keep their previous value when the new one is
    missing (NULL, or empty string for server).
    """
    if not results:
        return

    rows = [
        (
            r["domain"],
            r["prescreen_status"],
            r.get("status_code"),
            r.get("server"),
            r.get("ip"),
            r.get("load_time_ms"),
        )
        for r in results
    ]
    async with aiosqlite.connect(SQLITE_PATH) as db:
        await db.executemany(
            """INSERT INTO enriched_domains
                   (domain, prescreen_status, status_code, server, ip,
                    load_time_ms, prescreen_at)
               VALUES (?, ?, ?, ?, ?, ?, datetime('now'))
               ON CONFLICT(domain) DO UPDATE SET
                   prescreen_status = excluded.prescreen_status,
                   status_code = COALESCE(excluded.status_code, status_code),
                   server = COALESCE(NULLIF(excluded.server,''), server),
                   ip = COALESCE(excluded.ip, ip),
                   load_time_ms = excluded.load_time_ms,
                   prescreen_at = excluded.prescreen_at""",
            rows,
        )
        await db.commit()


async def _validator_loop(tld_filter: Optional[str]):
    """Run the validation pipeline until the dataset is exhausted.

    Waits for the DuckDB index, then repeatedly fetches a batch, drops
    already-validated domains, checks the rest concurrently, saves the
    results and updates ``_val_stats``.

    Cancellation and unexpected errors are logged and end the loop without
    propagating; ``running`` and ``offset`` in ``_val_stats`` are always
    written back on exit.

    Args:
        tld_filter: Optional TLD to restrict the run to.
    """
    _val_stats["running"] = True
    offset = _val_stats["offset"]
    sem = asyncio.Semaphore(VAL_CONCURRENCY)
    rate_buf: list[float] = []  # rates of the last 10 batches

    async def _run(d: str) -> dict:
        # The semaphore caps the number of in-flight HTTP checks.
        async with sem:
            return await _check_domain(d)

    try:
        # Wait for DuckDB index to be ready (up to 10 minutes).
        # Kept inside the try so the `running` flag is cleared on any exit.
        for _ in range(120):
            if index_status()["ready"]:
                break
            logger.info("Validator: waiting for DuckDB index…")
            await asyncio.sleep(5)
        else:
            logger.error("Validator: DuckDB index never became ready")
            return

        loop = asyncio.get_running_loop()
        while True:
            # DuckDB access is blocking, so it runs in the default executor.
            batch = await loop.run_in_executor(
                None, _get_domains_batch, offset, VAL_BATCH, tld_filter
            )
            if not batch:
                logger.info("Validator: dataset complete at offset=%d", offset)
                break

            to_check = await _filter_unvalidated(batch)
            _val_stats["skipped"] += len(batch) - len(to_check)
            offset += len(batch)
            _val_stats["offset"] = offset

            if not to_check:
                await asyncio.sleep(0)  # yield to event loop
                continue

            t0 = time.monotonic()
            raw = await asyncio.gather(
                *(_run(d) for d in to_check), return_exceptions=True
            )
            results = [r for r in raw if isinstance(r, dict)]

            # Checks that raised are not saved; surface them in the log
            # instead of dropping them silently.
            for d, r in zip(to_check, raw):
                if not isinstance(r, dict):
                    logger.warning("Validator: check failed for %s: %r", d, r)

            await _save_batch(results)

            for r in results:
                _val_stats["processed"] += 1
                s = r.get("prescreen_status", "dead")
                _val_stats[s] = _val_stats.get(s, 0) + 1

            elapsed = max(time.monotonic() - t0, 0.01)
            rate_buf.append(len(results) / elapsed)
            if len(rate_buf) > 10:
                rate_buf.pop(0)
            _val_stats["rate"] = round(sum(rate_buf) / len(rate_buf), 1)

            logger.info(
                "Validator off=%d proc=%d live=%d dead=%d parked=%d rate=%.1f/s",
                offset,
                _val_stats["processed"],
                _val_stats["live"],
                _val_stats["dead"],
                _val_stats["parked"],
                _val_stats["rate"],
            )
    except asyncio.CancelledError:
        # Not re-raised: the task then finishes normally rather than as
        # cancelled.
        logger.info("Validator cancelled at offset=%d", offset)
    except Exception as e:
        logger.error("Validator loop error: %s", e, exc_info=True)
    finally:
        _val_stats["running"] = False
        _val_stats["offset"] = offset


def get_validator_status() -> dict:
    """Return a shallow copy of the current validator counters."""
    return dict(_val_stats)


def start_validator(tld_filter: Optional[str] = None):
    """Start the background validator task unless one is already running.

    Must be called while an event loop is running, because the task is
    created with ``asyncio.create_task``.

    Args:
        tld_filter: Optional TLD to restrict the run to.
    """
    global _val_task
    if _val_task and not _val_task.done():
        return  # already running

    # Always reset for a fresh run — _filter_unvalidated skips already-done
    # domains, so restarting from 0 is safe and fast even for the same TLD.
    _val_stats.update(
        running=True,
        processed=0,
        live=0,
        dead=0,
        parked=0,
        redirect=0,
        skipped=0,
        offset=0,
        rate=0.0,
        tld_filter=tld_filter,
    )
    _val_task = asyncio.create_task(_validator_loop(tld_filter))
    logger.info("Validator started (tld=%s)", tld_filter)


def stop_validator():
    """Request cancellation of the background validator task, if any."""
    _val_stats["running"] = False
    if _val_task and not _val_task.done():
        _val_task.cancel()
    logger.info("Validator stop requested")