From e95ebb9624fabd3ae417a352a42acfbe6bc20718 Mon Sep 17 00:00:00 2001 From: orangecoding Date: Wed, 3 Jun 2026 10:19:50 +0200 Subject: [PATCH] more housekeeping --- lib/FredyPipelineExecutioner.js | 6 +- lib/api/routes/loginRoute.js | 3 + lib/notification/adapter/discord_webhook.js | 29 +-- lib/notification/adapter/telegram.js | 237 ++++++++++-------- lib/notification/notify.js | 9 +- lib/services/jobs/jobExecutionService.js | 13 +- lib/services/listings/listingActiveService.js | 2 +- lib/services/storage/listingsStorage.js | 16 +- 8 files changed, 168 insertions(+), 147 deletions(-) diff --git a/lib/FredyPipelineExecutioner.js b/lib/FredyPipelineExecutioner.js index 244759c..87355c4 100755 --- a/lib/FredyPipelineExecutioner.js +++ b/lib/FredyPipelineExecutioner.js @@ -199,9 +199,9 @@ class FredyPipelineExecutioner { const toDeleteListingByIds = []; const keptListings = newListings.filter((listing) => { const filterOut = - (minRooms && listing.rooms && listing.rooms < minRooms) || - (minSize && listing.size && listing.size < minSize) || - (maxPrice && listing.price && listing.price > maxPrice); + (minRooms && listing.rooms != null && listing.rooms < minRooms) || + (minSize && listing.size != null && listing.size < minSize) || + (maxPrice && listing.price != null && listing.price > maxPrice); if (filterOut) { toDeleteListingByIds.push(listing.id); diff --git a/lib/api/routes/loginRoute.js b/lib/api/routes/loginRoute.js index 19809e4..749accb 100644 --- a/lib/api/routes/loginRoute.js +++ b/lib/api/routes/loginRoute.js @@ -20,6 +20,9 @@ function getClientIp(request) { function isRateLimited(ip) { const now = Date.now(); + for (const [key, rec] of loginAttempts) { + if (now - rec.firstAttempt > LOGIN_WINDOW_MS) loginAttempts.delete(key); + } const record = loginAttempts.get(ip); if (!record || now - record.firstAttempt > LOGIN_WINDOW_MS) { loginAttempts.set(ip, { count: 1, firstAttempt: now }); diff --git a/lib/notification/adapter/discord_webhook.js b/lib/notification/adapter/discord_webhook.js index a0de1b9..2eceb57 100644 --- a/lib/notification/adapter/discord_webhook.js +++ b/lib/notification/adapter/discord_webhook.js @@ -7,6 +7,7 @@ import fetch from 'node-fetch'; import { getJob } from '../../services/storage/jobStorage.js'; import { markdown2Html } from '../../services/markdown.js'; import { normalizeImageUrl } from '../../utils.js'; +import logger from '../../services/logger.js'; /** * Generates an idempotent decimal color code. The input string-based color code is @@ -67,19 +68,6 @@ const buildEmbed = (jobKey, listing, baseUrl) => { }, ]; - const embed = { - title: title, - color: generateColorFromString(jobKey), - url: listing.link, - fields: fields, - }; - - if (listing.image) { - embed.image = { - url: normalizeImageUrl(listing.image), - }; - } - if (baseUrl && listing.id) { fields.push({ name: 'Open in Fredy', @@ -88,6 +76,19 @@ const buildEmbed = (jobKey, listing, baseUrl) => { }); } + const embed = { + title: title, + color: generateColorFromString(jobKey), + url: listing.link, + fields, + }; + + if (listing.image) { + embed.image = { + url: normalizeImageUrl(listing.image), + }; + } + return embed; }; @@ -119,7 +120,7 @@ export const send = ({ serviceName, newListings, notificationConfig, jobKey, bas headers: { 'Content-Type': 'application/json' }, body, }).catch((error) => { - console.error(`Error sending Discord webhook for chunk starting at ${i}:`, error); + logger.error(`Error sending Discord webhook for chunk starting at ${i}:`, error); return Promise.reject(new Error(`Webhook failed: ${error.message}`)); }); diff --git a/lib/notification/adapter/telegram.js b/lib/notification/adapter/telegram.js index c1d56d7..9491923 100644 --- a/lib/notification/adapter/telegram.js +++ b/lib/notification/adapter/telegram.js @@ -12,41 +12,45 @@ import logger from '../../services/logger.js'; import { shouldUseMultipart, buildPhotoFormData } from './telegramPhotoUploader.js'; const RATE_LIMIT_INTERVAL = 1000; +const THROTTLE_MAX_IDLE_MS = RATE_LIMIT_INTERVAL + 2000; const chatThrottleMap = new Map(); /** * Removes stale throttled call entries to keep memory bounded. + * An entry is stale when no API call has fired for longer than THROTTLE_MAX_IDLE_MS. */ function cleanupOldThrottles() { const now = Date.now(); - const maxAge = RATE_LIMIT_INTERVAL + 1000; - const toBeDeleted = []; for (const [chatId, chatThrottle] of chatThrottleMap.entries()) { - if (now - chatThrottle.lastUsedAt > maxAge) toBeDeleted.push(chatId); + if (now - chatThrottle.lastUsedAt > THROTTLE_MAX_IDLE_MS) chatThrottleMap.delete(chatId); } - for (const chatId of toBeDeleted) chatThrottleMap.delete(chatId); } /** * Return a throttled wrapper for a chatId to limit Telegram API calls. * Uses p-throttle with 1 request per RATE_LIMIT_INTERVAL per chat. + * `lastUsedAt` is refreshed on every actual API call so that the idle window + * starts from the last fired call, not from when send() was invoked. * - * @template {Function} T * @param {string|number} chatId - * @param {T} call - async function (endpoint: string, body: any) => Promise - * @returns {T} + * @param {Function} call - async function (endpoint: string, body: any) => Promise + * @returns {Function} */ function getThrottled(chatId, call) { cleanupOldThrottles(); - const now = Date.now(); - const chatThrottle = chatThrottleMap.get(chatId); - if (chatThrottle) { - chatThrottle.lastUsedAt = now; - return chatThrottle.throttled; + const existing = chatThrottleMap.get(chatId); + if (existing) { + existing.lastUsedAt = Date.now(); + return existing.throttled; } - const throttled = pThrottle({ limit: 1, interval: RATE_LIMIT_INTERVAL })(call); - chatThrottleMap.set(chatId, { lastUsedAt: now, throttled }); - return throttled; + const entry = { lastUsedAt: Date.now(), throttled: null }; + chatThrottleMap.set(chatId, entry); + entry.throttled = pThrottle({ limit: 1, interval: RATE_LIMIT_INTERVAL })(async (endpoint, body) => { + const e = chatThrottleMap.get(chatId); + if (e) e.lastUsedAt = Date.now(); + return call(endpoint, body); + }); + return entry.throttled; } /** @@ -70,35 +74,16 @@ function escapeHtml(s = '') { } /** - * Build a Telegram photo caption (max 1024 characters) using HTML parse mode. + * Build a Telegram HTML-formatted message body. + * Suitable for both sendMessage (uncapped) and sendPhoto captions (caller must slice to 1024). + * * @param {string} jobName * @param {string} serviceName * @param {Object} o - Listing object - * @param {string} [o.title] - * @param {string} [o.address] - * @param {string|number} [o.price] - * @param {string|number} [o.size] - * @param {string} [o.link] + * @param {string} [baseUrl] * @returns {string} */ -function buildCaption(jobName, serviceName, o, baseUrl) { - const title = shorten((o.title || '').replace(/\*/g, ''), 90); - const meta = [o.address, o.price, o.size].filter(Boolean).join(' | '); - const fredyLink = - baseUrl && o.id ? `\nOpen in Fredy` : ''; - return `${escapeHtml(jobName)} (${escapeHtml(serviceName)})\n${escapeHtml(title)}\n${escapeHtml(meta)}${fredyLink}`.slice(0, 1024); -} - -/** - * Build a Telegram message text using HTML parse mode. - * @param {string} jobName - * @param {string} serviceName - * @param {Object} o - Listing object - * @returns {string} - */ -function buildText(jobName, serviceName, o, baseUrl) { +function buildHtmlBody(jobName, serviceName, o, baseUrl) { const title = shorten((o.title || '').replace(/\*/g, ''), 90); const meta = [o.address, o.price, o.size].filter(Boolean).join(' | '); const fredyLink = @@ -111,14 +96,16 @@ function buildText(jobName, serviceName, o, baseUrl) { } /** - * Build a plain text Telegram photo caption (max 4096 characters). + * Build a plain-text Telegram photo caption (max 4096 characters). + * Meta appears before the link so the most relevant info is visible within the cap. + * * @param {string} jobName * @param {string} serviceName * @param {Object} o - Listing object - * @param baseUrl + * @param {string} [baseUrl] * @returns {string} */ -function buildCaptionPlain(jobName, serviceName, o, baseUrl) { +function buildPlainCaption(jobName, serviceName, o, baseUrl) { const title = shorten((o.title || '').replace(/\*/g, ''), 90); const meta = [o.address, o.price, o.size].filter(Boolean).join(' | '); const fredyLine = baseUrl && o.id ? `\nOpen in Fredy: ${baseUrl}/#/listings/listing/${o.id}` : ''; @@ -126,19 +113,111 @@ function buildCaptionPlain(jobName, serviceName, o, baseUrl) { } /** - * Build a plain text Telegram message. + * Build a plain-text Telegram message body. + * Link appears early so it is tappable without scrolling. + * * @param {string} jobName * @param {string} serviceName * @param {Object} o - Listing object + * @param {string} [baseUrl] * @returns {string} */ -function buildTextPlain(jobName, serviceName, o, baseUrl) { +function buildPlainText(jobName, serviceName, o, baseUrl) { const title = shorten((o.title || '').replace(/\*/g, ''), 90); const meta = [o.address, o.price, o.size].filter(Boolean).join(' | '); const fredyLine = baseUrl && o.id ? `\nOpen in Fredy: ${baseUrl}/#/listings/listing/${o.id}` : ''; return `${jobName} (${serviceName})\n${title}\n${o.link || ''}\n${meta}${fredyLine}`; } +/** + * Create the raw Telegram API caller for a given bot token. + * Handles JSON and multipart (FormData) bodies. + * + * @param {string} token - Telegram bot token. + * @param {string} jobName - Used in error messages. + * @returns {(endpoint: string, body: object|FormData) => Promise} + */ +function makeTelegramCaller(token, jobName) { + return async function (endpoint, body) { + const isFormData = body instanceof FormData; + const opts = isFormData + ? { method: 'post', body } + : { method: 'post', body: JSON.stringify(body), headers: { 'Content-Type': 'application/json' } }; + const res = await fetch(`https://api.telegram.org/bot${token}/${endpoint}`, opts); + if (!res.ok) { + const errorBody = await res.text(); + throw new Error(`API error for '${jobName}'. '${endpoint}' returned ${errorBody}`); + } + return res; + }; +} + +/** + * Send a single listing to a single Telegram chat, with photo-then-text fallback. + * + * @param {Function} throttledCall - Throttled Telegram API caller for this chat. + * @param {Object} listing - Listing object. + * @param {string|number} chatId + * @param {Object} opts + * @param {string} opts.jobName + * @param {string} opts.serviceName + * @param {string} opts.baseUrl + * @param {boolean} opts.plainText + * @param {number|undefined} opts.message_thread_id + * @returns {Promise} + */ +async function sendListingToChat( + throttledCall, + listing, + chatId, + { jobName, serviceName, baseUrl, plainText, message_thread_id }, +) { + const img = normalizeImageUrl(listing.image); + + const textPayload = { + chat_id: chatId, + text: plainText + ? buildPlainText(jobName, serviceName, listing, baseUrl) + : buildHtmlBody(jobName, serviceName, listing, baseUrl), + ...(plainText ? {} : { parse_mode: 'HTML' }), + disable_web_page_preview: true, + ...(message_thread_id ? { message_thread_id } : {}), + }; + + if (!img) { + return throttledCall('sendMessage', textPayload).catch((e) => { + logger.error(`Error sending message to Telegram: ${e.message}`); + }); + } + + const caption = plainText + ? buildPlainCaption(jobName, serviceName, listing, baseUrl) + : buildHtmlBody(jobName, serviceName, listing, baseUrl).slice(0, 1024); + const parseMode = plainText ? undefined : 'HTML'; + + // .webp URLs (Immowelt/Cloudimage) fail Telegram's URL-based sendPhoto with + // "failed to get HTTP URL content". Upload the bytes via multipart instead. + const photoCall = shouldUseMultipart(img) + ? buildPhotoFormData({ chatId, imageUrl: img, caption, parseMode, messageThreadId: message_thread_id }).then((fd) => + throttledCall('sendPhoto', fd), + ) + : throttledCall('sendPhoto', { + chat_id: chatId, + photo: img, + caption, + ...(parseMode ? { parse_mode: parseMode } : {}), + ...(message_thread_id ? { message_thread_id } : {}), + }); + + return photoCall.catch(async (e) => { + logger.warn(`Error sending photo to Telegram and use a fallback: ${e.message}`); + return throttledCall('sendMessage', textPayload).catch((e) => { + logger.error(`Error sending message to Telegram: ${e.message}`); + throw e; + }); + }); +} + /** * Send new listings to Telegram. * - Respects per-chat Telegram rate limits using a lightweight throttle cache. @@ -185,72 +264,10 @@ export const send = ({ serviceName, newListings = [], notificationConfig, jobKey if (!Array.isArray(newListings) || newListings.length === 0) return Promise.resolve([]); const allPromises = chatIds.flatMap((id) => { - const throttledCall = getThrottled(id, async function (endpoint, body) { - // FormData (multipart) vs JSON. node-fetch sets its own multipart boundary - // header, so we must NOT supply Content-Type ourselves in that case. - const isFormData = body instanceof FormData; - const opts = isFormData - ? { method: 'post', body } - : { method: 'post', body: JSON.stringify(body), headers: { 'Content-Type': 'application/json' } }; - const res = await fetch(`https://api.telegram.org/bot${token}/${endpoint}`, opts); - - if (!res.ok) { - const errorBody = await res.text(); - throw new Error(`API error for '${jobName}'. '${endpoint}' returned ${errorBody}`); - } - return res; - }); - - return newListings.map(async (o) => { - const img = normalizeImageUrl(o.image); - const textPayload = { - chat_id: id, - text: plainText - ? buildTextPlain(jobName, serviceName, o, baseUrl) - : buildText(jobName, serviceName, o, baseUrl), - ...(plainText ? {} : { parse_mode: 'HTML' }), - disable_web_page_preview: true, - ...(message_thread_id ? { message_thread_id } : {}), - }; - - if (!img) { - return await throttledCall('sendMessage', textPayload).catch(async (e) => { - logger.error(`Error sending message to Telegram: ${e.message}`); - }); - } - - const caption = plainText - ? buildCaptionPlain(jobName, serviceName, o, baseUrl) - : buildCaption(jobName, serviceName, o, baseUrl); - const parseMode = plainText ? undefined : 'HTML'; - - // .webp URLs (Immowelt/Cloudimage) fail Telegram's URL-based sendPhoto with - // "failed to get HTTP URL content". Upload the bytes via multipart instead; - // the rendered chat message is identical. - const photoCall = shouldUseMultipart(img) - ? buildPhotoFormData({ - chatId: id, - imageUrl: img, - caption, - parseMode, - messageThreadId: message_thread_id, - }).then((fd) => throttledCall('sendPhoto', fd)) - : throttledCall('sendPhoto', { - chat_id: id, - photo: img, - caption, - ...(parseMode ? { parse_mode: parseMode } : {}), - ...(message_thread_id ? { message_thread_id } : {}), - }); - - return await photoCall.catch(async (e) => { - logger.warn(`Error sending photo to Telegram and use a fallback: ${e.message}`); - return await throttledCall('sendMessage', textPayload).catch((e) => { - logger.error(`Error sending message to Telegram: ${e.message}`); - throw e; - }); - }); - }); + const caller = makeTelegramCaller(token, jobName); + const throttledCall = getThrottled(id, caller); + const opts = { jobName, serviceName, baseUrl, plainText, message_thread_id }; + return newListings.map((listing) => sendListingToChat(throttledCall, listing, id, opts)); }); return Promise.all(allPromises); diff --git a/lib/notification/notify.js b/lib/notification/notify.js index 32d9f7d..1ee216b 100755 --- a/lib/notification/notify.js +++ b/lib/notification/notify.js @@ -4,6 +4,7 @@ */ import fs from 'fs'; +import logger from '../services/logger.js'; const path = './adapter'; /** Read every integration existing in ./adapter **/ @@ -23,7 +24,13 @@ const findAdapter = (notificationAdapter) => { export const send = (serviceName, newListings, notificationConfig, jobKey, baseUrl) => { //this is not being used in tests, therefore adapter are always set return notificationConfig - .map((notificationAdapter) => findAdapter(notificationAdapter)) + .map((notificationAdapter) => { + const found = findAdapter(notificationAdapter); + if (!found) { + logger.warn(`Notification adapter '${notificationAdapter.id}' not found for job '${jobKey || ''}'`); + } + return found; + }) .filter(Boolean) .map((a) => a.send({ serviceName, newListings, notificationConfig, jobKey, baseUrl })); }; diff --git a/lib/services/jobs/jobExecutionService.js b/lib/services/jobs/jobExecutionService.js index ea7a02e..5577c88 100644 --- a/lib/services/jobs/jobExecutionService.js +++ b/lib/services/jobs/jobExecutionService.js @@ -105,14 +105,11 @@ export function initJobExecutionService({ providers, settings, intervalMs }) { return; } settings.lastRun = now; - const jobs = jobStorage - .getJobs() - .filter((job) => job.enabled) - .filter((job) => { - if (!context) return true; // startup/cron → all - if (context.isAdmin) return true; // admin → all - return context.userId ? job.userId === context.userId : false; // user → own - }); + const jobs = jobStorage.getJobs().filter((job) => { + if (!context) return true; // startup/cron → all + if (context.isAdmin) return true; // admin → all + return context.userId ? job.userId === context.userId : false; // user → own + }); for (const job of jobs) { await executeJob(job); diff --git a/lib/services/listings/listingActiveService.js b/lib/services/listings/listingActiveService.js index c1a1475..29cff21 100644 --- a/lib/services/listings/listingActiveService.js +++ b/lib/services/listings/listingActiveService.js @@ -16,7 +16,7 @@ import logger from '../../services/logger.js'; * Concurrency: network-bound checks are executed with a configurable concurrency limit. * * @param {object} [opts] - * @param {number} [opts.concurrency=8] Max number of parallel activeTester calls. + * @param {number} [opts.concurrency=4] Max number of parallel activeTester calls. * @returns {Promise} */ export default async function runActiveChecker(opts = {}) { diff --git a/lib/services/storage/listingsStorage.js b/lib/services/storage/listingsStorage.js index 3cbe2fd..d409fcf 100755 --- a/lib/services/storage/listingsStorage.js +++ b/lib/services/storage/listingsStorage.js @@ -60,18 +60,14 @@ export const getListingsKpisForJobIds = (jobIds = []) => { const placeholders = jobIds.map(() => '?').join(','); const rows = SqliteConnection.query( - `SELECT - SUM(CASE WHEN is_active = 1 THEN 1 ELSE 0 END) OVER() AS active_count, - price - FROM listings - WHERE job_id IN (${placeholders}) - AND manually_deleted = 0 - GROUP BY - id`, + `SELECT is_active, price + FROM listings + WHERE job_id IN (${placeholders}) + AND manually_deleted = 0`, jobIds, ); - const activeCount = rows[0]?.active_count ?? 0; + const activeCount = rows.filter((r) => r.is_active === 1).length; const prices = rows .map((r) => r.price) @@ -508,7 +504,7 @@ export const updateListingGeocoordinates = (id, latitude, longitude) => { * @param {string} [params.jobId] * @param {string} [params.userId] * @param {boolean} [params.isAdmin=false] - * @returns {{listings: Object[], maxPrice: number}} Object containing listings and maxPrice. + * @returns {{listings: Object[]}} Object containing listings. */ export const getListingsForMap = ({ jobId, userId = null, isAdmin = false } = {}) => { const baseWhereParts = [