diff --git a/app/main.py b/app/main.py index 1bda565..29f53ba 100644 --- a/app/main.py +++ b/app/main.py @@ -196,17 +196,17 @@ async def prescreen_batch(body: dict): counts[s] = counts.get(s, 0) + 1 # Phase 2: DeepSeek classification for live sites only + # Run batches sequentially — parallel requests cause 429 rate-limit errors. live = [r for r in results if r.get("prescreen_status") == "live"] classified = 0 if live: batches = [live[i:i + DEEPSEEK_BATCH_SIZE] for i in range(0, len(live), DEEPSEEK_BATCH_SIZE)] - batch_cls = await asyncio.gather( - *[classify_with_deepseek(b) for b in batches], return_exceptions=True - ) all_cls: list = [] - for bc in batch_cls: - if isinstance(bc, list): - all_cls.extend(bc) + for i, batch in enumerate(batches): + if i > 0: + await asyncio.sleep(3) # brief gap between batches + cls = await classify_with_deepseek(batch) + all_cls.extend(cls) if all_cls: await save_prescreen_results(all_cls) classified = len(all_cls) diff --git a/app/prescreener.py b/app/prescreener.py index 51a32d5..e1d10db 100644 --- a/app/prescreener.py +++ b/app/prescreener.py @@ -223,8 +223,9 @@ def _parse_classify_output(raw: str) -> list[dict]: async def classify_with_deepseek(live_items: list[dict]) -> list[dict]: """Single DeepSeek call → list of {domain, niche, type}. - Replicate may return 202 (async) for slow models like DeepSeek-R1. - We poll the prediction URL until it succeeds or times out. 
+ Handles: + - 429 Too Many Requests: exponential back-off, up to 4 retries + - 202 Accepted (async prediction): polls urls.get until succeeded """ if not live_items: return [] @@ -235,55 +236,69 @@ async def classify_with_deepseek(live_items: list[dict]) -> list[dict]: "temperature": 0.1, } } - auth_headers = { - "Authorization": f"Bearer {REPLICATE_TOKEN}", - "Content-Type": "application/json", - } - try: - async with httpx.AsyncClient(timeout=300) as client: - resp = await client.post( - DEEPSEEK_MODEL, - headers={**auth_headers, "Prefer": "wait=60"}, - json=payload, - ) - resp.raise_for_status() - data = resp.json() + auth_header = {"Authorization": f"Bearer {REPLICATE_TOKEN}"} - # ── Poll if Replicate accepted async (202 or status starting/processing) ── - if resp.status_code == 202 or data.get("status") in ("starting", "processing"): - poll_url = (data.get("urls") or {}).get("get") - if not poll_url: - logger.error("DeepSeek: 202 but no poll URL in response") - return [] - logger.info("DeepSeek: async prediction, polling %s", poll_url) - for attempt in range(90): # up to ~3 minutes - await asyncio.sleep(2) - pr = await client.get( - poll_url, - headers={"Authorization": f"Bearer {REPLICATE_TOKEN}"}, - ) - pdata = pr.json() - status = pdata.get("status") - logger.debug("DeepSeek poll #%d status=%s", attempt + 1, status) - if status == "succeeded": - data = pdata - break - if status in ("failed", "canceled"): - logger.error("DeepSeek prediction %s: %s", status, pdata.get("error")) + MAX_RETRIES = 4 + for attempt in range(MAX_RETRIES): + try: + async with httpx.AsyncClient(timeout=300) as client: + resp = await client.post( + DEEPSEEK_MODEL, + headers={**auth_header, "Content-Type": "application/json", "Prefer": "wait=60"}, + json=payload, + ) + + # ── Retry on rate-limit ────────────────────────────────────── + if resp.status_code == 429: + wait = min(60, 5 * (2 ** attempt)) # 5 → 10 → 20 → 40s + logger.warning("DeepSeek 429 (attempt %d/%d), retrying in %ds", + 
attempt + 1, MAX_RETRIES, wait) + await asyncio.sleep(wait) + continue + + resp.raise_for_status() + data = resp.json() + + # ── Poll if Replicate queued async (202) ───────────────────── + if resp.status_code == 202 or data.get("status") in ("starting", "processing"): + poll_url = (data.get("urls") or {}).get("get") + if not poll_url: + logger.error("DeepSeek: 202 but no poll URL in response") + return [] + logger.info("DeepSeek: async prediction, polling %s", poll_url) + for tick in range(90): # up to ~3 minutes + await asyncio.sleep(2) + pr = await client.get(poll_url, headers=auth_header) + pdata = pr.json() + status = pdata.get("status") + logger.debug("DeepSeek poll #%d status=%s", tick + 1, status) + if status == "succeeded": + data = pdata + break + if status in ("failed", "canceled"): + logger.error("DeepSeek prediction %s: %s", status, pdata.get("error")) + return [] + else: + logger.error("DeepSeek: prediction timed out after 90 polls") return [] - else: - logger.error("DeepSeek: prediction timed out after polling 90×2s") - return [] - output = data.get("output") or "" - if isinstance(output, list): - output = "".join(str(t) for t in output if t is not None) + # ── Parse output ───────────────────────────────────────────────── + output = data.get("output") or "" + if isinstance(output, list): + output = "".join(str(t) for t in output if t is not None) - logger.info("DeepSeek raw output (first 500 chars): %.500s", output) - result = _parse_classify_output(output) - logger.info("DeepSeek classified %d / %d domains", len(result), len(live_items)) - return result + logger.info("DeepSeek raw output (first 500 chars): %.500s", output) + result = _parse_classify_output(output) + logger.info("DeepSeek classified %d / %d domains", len(result), len(live_items)) + return result - except Exception as e: - logger.error("DeepSeek classification error: %s", e) - return [] + except Exception as e: + if attempt < MAX_RETRIES - 1: + wait = 5 * (2 ** attempt) + 
logger.warning("DeepSeek error (attempt %d/%d), retry in %ds: %s", + attempt + 1, MAX_RETRIES, wait, e) + await asyncio.sleep(wait) + else: + logger.error("DeepSeek classification failed after %d attempts: %s", + MAX_RETRIES, e) + return [] + return [] # guard: if every attempt ends in the 429 "continue" path, the loop exhausts and the function would implicitly return None, crashing the caller's all_cls.extend(cls) diff --git a/app/static/index.html b/app/static/index.html index 7b5e212..6a00849 100644 --- a/app/static/index.html +++ b/app/static/index.html @@ -773,7 +773,8 @@ function app() { async prescreenSelected() { if(!this.selected.length || this.prescreening) return; this.prescreening = true; - this.notify(`Pre-screening ${this.selected.length} domains… (may take ~30s)`, 'info'); + const count = this.selected.length; + this.notify(`Pre-screening ${count} domains… (DeepSeek classification may take 1-2 min)`, 'info'); try { const r = await fetch('/api/prescreen/batch', { method: 'POST', @@ -786,7 +787,10 @@ function app() { `✅ ${d.live} live · 🅿 ${d.parked} parked · ↗ ${d.redirect} redirect · ☠ ${d.dead} dead · 🏷 ${d.classified} classified`, 'success' ); - await this._fetch(); // refresh to show niche/type columns + this.selected = []; + // Force full re-fetch of current page to show updated niche/type + this.domains = []; + await this._fetch(); } else { this.notify('Error: ' + (d.error||'unknown'), 'error'); }