mirror of
https://github.com/Nystik-gh/ignis.git
synced 2026-06-17 04:35:53 +00:00
163 lines
4.3 KiB
JavaScript
163 lines
4.3 KiB
JavaScript
// Write coalescer for slow filesystems (rclone, FUSE, NFS, SMB).
//
// The first write to a path goes to disk immediately. Subsequent writes within
// the coalesce window are buffered and flushed when the debounce timer fires;
// the timer resets on each buffered write.
//
// Buffered writes respond to the HTTP client right away with a synthetic
// mtime/size. Otherwise the browser's per-host connection cap blocks unrelated
// reads while writes sit in the buffer.
|
|
|
|
const fs = require("fs");
|
|
const config = require("./config");
|
|
|
|
// Upper bound, in milliseconds, that flushAll() waits for outstanding flushes.
const FLUSH_TIMEOUT_MS = 10 * 1000;

// Buffered writes awaiting their debounce timer.
// absPath -> { data, encoding, timer }
const pending = new Map();

// absPath -> Date.now() recorded right after the most recent successful
// writeFile for that path.
const lastWriteTime = new Map();
|
|
|
|
/**
 * Write `data` to `absPath` and report the resulting on-disk metadata.
 *
 * @param {string} absPath - Path of the file to write.
 * @param {*} data - Payload handed straight to fs.promises.writeFile.
 * @param {string} [encoding] - Encoding for the write; the value "binary" is
 *   not forwarded (writeFile is then called without an explicit encoding).
 * @returns {Promise<{mtime: number, size: number}>} mtimeMs and size as
 *   reported by stat after the write.
 */
async function writeToDisk(absPath, data, encoding) {
  const writeEncoding = encoding === "binary" ? undefined : encoding;
  await fs.promises.writeFile(absPath, data, writeEncoding);

  // Only reached when writeFile succeeded, so a failed write leaves the
  // previous timestamp (if any) untouched.
  lastWriteTime.set(absPath, Date.now());

  const { mtimeMs, size } = await fs.promises.stat(absPath);
  return { mtime: mtimeMs, size };
}
|
|
|
|
/**
 * Flush the buffered write for `absPath`, if there is one.
 *
 * The entry is removed from `pending` and its timer cancelled before the disk
 * write starts. The write itself is fire-and-forget: a failure is logged and
 * not propagated to the caller.
 *
 * @param {string} absPath - Path whose buffered write should be flushed.
 */
function flushEntry(absPath) {
  const buffered = pending.get(absPath);

  if (buffered) {
    const { timer, data, encoding } = buffered;

    clearTimeout(timer);
    pending.delete(absPath);

    const reportFailure = (err) => {
      console.error(`[write-coalesce] Flush failed for ${absPath}:`, err);
    };
    writeToDisk(absPath, data, encoding).catch(reportFailure);
  }
}
|
|
|
|
/**
 * (Re)start the debounce timer for the buffered write at `absPath`.
 *
 * Any previously armed timer on the entry is cancelled first, so the flush
 * fires `config.writeCoalesceMs` after the most recent call. Does nothing
 * when no write is buffered for the path.
 *
 * @param {string} absPath - Path whose pending entry gets a fresh timer.
 */
function scheduleFlush(absPath) {
  const buffered = pending.get(absPath);

  if (buffered) {
    clearTimeout(buffered.timer);

    const fire = () => flushEntry(absPath);
    buffered.timer = setTimeout(fire, config.writeCoalesceMs);
  }
}
|
|
|
|
/**
 * Estimate how many bytes `data` will occupy once written.
 *
 * Used to synthesize a `size` for buffered writes that have not reached the
 * disk yet.
 *
 * @param {*} data - Payload to measure: a string, or a binary container such
 *   as a Buffer, TypedArray, DataView or ArrayBuffer.
 * @param {string} [encoding] - Encoding used for string payloads; "binary" is
 *   measured as "utf-8".
 * @returns {number} Byte count, or 0 when the payload exposes neither a
 *   numeric `byteLength` nor a `length`.
 */
