Files
DomGod/app/validator.py

337 lines
13 KiB
Python
Raw Normal View History

"""Bulk domain validator — fast HTTP checks for the entire dataset.
Reads domains from DuckDB in batches, skips already-validated ones,
performs concurrent HTTP checks, and saves prescreen_status, status_code,
server, ip and load_time_ms to enriched_domains.
"""
import asyncio
import logging
import os
import random
import socket
import time
from typing import Optional
from urllib.parse import urlparse
import httpx
import aiosqlite
import duckdb
from app.db import SQLITE_PATH, DUCKDB_PATH, index_status
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Size of the semaphore that bounds simultaneous _check_domain() calls
# in _validator_loop. Overridable via the VAL_CONCURRENCY env var.
VAL_CONCURRENCY = int(os.getenv("VAL_CONCURRENCY", "50"))
# Number of domains fetched from DuckDB per _get_domains_batch() call.
# Overridable via the VAL_BATCH env var.
VAL_BATCH = int(os.getenv("VAL_BATCH", "200"))
# Lowercase substrings searched for in the lowercased start of a response
# body (see _check_domain); any hit classifies the domain as "parked".
PARKING_BODY_SIGNALS = [
"domain is parked", "this domain is for sale", "buy this domain",
"domain parking", "parked domain", "hugedomains.com", "sedo.com",
"parkingcrew.com", "bodis.com", "dan.com", "afternic.com",
"sedoparking.com", "undeveloped.com", "epik.com/domain",
"this web page is parked", "domain has expired",
]
# Hosts that mark an off-domain redirect as "parked" rather than a plain
# "redirect" when the final URL lands on one of them (see _check_domain).
PARKING_REDIRECT_HOSTS = {
"sedo.com", "hugedomains.com", "dan.com", "afternic.com",
"parkingcrew.com", "bodis.com", "undeveloped.com", "epik.com",
"uniregistry.com", "sedoparking.com",
}
# Any HTTP response code means the server is UP — only connection failures
# and timeouts are truly "dead". 4xx/5xx still means a live web server.
# NOTE(review): _LIVE_CODES is not referenced anywhere in the visible code;
# _check_domain treats every response as live without consulting it.
_LIVE_CODES = set(range(200, 600))
# Pool of desktop-browser User-Agent strings; _headers() picks one at random
# for each request.
_UAS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:125.0) Gecko/20100101 Firefox/125.0",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36 Edg/124.0.0.0",
]
def _headers() -> dict:
    """Build a browser-like header set with a randomly chosen User-Agent."""
    user_agent = random.choice(_UAS)
    header_pairs = [
        ("User-Agent", user_agent),
        ("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8"),
        ("Accept-Language", "en-US,en;q=0.9"),
        ("Accept-Encoding", "gzip, deflate, br"),
        ("Connection", "keep-alive"),
    ]
    return dict(header_pairs)
_val_task: Optional[asyncio.Task] = None
_val_stats: dict = {
"running": False,
"processed": 0,
"live": 0,
"dead": 0,
"parked": 0,
"redirect": 0,
"skipped": 0,
"offset": 0,
"rate": 0.0,
"tld_filter": None,
}
def _same_domain(original: str, final_url: str) -> bool:
orig = original.lower().lstrip("www.").split(":")[0]
final = urlparse(final_url).netloc.lower().lstrip("www.")
return orig == final or final.endswith("." + orig) or orig.endswith("." + final)
async def _resolve_ip(domain: str) -> Optional[str]:
try:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, socket.gethostbyname, domain)
except Exception:
return None
def _is_parking_host(host: str) -> bool:
    """Return True if *host* is a known parking host or a subdomain of one."""
    # Match on label boundaries: a plain substring test would flag unrelated
    # hosts such as "jordan.com" because it contains "dan.com".
    return any(
        host == parking or host.endswith("." + parking)
        for parking in PARKING_REDIRECT_HOSTS
    )


