"""Bulk domain validator — fast HTTP checks for the entire dataset. Reads domains from DuckDB in batches, skips already-validated ones, performs concurrent HTTP checks, and saves prescreen_status + server + ip + load_time_ms to enriched_domains. """ import asyncio import logging import os import random import socket import time from typing import Optional from urllib.parse import urlparse import httpx import aiosqlite import duckdb from app.db import SQLITE_PATH, DUCKDB_PATH, index_status logger = logging.getLogger(__name__) VAL_CONCURRENCY = int(os.getenv("VAL_CONCURRENCY", "50")) VAL_BATCH = int(os.getenv("VAL_BATCH", "200")) PARKING_BODY_SIGNALS = [ "domain is parked", "this domain is for sale", "buy this domain", "domain parking", "parked domain", "hugedomains.com", "sedo.com", "parkingcrew.com", "bodis.com", "dan.com", "afternic.com", "sedoparking.com", "undeveloped.com", "epik.com/domain", "this web page is parked", "domain has expired", ] PARKING_REDIRECT_HOSTS = { "sedo.com", "hugedomains.com", "dan.com", "afternic.com", "parkingcrew.com", "bodis.com", "undeveloped.com", "epik.com", "uniregistry.com", "sedoparking.com", } # Any HTTP response code means the server is UP — only connection failures # and timeouts are truly "dead". 4xx/5xx still means a live web server. 
# Any HTTP response code means the server is UP — only connection failures
# and timeouts are truly "dead". 4xx/5xx still means a live web server.
_LIVE_CODES = set(range(200, 600))

# Rotated browser User-Agent strings so the checks look less bot-like.
_UAS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4 Safari/605.1.15",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:125.0) Gecko/20100101 Firefox/125.0",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36 Edg/124.0.0.0",
]


def _headers() -> dict:
    """Return browser-like request headers with a randomly chosen User-Agent."""
    return {
        "User-Agent": random.choice(_UAS),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept-Encoding": "gzip, deflate, br",
        "Connection": "keep-alive",
    }


# Handle for the single background validator task plus its live counters,
# exposed (as a copy) via get_validator_status().
_val_task: Optional[asyncio.Task] = None
_val_stats: dict = {
    "running": False,
    "processed": 0,
    "live": 0,
    "dead": 0,
    "parked": 0,
    "redirect": 0,
    "skipped": 0,
    "offset": 0,
    "rate": 0.0,
    "tld_filter": None,
}


def _strip_www(host: str) -> str:
    """Drop a single leading 'www.' label from *host*.

    Uses str.removeprefix — NOT str.lstrip("www."), which strips the
    character set {'w', '.'} and would corrupt hosts such as 'web.com'.
    """
    return host.removeprefix("www.")


def _same_domain(original: str, final_url: str) -> bool:
    """True if *final_url* is on *original*'s domain (or a sub/parent of it).

    BUGFIX: the previous version used str.lstrip("www."), which removes any
    leading run of 'w'/'.' characters rather than the literal prefix, so e.g.
    'shop.web.com' was not recognised as a subdomain of 'web.com'.
    Ports are stripped from both sides before comparing.
    """
    orig = _strip_www(original.lower()).split(":")[0]
    final = _strip_www(urlparse(final_url).netloc.lower()).split(":")[0]
    return final == orig or final.endswith("." + orig) or orig.endswith("." + final)


async def _resolve_ip(domain: str) -> Optional[str]:
    """Resolve *domain* to an IPv4 string in a worker thread; None on failure."""
    try:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, socket.gethostbyname, domain)
    except Exception:
        return None


