# Replicate credentials are read from the environment only. Never commit a
# real token as the fallback default: anyone with repository access could use
# it. An empty string means "not configured".
REPLICATE_TOKEN = os.getenv("REPLICATE_API_TOKEN", "")
REPLICATE_MODEL = "https://api.replicate.com/v1/models/google/gemini-3-pro/predictions"


def _parse_concurrency(raw: Optional[str], default: int = 3) -> int:
    """Turn the AI_CONCURRENCY setting into a usable semaphore size.

    Args:
        raw: The raw environment value, or None when the variable is unset.
        default: Value used when *raw* is missing, blank or not an integer.

    Returns:
        An integer >= 1. Zero would make the semaphore block every caller
        forever and a negative value makes ``asyncio.Semaphore`` raise, so
        both are clamped to 1.
    """
    try:
        value = int(raw) if raw is not None and raw.strip() else default
    except ValueError:
        value = default
    return max(1, value)


AI_CONCURRENCY = _parse_concurrency(os.getenv("AI_CONCURRENCY"))

# Created lazily by _sem() rather than at import time.
_ai_sem: Optional[asyncio.Semaphore] = None


def _sem() -> asyncio.Semaphore:
    """Return the shared semaphore that caps concurrent AI assessments.

    The semaphore is created on first use with AI_CONCURRENCY slots and the
    same instance is returned on every later call.
    """
    global _ai_sem
    if _ai_sem is None:
        _ai_sem = asyncio.Semaphore(AI_CONCURRENCY)
    return _ai_sem
