Files
speedboard/queue.js

83 lines
2.2 KiB
JavaScript
Raw Normal View History

import { runTest } from './runner.js';
import { parseResults } from './parser.js';
import { updateJobStatus, updateJobMetrics } from './db.js';
// SSE subscribers: jobId -> Set of send functions.
// Entries are created by subscribe() and dropped again once the last
// listener for a job has unsubscribed.
const subscribers = new Map();
// Job queue (FIFO): enqueue() pushes to the back, processQueue() shifts
// from the front.
const queue = [];
// True while processQueue() is working on a job; it refuses to start a
// second job while this flag is set.
let running = false;
/**
 * Registers a listener for the events of one job.
 *
 * @param {*} jobId - Key identifying the job in the subscribers map.
 * @param {Function} sendFn - Called by emit() with each formatted event payload.
 * @returns {Function} Unsubscribe callback. It removes `sendFn` and, when that
 *   was the job's last listener, removes the job's entry from the map too.
 *   Calling it again afterwards is a no-op.
 */
export function subscribe(jobId, sendFn) {
  let listeners = subscribers.get(jobId);
  if (listeners === undefined) {
    listeners = new Set();
    subscribers.set(jobId, listeners);
  }
  listeners.add(sendFn);

  const unsubscribe = () => {
    // Look the set up again: it may already have been removed from the map.
    const current = subscribers.get(jobId);
    if (!current) return;

    current.delete(sendFn);
    if (current.size === 0) {
      subscribers.delete(jobId);
    }
  };
  return unsubscribe;
}
/**
 * Sends one event to every listener currently subscribed to a job.
 *
 * The payload is built once, as an `event:` line followed by a `data:` line
 * carrying the JSON-serialised `data`, terminated by a blank line.
 *
 * @param {*} jobId - Job whose listeners should receive the event.
 * @param {string} event - Event name placed on the `event:` line.
 * @param {*} data - Value serialised with JSON.stringify onto the `data:` line.
 * @returns {void} Does nothing when the job has no listeners.
 */
function emit(jobId, event, data) {
  const listeners = subscribers.get(jobId);
  if (!listeners) return;

  const frame = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
  listeners.forEach((deliver) => {
    try {
      deliver(frame);
    } catch {
      // Deliberately ignored: one failing listener must not stop delivery
      // to the remaining listeners.
    }
  });
}
/**
 * Appends a job to the back of the queue, announces the `queued` phase to the
 * job's listeners, and asks the processor to start working.
 *
 * @param {{ id: * }} job - Job to run; `job.id` keys the emitted event. The
 *   object is handed to processQueue() unchanged.
 * @returns {void}
 */
export function enqueue(job) {
  queue.push(job);
  emit(job.id, 'status', { message: 'Queued, waiting for runner...', phase: 'queued' });
  // processQueue() is async and intentionally not awaited (callers should not
  // block on the run). Attach a handler so that a rejection is logged instead
  // of surfacing as an unhandled promise rejection.
  processQueue().catch((err) => {
    console.error('[queue] processQueue failed:', err);
  });
}
/**
 * Turns a caught value into a message string.
 *
 * Returns `err.message` when the value has one; otherwise falls back to
 * `String(err)` so that thrown strings, `null` or `undefined` still produce a
 * usable message instead of throwing or yielding `undefined`.
 */
function describeError(err) {
  if (err !== null && err !== undefined && err.message !== undefined) {
    return err.message;
  }
  return String(err);
}

/**
 * Runs the job at the front of the queue, unless a job is already running or
 * the queue is empty.
 *
 * Steps for the job: mark it `running`, run the test (forwarding each output
 * line as a `log` event), record the report folder, parse the results and
 * store the metrics, then mark it `done`. Progress is announced through
 * `status` events; completion through a `done` event.
 *
 * Failure handling:
 * - A failure while parsing or storing metrics is reported as a
 *   `[parser warning]` log line and the job still finishes as `done`.
 * - Any other failure marks the job `error` and emits an `error` event.
 *
 * The returned promise is not awaited by its callers, so the error path is
 * written not to throw.
 *
 * @returns {Promise<void>}
 */
async function processQueue() {
  if (running || queue.length === 0) return;
  running = true;
  const job = queue.shift();
  try {
    updateJobStatus(job.id, 'running');
    emit(job.id, 'status', { message: 'Test starting...', phase: 'running' });
    const outputFolder = await runTest(job, (line) => {
      emit(job.id, 'log', { line });
    });
    emit(job.id, 'status', { message: 'Parsing results...', phase: 'parsing' });
    updateJobStatus(job.id, 'running', { report_folder: outputFolder });
    try {
      const metrics = await parseResults(outputFolder, job.url);
      updateJobMetrics(job.id, metrics);
    } catch (err) {
      // Metrics are best-effort: surface the problem to listeners but keep
      // going so the job is still marked done.
      emit(job.id, 'log', { line: `[parser warning] ${describeError(err)}` });
    }
    updateJobStatus(job.id, 'done', { report_folder: outputFolder });
    emit(job.id, 'status', { message: 'Done!', phase: 'done' });
    emit(job.id, 'done', { jobId: job.id });
  } catch (err) {
    const message = describeError(err);
    try {
      updateJobStatus(job.id, 'error', { error_msg: message });
    } catch (statusErr) {
      // Recording the failure must not prevent listeners from being told
      // about it, nor reject this un-awaited promise.
      console.error(`[queue] could not record failure of job ${job.id}:`, statusErr);
    }
    emit(job.id, 'error', { message });
  } finally {
    running = false;
    // Small delay, then process the next job. Because the flag is already
    // cleared, an enqueue() during the delay starts its job immediately and
    // this timer's call then returns early.
    setTimeout(processQueue, 500);
  }
}
/**
 * Reports how many jobs are waiting in the queue. The job currently being
 * processed has already been shifted off the queue and is not counted.
 *
 * @returns {number} Number of queued jobs.
 */
export function getQueueLength() {
  const { length: pending } = queue;
  return pending;
}
/**
 * Reports whether the processor is currently working on a job.
 *
 * @returns {boolean} Current value of the module-level `running` flag.
 */
export function isRunning() {
  const busy = running;
  return busy;
}