mirror of
https://github.com/orangecoding/fredy.git
synced 2026-06-16 12:31:07 +00:00
more housekeeping
This commit is contained in:
@@ -199,9 +199,9 @@ class FredyPipelineExecutioner {
|
||||
const toDeleteListingByIds = [];
|
||||
const keptListings = newListings.filter((listing) => {
|
||||
const filterOut =
|
||||
(minRooms && listing.rooms && listing.rooms < minRooms) ||
|
||||
(minSize && listing.size && listing.size < minSize) ||
|
||||
(maxPrice && listing.price && listing.price > maxPrice);
|
||||
(minRooms && listing.rooms != null && listing.rooms < minRooms) ||
|
||||
(minSize && listing.size != null && listing.size < minSize) ||
|
||||
(maxPrice && listing.price != null && listing.price > maxPrice);
|
||||
|
||||
if (filterOut) {
|
||||
toDeleteListingByIds.push(listing.id);
|
||||
|
||||
@@ -20,6 +20,9 @@ function getClientIp(request) {
|
||||
|
||||
function isRateLimited(ip) {
|
||||
const now = Date.now();
|
||||
for (const [key, rec] of loginAttempts) {
|
||||
if (now - rec.firstAttempt > LOGIN_WINDOW_MS) loginAttempts.delete(key);
|
||||
}
|
||||
const record = loginAttempts.get(ip);
|
||||
if (!record || now - record.firstAttempt > LOGIN_WINDOW_MS) {
|
||||
loginAttempts.set(ip, { count: 1, firstAttempt: now });
|
||||
|
||||
@@ -7,6 +7,7 @@ import fetch from 'node-fetch';
|
||||
import { getJob } from '../../services/storage/jobStorage.js';
|
||||
import { markdown2Html } from '../../services/markdown.js';
|
||||
import { normalizeImageUrl } from '../../utils.js';
|
||||
import logger from '../../services/logger.js';
|
||||
|
||||
/**
|
||||
* Generates an idempotent decimal color code. The input string-based color code is
|
||||
@@ -67,19 +68,6 @@ const buildEmbed = (jobKey, listing, baseUrl) => {
|
||||
},
|
||||
];
|
||||
|
||||
const embed = {
|
||||
title: title,
|
||||
color: generateColorFromString(jobKey),
|
||||
url: listing.link,
|
||||
fields: fields,
|
||||
};
|
||||
|
||||
if (listing.image) {
|
||||
embed.image = {
|
||||
url: normalizeImageUrl(listing.image),
|
||||
};
|
||||
}
|
||||
|
||||
if (baseUrl && listing.id) {
|
||||
fields.push({
|
||||
name: 'Open in Fredy',
|
||||
@@ -88,6 +76,19 @@ const buildEmbed = (jobKey, listing, baseUrl) => {
|
||||
});
|
||||
}
|
||||
|
||||
const embed = {
|
||||
title: title,
|
||||
color: generateColorFromString(jobKey),
|
||||
url: listing.link,
|
||||
fields,
|
||||
};
|
||||
|
||||
if (listing.image) {
|
||||
embed.image = {
|
||||
url: normalizeImageUrl(listing.image),
|
||||
};
|
||||
}
|
||||
|
||||
return embed;
|
||||
};
|
||||
|
||||
@@ -119,7 +120,7 @@ export const send = ({ serviceName, newListings, notificationConfig, jobKey, bas
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body,
|
||||
}).catch((error) => {
|
||||
console.error(`Error sending Discord webhook for chunk starting at ${i}:`, error);
|
||||
logger.error(`Error sending Discord webhook for chunk starting at ${i}:`, error);
|
||||
return Promise.reject(new Error(`Webhook failed: ${error.message}`));
|
||||
});
|
||||
|
||||
|
||||
@@ -12,41 +12,45 @@ import logger from '../../services/logger.js';
|
||||
import { shouldUseMultipart, buildPhotoFormData } from './telegramPhotoUploader.js';
|
||||
|
||||
const RATE_LIMIT_INTERVAL = 1000;
|
||||
const THROTTLE_MAX_IDLE_MS = RATE_LIMIT_INTERVAL + 2000;
|
||||
const chatThrottleMap = new Map();
|
||||
|
||||
/**
|
||||
* Removes stale throttled call entries to keep memory bounded.
|
||||
* An entry is stale when no API call has fired for longer than THROTTLE_MAX_IDLE_MS.
|
||||
*/
|
||||
function cleanupOldThrottles() {
|
||||
const now = Date.now();
|
||||
const maxAge = RATE_LIMIT_INTERVAL + 1000;
|
||||
const toBeDeleted = [];
|
||||
for (const [chatId, chatThrottle] of chatThrottleMap.entries()) {
|
||||
if (now - chatThrottle.lastUsedAt > maxAge) toBeDeleted.push(chatId);
|
||||
if (now - chatThrottle.lastUsedAt > THROTTLE_MAX_IDLE_MS) chatThrottleMap.delete(chatId);
|
||||
}
|
||||
for (const chatId of toBeDeleted) chatThrottleMap.delete(chatId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a throttled wrapper for a chatId to limit Telegram API calls.
|
||||
* Uses p-throttle with 1 request per RATE_LIMIT_INTERVAL per chat.
|
||||
* `lastUsedAt` is refreshed on every actual API call so that the idle window
|
||||
* starts from the last fired call, not from when send() was invoked.
|
||||
*
|
||||
* @template {Function} T
|
||||
* @param {string|number} chatId
|
||||
* @param {T} call - async function (endpoint: string, body: any) => Promise<Response>
|
||||
* @returns {T}
|
||||
* @param {Function} call - async function (endpoint: string, body: any) => Promise<Response>
|
||||
* @returns {Function}
|
||||
*/
|
||||
function getThrottled(chatId, call) {
|
||||
cleanupOldThrottles();
|
||||
const now = Date.now();
|
||||
const chatThrottle = chatThrottleMap.get(chatId);
|
||||
if (chatThrottle) {
|
||||
chatThrottle.lastUsedAt = now;
|
||||
return chatThrottle.throttled;
|
||||
const existing = chatThrottleMap.get(chatId);
|
||||
if (existing) {
|
||||
existing.lastUsedAt = Date.now();
|
||||
return existing.throttled;
|
||||
}
|
||||
const throttled = pThrottle({ limit: 1, interval: RATE_LIMIT_INTERVAL })(call);
|
||||
chatThrottleMap.set(chatId, { lastUsedAt: now, throttled });
|
||||
return throttled;
|
||||
const entry = { lastUsedAt: Date.now(), throttled: null };
|
||||
chatThrottleMap.set(chatId, entry);
|
||||
entry.throttled = pThrottle({ limit: 1, interval: RATE_LIMIT_INTERVAL })(async (endpoint, body) => {
|
||||
const e = chatThrottleMap.get(chatId);
|
||||
if (e) e.lastUsedAt = Date.now();
|
||||
return call(endpoint, body);
|
||||
});
|
||||
return entry.throttled;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -70,35 +74,16 @@ function escapeHtml(s = '') {
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a Telegram photo caption (max 1024 characters) using HTML parse mode.
|
||||
* Build a Telegram HTML-formatted message body.
|
||||
* Suitable for both sendMessage (uncapped) and sendPhoto captions (caller must slice to 1024).
|
||||
*
|
||||
* @param {string} jobName
|
||||
* @param {string} serviceName
|
||||
* @param {Object} o - Listing object
|
||||
* @param {string} [o.title]
|
||||
* @param {string} [o.address]
|
||||
* @param {string|number} [o.price]
|
||||
* @param {string|number} [o.size]
|
||||
* @param {string} [o.link]
|
||||
* @param {string} [baseUrl]
|
||||
* @returns {string}
|
||||
*/
|
||||
function buildCaption(jobName, serviceName, o, baseUrl) {
|
||||
const title = shorten((o.title || '').replace(/\*/g, ''), 90);
|
||||
const meta = [o.address, o.price, o.size].filter(Boolean).join(' | ');
|
||||
const fredyLink =
|
||||
baseUrl && o.id ? `\n<a href='${escapeHtml(`${baseUrl}/#/listings/listing/${o.id}`)}'>Open in Fredy</a>` : '';
|
||||
return `<i>${escapeHtml(jobName)}</i> (${escapeHtml(serviceName)})\n<a href='${escapeHtml(
|
||||
o.link || '',
|
||||
)}'><b>${escapeHtml(title)}</b></a>\n${escapeHtml(meta)}${fredyLink}`.slice(0, 1024);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a Telegram message text using HTML parse mode.
|
||||
* @param {string} jobName
|
||||
* @param {string} serviceName
|
||||
* @param {Object} o - Listing object
|
||||
* @returns {string}
|
||||
*/
|
||||
function buildText(jobName, serviceName, o, baseUrl) {
|
||||
function buildHtmlBody(jobName, serviceName, o, baseUrl) {
|
||||
const title = shorten((o.title || '').replace(/\*/g, ''), 90);
|
||||
const meta = [o.address, o.price, o.size].filter(Boolean).join(' | ');
|
||||
const fredyLink =
|
||||
@@ -111,14 +96,16 @@ function buildText(jobName, serviceName, o, baseUrl) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a plain text Telegram photo caption (max 4096 characters).
|
||||
* Build a plain-text Telegram photo caption (max 4096 characters).
|
||||
* Meta appears before the link so the most relevant info is visible within the cap.
|
||||
*
|
||||
* @param {string} jobName
|
||||
* @param {string} serviceName
|
||||
* @param {Object} o - Listing object
|
||||
* @param baseUrl
|
||||
* @param {string} [baseUrl]
|
||||
* @returns {string}
|
||||
*/
|
||||
function buildCaptionPlain(jobName, serviceName, o, baseUrl) {
|
||||
function buildPlainCaption(jobName, serviceName, o, baseUrl) {
|
||||
const title = shorten((o.title || '').replace(/\*/g, ''), 90);
|
||||
const meta = [o.address, o.price, o.size].filter(Boolean).join(' | ');
|
||||
const fredyLine = baseUrl && o.id ? `\nOpen in Fredy: ${baseUrl}/#/listings/listing/${o.id}` : '';
|
||||
@@ -126,19 +113,111 @@ function buildCaptionPlain(jobName, serviceName, o, baseUrl) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a plain text Telegram message.
|
||||
* Build a plain-text Telegram message body.
|
||||
* Link appears early so it is tappable without scrolling.
|
||||
*
|
||||
* @param {string} jobName
|
||||
* @param {string} serviceName
|
||||
* @param {Object} o - Listing object
|
||||
* @param {string} [baseUrl]
|
||||
* @returns {string}
|
||||
*/
|
||||
function buildTextPlain(jobName, serviceName, o, baseUrl) {
|
||||
function buildPlainText(jobName, serviceName, o, baseUrl) {
|
||||
const title = shorten((o.title || '').replace(/\*/g, ''), 90);
|
||||
const meta = [o.address, o.price, o.size].filter(Boolean).join(' | ');
|
||||
const fredyLine = baseUrl && o.id ? `\nOpen in Fredy: ${baseUrl}/#/listings/listing/${o.id}` : '';
|
||||
return `${jobName} (${serviceName})\n${title}\n${o.link || ''}\n${meta}${fredyLine}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the raw Telegram API caller for a given bot token.
|
||||
* Handles JSON and multipart (FormData) bodies.
|
||||
*
|
||||
* @param {string} token - Telegram bot token.
|
||||
* @param {string} jobName - Used in error messages.
|
||||
* @returns {(endpoint: string, body: object|FormData) => Promise<Response>}
|
||||
*/
|
||||
function makeTelegramCaller(token, jobName) {
|
||||
return async function (endpoint, body) {
|
||||
const isFormData = body instanceof FormData;
|
||||
const opts = isFormData
|
||||
? { method: 'post', body }
|
||||
: { method: 'post', body: JSON.stringify(body), headers: { 'Content-Type': 'application/json' } };
|
||||
const res = await fetch(`https://api.telegram.org/bot${token}/${endpoint}`, opts);
|
||||
if (!res.ok) {
|
||||
const errorBody = await res.text();
|
||||
throw new Error(`API error for '${jobName}'. '${endpoint}' returned ${errorBody}`);
|
||||
}
|
||||
return res;
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a single listing to a single Telegram chat, with photo-then-text fallback.
|
||||
*
|
||||
* @param {Function} throttledCall - Throttled Telegram API caller for this chat.
|
||||
* @param {Object} listing - Listing object.
|
||||
* @param {string|number} chatId
|
||||
* @param {Object} opts
|
||||
* @param {string} opts.jobName
|
||||
* @param {string} opts.serviceName
|
||||
* @param {string} opts.baseUrl
|
||||
* @param {boolean} opts.plainText
|
||||
* @param {number|undefined} opts.message_thread_id
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
async function sendListingToChat(
|
||||
throttledCall,
|
||||
listing,
|
||||
chatId,
|
||||
{ jobName, serviceName, baseUrl, plainText, message_thread_id },
|
||||
) {
|
||||
const img = normalizeImageUrl(listing.image);
|
||||
|
||||
const textPayload = {
|
||||
chat_id: chatId,
|
||||
text: plainText
|
||||
? buildPlainText(jobName, serviceName, listing, baseUrl)
|
||||
: buildHtmlBody(jobName, serviceName, listing, baseUrl),
|
||||
...(plainText ? {} : { parse_mode: 'HTML' }),
|
||||
disable_web_page_preview: true,
|
||||
...(message_thread_id ? { message_thread_id } : {}),
|
||||
};
|
||||
|
||||
if (!img) {
|
||||
return throttledCall('sendMessage', textPayload).catch((e) => {
|
||||
logger.error(`Error sending message to Telegram: ${e.message}`);
|
||||
});
|
||||
}
|
||||
|
||||
const caption = plainText
|
||||
? buildPlainCaption(jobName, serviceName, listing, baseUrl)
|
||||
: buildHtmlBody(jobName, serviceName, listing, baseUrl).slice(0, 1024);
|
||||
const parseMode = plainText ? undefined : 'HTML';
|
||||
|
||||
// .webp URLs (Immowelt/Cloudimage) fail Telegram's URL-based sendPhoto with
|
||||
// "failed to get HTTP URL content". Upload the bytes via multipart instead.
|
||||
const photoCall = shouldUseMultipart(img)
|
||||
? buildPhotoFormData({ chatId, imageUrl: img, caption, parseMode, messageThreadId: message_thread_id }).then((fd) =>
|
||||
throttledCall('sendPhoto', fd),
|
||||
)
|
||||
: throttledCall('sendPhoto', {
|
||||
chat_id: chatId,
|
||||
photo: img,
|
||||
caption,
|
||||
...(parseMode ? { parse_mode: parseMode } : {}),
|
||||
...(message_thread_id ? { message_thread_id } : {}),
|
||||
});
|
||||
|
||||
return photoCall.catch(async (e) => {
|
||||
logger.warn(`Error sending photo to Telegram and use a fallback: ${e.message}`);
|
||||
return throttledCall('sendMessage', textPayload).catch((e) => {
|
||||
logger.error(`Error sending message to Telegram: ${e.message}`);
|
||||
throw e;
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Send new listings to Telegram.
|
||||
* - Respects per-chat Telegram rate limits using a lightweight throttle cache.
|
||||
@@ -185,72 +264,10 @@ export const send = ({ serviceName, newListings = [], notificationConfig, jobKey
|
||||
if (!Array.isArray(newListings) || newListings.length === 0) return Promise.resolve([]);
|
||||
|
||||
const allPromises = chatIds.flatMap((id) => {
|
||||
const throttledCall = getThrottled(id, async function (endpoint, body) {
|
||||
// FormData (multipart) vs JSON. node-fetch sets its own multipart boundary
|
||||
// header, so we must NOT supply Content-Type ourselves in that case.
|
||||
const isFormData = body instanceof FormData;
|
||||
const opts = isFormData
|
||||
? { method: 'post', body }
|
||||
: { method: 'post', body: JSON.stringify(body), headers: { 'Content-Type': 'application/json' } };
|
||||
const res = await fetch(`https://api.telegram.org/bot${token}/${endpoint}`, opts);
|
||||
|
||||
if (!res.ok) {
|
||||
const errorBody = await res.text();
|
||||
throw new Error(`API error for '${jobName}'. '${endpoint}' returned ${errorBody}`);
|
||||
}
|
||||
return res;
|
||||
});
|
||||
|
||||
return newListings.map(async (o) => {
|
||||
const img = normalizeImageUrl(o.image);
|
||||
const textPayload = {
|
||||
chat_id: id,
|
||||
text: plainText
|
||||
? buildTextPlain(jobName, serviceName, o, baseUrl)
|
||||
: buildText(jobName, serviceName, o, baseUrl),
|
||||
...(plainText ? {} : { parse_mode: 'HTML' }),
|
||||
disable_web_page_preview: true,
|
||||
...(message_thread_id ? { message_thread_id } : {}),
|
||||
};
|
||||
|
||||
if (!img) {
|
||||
return await throttledCall('sendMessage', textPayload).catch(async (e) => {
|
||||
logger.error(`Error sending message to Telegram: ${e.message}`);
|
||||
});
|
||||
}
|
||||
|
||||
const caption = plainText
|
||||
? buildCaptionPlain(jobName, serviceName, o, baseUrl)
|
||||
: buildCaption(jobName, serviceName, o, baseUrl);
|
||||
const parseMode = plainText ? undefined : 'HTML';
|
||||
|
||||
// .webp URLs (Immowelt/Cloudimage) fail Telegram's URL-based sendPhoto with
|
||||
// "failed to get HTTP URL content". Upload the bytes via multipart instead;
|
||||
// the rendered chat message is identical.
|
||||
const photoCall = shouldUseMultipart(img)
|
||||
? buildPhotoFormData({
|
||||
chatId: id,
|
||||
imageUrl: img,
|
||||
caption,
|
||||
parseMode,
|
||||
messageThreadId: message_thread_id,
|
||||
}).then((fd) => throttledCall('sendPhoto', fd))
|
||||
: throttledCall('sendPhoto', {
|
||||
chat_id: id,
|
||||
photo: img,
|
||||
caption,
|
||||
...(parseMode ? { parse_mode: parseMode } : {}),
|
||||
...(message_thread_id ? { message_thread_id } : {}),
|
||||
});
|
||||
|
||||
return await photoCall.catch(async (e) => {
|
||||
logger.warn(`Error sending photo to Telegram and use a fallback: ${e.message}`);
|
||||
return await throttledCall('sendMessage', textPayload).catch((e) => {
|
||||
logger.error(`Error sending message to Telegram: ${e.message}`);
|
||||
throw e;
|
||||
});
|
||||
});
|
||||
});
|
||||
const caller = makeTelegramCaller(token, jobName);
|
||||
const throttledCall = getThrottled(id, caller);
|
||||
const opts = { jobName, serviceName, baseUrl, plainText, message_thread_id };
|
||||
return newListings.map((listing) => sendListingToChat(throttledCall, listing, id, opts));
|
||||
});
|
||||
|
||||
return Promise.all(allPromises);
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
*/
|
||||
|
||||
import fs from 'fs';
|
||||
import logger from '../services/logger.js';
|
||||
const path = './adapter';
|
||||
|
||||
/** Read every integration existing in ./adapter **/
|
||||
@@ -23,7 +24,13 @@ const findAdapter = (notificationAdapter) => {
|
||||
export const send = (serviceName, newListings, notificationConfig, jobKey, baseUrl) => {
|
||||
//this is not being used in tests, therefore adapter are always set
|
||||
return notificationConfig
|
||||
.map((notificationAdapter) => findAdapter(notificationAdapter))
|
||||
.map((notificationAdapter) => {
|
||||
const found = findAdapter(notificationAdapter);
|
||||
if (!found) {
|
||||
logger.warn(`Notification adapter '${notificationAdapter.id}' not found for job '${jobKey || ''}'`);
|
||||
}
|
||||
return found;
|
||||
})
|
||||
.filter(Boolean)
|
||||
.map((a) => a.send({ serviceName, newListings, notificationConfig, jobKey, baseUrl }));
|
||||
};
|
||||
|
||||
@@ -105,14 +105,11 @@ export function initJobExecutionService({ providers, settings, intervalMs }) {
|
||||
return;
|
||||
}
|
||||
settings.lastRun = now;
|
||||
const jobs = jobStorage
|
||||
.getJobs()
|
||||
.filter((job) => job.enabled)
|
||||
.filter((job) => {
|
||||
if (!context) return true; // startup/cron → all
|
||||
if (context.isAdmin) return true; // admin → all
|
||||
return context.userId ? job.userId === context.userId : false; // user → own
|
||||
});
|
||||
const jobs = jobStorage.getJobs().filter((job) => {
|
||||
if (!context) return true; // startup/cron → all
|
||||
if (context.isAdmin) return true; // admin → all
|
||||
return context.userId ? job.userId === context.userId : false; // user → own
|
||||
});
|
||||
|
||||
for (const job of jobs) {
|
||||
await executeJob(job);
|
||||
|
||||
@@ -16,7 +16,7 @@ import logger from '../../services/logger.js';
|
||||
* Concurrency: network-bound checks are executed with a configurable concurrency limit.
|
||||
*
|
||||
* @param {object} [opts]
|
||||
* @param {number} [opts.concurrency=8] Max number of parallel activeTester calls.
|
||||
* @param {number} [opts.concurrency=4] Max number of parallel activeTester calls.
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
export default async function runActiveChecker(opts = {}) {
|
||||
|
||||
@@ -60,18 +60,14 @@ export const getListingsKpisForJobIds = (jobIds = []) => {
|
||||
|
||||
const placeholders = jobIds.map(() => '?').join(',');
|
||||
const rows = SqliteConnection.query(
|
||||
`SELECT
|
||||
SUM(CASE WHEN is_active = 1 THEN 1 ELSE 0 END) OVER() AS active_count,
|
||||
price
|
||||
FROM listings
|
||||
WHERE job_id IN (${placeholders})
|
||||
AND manually_deleted = 0
|
||||
GROUP BY
|
||||
id`,
|
||||
`SELECT is_active, price
|
||||
FROM listings
|
||||
WHERE job_id IN (${placeholders})
|
||||
AND manually_deleted = 0`,
|
||||
jobIds,
|
||||
);
|
||||
|
||||
const activeCount = rows[0]?.active_count ?? 0;
|
||||
const activeCount = rows.filter((r) => r.is_active === 1).length;
|
||||
|
||||
const prices = rows
|
||||
.map((r) => r.price)
|
||||
@@ -508,7 +504,7 @@ export const updateListingGeocoordinates = (id, latitude, longitude) => {
|
||||
* @param {string} [params.jobId]
|
||||
* @param {string} [params.userId]
|
||||
* @param {boolean} [params.isAdmin=false]
|
||||
* @returns {{listings: Object[], maxPrice: number}} Object containing listings and maxPrice.
|
||||
* @returns {{listings: Object[]}} Object containing listings.
|
||||
*/
|
||||
export const getListingsForMap = ({ jobId, userId = null, isAdmin = false } = {}) => {
|
||||
const baseWhereParts = [
|
||||
|
||||
Reference in New Issue
Block a user