From 5bef587ca0a70b530e58def9eec2438d27f9aac1 Mon Sep 17 00:00:00 2001 From: Malin Date: Mon, 13 Apr 2026 18:11:27 +0200 Subject: [PATCH] fix: add timeouts to SSL/DNS blocking calls, reset stuck AI jobs on startup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - SSL handshake: set socket timeout before wrap_socket (prevents indefinite hang) - SSL executor: asyncio.wait_for(..., timeout=12) - DNS gethostbyname: asyncio.wait_for(..., timeout=6) - analyze_site: hard 90s timeout wrapper - _assess_one: hard 180s ceiling via asyncio.timeout() - ai_worker_loop: reset 'running' → 'pending' on startup (clears crashed-session jobs) - Add POST /api/ai/reset endpoint + UI button to unstick jobs without restart Co-Authored-By: Claude Sonnet 4.6 --- app/enricher.py | 30 ++++++++++++++++++++++-------- app/main.py | 11 +++++++++++ app/site_analyzer.py | 22 +++++++++++++++++++--- app/static/index.html | 4 +++- 4 files changed, 55 insertions(+), 12 deletions(-) diff --git a/app/enricher.py b/app/enricher.py index 21e33f0..5ce4c85 100644 --- a/app/enricher.py +++ b/app/enricher.py @@ -345,14 +345,16 @@ async def _assess_one(domain: str) -> None: logger.info("AI: starting analysis for %s", domain) try: - analysis = await analyze_site(domain) - logger.info("AI: site analyzed %s (reachable=%s, words=%s)", - domain, analysis.get("reachable"), analysis.get("word_count")) - assessment = await gemini_assess(analysis) - logger.info("AI: Gemini done %s → quality=%s", - domain, assessment.get("lead_quality")) - await save_ai_assessment(domain, assessment, site_analysis=analysis) - logger.info("AI: saved %s", domain) + # Hard 3-minute ceiling so stuck jobs never block the worker forever + async with asyncio.timeout(180): + analysis = await analyze_site(domain) + logger.info("AI: site analyzed %s (reachable=%s, words=%s)", + domain, analysis.get("reachable"), analysis.get("word_count")) + assessment = await gemini_assess(analysis) + logger.info("AI: Gemini done %s → quality=%s", + domain, assessment.get("lead_quality")) + await save_ai_assessment(domain, assessment, site_analysis=analysis) + logger.info("AI: saved %s", domain) except Exception as e: logger.error("AI: failed %s — %s", domain, e, exc_info=True) try: @@ -368,6 +370,18 @@ async def _assess_one(domain: str) -> None: async def ai_worker_loop(): logger.info("AI worker loop starting") + # Reset any jobs left in 'running' state from a previous crashed worker + try: + async with aiosqlite.connect(SQLITE_PATH) as db: + result = await db.execute( + "UPDATE ai_queue SET status='pending' WHERE status='running'" + ) + count = result.rowcount + await db.commit() + if count: + logger.info("AI worker: reset %d stale 'running' jobs to 'pending'", count) + except Exception as e: + logger.error("AI worker: failed to reset stale jobs: %s", e) while True: rows = [] try: diff --git a/app/main.py b/app/main.py index 1f13132..07ce4d6 100644 --- a/app/main.py +++ b/app/main.py @@ -184,6 +184,17 @@ async def ai_worker_restart(): return {"status": "restarted"} +@app.post("/api/ai/reset") +async def ai_reset(): + """Reset all 'running' AI queue jobs back to 'pending' (unstick hung jobs).""" + async with aiosqlite.connect(SQLITE_PATH) as db: + r = await db.execute("UPDATE ai_queue SET status='pending' WHERE status='running'") + count = r.rowcount + await db.commit() + ensure_workers_alive() + return {"reset": count} + + @app.get("/api/ai/debug") async def ai_debug(): """Returns worker state + last 10 queue entries for troubleshooting.""" diff --git a/app/site_analyzer.py b/app/site_analyzer.py index df57552..ef3d57f 100644 --- a/app/site_analyzer.py +++ b/app/site_analyzer.py @@ -100,7 +100,9 @@ async def _get_hosting_info(domain: str) -> dict: "ip_country": None, "ip_region": None, "eu_hosted": None} try: loop = asyncio.get_event_loop() - ip = await loop.run_in_executor(None, socket.gethostbyname, domain) + ip = await asyncio.wait_for( + loop.run_in_executor(None, socket.gethostbyname, domain), timeout=6 + ) info["ip"] = ip async with httpx.AsyncClient(timeout=6) as client: r = await client.get( @@ -123,7 +125,7 @@ async def _get_hosting_info(domain: str) -> dict: return info -async def analyze_site(domain: str) -> dict: +async def _analyze_site_inner(domain: str) -> dict: result = { "domain": domain, "reachable": False, "load_time_ms": None, "status_code": None, @@ -361,13 +363,27 @@ async def analyze_site(domain: str) -> dict: import datetime as _dt ctx = _ssl.create_default_context() with socket.create_connection((domain, 443), timeout=5) as s: + s.settimeout(5) # SSL handshake timeout (wrap_socket has no timeout arg) with ctx.wrap_socket(s, server_hostname=domain) as ss: cert = ss.getpeercert() exp = _dt.datetime.strptime(cert["notAfter"], "%b %d %H:%M:%S %Y %Z") return True, (_dt.datetime.utcnow() - exp).days * -1 loop = asyncio.get_event_loop() - result["ssl_valid"], result["ssl_expiry_days"] = await loop.run_in_executor(None, _ssl_check) + result["ssl_valid"], result["ssl_expiry_days"] = await asyncio.wait_for( + loop.run_in_executor(None, _ssl_check), timeout=12 + ) except Exception: pass return result + + +async def analyze_site(domain: str) -> dict: + """Public entry point — hard 90s timeout so workers never hang permanently.""" + try: + return await asyncio.wait_for(_analyze_site_inner(domain), timeout=90) + except asyncio.TimeoutError: + logger.warning("analyze_site timed out for %s", domain) + return {"domain": domain, "reachable": False, "error": "analyze_site timeout", + "emails": [], "phones": [], "whatsapp": [], "social_links": [], + "kit_digital": False, "kit_digital_signals": []} diff --git a/app/static/index.html b/app/static/index.html index 389460e..c629aef 100644 --- a/app/static/index.html +++ b/app/static/index.html @@ -444,6 +444,7 @@ tr:hover td{background:rgba(255,255,255,.025)}
+ 🔍 Debug AI queue
@@ -630,7 +631,8 @@ function app() { try { this.qst = await fetch('/api/enrich/status').then(r=>r.json()); } catch(e){} }, - async restartAiWorker() { const r=await fetch('/api/ai/worker/restart',{method:'POST'}); this.notify('AI worker restarted','info'); await this.loadAiStatus(); }, + async restartAiWorker() { await fetch('/api/ai/worker/restart',{method:'POST'}); this.notify('AI worker restarted','info'); await this.loadAiStatus(); }, + async resetAiStuck() { const r=await fetch('/api/ai/reset',{method:'POST'}); const d=await r.json(); this.notify(`Reset ${d.reset} stuck jobs → pending`,'success'); await this.loadAiStatus(); }, async startEnrich() { await fetch('/api/enrich/resume',{method:'POST'}); this.notify('Worker started','success'); await this.loadQueue(); }, async pauseEnrich() { await fetch('/api/enrich/pause',{method:'POST'}); this.notify('Worker paused','success'); await this.loadQueue(); }, async retryFailed() { await fetch('/api/enrich/retry',{method:'POST'}); this.notify('Retrying failed','success'); await this.loadQueue(); },