import { runTest } from './runner.js'; import { parseResults } from './parser.js'; import { updateJobStatus, updateJobMetrics } from './db.js'; // SSE subscribers: jobId -> Set of send functions const subscribers = new Map(); // Job queue const queue = []; let running = false; export function subscribe(jobId, sendFn) { if (!subscribers.has(jobId)) subscribers.set(jobId, new Set()); subscribers.get(jobId).add(sendFn); return () => { const set = subscribers.get(jobId); if (set) { set.delete(sendFn); if (set.size === 0) subscribers.delete(jobId); } }; } function emit(jobId, event, data) { const set = subscribers.get(jobId); if (!set) return; const payload = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`; for (const send of set) { try { send(payload); } catch {} } } export function enqueue(job) { queue.push(job); emit(job.id, 'status', { message: 'Queued, waiting for runner...', phase: 'queued' }); processQueue(); } async function processQueue() { if (running || queue.length === 0) return; running = true; const job = queue.shift(); try { updateJobStatus(job.id, 'running'); emit(job.id, 'status', { message: 'Test starting...', phase: 'running' }); const outputFolder = await runTest(job, (line) => { emit(job.id, 'log', { line }); }); emit(job.id, 'status', { message: 'Parsing results...', phase: 'parsing' }); updateJobStatus(job.id, 'running', { report_folder: outputFolder }); let metrics = {}; try { metrics = await parseResults(outputFolder, job.url); updateJobMetrics(job.id, metrics); } catch (err) { emit(job.id, 'log', { line: `[parser warning] ${err.message}` }); } updateJobStatus(job.id, 'done', { report_folder: outputFolder }); emit(job.id, 'status', { message: 'Done!', phase: 'done' }); emit(job.id, 'done', { jobId: job.id }); } catch (err) { updateJobStatus(job.id, 'error', { error_msg: err.message }); emit(job.id, 'error', { message: err.message }); } finally { running = false; // Small delay then process next setTimeout(processQueue, 500); } } export function getQueueLength() { return queue.length; } export function isRunning() { return running; }