mirror of
https://github.com/Nystik-gh/ignis.git
synced 2026-06-17 04:35:53 +00:00
refactor headless sync
This commit is contained in:
@@ -1,12 +1,26 @@
|
||||
const fs = require("fs");
|
||||
const path = require("path");
|
||||
const { spawn } = require("child_process");
|
||||
const { spawnOb, runCommand } = require("./ob-cli");
|
||||
|
||||
const MAX_LOG_ENTRIES = 200;
|
||||
|
||||
function killProcess(proc) {
|
||||
if (!proc) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (process.platform === "win32") {
|
||||
spawn("taskkill", ["/pid", String(proc.pid), "/t", "/f"]);
|
||||
} else {
|
||||
proc.kill("SIGTERM");
|
||||
}
|
||||
}
|
||||
|
||||
class SyncManager {
|
||||
constructor(ctx) {
|
||||
constructor(ctx, broadcaster) {
|
||||
this.ctx = ctx;
|
||||
this.broadcaster = broadcaster;
|
||||
this.states = new Map();
|
||||
this.stateFile = path.join(ctx.dataDir, "sync-states.json");
|
||||
}
|
||||
@@ -66,8 +80,6 @@ class SyncManager {
|
||||
}
|
||||
|
||||
async setupSync(vaultId, vaultPath, remoteVault, options = {}) {
|
||||
const obCli = require("./ob-cli");
|
||||
|
||||
const args = ["sync-setup", "--vault", remoteVault, "--path", "."];
|
||||
|
||||
if (options.vaultPassword) {
|
||||
@@ -78,7 +90,7 @@ class SyncManager {
|
||||
args.push("--device-name", options.deviceName);
|
||||
}
|
||||
|
||||
await obCli.runCommand(args, { cwd: vaultPath });
|
||||
await runCommand(args, { cwd: vaultPath });
|
||||
|
||||
const state = {
|
||||
vaultId,
|
||||
@@ -112,6 +124,12 @@ class SyncManager {
|
||||
throw new Error(`No sync configuration for vault: ${vaultId}`);
|
||||
}
|
||||
|
||||
if (this.isCoreSyncEnabled(state.vaultPath)) {
|
||||
const msg = `Cannot start sync for ${vaultId}: Obsidian Sync core plugin is enabled`;
|
||||
this.ctx.log(msg);
|
||||
throw new Error(msg);
|
||||
}
|
||||
|
||||
if (state.status === "running") {
|
||||
this.ctx.log(`Sync already running for ${vaultId}`);
|
||||
return this.getState(vaultId);
|
||||
@@ -142,7 +160,7 @@ class SyncManager {
|
||||
if (line.trim()) {
|
||||
this.addLog(state, line.trim());
|
||||
state.lastActivity = new Date().toISOString();
|
||||
this.broadcastLog(vaultId, line.trim());
|
||||
this.broadcaster.broadcastLog(vaultId, line.trim());
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -158,6 +176,13 @@ class SyncManager {
|
||||
});
|
||||
|
||||
proc.on("close", (code) => {
|
||||
// If the user explicitly stopped sync, don't overwrite the clean
|
||||
// "stopped" state with an error from the non-zero exit code.
|
||||
if (state._userStopped) {
|
||||
state._userStopped = false;
|
||||
return;
|
||||
}
|
||||
|
||||
state.status = code === 0 ? "stopped" : "error";
|
||||
state.pid = null;
|
||||
state._process = null;
|
||||
@@ -170,7 +195,7 @@ class SyncManager {
|
||||
}
|
||||
|
||||
this.ctx.log(`Sync stopped for ${vaultId} (code: ${code})`);
|
||||
this.broadcastStatus(vaultId);
|
||||
this.broadcaster.broadcastStatus(this.getState(vaultId));
|
||||
this.saveStates();
|
||||
});
|
||||
|
||||
@@ -182,11 +207,11 @@ class SyncManager {
|
||||
|
||||
this.addLog(state, `Error: ${err.message}`);
|
||||
this.ctx.log(`Sync error for ${vaultId}: ${err.message}`);
|
||||
this.broadcastStatus(vaultId);
|
||||
this.broadcaster.broadcastStatus(this.getState(vaultId));
|
||||
this.saveStates();
|
||||
});
|
||||
|
||||
this.broadcastStatus(vaultId);
|
||||
this.broadcaster.broadcastStatus(this.getState(vaultId));
|
||||
this.ctx.log(`Started sync for ${vaultId} (pid: ${proc.pid})`);
|
||||
this.saveStates();
|
||||
|
||||
@@ -200,7 +225,8 @@ class SyncManager {
|
||||
throw new Error(`No active sync for vault: ${vaultId}`);
|
||||
}
|
||||
|
||||
state._process.kill("SIGTERM");
|
||||
state._userStopped = true;
|
||||
killProcess(state._process);
|
||||
state.status = "stopped";
|
||||
state.pid = null;
|
||||
state.autoStart = false;
|
||||
@@ -208,7 +234,7 @@ class SyncManager {
|
||||
|
||||
this.addLog(state, "Sync stopped by user");
|
||||
this.ctx.log(`Stopped sync for ${vaultId}`);
|
||||
this.broadcastStatus(vaultId);
|
||||
this.broadcaster.broadcastStatus(this.getState(vaultId));
|
||||
this.saveStates();
|
||||
|
||||
return this.getState(vaultId);
|
||||
@@ -222,7 +248,8 @@ class SyncManager {
|
||||
}
|
||||
|
||||
if (state._process) {
|
||||
state._process.kill("SIGTERM");
|
||||
state._userStopped = true;
|
||||
killProcess(state._process);
|
||||
}
|
||||
|
||||
// Tell ob to disconnect from the remote vault and clear its stored config
|
||||
@@ -289,51 +316,31 @@ class SyncManager {
|
||||
}
|
||||
}
|
||||
|
||||
broadcastLog(vaultId, line) {
|
||||
if (!this.ctx.wss || !this.ctx.wss.clients) {
|
||||
return;
|
||||
}
|
||||
|
||||
const message = JSON.stringify({
|
||||
channel: "plugin:headless-sync",
|
||||
type: "sync-log",
|
||||
payload: { vaultId, line },
|
||||
});
|
||||
|
||||
for (const client of this.ctx.wss.clients) {
|
||||
if (client.readyState === 1) {
|
||||
client.send(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
broadcastStatus(vaultId) {
|
||||
const state = this.getState(vaultId);
|
||||
|
||||
if (!state) {
|
||||
return;
|
||||
}
|
||||
|
||||
const message = JSON.stringify({
|
||||
channel: "plugin:headless-sync",
|
||||
type: "sync-status",
|
||||
payload: state,
|
||||
});
|
||||
|
||||
if (this.ctx.wss && this.ctx.wss.clients) {
|
||||
for (const client of this.ctx.wss.clients) {
|
||||
if (client.readyState === 1) {
|
||||
client.send(message);
|
||||
}
|
||||
}
|
||||
isCoreSyncEnabled(vaultPath) {
|
||||
try {
|
||||
const configPath = path.join(vaultPath, ".obsidian", "core-plugins.json");
|
||||
const data = fs.readFileSync(configPath, "utf-8");
|
||||
const config = JSON.parse(data);
|
||||
return config.sync === true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
autoStartAll() {
|
||||
let started = 0;
|
||||
let skipped = 0;
|
||||
|
||||
for (const [vaultId, state] of this.states) {
|
||||
if (state.autoStart && state.status === "stopped") {
|
||||
if (this.isCoreSyncEnabled(state.vaultPath)) {
|
||||
this.ctx.log(
|
||||
`Skipping auto-start for ${vaultId}: Obsidian Sync core plugin is enabled`,
|
||||
);
|
||||
skipped++;
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
this.startSync(vaultId);
|
||||
started++;
|
||||
@@ -346,22 +353,50 @@ class SyncManager {
|
||||
if (started > 0) {
|
||||
this.ctx.log(`Auto-started sync for ${started} vault(s)`);
|
||||
}
|
||||
|
||||
if (skipped > 0) {
|
||||
this.ctx.log(
|
||||
`Skipped ${skipped} vault(s) due to Obsidian Sync being enabled`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async shutdown() {
|
||||
this.ctx.log("Shutting down sync manager...");
|
||||
|
||||
const waitPromises = [];
|
||||
|
||||
for (const [vaultId, state] of this.states) {
|
||||
if (state._process) {
|
||||
this.ctx.log(`Stopping sync for ${vaultId}...`);
|
||||
state._userStopped = true;
|
||||
|
||||
const proc = state._process;
|
||||
|
||||
waitPromises.push(
|
||||
new Promise((resolve) => {
|
||||
const timeout = setTimeout(resolve, 5000);
|
||||
|
||||
proc.on("close", () => {
|
||||
clearTimeout(timeout);
|
||||
resolve();
|
||||
});
|
||||
}),
|
||||
);
|
||||
|
||||
try {
|
||||
state._process.kill("SIGTERM");
|
||||
killProcess(proc);
|
||||
} catch (e) {
|
||||
this.ctx.log(`Error stopping sync for ${vaultId}: ${e.message}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (waitPromises.length > 0) {
|
||||
await Promise.all(waitPromises);
|
||||
}
|
||||
|
||||
this.saveStates();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user