async def _check_domain(domain: str) -> dict:
    """Probe one domain over http, then https, and classify the result.

    Returns a dict with keys: domain, prescreen_status ('live' / 'dead' /
    'parked' / 'redirect'), status_code, server, ip, load_time_ms.
    """
    result: dict = {
        "domain": domain,
        "prescreen_status": "dead",
        "status_code": None,
        "server": None,
        "ip": None,
        "load_time_ms": None,
    }
    t0 = time.monotonic()
    # Try http first with a short connect timeout so we don't waste time on
    # servers that accept TCP but never respond on port 80. We retry https
    # only on ConnectError/RemoteProtocolError (port 80 closed or speaking
    # the wrong protocol) since those servers are often https-only; after a
    # timeout, port 443 would almost certainly time out too, so we stop.
    timeouts = {
        "http": httpx.Timeout(connect=4, read=6, write=3, pool=5),
        "https": httpx.Timeout(connect=6, read=10, write=3, pool=5),
    }
    for scheme in ("http", "https"):
        try:
            async with httpx.AsyncClient(
                timeout=timeouts[scheme],
                follow_redirects=True,
                headers=_headers(),
                verify=False,
                max_redirects=5,
            ) as client:
                resp = await client.get(f"{scheme}://{domain}")
                result["load_time_ms"] = int((time.monotonic() - t0) * 1000)
                result["status_code"] = resp.status_code
                result["server"] = (resp.headers.get("server") or "")[:100]
                result["ip"] = await _resolve_ip(domain)

                final_url = str(resp.url)
                if not _same_domain(domain, final_url):
                    final_host = _strip_www(urlparse(final_url).netloc.lower()).split(":")[0]
                    # Exact or dot-suffix match only — a plain substring test
                    # would misflag e.g. 'jordan.com' because it contains
                    # 'dan.com'.
                    if any(
                        final_host == ph or final_host.endswith("." + ph)
                        for ph in PARKING_REDIRECT_HOSTS
                    ):
                        result["prescreen_status"] = "parked"
                    else:
                        result["prescreen_status"] = "redirect"
                    return result

                # Any HTTP response = server is alive (4xx/5xx still means a
                # live web server); only inspect readable 200-class bodies
                # for parking signals.
                if resp.status_code in (200, 203):
                    html_lc = resp.text[:20_000].lower()
                    for sig in PARKING_BODY_SIGNALS:
                        if sig in html_lc:
                            result["prescreen_status"] = "parked"
                            return result
                result["prescreen_status"] = "live"
                return result
        except (httpx.ConnectError, httpx.RemoteProtocolError) as e:
            # Port refused or bad HTTP response → server may be https-only.
            if scheme == "http":
                logger.debug("Validator %s: %s on http, trying https", domain, type(e).__name__)
                continue
            break
        except Exception as e:
            # Timeout or other error → https won't help, mark dead.
            logger.debug("Validator %s (%s): %s", domain, scheme, type(e).__name__)
            break
    result["load_time_ms"] = int((time.monotonic() - t0) * 1000)
    return result


def _get_domains_batch(offset: int, limit: int, tld: Optional[str]) -> list[str]:
    """Fetch one page of domains from the read-only DuckDB dataset.

    Returns [] on any DuckDB error (logged) so the caller can stop cleanly.
    The connection is closed in a finally block so a failing query cannot
    leak it (the previous version skipped close() on exception).
    """
    try:
        conn = duckdb.connect(str(DUCKDB_PATH), read_only=True)
        try:
            conn.execute("SET threads=2")
            if tld:
                rows = conn.execute(
                    "SELECT domain FROM domains WHERE tld=? LIMIT ? OFFSET ?",
                    [tld.lower().lstrip("."), limit, offset],
                ).fetchall()
            else:
                rows = conn.execute(
                    "SELECT domain FROM domains LIMIT ? OFFSET ?",
                    [limit, offset],
                ).fetchall()
        finally:
            conn.close()
        return [r[0] for r in rows]
    except Exception as e:
        logger.error("Validator DuckDB error: %s", e)
        return []


async def _filter_unvalidated(domains: list[str], rescan_dead: bool = False) -> list[str]:
    """Return domains that still need checking.

    With rescan_dead=True, domains previously marked 'dead' are included so
    they get a fresh check (useful after fixing the http/https bug).
    """
    if not domains:
        return []
    placeholders = ",".join("?" * len(domains))
    # A domain is "done" if it has a non-null prescreen_status that isn't dead
    # (when rescan_dead=True) or any non-null status (normal mode).
    if rescan_dead:
        condition = "prescreen_status IS NOT NULL AND prescreen_status != 'dead'"
    else:
        condition = "prescreen_status IS NOT NULL"
    async with aiosqlite.connect(SQLITE_PATH) as db:
        async with db.execute(
            f"SELECT domain FROM enriched_domains "
            f"WHERE domain IN ({placeholders}) AND {condition}",
            domains,
        ) as cur:
            already = {r[0] async for r in cur}
    return [d for d in domains if d not in already]


async def _save_batch(results: list[dict]):
    """Upsert one batch of check results into SQLite enriched_domains.

    COALESCE/NULLIF keep previously-known server/ip values when the new
    check produced nothing for those columns.
    """
    async with aiosqlite.connect(SQLITE_PATH) as db:
        for r in results:
            await db.execute(
                """INSERT INTO enriched_domains
                   (domain, prescreen_status, status_code, server, ip, load_time_ms, prescreen_at)
                   VALUES (?, ?, ?, ?, ?, ?, datetime('now'))
                   ON CONFLICT(domain) DO UPDATE SET
                     prescreen_status = excluded.prescreen_status,
                     status_code = COALESCE(excluded.status_code, status_code),
                     server = COALESCE(NULLIF(excluded.server,''), server),
                     ip = COALESCE(excluded.ip, ip),
                     load_time_ms = excluded.load_time_ms,
                     prescreen_at = excluded.prescreen_at""",
                (
                    r["domain"],
                    r["prescreen_status"],
                    r.get("status_code"),
                    r.get("server"),
                    r.get("ip"),
                    r.get("load_time_ms"),
                ),
            )
        await db.commit()


