mirror of
https://github.com/Nystik-gh/ignis.git
synced 2026-06-17 04:35:53 +00:00
break out code into server-core
This commit is contained in:
@@ -1,5 +1,10 @@
|
||||
{
|
||||
"name": "@ignis/server-core",
|
||||
"version": "0.0.0-internal",
|
||||
"private": true
|
||||
"private": true,
|
||||
"main": "src/index.js",
|
||||
"dependencies": {
|
||||
"chokidar": "^3.6.0",
|
||||
"ws": "^8.16.0"
|
||||
}
|
||||
}
|
||||
|
||||
15
packages/server-core/src/index.js
Normal file
15
packages/server-core/src/index.js
Normal file
@@ -0,0 +1,15 @@
|
||||
const writeCoalescer = require("./write-coalescer");
|
||||
const watcher = require("./watcher");
|
||||
const { setupWebSocket } = require("./ws");
|
||||
const {
|
||||
encodeContentDispositionFilename,
|
||||
resolveVaultPath,
|
||||
} = require("./path-utils");
|
||||
|
||||
module.exports = {
|
||||
writeCoalescer,
|
||||
watcher,
|
||||
setupWebSocket,
|
||||
encodeContentDispositionFilename,
|
||||
resolveVaultPath,
|
||||
};
|
||||
64
packages/server-core/src/path-utils.js
Normal file
64
packages/server-core/src/path-utils.js
Normal file
@@ -0,0 +1,64 @@
|
||||
const path = require("path");
|
||||
|
||||
/**
|
||||
* Encode a filename for use in Content-Disposition header.
|
||||
* Handles non-ASCII characters and special characters to prevent header injection.
|
||||
* Uses RFC 5987 encoding for filename* parameter when needed.
|
||||
*/
|
||||
function encodeContentDispositionFilename(filename) {
|
||||
const hasNonASCII = /[^\x00-\x7F]/.test(filename);
|
||||
|
||||
// Escape quotes and backslashes in ASCII filename
|
||||
const escapedFilename = filename.replace(/["\\ ]/g, function (match) {
|
||||
if (match === '"') return '\\"';
|
||||
if (match === "\\") return "\\\\";
|
||||
return match;
|
||||
});
|
||||
|
||||
// Remove any control characters that could cause header injection
|
||||
const sanitizedFilename = escapedFilename.replace(/[\x00-\x1F\x7F]/g, "");
|
||||
|
||||
if (!hasNonASCII) {
|
||||
// Simple ASCII filename - use standard format
|
||||
return `attachment; filename="${sanitizedFilename}"`;
|
||||
}
|
||||
|
||||
// Non-ASCII filename - use RFC 5987 encoding
|
||||
// Encode using percent-encoding for UTF-8
|
||||
const encodedFilename = encodeURIComponent(filename)
|
||||
.replace(/['()]/g, function (c) {
|
||||
return "%" + c.charCodeAt(0).toString(16).toUpperCase();
|
||||
})
|
||||
.replace(/\*/g, "%2A");
|
||||
|
||||
// Provide both filename (ASCII fallback) and filename* (UTF-8 encoded)
|
||||
// For fallback, replace non-ASCII with underscores
|
||||
const asciiFallback = filename
|
||||
.replace(/[^\x00-\x7F]/g, "_")
|
||||
.replace(/["\\ ]/g, function (match) {
|
||||
if (match === '"') return '\\"';
|
||||
if (match === "\\") return "\\\\";
|
||||
return match;
|
||||
});
|
||||
|
||||
return `attachment; filename="${asciiFallback}"; filename*=UTF-8''${encodedFilename}`;
|
||||
}
|
||||
|
||||
// Resolve a client-provided path to an absolute path within a vault.
|
||||
// Strips leading slashes so paths from the client are always treated as relative to the vault root.
|
||||
function resolveVaultPath(vaultRoot, relativePath) {
|
||||
const cleaned = (relativePath || "").replace(/^\/+/, "");
|
||||
const resolved = path.resolve(vaultRoot, cleaned);
|
||||
|
||||
const resolvedRoot = path.resolve(vaultRoot);
|
||||
|
||||
if (
|
||||
resolved !== resolvedRoot &&
|
||||
!resolved.startsWith(resolvedRoot + path.sep)
|
||||
) {
|
||||
return null;
|
||||
}
|
||||
return resolved;
|
||||
}
|
||||
|
||||
module.exports = { encodeContentDispositionFilename, resolveVaultPath };
|
||||
120
packages/server-core/src/watcher.js
Normal file
120
packages/server-core/src/watcher.js
Normal file
@@ -0,0 +1,120 @@
|
||||
const chokidar = require("chokidar");
|
||||
const path = require("path");
|
||||
const fs = require("fs");
|
||||
|
||||
// Per-vault chokidar watchers
|
||||
// Map<vaultId, { watcher, listeners: Set<fn>, vaultPath }>
|
||||
const vaultWatchers = new Map();
|
||||
|
||||
function startWatching(vaultId, vaultPath) {
|
||||
if (vaultWatchers.has(vaultId)) {
|
||||
return vaultWatchers.get(vaultId);
|
||||
}
|
||||
|
||||
const watcher = chokidar.watch(vaultPath, {
|
||||
persistent: true,
|
||||
ignoreInitial: true,
|
||||
awaitWriteFinish: {
|
||||
stabilityThreshold: 300,
|
||||
pollInterval: 100,
|
||||
},
|
||||
ignored: [
|
||||
/(^|[\/\\])\.git([\/\\]|$)/, // .git directories
|
||||
],
|
||||
});
|
||||
|
||||
const entry = { watcher, listeners: new Set(), vaultPath };
|
||||
|
||||
function emit(type, fullPath, stat) {
|
||||
const rel = path.relative(vaultPath, fullPath).replace(/\\/g, "/");
|
||||
|
||||
const event = { type, path: rel };
|
||||
|
||||
if (stat) {
|
||||
event.stat = {
|
||||
size: stat.size,
|
||||
mtime: stat.mtimeMs,
|
||||
ctime: stat.ctimeMs,
|
||||
};
|
||||
}
|
||||
|
||||
for (const fn of entry.listeners) {
|
||||
try {
|
||||
fn(event);
|
||||
} catch (e) {
|
||||
console.error("[watcher] Listener error:", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
watcher
|
||||
.on("add", (fullPath) => {
|
||||
try {
|
||||
const stat = fs.statSync(fullPath);
|
||||
emit("created", fullPath, stat);
|
||||
} catch {
|
||||
emit("created", fullPath, null);
|
||||
}
|
||||
})
|
||||
.on("change", (fullPath) => {
|
||||
try {
|
||||
const stat = fs.statSync(fullPath);
|
||||
emit("modified", fullPath, stat);
|
||||
} catch {
|
||||
emit("modified", fullPath, null);
|
||||
}
|
||||
})
|
||||
.on("unlink", (fullPath) => {
|
||||
emit("deleted", fullPath, null);
|
||||
})
|
||||
.on("addDir", (fullPath) => {
|
||||
// Skip vault root itself
|
||||
if (path.resolve(fullPath) === path.resolve(vaultPath)) return;
|
||||
emit("folder-created", fullPath, null);
|
||||
})
|
||||
.on("unlinkDir", (fullPath) => {
|
||||
emit("deleted", fullPath, null);
|
||||
})
|
||||
.on("error", (err) => {
|
||||
console.error(`[watcher] Error on vault "${vaultId}":`, err.message);
|
||||
});
|
||||
|
||||
vaultWatchers.set(vaultId, entry);
|
||||
console.log(`[watcher] Started watching vault: ${vaultId}`);
|
||||
|
||||
return entry;
|
||||
}
|
||||
|
||||
function stopWatching(vaultId) {
|
||||
const entry = vaultWatchers.get(vaultId);
|
||||
|
||||
if (entry) {
|
||||
entry.watcher.close();
|
||||
entry.listeners.clear();
|
||||
vaultWatchers.delete(vaultId);
|
||||
console.log(`[watcher] Stopped watching vault: ${vaultId}`);
|
||||
}
|
||||
}
|
||||
|
||||
function addListener(vaultId, fn) {
|
||||
const entry = vaultWatchers.get(vaultId);
|
||||
|
||||
if (entry) {
|
||||
entry.listeners.add(fn);
|
||||
}
|
||||
}
|
||||
|
||||
function removeListener(vaultId, fn) {
|
||||
const entry = vaultWatchers.get(vaultId);
|
||||
|
||||
if (entry) {
|
||||
entry.listeners.delete(fn);
|
||||
|
||||
// Stop watching if no listeners remain
|
||||
if (entry.listeners.size === 0) {
|
||||
stopWatching(vaultId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = { startWatching, stopWatching, addListener, removeListener };
|
||||
170
packages/server-core/src/write-coalescer.js
Normal file
170
packages/server-core/src/write-coalescer.js
Normal file
@@ -0,0 +1,170 @@
|
||||
// Write coalescer for slow filesystems (rclone, FUSE, NFS, SMB).
|
||||
//
|
||||
// First write to a path goes to disk immediately. Subsequent writes within the coalesce window are buffered and flushed when the debounce timer fires; the timer resets on each write.
|
||||
//
|
||||
// Buffered writes respond to the HTTP client right away with synthetic mtime/size. Otherwise the browser's per-host connection cap blocks unrelated reads while writes sit in the buffer.
|
||||
|
||||
const fs = require("fs");
|
||||
|
||||
const FLUSH_TIMEOUT_MS = 10000;
|
||||
|
||||
// Coalesce window in ms. 0 disables coalescing. Set via configure({ writeCoalesceMs }).
|
||||
let writeCoalesceMs = 0;
|
||||
|
||||
function configure(opts) {
|
||||
if (typeof opts?.writeCoalesceMs === "number") {
|
||||
writeCoalesceMs = opts.writeCoalesceMs;
|
||||
}
|
||||
}
|
||||
|
||||
// absPath -> timestamp of last completed (or scheduled) write
|
||||
const lastWriteTime = new Map();
|
||||
|
||||
// absPath -> { data, encoding, timer }
|
||||
const pending = new Map();
|
||||
|
||||
async function writeToDisk(absPath, data, encoding) {
|
||||
await fs.promises.writeFile(
|
||||
absPath,
|
||||
data,
|
||||
encoding === "binary" ? undefined : encoding,
|
||||
);
|
||||
|
||||
lastWriteTime.set(absPath, Date.now());
|
||||
const stat = await fs.promises.stat(absPath);
|
||||
|
||||
return { mtime: stat.mtimeMs, size: stat.size };
|
||||
}
|
||||
|
||||
function flushEntry(absPath) {
|
||||
const entry = pending.get(absPath);
|
||||
|
||||
if (!entry) {
|
||||
return;
|
||||
}
|
||||
|
||||
clearTimeout(entry.timer);
|
||||
pending.delete(absPath);
|
||||
|
||||
writeToDisk(absPath, entry.data, entry.encoding).catch((err) => {
|
||||
console.error(`[write-coalesce] Flush failed for ${absPath}:`, err);
|
||||
});
|
||||
}
|
||||
|
||||
function scheduleFlush(absPath) {
|
||||
const entry = pending.get(absPath);
|
||||
|
||||
if (!entry) {
|
||||
return;
|
||||
}
|
||||
|
||||
clearTimeout(entry.timer);
|
||||
entry.timer = setTimeout(() => flushEntry(absPath), writeCoalesceMs);
|
||||
}
|
||||
|
||||
function estimateSize(data, encoding) {
|
||||
if (typeof data === "string") {
|
||||
return Buffer.byteLength(data, encoding === "binary" ? "utf-8" : encoding);
|
||||
}
|
||||
|
||||
return data.length || data.byteLength || 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write file content, coalescing rapid writes.
|
||||
* Fresh writes resolve with real mtime/size once data is on disk. Buffered writes resolve immediately with synthetic values; the disk flush happens later when the debounce timer fires.
|
||||
*/
|
||||
async function writeCoalesced(absPath, data, encoding) {
|
||||
const windowMs = writeCoalesceMs;
|
||||
const last = lastWriteTime.get(absPath);
|
||||
|
||||
// Fast path: coalescing disabled or far enough from the last write.
|
||||
if (windowMs <= 0 || !last || Date.now() - last >= windowMs) {
|
||||
if (pending.has(absPath)) {
|
||||
clearTimeout(pending.get(absPath).timer);
|
||||
pending.delete(absPath);
|
||||
}
|
||||
|
||||
return writeToDisk(absPath, data, encoding);
|
||||
}
|
||||
|
||||
// Within the coalesce window: buffer the write and respond immediately.
|
||||
const existing = pending.get(absPath);
|
||||
|
||||
if (existing) {
|
||||
existing.data = data;
|
||||
existing.encoding = encoding;
|
||||
scheduleFlush(absPath);
|
||||
} else {
|
||||
pending.set(absPath, {
|
||||
data,
|
||||
encoding,
|
||||
timer: null,
|
||||
});
|
||||
scheduleFlush(absPath);
|
||||
}
|
||||
|
||||
return { mtime: Date.now(), size: estimateSize(data, encoding) };
|
||||
}
|
||||
|
||||
/**
|
||||
* Get pending (not yet flushed) data for a path, or null.
|
||||
* Used by readFile to serve buffered content instead of stale disk data.
|
||||
*/
|
||||
function getPending(absPath) {
|
||||
const entry = pending.get(absPath);
|
||||
|
||||
if (entry) {
|
||||
return { data: entry.data, encoding: entry.encoding };
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush all pending writes to disk. Called on graceful shutdown.
|
||||
*/
|
||||
async function flushAll() {
|
||||
const paths = [...pending.keys()];
|
||||
|
||||
if (paths.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(`[write-coalesce] Flushing ${paths.length} pending write(s)...`);
|
||||
|
||||
for (const entry of pending.values()) {
|
||||
clearTimeout(entry.timer);
|
||||
}
|
||||
|
||||
const writes = paths.map((absPath) => {
|
||||
const entry = pending.get(absPath);
|
||||
pending.delete(absPath);
|
||||
|
||||
return writeToDisk(absPath, entry.data, entry.encoding).catch((err) => {
|
||||
console.error(`[write-coalesce] Failed to flush ${absPath}:`, err);
|
||||
});
|
||||
});
|
||||
|
||||
const timeout = new Promise((resolve) => {
|
||||
setTimeout(() => {
|
||||
console.warn(
|
||||
"[write-coalesce] Flush timeout -- some writes may be lost",
|
||||
);
|
||||
resolve();
|
||||
}, FLUSH_TIMEOUT_MS);
|
||||
});
|
||||
|
||||
await Promise.race([Promise.allSettled(writes), timeout]);
|
||||
}
|
||||
|
||||
// Test-only: clear all internal state. Not exported for production use.
|
||||
function _reset() {
|
||||
for (const entry of pending.values()) {
|
||||
clearTimeout(entry.timer);
|
||||
}
|
||||
pending.clear();
|
||||
lastWriteTime.clear();
|
||||
}
|
||||
|
||||
module.exports = { writeCoalesced, getPending, flushAll, configure, _reset };
|
||||
137
packages/server-core/src/write-coalescer.test.mjs
Normal file
137
packages/server-core/src/write-coalescer.test.mjs
Normal file
@@ -0,0 +1,137 @@
|
||||
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
|
||||
import { createRequire } from "module";
|
||||
import path from "path";
|
||||
import fs from "fs";
|
||||
import os from "os";
|
||||
|
||||
const require = createRequire(import.meta.url);
|
||||
const coalescer = require("./write-coalescer.js");
|
||||
|
||||
const SHORT_WINDOW_MS = 50;
|
||||
|
||||
let tmpDir;
|
||||
|
||||
beforeEach(async () => {
|
||||
coalescer.configure({ writeCoalesceMs: SHORT_WINDOW_MS });
|
||||
coalescer._reset();
|
||||
tmpDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), "coalesce-test-"));
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
coalescer._reset();
|
||||
vi.restoreAllMocks();
|
||||
coalescer.configure({ writeCoalesceMs: 0 });
|
||||
await fs.promises.rm(tmpDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
function sleep(ms) {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
describe("writeCoalesced", () => {
|
||||
it("first write hits disk immediately with real mtime/size", async () => {
|
||||
const filePath = path.join(tmpDir, "file.txt");
|
||||
const result = await coalescer.writeCoalesced(filePath, "hello", "utf-8");
|
||||
|
||||
expect(result.size).toBe(5);
|
||||
expect(result.mtime).toBeGreaterThan(0);
|
||||
|
||||
const onDisk = await fs.promises.readFile(filePath, "utf-8");
|
||||
expect(onDisk).toBe("hello");
|
||||
});
|
||||
|
||||
it("buffered write within the window returns immediately with synthetic values and is not yet on disk", async () => {
|
||||
const filePath = path.join(tmpDir, "file.txt");
|
||||
|
||||
await coalescer.writeCoalesced(filePath, "first", "utf-8");
|
||||
|
||||
const start = Date.now();
|
||||
const result = await coalescer.writeCoalesced(filePath, "second", "utf-8");
|
||||
const elapsed = Date.now() - start;
|
||||
|
||||
expect(elapsed).toBeLessThan(10);
|
||||
expect(result.size).toBe(6);
|
||||
|
||||
const onDisk = await fs.promises.readFile(filePath, "utf-8");
|
||||
expect(onDisk).toBe("first");
|
||||
});
|
||||
|
||||
it("flushes the latest buffered data after the window elapses", async () => {
|
||||
const filePath = path.join(tmpDir, "file.txt");
|
||||
|
||||
await coalescer.writeCoalesced(filePath, "v1", "utf-8");
|
||||
await coalescer.writeCoalesced(filePath, "v2", "utf-8");
|
||||
await coalescer.writeCoalesced(filePath, "v3", "utf-8");
|
||||
|
||||
await sleep(SHORT_WINDOW_MS + 30);
|
||||
|
||||
const onDisk = await fs.promises.readFile(filePath, "utf-8");
|
||||
expect(onDisk).toBe("v3");
|
||||
});
|
||||
|
||||
it("collapses many rapid writes into exactly two disk writes", async () => {
|
||||
const filePath = path.join(tmpDir, "file.txt");
|
||||
const spy = vi.spyOn(fs.promises, "writeFile");
|
||||
|
||||
for (let i = 0; i < 20; i++) {
|
||||
await coalescer.writeCoalesced(filePath, `v${i}`, "utf-8");
|
||||
}
|
||||
|
||||
await sleep(SHORT_WINDOW_MS + 30);
|
||||
|
||||
expect(spy).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("stays snappy when the filesystem is slow", async () => {
|
||||
const filePath = path.join(tmpDir, "file.txt");
|
||||
const realWrite = fs.promises.writeFile.bind(fs.promises);
|
||||
|
||||
vi.spyOn(fs.promises, "writeFile").mockImplementation(async (...args) => {
|
||||
await sleep(200);
|
||||
return realWrite(...args);
|
||||
});
|
||||
|
||||
await coalescer.writeCoalesced(filePath, "first", "utf-8");
|
||||
|
||||
const start = Date.now();
|
||||
await coalescer.writeCoalesced(filePath, "second", "utf-8");
|
||||
const elapsed = Date.now() - start;
|
||||
|
||||
expect(elapsed).toBeLessThan(20);
|
||||
});
|
||||
});
|
||||
|
||||
describe("getPending", () => {
|
||||
it("returns buffered data for paths with a pending write", async () => {
|
||||
const filePath = path.join(tmpDir, "file.txt");
|
||||
|
||||
await coalescer.writeCoalesced(filePath, "first", "utf-8");
|
||||
await coalescer.writeCoalesced(filePath, "buffered", "utf-8");
|
||||
|
||||
const pending = coalescer.getPending(filePath);
|
||||
expect(pending).not.toBeNull();
|
||||
expect(pending.data).toBe("buffered");
|
||||
});
|
||||
});
|
||||
|
||||
describe("flushAll", () => {
|
||||
it("drains all buffered writes to disk and clears pending state", async () => {
|
||||
const fileA = path.join(tmpDir, "a.txt");
|
||||
const fileB = path.join(tmpDir, "b.txt");
|
||||
|
||||
await coalescer.writeCoalesced(fileA, "first-a", "utf-8");
|
||||
await coalescer.writeCoalesced(fileA, "buffered-a", "utf-8");
|
||||
await coalescer.writeCoalesced(fileB, "first-b", "utf-8");
|
||||
await coalescer.writeCoalesced(fileB, "buffered-b", "utf-8");
|
||||
|
||||
expect(coalescer.getPending(fileA)).not.toBeNull();
|
||||
expect(coalescer.getPending(fileB)).not.toBeNull();
|
||||
|
||||
await coalescer.flushAll();
|
||||
|
||||
expect(await fs.promises.readFile(fileA, "utf-8")).toBe("buffered-a");
|
||||
expect(await fs.promises.readFile(fileB, "utf-8")).toBe("buffered-b");
|
||||
expect(coalescer.getPending(fileA)).toBeNull();
|
||||
expect(coalescer.getPending(fileB)).toBeNull();
|
||||
});
|
||||
});
|
||||
62
packages/server-core/src/ws.js
Normal file
62
packages/server-core/src/ws.js
Normal file
@@ -0,0 +1,62 @@
|
||||
const { WebSocketServer } = require("ws");
|
||||
const url = require("url");
|
||||
const watcher = require("./watcher");
|
||||
|
||||
function setupWebSocket(server, opts = {}) {
|
||||
const { getVaultPath } = opts;
|
||||
|
||||
if (typeof getVaultPath !== "function") {
|
||||
throw new Error("setupWebSocket: opts.getVaultPath is required");
|
||||
}
|
||||
|
||||
const wss = new WebSocketServer({ server, path: "/ws" });
|
||||
|
||||
// Plugin-registered message handlers: type -> handler(msg, ws)
|
||||
wss.messageHandlers = new Map();
|
||||
|
||||
wss.on("connection", (ws, req) => {
|
||||
const params = new url.URL(req.url, "http://localhost").searchParams;
|
||||
const vaultId = params.get("vault");
|
||||
|
||||
if (!vaultId || !getVaultPath(vaultId)) {
|
||||
ws.close(4001, "Invalid or missing vault ID");
|
||||
return;
|
||||
}
|
||||
|
||||
const vaultPath = getVaultPath(vaultId);
|
||||
console.log(`[ws] Client connected to vault: ${vaultId}`);
|
||||
|
||||
// Start watching this vault (no-op if already watching)
|
||||
watcher.startWatching(vaultId, vaultPath);
|
||||
|
||||
// Per-client listener that forwards events over WebSocket
|
||||
const listener = (event) => {
|
||||
if (ws.readyState === ws.OPEN) {
|
||||
ws.send(JSON.stringify(event));
|
||||
}
|
||||
};
|
||||
|
||||
watcher.addListener(vaultId, listener);
|
||||
|
||||
// Dispatch incoming messages to registered handlers
|
||||
ws.on("message", (data) => {
|
||||
try {
|
||||
const msg = JSON.parse(data);
|
||||
const handler = wss.messageHandlers.get(msg.type);
|
||||
|
||||
if (handler) {
|
||||
handler(msg, ws);
|
||||
}
|
||||
} catch {}
|
||||
});
|
||||
|
||||
ws.on("close", () => {
|
||||
console.log(`[ws] Client disconnected from vault: ${vaultId}`);
|
||||
watcher.removeListener(vaultId, listener);
|
||||
});
|
||||
});
|
||||
|
||||
return wss;
|
||||
}
|
||||
|
||||
module.exports = { setupWebSocket };
|
||||
Reference in New Issue
Block a user