# Changelog (from commit message):
#
# SQLite locking:
#   - Enable WAL journal mode in init_db (readers don't block writers)
#   - Set busy_timeout=30000ms in init_db
#   - Add timeout=30 to every aiosqlite.connect() across db.py, validator.py,
#     enricher.py, main.py so connections wait up to 30s instead of crashing
#
# Error status:
#   - 4xx/5xx HTTP responses are now prescreen_status='error' (server alive
#     but broken/blocking) instead of 'live'
#   - Added 'error' counter to validator stats and orange Error stat box in UI
#   - Added ps-error CSS class (orange) and filter option in Browse tab
#
# Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
"""Bulk domain validator — fast HTTP checks for the entire dataset.

Reads domains from DuckDB in batches, skips already-validated ones,
performs concurrent HTTP checks, and saves prescreen_status + server +
ip + load_time_ms to enriched_domains.
"""
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import random
|
|
import socket
|
|
import time
|
|
from typing import Optional
|
|
from urllib.parse import urlparse
|
|
|
|
import httpx
|
|
import aiosqlite
|
|
import duckdb
|
|
|
|
from app.db import SQLITE_PATH, DUCKDB_PATH, index_status
|
|
|
|
logger = logging.getLogger(__name__)

# Tunables, overridable via environment variables.
VAL_CONCURRENCY = int(os.getenv("VAL_CONCURRENCY", "50"))  # max simultaneous HTTP checks
VAL_BATCH = int(os.getenv("VAL_BATCH", "200"))  # domains pulled from DuckDB per page

# Lowercase substrings that, when present in a 200/203 response body, mark
# the domain as parked (for-sale / placeholder pages and parking services).
PARKING_BODY_SIGNALS = [
    "domain is parked", "this domain is for sale", "buy this domain",
    "domain parking", "parked domain", "hugedomains.com", "sedo.com",
    "parkingcrew.com", "bodis.com", "dan.com", "afternic.com",
    "sedoparking.com", "undeveloped.com", "epik.com/domain",
    "this web page is parked", "domain has expired",
]
# Hosts that an off-domain redirect may land on; any of these appearing in
# the final host means the original domain is parked, not merely redirected.
PARKING_REDIRECT_HOSTS = {
    "sedo.com", "hugedomains.com", "dan.com", "afternic.com",
    "parkingcrew.com", "bodis.com", "undeveloped.com", "epik.com",
    "uniregistry.com", "sedoparking.com",
}

