mirror of
https://github.com/Nystik-gh/ignis.git
synced 2026-06-17 04:35:53 +00:00
add write coalescing
This commit is contained in:
@@ -75,6 +75,8 @@ module.exports = {
|
||||
vaults = discoverVaults();
|
||||
return vaults;
|
||||
},
|
||||
writeCoalesceMs: parseInt(process.env.WRITE_COALESCE_MS) || 5000,
|
||||
|
||||
obsidianAssetsPath:
|
||||
process.env.OBSIDIAN_ASSETS_PATH ||
|
||||
path.join(__dirname, "..", "investigation", "obsidian_1.12.4_unpacked"),
|
||||
|
||||
@@ -9,6 +9,7 @@ const watcher = require("./watcher");
|
||||
const { updateBridgePluginInAllVaults } = require("./bridge-plugin");
|
||||
const { initPlugins, shutdownPlugins } = require("./plugin-system/manager");
|
||||
const pluginRoutes = require("./routes/plugins");
|
||||
const { flushAll } = require("./write-coalescer");
|
||||
|
||||
const ANSI_RED = "\x1b[31m";
|
||||
const ANSI_YELLOW = "\x1b[33m";
|
||||
@@ -159,6 +160,7 @@ const wss = setupWebSocket(server);
|
||||
async function gracefulShutdown(signal) {
|
||||
console.log(`\n[ignis] Received ${signal}, shutting down gracefully...`);
|
||||
|
||||
await flushAll();
|
||||
await shutdownPlugins();
|
||||
|
||||
server.close(() => {
|
||||
|
||||
@@ -3,6 +3,7 @@ const fs = require("fs");
|
||||
const path = require("path");
|
||||
const archiver = require("archiver");
|
||||
const config = require("../config");
|
||||
const { writeCoalesced, getPending } = require("../write-coalescer");
|
||||
|
||||
const router = express.Router();
|
||||
|
||||
@@ -115,6 +116,25 @@ router.get("/stat", async (req, res) => {
|
||||
}
|
||||
|
||||
try {
|
||||
// If a coalesced write is pending, report its size instead of stale disk data
|
||||
const buffered = getPending(resolved);
|
||||
|
||||
if (buffered) {
|
||||
const diskStat = await fs.promises.stat(resolved).catch(() => null);
|
||||
const size = Buffer.isBuffer(buffered.data)
|
||||
? buffered.data.length
|
||||
: Buffer.byteLength(buffered.data, buffered.encoding || "utf-8");
|
||||
|
||||
res.json({
|
||||
type: "file",
|
||||
size,
|
||||
mtime: Date.now(),
|
||||
ctime: diskStat ? diskStat.ctimeMs : Date.now(),
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
const stat = await fs.promises.stat(resolved);
|
||||
|
||||
res.json({
|
||||
@@ -183,6 +203,21 @@ router.get("/readFile", async (req, res) => {
|
||||
});
|
||||
}
|
||||
|
||||
// Serve buffered content if a coalesced write is pending for this path
|
||||
const buffered = getPending(resolved);
|
||||
|
||||
if (buffered) {
|
||||
const encoding = req.query.encoding;
|
||||
|
||||
if (encoding === "utf8" || encoding === "utf-8") {
|
||||
res.type("text/plain").send(buffered.data);
|
||||
} else {
|
||||
res.type("application/octet-stream").send(buffered.data);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
const encoding = req.query.encoding;
|
||||
|
||||
if (encoding === "utf8" || encoding === "utf-8") {
|
||||
@@ -221,15 +256,9 @@ router.post("/writeFile", async (req, res) => {
|
||||
data = Buffer.from(req.body.content, "base64");
|
||||
}
|
||||
|
||||
await fs.promises.writeFile(
|
||||
resolved,
|
||||
data,
|
||||
encoding === "binary" ? undefined : encoding,
|
||||
);
|
||||
const result = await writeCoalesced(resolved, data, encoding);
|
||||
|
||||
const stat = await fs.promises.stat(resolved);
|
||||
|
||||
res.json({ ok: true, mtime: stat.mtimeMs, size: stat.size });
|
||||
res.json({ ok: true, mtime: result.mtime, size: result.size });
|
||||
} catch (e) {
|
||||
res.status(500).json({ error: e.message, code: e.code });
|
||||
}
|
||||
|
||||
176
server/write-coalescer.js
Normal file
176
server/write-coalescer.js
Normal file
@@ -0,0 +1,176 @@
|
||||
// Write coalescer for slow filesystems (rclone, FUSE, NFS, SMB).
|
||||
//
|
||||
// First write to a path goes to disk immediately. Subsequent writes within
|
||||
// the coalesce window are buffered; the timer resets on each write. After
|
||||
// the window elapses with no new writes, the buffered data is flushed.
|
||||
//
|
||||
// This prevents rapid-fire writes (e.g. workspace.json saved 20x/min)
|
||||
// from overwhelming network-mounted filesystems.
|
||||
|
||||
const fs = require("fs");
|
||||
const config = require("./config");
|
||||
|
||||
const FLUSH_TIMEOUT_MS = 10000;
|
||||
|
||||
// absPath -> timestamp of last completed write
|
||||
const lastWriteTime = new Map();
|
||||
|
||||
// absPath -> { data, encoding, timer, resolvers: [{ resolve, reject }] }
|
||||
const pending = new Map();
|
||||
|
||||
async function writeToDisk(absPath, data, encoding) {
|
||||
await fs.promises.writeFile(
|
||||
absPath,
|
||||
data,
|
||||
encoding === "binary" ? undefined : encoding,
|
||||
);
|
||||
|
||||
lastWriteTime.set(absPath, Date.now());
|
||||
const stat = await fs.promises.stat(absPath);
|
||||
|
||||
return { mtime: stat.mtimeMs, size: stat.size };
|
||||
}
|
||||
|
||||
function flushEntry(absPath) {
|
||||
const entry = pending.get(absPath);
|
||||
|
||||
if (!entry) {
|
||||
return;
|
||||
}
|
||||
|
||||
clearTimeout(entry.timer);
|
||||
pending.delete(absPath);
|
||||
|
||||
writeToDisk(absPath, entry.data, entry.encoding).then(
|
||||
(result) => {
|
||||
for (const r of entry.resolvers) {
|
||||
r.resolve(result);
|
||||
}
|
||||
},
|
||||
(err) => {
|
||||
for (const r of entry.resolvers) {
|
||||
r.reject(err);
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
function scheduleFlush(absPath) {
|
||||
const entry = pending.get(absPath);
|
||||
|
||||
if (!entry) {
|
||||
return;
|
||||
}
|
||||
|
||||
clearTimeout(entry.timer);
|
||||
entry.timer = setTimeout(() => flushEntry(absPath), config.writeCoalesceMs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Write file content, coalescing rapid writes.
|
||||
* The returned promise resolves with { mtime, size } once data hits disk.
|
||||
*/
|
||||
async function writeCoalesced(absPath, data, encoding) {
|
||||
const windowMs = config.writeCoalesceMs;
|
||||
|
||||
// Coalescing disabled or first write to this path
|
||||
const last = lastWriteTime.get(absPath);
|
||||
|
||||
if (windowMs <= 0 || !last || Date.now() - last >= windowMs) {
|
||||
// Resolve any pending write for this path first
|
||||
if (pending.has(absPath)) {
|
||||
clearTimeout(pending.get(absPath).timer);
|
||||
pending.delete(absPath);
|
||||
}
|
||||
|
||||
return writeToDisk(absPath, data, encoding);
|
||||
}
|
||||
|
||||
// Within the coalesce window: buffer the write
|
||||
return new Promise((resolve, reject) => {
|
||||
const existing = pending.get(absPath);
|
||||
|
||||
if (existing) {
|
||||
// Update data and add another resolver
|
||||
existing.data = data;
|
||||
existing.encoding = encoding;
|
||||
existing.resolvers.push({ resolve, reject });
|
||||
scheduleFlush(absPath);
|
||||
} else {
|
||||
const entry = {
|
||||
data,
|
||||
encoding,
|
||||
timer: null,
|
||||
resolvers: [{ resolve, reject }],
|
||||
};
|
||||
|
||||
pending.set(absPath, entry);
|
||||
scheduleFlush(absPath);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get pending (not yet flushed) data for a path, or null.
|
||||
* Used by readFile to serve buffered content instead of stale disk data.
|
||||
*/
|
||||
function getPending(absPath) {
|
||||
const entry = pending.get(absPath);
|
||||
|
||||
if (entry) {
|
||||
return { data: entry.data, encoding: entry.encoding };
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush all pending writes to disk. Called on graceful shutdown.
|
||||
*/
|
||||
async function flushAll() {
|
||||
const paths = [...pending.keys()];
|
||||
|
||||
if (paths.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(`[write-coalesce] Flushing ${paths.length} pending write(s)...`);
|
||||
|
||||
// Clear all timers
|
||||
for (const entry of pending.values()) {
|
||||
clearTimeout(entry.timer);
|
||||
}
|
||||
|
||||
const writes = paths.map((absPath) => {
|
||||
const entry = pending.get(absPath);
|
||||
pending.delete(absPath);
|
||||
|
||||
return writeToDisk(absPath, entry.data, entry.encoding).then(
|
||||
(result) => {
|
||||
for (const r of entry.resolvers) {
|
||||
r.resolve(result);
|
||||
}
|
||||
},
|
||||
(err) => {
|
||||
console.error(`[write-coalesce] Failed to flush ${absPath}:`, err);
|
||||
|
||||
for (const r of entry.resolvers) {
|
||||
r.reject(err);
|
||||
}
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
const timeout = new Promise((resolve) => {
|
||||
setTimeout(() => {
|
||||
console.warn(
|
||||
"[write-coalesce] Flush timeout -- some writes may be lost",
|
||||
);
|
||||
resolve();
|
||||
}, FLUSH_TIMEOUT_MS);
|
||||
});
|
||||
|
||||
await Promise.race([Promise.allSettled(writes), timeout]);
|
||||
}
|
||||
|
||||
module.exports = { writeCoalesced, getPending, flushAll };
|
||||
Reference in New Issue
Block a user