Basi","Artdeco","Artero","As I Am","Aseptine","Atashi","Atrix","Ausonia","Aussie", + "Australian Gold","Autan","Aveda","Avena Kinesia","Avène","Axe","Axovital","Azalea", + "Azzaro","Babaria","Babyliss","Barbie","Bare Minerals","Barulab","Batiste","Beaver", + "Beconfident","Belcils","Bella Aurora","Benefit","Benton","Benzacare","Beter","Biafin", + "Bio Ionic","Bio-oil","Bioderma","Biolage","Biotherm","Biovène","Biretix","Bobbi Brown", + "Bouclème","Bourjois","Bperfect Cosmetics","Britney Spears","Bumble & Bumble","Burberry", + "Bvlgari","Byly","Byphasse","Cacharel","Calvin Klein","Camomila Intea","Cantu","Carefree", + "Carmex","Carolina Herrera","Carrera","Carthusia","Catrice","Caudalie","Cerave","Cerruti", + "Cetaphil","Chanel","Chanson D'Eau","Chloé","Chopard","Christina Aguilera","Christophe Robin", + "Clarins","Clean & Clear","Clinique","Coach","Cocosolis","Colab","Colgate","Collistar", + "Color Wow","Comfort Zone","Comodynes","Compeed","Cosrx","Creed","Creme Of Nature", + "Cristalinas","Crossmen","Crusellas","Cryopharma","Cumlaude Lab","Cutex","Cygnetic", + "Daffoil","Darphin","Davidoff","Declaré","Delfy","Delisea","Denenes","Dentiblanc", + "Dermalogica","Desensin","Dexeryl","Diadermine","Diesel","Diet Esthetic","Dior","Diptyque", + "Dodot","Dolce & Gabbana","Donna Karan","Dove","Dr. Hauschka","Dr.jart+","Dr. Organic", + "Dr. Rimpler","Dr. 
Tree","Drasanvi","Drunk Elephant","Dsquared2","Ducray","Durex", + "Elancyl","Elegant Touch","Elemis","Elie Saab","Elizabeth Arden","Elizabeth Taylor", + "Emilio Pucci","Endocare","Eric Favre","Escada","Essence","Essie","Estée Lauder", + "Etat Libre D'Orange","Eucerin","Eudermin","Evax","Eve Lom","Eylure","Fa","Fairy","Fanola", + "Farmatint","Farmavita","Farouk","Figuière","Fisiocrem","Flor De Mayo","Fluocaril","Foreo", + "Forté Pharma","Foxy","Francis Kurkdjian","Frederic Malle","Frosch","Garnier","Ghd", + "Gillette","Giorgi Line","Givenchy","Glam Of Sweden","Goldwell","Gosh","Goutal","Gritti", + "Gucci","Guerlain","Guess By Marciano","Gummy","Hair Rituel By Sisley","Hairgum","Halita", + "Halloween","Hansaplast","Hask","Hawaiian Tropic","Head & Shoulders","Heliocare", + "Heno De Pravia","Herbal Essences","Hermès","Hidracel","Hollister","Hugo Boss", + "I.c.o.n.","Ibizaloe","Iceberg","Idc Institute","Iroha","Isabelle Lancray","Isdin", + "Issey Miyake","It Cosmetics","Ivybears","Jacadi","Jean Paul Gaultier","Jil Sander", + "Jimmy Choo","Jo Malone","John Frieda","Johnson's Baby","Joico","Joop","Jordan","Jowaé", + "Juicy Couture","Juliette Has A Gun","Just For Men","Juvena","Kaloo","Karl Lagerfeld", + "Karseell","Katai","Kate Spade","Kativa","Kenzo","Kerasilk","Kerastase","Kevin Murphy", + "Kevyn Aucoin","Kilian","Klorane","L'Anza","L'Occitane","L'Oréal Paris", + "L'Oréal Professionnel","La Cabine","La Mer","La Prairie","La Roche Posay","La Toja", + "Laboratoires Filorga","Lacer","Lacoste","Lactacyd","Lactovit","Lalique","Lancaster", + "Lanvin","Lattafa","Laura Biagiotti","Le Petit Marseillais","Legrain","Lierac","Listerine", + "Living Proof","Loewe","Lola Cosmetics","Lolita Lempicka","Lussoni","Lutsine E45", + "M2 Beauté","Mac","Macadamia","Mad Beauty","Maria Nila","Marlies Möller","Martiderm", + "Martinelia","Marvis","Matrix","Maui","Mavala","Max Factor","Maybelline","Melvita", + "Mermade","Michael Kors","Milk Shake","Mix & Shout","Mixa","Moroccanoil","Moschino", + 
"Mustela","Nabeel","Nanobrow","Nanoil","Nanolash","Narciso Rodriguez","Nars","Natur Vital", + "Natura Bissé","Natural Honey","Naturalium","Naturtint","Nenuco","Neogen","Neoretin", + "Neostrata","Neutrogena","Nivea","Nûby","Nuggela & Sulé","Nyx Professional Make Up", + "Ogx","Olaplex","Olay","Old Spice","Olivia Garden","Opi","Oral-b","Oraldine","Orofluido", + "Orlane","Oscar De La Renta","Pacha","Paese","Palette","Paloma Picasso","Paltons", + "Pantene","Paranix","Parfums Saphir","Parlux","Payot","Phyto","Picu Baby","Pilexil", + "Piz Buin","Plantur 39","Platanomelón","Polaar","Police","Polident","Ponds","Poseidon", + "Postquam","Proraso","Puig","Purito","Rabanne","Raid","Ralph Lauren","Rated Green", + "Real Techniques","Redenhair","Redist","Redken","Reebok","Ref","Refectocil","Relec", + "Remescar","Rene Furterer","Revlon","Revolution Hair Care","Revolution Make Up", + "Revolution Pro","Rexaline","Rexona","Rilastil","Rimmel London","Roberto Cavalli","Roc", + "Rochas","Roger & Gallet","Roja Parfums","Rosacure","S3","Sabon","Salerm","Sally Hansen", + "Salvatore Ferragamo","Sanex","Sarah Jessica Parker","Saryna Key","Satisfyer","Scalpers", + "Scholl","Schwarzkopf","Scottex","Sebamed","Sebastian Professionals","Seche Vite", + "Sensai","Sensilis","Sensodyne","Serge Lutens","Serumkind","Sesderma","Seven Cosmetics", + "Sexy Hair","Shiseido","Shu Uemura","Sisley","Skeyndor","Skin Generics","Sleek", + "Snp","Soap & Glory","Sol De Janeiro","Solgar","Somatoline Cosmetic","Sophie La Girafe", + "Soria Natural","Steinhart","Stendhal Paris","Sterimar","Strivectin","Suavinex", + "Suavipiel","Svr Laboratoire Dermatologique","Syoss","System Professional","Tabac", + "Taky","Talika","Tampax","Tangle Teezer","Tanit","Teaology","Tena Lady","The Body Shop", + "The Ordinary","The Wet Brush","Thermacare","Tiffany & Co","Tigi","Timotei", + "Tiziana Terenzi","Tod's","Tom Ford","Tommy Hilfiger","Topicrem","Torriden","Tot Herba", + "Tous","Trendy Hair","Tresemme","Trussardi","Tulipán 
def _occurs_as_token(needle: str, haystack: str) -> bool:
    """Return True if *needle* occurs in *haystack* as a standalone token.

    An occurrence counts only when the characters immediately before and
    after it are not letters, digits or underscores (or the occurrence touches
    the start/end of the text). Both arguments are expected to be lower-cased
    by the caller.
    """
    if not needle:
        return False
    pos = haystack.find(needle)
    while pos != -1:
        end = pos + len(needle)
        left_ok = pos == 0 or not (haystack[pos - 1].isalnum() or haystack[pos - 1] == "_")
        right_ok = end == len(haystack) or not (haystack[end].isalnum() or haystack[end] == "_")
        if left_ok and right_ok:
            return True
        pos = haystack.find(needle, pos + 1)
    return False