# Any HTTP response code means the server is UP — only connection failures
# and timeouts are truly "dead". 4xx/5xx still means a live web server.
# NOTE(review): _LIVE_CODES is not referenced anywhere in the code visible
# here — _check_domain classifies by status-code ranges directly. Confirm it
# is unused elsewhere before removing.
_LIVE_CODES = set(range(200, 600))
|
|
|
|
_UAS = [
|
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
|
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
|
|
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
|
|
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4 Safari/605.1.15",
|
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:125.0) Gecko/20100101 Firefox/125.0",
|
|
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
|
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36 Edg/124.0.0.0",
|
|
]
|
|
|
|
def _headers() -> dict:
|
|
return {
|
|
"User-Agent": random.choice(_UAS),
|
|
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
|
|
"Accept-Language": "en-US,en;q=0.9",
|
|
"Accept-Encoding": "gzip, deflate, br",
|
|
"Connection": "keep-alive",
|
|
}
|
|
|
|
# Handle to the background validation task; None until start_validator() runs.
_val_task: Optional[asyncio.Task] = None
# Shared progress snapshot, mutated by the loop and read via get_validator_status().
_val_stats: dict = {
    "running": False,     # loop currently active
    "processed": 0,       # domains actually HTTP-checked this run
    "live": 0,
    "dead": 0,
    "error": 0,           # server responded but with 4xx/5xx
    "parked": 0,
    "redirect": 0,
    "skipped": 0,         # already validated, not re-checked
    "offset": 0,          # read cursor into the DuckDB domains table
    "rate": 0.0,          # rolling average checks/second
    "tld_filter": None,   # optional TLD restriction for the current run
}
|
|
|
|
|
|
def _same_domain(original: str, final_url: str) -> bool:
|
|
orig = original.lower().lstrip("www.").split(":")[0]
|
|
final = urlparse(final_url).netloc.lower().lstrip("www.")
|
|
return orig == final or final.endswith("." + orig) or orig.endswith("." + final)
|
|
|
|
|
|
async def _resolve_ip(domain: str) -> Optional[str]:
|
|
try:
|
|
loop = asyncio.get_event_loop()
|
|
return await loop.run_in_executor(None, socket.gethostbyname, domain)
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
async def _check_domain(domain: str) -> dict:
    """Probe one domain over http (falling back to https) and classify it.

    Returns a result dict whose prescreen_status is one of:
      'dead'     — no HTTP response on either scheme
      'parked'   — parking service detected (by redirect host or body text)
      'redirect' — landed on an unrelated, non-parking domain
      'error'    — 4xx/5xx: server alive but broken/blocking
      'live'     — everything else
    """
    result: dict = {
        "domain": domain,
        "prescreen_status": "dead",  # default; only replaced once we get a response
        "status_code": None,
        "server": None,
        "ip": None,
        "load_time_ms": None,
    }
    t0 = time.monotonic()

    # Try http first with a short timeout so we don't waste time on servers
    # that accept TCP but never respond on port 80. If http fails for ANY
    # reason (refused, timeout, protocol error, redirect loop…) we fall back
    # to https directly, which is what most modern sites actually serve.
    # Use a short connect timeout for http — if port 80 times out, port 443
    # will too, so we don't bother retrying on timeout errors.
    # We DO retry https on ConnectError/RemoteProtocolError (port 80 closed or
    # speaking wrong protocol) because those servers are often https-only.
    # Use the same generous timeout for both schemes so that http→https redirects
    # (followed inside the same client) also get enough time for the TLS handshake.
    timeout = httpx.Timeout(connect=8, read=12, write=5, pool=8)

    for scheme in ("http", "https"):
        try:
            async with httpx.AsyncClient(
                timeout=timeout,
                follow_redirects=True,
                headers=_headers(),  # browser-like headers with a random UA
                verify=False,        # an invalid cert is still a live server
                max_redirects=5,
            ) as client:
                resp = await client.get(f"{scheme}://{domain}")

                result["load_time_ms"] = int((time.monotonic() - t0) * 1000)
                result["status_code"] = resp.status_code
                result["server"] = (resp.headers.get("server") or "")[:100]
                result["ip"] = await _resolve_ip(domain)

                final_url = str(resp.url)
                # NOTE(review): lstrip("www.") strips *characters* {'w','.'},
                # not the literal prefix (see _same_domain). Harmless here
                # because parking hosts are matched by substring below, but
                # worth fixing for consistency.
                final_host = urlparse(final_url).netloc.lower().lstrip("www.")

                # Redirected off-domain: parked if it landed on a known
                # parking service, otherwise a plain 'redirect'.
                if not _same_domain(domain, final_url):
                    for ph in PARKING_REDIRECT_HOSTS:
                        if ph in final_host:
                            result["prescreen_status"] = "parked"
                            return result
                    result["prescreen_status"] = "redirect"
                    return result

                # Any HTTP response = server is alive (4xx/5xx still means live web server)
                # Only check parking content on readable 200 responses
                if resp.status_code in (200, 203):
                    html_lc = resp.text[:20_000].lower()
                    for sig in PARKING_BODY_SIGNALS:
                        if sig in html_lc:
                            result["prescreen_status"] = "parked"
                            return result

                # 4xx / 5xx = server is alive but site is broken/blocking
                if resp.status_code >= 400:
                    result["prescreen_status"] = "error"
                    return result

                result["prescreen_status"] = "live"
                return result

        except Exception as e:
            # Any failure on http (connection refused, timeout, TLS error in redirect
            # chain, etc.) → server may be https-only, always try 443 once.
            if scheme == "http":
                logger.debug("Validator %s: %s on http, trying https", domain, type(e).__name__)
                continue
            # Both schemes failed → dead
            logger.debug("Validator %s (https): %s", domain, type(e).__name__)
            break

    # Record how long the failed attempt(s) took; status stays 'dead'.
    result["load_time_ms"] = int((time.monotonic() - t0) * 1000)
    return result
