1. prescreener.py: classify_with_deepseek now retries on 429 with exponential
   back-off (5s → 10s → 20s → 40s, up to 4 attempts); the same back-off also
   covers other transient errors.
2. main.py: prescreen batches now run sequentially with a 3s gap instead of in
   parallel via asyncio.gather. Parallel batches caused the second batch to
   always hit the 429 rate limit, leaving most domains unclassified (only the
   smaller last batch succeeded). A sketch of the sequential loop follows below.
3. index.html: prescreenSelected() now clears this.domains before calling
   _fetch() so Alpine re-renders the full table with the updated niche/type
   values; also updates the notify hint to mention the expected 1-2 min wait.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
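A minimal sketch of the sequential loop from point 2, assuming main.py chunks
the live items into a list named batches and awaits classify_with_deepseek per
chunk (the names here are illustrative, not the exact main.py code):

    results = []
    for i, batch in enumerate(batches):
        results += await classify_with_deepseek(batch)  # one batch at a time
        if i < len(batches) - 1:
            await asyncio.sleep(3)  # 3s gap keeps Replicate under its rate limit

Sequencing the batches means the second request no longer races the first into
the 429 window, so every batch gets classified instead of only the last one.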
"""Fast HTTP pre-screening + batch DeepSeek niche/type classification.
|
|
|
|
Phase 1 — pure HTTP (no AI):
|
|
Check each domain with a real browser UA, follow redirects, detect
|
|
parked/dead/redirect sites. No AI credits spent.
|
|
|
|
Phase 2 — single DeepSeek call:
|
|
Bundle all live-site titles + snippets into ONE prompt and get back
|
|
niche + type for every domain. Far cheaper than one call per domain.
|
|
"""
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
from urllib.parse import urlparse
|
|
|
|
import httpx
|
|
from bs4 import BeautifulSoup
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
REPLICATE_TOKEN = os.getenv("REPLICATE_API_TOKEN", "")  # set via environment; never commit a live token
# Replicate "predictions" endpoint for the DeepSeek-R1 model.
DEEPSEEK_MODEL = os.getenv(
    "DEEPSEEK_MODEL",
    "https://api.replicate.com/v1/models/deepseek-ai/deepseek-r1/predictions",
)
PRESCREEN_CONCURRENCY = int(os.getenv("PRESCREEN_CONCURRENCY", "30"))
DEEPSEEK_BATCH_SIZE = int(os.getenv("DEEPSEEK_BATCH_SIZE", "80"))

# ── Parking / parked detection ────────────────────────────────────────────────

PARKING_BODY_SIGNALS = [
    "domain is parked", "this domain is for sale", "buy this domain",
    "domain parking", "parked domain", "hugedomains.com", "sedo.com",
    "parkingcrew.com", "bodis.com", "dan.com", "afternic.com",
    "sedoparking.com", "undeveloped.com", "epik.com/domain",
    "this web page is parked", "domain has expired",
]
PARKING_TITLE_SIGNALS = [
    "domain parked", "parked domain", "domain for sale",
    "buy this domain", "domain expired",
]
PARKING_REDIRECT_HOSTS = {
    "sedo.com", "hugedomains.com", "dan.com", "afternic.com",
    "parkingcrew.com", "bodis.com", "undeveloped.com", "epik.com",
    "uniregistry.com", "sedoparking.com",
}

_UA = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/122.0.0.0 Safari/537.36"
)
_HEADERS = {
    "User-Agent": _UA,
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9,es;q=0.8",
}

def _same_domain(original: str, final_url: str) -> bool:
    """True if final URL is on the same root domain (handles http→https, www)."""
    # removeprefix, not lstrip: lstrip("www.") strips any of the characters
    # 'w' and '.', mangling domains like "web.com".
    orig = original.lower().removeprefix("www.").split(":")[0]
    final = urlparse(final_url).netloc.lower().removeprefix("www.").split(":")[0]
    return orig == final or final.endswith("." + orig) or orig.endswith("." + final)


async def _check_one(domain: str) -> dict:
    result = {
        "domain": domain,
        "prescreen_status": "dead",
        "redirect_to": None,
        "title": None,
        "snippet": None,
    }
    try:
        async with httpx.AsyncClient(
            timeout=httpx.Timeout(connect=6, read=9, write=5, pool=10),
            follow_redirects=True,
            headers=_HEADERS,
            verify=False,  # parked/expired domains often have broken certs
            max_redirects=5,
        ) as client:
            resp = await client.get(f"http://{domain}")

            final_url = str(resp.url)
            final_host = urlparse(final_url).netloc.lower().removeprefix("www.")

            # Redirected to a different root domain?
            if not _same_domain(domain, final_url):
                for ph in PARKING_REDIRECT_HOSTS:
                    if ph in final_host:
                        result.update(prescreen_status="parked", redirect_to=final_url)
                        return result
                result.update(prescreen_status="redirect", redirect_to=final_url)
                return result

            if resp.status_code not in (200, 203):
                return result  # dead

            html = resp.text[:80_000]
            soup = BeautifulSoup(html, "html.parser")

            title_tag = soup.find("title")
            title = title_tag.get_text(strip=True)[:200] if title_tag else ""
            result["title"] = title

            body_lc = html.lower()
            title_lc = title.lower()

            for sig in PARKING_BODY_SIGNALS:
                if sig in body_lc:
                    result["prescreen_status"] = "parked"
                    return result
            for sig in PARKING_TITLE_SIGNALS:
                if sig in title_lc:
                    result["prescreen_status"] = "parked"
                    return result

            # Strip chrome so the snippet is body copy, not nav/footer text.
            for tag in soup(["script", "style", "nav", "footer", "header"]):
                tag.decompose()
            snippet = " ".join(soup.get_text(" ", strip=True).split())[:600]
            result.update(prescreen_status="live", snippet=snippet)
            return result

    except Exception as e:
        # Connection errors, timeouts, TLS failures → leave status as "dead".
        logger.debug("Prescreen %s: %s", domain, e)
        return result


async def prescreen_domains(domains: list[str]) -> list[dict]:
    """HTTP-check all domains concurrently. Returns one result dict per domain."""
    sem = asyncio.Semaphore(PRESCREEN_CONCURRENCY)

    async def _guard(d):
        async with sem:
            return await _check_one(d)

    raw = await asyncio.gather(*[_guard(d) for d in domains], return_exceptions=True)
    return [
        r if not isinstance(r, Exception)
        else {"domain": d, "prescreen_status": "dead",
              "redirect_to": None, "title": None, "snippet": None}
        for d, r in zip(domains, raw)
    ]
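

# Usage sketch (hypothetical caller, not part of this module):
#
#     results = await prescreen_domains(["example.com", "shop.example.org"])
#     live = [r for r in results if r["prescreen_status"] == "live"]
#
# Only the "live" subset needs to be sent on to classify_with_deepseek below;
# parked/dead/redirect domains are resolved without spending AI credits.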


# ── DeepSeek batch classification ────────────────────────────────────────────

_NICHES = (
    "automotive, beauty_cosmetics, travel_tourism, hospitality, "
    "restaurant_food, legal, medical_health, real_estate, technology, "
    "fashion_retail, finance, education, construction, sports, "
    "entertainment, agriculture, industrial, consulting, other"
)
_TYPES = (
    "corporate, ecommerce, blog, newspaper, landing_page, "
    "portfolio, directory, forum, informational, other"
)

def _build_classify_prompt(items: list[dict]) -> str:
    lines = []
    for i, d in enumerate(items, 1):
        t = (d.get("title") or "").replace('"', "'")[:100]
        s = (d.get("snippet") or "").replace('"', "'")[:300]
        lines.append(f'{i}. domain="{d["domain"]}" title="{t}" text="{s}"')
    return (
        "Classify each website below.\n"
        "Return ONLY a JSON array — no markdown, no explanation, nothing else.\n\n"
        f"niche options (pick one closest): {_NICHES}\n"
        f"type options (pick one closest): {_TYPES}\n\n"
        'Output format: [{"domain":"x.com","niche":"automotive","type":"corporate"}, ...]\n\n'
        "Websites:\n" + "\n".join(lines)
    )
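

# For illustration, one generated website line (hypothetical values) looks like:
#
#     17. domain="example.com" title="Example Co" text="We build widgets ..."
#
# Double quotes inside scraped titles/snippets are swapped for single quotes
# above so each line's own quoting stays unambiguous for the model.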


def _parse_classify_output(raw: str) -> list[dict]:
    """Extract JSON array from DeepSeek output.

    Strategy: search the full raw text first (handles cases where the JSON
    sits outside or inside <think> blocks), then try with the think block
    stripped. DeepSeek-R1 sometimes puts its answer inside the think block;
    stripping it first would lose the data entirely.
    """
    text = re.sub(r"```(?:json)?", "", raw).strip().rstrip("`").strip()

    def _try_parse(s: str):
        m = re.search(r"\[[\s\S]+\]", s)
        if not m:
            return None
        try:
            result = json.loads(m.group(0))
            if isinstance(result, list) and result:
                return result
        except json.JSONDecodeError:
            pass
        return None

    # 1. Try the full output as-is (handles JSON after </think>)
    parsed = _try_parse(text)
    if parsed:
        logger.info("DeepSeek: parsed %d items from full output", len(parsed))
        return parsed

    # 2. Try ONLY the content inside <think> (handles JSON inside the block)
    think_m = re.search(r"<think>([\s\S]*?)</think>", text)
    if think_m:
        parsed = _try_parse(think_m.group(1))
        if parsed:
            logger.info("DeepSeek: parsed %d items from <think> block", len(parsed))
            return parsed

    # 3. Try with the think block stripped (standard path)
    stripped = re.sub(r"<think>[\s\S]*?</think>", "", text).strip()
    parsed = _try_parse(stripped)
    if parsed:
        logger.info("DeepSeek: parsed %d items after stripping <think>", len(parsed))
        return parsed

    logger.warning("DeepSeek classification parse failed, raw snippet: %.400s", raw)
    return []
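

# Illustrative round-trip (hypothetical model output):
#
#     raw = ('<think>Site 1 looks automotive...</think>\n'
#            '[{"domain":"x.com","niche":"automotive","type":"corporate"}]')
#     _parse_classify_output(raw)
#     # -> [{'domain': 'x.com', 'niche': 'automotive', 'type': 'corporate'}]
#
# Step 1 already finds the array here; steps 2-3 only matter when the model
# buries or wraps the JSON inside its <think> reasoning.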


async def classify_with_deepseek(live_items: list[dict]) -> list[dict]:
    """Single DeepSeek call → list of {domain, niche, type}.

    Handles:
      - 429 Too Many Requests: exponential back-off, up to 4 attempts
      - 202 Accepted (async prediction): polls urls.get until succeeded
    """
    if not live_items:
        return []
    payload = {
        "input": {
            "prompt": _build_classify_prompt(live_items),
            "max_tokens": min(4096, len(live_items) * 80 + 600),
            "temperature": 0.1,
        }
    }
    auth_header = {"Authorization": f"Bearer {REPLICATE_TOKEN}"}

    MAX_RETRIES = 4
    for attempt in range(MAX_RETRIES):
        try:
            async with httpx.AsyncClient(timeout=300) as client:
                resp = await client.post(
                    DEEPSEEK_MODEL,
                    headers={**auth_header, "Content-Type": "application/json",
                             "Prefer": "wait=60"},
                    json=payload,
                )

                # ── Retry on rate-limit ──────────────────────────────────
                if resp.status_code == 429:
                    wait = min(60, 5 * (2 ** attempt))  # 5 → 10 → 20 → 40s
                    logger.warning("DeepSeek 429 (attempt %d/%d), retrying in %ds",
                                   attempt + 1, MAX_RETRIES, wait)
                    await asyncio.sleep(wait)
                    continue

                resp.raise_for_status()
                data = resp.json()

                # ── Poll if Replicate queued async (202) ─────────────────
                if resp.status_code == 202 or data.get("status") in ("starting", "processing"):
                    poll_url = (data.get("urls") or {}).get("get")
                    if not poll_url:
                        logger.error("DeepSeek: 202 but no poll URL in response")
                        return []
                    logger.info("DeepSeek: async prediction, polling %s", poll_url)
                    for tick in range(90):  # up to ~3 minutes
                        await asyncio.sleep(2)
                        pr = await client.get(poll_url, headers=auth_header)
                        pdata = pr.json()
                        status = pdata.get("status")
                        logger.debug("DeepSeek poll #%d status=%s", tick + 1, status)
                        if status == "succeeded":
                            data = pdata
                            break
                        if status in ("failed", "canceled"):
                            logger.error("DeepSeek prediction %s: %s",
                                         status, pdata.get("error"))
                            return []
                    else:
                        logger.error("DeepSeek: prediction timed out after 90 polls")
                        return []

                # ── Parse output ─────────────────────────────────────────
                output = data.get("output") or ""
                if isinstance(output, list):
                    output = "".join(str(t) for t in output if t is not None)

                logger.info("DeepSeek raw output (first 500 chars): %.500s", output)
                result = _parse_classify_output(output)
                logger.info("DeepSeek classified %d / %d domains",
                            len(result), len(live_items))
                return result

        except Exception as e:
            if attempt < MAX_RETRIES - 1:
                wait = 5 * (2 ** attempt)
                logger.warning("DeepSeek error (attempt %d/%d), retry in %ds: %s",
                               attempt + 1, MAX_RETRIES, wait, e)
                await asyncio.sleep(wait)
            else:
                logger.error("DeepSeek classification failed after %d attempts: %s",
                             MAX_RETRIES, e)
                return []

    # All attempts hit 429: without this the loop would fall through and the
    # function would implicitly return None, breaking callers expecting a list.
    logger.error("DeepSeek: gave up after %d rate-limited attempts", MAX_RETRIES)
    return []
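

# ── Example wiring of the two phases ─────────────────────────────────────────
# Illustrative sketch only: the real orchestration lives in main.py, which
# additionally chunks live items by DEEPSEEK_BATCH_SIZE and spaces the DeepSeek
# calls 3 seconds apart.
#
#     async def classify_domains(domains: list[str]) -> list[dict]:
#         results = await prescreen_domains(domains)        # phase 1: HTTP only
#         live = [r for r in results if r["prescreen_status"] == "live"]
#         classified = await classify_with_deepseek(live)   # phase 2: one AI call
#         by_domain = {c["domain"]: c for c in classified if "domain" in c}
#         for r in results:
#             r.update(by_domain.get(r["domain"], {}))      # merge niche/type back
#         return results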