async def save_prescreen_results(results: list[dict]):
    """Upsert prescreen HTTP results and/or DeepSeek niche/type classifications.

    Each item in *results* is one of:
      * an HTTP prescreen result: {"domain", "prescreen_status", "title", ...}
      * a DeepSeek classification: {"domain", "niche", "type"}

    Classification rows only UPDATE an existing row (the domain row is created
    by the earlier prescreen pass); prescreen rows are full upserts.
    """
    async with aiosqlite.connect(SQLITE_PATH) as db:
        for r in results:
            domain = r.get("domain")
            if not domain:
                continue  # skip malformed entries rather than abort the batch
            niche = r.get("niche")
            site_type = r.get("type")  # DeepSeek returns "type" key
            if niche or site_type:
                # Classification-only update (domain row must already exist).
                # COALESCE so a partial classification (only niche OR only type)
                # does not wipe a previously stored value with NULL.
                await db.execute(
                    """UPDATE enriched_domains
                       SET niche = COALESCE(?, niche),
                           site_type = COALESCE(?, site_type)
                       WHERE domain = ?""",
                    (niche, site_type, domain),
                )
            else:
                # Prescreen status upsert — create the row if it doesn't exist yet.
                # page_title is only filled if the row didn't already have one.
                await db.execute(
                    """INSERT INTO enriched_domains (domain, prescreen_status, prescreen_at, page_title)
                       VALUES (?, ?, datetime('now'), ?)
                       ON CONFLICT(domain) DO UPDATE SET
                           prescreen_status = excluded.prescreen_status,
                           prescreen_at = excluded.prescreen_at,
                           page_title = COALESCE(page_title, excluded.page_title)""",
                    (domain, r.get("prescreen_status"), r.get("title")),
                )
        await db.commit()  # single commit for the whole batch


@app.post("/api/prescreen/batch")
async def prescreen_batch(body: dict):
    """
    Phase 1 — HTTP check every domain (no AI). Marks live/dead/parked/redirect.
    Phase 2 — Single DeepSeek call for all live domains → niche + type.
    Max 200 domains per call.
    """
    domains = body.get("domains", [])
    if not domains:
        return JSONResponse({"error": "no domains provided"}, status_code=400)
    if len(domains) > 200:
        return JSONResponse({"error": "max 200 domains per batch"}, status_code=400)

    # Imported lazily so app startup doesn't require httpx/bs4 to be importable.
    from app.prescreener import prescreen_domains, classify_with_deepseek, DEEPSEEK_BATCH_SIZE

    # Phase 1: HTTP checks (concurrent, no AI credits spent).
    results = await prescreen_domains(domains)
    await save_prescreen_results(results)

    counts: dict = {}
    for r in results:
        s = r.get("prescreen_status", "dead")
        counts[s] = counts.get(s, 0) + 1

    # Phase 2: DeepSeek classification for live sites only, in bounded batches.
    live = [r for r in results if r.get("prescreen_status") == "live"]
    classified = 0
    if live:
        batches = [live[i:i + DEEPSEEK_BATCH_SIZE] for i in range(0, len(live), DEEPSEEK_BATCH_SIZE)]
        batch_cls = await asyncio.gather(
            *[classify_with_deepseek(b) for b in batches], return_exceptions=True
        )
        # A failed batch surfaces as an exception object — keep the good ones.
        all_cls: list = []
        for bc in batch_cls:
            if isinstance(bc, list):
                all_cls.extend(bc)
        if all_cls:
            await save_prescreen_results(all_cls)
            classified = len(all_cls)

    return {
        "total": len(domains),
        "live": counts.get("live", 0),
        "parked": counts.get("parked", 0),
        "redirect": counts.get("redirect", 0),
        "dead": counts.get("dead", 0),
        "classified": classified,
    }
"""Fast HTTP pre-screening + batch DeepSeek niche/type classification.

Phase 1 — pure HTTP (no AI):
    Check each domain with a real browser UA, follow redirects, detect
    parked/dead/redirect sites. No AI credits spent.

Phase 2 — single DeepSeek call:
    Bundle all live-site titles + snippets into ONE prompt and get back
    niche + type for every domain. Far cheaper than one call per domain.
"""
import asyncio
import json
import logging
import os
import re
from urllib.parse import urlparse

import httpx
from bs4 import BeautifulSoup

logger = logging.getLogger(__name__)

