diff --git a/app/db.py b/app/db.py index 168d627..2f903a6 100644 --- a/app/db.py +++ b/app/db.py @@ -1,10 +1,15 @@ import os +import asyncio +import logging import aiosqlite import duckdb from pathlib import Path +logger = logging.getLogger(__name__) + DATA_DIR = Path(os.getenv("DATA_DIR", "/data")) PARQUET_PATH = DATA_DIR / "domains.parquet" +DUCKDB_PATH = DATA_DIR / "domains.duckdb" SQLITE_PATH = DATA_DIR / "enrichment.db" SCHEMA = """ @@ -23,7 +28,6 @@ CREATE TABLE IF NOT EXISTS enriched_domains ( error TEXT, score INTEGER DEFAULT 0 ); - CREATE TABLE IF NOT EXISTS job_queue ( id INTEGER PRIMARY KEY AUTOINCREMENT, domain TEXT UNIQUE NOT NULL, @@ -33,7 +37,6 @@ CREATE TABLE IF NOT EXISTS job_queue ( completed_at TEXT, error TEXT ); - CREATE TABLE IF NOT EXISTS scores ( domain TEXT PRIMARY KEY, score INTEGER NOT NULL, @@ -41,6 +44,15 @@ CREATE TABLE IF NOT EXISTS scores ( ); """ +# Index build state +_index_ready = False +_index_building = False +_index_total = 0 + +# Cached stats (TLD breakdown is expensive — compute once) +_tld_cache: list = [] +_total_cache: int = 0 + async def init_db(): async with aiosqlite.connect(SQLITE_PATH) as db: @@ -48,142 +60,219 @@ async def init_db(): await db.commit() -async def get_db(): - return await aiosqlite.connect(SQLITE_PATH) +# ── DuckDB persistent index ────────────────────────────────────────────────── + +def _build_index_sync(): + global _index_ready, _index_building, _index_total + _index_building = True + try: + conn = duckdb.connect(str(DUCKDB_PATH)) + conn.execute("SET threads=4") + conn.execute("SET memory_limit='2GB'") + + # Check if already built + try: + n = conn.execute("SELECT COUNT(*) FROM domains").fetchone()[0] + if n > 0: + _index_total = n + _index_ready = True + _index_building = False + logger.info("DuckDB index already ready (%d rows)", n) + conn.close() + return + except Exception: + pass + + logger.info("Building DuckDB index from parquet (one-time ~2-3 min)...") + conn.execute(""" + CREATE OR REPLACE TABLE domains AS + SELECT + domain, + lower(regexp_extract(domain, '\\.([^.]+)$', 1)) AS tld, + len(string_split(domain, '.')) AS parts + FROM read_parquet(?) + """, [str(PARQUET_PATH)]) + conn.execute("CREATE INDEX IF NOT EXISTS idx_tld ON domains(tld)") + _index_total = conn.execute("SELECT COUNT(*) FROM domains").fetchone()[0] + conn.close() + _index_ready = True + logger.info("DuckDB index built: %d rows", _index_total) + except Exception as e: + logger.error("DuckDB index build failed: %s", e) + finally: + _index_building = False -def duckdb_query(sql: str, params=None): - conn = duckdb.connect(database=":memory:", read_only=False) - conn.execute(f"SET threads=4") - if params: - result = conn.execute(sql, params).fetchall() +async def build_duckdb_index(): + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, _build_index_sync) + + +def index_status() -> dict: + return { + "ready": _index_ready, + "building": _index_building, + "total": _index_total, + } + + +# ── Domain queries ─────────────────────────────────────────────────────────── + +def _domains_sync(tld, page, limit, alpha_only, no_sld, keyword): + conditions = [] + params_count = [] + params_data = [] + + if _index_ready: + source = "domains" + + def _add(clause, val=None): + conditions.append(clause) + if val is not None: + params_count.append(val) + params_data.append(val) else: - result = conn.execute(sql).fetchall() - conn.close() - return result + source = f"read_parquet('{PARQUET_PATH}')" + def _add(clause, val=None): + conditions.append(clause) + if val is not None: + params_count.append(val) + params_data.append(val) -def duckdb_query_df(sql: str, params=None): - conn = duckdb.connect(database=":memory:", read_only=False) + if tld: + if _index_ready: + _add("tld = ?", tld.lower().lstrip(".")) + else: + _add("lower(regexp_extract(domain, '\\.([^.]+)$', 1)) = ?", tld.lower().lstrip(".")) + + if no_sld: + if _index_ready: + _add("parts = 2") + else: + _add("len(string_split(domain, '.')) = 2") + + if alpha_only: + _add("NOT regexp_matches(domain, '[^a-zA-Z.]')") + + if keyword: + _add("domain LIKE ?", f"%{keyword.lower()}%") + + where = ("WHERE " + " AND ".join(conditions)) if conditions else "" + offset = (page - 1) * limit + + if _index_ready: + conn = duckdb.connect(str(DUCKDB_PATH), read_only=True) + else: + conn = duckdb.connect(":memory:") conn.execute("SET threads=4") - if params: - result = conn.execute(sql, params).df() - else: - result = conn.execute(sql).df() + + total = conn.execute(f"SELECT COUNT(*) FROM {source} {where}", params_count).fetchone()[0] + rows = conn.execute( + f"SELECT domain FROM {source} {where} LIMIT {limit} OFFSET {offset}", params_data + ).fetchall() conn.close() - return result + return total, [r[0] for r in rows] + + +async def get_domains(tld=None, page=1, limit=100, alpha_only=False, no_sld=False, keyword=None, live_only=False): + loop = asyncio.get_event_loop() + total, domain_list = await loop.run_in_executor( + None, _domains_sync, tld, page, limit, alpha_only, no_sld, keyword + ) + + if not domain_list: + return total, [] + + placeholders = ",".join("?" * len(domain_list)) + async with aiosqlite.connect(SQLITE_PATH) as db: + db.row_factory = aiosqlite.Row + async with db.execute( + f"SELECT * FROM enriched_domains WHERE domain IN ({placeholders})", + domain_list, + ) as cur: + enriched_map = {r["domain"]: dict(r) async for r in cur} + + results = [] + for d in domain_list: + row = enriched_map.get(d, {"domain": d}) + if live_only and not row.get("is_live"): + continue + results.append(row) + + return total, results + + +# ── Stats ──────────────────────────────────────────────────────────────────── + +def _tld_stats_sync() -> tuple[int, list]: + if _index_ready: + conn = duckdb.connect(str(DUCKDB_PATH), read_only=True) + total = conn.execute("SELECT COUNT(*) FROM domains").fetchone()[0] + rows = conn.execute(""" + SELECT tld, COUNT(*) AS cnt FROM domains + WHERE tld != '' + GROUP BY tld ORDER BY cnt DESC LIMIT 20 + """).fetchall() + conn.close() + else: + p = str(PARQUET_PATH) + conn = duckdb.connect(":memory:") + conn.execute("SET threads=4") + total = conn.execute(f"SELECT COUNT(*) FROM read_parquet('{p}')").fetchone()[0] + rows = conn.execute(f""" + SELECT lower(regexp_extract(domain, '\\.([^.]+)$', 1)) AS tld, COUNT(*) AS cnt + FROM read_parquet('{p}') + GROUP BY tld ORDER BY cnt DESC LIMIT 20 + """).fetchall() + conn.close() + return total, [{"tld": r[0], "count": r[1]} for r in rows] async def get_stats(): - parquet = str(PARQUET_PATH) + global _tld_cache, _total_cache - # Total count + TLD breakdown via DuckDB pushdown - total = duckdb_query(f"SELECT COUNT(*) FROM read_parquet('{parquet}')")[0][0] - - tld_rows = duckdb_query(f""" - SELECT - regexp_extract(domain, '\\.([a-zA-Z0-9]+)$', 1) AS tld, - COUNT(*) AS cnt - FROM read_parquet('{parquet}') - GROUP BY tld - ORDER BY cnt DESC - LIMIT 20 - """) + # Compute TLD breakdown once and cache it + if not _tld_cache: + loop = asyncio.get_event_loop() + _total_cache, _tld_cache = await loop.run_in_executor(None, _tld_stats_sync) async with aiosqlite.connect(SQLITE_PATH) as db: async with db.execute("SELECT COUNT(*) FROM enriched_domains") as cur: enriched = (await cur.fetchone())[0] threshold = int(os.getenv("SCORE_THRESHOLD", "60")) - async with db.execute( - "SELECT COUNT(*) FROM enriched_domains WHERE score >= ?", (threshold,) - ) as cur: + async with db.execute("SELECT COUNT(*) FROM enriched_domains WHERE score >= ?", (threshold,)) as cur: hot_leads = (await cur.fetchone())[0] - async with db.execute( - "SELECT COUNT(*) FROM job_queue WHERE status='pending'" - ) as cur: - queue_pending = (await cur.fetchone())[0] - async with db.execute( - "SELECT COUNT(*) FROM job_queue WHERE status='running'" - ) as cur: - queue_running = (await cur.fetchone())[0] - async with db.execute( - "SELECT COUNT(*) FROM job_queue WHERE status='done'" - ) as cur: - queue_done = (await cur.fetchone())[0] - async with db.execute( - "SELECT COUNT(*) FROM job_queue WHERE status='failed'" - ) as cur: - queue_failed = (await cur.fetchone())[0] + async with db.execute("SELECT status, COUNT(*) FROM job_queue GROUP BY status") as cur: + q = {r[0]: r[1] async for r in cur} return { - "total_domains": total, + "total_domains": _total_cache, "enriched": enriched, "hot_leads": hot_leads, - "tld_breakdown": [{"tld": r[0], "count": r[1]} for r in tld_rows], + "tld_breakdown": _tld_cache, + "index_status": index_status(), "queue": { - "pending": queue_pending, - "running": queue_running, - "done": queue_done, - "failed": queue_failed, + "pending": q.get("pending", 0), + "running": q.get("running", 0), + "done": q.get("done", 0), + "failed": q.get("failed", 0), }, } -async def get_domains(tld=None, page=1, limit=100, live_only=False): - parquet = str(PARQUET_PATH) - conditions = [] - params = [] - - if tld: - conditions.append(f"regexp_extract(domain, '\\.([a-zA-Z0-9]+)$', 1) = '{tld}'") - if live_only: - # Join with enriched_domains to check is_live - pass - - where = f"WHERE {' AND '.join(conditions)}" if conditions else "" - offset = (page - 1) * limit - - sql = f""" - SELECT domain - FROM read_parquet('{parquet}') - {where} - LIMIT {limit} OFFSET {offset} - """ - rows = duckdb_query(sql) - domains = [r[0] for r in rows] - - # Merge enrichment data from SQLite - if domains: - placeholders = ",".join("?" * len(domains)) - async with aiosqlite.connect(SQLITE_PATH) as db: - db.row_factory = aiosqlite.Row - async with db.execute( - f"SELECT * FROM enriched_domains WHERE domain IN ({placeholders})", - domains, - ) as cur: - enriched = {r["domain"]: dict(r) async for r in cur} - - result = [] - for d in domains: - if d in enriched: - result.append(enriched[d]) - else: - result.append({"domain": d}) - return result - return [] - +# ── Enrichment helpers ─────────────────────────────────────────────────────── async def get_enriched(min_score=0, cms=None, country=None, page=1, limit=100): offset = (page - 1) * limit conditions = ["score >= ?"] - params = [min_score] + params: list = [min_score] if cms: conditions.append("cms = ?") params.append(cms) if country: conditions.append("ip_country = ?") params.append(country) - where = "WHERE " + " AND ".join(conditions) async with aiosqlite.connect(SQLITE_PATH) as db: db.row_factory = aiosqlite.Row @@ -192,7 +281,11 @@ async def get_enriched(min_score=0, cms=None, country=None, page=1, limit=100): params + [limit, offset], ) as cur: rows = [dict(r) async for r in cur] - return rows + async with db.execute( + f"SELECT COUNT(*) FROM enriched_domains {where}", params + ) as cur: + total = (await cur.fetchone())[0] + return total, rows async def queue_domains(domains: list[str]): @@ -206,26 +299,13 @@ async def queue_domains(domains: list[str]): async def get_queue_status(): async with aiosqlite.connect(SQLITE_PATH) as db: - async with db.execute( - "SELECT status, COUNT(*) FROM job_queue GROUP BY status" - ) as cur: + async with db.execute("SELECT status, COUNT(*) FROM job_queue GROUP BY status") as cur: rows = {r[0]: r[1] async for r in cur} - total = sum(rows.values()) - done = rows.get("done", 0) pending = rows.get("pending", 0) running = rows.get("running", 0) + done = rows.get("done", 0) failed = rows.get("failed", 0) - - eta_seconds = None - if running > 0 or pending > 0: - rate = int(os.getenv("CONCURRENCY_LIMIT", "50")) - eta_seconds = (pending + running) / max(rate / 10, 1) - - return { - "total": total, - "pending": pending, - "running": running, - "done": done, - "failed": failed, - "eta_seconds": eta_seconds, - } + total = sum(rows.values()) + rate = int(os.getenv("CONCURRENCY_LIMIT", "50")) + eta_seconds = (pending + running) / max(rate / 10, 1) if (pending + running) > 0 else None + return {"total": total, "pending": pending, "running": running, "done": done, "failed": failed, "eta_seconds": eta_seconds} diff --git a/app/main.py b/app/main.py index c41cf79..57db25f 100644 --- a/app/main.py +++ b/app/main.py @@ -1,12 +1,10 @@ import os -import sys import asyncio import logging from pathlib import Path from contextlib import asynccontextmanager import httpx -import duckdb import aiosqlite from fastapi import FastAPI, Query from fastapi.responses import StreamingResponse, JSONResponse @@ -18,7 +16,7 @@ load_dotenv() from app.db import ( DATA_DIR, PARQUET_PATH, SQLITE_PATH, init_db, get_stats, get_domains, get_enriched, - queue_domains, get_queue_status, + queue_domains, get_queue_status, build_duckdb_index, index_status, ) from app.enricher import start_worker, pause_worker, resume_worker, is_running from app.scorer import run_scoring @@ -37,16 +35,13 @@ async def download_parquet(): DATA_DIR.mkdir(parents=True, exist_ok=True) tmp_path = PARQUET_PATH.with_suffix(".tmp") - # Resumable download via Range header downloaded = tmp_path.stat().st_size if tmp_path.exists() else 0 headers = {"Range": f"bytes={downloaded}-"} if downloaded > 0 else {} logger.info("Downloading parquet from %s (offset=%d)...", PARQUET_URL, downloaded) - async with httpx.AsyncClient(follow_redirects=True, timeout=None) as client: async with client.stream("GET", PARQUET_URL, headers=headers) as resp: if resp.status_code == 416: - # Already fully downloaded tmp_path.rename(PARQUET_PATH) return resp.raise_for_status() @@ -58,41 +53,54 @@ async def download_parquet(): f.write(chunk) received += len(chunk) if total: - pct = received / total * 100 - logger.info("Download progress: %.1f%% (%d/%d bytes)", pct, received, total) + logger.info("Download: %.1f%% (%d/%d)", received / total * 100, received, total) tmp_path.rename(PARQUET_PATH) - logger.info("Parquet download complete: %s", PARQUET_PATH) + logger.info("Parquet download complete") @asynccontextmanager async def lifespan(app: FastAPI): await download_parquet() await init_db() + # Build DuckDB index in background — queries still work (slower) while building + asyncio.create_task(build_duckdb_index()) start_worker() - logger.info("DomGod dashboard ready on port 6677") + logger.info("DomGod ready on port 6677") yield app = FastAPI(title="DomGod", lifespan=lifespan) -# ── API routes ────────────────────────────────────────────────────────────── +# ── API ────────────────────────────────────────────────────────────────────── @app.get("/api/stats") async def stats(): return await get_stats() +@app.get("/api/index/status") +async def get_index_status(): + return index_status() + + @app.get("/api/domains") async def domains( tld: str = Query(None), page: int = Query(1, ge=1), - limit: int = Query(100, ge=1, le=1000), + limit: int = Query(100, ge=1, le=500), live_only: bool = Query(False), + alpha_only: bool = Query(False), + no_sld: bool = Query(False), + keyword: str = Query(None), ): - rows = await get_domains(tld=tld, page=page, limit=limit, live_only=live_only) - return {"page": page, "limit": limit, "results": rows} + total, rows = await get_domains( + tld=tld, page=page, limit=limit, + alpha_only=alpha_only, no_sld=no_sld, + keyword=keyword, live_only=live_only, + ) + return {"page": page, "limit": limit, "total": total, "results": rows} @app.post("/api/enrich/batch") @@ -118,7 +126,7 @@ async def enrich_retry(): await db.execute("UPDATE job_queue SET status='pending', error=NULL WHERE status='failed'") await db.commit() resume_worker() - return {"status": "retrying failed jobs"} + return {"status": "retrying"} @app.post("/api/enrich/pause") @@ -141,8 +149,8 @@ async def enriched( page: int = Query(1, ge=1), limit: int = Query(100, ge=1, le=1000), ): - rows = await get_enriched(min_score=min_score, cms=cms, country=country, page=page, limit=limit) - return {"page": page, "limit": limit, "results": rows} + total, rows = await get_enriched(min_score=min_score, cms=cms, country=country, page=page, limit=limit) + return {"page": page, "limit": limit, "total": total, "results": rows} @app.get("/api/export") @@ -157,46 +165,42 @@ async def export_csv( elif tier == "warm": min_score = 50 + max_score = 79 if tier == "warm" else 100 + async def generate(): yield "domain,score,cms,ssl_expiry_days,ip_country,is_live,status_code,has_mx,server,page_title,enriched_at\n" - page = 1 + p = 1 while True: - rows = await get_enriched(min_score=min_score, cms=cms, country=country, page=page, limit=500) + _, rows = await get_enriched(min_score=min_score, cms=cms, country=country, page=p, limit=500) if not rows: break for r in rows: - # Apply warm tier upper bound - if tier == "warm" and r.get("score", 0) >= 80: + if r.get("score", 0) > max_score: continue line = ",".join( f'"{str(r.get(col) or "").replace(chr(34), chr(39))}"' - for col in [ - "domain", "score", "cms", "ssl_expiry_days", "ip_country", - "is_live", "status_code", "has_mx", "server", "page_title", "enriched_at" - ] + for col in ["domain", "score", "cms", "ssl_expiry_days", "ip_country", + "is_live", "status_code", "has_mx", "server", "page_title", "enriched_at"] ) yield line + "\n" - page += 1 + p += 1 - filename = f"domgod_leads_score{min_score}{'_' + tier if tier else ''}.csv" + fname = f"domgod_{tier or 'export'}_score{min_score}.csv" return StreamingResponse( - generate(), - media_type="text/csv", - headers={"Content-Disposition": f'attachment; filename="{filename}"'}, + generate(), media_type="text/csv", + headers={"Content-Disposition": f'attachment; filename="{fname}"'}, ) @app.post("/api/score/run") async def score_run(): - result = await run_scoring() - return result + return await run_scoring() -# ── Static UI ─────────────────────────────────────────────────────────────── +# ── Static UI ──────────────────────────────────────────────────────────────── static_dir = Path(__file__).parent / "static" app.mount("/", StaticFiles(directory=str(static_dir), html=True), name="static") - if __name__ == "__main__": import uvicorn uvicorn.run("app.main:app", host="0.0.0.0", port=6677, log_level="info") diff --git a/app/static/index.html b/app/static/index.html index 1c6f7c0..92714d9 100644 --- a/app/static/index.html +++ b/app/static/index.html @@ -3,592 +3,529 @@ -DomGod — Domain Intelligence Dashboard +DomGod — Domain Intelligence - -
+ -
-

