"""Async domain-enrichment worker.

Pulls pending domains from the SQLite job queue, probes each one
(HTTP liveness, CMS fingerprint, TLS certificate, MX records, GeoIP
country) and persists the enriched record plus a computed score.
"""

import asyncio
import datetime
import logging
import os
import socket
import ssl
from typing import Optional

import aiosqlite
import dns.resolver
import httpx
from bs4 import BeautifulSoup

from app.db import SQLITE_PATH
from app.scorer import score

logger = logging.getLogger(__name__)

CONCURRENCY_LIMIT = int(os.getenv("CONCURRENCY_LIMIT", "50"))

# ip-api.com free tier: 45 req/min -> ~1.33 s between calls. A dedicated
# single-slot semaphore serializes lookups so the rate limiter in
# get_ip_country() works correctly.
IP_API_SEMAPHORE: Optional[asyncio.Semaphore] = None
IP_API_RATE = 45  # requests per minute

_worker_task: Optional[asyncio.Task] = None
_paused = False


def get_ip_semaphore() -> asyncio.Semaphore:
    """Lazily create the ip-api semaphore.

    Created on first use so construction happens inside a running event
    loop rather than at import time.
    """
    global IP_API_SEMAPHORE
    if IP_API_SEMAPHORE is None:
        IP_API_SEMAPHORE = asyncio.Semaphore(1)
    return IP_API_SEMAPHORE


# Substrings that identify a CMS, matched case-insensitively against the
# page body and the response headers.
CMS_SIGNATURES = {
    "wordpress": ["/wp-content/", "/wp-includes/", 'name="generator" content="WordPress'],
    "joomla": ["/components/com_", "Joomla!", 'name="generator" content="Joomla'],
    "drupal": ["/sites/default/files/", "Drupal.settings", 'name="generator" content="Drupal'],
    "wix": ["wix.com", "X-Wix-"],
    "squarespace": ["squarespace.com", "X-Squarespace-"],
    "shopify": ["cdn.shopify.com", "Shopify.theme"],
    "prestashop": ["PrestaShop", "/modules/"],
    "magento": ["Mage.Cookies", "X-Magento-"],
    "typo3": ["typo3", "TYPO3 CMS"],
    "opencart": ["route=common/home", "OpenCart"],
}


def detect_cms(html: str, headers: dict) -> Optional[str]:
    """Return the first CMS whose signature appears in *html* or *headers*.

    Only the first 50 kB of HTML is scanned. Returns None when no
    signature matches.
    """
    # Lowercase the haystack once instead of once per signature.
    haystack = (
        html[:50000] + " ".join(f"{k}:{v}" for k, v in headers.items())
    ).lower()
    for cms, sigs in CMS_SIGNATURES.items():
        if any(sig.lower() in haystack for sig in sigs):
            return cms
    return None


async def check_ssl(domain: str) -> tuple[bool, Optional[int]]:
    """Validate *domain*'s TLS certificate.

    Returns (True, days_until_expiry) on success, (False, None) on any
    failure (connection refused, handshake error, invalid cert, ...).
    """
    try:
        ctx = ssl.create_default_context()
        loop = asyncio.get_running_loop()

        def _check() -> tuple[bool, Optional[int]]:
            # Blocking socket work runs in the default thread-pool executor.
            with socket.create_connection((domain, 443), timeout=5) as sock:
                with ctx.wrap_socket(sock, server_hostname=domain) as ssock:
                    cert = ssock.getpeercert()
            expiry_str = cert.get("notAfter", "")
            # notAfter looks like "Jun  1 12:00:00 2030 GMT": naive UTC,
            # so it is compared against naive utcnow().
            expiry = datetime.datetime.strptime(expiry_str, "%b %d %H:%M:%S %Y %Z")
            days = (expiry - datetime.datetime.utcnow()).days
            return True, days

        return await loop.run_in_executor(None, _check)
    except Exception:
        return False, None


