resync the metadata cache on websocket reconnect

This commit is contained in:
Nystik
2026-06-16 01:01:30 +02:00
parent c60322a287
commit 9619703a58
6 changed files with 280 additions and 2 deletions

View File

@@ -18,7 +18,13 @@ const contentCache = new ContentCache();
const fsPromises = createFsPromises(metadataCache, contentCache, transport);
const fsSync = createFsSync(metadataCache, contentCache, transport);
const fsWatch = createFsWatch(transport);
const watcherClient = createWatcherClient(metadataCache, contentCache, fsWatch, wsClient);
const watcherClient = createWatcherClient(
metadataCache,
contentCache,
fsWatch,
wsClient,
transport,
);
const fdOps = createFdOps(metadataCache, contentCache, transport);
const fsCallbacks = createFsCallbacks(fsPromises);

View File

@@ -93,6 +93,11 @@ export class MetadataCache {
return this._entries.size;
}
// Normalized keys of every entry, for callers that diff the cache against a fresh tree.
keys() {
return [...this._entries.keys()];
}
toStat(path) {
const meta = this.get(path);

View File

@@ -2,8 +2,17 @@
// The WebSocket itself is owned by ws-client.js; this module is a consumer.
import { isRecentLocalOp } from "./echo-guard.js";
import { normalize } from "../util/path.js";
export function createWatcherClient(metadataCache, contentCache, fsWatch, wsClient) {
const RESYNC_DEBOUNCE_MS = 1000;
export function createWatcherClient(
metadataCache,
contentCache,
fsWatch,
wsClient,
transport,
) {
function handleCreated(msg) {
const { path, stat } = msg;
@@ -72,6 +81,74 @@ export function createWatcherClient(metadataCache, contentCache, fsWatch, wsClie
wsClient.subscribe("modified", handleModified);
wsClient.subscribe("deleted", handleDeleted);
// Re-derive the cache from a freshly fetched tree after a reconnect.
// Each delta runs through the live-event handlers, matching live behavior.
function reconcile(tree) {
const fresh = new Set(Object.keys(tree).map(normalize));
for (const [path, meta] of Object.entries(tree)) {
const existing = metadataCache.get(path);
if (!existing) {
if (meta.type === "directory") {
handleFolderCreated({ path });
} else {
handleCreated({
path,
stat: { size: meta.size, mtime: meta.mtime, ctime: meta.ctime },
});
}
} else if (
meta.type === "file" &&
(existing.mtime !== meta.mtime || existing.size !== meta.size)
) {
handleModified({
path,
stat: { size: meta.size, mtime: meta.mtime, ctime: meta.ctime },
});
}
}
// A cache key absent from the fresh tree was deleted while disconnected.
// The empty root key is preserved because the tree never lists it.
for (const key of metadataCache.keys()) {
if (key === "" || fresh.has(key)) {
continue;
}
handleDeleted({ path: key });
}
}
async function resync() {
let tree;
try {
tree = await transport.fetchTree();
} catch (e) {
console.warn("[shim:fs] reconnect resync failed:", e);
return;
}
reconcile(tree);
}
// Coalesce a burst of reconnects into a single resync once the socket settles.
let resyncTimer = null;
function scheduleResync() {
if (resyncTimer) {
clearTimeout(resyncTimer);
}
resyncTimer = setTimeout(() => {
resyncTimer = null;
resync();
}, RESYNC_DEBOUNCE_MS);
}
wsClient.onReconnect(scheduleResync);
function connect(vaultId) {
wsClient.connect(vaultId);
}
@@ -83,5 +160,6 @@ export function createWatcherClient(metadataCache, contentCache, fsWatch, wsClie
return {
connect,
disconnect,
reconcile,
};
}

View File

@@ -0,0 +1,101 @@
import { describe, it, expect, vi } from "vitest";
import { createWatcherClient } from "./watcher-client.js";
import { markLocalOp } from "./echo-guard.js";
function makeDeps() {
const store = new Map();
const metadataCache = {
get: (p) => store.get(p) || null,
set: (p, m) => store.set(p, m),
delete: (p) => store.delete(p),
has: (p) => store.has(p),
keys: () => [...store.keys()],
};
const contentCache = {
invalidate: vi.fn(),
set: vi.fn(),
delete: vi.fn(),
get: () => null,
};
const fsWatch = { _dispatch: vi.fn() };
const wsClient = { subscribe: vi.fn(), onReconnect: vi.fn() };
const transport = { fetchTree: vi.fn() };
const client = createWatcherClient(
metadataCache,
contentCache,
fsWatch,
wsClient,
transport,
);
return { store, metadataCache, contentCache, fsWatch, wsClient, transport, client };
}
describe("watcher-client reconcile", () => {
it("adds a file present in the tree but missing from the cache", () => {
const d = makeDeps();
d.client.reconcile({ "new.md": { type: "file", size: 5, mtime: 100, ctime: 50 } });
expect(d.store.get("new.md")).toMatchObject({ type: "file", size: 5 });
expect(d.contentCache.invalidate).toHaveBeenCalledWith("new.md");
expect(d.fsWatch._dispatch).toHaveBeenCalledWith("created", "new.md");
});
it("adds a directory as a folder", () => {
const d = makeDeps();
d.client.reconcile({ newdir: { type: "directory" } });
expect(d.store.get("newdir")).toEqual({ type: "directory" });
expect(d.fsWatch._dispatch).toHaveBeenCalledWith("folder-created", "newdir");
});
it("modifies a file whose mtime or size changed", () => {
const d = makeDeps();
d.store.set("a.md", { type: "file", size: 1, mtime: 10 });
d.client.reconcile({ "a.md": { type: "file", size: 2, mtime: 20, ctime: 5 } });
expect(d.store.get("a.md")).toMatchObject({ size: 2, mtime: 20 });
expect(d.fsWatch._dispatch).toHaveBeenCalledWith("modified", "a.md");
});
it("is a no-op for an unchanged file", () => {
const d = makeDeps();
d.store.set("a.md", { type: "file", size: 1, mtime: 10 });
d.client.reconcile({ "a.md": { type: "file", size: 1, mtime: 10, ctime: 5 } });
expect(d.fsWatch._dispatch).not.toHaveBeenCalled();
});
it("deletes a cache entry absent from the tree and preserves the root", () => {
const d = makeDeps();
d.store.set("", { type: "directory" });
d.store.set("gone.md", { type: "file", size: 1, mtime: 10 });
d.store.set("keep.md", { type: "file", size: 1, mtime: 10 });
d.client.reconcile({ "keep.md": { type: "file", size: 1, mtime: 10, ctime: 5 } });
expect(d.store.has("gone.md")).toBe(false);
expect(d.store.has("")).toBe(true);
expect(d.fsWatch._dispatch).toHaveBeenCalledWith("deleted", "gone.md");
expect(d.fsWatch._dispatch).not.toHaveBeenCalledWith("deleted", "keep.md");
});
it("skips a path with a recent local op", () => {
const d = makeDeps();
const p = "recent-local-op-reconcile.md";
markLocalOp(p);
d.client.reconcile({ [p]: { type: "file", size: 5, mtime: 100, ctime: 50 } });
expect(d.store.has(p)).toBe(false);
expect(d.fsWatch._dispatch).not.toHaveBeenCalled();
});
});

View File

@@ -8,12 +8,14 @@ export function createWsClient() {
let vaultId = null;
let reconnectTimer = null;
let manuallyClosed = false;
let hasConnectedBefore = false;
let state = "closed"; // "closed" | "connecting" | "open"
const globalSubs = new Map(); // type -> Set<handler>
const channelSubs = new Map(); // channelName -> Map<type, Set<handler>>
const channelSubCount = new Map(); // channelName -> integer
const stateSubs = new Set(); // handler(state)
const reconnectSubs = new Set(); // handler() fired on a re-open, not the first open
function setState(next) {
if (state === next) {
@@ -107,6 +109,20 @@ export function createWsClient() {
for (const name of channelSubCount.keys()) {
sendSubscribeChannel(name);
}
// A re-open can miss watcher events that fired while the socket was down.
// Boot covers the first open, so handlers fire only on later opens.
if (hasConnectedBefore) {
for (const fn of reconnectSubs) {
try {
fn();
} catch (e) {
console.error("[ws] reconnect subscriber threw:", e);
}
}
} else {
hasConnectedBefore = true;
}
};
ws.onmessage = (event) => {
@@ -252,6 +268,14 @@ export function createWsClient() {
};
}
function onReconnect(handler) {
reconnectSubs.add(handler);
return () => {
reconnectSubs.delete(handler);
};
}
return {
connect,
disconnect,
@@ -260,6 +284,7 @@ export function createWsClient() {
channel,
isOpen,
onStateChange,
onReconnect,
};
}

View File

@@ -0,0 +1,63 @@
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import { createWsClient } from "./ws-client.js";
let sockets;
beforeEach(() => {
sockets = [];
class FakeWebSocket {
constructor(url) {
this.url = url;
this.readyState = 0;
sockets.push(this);
}
send() {}
close() {}
}
FakeWebSocket.OPEN = 1;
globalThis.WebSocket = FakeWebSocket;
globalThis.window = { location: { protocol: "http:", host: "localhost" } };
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
delete globalThis.WebSocket;
delete globalThis.window;
});
describe("ws-client reconnect", () => {
it("fires onReconnect on a re-open but not on the first open", () => {
const client = createWsClient();
const onReconnect = vi.fn();
client.onReconnect(onReconnect);
client.connect("v1");
sockets[0].onopen();
expect(onReconnect).not.toHaveBeenCalled();
sockets[0].onclose();
vi.advanceTimersByTime(2000);
sockets[1].onopen();
expect(onReconnect).toHaveBeenCalledTimes(1);
});
it("stops firing after unsubscribe", () => {
const client = createWsClient();
const onReconnect = vi.fn();
const off = client.onReconnect(onReconnect);
client.connect("v1");
sockets[0].onopen();
off();
sockets[0].onclose();
vi.advanceTimersByTime(2000);
sockets[1].onopen();
expect(onReconnect).not.toHaveBeenCalled();
});
});