DomGod

- - - -
+ +
-
+
+

DomGod

+ + + + + +
- -
-
Overview
-
-
-
Total Domains
-
-
in parquet
-
-
-
Enriched
-
-
-
-
-
Hot Leads
-
-
score ≥ 60
-
-
-
Queue Pending
-
-
-
-
-
Done / Failed
-
-
-
+
+ + +
+
Overview
+
+
Total Domains
in dataset
+
Enriched
+
Hot Leads
score ≥ 60
+
Queue Pending
+
Done / Failed
+
+
+ + +
+
Browse & Filter
+
Enrichment Queue
+
Lead Pipeline
+
TLD Chart
+
+ + +
+
+
+ +
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+ + +
- -
-
Browse & Filter
-
Enrichment Queue
-
Lead Pipeline
-
TLD Chart
+
+ + + + + + + matches +
- -
-
-
- - -
-
- - -
-
- - -
-
- - -
- - - -
- -
- - +
+
+ + + + + + + + + + + + + + + + + + +
DomainScoreCMSSSL daysCountryLiveServerStatus
- -
-
-
-
-
Pending
-
-
-
-
Running
-
-
-
-
Done
-
-
-
-
Failed
-
-
-
-
ETA
-
-
+ +
+ + Page + + + + +
+
-
-
- -
- - - -
-
-
-
-
-
-
- -
-
Enrich custom domains
-
- - -
-
+ +
+
+
Pending
+
Running
+
Done
+
Failed
+
ETA
- -
-
- -
-
- -
-

🔥 Hot

-
Score 80–100
-
-
- -
- -
- -
-

♨️ Warm

-
Score 50–79
-
-
- -
- -
- -
-

🧊 Cold

-
Score < 50
-
-
- -
- +
+
+ +
+ + + +
+
+
- -
-
Top 20 TLDs
-
- +
+
Queue custom domains
+
+
+ +
+
+
-
-
+ +
+
+ +
+
+
+

🔥 Hot

+
Score 80–100
+
+
+ +
+ +
+
+

♨️ Warm

+
Score 50–79
+
+
+ +
+ +
+
+

🧊 Cold

+
Score < 50
+
+
+ +
+ +
+
+
+ + +
+
Top 20 TLDs in dataset
+
+
+ +