From 5dc976c7e3bf074ad0c5a28644df69350e3e29dc Mon Sep 17 00:00:00 2001 From: orangecoding Date: Thu, 18 Dec 2025 19:16:28 +0100 Subject: [PATCH] ability to start jobs individually --- Dockerfile | 3 +- README.md | 2 +- docker-compose.yml | 2 + index.js | 52 +---- ...ipeline.js => FredyPipelineExecutioner.js} | 4 +- lib/api/routes/jobRouter.js | 71 ++++++- lib/services/jobs/jobExecutionService.js | 187 ++++++++++++++++++ lib/services/jobs/run-state.js | 50 +++++ lib/services/sse/sse-broker.js | 108 ++++++++++ lib/services/storage/jobStorage.js | 2 + package.json | 12 +- .../services/jobs/jobExecutionService.test.js | 124 ++++++++++++ test/utils.js | 2 +- ui/src/components/table/JobTable.jsx | 19 +- ui/src/services/state/store.js | 8 + ui/src/views/jobs/Jobs.jsx | 69 ++++++- yarn.lock | 150 +++++++------- 17 files changed, 727 insertions(+), 138 deletions(-) rename lib/{FredyPipeline.js => FredyPipelineExecutioner.js} (99%) create mode 100644 lib/services/jobs/jobExecutionService.js create mode 100644 lib/services/jobs/run-state.js create mode 100644 lib/services/sse/sse-broker.js create mode 100644 test/services/jobs/jobExecutionService.test.js diff --git a/Dockerfile b/Dockerfile index 198e9e5..51a6fa5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -34,7 +34,8 @@ WORKDIR /fredy # Using Alpine's chromium package which is much smaller RUN apk add --no-cache chromium curl -ENV PUPPETEER_SKIP_CHROMIUM_DOWNLOAD=true \ +ENV NODE_ENV=production \ + PUPPETEER_SKIP_CHROMIUM_DOWNLOAD=true \ PUPPETEER_EXECUTABLE_PATH=/usr/bin/chromium-browser # Install build dependencies for native modules, then remove them after yarn install diff --git a/README.md b/README.md index 3442ee7..8a6d86b 100755 --- a/README.md +++ b/README.md @@ -206,7 +206,7 @@ flowchart TD F2["Adapter 2"] end - A1 --> B["FredyPipeline"] + A1 --> B["FredyPipelineExecutioner"] A2 --> B A3 --> B B --> C1 & C2 & C3 diff --git a/docker-compose.yml b/docker-compose.yml index 91499e4..5961f3d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,6 +5,8 @@ services: context: . dockerfile: Dockerfile image: ghcr.io/orangecoding/fredy + environment: + - NODE_ENV=production volumes: - ./conf:/conf - ./db:/db diff --git a/index.js b/index.js index 75a763b..9d06bf6 100755 --- a/index.js +++ b/index.js @@ -6,18 +6,15 @@ import fs from 'fs'; import { checkIfConfigIsAccessible, getProviders, refreshConfig } from './lib/utils.js'; import * as similarityCache from './lib/services/similarity-check/similarityCache.js'; -import * as jobStorage from './lib/services/storage/jobStorage.js'; -import FredyPipeline from './lib/FredyPipeline.js'; -import { duringWorkingHoursOrNotSet } from './lib/utils.js'; import { runMigrations } from './lib/services/storage/migrations/migrate.js'; import { ensureDemoUserExists, ensureAdminUserExists } from './lib/services/storage/userStorage.js'; import { cleanupDemoAtMidnight } from './lib/services/crons/demoCleanup-cron.js'; import { initTrackerCron } from './lib/services/crons/tracker-cron.js'; import logger from './lib/services/logger.js'; -import { bus } from './lib/services/events/event-bus.js'; import { initActiveCheckerCron } from './lib/services/crons/listing-alive-cron.js'; import { getSettings } from './lib/services/storage/settingsStorage.js'; import SqliteConnection, { computeDbPath } from './lib/services/storage/SqliteConnection.js'; +import { initJobExecutionService } from './lib/services/jobs/jobExecutionService.js'; //in the config, we store the path of the sqlite file, thus we must check if it is available const isConfigAccessible = await checkIfConfigIsAccessible(); @@ -36,7 +33,7 @@ await runMigrations(); const settings = await getSettings(); -// Ensure sqlite directory exists before loading anything else (based on config.sqlitepath) +// Ensure the sqlite directory exists before loading anything else (based on config.sqlitepath) const { dir: sqliteDir } = await computeDbPath(); if (!fs.existsSync(sqliteDir)) { fs.mkdirSync(sqliteDir, { recursive: true }); @@ -59,52 +56,13 @@ if (settings.demoMode) { cleanupDemoAtMidnight(); } -logger.info(`Started Fredy successfully. Ui can be accessed via http://localhost:${settings.port}`); - ensureAdminUserExists(); ensureDemoUserExists(); await initTrackerCron(); //do not wait for this to finish, let it run in the background initActiveCheckerCron(); -bus.on('jobs:runAll', () => { - logger.debug('Running Fredy Job manually'); - execute(); -}); +logger.info(`Started Fredy successfully. Ui can be accessed via http://localhost:${settings.port}`); -const execute = () => { - const isDuringWorkingHoursOrNotSet = duringWorkingHoursOrNotSet(settings, Date.now()); - if (!settings.demoMode) { - if (isDuringWorkingHoursOrNotSet) { - settings.lastRun = Date.now(); - jobStorage - .getJobs() - .filter((job) => job.enabled) - .forEach((job) => { - job.provider - .filter((p) => providers.find((loaded) => loaded.metaInformation.id === p.id) != null) - .forEach(async (prov) => { - try { - const matchedProvider = providers.find((loaded) => loaded.metaInformation.id === prov.id); - matchedProvider.init(prov, job.blacklist); - await new FredyPipeline( - matchedProvider.config, - job.notificationAdapter, - prov.id, - job.id, - similarityCache, - ).execute(); - } catch (error) { - logger.error(error); - } - }); - }); - } else { - logger.debug('Working hours set. Skipping as outside of working hours.'); - } - } -}; - -setInterval(execute, INTERVAL); -//start once at startup -execute(); +// Initialize the lean Job Execution Service (schedules and bus listeners) +initJobExecutionService({ providers, settings, intervalMs: INTERVAL }); diff --git a/lib/FredyPipeline.js b/lib/FredyPipelineExecutioner.js similarity index 99% rename from lib/FredyPipeline.js rename to lib/FredyPipelineExecutioner.js index c450eb2..357288f 100755 --- a/lib/FredyPipeline.js +++ b/lib/FredyPipelineExecutioner.js @@ -40,7 +40,7 @@ import logger from './services/logger.js'; * 7) Filter out entries similar to already seen ones * 8) Dispatch notifications */ -class FredyPipeline { +class FredyPipelineExecutioner { /** * Create a new runtime instance for a single provider/job execution. * @@ -218,4 +218,4 @@ class FredyPipeline { } } -export default FredyPipeline; +export default FredyPipelineExecutioner; diff --git a/lib/api/routes/jobRouter.js b/lib/api/routes/jobRouter.js index 3dc6beb..9f6e244 100644 --- a/lib/api/routes/jobRouter.js +++ b/lib/api/routes/jobRouter.js @@ -9,6 +9,8 @@ import * as userStorage from '../../services/storage/userStorage.js'; import { isAdmin } from '../security.js'; import logger from '../../services/logger.js'; import { bus } from '../../services/events/event-bus.js'; +import { isRunning as isJobRunning } from '../../services/jobs/run-state.js'; +import { addClient as addSseClient, removeClient } from '../../services/sse/sse-broker.js'; const service = restana(); const jobRouter = service.newRouter(); @@ -37,6 +39,7 @@ jobRouter.get('/', async (req, res) => { .map((job) => { return { ...job, + running: isJobRunning(job.id), isOnlyShared: !isUserAdmin && job.userId !== req.session.currentUser && @@ -47,9 +50,73 @@ jobRouter.get('/', async (req, res) => { res.send(); }); +// Server-Sent Events for job status updates +jobRouter.get('/events', async (req, res) => { + const userId = req.session.currentUser; + if (userId == null) { + res.send({ message: 'Unauthorized' }, 401); + return; + } + // SSE headers + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Cache-Control', 'no-cache'); + res.setHeader('Connection', 'keep-alive'); + try { + // Initial comment to establish stream + res.write(': connected\n\n'); + addSseClient(userId, res); + // Cleanup on close/aborted + const onClose = () => removeClient(userId, res); + // restana exposes original req/res; use both close and finish + req.on('close', onClose); + req.on('aborted', onClose); + res.on('close', onClose); + } catch (e) { + logger.error('Error establishing SSE connection', e); + try { + res.end(); + } catch { + //noop + } + } +}); + jobRouter.post('/startAll', async (req, res) => { - bus.emit('jobs:runAll'); - res.send(); + try { + const userId = req.session.currentUser; + // Emit only the userId; handler will decide based on admin/ownership + bus.emit('jobs:runAll', { userId }); + res.send({ message: 'Run all accepted' }, 202); + } catch (err) { + logger.error('Failed to trigger startAll', err); + res.send({ message: 'Unexpected error' }, 500); + } +}); + +// Trigger a single job run +jobRouter.post('/:jobId/run', async (req, res) => { + const { jobId } = req.params; + try { + const job = jobStorage.getJob(jobId); + if (!job) { + res.send({ message: 'Job not found' }, 404); + return; + } + if (!doesJobBelongsToUser(job, req)) { + res.send({ message: 'You are trying to run a job that is not associated to your user' }, 403); + return; + } + if (isJobRunning(jobId)) { + res.send({ message: 'Job is already running' }, 409); + return; + } + // fire and forget; actual execution handled by index.js listener + bus.emit('jobs:runOne', { jobId }); + res.send({ message: 'Job run accepted' }, 202); + } catch (error) { + logger.error(error); + res.send({ message: 'Unexpected error triggering job' }, 500); + } }); jobRouter.post('/', async (req, res) => { diff --git a/lib/services/jobs/jobExecutionService.js b/lib/services/jobs/jobExecutionService.js new file mode 100644 index 0000000..96190b8 --- /dev/null +++ b/lib/services/jobs/jobExecutionService.js @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2025 by Christian Kellner. + * Licensed under Apache-2.0 with Commons Clause and Attribution/Naming Clause + */ + +import logger from '../logger.js'; +import { bus } from '../events/event-bus.js'; +import * as jobStorage from '../storage/jobStorage.js'; +import * as userStorage from '../storage/userStorage.js'; +import { getUser } from '../storage/userStorage.js'; +import { duringWorkingHoursOrNotSet } from '../../utils.js'; +import FredyPipelineExecutioner from '../../FredyPipelineExecutioner.js'; +import * as similarityCache from '../similarity-check/similarityCache.js'; +import { isRunning, markFinished, markRunning } from './run-state.js'; +import { sendToUsers } from '../sse/sse-broker.js'; + +/** + * Initializes the job execution service. + * - Registers event-bus listeners for `jobs:runAll`, `jobs:runOne`, and `jobs:status`. + * - Starts the periodic scheduler (if `intervalMs` > 0) and performs an initial run respecting working hours. + * - Forwards job status updates to affected users via Server-Sent Events (SSE). + * + * This function is intentionally side-effectful and exposes no external API. + * + * @param {Object} deps - Dependencies required to initialize the service. + * @param {Array} deps.providers - Loaded provider modules. Each module must expose `metaInformation.id`, `config`, and `init(config, blacklist)`. + * @param {Object} deps.settings - Global settings object (read/write). Must include `demoMode`, `interval`, and working-hours attributes used by `duringWorkingHoursOrNotSet`. + * @param {number} deps.intervalMs - Scheduler interval in milliseconds. If not finite or <= 0, the scheduler is not started. + * @returns {void} + */ +export function initJobExecutionService({ providers, settings, intervalMs }) { + // Forward job status via SSE to relevant recipients + bus.on('jobs:status', ({ jobId, running }) => { + try { + const recipients = resolveRecipients(jobId); + if (recipients.length > 0) { + sendToUsers(recipients, 'jobStatus', { jobId, running }); + } + } catch (err) { + logger.warn('Failed to forward job status', jobId, err); + } + }); + + // Listen for "run all" requests (admin = all, user = own) + bus.on('jobs:runAll', (payload) => { + const userId = payload?.userId ?? null; + const user = userId ? getUser(userId) : null; + const isAdmin = !!user?.isAdmin; + if (isAdmin) { + logger.debug('Running all jobs manually (admin request)'); + } else if (userId) { + logger.debug(`Running all jobs manually for user ${userId}`); + } else { + logger.debug('Running all jobs manually (no user provided)'); + } + runAll(false, { userId, isAdmin }); + }); + + // Listen for single job run requests + bus.on('jobs:runOne', ({ jobId }) => { + logger.debug(`Running single job manually: ${jobId}`); + // fire and forget, do not block the bus + runSingle(jobId); + }); + + // Start scheduler and initial run + if (Number.isFinite(intervalMs) && intervalMs > 0) { + setInterval(() => runAll(true), intervalMs); + } + // start once at startup, respecting working hours + runAll(true); + + /** + * Resolve all recipients who should receive SSE updates for a job. + * Includes job owner, users with whom the job is shared, and all admins. + * + * @param {string} jobId + * @returns {string[]} unique userIds + */ + function resolveRecipients(jobId) { + const job = jobStorage.getJob(jobId); + if (!job) return []; + const admins = (userStorage.getUsers && userStorage.getUsers(false)) || []; + const adminIds = admins.filter((u) => u.isAdmin).map((u) => u.id); + const shared = Array.isArray(job.shared_with_user) ? job.shared_with_user : []; + const recipients = [job.userId, ...shared, ...adminIds].filter(Boolean); + return Array.from(new Set(recipients)); + } + + /** + * Execute all enabled jobs, optionally filtering by context (admin/owner) and respecting working hours. + * + * @param {boolean} [respectWorkingHours=true] - If true, skip execution when outside configured working hours. + * @param {{userId?: string, isAdmin?: boolean}} [context] - Who requested the run; determines job filtering. + * @returns {void} + */ + function runAll(respectWorkingHours = true, context = undefined) { + if (settings.demoMode) return; + const now = Date.now(); + const withinHours = duringWorkingHoursOrNotSet(settings, now); + if (respectWorkingHours && !withinHours) { + logger.debug('Working hours set. Skipping as outside of working hours.'); + return; + } + settings.lastRun = now; + jobStorage + .getJobs() + .filter((job) => job.enabled) + .filter((job) => { + if (!context) return true; // startup/cron → all + if (context.isAdmin) return true; // admin → all + return context.userId ? job.userId === context.userId : false; // user → own + }) + .forEach((job) => executeJob(job)); + } + + /** + * Execute a single job by id. + * Manual runs are allowed even if the job is disabled, but never duplicated when already running. + * + * @param {string} jobId + * @returns {Promise} + */ + async function runSingle(jobId) { + if (settings.demoMode) return; + const job = jobStorage.getJob(jobId); + if (!job) return; + // allow manual run even if disabled; keep guard to avoid duplicates + await executeJob(job); + } + + /** + * Executes one job across all of its configured providers. + * Emits SSE start/finish events via the bus and ensures the run-state guard is always cleared. + * Provider errors are surfaced via logging but do not abort other providers. + * + * @param {Object} job + * @param {string} job.id + * @param {Array<{id:string}>} job.provider + * @param {Array} [job.blacklist] + * @param {*} job.notificationAdapter + * @returns {Promise} + */ + async function executeJob(job) { + if (isRunning(job.id)) { + logger.debug(`Job ${job.id} is already running. Skipping.`); + return; + } + const acquired = markRunning(job.id); + if (!acquired) return; + // notify listeners (SSE) that the job started + try { + bus.emit('jobs:status', { jobId: job.id, running: true }); + } catch (err) { + logger.warn('Failed to emit start status for job', job.id, err); + } + try { + const jobProviders = job.provider.filter( + (p) => providers.find((loaded) => loaded.metaInformation.id === p.id) != null, + ); + const executions = jobProviders.map(async (prov) => { + const matchedProvider = providers.find((loaded) => loaded.metaInformation.id === prov.id); + matchedProvider.init(prov, job.blacklist); + await new FredyPipelineExecutioner( + matchedProvider.config, + job.notificationAdapter, + prov.id, + job.id, + similarityCache, + ).execute(); + }); + const results = await Promise.allSettled(executions); + for (const r of results) { + if (r.status === 'rejected') { + logger.error(r.reason); + } + } + } finally { + markFinished(job.id); + try { + bus.emit('jobs:status', { jobId: job.id, running: false }); + } catch (err) { + logger.warn('Failed to emit finish status for job', job.id, err); + } + } + } +} diff --git a/lib/services/jobs/run-state.js b/lib/services/jobs/run-state.js new file mode 100644 index 0000000..6cffac1 --- /dev/null +++ b/lib/services/jobs/run-state.js @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2025 by Christian Kellner. + * Licensed under Apache-2.0 with Commons Clause and Attribution/Naming Clause + */ + +/** + * Simple in-memory running state registry for jobs. + * Prevents concurrent execution of the same job within a single process. + * This registry is reset on process restart. + * @type {Set} + */ +const running = new Set(); + +/** + * Check if a job is currently marked as running. + * @param {string} jobId + * @returns {boolean} + */ +export function isRunning(jobId) { + return running.has(jobId); +} + +/** + * Try to mark a job as running. + * If it was already running, returns false and does not modify the set. + * @param {string} jobId + * @returns {boolean} true if the job was successfully marked as running + */ +export function markRunning(jobId) { + if (running.has(jobId)) return false; + running.add(jobId); + return true; +} + +/** + * Mark a job as finished (remove from the running registry). + * @param {string} jobId + * @returns {void} + */ +export function markFinished(jobId) { + running.delete(jobId); +} + +/** + * Retrieve all currently running job IDs. + * @returns {string[]} + */ +export function getRunningJobIds() { + return Array.from(running); +} diff --git a/lib/services/sse/sse-broker.js b/lib/services/sse/sse-broker.js new file mode 100644 index 0000000..ae80b50 --- /dev/null +++ b/lib/services/sse/sse-broker.js @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2025 by Christian Kellner. + * Licensed under Apache-2.0 with Commons Clause and Attribution/Naming Clause + */ + +/** + * In-memory SSE client registry. + * Maps a userId to a Set of Node.js ServerResponse objects representing open streams. + * @type {Map>} + */ +const clients = new Map(); // Map> + +/** + * Write a single SSE event frame to a response. + * + * @param {import('http').ServerResponse} res - The open SSE HTTP response. + * @param {string} [event] - Optional event name (sent as `event:`). If omitted, a generic message is sent. + * @param {any} [data] - Optional payload. Objects are JSON.stringified. + * @returns {void} + */ +function writeEvent(res, event, data) { + try { + if (event) { + res.write(`event: ${event}\n`); + } + if (data !== undefined) { + const payload = typeof data === 'string' ? data : JSON.stringify(data); + res.write(`data: ${payload}\n`); + } + res.write('\n'); + } catch { + // ignore write errors here; cleanup happens on close + } +} + +/** + * Register a new SSE client for the given user. + * + * @param {string} userId + * @param {import('http').ServerResponse} res + * @returns {void} + */ +export function addClient(userId, res) { + let set = clients.get(userId); + if (!set) { + set = new Set(); + clients.set(userId, set); + } + set.add(res); + // send a hello event + writeEvent(res, 'hello', { ok: true }); +} + +/** + * Unregister a specific SSE client for a user. Removes the user entry when empty. + * + * @param {string} userId + * @param {import('http').ServerResponse} res + * @returns {void} + */ +export function removeClient(userId, res) { + const set = clients.get(userId); + if (!set) return; + set.delete(res); + if (set.size === 0) clients.delete(userId); +} + +/** + * Send an SSE event to all open connections of a user. + * + * @param {string} userId + * @param {string} event + * @param {any} data + * @returns {void} + */ +export function sendToUser(userId, event, data) { + const set = clients.get(userId); + if (!set) return; + for (const res of set) { + writeEvent(res, event, data); + } +} + +/** + * Broadcast an SSE event to multiple users (unique by id). + * + * @param {string[]} userIds + * @param {string} event + * @param {any} data + * @returns {void} + */ +export function sendToUsers(userIds, event, data) { + const unique = Array.from(new Set(userIds)); + unique.forEach((id) => sendToUser(id, event, data)); +} + +// Heartbeat to keep connections alive on proxies (every 25s) +setInterval(() => { + for (const set of clients.values()) { + for (const res of set) { + try { + res.write(`: ping ${Date.now()}\n\n`); + } catch { + // ignore + } + } + } +}, 25000); diff --git a/lib/services/storage/jobStorage.js b/lib/services/storage/jobStorage.js index e2a662c..a593ca8 100644 --- a/lib/services/storage/jobStorage.js +++ b/lib/services/storage/jobStorage.js @@ -85,6 +85,7 @@ export const getJob = (jobId) => { j.name, j.blacklist, j.provider, + j.shared_with_user, j.notification_adapter AS notificationAdapter, (SELECT COUNT(1) FROM listings l WHERE l.job_id = j.id) AS numberOfFoundListings FROM jobs j @@ -98,6 +99,7 @@ export const getJob = (jobId) => { enabled: !!row.enabled, blacklist: fromJson(row.blacklist, []), provider: fromJson(row.provider, []), + shared_with_user: fromJson(row.shared_with_user, []), notificationAdapter: fromJson(row.notificationAdapter, []), }; }; diff --git a/package.json b/package.json index 5a9d178..7f14c6c 100755 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "fredy", - "version": "16.2.0", + "version": "16.1.0", "description": "[F]ind [R]eal [E]states [d]amn eas[y].", "scripts": { "prepare": "husky", @@ -60,8 +60,8 @@ ], "dependencies": { "adm-zip": "^0.5.16", - "@douyinfe/semi-icons": "^2.89.0", - "@douyinfe/semi-ui": "2.89.0", + "@douyinfe/semi-icons": "^2.89.1", + "@douyinfe/semi-ui": "2.89.1", "@sendgrid/mail": "8.1.6", "@vitejs/plugin-react": "5.1.2", "better-sqlite3": "^12.5.0", @@ -77,15 +77,15 @@ "node-mailjet": "6.0.11", "p-throttle": "^8.1.0", "package-up": "^5.0.0", - "puppeteer": "^24.33.0", + "puppeteer": "^24.33.1", "puppeteer-extra": "^3.3.6", "puppeteer-extra-plugin-stealth": "^2.11.2", "query-string": "9.3.1", "react": "18.3.1", "react-chartjs-2": "^5.3.1", "react-dom": "18.3.1", - "react-router": "7.10.1", - "react-router-dom": "7.10.1", + "react-router": "7.11.0", + "react-router-dom": "7.11.0", "restana": "5.1.0", "semver": "^7.7.3", "serve-static": "2.2.1", diff --git a/test/services/jobs/jobExecutionService.test.js b/test/services/jobs/jobExecutionService.test.js new file mode 100644 index 0000000..3014bdf --- /dev/null +++ b/test/services/jobs/jobExecutionService.test.js @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2025 by Christian Kellner. + * Licensed under Apache-2.0 with Commons Clause and Attribution/Naming Clause + */ + +import { expect } from 'chai'; +import esmock from 'esmock'; +import { EventEmitter } from 'node:events'; + +describe('services/jobs/jobExecutionService', () => { + /** @type {EventEmitter} */ + let bus; + let calls; + let state; + + async function initService() { + const root = (await import('node:path')).resolve('.'); + const svcPath = root + '/lib/services/jobs/jobExecutionService.js'; + const busPath = root + '/lib/services/events/event-bus.js'; + const jobStoragePath = root + '/lib/services/storage/jobStorage.js'; + const userStoragePath = root + '/lib/services/storage/userStorage.js'; + const brokerPath = root + '/lib/services/sse/sse-broker.js'; + const utilsPath = root + '/lib/utils.js'; + const loggerPath = root + '/lib/services/logger.js'; + + // esmock the service with all its collaborators + const mod = await esmock( + svcPath, + {}, + { + [busPath]: { bus }, + [jobStoragePath]: { + getJob: (id) => state.jobsById[id] || null, + getJobs: () => state.jobsList.slice(), + }, + [userStoragePath]: { + getUsers: () => state.users.slice(), + getUser: (id) => state.users.find((u) => u.id === id) || null, + }, + [brokerPath]: { + sendToUsers: (...args) => calls.sent.push(args), + }, + [utilsPath]: { + duringWorkingHoursOrNotSet: () => false, // avoid startup run + }, + [loggerPath]: { + debug: () => {}, + info: () => {}, + warn: () => {}, + error: () => {}, + }, + [root + '/lib/services/jobs/run-state.js']: { + isRunning: () => false, + markRunning: (id) => { + calls.markRunning.push(id); + return true; + }, + markFinished: () => {}, + }, + }, + ); + + // call initializer with minimal deps + mod.initJobExecutionService({ providers: [], settings: { demoMode: false }, intervalMs: 0 }); + return mod; + } + + beforeEach(() => { + bus = new EventEmitter(); + calls = { sent: [], markRunning: [] }; + state = { + jobsById: {}, + jobsList: [], + users: [], + }; + }); + + it('forwards SSE jobStatus to owner, shared users and admins', async () => { + state.jobsById['j1'] = { id: 'j1', userId: 'owner1', shared_with_user: ['u2'] }; + state.users = [ + { id: 'a1', isAdmin: true }, + { id: 'owner1', isAdmin: false }, + { id: 'u2', isAdmin: false }, + ]; + + await initService(); + + bus.emit('jobs:status', { jobId: 'j1', running: true }); + + expect(calls.sent.length).to.equal(1, 'sendToUsers should be called once'); + const [recipients, event, data] = calls.sent[0]; + expect(event).to.equal('jobStatus'); + expect(data).to.deep.equal({ jobId: 'j1', running: true }); + const got = new Set(recipients); + const expected = new Set(['owner1', 'u2', 'a1']); + expect(got).to.deep.equal(expected); + }); + + it('runs all jobs for admin; only own jobs for regular user', async () => { + state.jobsList = [ + { id: 'j1', enabled: true, userId: 'u1', provider: [] }, + { id: 'j2', enabled: true, userId: 'u2', provider: [] }, + ]; + state.users = [ + { id: 'u1', isAdmin: false }, + { id: 'u2', isAdmin: false }, + { id: 'admin', isAdmin: true }, + ]; + + await initService(); + + // Non-admin: only own jobs + bus.emit('jobs:runAll', { userId: 'u1' }); + // allow microtasks to flush + await new Promise((r) => setTimeout(r, 0)); + expect(new Set(calls.markRunning)).to.deep.equal(new Set(['j1'])); + + // Admin: all jobs + calls.markRunning = []; + bus.emit('jobs:runAll', { userId: 'admin' }); + await new Promise((r) => setTimeout(r, 0)); + expect(new Set(calls.markRunning)).to.deep.equal(new Set(['j1', 'j2'])); + }); +}); diff --git a/test/utils.js b/test/utils.js index fb82257..7fa546b 100644 --- a/test/utils.js +++ b/test/utils.js @@ -11,7 +11,7 @@ import { send } from './mocks/mockNotification.js'; export const providerConfig = JSON.parse(await readFile(new URL('./provider/testProvider.json', import.meta.url))); export const mockFredy = async () => { - return await esmock('../lib/FredyPipeline', { + return await esmock('../lib/FredyPipelineExecutioner', { '../lib/services/storage/listingsStorage.js': { ...mockStore, }, diff --git a/ui/src/components/table/JobTable.jsx b/ui/src/components/table/JobTable.jsx index baf95a9..15788f6 100644 --- a/ui/src/components/table/JobTable.jsx +++ b/ui/src/components/table/JobTable.jsx @@ -6,7 +6,7 @@ import React from 'react'; import { Button, Empty, Table, Switch, Popover } from '@douyinfe/semi-ui'; -import { IconAlertTriangle, IconDelete, IconDescend2, IconEdit } from '@douyinfe/semi-icons'; +import { IconAlertTriangle, IconDelete, IconDescend2, IconEdit, IconPlayCircle } from '@douyinfe/semi-icons'; import { IllustrationNoResult, IllustrationNoResultDark } from '@douyinfe/semi-illustrations'; import './JobTable.less'; @@ -21,7 +21,14 @@ const empty = ( const getPopoverContent = (text) =>
{text}
; -export default function JobTable({ jobs = {}, onJobRemoval, onJobStatusChanged, onJobEdit, onListingRemoval } = {}) { +export default function JobTable({ + jobs = {}, + onJobRemoval, + onJobStatusChanged, + onJobEdit, + onListingRemoval, + onJobRun, +} = {}) { return ( { return (
+ +