|
|
|
|
|
|
def _get_domains_batch(offset: int, limit: int, tld: Optional[str]) -> list[str]:
    """Read one page of domain names from the read-only DuckDB dataset.

    Args:
        offset: row offset into the domains table (pagination cursor).
        limit: maximum number of rows to return.
        tld: optional TLD filter; a leading dot and upper case are tolerated.

    Returns:
        A list of domain strings; [] on any DuckDB error (logged, not raised).

    NOTE(review): the queries have no ORDER BY, so LIMIT/OFFSET pagination
    relies on DuckDB returning a stable row order — confirm, or add one.
    """
    conn = None
    try:
        conn = duckdb.connect(str(DUCKDB_PATH), read_only=True)
        conn.execute("SET threads=2")  # keep the scan from hogging all cores
        if tld:
            rows = conn.execute(
                "SELECT domain FROM domains WHERE tld=? LIMIT ? OFFSET ?",
                [tld.lower().lstrip("."), limit, offset],
            ).fetchall()
        else:
            rows = conn.execute(
                "SELECT domain FROM domains LIMIT ? OFFSET ?",
                [limit, offset],
            ).fetchall()
        return [r[0] for r in rows]
    except Exception as e:
        logger.error("Validator DuckDB error: %s", e)
        return []
    finally:
        # Bug fix: the connection previously leaked whenever execute()/
        # fetchall() raised, because close() sat only on the success path.
        if conn is not None:
            conn.close()
|
|
|
|
|
|
async def _filter_unvalidated(domains: list[str], rescan_dead: bool = False) -> list[str]:
    """Return the subset of *domains* that still needs checking.

    With rescan_dead=True, domains previously marked 'dead' are included
    so they get a fresh check (useful after fixing the http/https bug).
    """
    if not domains:
        return []
    placeholders = ",".join("?" * len(domains))
    # A row counts as "done" when it has any non-null prescreen_status;
    # in rescan mode, 'dead' rows are treated as not-done so they re-run.
    condition = (
        "prescreen_status IS NOT NULL AND prescreen_status != 'dead'"
        if rescan_dead
        else "prescreen_status IS NOT NULL"
    )
    query = (
        f"SELECT domain FROM enriched_domains "
        f"WHERE domain IN ({placeholders}) AND {condition}"
    )
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        async with db.execute(query, domains) as cur:
            done = {row[0] async for row in cur}
    return [d for d in domains if d not in done]
|
|
|
|
|
|
async def _save_batch(results: list[dict]):
    """Upsert a batch of prescreen results into SQLite's enriched_domains.

    The COALESCE/NULLIF clauses preserve previously stored status_code,
    server, and ip when the new check produced nothing — e.g. a later
    'dead' re-check must not wipe data captured while the site was up.
    """
    if not results:
        return  # nothing to write — don't even open a connection
    rows = [
        (
            r["domain"], r["prescreen_status"], r.get("status_code"),
            r.get("server"), r.get("ip"), r.get("load_time_ms"),
        )
        for r in results
    ]
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        # executemany: one prepared statement reused for the whole batch
        # instead of re-parsing the SQL once per row.
        await db.executemany(
            """INSERT INTO enriched_domains
               (domain, prescreen_status, status_code, server, ip, load_time_ms, prescreen_at)
               VALUES (?, ?, ?, ?, ?, ?, datetime('now'))
               ON CONFLICT(domain) DO UPDATE SET
                 prescreen_status = excluded.prescreen_status,
                 status_code = COALESCE(excluded.status_code, status_code),
                 server = COALESCE(NULLIF(excluded.server,''), server),
                 ip = COALESCE(excluded.ip, ip),
                 load_time_ms = excluded.load_time_ms,
                 prescreen_at = excluded.prescreen_at""",
            rows,
        )
        await db.commit()