def detect_brands_in_text(text: str, brands: Optional[list[str]] = None,
                          limit: int = 60) -> list[str]:
    """Find which brands appear in the scraped page text.

    Matching is case-insensitive and token-based: a brand is reported only
    when it is not embedded in a longer word. Plain substring matching made
    short names such as "Fa", "Mac" or "Ref" fire on words like "farmacia",
    "macadamia" or "referencia", which produced bogus portfolio matches.

    NOTE(review): this assumes the scraper separates adjacent text nodes with
    whitespace or punctuation; brands glued to other words are not reported.

    Args:
        text: Scraped page text; None or empty yields no matches.
        brands: Brand names to look for. Defaults to BEAUTY_BRANDS.
        limit: Maximum number of brands returned.

    Returns:
        Matching brand names in the order they appear in *brands*, at most
        *limit* entries.
    """
    universe = BEAUTY_BRANDS if brands is None else brands
    haystack = (text or "").lower()
    if not haystack or limit <= 0:
        return []
    found: list[str] = []
    for brand in universe:
        if _occurs_as_token(brand.lower(), haystack):
            found.append(brand)
            if len(found) >= limit:
                break
    return found


def get_dist_matches(detected: list[str],
                     portfolio: Optional[list[str]] = None) -> list[str]:
    """Return which detected brands are in our distribution portfolio.

    Comparison is case-insensitive; results use the portfolio's spelling and
    order.

    Args:
        detected: Brand names found on the site.
        portfolio: Brands we distribute. Defaults to OUR_BRANDS.
    """
    ours = OUR_BRANDS if portfolio is None else portfolio
    seen = {name.lower() for name in detected}
    return [brand for brand in ours if brand.lower() in seen]