# SECURITY: the token MUST come from the environment. The previous hard-coded
# fallback was a committed credential — it has been removed and should be
# rotated on the Replicate account.
REPLICATE_TOKEN = os.getenv("REPLICATE_API_TOKEN", "")
DEEPSEEK_MODEL = os.getenv("DEEPSEEK_MODEL",
                           "https://api.replicate.com/v1/models/deepseek-ai/deepseek-r1/predictions")
PRESCREEN_CONCURRENCY = int(os.getenv("PRESCREEN_CONCURRENCY", "30"))
DEEPSEEK_BATCH_SIZE = int(os.getenv("DEEPSEEK_BATCH_SIZE", "80"))

# ── Parking / parked detection ────────────────────────────────────────────────

PARKING_BODY_SIGNALS = [
    "domain is parked", "this domain is for sale", "buy this domain",
    "domain parking", "parked domain", "hugedomains.com", "sedo.com",
    "parkingcrew.com", "bodis.com", "dan.com", "afternic.com",
    "sedoparking.com", "undeveloped.com", "epik.com/domain",
    "this web page is parked", "domain has expired",
]
PARKING_TITLE_SIGNALS = [
    "domain parked", "parked domain", "domain for sale",
    "buy this domain", "domain expired",
]
PARKING_REDIRECT_HOSTS = {
    "sedo.com", "hugedomains.com", "dan.com", "afternic.com",
    "parkingcrew.com", "bodis.com", "undeveloped.com", "epik.com",
    "uniregistry.com", "sedoparking.com",
}

_UA = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/122.0.0.0 Safari/537.36"
)
_HEADERS = {
    "User-Agent": _UA,
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9,es;q=0.8",
}


def _same_domain(original: str, final_url: str) -> bool:
    """True if final URL is on the same root domain (handles http→https, www).

    Uses str.removeprefix, not str.lstrip("www."): lstrip strips a character
    *set*, which would mangle e.g. "web.com" into "eb.com".
    """
    orig = original.lower().removeprefix("www.").split(":")[0]
    final = urlparse(final_url).netloc.lower().removeprefix("www.")
    return orig == final or final.endswith("." + orig) or orig.endswith("." + final)


async def _check_one(domain: str) -> dict:
    """HTTP-check a single domain. Never raises; defaults to status "dead"."""
    result = {
        "domain": domain,
        "prescreen_status": "dead",
        "redirect_to": None,
        "title": None,
        "snippet": None,
    }
    try:
        async with httpx.AsyncClient(
            timeout=httpx.Timeout(connect=6, read=9, write=5, pool=10),
            follow_redirects=True,
            headers=_HEADERS,
            verify=False,  # many of these sites have broken certs; we only sniff content
            max_redirects=5,
        ) as client:
            resp = await client.get(f"http://{domain}")

        final_url = str(resp.url)
        final_host = urlparse(final_url).netloc.lower().removeprefix("www.")

        # Redirected to a different root domain?
        if not _same_domain(domain, final_url):
            for ph in PARKING_REDIRECT_HOSTS:
                if ph in final_host:
                    result.update(prescreen_status="parked", redirect_to=final_url)
                    return result
            result.update(prescreen_status="redirect", redirect_to=final_url)
            return result

        if resp.status_code not in (200, 203):
            return result  # dead

        html = resp.text[:80_000]  # cap parse work on huge pages
        soup = BeautifulSoup(html, "html.parser")

        title_tag = soup.find("title")
        title = title_tag.get_text(strip=True)[:200] if title_tag else ""
        result["title"] = title

        body_lc = html.lower()
        title_lc = title.lower()

        for sig in PARKING_BODY_SIGNALS:
            if sig in body_lc:
                result["prescreen_status"] = "parked"
                return result
        for sig in PARKING_TITLE_SIGNALS:
            if sig in title_lc:
                result["prescreen_status"] = "parked"
                return result

        # Strip chrome so the snippet is actual page copy, not nav/boilerplate.
        for tag in soup(["script", "style", "nav", "footer", "header"]):
            tag.decompose()
        snippet = " ".join(soup.get_text(" ", strip=True).split())[:600]
        result.update(prescreen_status="live", snippet=snippet)
        return result

    except Exception as e:
        logger.debug("Prescreen %s: %s", domain, e)
        return result


async def prescreen_domains(domains: list[str]) -> list[dict]:
    """HTTP-check all domains concurrently. Returns one result dict per domain."""
    sem = asyncio.Semaphore(PRESCREEN_CONCURRENCY)

    async def _guard(d):
        async with sem:
            return await _check_one(d)

    raw = await asyncio.gather(*[_guard(d) for d in domains], return_exceptions=True)
    # _check_one already swallows errors, but gather-level surprises still get a
    # well-formed "dead" placeholder so callers always see len(domains) results.
    return [
        r if not isinstance(r, Exception)
        else {"domain": d, "prescreen_status": "dead",
              "redirect_to": None, "title": None, "snippet": None}
        for d, r in zip(domains, raw)
    ]


