"""Fast HTTP pre-screening + batch DeepSeek niche/type classification. Phase 1 — pure HTTP (no AI): Check each domain with a real browser UA, follow redirects, detect parked/dead/redirect sites. No AI credits spent. Phase 2 — single DeepSeek call: Bundle all live-site titles + snippets into ONE prompt and get back niche + type for every domain. Far cheaper than one call per domain. """ import asyncio import json import logging import os import re from urllib.parse import urlparse import httpx from bs4 import BeautifulSoup logger = logging.getLogger(__name__) REPLICATE_TOKEN = os.getenv("REPLICATE_API_TOKEN", "r8_7I7Feai78f9PzMOs20y5GVFKiLkgUWP463vZO") DEEPSEEK_MODEL = os.getenv("DEEPSEEK_MODEL", "https://api.replicate.com/v1/models/deepseek-ai/deepseek-r1/predictions") PRESCREEN_CONCURRENCY = int(os.getenv("PRESCREEN_CONCURRENCY", "30")) DEEPSEEK_BATCH_SIZE = int(os.getenv("DEEPSEEK_BATCH_SIZE", "80")) # ── Parking / parked detection ──────────────────────────────────────────────── PARKING_BODY_SIGNALS = [ "domain is parked", "this domain is for sale", "buy this domain", "domain parking", "parked domain", "hugedomains.com", "sedo.com", "parkingcrew.com", "bodis.com", "dan.com", "afternic.com", "sedoparking.com", "undeveloped.com", "epik.com/domain", "this web page is parked", "domain has expired", ] PARKING_TITLE_SIGNALS = [ "domain parked", "parked domain", "domain for sale", "buy this domain", "domain expired", ] PARKING_REDIRECT_HOSTS = { "sedo.com", "hugedomains.com", "dan.com", "afternic.com", "parkingcrew.com", "bodis.com", "undeveloped.com", "epik.com", "uniregistry.com", "sedoparking.com", } _UA = ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/122.0.0.0 Safari/537.36" ) _HEADERS = { "User-Agent": _UA, "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.9,es;q=0.8", } def _same_domain(original: str, final_url: str) -> bool: """True if final URL is on the same root domain (handles http→https, www).""" orig = original.lower().lstrip("www.").split(":")[0] final = urlparse(final_url).netloc.lower().lstrip("www.") return orig == final or final.endswith("." + orig) or orig.endswith("." + final) async def _check_one(domain: str) -> dict: result = { "domain": domain, "prescreen_status": "dead", "redirect_to": None, "title": None, "snippet": None, } try: async with httpx.AsyncClient( timeout=httpx.Timeout(connect=6, read=9, write=5, pool=10), follow_redirects=True, headers=_HEADERS, verify=False, max_redirects=5, ) as client: resp = await client.get(f"http://{domain}") final_url = str(resp.url) final_host = urlparse(final_url).netloc.lower().lstrip("www.") # Redirected to a different root domain? if not _same_domain(domain, final_url): for ph in PARKING_REDIRECT_HOSTS: if ph in final_host: result.update(prescreen_status="parked", redirect_to=final_url) return result result.update(prescreen_status="redirect", redirect_to=final_url) return result if resp.status_code not in (200, 203): return result # dead html = resp.text[:80_000] soup = BeautifulSoup(html, "html.parser") title_tag = soup.find("title") title = title_tag.get_text(strip=True)[:200] if title_tag else "" result["title"] = title body_lc = html.lower() title_lc = title.lower() for sig in PARKING_BODY_SIGNALS: if sig in body_lc: result["prescreen_status"] = "parked" return result for sig in PARKING_TITLE_SIGNALS: if sig in title_lc: result["prescreen_status"] = "parked" return result for tag in soup(["script", "style", "nav", "footer", "header"]): tag.decompose() snippet = " ".join(soup.get_text(" ", strip=True).split())[:600] result.update(prescreen_status="live", snippet=snippet) return result except Exception as e: logger.debug("Prescreen %s: %s", domain, e) return result async def prescreen_domains(domains: list[str]) -> list[dict]: """HTTP-check all domains concurrently. Returns one result dict per domain.""" sem = asyncio.Semaphore(PRESCREEN_CONCURRENCY) async def _guard(d): async with sem: return await _check_one(d) raw = await asyncio.gather(*[_guard(d) for d in domains], return_exceptions=True) return [ r if not isinstance(r, Exception) else {"domain": d, "prescreen_status": "dead", "redirect_to": None, "title": None, "snippet": None} for d, r in zip(domains, raw) ] # ── DeepSeek batch classification ──────────────────────────────────────────── _NICHES = ( "automotive, beauty_cosmetics, travel_tourism, hospitality, " "restaurant_food, legal, medical_health, real_estate, technology, " "fashion_retail, finance, education, construction, sports, " "entertainment, agriculture, industrial, consulting, other" ) _TYPES = ( "corporate, ecommerce, blog, newspaper, landing_page, " "portfolio, directory, forum, informational, other" ) def _build_classify_prompt(items: list[dict]) -> str: lines = [] for i, d in enumerate(items, 1): t = (d.get("title") or "").replace('"', "'")[:100] s = (d.get("snippet") or "").replace('"', "'")[:300] lines.append(f'{i}. domain="{d["domain"]}" title="{t}" text="{s}"') return ( "Classify each website below.\n" "Return ONLY a JSON array — no markdown, no explanation, nothing else.\n\n" f"niche options (pick one closest): {_NICHES}\n" f"type options (pick one closest): {_TYPES}\n\n" 'Output format: [{"domain":"x.com","niche":"automotive","type":"corporate"}, ...]\n\n' "Websites:\n" + "\n".join(lines) ) def _parse_classify_output(raw: str) -> list[dict]: """Extract JSON array from DeepSeek output. Strategy: search the full raw text first (handles cases where the JSON sits outside or inside blocks), then try with think stripped. DeepSeek-R1 sometimes puts its answer inside the think block; stripping it first would lose the data entirely. """ text = re.sub(r"```(?:json)?", "", raw).strip().rstrip("`").strip() def _try_parse(s: str): m = re.search(r"\[[\s\S]+\]", s) if not m: return None try: result = json.loads(m.group(0)) if isinstance(result, list) and result: return result except json.JSONDecodeError: pass return None # 1. Try the full output as-is (handles JSON after ) parsed = _try_parse(text) if parsed: logger.info("DeepSeek: parsed %d items from full output", len(parsed)) return parsed # 2. Try ONLY the content inside (handles JSON inside the block) think_m = re.search(r"([\s\S]*?)", text) if think_m: parsed = _try_parse(think_m.group(1)) if parsed: logger.info("DeepSeek: parsed %d items from block", len(parsed)) return parsed # 3. Try with think block stripped (standard path) stripped = re.sub(r"[\s\S]*?", "", text).strip() parsed = _try_parse(stripped) if parsed: logger.info("DeepSeek: parsed %d items after stripping ", len(parsed)) return parsed logger.warning("DeepSeek classification parse failed, raw snippet: %.400s", raw) return [] async def classify_with_deepseek(live_items: list[dict]) -> list[dict]: """Single DeepSeek call → list of {domain, niche, type}. Handles: - 429 Too Many Requests: exponential back-off, up to 4 retries - 202 Accepted (async prediction): polls urls.get until succeeded """ if not live_items: return [] payload = { "input": { "prompt": _build_classify_prompt(live_items), "max_tokens": min(4096, len(live_items) * 80 + 600), "temperature": 0.1, } } auth_header = {"Authorization": f"Bearer {REPLICATE_TOKEN}"} MAX_RETRIES = 4 for attempt in range(MAX_RETRIES): try: async with httpx.AsyncClient(timeout=300) as client: resp = await client.post( DEEPSEEK_MODEL, headers={**auth_header, "Content-Type": "application/json", "Prefer": "wait=60"}, json=payload, ) # ── Retry on rate-limit ────────────────────────────────────── if resp.status_code == 429: wait = min(60, 5 * (2 ** attempt)) # 5 → 10 → 20 → 40s logger.warning("DeepSeek 429 (attempt %d/%d), retrying in %ds", attempt + 1, MAX_RETRIES, wait) await asyncio.sleep(wait) continue resp.raise_for_status() data = resp.json() # ── Poll if Replicate queued async (202) ───────────────────── if resp.status_code == 202 or data.get("status") in ("starting", "processing"): poll_url = (data.get("urls") or {}).get("get") if not poll_url: logger.error("DeepSeek: 202 but no poll URL in response") return [] logger.info("DeepSeek: async prediction, polling %s", poll_url) for tick in range(90): # up to ~3 minutes await asyncio.sleep(2) pr = await client.get(poll_url, headers=auth_header) pdata = pr.json() status = pdata.get("status") logger.debug("DeepSeek poll #%d status=%s", tick + 1, status) if status == "succeeded": data = pdata break if status in ("failed", "canceled"): logger.error("DeepSeek prediction %s: %s", status, pdata.get("error")) return [] else: logger.error("DeepSeek: prediction timed out after 90 polls") return [] # ── Parse output ───────────────────────────────────────────────── output = data.get("output") or "" if isinstance(output, list): output = "".join(str(t) for t in output if t is not None) logger.info("DeepSeek raw output (first 500 chars): %.500s", output) result = _parse_classify_output(output) logger.info("DeepSeek classified %d / %d domains", len(result), len(live_items)) return result except Exception as e: if attempt < MAX_RETRIES - 1: wait = 5 * (2 ** attempt) logger.warning("DeepSeek error (attempt %d/%d), retry in %ds: %s", attempt + 1, MAX_RETRIES, wait, e) await asyncio.sleep(wait) else: logger.error("DeepSeek classification failed after %d attempts: %s", MAX_RETRIES, e) return []