From acb700a82b27964da653fdf49aa5367bd3d3d853 Mon Sep 17 00:00:00 2001 From: Nystik <236107-Nystik@users.noreply.gitlab.com> Date: Sat, 28 Mar 2026 18:50:32 +0100 Subject: [PATCH] add syncmanager --- server/plugins/headless-sync/index.js | 47 ++- server/plugins/headless-sync/routes.js | 86 +++++ server/plugins/headless-sync/sync-manager.js | 325 +++++++++++++++++++ 3 files changed, 456 insertions(+), 2 deletions(-) create mode 100644 server/plugins/headless-sync/sync-manager.js diff --git a/server/plugins/headless-sync/index.js b/server/plugins/headless-sync/index.js index 92ad097..eeeaa14 100644 --- a/server/plugins/headless-sync/index.js +++ b/server/plugins/headless-sync/index.js @@ -1,6 +1,7 @@ const path = require("path"); const obCli = require("./ob-cli"); const auth = require("./auth"); +const { SyncManager } = require("./sync-manager"); module.exports = { id: "headless-sync", @@ -11,6 +12,7 @@ module.exports = { _ctx: null, _obStatus: null, + _syncManager: null, async register(ctx) { this._ctx = ctx; @@ -29,11 +31,37 @@ module.exports = { ctx.log("Auth token loaded"); } + this._syncManager = new SyncManager(ctx); + + // Load saved sync states for enabled vaults + const enabledVaults = ctx.getEnabledVaults(); + const vaultMap = {}; + + for (const vaultId of enabledVaults) { + const vaultPath = ctx.config.getVaultPath(vaultId); + + if (vaultPath) { + vaultMap[vaultId] = vaultPath; + } + } + + this._syncManager.loadStates(vaultMap); + + // Auto-start syncs that were running before shutdown + if (this._obStatus.installed && auth.isAuthenticated(ctx.dataDir)) { + this._syncManager.autoStartAll(); + } + const { mountRoutes } = require("./routes"); mountRoutes(ctx.router, this); }, async shutdown() { + if (this._syncManager) { + await this._syncManager.shutdown(); + this._syncManager = null; + } + this._ctx = null; }, @@ -44,8 +72,19 @@ module.exports = { }, async onVaultDisabled(vaultId, vaultPath) { - if (this._ctx) { - this._ctx.log(`Vault disabled: ${vaultId}`); + if (!this._ctx) { + return; + } + + this._ctx.log(`Vault disabled: ${vaultId}`); + + // Stop sync if running, but keep the config + if (this._syncManager) { + const state = this._syncManager.getState(vaultId); + + if (state && state.status === "running") { + this._syncManager.stopSync(vaultId); + } } }, @@ -56,4 +95,8 @@ module.exports = { getCtx() { return this._ctx; }, + + getSyncManager() { + return this._syncManager; + }, }; diff --git a/server/plugins/headless-sync/routes.js b/server/plugins/headless-sync/routes.js index 4404959..010a19e 100644 --- a/server/plugins/headless-sync/routes.js +++ b/server/plugins/headless-sync/routes.js @@ -46,6 +46,92 @@ function mountRoutes(router, plugin) { } }); + router.post("/setup", async (req, res) => { + const ctx = plugin.getCtx(); + const syncManager = plugin.getSyncManager(); + const { vaultId, remoteVault, vaultPassword, deviceName, mode } = req.body; + + if (!vaultId || !remoteVault) { + return res.status(400).json({ error: "vaultId and remoteVault are required" }); + } + + if (!auth.isAuthenticated(ctx.dataDir)) { + return res.status(401).json({ error: "Not authenticated" }); + } + + const vaultPath = ctx.config.getVaultPath(vaultId); + + if (!vaultPath) { + return res.status(404).json({ error: "Vault not found" }); + } + + try { + const state = await syncManager.setupSync(vaultId, vaultPath, remoteVault, { + vaultPassword, + deviceName, + mode, + }); + + res.json({ success: true, state }); + } catch (e) { + ctx.log(`Failed to setup sync: ${e.message}`); + res.status(500).json({ error: e.message }); + } + }); + + router.post("/start", (req, res) => { + const ctx = plugin.getCtx(); + const syncManager = plugin.getSyncManager(); + const { vaultId } = req.body; + + if (!vaultId) { + return res.status(400).json({ error: "vaultId is required" }); + } + + try { + const state = syncManager.startSync(vaultId); + res.json({ success: true, state }); + } catch (e) { + ctx.log(`Failed to start sync: ${e.message}`); + res.status(500).json({ error: e.message }); + } + }); + + router.post("/stop", (req, res) => { + const ctx = plugin.getCtx(); + const syncManager = plugin.getSyncManager(); + const { vaultId } = req.body; + + if (!vaultId) { + return res.status(400).json({ error: "vaultId is required" }); + } + + try { + const state = syncManager.stopSync(vaultId); + res.json({ success: true, state }); + } catch (e) { + ctx.log(`Failed to stop sync: ${e.message}`); + res.status(500).json({ error: e.message }); + } + }); + + router.get("/logs", (req, res) => { + const syncManager = plugin.getSyncManager(); + const { vaultId, limit } = req.query; + + if (!vaultId) { + return res.status(400).json({ error: "vaultId is required" }); + } + + const logs = syncManager.getLogs(vaultId, limit ? parseInt(limit) : 100); + res.json({ logs }); + }); + + router.get("/vaults", (req, res) => { + const syncManager = plugin.getSyncManager(); + res.json({ vaults: syncManager.getAllStates() }); + }); + router.get("/remote-vaults", async (req, res) => { const ctx = plugin.getCtx(); diff --git a/server/plugins/headless-sync/sync-manager.js b/server/plugins/headless-sync/sync-manager.js new file mode 100644 index 0000000..34390f7 --- /dev/null +++ b/server/plugins/headless-sync/sync-manager.js @@ -0,0 +1,325 @@ +const { spawn } = require("child_process"); +const fs = require("fs"); +const path = require("path"); +const os = require("os"); + +const MAX_LOG_ENTRIES = 200; + +class SyncManager { + constructor(ctx) { + this.ctx = ctx; + this.states = new Map(); + this.stateFile = path.join(ctx.dataDir, "sync-states.json"); + } + + loadStates(vaults) { + try { + const saved = JSON.parse(fs.readFileSync(this.stateFile, "utf-8")); + + for (const entry of saved) { + const vaultPath = vaults[entry.vaultId]; + + if (!vaultPath) { + this.ctx.log(`Skipping state for missing vault: ${entry.vaultId}`); + continue; + } + + this.states.set(entry.vaultId, { + vaultId: entry.vaultId, + vaultPath, + remoteVault: entry.remoteVault, + status: "stopped", + pid: null, + lastActivity: new Date().toISOString(), + error: null, + config: entry.config || { + mode: "bidirectional", + deviceName: "ignis-headless", + }, + autoStart: entry.autoStart || false, + logs: [], + _process: null, + }); + } + + this.ctx.log(`Loaded ${saved.length} sync configurations`); + } catch { + this.ctx.log("No previous sync states found"); + } + } + + saveStates() { + const data = []; + + for (const [vaultId, state] of this.states) { + data.push({ + vaultId: state.vaultId, + vaultPath: state.vaultPath, + remoteVault: state.remoteVault, + config: state.config, + autoStart: state.autoStart, + }); + } + + fs.writeFileSync(this.stateFile, JSON.stringify(data, null, 2), "utf-8"); + } + + async setupSync(vaultId, vaultPath, remoteVault, options = {}) { + const obCli = require("./ob-cli"); + + const args = ["sync-setup", "--vault", remoteVault, "--path", "."]; + + if (options.vaultPassword) { + args.push("--password", options.vaultPassword); + } + + if (options.deviceName) { + args.push("--device-name", options.deviceName); + } + + await obCli.runCommand(args, { cwd: vaultPath }); + + const state = { + vaultId, + vaultPath, + remoteVault, + status: "stopped", + pid: null, + lastActivity: new Date().toISOString(), + error: null, + config: { + mode: options.mode || "bidirectional", + deviceName: options.deviceName || "ignis-headless", + }, + autoStart: false, + logs: [], + _process: null, + }; + + this.states.set(vaultId, state); + this.saveStates(); + this.ctx.log(`Sync setup complete for ${vaultId} -> ${remoteVault}`); + + return this.getState(vaultId); + } + + startSync(vaultId) { + const state = this.states.get(vaultId); + + if (!state) { + throw new Error(`No sync configuration for vault: ${vaultId}`); + } + + if (state.status === "running") { + this.ctx.log(`Sync already running for ${vaultId}`); + return this.getState(vaultId); + } + + const args = ["sync", "--continuous"]; + + if (state.config.mode === "pull-only") { + args.push("--pull-only"); + } else if (state.config.mode === "mirror-remote") { + args.push("--mirror-remote"); + } + + const proc = spawn("ob", args, { + cwd: state.vaultPath, + env: { ...process.env, HOME: os.homedir() }, + }); + + state.status = "running"; + state.pid = proc.pid; + state.error = null; + state.autoStart = true; + state._process = proc; + + this.addLog(state, `Sync started (pid: ${proc.pid})`); + + proc.stdout.on("data", (data) => { + const lines = data.toString().split("\n"); + + for (const line of lines) { + if (line.trim()) { + this.addLog(state, line.trim()); + state.lastActivity = new Date().toISOString(); + } + } + }); + + proc.stderr.on("data", (data) => { + const lines = data.toString().split("\n"); + + for (const line of lines) { + if (line.trim()) { + this.addLog(state, `[stderr] ${line.trim()}`); + } + } + }); + + proc.on("close", (code) => { + state.status = code === 0 ? "stopped" : "error"; + state.pid = null; + state._process = null; + + if (code !== 0) { + state.error = `Process exited with code ${code}`; + this.addLog(state, `Sync exited with code ${code}`); + } else { + this.addLog(state, "Sync stopped"); + } + + this.ctx.log(`Sync stopped for ${vaultId} (code: ${code})`); + this.broadcastStatus(vaultId); + this.saveStates(); + }); + + proc.on("error", (err) => { + state.status = "error"; + state.error = err.message; + state.pid = null; + state._process = null; + + this.addLog(state, `Error: ${err.message}`); + this.ctx.log(`Sync error for ${vaultId}: ${err.message}`); + this.broadcastStatus(vaultId); + this.saveStates(); + }); + + this.broadcastStatus(vaultId); + this.ctx.log(`Started sync for ${vaultId} (pid: ${proc.pid})`); + this.saveStates(); + + return this.getState(vaultId); + } + + stopSync(vaultId) { + const state = this.states.get(vaultId); + + if (!state || !state._process) { + throw new Error(`No active sync for vault: ${vaultId}`); + } + + state._process.kill("SIGTERM"); + state.status = "stopped"; + state.pid = null; + state.autoStart = false; + state._process = null; + + this.addLog(state, "Sync stopped by user"); + this.ctx.log(`Stopped sync for ${vaultId}`); + this.broadcastStatus(vaultId); + this.saveStates(); + + return this.getState(vaultId); + } + + getState(vaultId) { + const state = this.states.get(vaultId); + + if (!state) { + return null; + } + + return { + vaultId: state.vaultId, + remoteVault: state.remoteVault, + status: state.status, + pid: state.pid, + lastActivity: state.lastActivity, + error: state.error, + config: state.config, + autoStart: state.autoStart, + }; + } + + getAllStates() { + const result = []; + + for (const [vaultId] of this.states) { + result.push(this.getState(vaultId)); + } + + return result; + } + + getLogs(vaultId, limit = 100) { + const state = this.states.get(vaultId); + + if (!state) { + return []; + } + + return state.logs.slice(-limit); + } + + addLog(state, line) { + state.logs.push({ + timestamp: new Date().toISOString(), + line, + }); + + if (state.logs.length > MAX_LOG_ENTRIES) { + state.logs = state.logs.slice(-MAX_LOG_ENTRIES); + } + } + + broadcastStatus(vaultId) { + const state = this.getState(vaultId); + + if (!state) { + return; + } + + const message = JSON.stringify({ + channel: "plugin:headless-sync", + type: "sync-status", + payload: state, + }); + + if (this.ctx.wss && this.ctx.wss.clients) { + for (const client of this.ctx.wss.clients) { + if (client.readyState === 1) { + client.send(message); + } + } + } + } + + autoStartAll() { + let started = 0; + + for (const [vaultId, state] of this.states) { + if (state.autoStart && state.status === "stopped") { + try { + this.startSync(vaultId); + started++; + } catch (e) { + this.ctx.log(`Auto-start failed for ${vaultId}: ${e.message}`); + } + } + } + + if (started > 0) { + this.ctx.log(`Auto-started sync for ${started} vault(s)`); + } + } + + async shutdown() { + this.ctx.log("Shutting down sync manager..."); + + for (const [vaultId, state] of this.states) { + if (state._process) { + this.ctx.log(`Stopping sync for ${vaultId}...`); + + try { + state._process.kill("SIGTERM"); + } catch (e) { + this.ctx.log(`Error stopping sync for ${vaultId}: ${e.message}`); + } + } + } + } +} + +module.exports = { SyncManager };