feat: persistent DuckDB index, new filters, pagination fix, enrich UX

- Build /data/domains.duckdb on first run (tld+parts columns + ART index)
  → TLD filter goes from ~60s full scan to <100ms index lookup
  → System still works (slower) while index builds in background
- New /api/domains params: alpha_only, no_sld, keyword
  → alpha_only: domains with only letters (no hyphens/numbers)
  → no_sld: parts=2, excludes com.es / net.es patterns
  → keyword: LIKE '%term%' niche search
- /api/domains and /api/enriched now return total count for pagination
- Pagination: shows total matches, page X of Y, Next disabled at last page
- Enrich button: toast notifications instead of alert(), error handling
- Select all on page button, clear selection button
- Stats/TLD breakdown cached after first load (no repeat full scan)
- Header shows index build status (building → ready)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-13 17:00:08 +02:00
parent 2db95cc727
commit 7acff12242
3 changed files with 662 additions and 641 deletions

324
app/db.py
View File

@@ -1,10 +1,15 @@
import os import os
import asyncio
import logging
import aiosqlite import aiosqlite
import duckdb import duckdb
from pathlib import Path from pathlib import Path
logger = logging.getLogger(__name__)
DATA_DIR = Path(os.getenv("DATA_DIR", "/data")) DATA_DIR = Path(os.getenv("DATA_DIR", "/data"))
PARQUET_PATH = DATA_DIR / "domains.parquet" PARQUET_PATH = DATA_DIR / "domains.parquet"
DUCKDB_PATH = DATA_DIR / "domains.duckdb"
SQLITE_PATH = DATA_DIR / "enrichment.db" SQLITE_PATH = DATA_DIR / "enrichment.db"
SCHEMA = """ SCHEMA = """
@@ -23,7 +28,6 @@ CREATE TABLE IF NOT EXISTS enriched_domains (
error TEXT, error TEXT,
score INTEGER DEFAULT 0 score INTEGER DEFAULT 0
); );
CREATE TABLE IF NOT EXISTS job_queue ( CREATE TABLE IF NOT EXISTS job_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
domain TEXT UNIQUE NOT NULL, domain TEXT UNIQUE NOT NULL,
@@ -33,7 +37,6 @@ CREATE TABLE IF NOT EXISTS job_queue (
completed_at TEXT, completed_at TEXT,
error TEXT error TEXT
); );
CREATE TABLE IF NOT EXISTS scores ( CREATE TABLE IF NOT EXISTS scores (
domain TEXT PRIMARY KEY, domain TEXT PRIMARY KEY,
score INTEGER NOT NULL, score INTEGER NOT NULL,
@@ -41,6 +44,15 @@ CREATE TABLE IF NOT EXISTS scores (
); );
""" """
# Index build state
_index_ready = False
_index_building = False
_index_total = 0
# Cached stats (TLD breakdown is expensive — compute once)
_tld_cache: list = []
_total_cache: int = 0
async def init_db(): async def init_db():
async with aiosqlite.connect(SQLITE_PATH) as db: async with aiosqlite.connect(SQLITE_PATH) as db:
@@ -48,142 +60,219 @@ async def init_db():
await db.commit() await db.commit()
async def get_db(): # ── DuckDB persistent index ──────────────────────────────────────────────────
return await aiosqlite.connect(SQLITE_PATH)
def _build_index_sync():
global _index_ready, _index_building, _index_total
_index_building = True
try:
conn = duckdb.connect(str(DUCKDB_PATH))
conn.execute("SET threads=4")
conn.execute("SET memory_limit='2GB'")
# Check if already built
try:
n = conn.execute("SELECT COUNT(*) FROM domains").fetchone()[0]
if n > 0:
_index_total = n
_index_ready = True
_index_building = False
logger.info("DuckDB index already ready (%d rows)", n)
conn.close()
return
except Exception:
pass
logger.info("Building DuckDB index from parquet (one-time ~2-3 min)...")
conn.execute("""
CREATE OR REPLACE TABLE domains AS
SELECT
domain,
lower(regexp_extract(domain, '\\.([^.]+)$', 1)) AS tld,
len(string_split(domain, '.')) AS parts
FROM read_parquet(?)
""", [str(PARQUET_PATH)])
conn.execute("CREATE INDEX IF NOT EXISTS idx_tld ON domains(tld)")
_index_total = conn.execute("SELECT COUNT(*) FROM domains").fetchone()[0]
conn.close()
_index_ready = True
logger.info("DuckDB index built: %d rows", _index_total)
except Exception as e:
logger.error("DuckDB index build failed: %s", e)
finally:
_index_building = False
def duckdb_query(sql: str, params=None): async def build_duckdb_index():
conn = duckdb.connect(database=":memory:", read_only=False) loop = asyncio.get_event_loop()
conn.execute(f"SET threads=4") await loop.run_in_executor(None, _build_index_sync)
if params:
result = conn.execute(sql, params).fetchall()
def index_status() -> dict:
return {
"ready": _index_ready,
"building": _index_building,
"total": _index_total,
}
# ── Domain queries ───────────────────────────────────────────────────────────
def _domains_sync(tld, page, limit, alpha_only, no_sld, keyword):
conditions = []
params_count = []
params_data = []
if _index_ready:
source = "domains"
def _add(clause, val=None):
conditions.append(clause)
if val is not None:
params_count.append(val)
params_data.append(val)
else: else:
result = conn.execute(sql).fetchall() source = f"read_parquet('{PARQUET_PATH}')"
conn.close()
return result
def _add(clause, val=None):
conditions.append(clause)
if val is not None:
params_count.append(val)
params_data.append(val)
def duckdb_query_df(sql: str, params=None): if tld:
conn = duckdb.connect(database=":memory:", read_only=False) if _index_ready:
_add("tld = ?", tld.lower().lstrip("."))
else:
_add("lower(regexp_extract(domain, '\\.([^.]+)$', 1)) = ?", tld.lower().lstrip("."))
if no_sld:
if _index_ready:
_add("parts = 2")
else:
_add("len(string_split(domain, '.')) = 2")
if alpha_only:
_add("NOT regexp_matches(domain, '[^a-zA-Z.]')")
if keyword:
_add("domain LIKE ?", f"%{keyword.lower()}%")
where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
offset = (page - 1) * limit
if _index_ready:
conn = duckdb.connect(str(DUCKDB_PATH), read_only=True)
else:
conn = duckdb.connect(":memory:")
conn.execute("SET threads=4") conn.execute("SET threads=4")
if params:
result = conn.execute(sql, params).df() total = conn.execute(f"SELECT COUNT(*) FROM {source} {where}", params_count).fetchone()[0]
else: rows = conn.execute(
result = conn.execute(sql).df() f"SELECT domain FROM {source} {where} LIMIT {limit} OFFSET {offset}", params_data
).fetchall()
conn.close() conn.close()
return result return total, [r[0] for r in rows]
async def get_domains(tld=None, page=1, limit=100, alpha_only=False, no_sld=False, keyword=None, live_only=False):
loop = asyncio.get_event_loop()
total, domain_list = await loop.run_in_executor(
None, _domains_sync, tld, page, limit, alpha_only, no_sld, keyword
)
if not domain_list:
return total, []
placeholders = ",".join("?" * len(domain_list))
async with aiosqlite.connect(SQLITE_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
f"SELECT * FROM enriched_domains WHERE domain IN ({placeholders})",
domain_list,
) as cur:
enriched_map = {r["domain"]: dict(r) async for r in cur}
results = []
for d in domain_list:
row = enriched_map.get(d, {"domain": d})
if live_only and not row.get("is_live"):
continue
results.append(row)
return total, results
# ── Stats ────────────────────────────────────────────────────────────────────
def _tld_stats_sync() -> tuple[int, list]:
if _index_ready:
conn = duckdb.connect(str(DUCKDB_PATH), read_only=True)
total = conn.execute("SELECT COUNT(*) FROM domains").fetchone()[0]
rows = conn.execute("""
SELECT tld, COUNT(*) AS cnt FROM domains
WHERE tld != ''
GROUP BY tld ORDER BY cnt DESC LIMIT 20
""").fetchall()
conn.close()
else:
p = str(PARQUET_PATH)
conn = duckdb.connect(":memory:")
conn.execute("SET threads=4")
total = conn.execute(f"SELECT COUNT(*) FROM read_parquet('{p}')").fetchone()[0]
rows = conn.execute(f"""
SELECT lower(regexp_extract(domain, '\\.([^.]+)$', 1)) AS tld, COUNT(*) AS cnt
FROM read_parquet('{p}')
GROUP BY tld ORDER BY cnt DESC LIMIT 20
""").fetchall()
conn.close()
return total, [{"tld": r[0], "count": r[1]} for r in rows]
async def get_stats(): async def get_stats():
parquet = str(PARQUET_PATH) global _tld_cache, _total_cache
# Total count + TLD breakdown via DuckDB pushdown # Compute TLD breakdown once and cache it
total = duckdb_query(f"SELECT COUNT(*) FROM read_parquet('{parquet}')")[0][0] if not _tld_cache:
loop = asyncio.get_event_loop()
tld_rows = duckdb_query(f""" _total_cache, _tld_cache = await loop.run_in_executor(None, _tld_stats_sync)
SELECT
regexp_extract(domain, '\\.([a-zA-Z0-9]+)$', 1) AS tld,
COUNT(*) AS cnt
FROM read_parquet('{parquet}')
GROUP BY tld
ORDER BY cnt DESC
LIMIT 20
""")
async with aiosqlite.connect(SQLITE_PATH) as db: async with aiosqlite.connect(SQLITE_PATH) as db:
async with db.execute("SELECT COUNT(*) FROM enriched_domains") as cur: async with db.execute("SELECT COUNT(*) FROM enriched_domains") as cur:
enriched = (await cur.fetchone())[0] enriched = (await cur.fetchone())[0]
threshold = int(os.getenv("SCORE_THRESHOLD", "60")) threshold = int(os.getenv("SCORE_THRESHOLD", "60"))
async with db.execute( async with db.execute("SELECT COUNT(*) FROM enriched_domains WHERE score >= ?", (threshold,)) as cur:
"SELECT COUNT(*) FROM enriched_domains WHERE score >= ?", (threshold,)
) as cur:
hot_leads = (await cur.fetchone())[0] hot_leads = (await cur.fetchone())[0]
async with db.execute( async with db.execute("SELECT status, COUNT(*) FROM job_queue GROUP BY status") as cur:
"SELECT COUNT(*) FROM job_queue WHERE status='pending'" q = {r[0]: r[1] async for r in cur}
) as cur:
queue_pending = (await cur.fetchone())[0]
async with db.execute(
"SELECT COUNT(*) FROM job_queue WHERE status='running'"
) as cur:
queue_running = (await cur.fetchone())[0]
async with db.execute(
"SELECT COUNT(*) FROM job_queue WHERE status='done'"
) as cur:
queue_done = (await cur.fetchone())[0]
async with db.execute(
"SELECT COUNT(*) FROM job_queue WHERE status='failed'"
) as cur:
queue_failed = (await cur.fetchone())[0]
return { return {
"total_domains": total, "total_domains": _total_cache,
"enriched": enriched, "enriched": enriched,
"hot_leads": hot_leads, "hot_leads": hot_leads,
"tld_breakdown": [{"tld": r[0], "count": r[1]} for r in tld_rows], "tld_breakdown": _tld_cache,
"index_status": index_status(),
"queue": { "queue": {
"pending": queue_pending, "pending": q.get("pending", 0),
"running": queue_running, "running": q.get("running", 0),
"done": queue_done, "done": q.get("done", 0),
"failed": queue_failed, "failed": q.get("failed", 0),
}, },
} }
async def get_domains(tld=None, page=1, limit=100, live_only=False): # ── Enrichment helpers ───────────────────────────────────────────────────────
parquet = str(PARQUET_PATH)
conditions = []
params = []
if tld:
conditions.append(f"regexp_extract(domain, '\\.([a-zA-Z0-9]+)$', 1) = '{tld}'")
if live_only:
# Join with enriched_domains to check is_live
pass
where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
offset = (page - 1) * limit
sql = f"""
SELECT domain
FROM read_parquet('{parquet}')
{where}
LIMIT {limit} OFFSET {offset}
"""
rows = duckdb_query(sql)
domains = [r[0] for r in rows]
# Merge enrichment data from SQLite
if domains:
placeholders = ",".join("?" * len(domains))
async with aiosqlite.connect(SQLITE_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
f"SELECT * FROM enriched_domains WHERE domain IN ({placeholders})",
domains,
) as cur:
enriched = {r["domain"]: dict(r) async for r in cur}
result = []
for d in domains:
if d in enriched:
result.append(enriched[d])
else:
result.append({"domain": d})
return result
return []
async def get_enriched(min_score=0, cms=None, country=None, page=1, limit=100): async def get_enriched(min_score=0, cms=None, country=None, page=1, limit=100):
offset = (page - 1) * limit offset = (page - 1) * limit
conditions = ["score >= ?"] conditions = ["score >= ?"]
params = [min_score] params: list = [min_score]
if cms: if cms:
conditions.append("cms = ?") conditions.append("cms = ?")
params.append(cms) params.append(cms)
if country: if country:
conditions.append("ip_country = ?") conditions.append("ip_country = ?")
params.append(country) params.append(country)
where = "WHERE " + " AND ".join(conditions) where = "WHERE " + " AND ".join(conditions)
async with aiosqlite.connect(SQLITE_PATH) as db: async with aiosqlite.connect(SQLITE_PATH) as db:
db.row_factory = aiosqlite.Row db.row_factory = aiosqlite.Row
@@ -192,7 +281,11 @@ async def get_enriched(min_score=0, cms=None, country=None, page=1, limit=100):
params + [limit, offset], params + [limit, offset],
) as cur: ) as cur:
rows = [dict(r) async for r in cur] rows = [dict(r) async for r in cur]
return rows async with db.execute(
f"SELECT COUNT(*) FROM enriched_domains {where}", params
) as cur:
total = (await cur.fetchone())[0]
return total, rows
async def queue_domains(domains: list[str]): async def queue_domains(domains: list[str]):
@@ -206,26 +299,13 @@ async def queue_domains(domains: list[str]):
async def get_queue_status(): async def get_queue_status():
async with aiosqlite.connect(SQLITE_PATH) as db: async with aiosqlite.connect(SQLITE_PATH) as db:
async with db.execute( async with db.execute("SELECT status, COUNT(*) FROM job_queue GROUP BY status") as cur:
"SELECT status, COUNT(*) FROM job_queue GROUP BY status"
) as cur:
rows = {r[0]: r[1] async for r in cur} rows = {r[0]: r[1] async for r in cur}
total = sum(rows.values())
done = rows.get("done", 0)
pending = rows.get("pending", 0) pending = rows.get("pending", 0)
running = rows.get("running", 0) running = rows.get("running", 0)
done = rows.get("done", 0)
failed = rows.get("failed", 0) failed = rows.get("failed", 0)
total = sum(rows.values())
eta_seconds = None rate = int(os.getenv("CONCURRENCY_LIMIT", "50"))
if running > 0 or pending > 0: eta_seconds = (pending + running) / max(rate / 10, 1) if (pending + running) > 0 else None
rate = int(os.getenv("CONCURRENCY_LIMIT", "50")) return {"total": total, "pending": pending, "running": running, "done": done, "failed": failed, "eta_seconds": eta_seconds}
eta_seconds = (pending + running) / max(rate / 10, 1)
return {
"total": total,
"pending": pending,
"running": running,
"done": done,
"failed": failed,
"eta_seconds": eta_seconds,
}