|
|
|
async def _validator_loop(tld_filter: Optional[str], rescan_dead: bool = False):
    """Background loop: page through DuckDB, HTTP-check domains, persist results.

    Runs until the dataset is exhausted, the task is cancelled, or an
    unexpected error escapes; always clears the running flag on exit and
    records the final offset so a restart can resume.
    """
    global _val_stats
    _val_stats["running"] = True
    offset = _val_stats["offset"]
    sem = asyncio.Semaphore(VAL_CONCURRENCY)  # cap on in-flight HTTP checks
    rate_buf: list[float] = []  # rolling window of per-batch rates

    # Wait for DuckDB index to be ready (up to 10 minutes: 120 polls × 5s)
    for _ in range(120):
        if index_status()["ready"]:
            break
        logger.info("Validator: waiting for DuckDB index…")
        await asyncio.sleep(5)
    else:
        logger.error("Validator: DuckDB index never became ready")
        _val_stats["running"] = False
        return

    try:
        while True:
            loop = asyncio.get_event_loop()
            # DuckDB access is blocking — run it in the default thread pool.
            batch = await loop.run_in_executor(
                None, _get_domains_batch, offset, VAL_BATCH, tld_filter
            )
            if not batch:
                logger.info("Validator: dataset complete at offset=%d", offset)
                break

            to_check = await _filter_unvalidated(batch, rescan_dead=rescan_dead)
            _val_stats["skipped"] += len(batch) - len(to_check)
            offset += len(batch)
            _val_stats["offset"] = offset

            if not to_check:
                await asyncio.sleep(0)  # yield to event loop
                continue

            t0 = time.monotonic()

            async def _run(d: str) -> dict:
                # Semaphore limits concurrency to VAL_CONCURRENCY checks.
                async with sem:
                    return await _check_domain(d)

            raw = await asyncio.gather(*[_run(d) for d in to_check], return_exceptions=True)
            # return_exceptions=True means failures arrive as exception
            # objects; keep only real result dicts.
            results = [r for r in raw if isinstance(r, dict)]

            await _save_batch(results)

            # Tally per-status counters; unknown statuses get their own key.
            for r in results:
                _val_stats["processed"] += 1
                s = r.get("prescreen_status", "dead")
                _val_stats[s] = _val_stats.get(s, 0) + 1

            # Smooth the displayed rate over the last 10 batches.
            elapsed = max(time.monotonic() - t0, 0.01)
            rate_buf.append(len(results) / elapsed)
            if len(rate_buf) > 10:
                rate_buf.pop(0)
            _val_stats["rate"] = round(sum(rate_buf) / len(rate_buf), 1)

            # NOTE(review): this summary omits the 'error' and 'redirect'
            # counters added to _val_stats — consider including them.
            logger.info(
                "Validator off=%d proc=%d live=%d dead=%d parked=%d rate=%.1f/s",
                offset, _val_stats["processed"], _val_stats["live"],
                _val_stats["dead"], _val_stats["parked"], _val_stats["rate"],
            )

    except asyncio.CancelledError:
        logger.info("Validator cancelled at offset=%d", offset)
    except Exception as e:
        logger.error("Validator loop error: %s", e, exc_info=True)
    finally:
        _val_stats["running"] = False
        _val_stats["offset"] = offset
|
|
|
|
|
|
def get_validator_status() -> dict:
    """Return a shallow copy of the validator stats (safe for callers to mutate)."""
    return _val_stats.copy()
|
|
|
|
|
|
def start_validator(tld_filter: Optional[str] = None, rescan_dead: bool = False):
    """Kick off a background validation run unless one is already active."""
    global _val_task, _val_stats
    if _val_task is not None and not _val_task.done():
        # A run is already in flight — leave it alone.
        return
    # Always reset for a fresh run — _filter_unvalidated skips already-done
    # domains, so restarting from 0 is safe and fast even for the same TLD.
    fresh_stats = dict(
        running=True,
        processed=0, live=0, dead=0, error=0, parked=0,
        redirect=0, skipped=0, offset=0, rate=0.0,
        tld_filter=tld_filter,
        rescan_dead=rescan_dead,
    )
    _val_stats.update(fresh_stats)
    _val_task = asyncio.create_task(_validator_loop(tld_filter, rescan_dead=rescan_dead))
    logger.info("Validator started (tld=%s, rescan_dead=%s)", tld_filter, rescan_dead)
|
|
|
|
|
|
def stop_validator():
    """Request cancellation of the running validator task, if any."""
    global _val_task, _val_stats
    _val_stats["running"] = False
    task = _val_task
    if task is not None and not task.done():
        task.cancel()
    logger.info("Validator stop requested")
|