diff --git a/packages/server-core/src/ws.js b/packages/server-core/src/ws.js index 02416c7..7219793 100644 --- a/packages/server-core/src/ws.js +++ b/packages/server-core/src/ws.js @@ -7,6 +7,22 @@ function toOriginSet(list) { return Array.isArray(list) && list.length > 0 ? new Set(list) : null; } +const HEARTBEAT_INTERVAL_MS = 30000; + +// Terminates sockets that have not ponged since the previous sweep, and pings the rest. +// A socket silently dropped by an idle-timeout proxy fails the next isAlive check and is terminated. +function heartbeatSweep(clients) { + for (const ws of clients) { + if (ws.isAlive === false) { + ws.terminate(); + continue; + } + + ws.isAlive = false; + ws.ping(); + } +} + function setupWebSocket(server, opts = {}) { const { getVaultPath, originAllowlist } = opts; @@ -126,6 +142,12 @@ function setupWebSocket(server, opts = {}) { const vaultPath = getVaultPath(vaultId); console.log(`[ws] Client connected to vault: ${vaultId}`); + // isAlive is reset by each pong; the heartbeat sweep terminates sockets that miss one. + ws.isAlive = true; + ws.on("pong", () => { + ws.isAlive = true; + }); + if (!clientsByVault.has(vaultId)) { clientsByVault.set(vaultId, new Set()); } @@ -209,7 +231,16 @@ function setupWebSocket(server, opts = {}) { }); }); + // Terminate dead connections behind proxies that silently drop idle sockets. + const heartbeat = setInterval( + () => heartbeatSweep(wss.clients), + HEARTBEAT_INTERVAL_MS, + ); + heartbeat.unref?.(); + + wss.on("close", () => clearInterval(heartbeat)); + return wss; } -module.exports = { setupWebSocket }; +module.exports = { setupWebSocket, heartbeatSweep }; diff --git a/packages/server-core/src/ws.test.mjs b/packages/server-core/src/ws.test.mjs new file mode 100644 index 0000000..5472baa --- /dev/null +++ b/packages/server-core/src/ws.test.mjs @@ -0,0 +1,43 @@ +import { describe, it, expect, vi } from "vitest"; +import { createRequire } from "module"; + +const require = createRequire(import.meta.url); +const { heartbeatSweep } = require("./ws.js"); + +function fakeSocket(isAlive) { + return { isAlive, ping: vi.fn(), terminate: vi.fn() }; +} + +describe("ws heartbeat sweep", () => { + it("terminates a socket that has not ponged since the last sweep", () => { + const dead = fakeSocket(false); + + heartbeatSweep([dead]); + + expect(dead.terminate).toHaveBeenCalledTimes(1); + expect(dead.ping).not.toHaveBeenCalled(); + }); + + it("pings a live socket and marks it pending until its next pong", () => { + const alive = fakeSocket(true); + + heartbeatSweep([alive]); + + expect(alive.ping).toHaveBeenCalledTimes(1); + expect(alive.terminate).not.toHaveBeenCalled(); + expect(alive.isAlive).toBe(false); + }); + + it("terminates the dead and pings the live in the same sweep", () => { + const dead = fakeSocket(false); + const alive = fakeSocket(true); + + heartbeatSweep(new Set([dead, alive])); + + expect(dead.terminate).toHaveBeenCalledTimes(1); + expect(dead.ping).not.toHaveBeenCalled(); + expect(alive.ping).toHaveBeenCalledTimes(1); + expect(alive.terminate).not.toHaveBeenCalled(); + expect(alive.isAlive).toBe(false); + }); +});