From 9619703a58960cd8001c2664a4a53afe1fcbc38b Mon Sep 17 00:00:00 2001 From: Nystik <236107-Nystik@users.noreply.gitlab.com> Date: Tue, 16 Jun 2026 01:01:30 +0200 Subject: [PATCH] resync the metadata cache on websocket reconnect --- packages/shim/src/fs/index.js | 8 +- packages/shim/src/fs/metadata-cache.js | 5 + packages/shim/src/fs/watcher-client.js | 80 +++++++++++++++- packages/shim/src/fs/watcher-client.test.js | 101 ++++++++++++++++++++ packages/shim/src/ws-client.js | 25 +++++ packages/shim/src/ws-client.test.js | 63 ++++++++++++ 6 files changed, 280 insertions(+), 2 deletions(-) create mode 100644 packages/shim/src/fs/watcher-client.test.js create mode 100644 packages/shim/src/ws-client.test.js diff --git a/packages/shim/src/fs/index.js b/packages/shim/src/fs/index.js index afd8cbf..72c2d8f 100644 --- a/packages/shim/src/fs/index.js +++ b/packages/shim/src/fs/index.js @@ -18,7 +18,13 @@ const contentCache = new ContentCache(); const fsPromises = createFsPromises(metadataCache, contentCache, transport); const fsSync = createFsSync(metadataCache, contentCache, transport); const fsWatch = createFsWatch(transport); -const watcherClient = createWatcherClient(metadataCache, contentCache, fsWatch, wsClient); +const watcherClient = createWatcherClient( + metadataCache, + contentCache, + fsWatch, + wsClient, + transport, +); const fdOps = createFdOps(metadataCache, contentCache, transport); const fsCallbacks = createFsCallbacks(fsPromises); diff --git a/packages/shim/src/fs/metadata-cache.js b/packages/shim/src/fs/metadata-cache.js index f933bc2..cdcd01b 100644 --- a/packages/shim/src/fs/metadata-cache.js +++ b/packages/shim/src/fs/metadata-cache.js @@ -93,6 +93,11 @@ export class MetadataCache { return this._entries.size; } + // Normalized keys of every entry, for callers that diff the cache against a fresh tree. + keys() { + return [...this._entries.keys()]; + } + toStat(path) { const meta = this.get(path); diff --git a/packages/shim/src/fs/watcher-client.js b/packages/shim/src/fs/watcher-client.js index ed64283..c80d68e 100644 --- a/packages/shim/src/fs/watcher-client.js +++ b/packages/shim/src/fs/watcher-client.js @@ -2,8 +2,17 @@ // The WebSocket itself is owned by ws-client.js; this module is a consumer. import { isRecentLocalOp } from "./echo-guard.js"; +import { normalize } from "../util/path.js"; -export function createWatcherClient(metadataCache, contentCache, fsWatch, wsClient) { +const RESYNC_DEBOUNCE_MS = 1000; + +export function createWatcherClient( + metadataCache, + contentCache, + fsWatch, + wsClient, + transport, +) { function handleCreated(msg) { const { path, stat } = msg; @@ -72,6 +81,74 @@ export function createWatcherClient(metadataCache, contentCache, fsWatch, wsClie wsClient.subscribe("modified", handleModified); wsClient.subscribe("deleted", handleDeleted); + // Re-derive the cache from a freshly fetched tree after a reconnect. + // Each delta runs through the live-event handlers, matching live behavior. + function reconcile(tree) { + const fresh = new Set(Object.keys(tree).map(normalize)); + + for (const [path, meta] of Object.entries(tree)) { + const existing = metadataCache.get(path); + + if (!existing) { + if (meta.type === "directory") { + handleFolderCreated({ path }); + } else { + handleCreated({ + path, + stat: { size: meta.size, mtime: meta.mtime, ctime: meta.ctime }, + }); + } + } else if ( + meta.type === "file" && + (existing.mtime !== meta.mtime || existing.size !== meta.size) + ) { + handleModified({ + path, + stat: { size: meta.size, mtime: meta.mtime, ctime: meta.ctime }, + }); + } + } + + // A cache key absent from the fresh tree was deleted while disconnected. + // The empty root key is preserved because the tree never lists it. + for (const key of metadataCache.keys()) { + if (key === "" || fresh.has(key)) { + continue; + } + + handleDeleted({ path: key }); + } + } + + async function resync() { + let tree; + + try { + tree = await transport.fetchTree(); + } catch (e) { + console.warn("[shim:fs] reconnect resync failed:", e); + return; + } + + reconcile(tree); + } + + // Coalesce a burst of reconnects into a single resync once the socket settles. + let resyncTimer = null; + + function scheduleResync() { + if (resyncTimer) { + clearTimeout(resyncTimer); + } + + resyncTimer = setTimeout(() => { + resyncTimer = null; + resync(); + }, RESYNC_DEBOUNCE_MS); + } + + wsClient.onReconnect(scheduleResync); + function connect(vaultId) { wsClient.connect(vaultId); } @@ -83,5 +160,6 @@ export function createWatcherClient(metadataCache, contentCache, fsWatch, wsClie return { connect, disconnect, + reconcile, }; } diff --git a/packages/shim/src/fs/watcher-client.test.js b/packages/shim/src/fs/watcher-client.test.js new file mode 100644 index 0000000..ee789ea --- /dev/null +++ b/packages/shim/src/fs/watcher-client.test.js @@ -0,0 +1,101 @@ +import { describe, it, expect, vi } from "vitest"; +import { createWatcherClient } from "./watcher-client.js"; +import { markLocalOp } from "./echo-guard.js"; + +function makeDeps() { + const store = new Map(); + + const metadataCache = { + get: (p) => store.get(p) || null, + set: (p, m) => store.set(p, m), + delete: (p) => store.delete(p), + has: (p) => store.has(p), + keys: () => [...store.keys()], + }; + + const contentCache = { + invalidate: vi.fn(), + set: vi.fn(), + delete: vi.fn(), + get: () => null, + }; + + const fsWatch = { _dispatch: vi.fn() }; + const wsClient = { subscribe: vi.fn(), onReconnect: vi.fn() }; + const transport = { fetchTree: vi.fn() }; + + const client = createWatcherClient( + metadataCache, + contentCache, + fsWatch, + wsClient, + transport, + ); + + return { store, metadataCache, contentCache, fsWatch, wsClient, transport, client }; +} + +describe("watcher-client reconcile", () => { + it("adds a file present in the tree but missing from the cache", () => { + const d = makeDeps(); + + d.client.reconcile({ "new.md": { type: "file", size: 5, mtime: 100, ctime: 50 } }); + + expect(d.store.get("new.md")).toMatchObject({ type: "file", size: 5 }); + expect(d.contentCache.invalidate).toHaveBeenCalledWith("new.md"); + expect(d.fsWatch._dispatch).toHaveBeenCalledWith("created", "new.md"); + }); + + it("adds a directory as a folder", () => { + const d = makeDeps(); + + d.client.reconcile({ newdir: { type: "directory" } }); + + expect(d.store.get("newdir")).toEqual({ type: "directory" }); + expect(d.fsWatch._dispatch).toHaveBeenCalledWith("folder-created", "newdir"); + }); + + it("modifies a file whose mtime or size changed", () => { + const d = makeDeps(); + d.store.set("a.md", { type: "file", size: 1, mtime: 10 }); + + d.client.reconcile({ "a.md": { type: "file", size: 2, mtime: 20, ctime: 5 } }); + + expect(d.store.get("a.md")).toMatchObject({ size: 2, mtime: 20 }); + expect(d.fsWatch._dispatch).toHaveBeenCalledWith("modified", "a.md"); + }); + + it("is a no-op for an unchanged file", () => { + const d = makeDeps(); + d.store.set("a.md", { type: "file", size: 1, mtime: 10 }); + + d.client.reconcile({ "a.md": { type: "file", size: 1, mtime: 10, ctime: 5 } }); + + expect(d.fsWatch._dispatch).not.toHaveBeenCalled(); + }); + + it("deletes a cache entry absent from the tree and preserves the root", () => { + const d = makeDeps(); + d.store.set("", { type: "directory" }); + d.store.set("gone.md", { type: "file", size: 1, mtime: 10 }); + d.store.set("keep.md", { type: "file", size: 1, mtime: 10 }); + + d.client.reconcile({ "keep.md": { type: "file", size: 1, mtime: 10, ctime: 5 } }); + + expect(d.store.has("gone.md")).toBe(false); + expect(d.store.has("")).toBe(true); + expect(d.fsWatch._dispatch).toHaveBeenCalledWith("deleted", "gone.md"); + expect(d.fsWatch._dispatch).not.toHaveBeenCalledWith("deleted", "keep.md"); + }); + + it("skips a path with a recent local op", () => { + const d = makeDeps(); + const p = "recent-local-op-reconcile.md"; + markLocalOp(p); + + d.client.reconcile({ [p]: { type: "file", size: 5, mtime: 100, ctime: 50 } }); + + expect(d.store.has(p)).toBe(false); + expect(d.fsWatch._dispatch).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/shim/src/ws-client.js b/packages/shim/src/ws-client.js index 531bafa..e27efdb 100644 --- a/packages/shim/src/ws-client.js +++ b/packages/shim/src/ws-client.js @@ -8,12 +8,14 @@ export function createWsClient() { let vaultId = null; let reconnectTimer = null; let manuallyClosed = false; + let hasConnectedBefore = false; let state = "closed"; // "closed" | "connecting" | "open" const globalSubs = new Map(); // type -> Set const channelSubs = new Map(); // channelName -> Map> const channelSubCount = new Map(); // channelName -> integer const stateSubs = new Set(); // handler(state) + const reconnectSubs = new Set(); // handler() fired on a re-open, not the first open function setState(next) { if (state === next) { @@ -107,6 +109,20 @@ export function createWsClient() { for (const name of channelSubCount.keys()) { sendSubscribeChannel(name); } + + // A re-open can miss watcher events that fired while the socket was down. + // Boot covers the first open, so handlers fire only on later opens. + if (hasConnectedBefore) { + for (const fn of reconnectSubs) { + try { + fn(); + } catch (e) { + console.error("[ws] reconnect subscriber threw:", e); + } + } + } else { + hasConnectedBefore = true; + } }; ws.onmessage = (event) => { @@ -252,6 +268,14 @@ export function createWsClient() { }; } + function onReconnect(handler) { + reconnectSubs.add(handler); + + return () => { + reconnectSubs.delete(handler); + }; + } + return { connect, disconnect, @@ -260,6 +284,7 @@ export function createWsClient() { channel, isOpen, onStateChange, + onReconnect, }; } diff --git a/packages/shim/src/ws-client.test.js b/packages/shim/src/ws-client.test.js new file mode 100644 index 0000000..e9a96fe --- /dev/null +++ b/packages/shim/src/ws-client.test.js @@ -0,0 +1,63 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { createWsClient } from "./ws-client.js"; + +let sockets; + +beforeEach(() => { + sockets = []; + + class FakeWebSocket { + constructor(url) { + this.url = url; + this.readyState = 0; + sockets.push(this); + } + send() {} + close() {} + } + + FakeWebSocket.OPEN = 1; + globalThis.WebSocket = FakeWebSocket; + globalThis.window = { location: { protocol: "http:", host: "localhost" } }; + vi.useFakeTimers(); +}); + +afterEach(() => { + vi.useRealTimers(); + delete globalThis.WebSocket; + delete globalThis.window; +}); + +describe("ws-client reconnect", () => { + it("fires onReconnect on a re-open but not on the first open", () => { + const client = createWsClient(); + const onReconnect = vi.fn(); + client.onReconnect(onReconnect); + + client.connect("v1"); + sockets[0].onopen(); + expect(onReconnect).not.toHaveBeenCalled(); + + sockets[0].onclose(); + vi.advanceTimersByTime(2000); + sockets[1].onopen(); + + expect(onReconnect).toHaveBeenCalledTimes(1); + }); + + it("stops firing after unsubscribe", () => { + const client = createWsClient(); + const onReconnect = vi.fn(); + const off = client.onReconnect(onReconnect); + + client.connect("v1"); + sockets[0].onopen(); + off(); + + sockets[0].onclose(); + vi.advanceTimersByTime(2000); + sockets[1].onopen(); + + expect(onReconnect).not.toHaveBeenCalled(); + }); +});