# ── DeepSeek batch classification ────────────────────────────────────────────

_NICHES = (
    "automotive, beauty_cosmetics, travel_tourism, hospitality, "
    "restaurant_food, legal, medical_health, real_estate, technology, "
    "fashion_retail, finance, education, construction, sports, "
    "entertainment, agriculture, industrial, consulting, other"
)
_TYPES = (
    "corporate, ecommerce, blog, newspaper, landing_page, "
    "portfolio, directory, forum, informational, other"
)


def _build_classify_prompt(items: list[dict]) -> str:
    """One numbered line per site; strict JSON-array output instructions."""
    lines = []
    for i, d in enumerate(items, 1):
        # Swap double quotes for singles so the prompt's quoting stays unambiguous.
        t = (d.get("title") or "").replace('"', "'")[:100]
        s = (d.get("snippet") or "").replace('"', "'")[:300]
        lines.append(f'{i}. domain="{d["domain"]}" title="{t}" text="{s}"')
    return (
        "Classify each website below.\n"
        "Return ONLY a JSON array — no markdown, no explanation, nothing else.\n\n"
        f"niche options (pick one closest): {_NICHES}\n"
        f"type options (pick one closest): {_TYPES}\n\n"
        'Output format: [{"domain":"x.com","niche":"automotive","type":"corporate"}, ...]\n\n'
        "Websites:\n" + "\n".join(lines)
    )


def _parse_classify_output(raw: str) -> list[dict]:
    """Extract the JSON array from a DeepSeek response; [] on parse failure."""
    text = re.sub(r"```(?:json)?", "", raw).strip().rstrip("`").strip()
    # Strip DeepSeek-R1 reasoning block if present. (The earlier pattern had
    # lost its tag literals and matched the empty string — a no-op.)
    text = re.sub(r"<think>[\s\S]*?</think>", "", text).strip()
    m = re.search(r"\[[\s\S]+\]", text)
    if m:
        try:
            return json.loads(m.group(0))
        except json.JSONDecodeError:
            pass
    logger.warning("DeepSeek classification parse failed: %s", raw[:300])
    return []


async def classify_with_deepseek(live_items: list[dict]) -> list[dict]:
    """Single DeepSeek call → list of {domain, niche, type}; [] on any failure."""
    if not live_items:
        return []
    payload = {
        "input": {
            "prompt": _build_classify_prompt(live_items),
            # Rough budget: ~80 output tokens per site plus headroom, capped.
            "max_tokens": min(4096, len(live_items) * 80 + 600),
            "temperature": 0.1,
        }
    }
    try:
        async with httpx.AsyncClient(timeout=120) as client:
            resp = await client.post(
                DEEPSEEK_MODEL,
                headers={
                    "Authorization": f"Bearer {REPLICATE_TOKEN}",
                    "Content-Type": "application/json",
                    "Prefer": "wait",  # Replicate: block until prediction completes
                },
                json=payload,
            )
            resp.raise_for_status()
            data = resp.json()

        output = data.get("output", "")
        if isinstance(output, list):  # Replicate may stream output as chunks
            output = "".join(output)

        return _parse_classify_output(output)

    except Exception as e:
        logger.error("DeepSeek classification error: %s", e)
        return []
var(--border)} .bkd{background:#f59e0b22;color:var(--kd);border:1px solid #f59e0b44} .bai{background:#a855f722;color:#c084fc;border:1px solid #a855f744} +.bps{background:#0f9d5822;color:#34d399;border:1px solid #0f9d5844} .sm{padding:4px 9px;font-size:11px} +/* Niche / type pills */ +.pni{background:#0ea5e918;color:#38bdf8;border:1px solid #0ea5e933} +.pty{background:#8b5cf618;color:#a78bfa;border:1px solid #8b5cf633} +/* Prescreen status dot */ +.ps-live{color:#34d399} .ps-dead{color:#f87171} .ps-parked{color:#fbbf24} .ps-redirect{color:#94a3b8} /* Table */ .tw{overflow-x:auto;border-radius:var(--r);border:1px solid var(--border)} @@ -345,6 +351,10 @@ tr:hover td{background:rgba(255,255,255,.025)} +