diff --git a/app/db.py b/app/db.py
index f093e2f..44b013a 100644
--- a/app/db.py
+++ b/app/db.py
@@ -40,7 +40,9 @@ CREATE TABLE IF NOT EXISTS enriched_domains (
     prescreen_status TEXT,
     niche TEXT,
     site_type TEXT,
-    prescreen_at TEXT
+    prescreen_at TEXT,
+    ip TEXT,
+    load_time_ms INTEGER
 );
 CREATE TABLE IF NOT EXISTS job_queue (
     id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -84,6 +86,8 @@ _MIGRATIONS = [
     "ALTER TABLE enriched_domains ADD COLUMN niche TEXT",
     "ALTER TABLE enriched_domains ADD COLUMN site_type TEXT",
     "ALTER TABLE enriched_domains ADD COLUMN prescreen_at TEXT",
+    "ALTER TABLE enriched_domains ADD COLUMN ip TEXT",
+    "ALTER TABLE enriched_domains ADD COLUMN load_time_ms INTEGER",
 ]
 
 # Index build state
@@ -315,7 +319,9 @@ async def get_stats():
 
 # ── Enrichment helpers ───────────────────────────────────────────────────────
 async def get_enriched(min_score=0, cms=None, country=None, kit_digital=None,
-                       ai_only=False, lead_quality=None, page=1, limit=100):
+                       ai_only=False, lead_quality=None,
+                       prescreen_status=None, niche=None, site_type=None,
+                       page=1, limit=100):
     offset = (page - 1) * limit
     conditions = ["score >= ?"]
     params: list = [min_score]
@@ -333,6 +339,17 @@ async def get_enriched(min_score=0, cms=None, country=None, kit_digital=None,
     if lead_quality:
         conditions.append("ai_lead_quality = ?")
         params.append(lead_quality.upper())
+    if prescreen_status == "none":
+        conditions.append("prescreen_status IS NULL")
+    elif prescreen_status:
+        conditions.append("prescreen_status = ?")
+        params.append(prescreen_status)
+    if niche:
+        conditions.append("niche = ?")
+        params.append(niche)
+    if site_type:
+        conditions.append("site_type = ?")
+        params.append(site_type)
     where = "WHERE " + " AND ".join(conditions)
     async with aiosqlite.connect(SQLITE_PATH) as db:
         db.row_factory = aiosqlite.Row
diff --git a/app/main.py b/app/main.py
index 29f53ba..c63f7c3 100644
--- a/app/main.py
+++ b/app/main.py
@@ -22,6 +22,7 @@ from app.db import (
 )
 from app.enricher import start_worker, pause_worker, resume_worker, is_running, ensure_workers_alive
 from app.scorer import run_scoring
+from app.validator import start_validator, stop_validator, get_validator_status
 
 logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
 logger = logging.getLogger(__name__)
@@ -158,17 +159,40 @@ async def enriched(
     kit_digital: Optional[bool] = Query(None),
     ai_only: bool = Query(False),
     lead_quality: str = Query(None),
+    prescreen_status: str = Query(None),
+    niche: str = Query(None),
+    site_type: str = Query(None),
     page: int = Query(1, ge=1),
     limit: int = Query(100, ge=1, le=1000),
 ):
     total, rows = await get_enriched(
         min_score=min_score, cms=cms, country=country, kit_digital=kit_digital,
         ai_only=ai_only, lead_quality=lead_quality,
+        prescreen_status=prescreen_status, niche=niche, site_type=site_type,
         page=page, limit=limit,
     )
     return {"page": page, "limit": limit, "total": total, "results": rows}
 
 
+# ── Bulk Validator endpoints ──────────────────────────────────────────────────
+
+@app.post("/api/validator/start")
+async def validator_start(tld: str = Query(None)):
+    start_validator(tld_filter=tld or None)
+    return get_validator_status()
+
+
+@app.post("/api/validator/stop")
+async def validator_stop():
+    stop_validator()
+    return {"status": "stopped"}
+
+
+@app.get("/api/validator/status")
+async def validator_status():
+    return get_validator_status()
+
+
 # ── AI assessment endpoints ───────────────────────────────────────────────────
 
 @app.post("/api/prescreen/batch")
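For context on how the new query parameters compose, here is a rough client-side sketch against the /api/enriched filters added above; the base URL and the "dental" niche value are illustrative assumptions, not part of this patch:

# Sketch only: BASE and the "dental" niche value are assumptions for illustration.
import httpx

BASE = "http://localhost:8000"  # assumed local dev address

with httpx.Client(base_url=BASE, timeout=10) as client:
    # Validator-confirmed live domains in one (hypothetical) niche
    live = client.get("/api/enriched", params={
        "prescreen_status": "live", "niche": "dental", "page": 1, "limit": 50,
    }).json()

    # The literal value "none" selects rows never touched by the validator
    # (get_enriched turns it into "prescreen_status IS NULL").
    unchecked = client.get("/api/enriched", params={"prescreen_status": "none"}).json()

    print(live["total"], unchecked["total"])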
diff --git a/app/static/index.html b/app/static/index.html
index 6a00849..fa6ecfb 100644
--- a/app/static/index.html
+++ b/app/static/index.html
[Template hunks at lines 314, 343, 375, 410, 440 and 509: the HTML markup did not survive extraction, only the visible text remains. Recoverable content:
 • The nav bar gains a "Validator 🔬" tab alongside Browse & Filter, Enrichment, Lead Pipeline, Leads 🤖 and TLD Chart.
 • Browse & Filter gains controls for the new prescreen_status / niche / site_type filters, and the results table's last column header changes from "Live" to "Status" (Domain, Score, KD, AI Niche, Type, Contact, CMS, SSL days, Country, Status); two row-template hunks are updated to match.
 • A new "Bulk Domain Validator" panel: "HTTP-checks the entire dataset to determine live/dead/parked/redirect status. Extracts server type, IP, and load time. Skips already-validated domains. Results appear as the Status column in Browse & Filter." It shows Checked / Live / Dead / Parked / Redirect counters and a dom/sec rate, a TLD filter input, Start/Stop buttons and a "⚡ Running…" indicator.]
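The panel drives everything through the three new /api/validator/* endpoints; the same start/poll/stop lifecycle from a Python client looks roughly like this (the base URL and the "es" TLD filter are assumptions for illustration):

# Sketch only: BASE is assumed; "es" is just an example TLD filter.
import time
import httpx

BASE = "http://localhost:8000"

with httpx.Client(base_url=BASE, timeout=10) as client:
    client.post("/api/validator/start", params={"tld": "es"})
    try:
        while True:
            st = client.get("/api/validator/status").json()
            print(f"checked={st['processed']} live={st['live']} dead={st['dead']} "
                  f"parked={st['parked']} redirect={st['redirect']} rate={st['rate']}/s")
            if not st["running"]:
                break
            time.sleep(3)  # same 3-second cadence the UI poller uses
    finally:
        client.post("/api/validator/stop")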
@@ -668,8 +748,10 @@ function app() {
     aiSt: {pending:0,running:0,done:0,failed:0,total:0},
     domains: [], selected: [], aiLang: 'ES', loading: false, page: 1, searchTotal: 0,
-    f: {tld:'',keyword:'',min_score:0,cms:'',live_only:false,alpha_only:false,no_sld:false,kit_digital_only:false,exclude_assessed:false,limit:'100'},
+    f: {tld:'',keyword:'',min_score:0,cms:'',live_only:false,alpha_only:false,no_sld:false,kit_digital_only:false,exclude_assessed:false,limit:'100',prescreen_status:'',niche:'',site_type:''},
     qst: {}, customDomains: '',
+    valSt: {running:false,processed:0,live:0,dead:0,parked:0,redirect:0,skipped:0,offset:0,rate:0},
+    valTld: '',
     leadsQ: {quality:'', country:'', limit:'50'}, leadsData: [], leadsTotal: 0, leadsPage: 1, leadsLoading: false,
     prescreening: false,
@@ -691,8 +773,9 @@ function app() {
        } else { this._lastAiDone = this.aiSt.done ?? 0; }
-       if(this.tab==='enrich') this.loadQueue();
-       if(this.tab==='pipeline') this.loadPipeline();
+       if(this.tab==='enrich') this.loadQueue();
+       if(this.tab==='validator') this.loadValStatus();
+       if(this.tab==='pipeline') this.loadPipeline();
        if(this.tab==='leads') this.loadLeads();
      }, 3000);
    },
@@ -727,10 +810,14 @@ function app() {
      const data = await fetch('/api/domains?'+p).then(r=>r.json());
      this.searchTotal = data.total ?? 0;
      let rows = data.results;
-     if(this.f.min_score>0) rows = rows.filter(r=> r.score==null || r.score>=Number(this.f.min_score));
-     if(this.f.cms) rows = rows.filter(r=> r.cms===this.f.cms);
-     if(this.f.kit_digital_only) rows = rows.filter(r=> r.kit_digital);
-     if(this.f.exclude_assessed) rows = rows.filter(r=> !r.ai_lead_quality);
+     if(this.f.min_score>0) rows = rows.filter(r=> r.score==null || r.score>=Number(this.f.min_score));
+     if(this.f.cms) rows = rows.filter(r=> r.cms===this.f.cms);
+     if(this.f.kit_digital_only) rows = rows.filter(r=> r.kit_digital);
+     if(this.f.exclude_assessed) rows = rows.filter(r=> !r.ai_lead_quality);
+     if(this.f.prescreen_status==='none') rows = rows.filter(r=> !r.prescreen_status);
+     else if(this.f.prescreen_status) rows = rows.filter(r=> r.prescreen_status===this.f.prescreen_status);
+     if(this.f.niche) rows = rows.filter(r=> r.niche===this.f.niche);
+     if(this.f.site_type) rows = rows.filter(r=> r.site_type===this.f.site_type);
      this.domains = rows;
    } catch(e) {
      this.domains = [];
@@ -740,7 +827,7 @@ function app() {
    },
    selectAll() { this.selected = this.domains.map(d=>d.domain); },
-   resetFilters() { this.f={tld:'',keyword:'',min_score:0,cms:'',live_only:false,alpha_only:false,no_sld:false,kit_digital_only:false,exclude_assessed:false,limit:'100'}; },
+   resetFilters() { this.f={tld:'',keyword:'',min_score:0,cms:'',live_only:false,alpha_only:false,no_sld:false,kit_digital_only:false,exclude_assessed:false,limit:'100',prescreen_status:'',niche:'',site_type:''}; },
    async enqueueSelected() {
      if(!this.selected.length) return;
@@ -852,6 +939,22 @@ function app() {
      try { this.qst = await fetch('/api/enrich/status').then(r=>r.json()); } catch(e){}
    },
+   async loadValStatus() {
+     try { this.valSt = await fetch('/api/validator/status').then(r=>r.json()); } catch(e){}
+   },
+   async startValidator() {
+     const p = new URLSearchParams();
+     if(this.valTld.trim()) p.set('tld', this.valTld.trim());
+     await fetch('/api/validator/start'+(p.toString()? '?'+p : ''), {method:'POST'});
+     this.notify('Validator started', 'success');
+     await this.loadValStatus();
+   },
+   async stopValidator() {
+     await fetch('/api/validator/stop', {method:'POST'});
+     this.notify('Validator stopped', 'info');
+     await this.loadValStatus();
+   },
+
    async restartAiWorker() { await fetch('/api/ai/worker/restart',{method:'POST'}); this.notify('AI worker restarted','info'); await this.loadAiStatus(); },
    copyEmail() {
      const subj = this.modal.ai.email_subject ? `Subject: ${this.modal.ai.email_subject}\n\n` : '';
diff --git a/app/validator.py b/app/validator.py
new file mode 100644
index 0000000..6224513
--- /dev/null
+++ b/app/validator.py
@@ -0,0 +1,292 @@
+"""Bulk domain validator — fast HTTP checks for the entire dataset.
+
+Reads domains from DuckDB in batches, skips already-validated ones,
+performs concurrent HTTP checks, and saves prescreen_status + server +
+ip + load_time_ms to enriched_domains.
+"""
+import asyncio
+import logging
+import os
+import socket
+import time
+from typing import Optional
+from urllib.parse import urlparse
+
+import httpx
+import aiosqlite
+import duckdb
+
+from app.db import SQLITE_PATH, DUCKDB_PATH, index_status
+
+logger = logging.getLogger(__name__)
+
+VAL_CONCURRENCY = int(os.getenv("VAL_CONCURRENCY", "50"))
+VAL_BATCH = int(os.getenv("VAL_BATCH", "200"))
+
+PARKING_BODY_SIGNALS = [
+    "domain is parked", "this domain is for sale", "buy this domain",
+    "domain parking", "parked domain", "hugedomains.com", "sedo.com",
+    "parkingcrew.com", "bodis.com", "dan.com", "afternic.com",
+    "sedoparking.com", "undeveloped.com", "epik.com/domain",
+    "this web page is parked", "domain has expired",
+]
+PARKING_REDIRECT_HOSTS = {
+    "sedo.com", "hugedomains.com", "dan.com", "afternic.com",
+    "parkingcrew.com", "bodis.com", "undeveloped.com", "epik.com",
+    "uniregistry.com", "sedoparking.com",
+}
+
+_UA = (
+    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
+    "AppleWebKit/537.36 (KHTML, like Gecko) "
+    "Chrome/122.0.0.0 Safari/537.36"
+)
+_HEADERS = {
+    "User-Agent": _UA,
+    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
+}
+
+_val_task: Optional[asyncio.Task] = None
+_val_stats: dict = {
+    "running": False,
+    "processed": 0,
+    "live": 0,
+    "dead": 0,
+    "parked": 0,
+    "redirect": 0,
+    "skipped": 0,
+    "offset": 0,
+    "rate": 0.0,
+    "tld_filter": None,
+}
+
+
+def _same_domain(original: str, final_url: str) -> bool:
+    # True when final_url still points at the original domain (ignoring a
+    # leading "www." or a sub/parent-domain relationship).
+    orig = original.lower().removeprefix("www.").split(":")[0]
+    final = urlparse(final_url).netloc.lower().removeprefix("www.")
+    return orig == final or final.endswith("." + orig) or orig.endswith("." + final)
+
+
+async def _resolve_ip(domain: str) -> Optional[str]:
+    try:
+        loop = asyncio.get_event_loop()
+        return await loop.run_in_executor(None, socket.gethostbyname, domain)
+    except Exception:
+        return None
+
+
+async def _check_domain(domain: str) -> dict:
+    result: dict = {
+        "domain": domain,
+        "prescreen_status": "dead",
+        "status_code": None,
+        "server": None,
+        "ip": None,
+        "load_time_ms": None,
+    }
+    t0 = time.monotonic()
+    try:
+        async with httpx.AsyncClient(
+            timeout=httpx.Timeout(connect=5, read=8, write=5, pool=10),
+            follow_redirects=True,
+            headers=_HEADERS,
+            verify=False,
+            max_redirects=5,
+        ) as client:
+            resp = await client.get(f"http://{domain}")
+
+        result["load_time_ms"] = int((time.monotonic() - t0) * 1000)
+        result["status_code"] = resp.status_code
+        result["server"] = (resp.headers.get("server") or "")[:100]
+
+        # Resolve IP for live-looking domains
+        result["ip"] = await _resolve_ip(domain)
+
+        final_url = str(resp.url)
+        final_host = urlparse(final_url).netloc.lower().removeprefix("www.")
+
+        # Redirected to a different root domain?
+        if not _same_domain(domain, final_url):
+            for ph in PARKING_REDIRECT_HOSTS:
+                if ph in final_host:
+                    result["prescreen_status"] = "parked"
+                    return result
+            result["prescreen_status"] = "redirect"
+            return result
+
+        if resp.status_code not in (200, 203):
+            return result  # dead
+
+        html_lc = resp.text[:20_000].lower()
+        for sig in PARKING_BODY_SIGNALS:
+            if sig in html_lc:
+                result["prescreen_status"] = "parked"
+                return result
+
+        result["prescreen_status"] = "live"
+        return result
+
+    except Exception as e:
+        logger.debug("Validator %s: %s", domain, e)
+        result["load_time_ms"] = int((time.monotonic() - t0) * 1000)
+        return result
+
+
+def _get_domains_batch(offset: int, limit: int, tld: Optional[str]) -> list[str]:
+    try:
+        conn = duckdb.connect(str(DUCKDB_PATH), read_only=True)
+        conn.execute("SET threads=2")
+        if tld:
+            rows = conn.execute(
+                "SELECT domain FROM domains WHERE tld=? LIMIT ? OFFSET ?",
+                [tld.lower().lstrip("."), limit, offset],
+            ).fetchall()
+        else:
+            rows = conn.execute(
+                "SELECT domain FROM domains LIMIT ? OFFSET ?",
+                [limit, offset],
+            ).fetchall()
+        conn.close()
+        return [r[0] for r in rows]
+    except Exception as e:
+        logger.error("Validator DuckDB error: %s", e)
+        return []
+
+
+async def _filter_unvalidated(domains: list[str]) -> list[str]:
+    """Return only domains that don't have a prescreen_status set yet."""
+    if not domains:
+        return []
+    placeholders = ",".join("?" * len(domains))
+    async with aiosqlite.connect(SQLITE_PATH) as db:
+        async with db.execute(
+            f"SELECT domain FROM enriched_domains "
+            f"WHERE domain IN ({placeholders}) AND prescreen_status IS NOT NULL",
+            domains,
+        ) as cur:
+            already = {r[0] async for r in cur}
+    return [d for d in domains if d not in already]
+
+
+async def _save_batch(results: list[dict]):
+    async with aiosqlite.connect(SQLITE_PATH) as db:
+        for r in results:
+            await db.execute(
+                """INSERT INTO enriched_domains
+                   (domain, prescreen_status, status_code, server, ip, load_time_ms, prescreen_at)
+                   VALUES (?, ?, ?, ?, ?, ?, datetime('now'))
+                   ON CONFLICT(domain) DO UPDATE SET
+                     prescreen_status = excluded.prescreen_status,
+                     status_code = COALESCE(excluded.status_code, status_code),
+                     server = COALESCE(NULLIF(excluded.server,''), server),
+                     ip = COALESCE(excluded.ip, ip),
+                     load_time_ms = excluded.load_time_ms,
+                     prescreen_at = excluded.prescreen_at""",
+                (
+                    r["domain"], r["prescreen_status"], r.get("status_code"),
+                    r.get("server"), r.get("ip"), r.get("load_time_ms"),
+                ),
+            )
+        await db.commit()
+
+
+async def _validator_loop(tld_filter: Optional[str]):
+    global _val_stats
+    _val_stats["running"] = True
+    offset = _val_stats["offset"]
+    sem = asyncio.Semaphore(VAL_CONCURRENCY)
+    rate_buf: list[float] = []
+
+    # Wait for DuckDB index to be ready (up to 10 minutes)
+    for _ in range(120):
+        if index_status()["ready"]:
+            break
+        logger.info("Validator: waiting for DuckDB index…")
+        await asyncio.sleep(5)
+    else:
+        logger.error("Validator: DuckDB index never became ready")
+        _val_stats["running"] = False
+        return
+
+    try:
+        while True:
+            loop = asyncio.get_event_loop()
+            batch = await loop.run_in_executor(
+                None, _get_domains_batch, offset, VAL_BATCH, tld_filter
+            )
+            if not batch:
+                logger.info("Validator: dataset complete at offset=%d", offset)
+                break
+
+            to_check = await _filter_unvalidated(batch)
+            _val_stats["skipped"] += len(batch) - len(to_check)
+            offset += len(batch)
+            _val_stats["offset"] = offset
+
+            if not to_check:
+                await asyncio.sleep(0)  # yield to event loop
+                continue
+
+            t0 = time.monotonic()
+
+            async def _run(d: str) -> dict:
+                async with sem:
+                    return await _check_domain(d)
+
+            raw = await asyncio.gather(*[_run(d) for d in to_check], return_exceptions=True)
+            results = [r for r in raw if isinstance(r, dict)]
+
+            await _save_batch(results)
+
+            for r in results:
+                _val_stats["processed"] += 1
+                s = r.get("prescreen_status", "dead")
+                _val_stats[s] = _val_stats.get(s, 0) + 1
+
+            elapsed = max(time.monotonic() - t0, 0.01)
+            rate_buf.append(len(results) / elapsed)
+            if len(rate_buf) > 10:
+                rate_buf.pop(0)
+            _val_stats["rate"] = round(sum(rate_buf) / len(rate_buf), 1)
+
+            logger.info(
+                "Validator off=%d proc=%d live=%d dead=%d parked=%d rate=%.1f/s",
+                offset, _val_stats["processed"], _val_stats["live"],
+                _val_stats["dead"], _val_stats["parked"], _val_stats["rate"],
+            )
+
+    except asyncio.CancelledError:
+        logger.info("Validator cancelled at offset=%d", offset)
+    except Exception as e:
+        logger.error("Validator loop error: %s", e, exc_info=True)
+    finally:
+        _val_stats["running"] = False
+        _val_stats["offset"] = offset
+
+
+def get_validator_status() -> dict:
+    return dict(_val_stats)
+
+
+def start_validator(tld_filter: Optional[str] = None):
+    global _val_task, _val_stats
+    if _val_task and not _val_task.done():
+        return  # already running
+    _val_stats["running"] = True
+    _val_stats["tld_filter"] = tld_filter
+    # Only reset counters on a completely fresh start
+    if not _val_stats.get("processed"):
+        _val_stats.update(
+            processed=0, live=0, dead=0, parked=0,
+            redirect=0, skipped=0, offset=0, rate=0.0,
+        )
+    _val_task = asyncio.create_task(_validator_loop(tld_filter))
+    logger.info("Validator started (tld=%s, offset=%d)", tld_filter, _val_stats["offset"])
+
+
+def stop_validator():
+    global _val_task, _val_stats
+    _val_stats["running"] = False
+    if _val_task and not _val_task.done():
+        _val_task.cancel()
+    logger.info("Validator stop requested")
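To spot-check what the validator persisted, enriched_domains can be read directly with aiosqlite; a minimal sketch, assuming it runs in the app's environment so app.db and SQLITE_PATH resolve:

# Sketch only: assumes the script runs next to the app so app.db is importable.
import asyncio
import aiosqlite

from app.db import SQLITE_PATH

async def summarize() -> None:
    async with aiosqlite.connect(SQLITE_PATH) as db:
        query = (
            "SELECT COALESCE(prescreen_status, 'unchecked') AS status, COUNT(*) AS n "
            "FROM enriched_domains "
            "GROUP BY COALESCE(prescreen_status, 'unchecked') ORDER BY n DESC"
        )
        async with db.execute(query) as cur:
            async for status, n in cur:
                print(f"{status:>10}  {n}")

asyncio.run(summarize())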