async def _validator_loop(tld_filter: Optional[str], rescan_dead: bool = False):
    """Background loop: page through DuckDB, check concurrently, persist.

    Updates the module-level _val_stats counters as it goes; exits when the
    dataset is exhausted, on cancellation, or on an unexpected error.
    """
    global _val_stats
    _val_stats["running"] = True
    offset = _val_stats["offset"]
    sem = asyncio.Semaphore(VAL_CONCURRENCY)
    rate_buf: list[float] = []

    # Wait for the DuckDB index to be ready (up to 10 minutes).
    for _ in range(120):
        if index_status()["ready"]:
            break
        logger.info("Validator: waiting for DuckDB index…")
        await asyncio.sleep(5)
    else:
        logger.error("Validator: DuckDB index never became ready")
        _val_stats["running"] = False
        return

    try:
        while True:
            # DuckDB calls are blocking — run them in the default executor.
            # (get_running_loop replaces the deprecated get_event_loop.)
            loop = asyncio.get_running_loop()
            batch = await loop.run_in_executor(
                None, _get_domains_batch, offset, VAL_BATCH, tld_filter
            )
            if not batch:
                logger.info("Validator: dataset complete at offset=%d", offset)
                break

            to_check = await _filter_unvalidated(batch, rescan_dead=rescan_dead)
            _val_stats["skipped"] += len(batch) - len(to_check)
            offset += len(batch)
            _val_stats["offset"] = offset
            if not to_check:
                await asyncio.sleep(0)  # yield to the event loop
                continue

            t0 = time.monotonic()

            async def _run(d: str) -> dict:
                async with sem:
                    return await _check_domain(d)

            raw = await asyncio.gather(*[_run(d) for d in to_check], return_exceptions=True)
            results = [r for r in raw if isinstance(r, dict)]
            await _save_batch(results)

            for r in results:
                _val_stats["processed"] += 1
                s = r.get("prescreen_status", "dead")
                _val_stats[s] = _val_stats.get(s, 0) + 1

            # Rolling average of the last 10 batch rates (domains/second).
            elapsed = max(time.monotonic() - t0, 0.01)
            rate_buf.append(len(results) / elapsed)
            if len(rate_buf) > 10:
                rate_buf.pop(0)
            _val_stats["rate"] = round(sum(rate_buf) / len(rate_buf), 1)

            logger.info(
                "Validator off=%d proc=%d live=%d dead=%d parked=%d rate=%.1f/s",
                offset,
                _val_stats["processed"],
                _val_stats["live"],
                _val_stats["dead"],
                _val_stats["parked"],
                _val_stats["rate"],
            )
    except asyncio.CancelledError:
        logger.info("Validator cancelled at offset=%d", offset)
    except Exception as e:
        logger.error("Validator loop error: %s", e, exc_info=True)
    finally:
        _val_stats["running"] = False
        _val_stats["offset"] = offset


def get_validator_status() -> dict:
    """Return a snapshot copy of the validator's live counters."""
    return dict(_val_stats)


def start_validator(tld_filter: Optional[str] = None, rescan_dead: bool = False):
    """Start the background validator task; no-op if one is already running."""
    global _val_task, _val_stats
    if _val_task and not _val_task.done():
        return  # already running
    # Always reset for a fresh run — _filter_unvalidated skips already-done
    # domains, so restarting from 0 is safe and fast even for the same TLD.
    _val_stats.update(
        running=True,
        processed=0,
        live=0,
        dead=0,
        parked=0,
        redirect=0,
        skipped=0,
        offset=0,
        rate=0.0,
        tld_filter=tld_filter,
        rescan_dead=rescan_dead,
    )
    _val_task = asyncio.create_task(_validator_loop(tld_filter, rescan_dead=rescan_dead))
    logger.info("Validator started (tld=%s, rescan_dead=%s)", tld_filter, rescan_dead)


def stop_validator():
    """Request cancellation of the running validator task, if any."""
    global _val_task, _val_stats
    _val_stats["running"] = False
    if _val_task and not _val_task.done():
        _val_task.cancel()
    logger.info("Validator stop requested")