async def _check_domain(domain: str) -> dict:
    """Probe *domain* over HTTP, then HTTPS, and classify the outcome.

    Returns:
        A dict with keys ``domain``, ``prescreen_status``, ``status_code``,
        ``server``, ``ip`` and ``load_time_ms``. ``prescreen_status`` is:

        * ``"parked"``   – redirected off-domain to a known parking host, or
          a 200/203 body contains one of PARKING_BODY_SIGNALS;
        * ``"redirect"`` – redirected to any other unrelated host;
        * ``"live"``     – any other HTTP response (4xx/5xx included);
        * ``"dead"``     – both the http and the https attempt raised.
    """
    result: dict = {
        "domain": domain,
        "prescreen_status": "dead",
        "status_code": None,
        "server": None,
        "ip": None,
        "load_time_ms": None,
    }
    t0 = time.monotonic()
    # The same timeout is used for both schemes so that http→https redirects
    # (followed inside the same client) also get enough time for the TLS
    # handshake. http is tried first; if it fails for ANY reason (refused,
    # timeout, protocol error, redirect loop…) https is tried once, because
    # such servers are often https-only.
    timeout = httpx.Timeout(connect=8, read=12, write=5, pool=8)
    for scheme in ("http", "https"):
        try:
            async with httpx.AsyncClient(
                timeout=timeout,
                follow_redirects=True,
                headers=_headers(),
                # Certificate verification is off — presumably so that sites
                # with invalid certificates still count as reachable.
                verify=False,
                max_redirects=5,
            ) as client:
                resp = await client.get(f"{scheme}://{domain}")
                result["load_time_ms"] = int((time.monotonic() - t0) * 1000)
                result["status_code"] = resp.status_code
                result["server"] = (resp.headers.get("server") or "")[:100]
                result["ip"] = await _resolve_ip(domain)
                final_url = str(resp.url)
                # .hostname is lower-cased and port-free; removeprefix drops
                # only a literal "www." (lstrip would strip a character set).
                final_host = (urlparse(final_url).hostname or "").removeprefix("www.")
                if not _same_domain(domain, final_url):
                    if _is_parking_host(final_host):
                        result["prescreen_status"] = "parked"
                    else:
                        result["prescreen_status"] = "redirect"
                    return result
                # Any HTTP response = server is alive (4xx/5xx still means
                # live web server). Only check parking content on readable
                # 200/203 responses.
                if resp.status_code in (200, 203):
                    html_lc = resp.text[:20_000].lower()
                    if any(sig in html_lc for sig in PARKING_BODY_SIGNALS):
                        result["prescreen_status"] = "parked"
                        return result
                result["prescreen_status"] = "live"
                return result
        except Exception as e:
            # Deliberately broad: any failure simply means "this scheme did
            # not work". After http we still try https; after https the
            # domain stays "dead".
            if scheme == "http":
                logger.debug("Validator %s: %s on http, trying https", domain, type(e).__name__)
                continue
            logger.debug("Validator %s (https): %s", domain, type(e).__name__)
    # Both schemes failed: record how long the attempts took.
    result["load_time_ms"] = int((time.monotonic() - t0) * 1000)
    return result
def _get_domains_batch(offset: int, limit: int, tld: Optional[str]) -> list[str]:
    """Fetch one page of domain names from the DuckDB ``domains`` table.

    Args:
        offset: Number of rows to skip.
        limit: Maximum number of rows to return.
        tld: Optional TLD filter; case and a leading dot are ignored.

    Returns:
        The domain names of the page. An empty list is returned both at the
        end of the dataset and when DuckDB raises (the error is logged), so
        callers cannot tell the two apart.
    """
    # NOTE(review): the queries have no ORDER BY, so stable paging relies on
    # DuckDB returning rows in a consistent order — confirm.
    try:
        conn = duckdb.connect(str(DUCKDB_PATH), read_only=True)
        try:
            conn.execute("SET threads=2")
            if tld:
                rows = conn.execute(
                    "SELECT domain FROM domains WHERE tld=? LIMIT ? OFFSET ?",
                    [tld.lower().lstrip("."), limit, offset],
                ).fetchall()
            else:
                rows = conn.execute(
                    "SELECT domain FROM domains LIMIT ? OFFSET ?",
                    [limit, offset],
                ).fetchall()
        finally:
            # Close even when a query raises so the connection does not leak.
            conn.close()
        return [r[0] for r in rows]
    except Exception as e:
        logger.error("Validator DuckDB error: %s", e)
        return []
async def _filter_unvalidated(domains: list[str], rescan_dead: bool = False) -> list[str]:
"""Return domains that still need checking.
With rescan_dead=True, domains previously marked 'dead' are included
so they get a fresh check (useful after fixing the http/https bug).
"""
if not domains:
return []
placeholders = ",".join("?" * len(domains))
# A domain is "done" if it has a non-null prescreen_status that isn't dead
# (when rescan_dead=True) or any non-null status (normal mode).
if rescan_dead:
condition = "prescreen_status IS NOT NULL AND prescreen_status != 'dead'"
else:
condition = "prescreen_status IS NOT NULL"
async with aiosqlite.connect(SQLITE_PATH) as db:
async with db.execute(
f"SELECT domain FROM enriched_domains "
f"WHERE domain IN ({placeholders}) AND {condition}",
domains,
) as cur:
already = {r[0] async for r in cur}
return [d for d in domains if d not in already]
async def _save_batch(results: list[dict]):
    """Upsert a batch of check results into ``enriched_domains``.

    On conflict the status, load time and timestamp are always overwritten,
    while status_code, server and ip keep their stored value when the new
    one is NULL (or, for server, empty). All rows are committed together.
    """
    upsert_sql = """INSERT INTO enriched_domains
(domain, prescreen_status, status_code, server, ip, load_time_ms, prescreen_at)
VALUES (?, ?, ?, ?, ?, ?, datetime('now'))
ON CONFLICT(domain) DO UPDATE SET
prescreen_status = excluded.prescreen_status,
status_code = COALESCE(excluded.status_code, status_code),
server = COALESCE(NULLIF(excluded.server,''), server),
ip = COALESCE(excluded.ip, ip),
load_time_ms = excluded.load_time_ms,
prescreen_at = excluded.prescreen_at"""
    optional_fields = ("status_code", "server", "ip", "load_time_ms")
    async with aiosqlite.connect(SQLITE_PATH) as db:
        for entry in results:
            params = (
                entry["domain"],
                entry["prescreen_status"],
                *(entry.get(field) for field in optional_fields),
            )
            await db.execute(upsert_sql, params)
        await db.commit()
