commit b2e7a2f2db200d2d0c06f310a61601037c952513
Author: Malin
Date:   Mon Apr 13 16:22:30 2026 +0200

    feat: initial Dockerized domain intelligence dashboard

    - FastAPI backend with DuckDB pushdown queries on the 72M-row parquet
    - Async enrichment worker: HTTP, SSL, DNS MX, CMS fingerprint, ip-api.com
    - Resumable parquet download with HTTP Range support
    - Lead scoring engine (max 100 pts, target countries ES,GB,DE,FR,RO,PT,AD,IT)
    - Single-file Alpine.js + Chart.js dashboard on port 6677
    - SQLite enrichment DB with job queue and scores tables
    - Dockerized with persistent /data volume

    Co-Authored-By: Claude Sonnet 4.6

diff --git a/.env b/.env
new file mode 100644
index 0000000..32f22e7
--- /dev/null
+++ b/.env
@@ -0,0 +1,6 @@
+DATA_DIR=/data
+PARQUET_URL=https://github.com/digitalcortex/72m-domains-dataset/raw/refs/heads/master/domains.parquet
+CONCURRENCY_LIMIT=50
+SCORE_THRESHOLD=60
+TARGET_TLDS=es,com,net
+TARGET_COUNTRIES=ES,GB,DE,FR,RO,PT,AD,IT
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..26af32b
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,17 @@
+FROM python:3.12-slim
+
+WORKDIR /app
+
+RUN apt-get update && apt-get install -y \
+    curl gcc libssl-dev && \
+    rm -rf /var/lib/apt/lists/*
+
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+COPY app/ ./app/
+
+EXPOSE 6677
+
+# Run as a module so `from app.db import ...` resolves from /app
+CMD ["python", "-m", "app.main"]
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..30b664b
--- /dev/null
+++ b/README.md
@@ -0,0 +1,54 @@
+# DomGod — Domain Intelligence Dashboard
+
+Dockerized dashboard for filtering, enriching, scoring, and exporting leads from a 72M-domain dataset.
+
+## Quick start
+
+```bash
+docker compose up --build
+```
+
+Open **http://localhost:6677**
+
+On first boot, the container downloads `domains.parquet` (gigabyte-scale) and caches it in `./data/`. Subsequent restarts skip the download.
+
+## Environment variables (docker-compose.yml)
+
+| Variable | Default | Description |
+|---|---|---|
+| `DATA_DIR` | `/data` | Where the parquet + SQLite files live |
+| `PARQUET_URL` | GitHub raw URL | Source parquet |
+| `CONCURRENCY_LIMIT` | `50` | Parallel enrichment workers |
+| `SCORE_THRESHOLD` | `60` | Score at which the stats panel counts a domain as a "hot lead" |
+| `TARGET_TLDS` | `es,com,net` | TLDs to prioritise |
+| `TARGET_COUNTRIES` | `ES,GB,DE,FR,RO,PT,AD,IT` | Countries that earn the scoring bonus |
+
+## Scoring
+
+| Signal | Points |
|---|---|
+| Domain is live | +20 |
+| SSL expiry < 30 days | +15 |
+| No valid SSL | +15 |
+| Known CMS detected | +15 |
+| No MX record | +10 |
+| IP in target country | +10 |
+| Shared hosting server | +10 |
+| Local business keywords in title | +5 |
+
+Max score: 100. Export tiers: Hot ≥ 80, Warm 50–79, Cold < 50. (The dashboard's hot-lead counter instead uses `SCORE_THRESHOLD`, default 60; the export tiers are fixed.)
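+
+For example, a live site (+20) running a known CMS (+15) with no valid SSL certificate (+15), no MX record (+10), and an IP in a target country (+10) totals 70: Warm tier on export, but past the default `SCORE_THRESHOLD` of 60, so it still counts toward the dashboard's hot-leads figure.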
+
+## API
+
+```
+GET /api/stats
+GET /api/domains?tld=es&page=1&limit=100&live_only=false
+POST /api/enrich/batch { "domains": ["example.com"] }
+GET /api/enrich/status
+POST /api/enrich/pause
+POST /api/enrich/resume
+POST /api/enrich/retry
+GET /api/enriched?min_score=60&cms=wordpress&country=ES
+GET /api/export?tier=hot (streams CSV)
+POST /api/score/run
+```
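+
+A typical session against these endpoints, from queueing a batch to downloading the hot-tier CSV (illustrative values; `jq` is optional and only used for readability):
+
+```bash
+curl -X POST localhost:6677/api/enrich/batch \
+  -H 'Content-Type: application/json' \
+  -d '{"domains": ["example.com", "example.es"]}'
+
+curl -s localhost:6677/api/enrich/status | jq .pending
+
+curl -s 'localhost:6677/api/enriched?min_score=60&country=ES' | jq '.results[].domain'
+
+curl -OJ 'localhost:6677/api/export?tier=hot'
+```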
diff --git a/app/__init__.py b/app/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/app/db.py b/app/db.py
new file mode 100644
index 0000000..168d627
--- /dev/null
+++ b/app/db.py
@@ -0,0 +1,231 @@
+import os
+import aiosqlite
+import duckdb
+from pathlib import Path
+
+DATA_DIR = Path(os.getenv("DATA_DIR", "/data"))
+PARQUET_PATH = DATA_DIR / "domains.parquet"
+SQLITE_PATH = DATA_DIR / "enrichment.db"
+
+SCHEMA = """
+CREATE TABLE IF NOT EXISTS enriched_domains (
+    domain TEXT PRIMARY KEY,
+    is_live INTEGER DEFAULT 0,
+    status_code INTEGER,
+    ssl_valid INTEGER DEFAULT 0,
+    ssl_expiry_days INTEGER,
+    cms TEXT,
+    has_mx INTEGER DEFAULT 0,
+    ip_country TEXT,
+    page_title TEXT,
+    server TEXT,
+    enriched_at TEXT,
+    error TEXT,
+    score INTEGER DEFAULT 0
+);
+
+CREATE TABLE IF NOT EXISTS job_queue (
+    id INTEGER PRIMARY KEY AUTOINCREMENT,
+    domain TEXT UNIQUE NOT NULL,
+    status TEXT DEFAULT 'pending',
+    created_at TEXT DEFAULT (datetime('now')),
+    started_at TEXT,
+    completed_at TEXT,
+    error TEXT
+);
+
+CREATE TABLE IF NOT EXISTS scores (
+    domain TEXT PRIMARY KEY,
+    score INTEGER NOT NULL,
+    scored_at TEXT DEFAULT (datetime('now'))
+);
+"""
+
+
+async def init_db():
+    async with aiosqlite.connect(SQLITE_PATH) as db:
+        await db.executescript(SCHEMA)
+        await db.commit()
+
+
+async def get_db():
+    return await aiosqlite.connect(SQLITE_PATH)
+
+
+def duckdb_query(sql: str, params=None):
+    conn = duckdb.connect(database=":memory:", read_only=False)
+    conn.execute("SET threads=4")
+    if params:
+        result = conn.execute(sql, params).fetchall()
+    else:
+        result = conn.execute(sql).fetchall()
+    conn.close()
+    return result
+
+
+def duckdb_query_df(sql: str, params=None):
+    conn = duckdb.connect(database=":memory:", read_only=False)
+    conn.execute("SET threads=4")
+    if params:
+        result = conn.execute(sql, params).df()
+    else:
+        result = conn.execute(sql).df()
+    conn.close()
+    return result
+
+
+async def get_stats():
+    parquet = str(PARQUET_PATH)
+
+    # Total count + TLD breakdown via DuckDB pushdown
+    total = duckdb_query(f"SELECT COUNT(*) FROM read_parquet('{parquet}')")[0][0]
+
+    tld_rows = duckdb_query(f"""
+        SELECT
+            regexp_extract(domain, '\\.([a-zA-Z0-9]+)$', 1) AS tld,
+            COUNT(*) AS cnt
+        FROM read_parquet('{parquet}')
+        GROUP BY tld
+        ORDER BY cnt DESC
+        LIMIT 20
+    """)
+
+    async with aiosqlite.connect(SQLITE_PATH) as db:
+        async with db.execute("SELECT COUNT(*) FROM enriched_domains") as cur:
+            enriched = (await cur.fetchone())[0]
+        threshold = int(os.getenv("SCORE_THRESHOLD", "60"))
+        async with db.execute(
+            "SELECT COUNT(*) FROM enriched_domains WHERE score >= ?", (threshold,)
+        ) as cur:
+            hot_leads = (await cur.fetchone())[0]
+        async with db.execute(
+            "SELECT COUNT(*) FROM job_queue WHERE status='pending'"
+        ) as cur:
+            queue_pending = (await cur.fetchone())[0]
+        async with db.execute(
+            "SELECT COUNT(*) FROM job_queue WHERE status='running'"
+        ) as cur:
+            queue_running = (await cur.fetchone())[0]
+        async with db.execute(
+            "SELECT COUNT(*) FROM job_queue WHERE status='done'"
+        ) as cur:
+            queue_done = (await cur.fetchone())[0]
+        async with db.execute(
+            "SELECT COUNT(*) FROM job_queue WHERE status='failed'"
+        ) as cur:
+            queue_failed = (await cur.fetchone())[0]
+
+    return {
+        "total_domains": total,
+        "enriched": enriched,
+        "hot_leads": hot_leads,
+        "tld_breakdown": [{"tld": r[0], "count": r[1]} for r in tld_rows],
+        "queue": {
+            "pending": queue_pending,
+            "running": queue_running,
+            "done": queue_done,
+            "failed": queue_failed,
+        },
+    }
+
+
+async def get_domains(tld=None, page=1, limit=100, live_only=False):
+    parquet = str(PARQUET_PATH)
+    conditions = []
+    params = []
+
+    if tld:
+        # tld comes from the query string: bind it as a parameter, never inline it into SQL
+        conditions.append("regexp_extract(domain, '\\.([a-zA-Z0-9]+)$', 1) = ?")
+        params.append(tld)
+
+    where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
+    offset = (page - 1) * limit
+
+    sql = f"""
+        SELECT domain
+        FROM read_parquet('{parquet}')
+        {where}
+        LIMIT {limit} OFFSET {offset}
+    """
+    rows = duckdb_query(sql, params or None)
+    domains = [r[0] for r in rows]
+    if not domains:
+        return []
+
+    # Merge enrichment data from SQLite
+    placeholders = ",".join("?" * len(domains))
+    async with aiosqlite.connect(SQLITE_PATH) as db:
+        db.row_factory = aiosqlite.Row
+        async with db.execute(
+            f"SELECT * FROM enriched_domains WHERE domain IN ({placeholders})",
+            domains,
+        ) as cur:
+            enriched = {r["domain"]: dict(r) async for r in cur}
+
+    result = []
+    for d in domains:
+        row = enriched.get(d, {"domain": d})
+        # is_live only exists in SQLite, so live_only filters after the merge;
+        # pages can therefore come back shorter than `limit`
+        if live_only and not row.get("is_live"):
+            continue
+        result.append(row)
+    return result
+
+
+async def get_enriched(min_score=0, cms=None, country=None, page=1, limit=100):
+    offset = (page - 1) * limit
+    conditions = ["score >= ?"]
+    params = [min_score]
+    if cms:
+        conditions.append("cms = ?")
+        params.append(cms)
+    if country:
+        conditions.append("ip_country = ?")
+        params.append(country)
+
+    where = "WHERE " + " AND ".join(conditions)
+    async with aiosqlite.connect(SQLITE_PATH) as db:
+        db.row_factory = aiosqlite.Row
+        async with db.execute(
+            f"SELECT * FROM enriched_domains {where} ORDER BY score DESC LIMIT ? OFFSET ?",
+            params + [limit, offset],
+        ) as cur:
+            rows = [dict(r) async for r in cur]
+    return rows
+
+
+async def queue_domains(domains: list[str]):
+    async with aiosqlite.connect(SQLITE_PATH) as db:
+        await db.executemany(
+            "INSERT OR IGNORE INTO job_queue (domain) VALUES (?)",
+            [(d,) for d in domains],
+        )
+        await db.commit()
+
+
+async def get_queue_status():
+    async with aiosqlite.connect(SQLITE_PATH) as db:
+        async with db.execute(
+            "SELECT status, COUNT(*) FROM job_queue GROUP BY status"
+        ) as cur:
+            rows = {r[0]: r[1] async for r in cur}
+    total = sum(rows.values())
+    done = rows.get("done", 0)
+    pending = rows.get("pending", 0)
+    running = rows.get("running", 0)
+    failed = rows.get("failed", 0)
+
+    eta_seconds = None
+    if running > 0 or pending > 0:
+        # Rough heuristic: assume ~10 s per domain at CONCURRENCY_LIMIT parallelism,
+        # i.e. throughput ≈ CONCURRENCY_LIMIT / 10 domains per second
+        rate = int(os.getenv("CONCURRENCY_LIMIT", "50"))
+        eta_seconds = (pending + running) / max(rate / 10, 1)
+
+    return {
+        "total": total,
+        "pending": pending,
+        "running": running,
+        "done": done,
+        "failed": failed,
+        "eta_seconds": eta_seconds,
+    }
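db.py above leans on DuckDB's parquet pushdown: filters and aggregates execute inside the `read_parquet` scan, so the 72M-row file is streamed rather than loaded into memory. A minimal standalone sketch of the same pattern, assuming a local `domains.parquet` with a `domain` column:

```python
import duckdb

# Count .es domains; the regexp filter runs inside the parquet scan,
# so memory stays flat even on a 72M-row file.
con = duckdb.connect()
count = con.execute(
    "SELECT COUNT(*) FROM read_parquet('domains.parquet') "
    "WHERE regexp_extract(domain, '\\.([a-zA-Z0-9]+)$', 1) = ?",
    ["es"],
).fetchone()[0]
print(f"{count} .es domains")
```

The same `?` binding is what `duckdb_query` forwards when `get_domains` passes a TLD filter.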
diff --git a/app/enricher.py b/app/enricher.py
new file mode 100644
index 0000000..d2074a6
--- /dev/null
+++ b/app/enricher.py
@@ -0,0 +1,270 @@
+import asyncio
+import os
+import ssl
+import socket
+import datetime
+import logging
+from typing import Optional
+
+import httpx
+import dns.resolver
+import aiosqlite
+from bs4 import BeautifulSoup
+
+from app.db import SQLITE_PATH
+from app.scorer import score
+
+logger = logging.getLogger(__name__)
+
+CONCURRENCY_LIMIT = int(os.getenv("CONCURRENCY_LIMIT", "50"))
+# ip-api.com free tier: 45 req/min → ~1.33 req/s. Lookups go through a separate
+# single-slot semaphore so they stay serialised and spaced, independent of the
+# main worker concurrency.
+IP_API_SEMAPHORE: Optional[asyncio.Semaphore] = None
+IP_API_RATE = 45  # per minute
+
+_worker_task: Optional[asyncio.Task] = None
+_paused = False
+
+
+def get_ip_semaphore():
+    # Created lazily so the semaphore binds to the running event loop
+    global IP_API_SEMAPHORE
+    if IP_API_SEMAPHORE is None:
+        IP_API_SEMAPHORE = asyncio.Semaphore(1)
+    return IP_API_SEMAPHORE
+
+
+CMS_SIGNATURES = {
+    "wordpress": ["/wp-content/", "/wp-includes/", 'name="generator" content="WordPress'],
+    "joomla": ["/components/com_", "Joomla!", 'name="generator" content="Joomla'],
+    "drupal": ["/sites/default/files/", "Drupal.settings", 'name="generator" content="Drupal'],
+    "wix": ["wix.com", "X-Wix-"],
+    "squarespace": ["squarespace.com", "X-Squarespace-"],
+    "shopify": ["cdn.shopify.com", "Shopify.theme"],
+    "prestashop": ["PrestaShop", "/modules/"],
+    "magento": ["Mage.Cookies", "X-Magento-"],
+    "typo3": ["typo3", "TYPO3 CMS"],
+    "opencart": ["route=common/home", "OpenCart"],
+}
+
+
+def detect_cms(html: str, headers: dict) -> Optional[str]:
+    # Lowercase once instead of once per signature
+    combined = (html[:50000] + " ".join(f"{k}:{v}" for k, v in headers.items())).lower()
+    for cms, sigs in CMS_SIGNATURES.items():
+        if any(sig.lower() in combined for sig in sigs):
+            return cms
+    return None
+
+
+async def check_ssl(domain: str) -> tuple[bool, Optional[int]]:
+    try:
+        ctx = ssl.create_default_context()
+        loop = asyncio.get_event_loop()
+
+        def _check():
+            # Blocking socket work runs in the default executor
+            with socket.create_connection((domain, 443), timeout=5) as sock:
+                with ctx.wrap_socket(sock, server_hostname=domain) as ssock:
+                    cert = ssock.getpeercert()
+                    expiry_str = cert.get("notAfter", "")
+                    expiry = datetime.datetime.strptime(expiry_str, "%b %d %H:%M:%S %Y %Z")
+                    days = (expiry - datetime.datetime.utcnow()).days
+                    return True, days
+
+        return await loop.run_in_executor(None, _check)
+    except Exception:
+        return False, None
+
+
+async def check_mx(domain: str) -> bool:
+    try:
+        loop = asyncio.get_event_loop()
+
+        def _check():
+            try:
+                answers = dns.resolver.resolve(domain, "MX", lifetime=5)
+                return len(answers) > 0
+            except Exception:
+                return False
+
+        return await loop.run_in_executor(None, _check)
+    except Exception:
+        return False
+
+
+_ip_last_call = 0.0
+
+
+async def get_ip_country(ip: str) -> Optional[str]:
+    global _ip_last_call
+    # Enforce 45 req/min: one request every 60/45 ≈ 1.33 s
+    async with get_ip_semaphore():
+        now = asyncio.get_event_loop().time()
+        wait = (60 / IP_API_RATE) - (now - _ip_last_call)
+        if wait > 0:
+            await asyncio.sleep(wait)
+        _ip_last_call = asyncio.get_event_loop().time()
+
+        try:
+            async with httpx.AsyncClient(timeout=5) as client:
+                resp = await client.get(f"http://ip-api.com/json/{ip}?fields=countryCode")
+                if resp.status_code == 200:
+                    return resp.json().get("countryCode")
+        except Exception:
+            pass
+    return None
+
+
+async def enrich_domain(domain: str) -> dict:
+    result = {
+        "domain": domain,
+        "is_live": False,
+        "status_code": None,
+        "ssl_valid": False,
+        "ssl_expiry_days": None,
+        "cms": None,
+        "has_mx": False,
+        "ip_country": None,
+        "page_title": None,
+        "server": None,
+        "enriched_at": datetime.datetime.utcnow().isoformat(),
+        "error": None,
+    }
+
+    try:
+        # verify=False: dead or misconfigured sites often present broken certs,
+        # and SSL validity is measured separately by check_ssl()
+        async with httpx.AsyncClient(
+            timeout=10,
+            follow_redirects=True,
+            verify=False,
+            headers={"User-Agent": "Mozilla/5.0 (compatible; DomGod/1.0)"},
+        ) as client:
+            resp = await client.get(f"http://{domain}")
+            result["is_live"] = resp.status_code in (200, 301, 302, 303, 307, 308)
+            result["status_code"] = resp.status_code
+            result["server"] = resp.headers.get("server")
+
+            html = resp.text
+            soup = BeautifulSoup(html, "html.parser")
+            title_tag = soup.find("title")
+            result["page_title"] = title_tag.get_text(strip=True)[:500] if title_tag else None
+            result["cms"] = detect_cms(html, dict(resp.headers))
+
+            # Resolve IP for country lookup
+            try:
+                loop = asyncio.get_event_loop()
+                ip = await loop.run_in_executor(None, socket.gethostbyname, domain)
+                result["ip_country"] = await get_ip_country(ip)
+            except Exception:
+                pass
+
+    except Exception as e:
+        result["error"] = str(e)[:500]
+
+    # SSL check (independent of HTTP)
+    ssl_valid, ssl_days = await check_ssl(domain)
+    result["ssl_valid"] = ssl_valid
+    result["ssl_expiry_days"] = ssl_days
+
+    # MX check
+    result["has_mx"] = await check_mx(domain)
+
+    # Score
+    result["score"] = score(result)
+
+    return result
+
+
+async def save_enriched(data: dict):
+    async with aiosqlite.connect(SQLITE_PATH) as db:
+        await db.execute(
+            """INSERT INTO enriched_domains
+               (domain, is_live, status_code, ssl_valid, ssl_expiry_days, cms,
+                has_mx, ip_country, page_title, server, enriched_at, error, score)
+               VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)
+               ON CONFLICT(domain) DO UPDATE SET
+                 is_live=excluded.is_live, status_code=excluded.status_code,
+                 ssl_valid=excluded.ssl_valid, ssl_expiry_days=excluded.ssl_expiry_days,
+                 cms=excluded.cms, has_mx=excluded.has_mx, ip_country=excluded.ip_country,
+                 page_title=excluded.page_title, server=excluded.server,
+                 enriched_at=excluded.enriched_at, error=excluded.error, score=excluded.score""",
+            (
+                data["domain"], data["is_live"], data["status_code"],
+                data["ssl_valid"], data["ssl_expiry_days"], data["cms"],
+                data["has_mx"], data["ip_country"], data["page_title"],
+                data["server"], data["enriched_at"], data["error"], data["score"],
+            ),
+        )
+        await db.execute(
+            """INSERT INTO scores (domain, score) VALUES (?,?)
+               ON CONFLICT(domain) DO UPDATE SET score=excluded.score, scored_at=datetime('now')""",
+            (data["domain"], data["score"]),
+        )
+        await db.commit()
+
+
+async def mark_job(domain: str, status: str, error: Optional[str] = None):
+    async with aiosqlite.connect(SQLITE_PATH) as db:
+        if status == "running":
+            await db.execute(
+                "UPDATE job_queue SET status=?, started_at=datetime('now') WHERE domain=?",
+                (status, domain),
+            )
+        elif status in ("done", "failed"):
+            await db.execute(
+                "UPDATE job_queue SET status=?, completed_at=datetime('now'), error=? WHERE domain=?",
+                (status, error, domain),
+            )
+        await db.commit()
+
+
+async def worker_loop():
+    global _paused
+    sem = asyncio.Semaphore(CONCURRENCY_LIMIT)
+
+    async def process(domain: str):
+        async with sem:
+            await mark_job(domain, "running")
+            try:
+                data = await enrich_domain(domain)
+                await save_enriched(data)
+                await mark_job(domain, "done")
+            except Exception as e:
+                await mark_job(domain, "failed", str(e)[:500])
+
+    while True:
+        if _paused:
+            await asyncio.sleep(1)
+            continue
+
+        async with aiosqlite.connect(SQLITE_PATH) as db:
+            async with db.execute(
+                "SELECT domain FROM job_queue WHERE status='pending' LIMIT 100"
+            ) as cur:
+                rows = await cur.fetchall()
+
+        if not rows:
+            await asyncio.sleep(2)
+            continue
+
+        # Each batch finishes before the next poll, so a job is never picked twice
+        tasks = [asyncio.create_task(process(r[0])) for r in rows]
+        await asyncio.gather(*tasks, return_exceptions=True)
+
+
+def start_worker():
+    global _worker_task, _paused
+    if _worker_task is None or _worker_task.done():
+        _worker_task = asyncio.create_task(worker_loop())
+    # Clear the pause flag whenever the worker is (re)started
+    _paused = False
+
+
+def pause_worker():
+    global _paused
+    _paused = True
+
+
+def resume_worker():
+    global _paused
+    _paused = False
+    start_worker()
+
+
+def is_running() -> bool:
+    return _worker_task is not None and not _worker_task.done() and not _paused
diff --git a/app/main.py b/app/main.py
new file mode 100644
index 0000000..bfff1e9
--- /dev/null
+++ b/app/main.py
@@ -0,0 +1,202 @@
+import os
+import csv
+import io
+import logging
+from pathlib import Path
+from contextlib import asynccontextmanager
+
+import httpx
+import aiosqlite
+from fastapi import FastAPI, Query
+from fastapi.responses import StreamingResponse, JSONResponse
+from fastapi.staticfiles import StaticFiles
+from dotenv import load_dotenv
+
+# Must run before app.db is imported: app.db reads DATA_DIR at import time
+load_dotenv()
+
+from app.db import (
+    DATA_DIR, PARQUET_PATH, SQLITE_PATH,
+    init_db, get_stats, get_domains, get_enriched,
+    queue_domains, get_queue_status,
+)
+from app.enricher import start_worker, pause_worker, resume_worker, is_running
+from app.scorer import run_scoring
+
+logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
+logger = logging.getLogger(__name__)
+
+PARQUET_URL = os.getenv("PARQUET_URL", "")
+
+
+async def download_parquet():
+    if PARQUET_PATH.exists():
+        logger.info("Using cached parquet at %s", PARQUET_PATH)
+        return
+
+    DATA_DIR.mkdir(parents=True, exist_ok=True)
+    tmp_path = PARQUET_PATH.with_suffix(".tmp")
+
+    # Resumable download via Range header
+    downloaded = tmp_path.stat().st_size if tmp_path.exists() else 0
+    headers = {"Range": f"bytes={downloaded}-"} if downloaded > 0 else {}
+
+    logger.info("Downloading parquet from %s (offset=%d)...", PARQUET_URL, downloaded)
+
+    async with httpx.AsyncClient(follow_redirects=True, timeout=None) as client:
+        async with client.stream("GET", PARQUET_URL, headers=headers) as resp:
+            if resp.status_code == 416:
+                # Range not satisfiable: the tmp file is already complete
+                tmp_path.rename(PARQUET_PATH)
+                return
+            resp.raise_for_status()
+            if downloaded > 0 and resp.status_code != 206:
+                # Server ignored the Range header; restart from byte 0
+                downloaded = 0
+            total = int(resp.headers.get("content-length", 0)) + downloaded
+            mode = "ab" if downloaded > 0 else "wb"
+            with open(tmp_path, mode) as f:
+                received = downloaded
+                async for chunk in resp.aiter_bytes(chunk_size=1024 * 1024):
+                    f.write(chunk)
+                    received += len(chunk)
+                    # Log roughly every 100 MB to keep the log readable
+                    if total and received % (100 * 1024 * 1024) < len(chunk):
+                        pct = received / total * 100
+                        logger.info("Download progress: %.1f%% (%d/%d bytes)", pct, received, total)
+
+    tmp_path.rename(PARQUET_PATH)
+    logger.info("Parquet download complete: %s", PARQUET_PATH)
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    await download_parquet()
+    await init_db()
+    start_worker()
+    logger.info("DomGod dashboard ready on port 6677")
+    yield
+
+
+app = FastAPI(title="DomGod", lifespan=lifespan)
+
+
+# ── API routes ──────────────────────────────────────────────────────────────
+
+@app.get("/api/stats")
+async def stats():
+    return await get_stats()
+
+
+@app.get("/api/domains")
+async def domains(
+    tld: str = Query(None),
+    page: int = Query(1, ge=1),
+    limit: int = Query(100, ge=1, le=1000),
+    live_only: bool = Query(False),
+):
+    rows = await get_domains(tld=tld, page=page, limit=limit, live_only=live_only)
+    return {"page": page, "limit": limit, "results": rows}
+
+
+@app.post("/api/enrich/batch")
+async def enrich_batch(body: dict):
+    domains_list = body.get("domains", [])
+    if not domains_list:
+        return JSONResponse({"error": "no domains provided"}, status_code=400)
+    await queue_domains(domains_list)
+    resume_worker()
+    return {"queued": len(domains_list)}
+
+
+@app.get("/api/enrich/status")
+async def enrich_status():
+    status = await get_queue_status()
+    status["worker_running"] = is_running()
+    return status
+
+
+@app.post("/api/enrich/retry")
+async def enrich_retry():
+    async with aiosqlite.connect(SQLITE_PATH) as db:
+        await db.execute("UPDATE job_queue SET status='pending', error=NULL WHERE status='failed'")
+        await db.commit()
+    resume_worker()
+    return {"status": "retrying failed jobs"}
+
+
+@app.post("/api/enrich/pause")
+async def enrich_pause():
+    pause_worker()
+    return {"status": "paused"}
+
+
+@app.post("/api/enrich/resume")
+async def enrich_resume():
+    resume_worker()
+    return {"status": "resumed"}
+
+
+@app.get("/api/enriched")
+async def enriched(
+    min_score: int = Query(0, ge=0, le=100),
+    cms: str = Query(None),
+    country: str = Query(None),
+    page: int = Query(1, ge=1),
+    limit: int = Query(100, ge=1, le=1000),
+):
+    rows = await get_enriched(min_score=min_score, cms=cms, country=country, page=page, limit=limit)
+    return {"page": page, "limit": limit, "results": rows}
+
+
+@app.get("/api/export")
+async def export_csv(
+    min_score: int = Query(0),
+    cms: str = Query(None),
+    country: str = Query(None),
+    tier: str = Query(None),
+):
+    if tier == "hot":
+        min_score = 80
+    elif tier == "warm":
+        min_score = 50
+
+    cols = [
+        "domain", "score", "cms", "ssl_expiry_days", "ip_country",
+        "is_live", "status_code", "has_mx", "server", "page_title", "enriched_at",
+    ]
+
+    async def generate():
+        # csv.writer handles quoting and escaping properly, including fields
+        # that themselves contain double quotes or commas
+        buf = io.StringIO()
+        writer = csv.writer(buf)
+        writer.writerow(cols)
+        yield buf.getvalue()
+        page = 1
+        while True:
+            rows = await get_enriched(min_score=min_score, cms=cms, country=country, page=page, limit=500)
+            if not rows:
+                break
+            for r in rows:
+                # Apply warm tier upper bound
+                if tier == "warm" and r.get("score", 0) >= 80:
+                    continue
+                buf.seek(0)
+                buf.truncate(0)
+                writer.writerow([r.get(col, "") for col in cols])
+                yield buf.getvalue()
+            page += 1
+
+    filename = f"domgod_leads_score{min_score}{'_' + tier if tier else ''}.csv"
+    return StreamingResponse(
+        generate(),
+        media_type="text/csv",
+        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
+    )
+
+
+@app.post("/api/score/run")
+async def score_run():
+    result = await run_scoring()
+    return result
+
+
+# ── Static UI ───────────────────────────────────────────────────────────────
+static_dir = Path(__file__).parent / "static"
+app.mount("/", StaticFiles(directory=str(static_dir), html=True), name="static")
+
+
+if __name__ == "__main__":
+    import uvicorn
+    uvicorn.run(app, host="0.0.0.0", port=6677, log_level="info")
diff --git a/app/scorer.py b/app/scorer.py
new file mode 100644
index 0000000..126a4dd
--- /dev/null
+++ b/app/scorer.py
@@ -0,0 +1,64 @@
+import os
+import aiosqlite
+from app.db import SQLITE_PATH
+
+KNOWN_CMS = {"wordpress", "joomla", "drupal", "wix", "squarespace", "shopify", "prestashop", "magento", "typo3", "opencart"}
+# Default mirrors .env / docker-compose.yml
+TARGET_COUNTRIES = {c.strip() for c in os.getenv("TARGET_COUNTRIES", "ES,GB,DE,FR,RO,PT,AD,IT").split(",")}
+
+LOCAL_BIZ_KEYWORDS = {
+    "restaurant", "cafe", "shop", "store", "salon", "plumber", "electrician",
+    "dentist", "clinic", "garage", "hotel", "bakery", "bar", "gym", "spa",
+    "fontanero", "electricista", "dentista", "clínica", "taller",
+    "panadería", "peluquería", "tienda",
+}
+
+
+def local_biz_keywords(title: str | None) -> bool:
+    if not title:
+        return False
+    title_lower = title.lower()
+    return any(kw in title_lower for kw in LOCAL_BIZ_KEYWORDS)
+
+
+def score(domain_row: dict) -> int:
+    s = 0
+    if domain_row.get("is_live"):
+        s += 20
+    ssl_days = domain_row.get("ssl_expiry_days")
+    if ssl_days is not None and ssl_days < 30:
+        s += 15
+    if not domain_row.get("ssl_valid"):
+        s += 15
+    cms = (domain_row.get("cms") or "").lower()
+    if cms in KNOWN_CMS:
+        s += 15
+    if not domain_row.get("has_mx"):
+        s += 10
+    if domain_row.get("ip_country") in TARGET_COUNTRIES:
+        s += 10
+    server = (domain_row.get("server") or "").lower()
+    if "shared" in server:
+        s += 10
+    if local_biz_keywords(domain_row.get("page_title")):
+        s += 5
+    return min(s, 100)
+
+
+async def run_scoring():
+    async with aiosqlite.connect(SQLITE_PATH) as db:
+        db.row_factory = aiosqlite.Row
+        async with db.execute("SELECT * FROM enriched_domains") as cur:
+            rows = [dict(r) async for r in cur]
+
+        # Tuples are (score, domain) to match both statements below
+        updates = [(score(r), r["domain"]) for r in rows]
+        await db.executemany(
+            "UPDATE enriched_domains SET score = ? WHERE domain = ?", updates
+        )
+        await db.executemany(
+            """INSERT INTO scores (score, domain) VALUES (?, ?)
+               ON CONFLICT(domain) DO UPDATE SET score=excluded.score, scored_at=datetime('now')""",
+            updates,
+        )
+        await db.commit()
+
+    return {"scored": len(updates)}
diff --git a/app/static/index.html b/app/static/index.html
new file mode 100644
index 0000000..1c6f7c0
--- /dev/null
+++ b/app/static/index.html
@@ -0,0 +1,600 @@
+[index.html markup was not preserved in this extract; only its text labels survived.
+ Recoverable structure of the 600-line single-file Alpine.js + Chart.js UI:
+ header titled "DomGod"; Overview stat cards (Total Domains "in parquet", Enriched,
+ Hot Leads "score ≥ 60", Queue Pending, Done / Failed); tabs for Browse & Filter
+ (searchable table with Domain, Score, CMS, SSL days, Country, Live, Status columns),
+ Enrichment Queue (Pending / Running / Done / Failed counters, ETA, pause/resume/retry
+ controls, "Enrich custom domains" input), Lead Pipeline (🔥 Hot 80–100, ♨️ Warm 50–79,
+ 🧊 Cold < 50 tiers with export buttons), and a Top 20 TLDs chart.]
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..f179c99
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,16 @@
+version: "3.9"
+services:
+  dashboard:
+    build: .
+    ports:
+      - "6677:6677"
+    volumes:
+      - ./data:/data
+    environment:
+      - DATA_DIR=/data
+      - PARQUET_URL=https://github.com/digitalcortex/72m-domains-dataset/raw/refs/heads/master/domains.parquet
+      - CONCURRENCY_LIMIT=50
+      - SCORE_THRESHOLD=60
+      - TARGET_TLDS=es,com,net
+      - TARGET_COUNTRIES=ES,GB,DE,FR,RO,PT,AD,IT
+    restart: unless-stopped
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..62963c1
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,8 @@
+fastapi
+uvicorn[standard]
+duckdb
+httpx
+dnspython
+beautifulsoup4
+aiosqlite
+python-dotenv
+ + + + diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..f179c99 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,16 @@ +version: "3.9" +services: + dashboard: + build: . + ports: + - "6677:6677" + volumes: + - ./data:/data + environment: + - DATA_DIR=/data + - PARQUET_URL=https://github.com/digitalcortex/72m-domains-dataset/raw/refs/heads/master/domains.parquet + - CONCURRENCY_LIMIT=50 + - SCORE_THRESHOLD=60 + - TARGET_TLDS=es,com,net + - TARGET_COUNTRIES=ES,GB,DE,FR,RO,PT,AD,IT + restart: unless-stopped diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..62963c1 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +fastapi +uvicorn[standard] +duckdb +httpx +dnspython +beautifulsoup4 +aiosqlite +python-dotenv