function estimateSize(data, encoding) {
  if (typeof data === "string") {
    return Buffer.byteLength(data, encoding === "binary" ? "utf-8" : encoding);
  }

  // Prefer byteLength: for multi-byte typed arrays (Uint16Array, Float64Array,
  // ...) `length` is the element count, not the number of bytes.
  if (typeof data.byteLength === "number") {
    return data.byteLength;
  }

  return data.length || 0;
}
|
|
|
|
/**
 * Write file content, coalescing rapid successive writes to the same path.
 *
 * A write goes straight to disk (resolving with the real mtime/size) when
 * coalescing is disabled, when the path has no recorded write yet, or when
 * the last recorded write is at least `config.writeCoalesceMs` old. Any other
 * write is buffered: it resolves immediately with synthetic values and the
 * data reaches the disk later, when the debounce timer fires.
 *
 * @param {string} absPath - Path of the file to write.
 * @param {*} data - Content to write.
 * @param {string} [encoding] - Encoding associated with `data`.
 * @returns {Promise<{mtime: number, size: number}>} Real metadata for direct
 *   writes, or the current time and an estimated size for buffered ones.
 */
async function writeCoalesced(absPath, data, encoding) {
  const windowMs = config.writeCoalesceMs;
  const last = lastWriteTime.get(absPath);
  const writeThrough =
    windowMs <= 0 || !last || Date.now() - last >= windowMs;

  if (writeThrough) {
    // Whatever is still buffered is superseded by this write; drop it so the
    // older content cannot be flushed afterwards.
    const superseded = pending.get(absPath);
    if (superseded) {
      clearTimeout(superseded.timer);
      pending.delete(absPath);
    }

    return writeToDisk(absPath, data, encoding);
  }

  // Inside the coalesce window: keep only the newest payload and restart the
  // debounce timer.
  const buffered = pending.get(absPath);
  if (buffered) {
    buffered.data = data;
    buffered.encoding = encoding;
  } else {
    pending.set(absPath, { data, encoding, timer: null });
  }
  scheduleFlush(absPath);

  return { mtime: Date.now(), size: estimateSize(data, encoding) };
}
|
|
|
|
/**
 * Look up the buffered (not yet flushed) write for a path.
 *
 * Lets readers serve the buffered content instead of stale data from disk.
 *
 * @param {string} absPath - Path to look up.
 * @returns {{data: *, encoding: *}|null} A fresh object holding the buffered
 *   data and encoding, or null when nothing is buffered for the path.
 */
function getPending(absPath) {
  const buffered = pending.get(absPath);

  return buffered
    ? { data: buffered.data, encoding: buffered.encoding }
    : null;
}
|
|
|
|
/**
 * Flush all pending writes to disk. Called on graceful shutdown.
 *
 * Every buffered entry is removed from `pending`, its debounce timer is
 * cancelled, and the writes are started in parallel. The call resolves when
 * all writes have settled or after FLUSH_TIMEOUT_MS, whichever comes first.
 * Individual write failures are logged, never thrown.
 *
 * @returns {Promise<void>}
 */
async function flushAll() {
  if (pending.size === 0) {
    return;
  }

  // Snapshot and empty the buffer up front so nothing is flushed twice.
  const entries = [...pending.entries()];
  pending.clear();

  console.log(
    `[write-coalesce] Flushing ${entries.length} pending write(s)...`,
  );

  for (const [, entry] of entries) {
    clearTimeout(entry.timer);
  }

  const writes = entries.map(([absPath, entry]) =>
    writeToDisk(absPath, entry.data, entry.encoding).catch((err) => {
      console.error(`[write-coalesce] Failed to flush ${absPath}:`, err);
    }),
  );

  let timeoutHandle;
  const timeout = new Promise((resolve) => {
    timeoutHandle = setTimeout(() => {
      console.warn(
        "[write-coalesce] Flush timeout -- some writes may be lost",
      );
      resolve();
    }, FLUSH_TIMEOUT_MS);
  });

  try {
    await Promise.race([Promise.allSettled(writes), timeout]);
  } finally {
    // Without this the timer would keep the event loop alive for the full
    // timeout and then log a bogus "Flush timeout" warning even though every
    // write already completed.
    clearTimeout(timeoutHandle);
  }
}
|
|
|
|
/**
 * Test-only: wipe all internal state. Not meant for production use.
 *
 * Cancels every armed debounce timer, then empties both the pending-write
 * buffer and the last-write timestamps.
 */
function _reset() {
  pending.forEach(({ timer }) => clearTimeout(timer));
  pending.clear();
  lastWriteTime.clear();
}
|
|
|
|
module.exports = { writeCoalesced, getPending, flushAll, _reset };
|