"""Data-access layer for the domain-enrichment service.

Read-only domain listings come from a parquet file queried through DuckDB;
mutable enrichment results, the job queue, and scores live in a SQLite
database accessed asynchronously via aiosqlite.
"""

import os
from pathlib import Path

import aiosqlite
import duckdb

# Base data directory; overridable via env for tests / deployment.
DATA_DIR = Path(os.getenv("DATA_DIR", "/data"))
PARQUET_PATH = DATA_DIR / "domains.parquet"  # immutable domain list
SQLITE_PATH = DATA_DIR / "enrichment.db"     # mutable enrichment state

SCHEMA = """
CREATE TABLE IF NOT EXISTS enriched_domains (
    domain TEXT PRIMARY KEY,
    is_live INTEGER DEFAULT 0,
    status_code INTEGER,
    ssl_valid INTEGER DEFAULT 0,
    ssl_expiry_days INTEGER,
    cms TEXT,
    has_mx INTEGER DEFAULT 0,
    ip_country TEXT,
    page_title TEXT,
    server TEXT,
    enriched_at TEXT,
    error TEXT,
    score INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS job_queue (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    domain TEXT UNIQUE NOT NULL,
    status TEXT DEFAULT 'pending',
    created_at TEXT DEFAULT (datetime('now')),
    started_at TEXT,
    completed_at TEXT,
    error TEXT
);
CREATE TABLE IF NOT EXISTS scores (
    domain TEXT PRIMARY KEY,
    score INTEGER NOT NULL,
    scored_at TEXT DEFAULT (datetime('now'))
);
"""


async def init_db():
    """Create the SQLite tables if they do not already exist."""
    async with aiosqlite.connect(SQLITE_PATH) as db:
        await db.executescript(SCHEMA)
        await db.commit()


async def get_db():
    """Open and return a new aiosqlite connection.

    The caller owns the connection and is responsible for closing it.
    """
    return await aiosqlite.connect(SQLITE_PATH)


def _run_duckdb(sql: str, params, materialize):
    """Execute *sql* on a fresh in-memory DuckDB connection.

    *materialize* is called with the executed connection to extract the
    result (``fetchall`` or ``df``). The connection is always closed, even
    when execution raises — the previous implementation leaked it on error.
    """
    conn = duckdb.connect(database=":memory:", read_only=False)
    try:
        conn.execute("SET threads=4")
        cur = conn.execute(sql, params) if params else conn.execute(sql)
        return materialize(cur)
    finally:
        conn.close()


def duckdb_query(sql: str, params=None):
    """Run *sql* via DuckDB and return the rows as a list of tuples."""
    return _run_duckdb(sql, params, lambda cur: cur.fetchall())


def duckdb_query_df(sql: str, params=None):
    """Run *sql* via DuckDB and return the result as a pandas DataFrame."""
    return _run_duckdb(sql, params, lambda cur: cur.df())


async def get_stats():
    """Return aggregate counts across the parquet file and the SQLite DB.

    Includes total domain count, top-20 TLD breakdown (DuckDB), enrichment
    progress, hot-lead count at SCORE_THRESHOLD, and job-queue status counts.
    """
    parquet = str(PARQUET_PATH)
    # Total count + TLD breakdown via DuckDB parquet pushdown.
    total = duckdb_query(f"SELECT COUNT(*) FROM read_parquet('{parquet}')")[0][0]
    tld_rows = duckdb_query(f"""
        SELECT regexp_extract(domain, '\\.([a-zA-Z0-9]+)$', 1) AS tld,
               COUNT(*) AS cnt
        FROM read_parquet('{parquet}')
        GROUP BY tld
        ORDER BY cnt DESC
        LIMIT 20
    """)

    async with aiosqlite.connect(SQLITE_PATH) as db:

        async def _scalar(sql, args=()):
            # Small helper: run a single-value aggregate query.
            async with db.execute(sql, args) as cur:
                return (await cur.fetchone())[0]

        enriched = await _scalar("SELECT COUNT(*) FROM enriched_domains")
        threshold = int(os.getenv("SCORE_THRESHOLD", "60"))
        hot_leads = await _scalar(
            "SELECT COUNT(*) FROM enriched_domains WHERE score >= ?",
            (threshold,),
        )
        # One GROUP BY replaces the four per-status COUNT(*) scans the
        # original issued; result is identical for the four known statuses.
        queue = {"pending": 0, "running": 0, "done": 0, "failed": 0}
        async with db.execute(
            "SELECT status, COUNT(*) FROM job_queue GROUP BY status"
        ) as cur:
            async for status, cnt in cur:
                if status in queue:
                    queue[status] = cnt

    return {
        "total_domains": total,
        "enriched": enriched,
        "hot_leads": hot_leads,
        "tld_breakdown": [{"tld": r[0], "count": r[1]} for r in tld_rows],
        "queue": queue,
    }