View File

@@ -1,12 +1,10 @@
import os import os
import sys
import asyncio import asyncio
import logging import logging
from pathlib import Path from pathlib import Path
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
import httpx import httpx
import duckdb
import aiosqlite import aiosqlite
from fastapi import FastAPI, Query from fastapi import FastAPI, Query
from fastapi.responses import StreamingResponse, JSONResponse from fastapi.responses import StreamingResponse, JSONResponse
@@ -18,7 +16,7 @@ load_dotenv()
from app.db import ( from app.db import (
DATA_DIR, PARQUET_PATH, SQLITE_PATH, DATA_DIR, PARQUET_PATH, SQLITE_PATH,
init_db, get_stats, get_domains, get_enriched, init_db, get_stats, get_domains, get_enriched,
queue_domains, get_queue_status, queue_domains, get_queue_status, build_duckdb_index, index_status,
) )
from app.enricher import start_worker, pause_worker, resume_worker, is_running from app.enricher import start_worker, pause_worker, resume_worker, is_running
from app.scorer import run_scoring from app.scorer import run_scoring
@@ -37,16 +35,13 @@ async def download_parquet():
DATA_DIR.mkdir(parents=True, exist_ok=True) DATA_DIR.mkdir(parents=True, exist_ok=True)
tmp_path = PARQUET_PATH.with_suffix(".tmp") tmp_path = PARQUET_PATH.with_suffix(".tmp")
# Resumable download via Range header
downloaded = tmp_path.stat().st_size if tmp_path.exists() else 0 downloaded = tmp_path.stat().st_size if tmp_path.exists() else 0
headers = {"Range": f"bytes={downloaded}-"} if downloaded > 0 else {} headers = {"Range": f"bytes={downloaded}-"} if downloaded > 0 else {}
logger.info("Downloading parquet from %s (offset=%d)...", PARQUET_URL, downloaded) logger.info("Downloading parquet from %s (offset=%d)...", PARQUET_URL, downloaded)
async with httpx.AsyncClient(follow_redirects=True, timeout=None) as client: async with httpx.AsyncClient(follow_redirects=True, timeout=None) as client:
async with client.stream("GET", PARQUET_URL, headers=headers) as resp: async with client.stream("GET", PARQUET_URL, headers=headers) as resp:
if resp.status_code == 416: if resp.status_code == 416:
# Already fully downloaded
tmp_path.rename(PARQUET_PATH) tmp_path.rename(PARQUET_PATH)
return return
resp.raise_for_status() resp.raise_for_status()
@@ -58,41 +53,54 @@ async def download_parquet():
f.write(chunk) f.write(chunk)
received += len(chunk) received += len(chunk)
if total: if total:
pct = received / total * 100 logger.info("Download: %.1f%% (%d/%d)", received / total * 100, received, total)
logger.info("Download progress: %.1f%% (%d/%d bytes)", pct, received, total)
tmp_path.rename(PARQUET_PATH) tmp_path.rename(PARQUET_PATH)
logger.info("Parquet download complete: %s", PARQUET_PATH) logger.info("Parquet download complete")
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
await download_parquet() await download_parquet()
await init_db() await init_db()
# Build DuckDB index in background — queries still work (slower) while building
asyncio.create_task(build_duckdb_index())
start_worker() start_worker()
logger.info("DomGod dashboard ready on port 6677") logger.info("DomGod ready on port 6677")
yield yield
app = FastAPI(title="DomGod", lifespan=lifespan) app = FastAPI(title="DomGod", lifespan=lifespan)
# ── API routes ────────────────────────────────────────────────────────────── # ── API ──────────────────────────────────────────────────────────────────────
@app.get("/api/stats") @app.get("/api/stats")
async def stats(): async def stats():
return await get_stats() return await get_stats()
@app.get("/api/index/status")
async def get_index_status():
return index_status()
@app.get("/api/domains") @app.get("/api/domains")
async def domains( async def domains(
tld: str = Query(None), tld: str = Query(None),
page: int = Query(1, ge=1), page: int = Query(1, ge=1),
limit: int = Query(100, ge=1, le=1000), limit: int = Query(100, ge=1, le=500),
live_only: bool = Query(False), live_only: bool = Query(False),
alpha_only: bool = Query(False),
no_sld: bool = Query(False),
keyword: str = Query(None),
): ):
rows = await get_domains(tld=tld, page=page, limit=limit, live_only=live_only) total, rows = await get_domains(
return {"page": page, "limit": limit, "results": rows} tld=tld, page=page, limit=limit,
alpha_only=alpha_only, no_sld=no_sld,
keyword=keyword, live_only=live_only,
)
return {"page": page, "limit": limit, "total": total, "results": rows}
@app.post("/api/enrich/batch") @app.post("/api/enrich/batch")
@@ -118,7 +126,7 @@ async def enrich_retry():
await db.execute("UPDATE job_queue SET status='pending', error=NULL WHERE status='failed'") await db.execute("UPDATE job_queue SET status='pending', error=NULL WHERE status='failed'")
await db.commit() await db.commit()
resume_worker() resume_worker()
return {"status": "retrying failed jobs"} return {"status": "retrying"}
@app.post("/api/enrich/pause") @app.post("/api/enrich/pause")
@@ -141,8 +149,8 @@ async def enriched(
page: int = Query(1, ge=1), page: int = Query(1, ge=1),
limit: int = Query(100, ge=1, le=1000), limit: int = Query(100, ge=1, le=1000),
): ):
rows = await get_enriched(min_score=min_score, cms=cms, country=country, page=page, limit=limit) total, rows = await get_enriched(min_score=min_score, cms=cms, country=country, page=page, limit=limit)
return {"page": page, "limit": limit, "results": rows} return {"page": page, "limit": limit, "total": total, "results": rows}
@app.get("/api/export") @app.get("/api/export")
@@ -157,46 +165,42 @@ async def export_csv(
elif tier == "warm": elif tier == "warm":
min_score = 50 min_score = 50
max_score = 79 if tier == "warm" else 100
async def generate(): async def generate():
yield "domain,score,cms,ssl_expiry_days,ip_country,is_live,status_code,has_mx,server,page_title,enriched_at\n" yield "domain,score,cms,ssl_expiry_days,ip_country,is_live,status_code,has_mx,server,page_title,enriched_at\n"
page = 1 p = 1
while True: while True:
rows = await get_enriched(min_score=min_score, cms=cms, country=country, page=page, limit=500) _, rows = await get_enriched(min_score=min_score, cms=cms, country=country, page=p, limit=500)
if not rows: if not rows:
break break
for r in rows: for r in rows:
# Apply warm tier upper bound if r.get("score", 0) > max_score:
if tier == "warm" and r.get("score", 0) >= 80:
continue continue
line = ",".join( line = ",".join(
f'"{str(r.get(col) or "").replace(chr(34), chr(39))}"' f'"{str(r.get(col) or "").replace(chr(34), chr(39))}"'
for col in [ for col in ["domain", "score", "cms", "ssl_expiry_days", "ip_country",
"domain", "score", "cms", "ssl_expiry_days", "ip_country", "is_live", "status_code", "has_mx", "server", "page_title", "enriched_at"]
"is_live", "status_code", "has_mx", "server", "page_title", "enriched_at"
]
) )
yield line + "\n" yield line + "\n"
page += 1 p += 1
filename = f"domgod_leads_score{min_score}{'_' + tier if tier else ''}.csv" fname = f"domgod_{tier or 'export'}_score{min_score}.csv"
return StreamingResponse( return StreamingResponse(
generate(), generate(), media_type="text/csv",
media_type="text/csv", headers={"Content-Disposition": f'attachment; filename="{fname}"'},
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
) )
@app.post("/api/score/run") @app.post("/api/score/run")
async def score_run(): async def score_run():
result = await run_scoring() return await run_scoring()
return result
# ── Static UI ─────────────────────────────────────────────────────────────── # ── Static UI ───────────────────────────────────────────────────────────────
static_dir = Path(__file__).parent / "static" static_dir = Path(__file__).parent / "static"
app.mount("/", StaticFiles(directory=str(static_dir), html=True), name="static") app.mount("/", StaticFiles(directory=str(static_dir), html=True), name="static")
if __name__ == "__main__": if __name__ == "__main__":
import uvicorn import uvicorn
uvicorn.run("app.main:app", host="0.0.0.0", port=6677, log_level="info") uvicorn.run("app.main:app", host="0.0.0.0", port=6677, log_level="info")

File diff suppressed because it is too large Load Diff