diff --git a/lib/notification/adapter/telegram.js b/lib/notification/adapter/telegram.js index 0c6e81c..e76e43a 100644 --- a/lib/notification/adapter/telegram.js +++ b/lib/notification/adapter/telegram.js @@ -1,8 +1,56 @@ import { markdown2Html } from '../../services/markdown.js'; import { getJob } from '../../services/storage/jobStorage.js'; import fetch from 'node-fetch'; +import pThrottle from 'p-throttle'; + const MAX_ENTITIES_PER_CHUNK = 8; -const RATE_LIMIT_INTERVAL = 1010; +const RATE_LIMIT_INTERVAL = 1000; +const chatThrottleMap = new Map(); + +function cleanupOldThrottles() { + const now = Date.now(); + const maxAge = RATE_LIMIT_INTERVAL + 1000; // adding extra second + const toBeDeleted = []; + + for (const [chatId, chatThrottle] of chatThrottleMap.entries()) { + if (now - chatThrottle.lastUsedAt > maxAge) { + toBeDeleted.push(chatId); + } + } + + for (const chatId of toBeDeleted) { + chatThrottleMap.delete(chatId); + } +} + +/** + * Returns a throttled async function for sending messages to a specific chat. + * Telegram enforces a rate limit of 1 message per second per chat (chatId). + * + * @param {number} chatId - The chat ID to throttle messages for. + * @param {Function} fn - The async function to throttle (should send the message). + * @returns {Function} Throttled async function for sending messages. + */ +function getThrottled(chatId, call) { + cleanupOldThrottles(); + + const now = Date.now(); + const chatThrottle = chatThrottleMap.get(chatId); + + if (chatThrottle) { + chatThrottle.lastUsedAt = now; + return chatThrottle.throttled; + } + + // Create new throttled function + const newThrottle = { + lastUsedAt: now, + throttled: pThrottle({ limit: 1, interval: RATE_LIMIT_INTERVAL })(call), + }; + chatThrottleMap.set(chatId, newThrottle); + return newThrottle.throttled; +} + /** * splitting an array into chunks because Telegram only allows for messages up to * 4096 chars, thus we have to split messages into chunks @@ -22,8 +70,16 @@ export const send = ({ serviceName, newListings, notificationConfig, jobKey }) = const { token, chatId } = notificationConfig.find((adapter) => adapter.id === config.id).fields; const job = getJob(jobKey); const jobName = job == null ? jobKey : job.name; - // we have to split messages into chunks, because otherwise messages are going to become too big and will fail const chunks = arrayChunks(newListings, MAX_ENTITIES_PER_CHUNK); + + const getThrottledSend = getThrottled(chatId, async function (body) { + await fetch(`https://api.telegram.org/bot${token}/sendMessage`, { + method: 'post', + body: JSON.stringify(body), + headers: { 'Content-Type': 'application/json' }, + }); + }); + const promises = chunks.map((chunk) => { const messageParagraphs = []; @@ -36,29 +92,14 @@ export const send = ({ serviceName, newListings, notificationConfig, jobKey }) = ), ); - /** - * This is to not break the rate limit. It is to only send 1 message per second - */ - return new Promise((resolve, reject) => { - setTimeout(() => { - fetch(`https://api.telegram.org/bot${token}/sendMessage`, { - method: 'post', - body: JSON.stringify({ - chat_id: chatId, - text: messageParagraphs.join('\n\n'), - parse_mode: 'HTML', - disable_web_page_preview: true, - }), - headers: { 'Content-Type': 'application/json' }, - }) - .then(() => { - resolve(); - }) - .catch(() => { - reject(); - }); - }, RATE_LIMIT_INTERVAL); - }); + const body = { + chat_id: chatId, + text: messageParagraphs.join('\n\n'), + parse_mode: 'HTML', + disable_web_page_preview: true, + }; + + return getThrottledSend(body); }); return Promise.all(promises); }; diff --git a/package.json b/package.json index 936b517..bdec504 100755 --- a/package.json +++ b/package.json @@ -73,6 +73,7 @@ "nanoid": "5.1.5", "node-fetch": "3.3.2", "node-mailjet": "6.0.8", + "p-throttle": "^7.0.0", "package-up": "^5.0.0", "puppeteer": "^24.14.0", "puppeteer-extra": "^3.3.6", diff --git a/yarn.lock b/yarn.lock index 9f1055d..4350ece 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5432,6 +5432,11 @@ p-locate@^5.0.0: dependencies: p-limit "^3.0.2" +p-throttle@^7.0.0: + version "7.0.0" + resolved "https://registry.yarnpkg.com/p-throttle/-/p-throttle-7.0.0.tgz#d2650e884dad46fd626a9a5cfc3fb239cb799dee" + integrity sha512-aio0v+S0QVkH1O+9x4dHtD4dgCExACcL+3EtNaGqC01GBudS9ijMuUsmN8OVScyV4OOp0jqdLShZFuSlbL/AsA== + pac-proxy-agent@^7.1.0: version "7.2.0" resolved "https://registry.yarnpkg.com/pac-proxy-agent/-/pac-proxy-agent-7.2.0.tgz#9cfaf33ff25da36f6147a20844230ec92c06e5df"