async def check_mx(domain: str) -> bool:
    """Return True when *domain* publishes at least one MX record."""
    try:
        loop = asyncio.get_running_loop()

        def _check() -> bool:
            try:
                answers = dns.resolver.resolve(domain, "MX", lifetime=5)
                return len(answers) > 0
            except Exception:
                return False

        return await loop.run_in_executor(None, _check)
    except Exception:
        return False


# Monotonic loop-time of the last ip-api call; guarded by the
# single-slot semaphore in get_ip_country().
_ip_last_call = 0.0


async def get_ip_country(ip: str) -> Optional[str]:
    """Look up the ISO country code for *ip* via ip-api.com.

    Calls are serialized and spaced so the free tier's 45 req/min limit
    is never exceeded. Returns None on any failure (best effort).
    """
    global _ip_last_call
    async with get_ip_semaphore():
        loop = asyncio.get_running_loop()
        min_interval = 1 / (IP_API_RATE / 60)  # ~1.33 s between requests
        wait = min_interval - (loop.time() - _ip_last_call)
        if wait > 0:
            await asyncio.sleep(wait)
        _ip_last_call = loop.time()
        try:
            async with httpx.AsyncClient(timeout=5) as client:
                resp = await client.get(f"http://ip-api.com/json/{ip}?fields=countryCode")
            if resp.status_code == 200:
                return resp.json().get("countryCode")
        except Exception as e:
            # Best effort: a failed GeoIP lookup must not fail enrichment.
            logger.debug("ip-api lookup failed for %s: %s", ip, e)
    return None


async def enrich_domain(domain: str) -> dict:
    """Probe *domain* and return a fully populated enrichment record.

    Performs an HTTP GET (liveness, title, server header, CMS), resolves
    the IP for a GeoIP country lookup, then independently checks TLS and
    MX records, and finally computes a score. Never raises; HTTP errors
    are captured in the "error" field.
    """
    result = {
        "domain": domain,
        "is_live": False,
        "status_code": None,
        "ssl_valid": False,
        "ssl_expiry_days": None,
        "cms": None,
        "has_mx": False,
        "ip_country": None,
        "page_title": None,
        "server": None,
        "enriched_at": datetime.datetime.utcnow().isoformat(),
        "error": None,
    }
    try:
        # verify=False: we want liveness even behind broken certs; the
        # real TLS validation happens in check_ssl() below.
        async with httpx.AsyncClient(
            timeout=10,
            follow_redirects=True,
            verify=False,
            headers={"User-Agent": "Mozilla/5.0 (compatible; DomGod/1.0)"},
        ) as client:
            resp = await client.get(f"http://{domain}")
            result["is_live"] = resp.status_code in (200, 301, 302, 303, 307, 308)
            result["status_code"] = resp.status_code
            result["server"] = resp.headers.get("server")
            html = resp.text
            soup = BeautifulSoup(html, "html.parser")
            title_tag = soup.find("title")
            result["page_title"] = (
                title_tag.get_text(strip=True)[:500] if title_tag else None
            )
            result["cms"] = detect_cms(html, dict(resp.headers))
            # Resolve IP for country lookup (best effort).
            try:
                loop = asyncio.get_running_loop()
                ip = await loop.run_in_executor(None, socket.gethostbyname, domain)
                result["ip_country"] = await get_ip_country(ip)
            except Exception:
                pass
    except Exception as e:
        result["error"] = str(e)[:500]
    # SSL check (independent of HTTP)
    ssl_valid, ssl_days = await check_ssl(domain)
    result["ssl_valid"] = ssl_valid
    result["ssl_expiry_days"] = ssl_days
    # MX check
    result["has_mx"] = await check_mx(domain)
    # Score
    result["score"] = score(result)
    return result


