fix outputs
This commit is contained in:
@@ -39,6 +39,21 @@ const audioBitrate = process.env.STREAM_AUDIO_BITRATE ?? "160k";
|
|||||||
const streamKey = process.env.TWITCH_STREAM_KEY;
|
const streamKey = process.env.TWITCH_STREAM_KEY;
|
||||||
const serverPort = process.env.STREAM_APP_PORT ?? "5109";
|
const serverPort = process.env.STREAM_APP_PORT ?? "5109";
|
||||||
const broadcastUrl = process.env.BROADCAST_URL ?? `http://127.0.0.1:${serverPort}/broadcast`;
|
const broadcastUrl = process.env.BROADCAST_URL ?? `http://127.0.0.1:${serverPort}/broadcast`;
|
||||||
|
const redactionTokens = [
|
||||||
|
broadcastUrl,
|
||||||
|
streamKey,
|
||||||
|
streamKey ? encodeURIComponent(streamKey) : undefined,
|
||||||
|
streamKey ? `rtmp://live.twitch.tv/app/${streamKey}` : undefined,
|
||||||
|
].filter((token): token is string => Boolean(token));
|
||||||
|
const redactionWindow = Math.max(1, ...redactionTokens.map((token) => token.length));
|
||||||
|
|
||||||
|
function redactSensitive(value: string): string {
|
||||||
|
let output = value;
|
||||||
|
for (const token of redactionTokens) {
|
||||||
|
output = output.split(token).join("[REDACTED]");
|
||||||
|
}
|
||||||
|
return output;
|
||||||
|
}
|
||||||
|
|
||||||
if (mode === "live" && !streamKey) {
|
if (mode === "live" && !streamKey) {
|
||||||
console.error("TWITCH_STREAM_KEY is not set.");
|
console.error("TWITCH_STREAM_KEY is not set.");
|
||||||
@@ -57,7 +72,7 @@ async function assertBroadcastReachable(url: string) {
|
|||||||
} catch (error) {
|
} catch (error) {
|
||||||
const detail = error instanceof Error ? error.message : String(error);
|
const detail = error instanceof Error ? error.message : String(error);
|
||||||
throw new Error(
|
throw new Error(
|
||||||
`Cannot reach broadcast page at ${url} (${detail}). Start the app server first (bun run start or bun run start:web).`,
|
`Cannot reach broadcast page (${redactSensitive(detail)}). Start the app server first (bun run start or bun run start:web).`,
|
||||||
);
|
);
|
||||||
} finally {
|
} finally {
|
||||||
clearTimeout(timeout);
|
clearTimeout(timeout);
|
||||||
@@ -140,6 +155,36 @@ async function pipeReadableToSink(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function pipeReadableToRedactedStderr(readable: ReadableStream<Uint8Array>) {
|
||||||
|
const reader = readable.getReader();
|
||||||
|
const decoder = new TextDecoder();
|
||||||
|
let carry = "";
|
||||||
|
try {
|
||||||
|
while (true) {
|
||||||
|
const { done, value } = await reader.read();
|
||||||
|
if (done) break;
|
||||||
|
const chunk = decoder.decode(value, { stream: true });
|
||||||
|
const combined = carry + chunk;
|
||||||
|
if (combined.length <= redactionWindow) {
|
||||||
|
carry = combined;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const flushUntil = combined.length - (redactionWindow - 1);
|
||||||
|
const safeOutput = combined.slice(0, flushUntil);
|
||||||
|
carry = combined.slice(flushUntil);
|
||||||
|
if (safeOutput.length > 0) {
|
||||||
|
process.stderr.write(redactSensitive(safeOutput));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
const trailing = carry + decoder.decode();
|
||||||
|
if (trailing.length > 0) {
|
||||||
|
process.stderr.write(redactSensitive(trailing));
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
reader.releaseLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async function main() {
|
async function main() {
|
||||||
await assertBroadcastReachable(broadcastUrl);
|
await assertBroadcastReachable(broadcastUrl);
|
||||||
|
|
||||||
@@ -147,8 +192,11 @@ async function main() {
|
|||||||
const ffmpeg = Bun.spawn(["ffmpeg", ...ffmpegArgs], {
|
const ffmpeg = Bun.spawn(["ffmpeg", ...ffmpegArgs], {
|
||||||
stdin: "pipe",
|
stdin: "pipe",
|
||||||
stdout: mode === "dryrun" ? "pipe" : "inherit",
|
stdout: mode === "dryrun" ? "pipe" : "inherit",
|
||||||
stderr: "inherit",
|
stderr: "pipe",
|
||||||
});
|
});
|
||||||
|
if (ffmpeg.stderr) {
|
||||||
|
void pipeReadableToRedactedStderr(ffmpeg.stderr);
|
||||||
|
}
|
||||||
let ffmpegWritable = true;
|
let ffmpegWritable = true;
|
||||||
|
|
||||||
let ffplay: Bun.Subprocess | null = null;
|
let ffplay: Bun.Subprocess | null = null;
|
||||||
@@ -251,7 +299,7 @@ async function main() {
|
|||||||
await page.setViewport({ width: 1920, height: 1080, deviceScaleFactor: 1 });
|
await page.setViewport({ width: 1920, height: 1080, deviceScaleFactor: 1 });
|
||||||
page.on("console", (msg) => {
|
page.on("console", (msg) => {
|
||||||
if (process.env.STREAM_DEBUG === "1") {
|
if (process.env.STREAM_DEBUG === "1") {
|
||||||
console.log(`[broadcast] ${msg.type()}: ${msg.text()}`);
|
console.log(`[broadcast] ${msg.type()}: ${redactSensitive(msg.text())}`);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -270,7 +318,7 @@ async function main() {
|
|||||||
}, 10_000);
|
}, 10_000);
|
||||||
|
|
||||||
await firstChunk.finally(() => clearTimeout(firstChunkTimer));
|
await firstChunk.finally(() => clearTimeout(firstChunkTimer));
|
||||||
console.log(`Streaming from ${broadcastUrl} in ${mode} mode`);
|
console.log(`Streaming broadcast in ${mode} mode`);
|
||||||
|
|
||||||
let shuttingDown = false;
|
let shuttingDown = false;
|
||||||
shutdown = async () => {
|
shutdown = async () => {
|
||||||
@@ -332,6 +380,6 @@ async function main() {
|
|||||||
|
|
||||||
main().catch((error) => {
|
main().catch((error) => {
|
||||||
const detail = error instanceof Error ? error.message : String(error);
|
const detail = error instanceof Error ? error.message : String(error);
|
||||||
console.error(detail);
|
console.error(redactSensitive(detail));
|
||||||
process.exit(1);
|
process.exit(1);
|
||||||
});
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user