async def _validator_loop(tld_filter: Optional[str], rescan_dead: bool = False):
    """Background task: validate the whole dataset batch by batch.

    Waits for the DuckDB index, then repeatedly fetches VAL_BATCH domains,
    drops the ones already validated, checks the rest with at most
    VAL_CONCURRENCY concurrent requests, saves the results and updates
    ``_val_stats``. Stops when a fetch returns no rows.

    Args:
        tld_filter: Optional TLD restriction passed to the batch query.
        rescan_dead: Re-check domains previously stored as 'dead'.
    """
    _val_stats["running"] = True
    offset = _val_stats["offset"]
    sem = asyncio.Semaphore(VAL_CONCURRENCY)
    rate_buf: list[float] = []

    async def _run(d: str) -> dict:
        # Bound the number of in-flight checks.
        async with sem:
            return await _check_domain(d)

    # Everything, including the index wait, runs under try/finally so that
    # "running" is always reset and errors or cancellation are always logged.
    try:
        # Wait for DuckDB index to be ready (up to 10 minutes)
        for _ in range(120):
            if index_status()["ready"]:
                break
            logger.info("Validator: waiting for DuckDB index…")
            await asyncio.sleep(5)
        else:
            logger.error("Validator: DuckDB index never became ready")
            return

        loop = asyncio.get_running_loop()
        while True:
            batch = await loop.run_in_executor(
                None, _get_domains_batch, offset, VAL_BATCH, tld_filter
            )
            if not batch:
                logger.info("Validator: dataset complete at offset=%d", offset)
                break
            to_check = await _filter_unvalidated(batch, rescan_dead=rescan_dead)
            _val_stats["skipped"] += len(batch) - len(to_check)
            offset += len(batch)
            _val_stats["offset"] = offset
            if not to_check:
                await asyncio.sleep(0)  # yield to event loop
                continue
            t0 = time.monotonic()
            raw = await asyncio.gather(*[_run(d) for d in to_check], return_exceptions=True)
            results = [r for r in raw if isinstance(r, dict)]
            failures = [r for r in raw if not isinstance(r, dict)]
            if failures:
                # These domains are neither saved nor counted; surface them
                # instead of dropping them silently.
                logger.warning(
                    "Validator: %d checks raised unexpectedly (first: %r)",
                    len(failures), failures[0],
                )
            await _save_batch(results)
            for r in results:
                _val_stats["processed"] += 1
                s = r.get("prescreen_status", "dead")
                _val_stats[s] = _val_stats.get(s, 0) + 1
            elapsed = max(time.monotonic() - t0, 0.01)
            # Rolling average over the last 10 batches.
            rate_buf.append(len(results) / elapsed)
            if len(rate_buf) > 10:
                rate_buf.pop(0)
            _val_stats["rate"] = round(sum(rate_buf) / len(rate_buf), 1)
            logger.info(
                "Validator off=%d proc=%d live=%d dead=%d parked=%d rate=%.1f/s",
                offset, _val_stats["processed"], _val_stats["live"],
                _val_stats["dead"], _val_stats["parked"], _val_stats["rate"],
            )
    except asyncio.CancelledError:
        # Cancellation is the normal stop path (see stop_validator). It is
        # intentionally not re-raised, so the task ends as finished rather
        # than cancelled, as it did before.
        logger.info("Validator cancelled at offset=%d", offset)
    except Exception as e:
        logger.error("Validator loop error: %s", e, exc_info=True)
    finally:
        _val_stats["running"] = False
        _val_stats["offset"] = offset
def get_validator_status() -> dict:
    """Return a shallow snapshot of the validator's progress counters."""
    snapshot = {**_val_stats}
    return snapshot
def start_validator(tld_filter: Optional[str] = None, rescan_dead: bool = False):
    """Start the background validator task unless one is still running.

    Counters are always reset for a fresh run — _filter_unvalidated skips
    already-done domains, so restarting from offset 0 is safe and fast even
    for the same TLD.
    """
    global _val_task
    task_active = _val_task is not None and not _val_task.done()
    if task_active:
        return  # already running
    fresh_stats = {
        "running": True,
        "processed": 0,
        "live": 0,
        "dead": 0,
        "parked": 0,
        "redirect": 0,
        "skipped": 0,
        "offset": 0,
        "rate": 0.0,
        "tld_filter": tld_filter,
        "rescan_dead": rescan_dead,
    }
    _val_stats.update(fresh_stats)
    coro = _validator_loop(tld_filter, rescan_dead=rescan_dead)
    _val_task = asyncio.create_task(coro)
    logger.info("Validator started (tld=%s, rescan_dead=%s)", tld_filter, rescan_dead)
def stop_validator():
    """Mark the validator as stopped and cancel its task if still running."""
    _val_stats["running"] = False
    task = _val_task
    if task is not None and not task.done():
        task.cancel()
    logger.info("Validator stop requested")