"""Fast HTTP pre-screening + batch DeepSeek niche/type classification. Phase 1 — pure HTTP (no AI): Check each domain with a real browser UA, follow redirects, detect parked/dead/redirect sites. No AI credits spent. Phase 2 — single DeepSeek call: Bundle all live-site titles + snippets into ONE prompt and get back niche + type for every domain. Far cheaper than one call per domain. """ import asyncio import json import logging import os import re from urllib.parse import urlparse import httpx from bs4 import BeautifulSoup logger = logging.getLogger(__name__) REPLICATE_TOKEN = os.getenv("REPLICATE_API_TOKEN", "r8_7I7Feai78f9PzMOs20y5GVFKiLkgUWP463vZO") DEEPSEEK_MODEL = os.getenv("DEEPSEEK_MODEL", "https://api.replicate.com/v1/models/deepseek-ai/deepseek-r1/predictions") PRESCREEN_CONCURRENCY = int(os.getenv("PRESCREEN_CONCURRENCY", "30")) DEEPSEEK_BATCH_SIZE = int(os.getenv("DEEPSEEK_BATCH_SIZE", "80")) # ── Parking / parked detection ──────────────────────────────────────────────── PARKING_BODY_SIGNALS = [ "domain is parked", "this domain is for sale", "buy this domain", "domain parking", "parked domain", "hugedomains.com", "sedo.com", "parkingcrew.com", "bodis.com", "dan.com", "afternic.com", "sedoparking.com", "undeveloped.com", "epik.com/domain", "this web page is parked", "domain has expired", ] PARKING_TITLE_SIGNALS = [ "domain parked", "parked domain", "domain for sale", "buy this domain", "domain expired", ] PARKING_REDIRECT_HOSTS = { "sedo.com", "hugedomains.com", "dan.com", "afternic.com", "parkingcrew.com", "bodis.com", "undeveloped.com", "epik.com", "uniregistry.com", "sedoparking.com", } _UA = ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/122.0.0.0 Safari/537.36" ) _HEADERS = { "User-Agent": _UA, "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.9,es;q=0.8", } def _same_domain(original: str, final_url: str) -> bool: """True if final URL is on the same root domain (handles http→https, www).""" orig = original.lower().lstrip("www.").split(":")[0] final = urlparse(final_url).netloc.lower().lstrip("www.") return orig == final or final.endswith("." + orig) or orig.endswith("." + final) async def _check_one(domain: str) -> dict: result = { "domain": domain, "prescreen_status": "dead", "redirect_to": None, "title": None, "snippet": None, } try: async with httpx.AsyncClient( timeout=httpx.Timeout(connect=6, read=9, write=5, pool=10), follow_redirects=True, headers=_HEADERS, verify=False, max_redirects=5, ) as client: resp = await client.get(f"http://{domain}") final_url = str(resp.url) final_host = urlparse(final_url).netloc.lower().lstrip("www.") # Redirected to a different root domain? if not _same_domain(domain, final_url): for ph in PARKING_REDIRECT_HOSTS: if ph in final_host: result.update(prescreen_status="parked", redirect_to=final_url) return result result.update(prescreen_status="redirect", redirect_to=final_url) return result if resp.status_code not in (200, 203): return result # dead html = resp.text[:80_000] soup = BeautifulSoup(html, "html.parser") title_tag = soup.find("title") title = title_tag.get_text(strip=True)[:200] if title_tag else "" result["title"] = title body_lc = html.lower() title_lc = title.lower() for sig in PARKING_BODY_SIGNALS: if sig in body_lc: result["prescreen_status"] = "parked" return result for sig in PARKING_TITLE_SIGNALS: if sig in title_lc: result["prescreen_status"] = "parked" return result for tag in soup(["script", "style", "nav", "footer", "header"]): tag.decompose() snippet = " ".join(soup.get_text(" ", strip=True).split())[:600] result.update(prescreen_status="live", snippet=snippet) return result except Exception as e: logger.debug("Prescreen %s: %s", domain, e) return result async def prescreen_domains(domains: list[str]) -> list[dict]: """HTTP-check all domains concurrently. Returns one result dict per domain.""" sem = asyncio.Semaphore(PRESCREEN_CONCURRENCY) async def _guard(d): async with sem: return await _check_one(d) raw = await asyncio.gather(*[_guard(d) for d in domains], return_exceptions=True) return [ r if not isinstance(r, Exception) else {"domain": d, "prescreen_status": "dead", "redirect_to": None, "title": None, "snippet": None} for d, r in zip(domains, raw) ] # ── DeepSeek batch classification ──────────────────────────────────────────── _NICHES = ( "automotive, beauty_cosmetics, travel_tourism, hospitality, " "restaurant_food, legal, medical_health, real_estate, technology, " "fashion_retail, finance, education, construction, sports, " "entertainment, agriculture, industrial, consulting, other" ) _TYPES = ( "corporate, ecommerce, blog, newspaper, landing_page, " "portfolio, directory, forum, informational, other" ) def _build_classify_prompt(items: list[dict]) -> str: lines = [] for i, d in enumerate(items, 1): t = (d.get("title") or "").replace('"', "'")[:100] s = (d.get("snippet") or "").replace('"', "'")[:300] lines.append(f'{i}. domain="{d["domain"]}" title="{t}" text="{s}"') return ( "Classify each website below.\n" "Return ONLY a JSON array — no markdown, no explanation, nothing else.\n\n" f"niche options (pick one closest): {_NICHES}\n" f"type options (pick one closest): {_TYPES}\n\n" 'Output format: [{"domain":"x.com","niche":"automotive","type":"corporate"}, ...]\n\n' "Websites:\n" + "\n".join(lines) ) def _parse_classify_output(raw: str) -> list[dict]: text = re.sub(r"```(?:json)?", "", raw).strip().rstrip("`").strip() # Strip DeepSeek reasoning block if present text = re.sub(r"[\s\S]*?", "", text).strip() m = re.search(r"\[[\s\S]+\]", text) if m: try: return json.loads(m.group(0)) except json.JSONDecodeError: pass logger.warning("DeepSeek classification parse failed: %s", raw[:300]) return [] async def classify_with_deepseek(live_items: list[dict]) -> list[dict]: """Single DeepSeek call → list of {domain, niche, type}.""" if not live_items: return [] payload = { "input": { "prompt": _build_classify_prompt(live_items), "max_tokens": min(4096, len(live_items) * 80 + 600), "temperature": 0.1, } } try: async with httpx.AsyncClient(timeout=120) as client: resp = await client.post( DEEPSEEK_MODEL, headers={ "Authorization": f"Bearer {REPLICATE_TOKEN}", "Content-Type": "application/json", "Prefer": "wait", }, json=payload, ) resp.raise_for_status() data = resp.json() output = data.get("output", "") if isinstance(output, list): output = "".join(output) return _parse_classify_output(output) except Exception as e: logger.error("DeepSeek classification error: %s", e) return []