ability to start jobs individually

This commit is contained in:
orangecoding
2025-12-18 19:16:28 +01:00
parent 05f1bc61c9
commit 5dc976c7e3
17 changed files with 727 additions and 138 deletions

View File

@@ -40,7 +40,7 @@ import logger from './services/logger.js';
* 7) Filter out entries similar to already seen ones
* 8) Dispatch notifications
*/
class FredyPipeline {
class FredyPipelineExecutioner {
/**
* Create a new runtime instance for a single provider/job execution.
*
@@ -218,4 +218,4 @@ class FredyPipeline {
}
}
export default FredyPipeline;
export default FredyPipelineExecutioner;

View File

@@ -9,6 +9,8 @@ import * as userStorage from '../../services/storage/userStorage.js';
import { isAdmin } from '../security.js';
import logger from '../../services/logger.js';
import { bus } from '../../services/events/event-bus.js';
import { isRunning as isJobRunning } from '../../services/jobs/run-state.js';
import { addClient as addSseClient, removeClient } from '../../services/sse/sse-broker.js';
const service = restana();
const jobRouter = service.newRouter();
@@ -37,6 +39,7 @@ jobRouter.get('/', async (req, res) => {
.map((job) => {
return {
...job,
running: isJobRunning(job.id),
isOnlyShared:
!isUserAdmin &&
job.userId !== req.session.currentUser &&
@@ -47,9 +50,73 @@ jobRouter.get('/', async (req, res) => {
res.send();
});
// Server-Sent Events for job status updates
jobRouter.get('/events', async (req, res) => {
const userId = req.session.currentUser;
if (userId == null) {
res.send({ message: 'Unauthorized' }, 401);
return;
}
// SSE headers
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
try {
// Initial comment to establish stream
res.write(': connected\n\n');
addSseClient(userId, res);
// Cleanup on close/aborted
const onClose = () => removeClient(userId, res);
// restana exposes original req/res; use both close and finish
req.on('close', onClose);
req.on('aborted', onClose);
res.on('close', onClose);
} catch (e) {
logger.error('Error establishing SSE connection', e);
try {
res.end();
} catch {
//noop
}
}
});
jobRouter.post('/startAll', async (req, res) => {
bus.emit('jobs:runAll');
res.send();
try {
const userId = req.session.currentUser;
// Emit only the userId; handler will decide based on admin/ownership
bus.emit('jobs:runAll', { userId });
res.send({ message: 'Run all accepted' }, 202);
} catch (err) {
logger.error('Failed to trigger startAll', err);
res.send({ message: 'Unexpected error' }, 500);
}
});
// Trigger a single job run
jobRouter.post('/:jobId/run', async (req, res) => {
const { jobId } = req.params;
try {
const job = jobStorage.getJob(jobId);
if (!job) {
res.send({ message: 'Job not found' }, 404);
return;
}
if (!doesJobBelongsToUser(job, req)) {
res.send({ message: 'You are trying to run a job that is not associated to your user' }, 403);
return;
}
if (isJobRunning(jobId)) {
res.send({ message: 'Job is already running' }, 409);
return;
}
// fire and forget; actual execution handled by index.js listener
bus.emit('jobs:runOne', { jobId });
res.send({ message: 'Job run accepted' }, 202);
} catch (error) {
logger.error(error);
res.send({ message: 'Unexpected error triggering job' }, 500);
}
});
jobRouter.post('/', async (req, res) => {

View File

@@ -0,0 +1,187 @@
/*
* Copyright (c) 2025 by Christian Kellner.
* Licensed under Apache-2.0 with Commons Clause and Attribution/Naming Clause
*/
import logger from '../logger.js';
import { bus } from '../events/event-bus.js';
import * as jobStorage from '../storage/jobStorage.js';
import * as userStorage from '../storage/userStorage.js';
import { getUser } from '../storage/userStorage.js';
import { duringWorkingHoursOrNotSet } from '../../utils.js';
import FredyPipelineExecutioner from '../../FredyPipelineExecutioner.js';
import * as similarityCache from '../similarity-check/similarityCache.js';
import { isRunning, markFinished, markRunning } from './run-state.js';
import { sendToUsers } from '../sse/sse-broker.js';
/**
* Initializes the job execution service.
* - Registers event-bus listeners for `jobs:runAll`, `jobs:runOne`, and `jobs:status`.
* - Starts the periodic scheduler (if `intervalMs` > 0) and performs an initial run respecting working hours.
* - Forwards job status updates to affected users via Server-Sent Events (SSE).
*
* This function is intentionally side-effectful and exposes no external API.
*
* @param {Object} deps - Dependencies required to initialize the service.
* @param {Array<Object>} deps.providers - Loaded provider modules. Each module must expose `metaInformation.id`, `config`, and `init(config, blacklist)`.
* @param {Object} deps.settings - Global settings object (read/write). Must include `demoMode`, `interval`, and working-hours attributes used by `duringWorkingHoursOrNotSet`.
* @param {number} deps.intervalMs - Scheduler interval in milliseconds. If not finite or <= 0, the scheduler is not started.
* @returns {void}
*/
export function initJobExecutionService({ providers, settings, intervalMs }) {
// Forward job status via SSE to relevant recipients
bus.on('jobs:status', ({ jobId, running }) => {
try {
const recipients = resolveRecipients(jobId);
if (recipients.length > 0) {
sendToUsers(recipients, 'jobStatus', { jobId, running });
}
} catch (err) {
logger.warn('Failed to forward job status', jobId, err);
}
});
// Listen for "run all" requests (admin = all, user = own)
bus.on('jobs:runAll', (payload) => {
const userId = payload?.userId ?? null;
const user = userId ? getUser(userId) : null;
const isAdmin = !!user?.isAdmin;
if (isAdmin) {
logger.debug('Running all jobs manually (admin request)');
} else if (userId) {
logger.debug(`Running all jobs manually for user ${userId}`);
} else {
logger.debug('Running all jobs manually (no user provided)');
}
runAll(false, { userId, isAdmin });
});
// Listen for single job run requests
bus.on('jobs:runOne', ({ jobId }) => {
logger.debug(`Running single job manually: ${jobId}`);
// fire and forget, do not block the bus
runSingle(jobId);
});
// Start scheduler and initial run
if (Number.isFinite(intervalMs) && intervalMs > 0) {
setInterval(() => runAll(true), intervalMs);
}
// start once at startup, respecting working hours
runAll(true);
/**
* Resolve all recipients who should receive SSE updates for a job.
* Includes job owner, users with whom the job is shared, and all admins.
*
* @param {string} jobId
* @returns {string[]} unique userIds
*/
function resolveRecipients(jobId) {
const job = jobStorage.getJob(jobId);
if (!job) return [];
const admins = (userStorage.getUsers && userStorage.getUsers(false)) || [];
const adminIds = admins.filter((u) => u.isAdmin).map((u) => u.id);
const shared = Array.isArray(job.shared_with_user) ? job.shared_with_user : [];
const recipients = [job.userId, ...shared, ...adminIds].filter(Boolean);
return Array.from(new Set(recipients));
}
/**
* Execute all enabled jobs, optionally filtering by context (admin/owner) and respecting working hours.
*
* @param {boolean} [respectWorkingHours=true] - If true, skip execution when outside configured working hours.
* @param {{userId?: string, isAdmin?: boolean}} [context] - Who requested the run; determines job filtering.
* @returns {void}
*/
function runAll(respectWorkingHours = true, context = undefined) {
if (settings.demoMode) return;
const now = Date.now();
const withinHours = duringWorkingHoursOrNotSet(settings, now);
if (respectWorkingHours && !withinHours) {
logger.debug('Working hours set. Skipping as outside of working hours.');
return;
}
settings.lastRun = now;
jobStorage
.getJobs()
.filter((job) => job.enabled)
.filter((job) => {
if (!context) return true; // startup/cron → all
if (context.isAdmin) return true; // admin → all
return context.userId ? job.userId === context.userId : false; // user → own
})
.forEach((job) => executeJob(job));
}
/**
* Execute a single job by id.
* Manual runs are allowed even if the job is disabled, but never duplicated when already running.
*
* @param {string} jobId
* @returns {Promise<void>}
*/
async function runSingle(jobId) {
if (settings.demoMode) return;
const job = jobStorage.getJob(jobId);
if (!job) return;
// allow manual run even if disabled; keep guard to avoid duplicates
await executeJob(job);
}
/**
* Executes one job across all of its configured providers.
* Emits SSE start/finish events via the bus and ensures the run-state guard is always cleared.
* Provider errors are surfaced via logging but do not abort other providers.
*
* @param {Object} job
* @param {string} job.id
* @param {Array<{id:string}>} job.provider
* @param {Array<string>} [job.blacklist]
* @param {*} job.notificationAdapter
* @returns {Promise<void>}
*/
async function executeJob(job) {
if (isRunning(job.id)) {
logger.debug(`Job ${job.id} is already running. Skipping.`);
return;
}
const acquired = markRunning(job.id);
if (!acquired) return;
// notify listeners (SSE) that the job started
try {
bus.emit('jobs:status', { jobId: job.id, running: true });
} catch (err) {
logger.warn('Failed to emit start status for job', job.id, err);
}
try {
const jobProviders = job.provider.filter(
(p) => providers.find((loaded) => loaded.metaInformation.id === p.id) != null,
);
const executions = jobProviders.map(async (prov) => {
const matchedProvider = providers.find((loaded) => loaded.metaInformation.id === prov.id);
matchedProvider.init(prov, job.blacklist);
await new FredyPipelineExecutioner(
matchedProvider.config,
job.notificationAdapter,
prov.id,
job.id,
similarityCache,
).execute();
});
const results = await Promise.allSettled(executions);
for (const r of results) {
if (r.status === 'rejected') {
logger.error(r.reason);
}
}
} finally {
markFinished(job.id);
try {
bus.emit('jobs:status', { jobId: job.id, running: false });
} catch (err) {
logger.warn('Failed to emit finish status for job', job.id, err);
}
}
}
}

View File

@@ -0,0 +1,50 @@
/*
* Copyright (c) 2025 by Christian Kellner.
* Licensed under Apache-2.0 with Commons Clause and Attribution/Naming Clause
*/
/**
* Simple in-memory running state registry for jobs.
* Prevents concurrent execution of the same job within a single process.
* This registry is reset on process restart.
* @type {Set<string>}
*/
const running = new Set();
/**
* Check if a job is currently marked as running.
* @param {string} jobId
* @returns {boolean}
*/
export function isRunning(jobId) {
return running.has(jobId);
}
/**
* Try to mark a job as running.
* If it was already running, returns false and does not modify the set.
* @param {string} jobId
* @returns {boolean} true if the job was successfully marked as running
*/
export function markRunning(jobId) {
if (running.has(jobId)) return false;
running.add(jobId);
return true;
}
/**
* Mark a job as finished (remove from the running registry).
* @param {string} jobId
* @returns {void}
*/
export function markFinished(jobId) {
running.delete(jobId);
}
/**
* Retrieve all currently running job IDs.
* @returns {string[]}
*/
export function getRunningJobIds() {
return Array.from(running);
}

View File

@@ -0,0 +1,108 @@
/*
* Copyright (c) 2025 by Christian Kellner.
* Licensed under Apache-2.0 with Commons Clause and Attribution/Naming Clause
*/
/**
* In-memory SSE client registry.
* Maps a userId to a Set of Node.js ServerResponse objects representing open streams.
* @type {Map<string, Set<import('http').ServerResponse>>}
*/
const clients = new Map(); // Map<userId, Set<ServerResponse>>
/**
* Write a single SSE event frame to a response.
*
* @param {import('http').ServerResponse} res - The open SSE HTTP response.
* @param {string} [event] - Optional event name (sent as `event:`). If omitted, a generic message is sent.
* @param {any} [data] - Optional payload. Objects are JSON.stringified.
* @returns {void}
*/
function writeEvent(res, event, data) {
try {
if (event) {
res.write(`event: ${event}\n`);
}
if (data !== undefined) {
const payload = typeof data === 'string' ? data : JSON.stringify(data);
res.write(`data: ${payload}\n`);
}
res.write('\n');
} catch {
// ignore write errors here; cleanup happens on close
}
}
/**
* Register a new SSE client for the given user.
*
* @param {string} userId
* @param {import('http').ServerResponse} res
* @returns {void}
*/
export function addClient(userId, res) {
let set = clients.get(userId);
if (!set) {
set = new Set();
clients.set(userId, set);
}
set.add(res);
// send a hello event
writeEvent(res, 'hello', { ok: true });
}
/**
* Unregister a specific SSE client for a user. Removes the user entry when empty.
*
* @param {string} userId
* @param {import('http').ServerResponse} res
* @returns {void}
*/
export function removeClient(userId, res) {
const set = clients.get(userId);
if (!set) return;
set.delete(res);
if (set.size === 0) clients.delete(userId);
}
/**
* Send an SSE event to all open connections of a user.
*
* @param {string} userId
* @param {string} event
* @param {any} data
* @returns {void}
*/
export function sendToUser(userId, event, data) {
const set = clients.get(userId);
if (!set) return;
for (const res of set) {
writeEvent(res, event, data);
}
}
/**
* Broadcast an SSE event to multiple users (unique by id).
*
* @param {string[]} userIds
* @param {string} event
* @param {any} data
* @returns {void}
*/
export function sendToUsers(userIds, event, data) {
const unique = Array.from(new Set(userIds));
unique.forEach((id) => sendToUser(id, event, data));
}
// Heartbeat to keep connections alive on proxies (every 25s)
setInterval(() => {
for (const set of clients.values()) {
for (const res of set) {
try {
res.write(`: ping ${Date.now()}\n\n`);
} catch {
// ignore
}
}
}
}, 25000);

View File

@@ -85,6 +85,7 @@ export const getJob = (jobId) => {
j.name,
j.blacklist,
j.provider,
j.shared_with_user,
j.notification_adapter AS notificationAdapter,
(SELECT COUNT(1) FROM listings l WHERE l.job_id = j.id) AS numberOfFoundListings
FROM jobs j
@@ -98,6 +99,7 @@ export const getJob = (jobId) => {
enabled: !!row.enabled,
blacklist: fromJson(row.blacklist, []),
provider: fromJson(row.provider, []),
shared_with_user: fromJson(row.shared_with_user, []),
notificationAdapter: fromJson(row.notificationAdapter, []),
};
};