str: + try: + async with httpx.AsyncClient( + timeout=10, follow_redirects=True, + headers={"User-Agent": "Mozilla/5.0 (compatible; BeautyLeads/1.0)"}, + ) as client: + r = await client.get( + "https://html.duckduckgo.com/html/", + params={"q": query, "kl": "es-es"}, + ) + if r.status_code != 200: + return "" + soup = BeautifulSoup(r.text, "html.parser") + parts = [] + for res in soup.select(".result")[:4]: + title = res.select_one(".result__a") + snip = res.select_one(".result__snippet") + url = res.select_one(".result__url") + if snip: + t = title.get_text(strip=True) if title else "" + u = url.get_text(strip=True) if url else "" + parts.append(f"[{u}] {t} — {snip.get_text(strip=True)}") + return "\n".join(parts) + except Exception as e: + logger.debug("DDG search failed: %s", e) + return "" + + +# ── Prompt builder ───────────────────────────────────────────────────────────── + +def _build_beauty_prompt(a: dict, detected_brands: list, dist_matches: list, + search_results: str = "") -> str: + contacts_block = [] + if a.get("emails"): contacts_block.append(f" Emails: {', '.join(a['emails'][:3])}") + if a.get("phones"): contacts_block.append(f" Phones: {', '.join(a['phones'][:3])}") + if a.get("social_links"): contacts_block.append(f" Social: {', '.join(a['social_links'][:4])}") + contacts_str = "\n".join(contacts_block) or " Not found" + + snippet = (a.get("visible_text_snippet") or "")[:1200] + title = a.get("page_title") or "" + meta = a.get("meta_description") or "" + country = a.get("ip_country") or "unknown" + cms = a.get("cms") or "unknown" + detected_str = ", ".join(detected_brands) if detected_brands else "none detected" + dist_str = ", ".join(dist_matches) if dist_matches else "none" + + return f"""You are a senior B2B sales analyst for a cosmetics distribution company operating in Europe. 
+Your task: evaluate whether this website is a viable B2B customer (retailer, multi-brand store, +e-commerce, distributor or chain that buys beauty products wholesale) and generate an outreach plan. + +=== SITE DATA === +Domain: {a.get("domain")} +Country (IP): {country} +Title: {title} +Meta desc: {meta} +CMS: {cms} +Contact info: +{contacts_str} + +=== PAGE CONTENT SAMPLE === +{snippet} + +=== BRANDS ALREADY DETECTED ON SITE === +{detected_str} + +=== OUR PORTFOLIO BRANDS FOUND ON THEIR SITE === +(These brands we distribute — finding them means we're already in their market) +{dist_str} + +=== WEB SEARCH RESULTS === +{(search_results or "No results.")[:500]} + +=== OUR DISTRIBUTION PORTFOLIO === +{', '.join(OUR_BRANDS)} + +=== BEAUTY CATEGORIES WE COVER === +{', '.join(BEAUTY_CATEGORIES)} + +=== ASSESSMENT RULES === +1. Determine if this is a B2B prospect: retailer, pharmacy, parafarmacia, + perfumería, multi-brand beauty ecommerce, salon chain, supermarket beauty section, + or beauty products distributor based in Europe. +2. Identify which categories from our list they cover. +3. From the page content (even if brands list is empty), identify any beauty brands mentioned. +4. Match detected brands against our portfolio — this drives lead quality: + - HOT: 3+ of our portfolio brands detected, OR a large EU retailer clearly in our niche + - WARM: 1-2 portfolio brand matches, OR clear beauty retailer with good potential + - COLD: beauty-adjacent but weak match, OR can't confirm they buy wholesale + - NOT_RELEVANT: not a beauty business or not in Europe +5. Write all human text (proposal, email) in Spanish. +6. Keep JSON values concise (≤ 25 words each). 
def _closing_suffix(fragment: str) -> Optional[str]:
    """Return the brackets needed to close *fragment*, or None if impossible.

    The scan is string-aware, so braces and brackets inside JSON string
    literals are ignored. None is returned when the fragment ends inside a
    string literal or contains a closing bracket without a matching opener.
    """
    pending: list[str] = []
    in_string = False
    escaped = False
    for ch in fragment:
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch == "{":
            pending.append("}")
        elif ch == "[":
            pending.append("]")
        elif ch in "}]":
            if not pending or pending.pop() != ch:
                return None
    if in_string:
        return None
    return "".join(reversed(pending))


def _repair_truncated_json(candidate: str, max_attempts: int = 64) -> Optional[dict]:
    """Best-effort recovery of a JSON object whose tail was cut off.

    Repeatedly drops the trailing, incomplete element (cutting back to the
    previous comma), closes whatever containers are still open and tries to
    parse. Returns the first dict that parses, or None.
    """
    cut = len(candidate)
    for _ in range(max_attempts):
        fragment = candidate[:cut].rstrip().rstrip(",").rstrip()
        suffix = _closing_suffix(fragment)
        if suffix is not None:
            try:
                parsed = json.loads(fragment + suffix)
            except json.JSONDecodeError:
                pass
            else:
                if isinstance(parsed, dict):
                    return parsed
        cut = candidate.rfind(",", 0, cut)
        if cut <= 0:
            return None
    return None


def _parse_beauty_output(raw: str) -> dict:
    """Extract the assessment JSON object from the model's raw text output.

    Handles markdown code fences, prose before or after the object, and
    output truncated mid-object (the model hit its token limit). A truncated
    response normally contains no closing brace at all, so the search starts
    at the first "{" instead of requiring a "{...}" span.

    Args:
        raw: Model output. Non-string values are treated as empty output.

    Returns:
        The parsed, non-empty dict, or a fallback dict flagged with
        ``"parse_error": True`` when nothing usable could be recovered.
    """
    if not isinstance(raw, str):
        raw = "" if raw is None else str(raw)
    text = re.sub(r"```(?:json)?", "", raw).strip().rstrip("`").strip()
    start = text.find("{")
    if start != -1:
        candidate = text[start:]
        try:
            # raw_decode parses the first JSON value and ignores what follows.
            parsed, _ = json.JSONDecoder().raw_decode(candidate)
        except json.JSONDecodeError:
            parsed = _repair_truncated_json(candidate)
        if isinstance(parsed, dict) and parsed:
            return parsed
    logger.warning("Beauty AI parse failed, raw: %.300s", raw)
    return {
        "is_relevant": False,
        "lead_quality": "COLD",
        "business_name": "",
        "contact_email": "",
        "dist_matches": [],
        "parse_error": True,
    }
async def assess_beauty_domain(analysis: dict) -> dict:
    """Run the full beauty B2B assessment for one analysed site.

    Steps: pre-scan the page snippet for known brands, look up company
    context via a web search, send the combined prompt to the Replicate-hosted
    model and parse its JSON answer. The whole sequence runs under the shared
    AI semaphore.

    Args:
        analysis: Site-analysis dict. Reads "domain", "visible_text_snippet"
            and "page_title" here; the prompt builder reads further keys.

    Returns:
        The parsed assessment dict, with the pre-scan brand lists filled in
        when the model left them empty. On any failure, a dict with an
        "error" message, lead_quality "COLD" and the pre-scan brand lists.
        This function does not raise for request or parsing errors.
    """
    async with _sem():
        domain = analysis.get("domain", "")
        text = analysis.get("visible_text_snippet", "") or ""

        detected = detect_brands_in_text(text)
        dist_match = get_dist_matches(detected)

        # Also search for company context
        title = analysis.get("page_title") or ""
        biz_name = title.split("|")[0].split("-")[0].strip() or domain
        search_results = await _ddg_search(f'"{biz_name}" {domain} beauty cosmetics wholesale contact')
        logger.info("Beauty assess %s: %d brands detected, %d portfolio matches",
                    domain, len(detected), len(dist_match))

        payload = {
            "input": {
                "prompt": _build_beauty_prompt(analysis, detected, dist_match, search_results),
                "images": [], "videos": [],
                "top_p": 0.9,
                "temperature": 0.15,
                "thinking_level": "low",
                "max_output_tokens": 2000,
            }
        }
        try:
            if not REPLICATE_TOKEN:
                # Fail with a clear message instead of an HTTP 401 later on.
                raise RuntimeError("REPLICATE_API_TOKEN is not configured")

            async with httpx.AsyncClient(timeout=120) as client:
                resp = await client.post(
                    REPLICATE_MODEL,
                    headers={
                        "Authorization": f"Bearer {REPLICATE_TOKEN}",
                        "Content-Type": "application/json",
                        "Prefer": "wait",
                    },
                    json=payload,
                )
                resp.raise_for_status()
                data = resp.json()

            output = data.get("output")
            if isinstance(output, list):
                output = "".join(str(part) for part in output if part is not None)
            if not isinstance(output, str) or not output.strip():
                # A prediction that failed or has not finished carries no
                # output; report its status rather than a parser TypeError.
                raise RuntimeError(
                    f"prediction returned no output "
                    f"(status={data.get('status')!r}, error={data.get('error')!r})"
                )

            result = _parse_beauty_output(output)
            # Merge pre-scan data that AI might miss
            if not result.get("dist_matches") and dist_match:
                result["dist_matches"] = dist_match
            if not result.get("detected_brands") and detected:
                result["detected_brands"] = detected

            logger.info("Beauty AI %s → quality=%s, dist_matches=%s",
                        domain, result.get("lead_quality"), result.get("dist_matches"))
            return result

        except Exception as e:
            # Deliberate boundary: callers always receive a result dict.
            logger.error("Beauty AI error %s: %s", domain, e)
            return {
                "error": str(e)[:300],
                "is_relevant": False,
                "lead_quality": "COLD",
                "dist_matches": dist_match,
                "detected_brands": detected,
                "contact_email": "",
            }
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: start background work, stop it on shutdown.

    Startup initialises the database, schedules build_duckdb_index() in the
    background and starts the beauty AI worker. Shutdown cancels both
    background tasks and waits for them to finish unwinding.
    """
    await init_db()
    # NOTE(review): this service is only meant to detect the index built by
    # the main service, not rebuild it — confirm build_duckdb_index() is a
    # no-op when the index already exists.
    # Keep a reference: the event loop holds tasks only weakly, so an
    # unreferenced task can be garbage-collected before it completes.
    index_task = asyncio.create_task(build_duckdb_index())
    _start_beauty_worker()
    logger.info("BeautyLeads ready on port 7788")
    try:
        yield
    finally:
        pending = [
            task for task in (index_task, _beauty_worker_task)
            if task is not None and not task.done()
        ]
        for task in pending:
            task.cancel()
        # return_exceptions=True absorbs the CancelledError of each task.
        await asyncio.gather(*pending, return_exceptions=True)
────────────────────────────────────────── + +@app.get("/api/stats") +async def stats(): + return await get_stats() + + +@app.get("/api/index/status") +async def get_index_status(): + return index_status() + + +@app.get("/api/domains") +async def domains( + tld: str = Query(None), + page: int = Query(1, ge=1), + limit: int = Query(100, ge=1, le=500), + live_only: bool = Query(False), + alpha_only: bool = Query(False), + no_sld: bool = Query(False), + keyword: str = Query(None), +): + total, rows = await get_domains( + tld=tld, page=page, limit=limit, + alpha_only=alpha_only, no_sld=no_sld, + keyword=keyword, live_only=live_only, + ) + return {"page": page, "limit": limit, "total": total, "results": rows} + + +@app.get("/api/enriched") +async def enriched( + min_score: int = Query(0), + country: str = Query(None), + prescreen_status: str = Query(None), + niche: str = Query(None), + site_type: str = Query(None), + page: int = Query(1, ge=1), + limit: int = Query(100, ge=1, le=1000), +): + total, rows = await get_enriched( + min_score=min_score, country=country, + prescreen_status=prescreen_status, niche=niche, site_type=site_type, + page=page, limit=limit, + ) + return {"page": page, "limit": limit, "total": total, "results": rows} + + +# ── Validator (shared) ──────────────────────────────────────────────────────── + +@app.post("/api/validator/start") +async def validator_start(tld: str = Query(None), rescan_dead: bool = Query(False)): + start_validator(tld_filter=tld or None, rescan_dead=rescan_dead) + return get_validator_status() + + +@app.post("/api/validator/stop") +async def validator_stop(): + stop_validator() + return {"status": "stopped"} + + +@app.get("/api/validator/status") +async def validator_status(): + return get_validator_status() + + +# ── Pre-screen (shared) ─────────────────────────────────────────────────────── + +@app.post("/api/prescreen/batch") +async def prescreen_batch(body: dict): + domains_list = body.get("domains", []) + if not 
@app.post("/api/beauty/assess/batch")
async def beauty_assess_batch(body: dict):
    """Queue domains for beauty AI assessment and make sure the worker runs.

    Expects a JSON body of the form ``{"domains": ["example.com", ...]}``. A
    single string is accepted as a one-element list; iterating it directly
    would queue every character as its own domain. Entries are stripped of
    surrounding whitespace, blank or non-string entries are dropped and
    duplicates are removed while keeping order.

    Returns:
        ``{"queued": <number of distinct domains submitted>}``, or a 400
        response when no usable domain was provided.
    """
    raw = body.get("domains", [])
    if isinstance(raw, str):
        raw = [raw]
    if not isinstance(raw, (list, tuple)):
        return JSONResponse({"error": "domains must be a list of strings"}, status_code=400)

    cleaned = [d.strip() for d in raw if isinstance(d, str) and d.strip()]
    domains_list = list(dict.fromkeys(cleaned))
    if not domains_list:
        return JSONResponse({"error": "no domains provided"}, status_code=400)

    await queue_beauty(domains_list)
    _start_beauty_worker()
    return {"queued": len(domains_list)}
@app.get("/api/beauty/export")
async def export_beauty_csv(quality: str = Query(None), country: str = Query(None)):
    """Stream all beauty leads matching the filters as a CSV download.

    Rows are fetched page by page (500 at a time) from get_beauty_leads() and
    written with the csv module: every field is quoted and embedded double
    quotes are doubled, so field contents are exported unmodified.

    NOTE(review): cell values come from scraped sites and model output. If
    the file is opened in a spreadsheet, values starting with "=", "+", "-"
    or "@" may be interpreted as formulas; they are not neutralised here
    because that would also alter phone numbers such as "+34 ...".

    Args:
        quality: Optional lead-quality filter, also used in the file name.
        country: Optional country filter.
    """
    import csv
    import io

    def as_cell(value) -> str:
        """Render one value as cell text; lists become comma-separated."""
        if isinstance(value, (list, tuple)):
            return ", ".join(str(item) for item in value if item is not None)
        return str(value or "")

    def csv_line(cells) -> str:
        """Serialise one row, including the trailing newline."""
        buf = io.StringIO()
        csv.writer(buf, quoting=csv.QUOTE_ALL, lineterminator="\n").writerow(cells)
        return buf.getvalue()

    async def generate():
        yield "domain,quality,business_name,country_fiscal,countries_active,categories,detected_brands,portfolio_matches,contact_email,contact_phone,proposal,outreach_subject,outreach_email\n"
        page = 1
        while True:
            _, rows = await get_beauty_leads(quality=quality, country=country, page=page, limit=500)
            if not rows:
                break
            for row in rows:
                beauty = row.get("_beauty")
                if not isinstance(beauty, dict):
                    beauty = {}
                yield csv_line([
                    as_cell(row.get("domain")),
                    as_cell(row.get("beauty_lead_quality")),
                    as_cell(beauty.get("business_name")),
                    as_cell(beauty.get("country_fiscal")),
                    as_cell(beauty.get("countries_active") or []),
                    as_cell(beauty.get("categories") or []),
                    as_cell(beauty.get("detected_brands") or []),
                    as_cell(beauty.get("dist_matches") or []),
                    as_cell(beauty.get("contact_email")),
                    as_cell(beauty.get("contact_phone")),
                    as_cell(beauty.get("b2b_proposal")),
                    as_cell(beauty.get("outreach_subject")),
                    as_cell(beauty.get("outreach_email")),
                ])
            page += 1

    # The filter value ends up in a response header; keep only characters
    # that are safe inside a quoted file name.
    qual = ""
    if quality:
        safe = "".join(
            ch for ch in quality.lower()
            if ch.isascii() and (ch.isalnum() or ch == "_")
        )
        qual = f"_{safe}" if safe else ""
    return StreamingResponse(
        generate(), media_type="text/csv",
        headers={"Content-Disposition": f'attachment; filename="beautyleads{qual}.csv"'},
    )
async def get_beauty_leads(quality: str = None, country: str = None,
                           page: int = 1, limit: int = 100):
    """Return one page of assessed beauty leads plus the total match count.

    Only rows with a non-NULL beauty_lead_quality are considered. Rows are
    ordered HOT first, then WARM, then everything else, with the domain as a
    tie-breaker. Without a tie-breaker the order of equally ranked rows is
    unspecified, so LIMIT/OFFSET paging could repeat or skip rows between
    pages.

    Args:
        quality: Optional lead-quality filter; compared upper-cased.
        country: Optional filter on ip_country; compared upper-cased.
        page: 1-based page number.
        limit: Page size.

    Returns:
        ``(total, rows)`` where each row is a dict of the table columns plus
        "_beauty", the decoded beauty_assessment JSON (always a dict; empty
        when the stored value is missing, invalid or not a JSON object).
    """
    import json as _json
    offset = (page - 1) * limit
    conditions = ["beauty_lead_quality IS NOT NULL"]
    params: list = []
    if quality:
        conditions.append("beauty_lead_quality = ?")
        params.append(quality.upper())
    if country:
        conditions.append("ip_country = ?")
        params.append(country.upper())
    # Only fixed fragments are interpolated; user values go through params.
    where = "WHERE " + " AND ".join(conditions)
    async with aiosqlite.connect(SQLITE_PATH, timeout=30) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            f"SELECT * FROM enriched_domains {where} "
            f"ORDER BY CASE beauty_lead_quality WHEN 'HOT' THEN 1 WHEN 'WARM' THEN 2 ELSE 3 END, domain "
            f"LIMIT ? OFFSET ?",
            params + [limit, offset],
        ) as cur:
            rows = [dict(r) async for r in cur]
        async with db.execute(f"SELECT COUNT(*) FROM enriched_domains {where}", params) as cur:
            total = (await cur.fetchone())[0]
    # Parse beauty_assessment JSON inline
    for r in rows:
        try:
            parsed = _json.loads(r.get("beauty_assessment") or "{}")
        except (TypeError, ValueError):
            parsed = {}
        r["_beauty"] = parsed if isinstance(parsed, dict) else {}
    return total, rows
+ + ++ Exports: domain, quality, business name, fiscal country, active countries, categories, detected brands, + portfolio matches, contact email, contact phone, B2B proposal, outreach subject, outreach email. +
+