async def save_enriched(data: dict):
    """Upsert an enrichment record and its score into SQLite."""
    async with aiosqlite.connect(SQLITE_PATH) as db:
        await db.execute(
            """INSERT INTO enriched_domains (domain, is_live, status_code, ssl_valid, ssl_expiry_days, cms, has_mx, ip_country, page_title, server, enriched_at, error, score) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?) ON CONFLICT(domain) DO UPDATE SET is_live=excluded.is_live, status_code=excluded.status_code, ssl_valid=excluded.ssl_valid, ssl_expiry_days=excluded.ssl_expiry_days, cms=excluded.cms, has_mx=excluded.has_mx, ip_country=excluded.ip_country, page_title=excluded.page_title, server=excluded.server, enriched_at=excluded.enriched_at, error=excluded.error, score=excluded.score""",
            (
                data["domain"],
                data["is_live"],
                data["status_code"],
                data["ssl_valid"],
                data["ssl_expiry_days"],
                data["cms"],
                data["has_mx"],
                data["ip_country"],
                data["page_title"],
                data["server"],
                data["enriched_at"],
                data["error"],
                data["score"],
            ),
        )
        await db.execute(
            """INSERT INTO scores (domain, score) VALUES (?,?) ON CONFLICT(domain) DO UPDATE SET score=excluded.score, scored_at=datetime('now')""",
            (data["domain"], data["score"]),
        )
        await db.commit()


async def mark_job(domain: str, status: str, error: Optional[str] = None):
    """Transition a job-queue row to *status*, stamping timestamps.

    "running" stamps started_at; "done"/"failed" stamp completed_at and
    record *error*. Other statuses are ignored.
    """
    async with aiosqlite.connect(SQLITE_PATH) as db:
        if status == "running":
            await db.execute(
                "UPDATE job_queue SET status=?, started_at=datetime('now') WHERE domain=?",
                (status, domain),
            )
        elif status in ("done", "failed"):
            await db.execute(
                "UPDATE job_queue SET status=?, completed_at=datetime('now'), error=? WHERE domain=?",
                (status, error, domain),
            )
        await db.commit()


async def worker_loop():
    """Main worker loop: drain pending jobs in concurrent batches.

    Fetches up to 100 pending domains, processes them under a semaphore
    of CONCURRENCY_LIMIT, and waits for the whole batch before fetching
    the next one (so a domain is never processed twice concurrently).
    Honors the module-level pause flag.
    """
    global _paused
    sem = asyncio.Semaphore(CONCURRENCY_LIMIT)

    async def process(domain: str):
        async with sem:
            await mark_job(domain, "running")
            try:
                data = await enrich_domain(domain)
                await save_enriched(data)
                await mark_job(domain, "done")
            except Exception as e:
                await mark_job(domain, "failed", str(e)[:500])

    while True:
        if _paused:
            await asyncio.sleep(1)
            continue
        async with aiosqlite.connect(SQLITE_PATH) as db:
            async with db.execute(
                "SELECT domain FROM job_queue WHERE status='pending' LIMIT 100"
            ) as cur:
                rows = await cur.fetchall()
        if not rows:
            await asyncio.sleep(2)
            continue
        tasks = [asyncio.create_task(process(r[0])) for r in rows]
        # return_exceptions keeps one failed task from cancelling the batch.
        await asyncio.gather(*tasks, return_exceptions=True)


def start_worker():
    """Start the background worker task (idempotent) and unpause it."""
    # BUG FIX: _paused was missing from the global declaration, so the
    # original assignment created a useless local and a paused worker
    # was never actually resumed by this function.
    global _worker_task, _paused
    if _worker_task is None or _worker_task.done():
        _worker_task = asyncio.create_task(worker_loop())
        _paused = False


def pause_worker():
    """Pause the worker loop; in-flight jobs finish, no new batch starts."""
    global _paused
    _paused = True


def resume_worker():
    """Unpause the worker, restarting the task if it has exited."""
    global _paused
    _paused = False
    start_worker()


def is_running() -> bool:
    """True when the worker task exists, hasn't finished, and isn't paused."""
    return _worker_task is not None and not _worker_task.done() and not _paused