async def get_domains(tld=None, page=1, limit=100, live_only=False):
    """Page through domains from the parquet file, merged with enrichment.

    :param tld: optional TLD filter (e.g. ``"com"``).
    :param page: 1-based page number.
    :param limit: page size.
    :param live_only: currently a no-op (enrichment join not implemented).
    :returns: list of dicts; enriched rows where available, else bare
              ``{"domain": ...}`` entries.
    """
    parquet = str(PARQUET_PATH)
    conditions = []
    params = []
    if tld:
        # Bound parameter — the original interpolated caller-supplied *tld*
        # directly into the SQL string (injection risk) and never used the
        # params list it built.
        conditions.append("regexp_extract(domain, '\\.([a-zA-Z0-9]+)$', 1) = ?")
        params.append(tld)
    if live_only:
        # TODO: join with enriched_domains to check is_live (no-op, as before).
        pass
    where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
    # int() coercion so limit/offset cannot smuggle SQL into the f-string.
    limit = int(limit)
    offset = (int(page) - 1) * limit
    sql = f"""
        SELECT domain FROM read_parquet('{parquet}')
        {where}
        LIMIT {limit} OFFSET {offset}
    """
    rows = duckdb_query(sql, params)
    domains = [r[0] for r in rows]
    if not domains:
        return []

    # Merge enrichment data from SQLite (limit <= page size, so the
    # placeholder count stays well under SQLite's parameter cap).
    placeholders = ",".join("?" * len(domains))
    async with aiosqlite.connect(SQLITE_PATH) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            f"SELECT * FROM enriched_domains WHERE domain IN ({placeholders})",
            domains,
        ) as cur:
            enriched = {r["domain"]: dict(r) async for r in cur}
    return [enriched.get(d, {"domain": d}) for d in domains]


async def get_enriched(min_score=0, cms=None, country=None, page=1, limit=100):
    """Return enriched rows filtered by score/cms/country, best score first.

    All filters are bound parameters; pagination is 1-based.
    """
    offset = (page - 1) * limit
    conditions = ["score >= ?"]
    params = [min_score]
    if cms:
        conditions.append("cms = ?")
        params.append(cms)
    if country:
        conditions.append("ip_country = ?")
        params.append(country)
    where = "WHERE " + " AND ".join(conditions)
    async with aiosqlite.connect(SQLITE_PATH) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            f"SELECT * FROM enriched_domains {where} ORDER BY score DESC LIMIT ? OFFSET ?",
            params + [limit, offset],
        ) as cur:
            rows = [dict(r) async for r in cur]
    return rows


async def queue_domains(domains: list[str]):
    """Enqueue *domains* for enrichment; duplicates are silently ignored."""
    async with aiosqlite.connect(SQLITE_PATH) as db:
        await db.executemany(
            "INSERT OR IGNORE INTO job_queue (domain) VALUES (?)",
            [(d,) for d in domains],
        )
        await db.commit()


async def get_queue_status():
    """Return per-status job-queue counts plus a rough ETA in seconds.

    ETA is a heuristic: remaining jobs divided by an assumed throughput of
    CONCURRENCY_LIMIT/10 jobs per second (floored at 1).
    """
    async with aiosqlite.connect(SQLITE_PATH) as db:
        async with db.execute(
            "SELECT status, COUNT(*) FROM job_queue GROUP BY status"
        ) as cur:
            rows = {r[0]: r[1] async for r in cur}
    total = sum(rows.values())
    done = rows.get("done", 0)
    pending = rows.get("pending", 0)
    running = rows.get("running", 0)
    failed = rows.get("failed", 0)
    eta_seconds = None
    if running > 0 or pending > 0:
        rate = int(os.getenv("CONCURRENCY_LIMIT", "50"))
        eta_seconds = (pending + running) / max(rate / 10, 1)
    return {
        "total": total,
        "pending": pending,
        "running": running,
        "done": done,
        "failed": failed,
        "eta_seconds": eta_seconds,
    }