Previous fix only retried on ConnectError. Servers that accept TCP on port 80 but hang, return protocol errors, or time out also need the https fallback. Now any exception on http triggers an https retry. A shorter http connect timeout (4s) avoids wasting time on non-responsive port 80.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
"""Bulk domain validator — fast HTTP checks for the entire dataset.
|
|
|
|
Reads domains from DuckDB in batches, skips already-validated ones,
|
|
performs concurrent HTTP checks, and saves prescreen_status + server +
|
|
ip + load_time_ms to enriched_domains.
|
|
"""

import asyncio
import logging
import os
import socket
import time
from typing import Optional
from urllib.parse import urlparse

import httpx
import aiosqlite
import duckdb

from app.db import SQLITE_PATH, DUCKDB_PATH, index_status

logger = logging.getLogger(__name__)

VAL_CONCURRENCY = int(os.getenv("VAL_CONCURRENCY", "50"))
VAL_BATCH = int(os.getenv("VAL_BATCH", "200"))

PARKING_BODY_SIGNALS = [
    "domain is parked", "this domain is for sale", "buy this domain",
    "domain parking", "parked domain", "hugedomains.com", "sedo.com",
    "parkingcrew.com", "bodis.com", "dan.com", "afternic.com",
    "sedoparking.com", "undeveloped.com", "epik.com/domain",
    "this web page is parked", "domain has expired",
]
PARKING_REDIRECT_HOSTS = {
    "sedo.com", "hugedomains.com", "dan.com", "afternic.com",
    "parkingcrew.com", "bodis.com", "undeveloped.com", "epik.com",
    "uniregistry.com", "sedoparking.com",
}

_UA = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/122.0.0.0 Safari/537.36"
)
_HEADERS = {
    "User-Agent": _UA,
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
}

_val_task: Optional[asyncio.Task] = None
_val_stats: dict = {
    "running": False,
    "processed": 0,
    "live": 0,
    "dead": 0,
    "parked": 0,
    "redirect": 0,
    "skipped": 0,
    "offset": 0,
    "rate": 0.0,
    "tld_filter": None,
}


def _same_domain(original: str, final_url: str) -> bool:
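    """Fuzzy same-site check: True when the final URL matches the original
    host (ignoring a www. prefix and port), or when either host is a
    subdomain of the other."""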
    # str.removeprefix, not str.lstrip: lstrip("www.") strips any leading
    # 'w' or '.' characters and would mangle hosts like "www.wow.com".
    orig = original.lower().removeprefix("www.").split(":")[0]
    final = urlparse(final_url).netloc.lower().removeprefix("www.")
    return orig == final or final.endswith("." + orig) or orig.endswith("." + final)


async def _resolve_ip(domain: str) -> Optional[str]:
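    """Resolve the domain's IPv4 address in a thread executor; None on failure."""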
    try:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, socket.gethostbyname, domain)
    except Exception:
        return None


async def _check_domain(domain: str) -> dict:
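    """Check one domain over http, falling back to https, and classify it
    as live / dead / parked / redirect."""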
    result: dict = {
        "domain": domain,
        "prescreen_status": "dead",
        "status_code": None,
        "server": None,
        "ip": None,
        "load_time_ms": None,
    }
    t0 = time.monotonic()

    # Try http first with a short timeout so we don't waste time on servers
    # that accept TCP but never respond on port 80. If http fails for ANY
    # reason (refused, timeout, protocol error, redirect loop…) we fall back
    # to https directly, which is what most modern sites actually serve.
    timeouts = {"http": httpx.Timeout(connect=4, read=8, write=5, pool=10),
                "https": httpx.Timeout(connect=7, read=12, write=5, pool=15)}

    for scheme in ("http", "https"):
        try:
            async with httpx.AsyncClient(
                timeout=timeouts[scheme],
                follow_redirects=True,
                headers=_HEADERS,
                verify=False,
                max_redirects=5,
            ) as client:
                resp = await client.get(f"{scheme}://{domain}")

                result["load_time_ms"] = int((time.monotonic() - t0) * 1000)
                result["status_code"] = resp.status_code
                result["server"] = (resp.headers.get("server") or "")[:100]
                result["ip"] = await _resolve_ip(domain)

                final_url = str(resp.url)
                final_host = urlparse(final_url).netloc.lower().removeprefix("www.")

                if not _same_domain(domain, final_url):
                    for ph in PARKING_REDIRECT_HOSTS:
                        if ph in final_host:
                            result["prescreen_status"] = "parked"
                            return result
                    result["prescreen_status"] = "redirect"
                    return result

                if resp.status_code not in (200, 203):
                    return result  # dead

                html_lc = resp.text[:20_000].lower()
                for sig in PARKING_BODY_SIGNALS:
                    if sig in html_lc:
                        result["prescreen_status"] = "parked"
                        return result

                result["prescreen_status"] = "live"
                return result

        except Exception as e:
            # Any failure on http → always try https next.
            # Any failure on https → give up, leave as dead.
            logger.debug("Validator %s (%s): %s — %s", domain, scheme, type(e).__name__, e)
            if scheme == "https":
                break
            # fall through to https

    result["load_time_ms"] = int((time.monotonic() - t0) * 1000)
    return result


def _get_domains_batch(offset: int, limit: int, tld: Optional[str]) -> list[str]:
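    """Read one page of domains from DuckDB (blocking; run in an executor)."""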
    try:
        conn = duckdb.connect(str(DUCKDB_PATH), read_only=True)
        conn.execute("SET threads=2")
        if tld:
            rows = conn.execute(
                "SELECT domain FROM domains WHERE tld=? LIMIT ? OFFSET ?",
                [tld.lower().lstrip("."), limit, offset],
            ).fetchall()
        else:
            rows = conn.execute(
                "SELECT domain FROM domains LIMIT ? OFFSET ?",
                [limit, offset],
            ).fetchall()
        conn.close()
        return [r[0] for r in rows]
    except Exception as e:
        logger.error("Validator DuckDB error: %s", e)
        return []


async def _filter_unvalidated(domains: list[str], rescan_dead: bool = False) -> list[str]:
    """Return domains that still need checking.

    With rescan_dead=True, domains previously marked 'dead' are included
    so they get a fresh check (useful after fixing the http/https bug).
    """
    if not domains:
        return []
    placeholders = ",".join("?" * len(domains))
    # A domain is "done" if it has a non-null prescreen_status that isn't dead
    # (when rescan_dead=True) or any non-null status (normal mode).
    if rescan_dead:
        condition = "prescreen_status IS NOT NULL AND prescreen_status != 'dead'"
    else:
        condition = "prescreen_status IS NOT NULL"
    async with aiosqlite.connect(SQLITE_PATH) as db:
        async with db.execute(
            f"SELECT domain FROM enriched_domains "
            f"WHERE domain IN ({placeholders}) AND {condition}",
            domains,
        ) as cur:
            already = {r[0] async for r in cur}
    return [d for d in domains if d not in already]


async def _save_batch(results: list[dict]):
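    """Upsert one batch of check results into enriched_domains, committing once."""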
    async with aiosqlite.connect(SQLITE_PATH) as db:
        for r in results:
            await db.execute(
                """INSERT INTO enriched_domains
                   (domain, prescreen_status, status_code, server, ip, load_time_ms, prescreen_at)
                   VALUES (?, ?, ?, ?, ?, ?, datetime('now'))
                   ON CONFLICT(domain) DO UPDATE SET
                       prescreen_status = excluded.prescreen_status,
                       status_code = COALESCE(excluded.status_code, status_code),
                       server = COALESCE(NULLIF(excluded.server,''), server),
                       ip = COALESCE(excluded.ip, ip),
                       load_time_ms = excluded.load_time_ms,
                       prescreen_at = excluded.prescreen_at""",
                (
                    r["domain"], r["prescreen_status"], r.get("status_code"),
                    r.get("server"), r.get("ip"), r.get("load_time_ms"),
                ),
            )
        await db.commit()


async def _validator_loop(tld_filter: Optional[str], rescan_dead: bool = False):
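    """Main background loop: page through DuckDB, check unvalidated domains
    concurrently, persist results, and keep _val_stats up to date."""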
    global _val_stats
    _val_stats["running"] = True
    offset = _val_stats["offset"]
    sem = asyncio.Semaphore(VAL_CONCURRENCY)
    rate_buf: list[float] = []

    # Wait for the DuckDB index to be ready (up to 10 minutes)
    for _ in range(120):
        if index_status()["ready"]:
            break
        logger.info("Validator: waiting for DuckDB index…")
        await asyncio.sleep(5)
    else:
        logger.error("Validator: DuckDB index never became ready")
        _val_stats["running"] = False
        return

    try:
        while True:
            loop = asyncio.get_running_loop()
            batch = await loop.run_in_executor(
                None, _get_domains_batch, offset, VAL_BATCH, tld_filter
            )
            if not batch:
                logger.info("Validator: dataset complete at offset=%d", offset)
                break

            to_check = await _filter_unvalidated(batch, rescan_dead=rescan_dead)
            _val_stats["skipped"] += len(batch) - len(to_check)
            offset += len(batch)
            _val_stats["offset"] = offset

            if not to_check:
                await asyncio.sleep(0)  # yield to event loop
                continue

            t0 = time.monotonic()

            async def _run(d: str) -> dict:
                async with sem:
                    return await _check_domain(d)

            raw = await asyncio.gather(*[_run(d) for d in to_check], return_exceptions=True)
            results = [r for r in raw if isinstance(r, dict)]

            await _save_batch(results)

            for r in results:
                _val_stats["processed"] += 1
                s = r.get("prescreen_status", "dead")
                _val_stats[s] = _val_stats.get(s, 0) + 1

            elapsed = max(time.monotonic() - t0, 0.01)
            rate_buf.append(len(results) / elapsed)
            if len(rate_buf) > 10:
                rate_buf.pop(0)
            _val_stats["rate"] = round(sum(rate_buf) / len(rate_buf), 1)

            logger.info(
                "Validator off=%d proc=%d live=%d dead=%d parked=%d rate=%.1f/s",
                offset, _val_stats["processed"], _val_stats["live"],
                _val_stats["dead"], _val_stats["parked"], _val_stats["rate"],
            )

    except asyncio.CancelledError:
        logger.info("Validator cancelled at offset=%d", offset)
    except Exception as e:
        logger.error("Validator loop error: %s", e, exc_info=True)
    finally:
        _val_stats["running"] = False
        _val_stats["offset"] = offset


def get_validator_status() -> dict:
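    """Return a snapshot copy of the validator stats (safe for callers to mutate)."""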
    return dict(_val_stats)


def start_validator(tld_filter: Optional[str] = None, rescan_dead: bool = False):
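    """Start the background validator task unless one is already running."""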
    global _val_task, _val_stats
    if _val_task and not _val_task.done():
        return  # already running
    # Always reset for a fresh run — _filter_unvalidated skips already-done
    # domains, so restarting from 0 is safe and fast even for the same TLD.
    _val_stats.update(
        running=True,
        processed=0, live=0, dead=0, parked=0,
        redirect=0, skipped=0, offset=0, rate=0.0,
        tld_filter=tld_filter,
        rescan_dead=rescan_dead,
    )
    _val_task = asyncio.create_task(_validator_loop(tld_filter, rescan_dead=rescan_dead))
    logger.info("Validator started (tld=%s, rescan_dead=%s)", tld_filter, rescan_dead)


def stop_validator():
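    """Request cancellation of the running validator task, if any."""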
    global _val_task, _val_stats
    _val_stats["running"] = False
    if _val_task and not _val_task.done():
        _val_task.cancel()
    logger.info("Validator stop requested")