439 lines
14 KiB
TypeScript
439 lines
14 KiB
TypeScript
import "server-only";
|
|
import path from "node:path";
|
|
import fs from "node:fs";
|
|
import fsp from "node:fs/promises";
|
|
import readline from "node:readline";
|
|
import { Readable } from "node:stream";
|
|
import { getAppSetting } from "@/lib/db/appSettings";
|
|
import { rawDb } from "@/lib/db/client";
|
|
import {
|
|
insertJob,
|
|
getJob,
|
|
setStatus,
|
|
updateProgress,
|
|
nextQueuedJob,
|
|
recoverOrphanedJobs,
|
|
activeJobForCode,
|
|
listAgedTerminalJobs,
|
|
listAllJobDirs,
|
|
deleteAllJobs,
|
|
} from "./db";
|
|
import {
|
|
buildJobArgs,
|
|
parseStageLine,
|
|
spawnJob,
|
|
validateOutcome,
|
|
moveFile,
|
|
isDirWritable,
|
|
jobBaseDir,
|
|
jobDirFor,
|
|
newJobId,
|
|
} from "./spawn";
|
|
import { findVideosForCode, rescanVideoIndex } from "@/lib/video";
|
|
import { getStoredVideoMetadata } from "@/lib/video/metadata";
|
|
import type { JobRow } from "./types";
|
|
|
|
declare global {
  /** Set to true once this process has run the one-time worker bootstrap. */
  // eslint-disable-next-line no-var
  var __whisperjavWorkerStarted: boolean | undefined;
  /** Id of the job whose CLI child is currently registered as running, or null. */
  // eslint-disable-next-line no-var
  var __whisperjavRunningId: string | null | undefined;
  /** Kill hook for the registered child; the "cancel" reason marks the run as cancelled. */
  // eslint-disable-next-line no-var
  var __whisperjavRunningKill: ((reason: "cancel") => void) | undefined;
}
|
|
|
|
if (!global.__whisperjavWorkerStarted) {
  // The flags live on `global`, so this runs once per process even when the
  // module itself is evaluated again.
  global.__whisperjavWorkerStarted = true;
  global.__whisperjavRunningId = null;
  global.__whisperjavRunningKill = undefined;

  // Restart sweep: rows still marked `running` were owned by a previous
  // process whose child is gone; recoverOrphanedJobs() returns how many it
  // recovered.
  try {
    const orphanCount = recoverOrphanedJobs();
    if (orphanCount > 0) {
      // eslint-disable-next-line no-console
      console.log(`[whisperjav] recovered ${orphanCount} orphaned job(s) on bootstrap`);
    }
  } catch (err) {
    // eslint-disable-next-line no-console
    console.error("[whisperjav] orphan recovery failed:", err);
  }
}
|
|
|
|
/** Result of `enqueueJob` when a new job row was queued. */
export interface EnqueueResult {
  /** Id of the inserted job. */
  jobId: string;
}

/** Result of `enqueueJob` when a generated subtitle already exists for the
 * video and `overwrite` was not requested; nothing was queued. */
export interface EnqueueAlreadyExists {
  /** Discriminant: always `true`. */
  alreadyExists: true;
  /** Absolute path of the existing subtitle file. */
  abs: string;
}
|
|
|
|
/** A line made up only of Unicode box-drawing characters (U+2500–U+257F)
 * and whitespace, i.e. a decorative banner/frame line. The whole block is
 * covered so separators such as `╠═══╣` or `├───┤` are filtered as well,
 * not just corners and straight edges. */
const BANNER_CHARS = /^[\u2500-\u257F\s]*$/;

/** Known-irrelevant stderr chatter (dependency warnings, download notices). */
const NOISE: readonly RegExp[] = [
  /RequestsDependencyWarning/i,
  /urllib3 \(\d+\.\d+/,
  /chardet|charset_normalizer/,
  /You are about to download and run code from an untrusted repository/,
  /^Downloading: /,
  /UserWarning:/,
  /^\s*warnings\.warn/,
  /^_check_repo_is_trusted/,
];

/** True when a stderr line should be skipped by the stage parser: blank,
 * a pure banner line, or one matching a known noise pattern. */
function isNoiseLine(s: string): boolean {
  if (!s.trim()) return true;
  return BANNER_CHARS.test(s) || NOISE.some((re) => re.test(s));
}
|
|
|
|
/** Fallback output folder for a code: `<cwd>/data/generated-subtitles/<code>`. */
function generatedSubtitlesDir(code: string): string {
  const generatedRoot = path.join(process.cwd(), "data", "generated-subtitles");
  return path.join(generatedRoot, code);
}
|
|
|
|
/** Pick the directory a finished subtitle is moved into.
 *
 * With `outputLocation === "beside-video"` the video's own folder is used
 * when `isDirWritable` accepts it. Otherwise — or for any other setting — the
 * per-code folder from `generatedSubtitlesDir` is created (recursively) and
 * returned. */
async function resolveDestDir(videoAbs: string, code: string): Promise<string> {
  const { outputLocation } = getAppSetting("whisperjav");
  const videoFolder = path.dirname(videoAbs);
  if (outputLocation === "beside-video" && (await isDirWritable(videoFolder))) {
    return videoFolder;
  }

  const generatedFolder = generatedSubtitlesDir(code);
  await fsp.mkdir(generatedFolder, { recursive: true });
  return generatedFolder;
}
|
|
|
|
/** `<videoStem>.<langTag>` — the filename prefix a WhisperJAV subtitle for
 * this video is expected to start with. Used by the pre-flight "already
 * generated?" check. Anything the CLI appends after the language tag
 * (before `.srt`) is tolerated by that check. */
function expectedSubtitleStemPrefix(videoStem: string, langTag: string): string {
  return [videoStem, langTag].join(".");
}
|
|
|
|
/** Language tag the next generated subtitle will carry, based on the
 * current WhisperJAV settings. Direct-to-English output is always `en`;
 * otherwise the tag follows the configured source language. */
function langTagForSettings(): "ja" | "ko" | "zh" | "en" {
  const { outputMode, sourceLanguage } = getAppSetting("whisperjav");
  if (outputMode === "direct-to-english") return "en";

  switch (sourceLanguage) {
    case "english":
      return "en";
    case "chinese":
      return "zh";
    case "korean":
      return "ko";
    case "japanese":
      return "ja";
  }
}
|
|
|
|
/** Look for a subtitle that looks WhisperJAV-generated for `videoAbs`.
 *
 * Scans the video's folder first, then the per-code generated folder, and
 * returns the absolute path of the first regular `.srt` file whose
 * lower-cased name starts with `<stem>.<lang>.` (so `<stem>.<lang>.srt` and
 * `<stem>.<lang>.whisperjav.srt` both match). Folders that cannot be read
 * are skipped. Returns null when nothing matches. */
async function existingGeneratedFor(videoAbs: string, code: string): Promise<string | null> {
  const videoStem = path.basename(videoAbs, path.extname(videoAbs));
  const needle = expectedSubtitleStemPrefix(videoStem, langTagForSettings()).toLowerCase() + ".";
  const searchDirs = [path.dirname(videoAbs), generatedSubtitlesDir(code)];

  for (const dir of searchDirs) {
    let listing: fs.Dirent[];
    try {
      listing = await fsp.readdir(dir, { withFileTypes: true });
    } catch {
      continue; // missing or unreadable folder — nothing to find here
    }

    const hit = listing.find((entry) => {
      if (!entry.isFile()) return false;
      const name = entry.name.toLowerCase();
      return name.endsWith(".srt") && name.startsWith(needle);
    });
    if (hit) return path.join(dir, hit.name);
  }

  return null;
}
|
|
|
|
/** Queue a WhisperJAV transcription for part `partIdx` of `code`.
 *
 * If a generated-looking subtitle already exists (see `existingGeneratedFor`)
 * and `overwrite` is not set, nothing is queued and
 * `{ alreadyExists: true, abs }` is returned so the caller can confirm with
 * the user. With `overwrite`, that file is deleted up front so the worker's
 * post-run move does not trip its collision guard.
 *
 * @throws Error when the CLI path is not configured, no video matches
 *   `code`/`partIdx`, or an existing subtitle cannot be removed. Errors from
 *   building args or inserting the row are re-thrown after the new job dir
 *   has been removed. */
export async function enqueueJob(opts: { code: string; partIdx: number; overwrite?: boolean }): Promise<EnqueueResult | EnqueueAlreadyExists> {
  const settings = getAppSetting("whisperjav");
  if (!settings.cliPath) {
    throw new Error("WhisperJAV CLI path not configured");
  }

  let files = findVideosForCode(opts.code);
  if (files.length === 0) {
    // Index might be empty in dev — kick a rescan once.
    await rescanVideoIndex();
    files = findVideosForCode(opts.code);
  }
  const variant = files[opts.partIdx];
  if (!variant) throw new Error(`No video found for code=${opts.code} part=${opts.partIdx}`);

  const existing = await existingGeneratedFor(variant.abs, opts.code);
  if (existing) {
    if (!opts.overwrite) {
      return { alreadyExists: true, abs: existing };
    }
    // User confirmed overwrite — remove the prior generated file so the
    // post-run move at the queue worker doesn't trip its collision guard.
    try {
      await fsp.unlink(existing);
    } catch (e) {
      // Already gone (e.g. a concurrent overwrite request removed it): the
      // goal — nothing in the way — is met, so only real failures abort.
      if ((e as NodeJS.ErrnoException).code !== "ENOENT") {
        throw new Error(`Could not remove existing subtitle: ${(e as Error).message}`);
      }
    }
  }

  const id = newJobId();
  const jobDir = jobDirFor(id);
  await fsp.mkdir(jobDir, { recursive: true });
  const statsPath = path.join(jobDir, "stats.json");
  const logPath = path.join(jobDir, "stderr.log");

  try {
    const args = buildJobArgs({
      videoAbs: variant.abs,
      outputDir: jobDir,
      statsPath,
      settings,
    });

    // Capture duration + mode at enqueue so the running-job UI can show
    // an ETA. Duration may be missing if the file hasn't been probed yet
    // — that's fine, ETA is best-effort.
    const stored = getStoredVideoMetadata(variant.abs);
    const videoDurationSec = stored?.durationSec && stored.durationSec > 0 ? stored.durationSec : null;

    insertJob({
      id,
      code: opts.code,
      videoAbs: variant.abs,
      jobDir,
      status: "queued",
      enqueuedAt: Date.now(),
      cliArgs: JSON.stringify(args),
      logPath,
      statsPath,
      videoDurationSec,
      mode: settings.quality,
    });
  } catch (e) {
    // No job row references this dir, so nothing row-driven (e.g. the
    // retention sweep) would ever clean it up — remove it before rethrowing.
    await fsp.rm(jobDir, { recursive: true, force: true }).catch(() => undefined);
    throw e;
  }

  scheduleTick();
  return { jobId: id };
}
|
|
|
|
/** Request cancellation of job `id`.
 *
 * A `queued` job is flipped to `cancelled` by an UPDATE that only matches
 * while the row is still `queued`. Otherwise the row is re-read; if it is
 * `running` and is the job registered in this process's running globals,
 * the registered kill hook is invoked with "cancel" and the worker records
 * the final status.
 *
 * @returns true when the job was cancelled or the kill was signalled; false
 *   when the job is unknown, already finished, or not running here. */
export function cancelJob(id: string): boolean {
  const snapshot = getJob(id);
  if (!snapshot) return false;

  const flippedWhileQueued =
    snapshot.status === "queued" &&
    rawDb.prepare(
      `UPDATE whisperjav_jobs SET status = 'cancelled', ended_at = ? WHERE id = ? AND status = 'queued'`,
    ).run(Date.now(), id).changes > 0;
  if (flippedWhileQueued) return true;

  // The conditional UPDATE matches nothing if the worker already claimed the
  // row, so look at the current state before deciding.
  const current = getJob(id) ?? snapshot;
  const kill = global.__whisperjavRunningKill;
  if (current.status !== "running" || global.__whisperjavRunningId !== id || !kill) {
    return false;
  }
  kill("cancel");
  return true;
}
|
|
|
|
/** True while a worker tick is scheduled but has not fired yet. It collapses
 * a burst of `scheduleTick()` calls into one `runOne()` invocation. */
let tickPending = false;

/** Schedule a queue drain (`runOne`) via `setImmediate`; calls made while
 * a tick is already pending are no-ops. */
function scheduleTick(): void {
  if (!tickPending) {
    tickPending = true;
    // Defer so the caller's DB writes are visible before the worker looks.
    setImmediate(() => {
      tickPending = false;
      void runOne();
    });
  }
}
|
|
|
|
/** True while a `runOne` drain loop is active in this module instance. */
let workerBusy = false;

/** Drain the queue: process queued jobs one at a time until
 * `nextQueuedJob()` returns nothing. A call made while a drain is already
 * running returns immediately.
 *
 * Never rejects. It is invoked fire-and-forget (`void runOne()`), so an
 * escaping error would otherwise become an unhandled promise rejection. */
async function runOne(): Promise<void> {
  if (workerBusy) return;
  workerBusy = true;
  try {
    let next = nextQueuedJob();
    while (next) {
      await processJob(next);
      next = nextQueuedJob();
    }
  } catch (e) {
    // Stop instead of continuing: if the failure happened before the job
    // left `queued`, the loop would immediately pick the same row again.
    // The next scheduleTick() resumes draining.
    // eslint-disable-next-line no-console
    console.error("[whisperjav] worker loop aborted:", e);
  } finally {
    workerBusy = false;
  }
}
|
|
|
|
/** Decode the argv JSON stored on a job row; null unless it is an array of
 * strings. */
function parseStoredCliArgs(raw: string): string[] | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return null;
  }
  if (!Array.isArray(parsed)) return null;
  return parsed.every((a): a is string => typeof a === "string") ? parsed : null;
}

/** Spawn the CLI and supervise it until it exits.
 *
 * Appends ANSI-stripped stderr lines and raw stdout to `job.logPath` and
 * writes parsed stage progress through to the DB. While the child is alive
 * it is registered in the process-wide running globals so `cancelJob` can
 * kill it. Resolves with the exit code (null on a spawn 'error') and
 * whether a cancel was requested, after the log stream has been flushed. */
async function superviseCliProcess(
  job: JobRow,
  cliPath: string,
  args: string[],
): Promise<{ exitCode: number | null; cancelled: boolean }> {
  const logStream = fs.createWriteStream(job.logPath, { flags: "a" });
  // Without a listener, a write error (e.g. the job dir was deleted
  // mid-run) is thrown as an uncaught exception and takes the server down.
  logStream.on("error", (e) => {
    // eslint-disable-next-line no-console
    console.error(`[whisperjav] log write failed for job ${job.id}:`, e);
  });

  let cancelled = false;
  try {
    const spawned = spawnJob(cliPath, args);
    global.__whisperjavRunningId = job.id;
    global.__whisperjavRunningKill = (reason) => {
      if (reason === "cancel") cancelled = true;
      spawned.kill();
    };

    // Stage parser — write-through on every match. WhisperJAV emits
    // at most a handful of "Step N/M:" lines per job; the prior
    // debounce raced the rl close drain and dropped the final stage.
    const rl = readline.createInterface({ input: spawned.proc.stderr ?? Readable.from([]) });
    rl.on("line", (raw: string) => {
      // Strip SGR escape sequences (optional ESC byte + "[...m"). The ESC is
      // spelled as an escape so it cannot silently vanish from the source.
      // eslint-disable-next-line no-control-regex
      const line = raw.replace(/\u001b?\[[0-9;]*m/g, "");
      logStream.write(line + "\n");
      if (isNoiseLine(line)) return;
      const stage = parseStageLine(line);
      if (!stage) return;
      try {
        updateProgress(job.id, stage.stage, stage.index, stage.total);
      } catch (e) {
        // A throw inside an event handler would be an uncaught exception.
        // eslint-disable-next-line no-console
        console.error(`[whisperjav] progress update failed for job ${job.id}:`, e);
      }
    });

    // We don't currently expect anything on stdout; tee anyway.
    spawned.proc.stdout?.on("data", (b) => { logStream.write(b); });

    const exitCode = await new Promise<number | null>((resolve) => {
      spawned.proc.on("close", (code) => resolve(code));
      spawned.proc.on("error", () => resolve(null));
    });

    // Deregister immediately: a cancel arriving while the log drains must
    // not relabel a run that already finished as cancelled.
    global.__whisperjavRunningKill = undefined;
    global.__whisperjavRunningId = null;
    rl.close();
    return { exitCode, cancelled };
  } finally {
    global.__whisperjavRunningKill = undefined;
    global.__whisperjavRunningId = null;
    if (!logStream.destroyed) {
      await new Promise<void>((res) => logStream.end(() => res()));
    }
  }
}

/** Run one claimed job to completion and record its terminal status.
 *
 * Steps:
 * - re-check the CLI path and mark the row `running`;
 * - decode the stored argv and run the CLI (`superviseCliProcess`);
 * - validate the outcome;
 * - move the final .srt into its destination dir, failing on collision;
 * - delete the job dir and set `completed`/`warning`.
 * Cancelled runs keep their job dir for diagnosis.
 *
 * Never rejects. An unexpected throw is recorded as `failed` so the row does
 * not stay `running` and the worker loop keeps draining. */
async function processJob(job: JobRow): Promise<void> {
  const settings = getAppSetting("whisperjav");
  if (!settings.cliPath) {
    setStatus(job.id, "failed", {
      startedAt: Date.now(),
      endedAt: Date.now(),
      error: "WhisperJAV CLI path cleared while job was queued",
    });
    return;
  }

  setStatus(job.id, "running", { startedAt: Date.now() });

  const messageOf = (e: unknown): string => (e instanceof Error ? e.message : String(e));
  let exitCode: number | null = null;
  try {
    const args = parseStoredCliArgs(job.cliArgs);
    if (!args) {
      setStatus(job.id, "failed", { endedAt: Date.now(), error: "stored cli_args malformed" });
      return;
    }

    const run = await superviseCliProcess(job, settings.cliPath, args);
    exitCode = run.exitCode;
    if (run.cancelled) {
      setStatus(job.id, "cancelled", { endedAt: Date.now(), exitCode });
      // Keep job dir for diagnosis. Could prune later via retention sweep.
      return;
    }

    const result = await validateOutcome({
      exitCode,
      statsPath: job.statsPath ?? path.join(job.jobDir, "stats.json"),
      jobDir: job.jobDir,
    });
    if (!result.success) {
      setStatus(job.id, "failed", { endedAt: Date.now(), exitCode, error: result.reason ?? "validation failed" });
      return;
    }

    // Move the .srt to its final destination, fail loudly on collision.
    if (!result.finalSrtPath) {
      setStatus(job.id, "failed", { endedAt: Date.now(), exitCode, error: "no final_srt path resolved" });
      return;
    }
    let destDir: string;
    try {
      destDir = await resolveDestDir(job.videoAbs, job.code);
    } catch (e) {
      setStatus(job.id, "failed", { endedAt: Date.now(), exitCode, error: `dest dir resolve failed: ${messageOf(e)}` });
      return;
    }
    const destFile = path.join(destDir, path.basename(result.finalSrtPath));
    if (fs.existsSync(destFile)) {
      setStatus(job.id, "failed", { endedAt: Date.now(), exitCode, error: `Output already exists at ${destFile}` });
      return;
    }
    try {
      await moveFile(result.finalSrtPath, destFile);
    } catch (e) {
      setStatus(job.id, "failed", { endedAt: Date.now(), exitCode, error: `move failed: ${messageOf(e)}` });
      return;
    }

    // Cleanup temp dir on success/warning.
    try {
      await fsp.rm(job.jobDir, { recursive: true, force: true });
    } catch { /* best effort */ }

    setStatus(job.id, result.warning ? "warning" : "completed", {
      endedAt: Date.now(),
      exitCode,
      targetSubtitlePath: destFile,
      cueCount: result.cueCount,
      error: result.warning ? result.reason : null,
    });

    // Opportunistic retention sweep — keeps the disk tidy without a
    // separate scheduler. No-op when retention is 0. Failures are logged
    // instead of surfacing as an unhandled rejection.
    void runRetentionSweep().catch((e: unknown) => {
      // eslint-disable-next-line no-console
      console.error("[whisperjav] retention sweep failed:", e);
    });
  } catch (e) {
    // Anything unexpected (e.g. spawnJob or validateOutcome throwing, a DB
    // write failing) must not leave the row stuck in `running`.
    try {
      setStatus(job.id, "failed", { endedAt: Date.now(), exitCode, error: `internal error: ${messageOf(e)}` });
    } catch (dbErr) {
      // eslint-disable-next-line no-console
      console.error(`[whisperjav] could not record failure of job ${job.id}:`, e, dbErr);
    }
  }
}
|
|
|
|
/** Flip every `queued` row to `cancelled` and stamp `ended_at`; the job
 * that is currently running is not touched. Backs the "Stop Batch" button.
 * @returns the number of rows changed (0 when there was nothing queued). */
export function cancelAllQueued(): number {
  const now = Date.now();
  const statement = rawDb.prepare(`
UPDATE whisperjav_jobs SET status = 'cancelled', ended_at = ?
WHERE status = 'queued'
`);
  const { changes } = statement.run(now);
  return changes ?? 0;
}
|
|
|
|
/** Module-load side effect: make sure leftover queued jobs from a prior
 * process get picked up, and run one retention sweep. Safe to call
 * repeatedly. */
export function bootstrapQueue(): void {
  scheduleTick();
  // Fire-and-forget, but never unhandled: runRetentionSweep can reject
  // (settings/DB errors), and an unhandled rejection can terminate Node.
  void runRetentionSweep().catch((e: unknown) => {
    // eslint-disable-next-line no-console
    console.error("[whisperjav] retention sweep failed:", e);
  });
}
|
|
|
|
/** Delete the temp dirs of aged terminal (e.g. failed/cancelled) jobs
 * returned by `listAgedTerminalJobs` for a cutoff of `retentionDays` ago.
 * Always safe to call: it is a no-op when retention is missing/≤ 0 or
 * nothing is aged. Rows are left untouched; only directories are removed.
 * @returns how many directories actually existed and were removed. */
export async function runRetentionSweep(): Promise<{ removed: number }> {
  const settings = getAppSetting("whisperjav");
  const days = Number(settings.retentionDays);
  if (!Number.isFinite(days) || days <= 0) return { removed: 0 };

  const cutoff = Date.now() - days * 24 * 60 * 60 * 1000;
  let removed = 0;
  for (const row of listAgedTerminalJobs(cutoff)) {
    // Nothing here updates the rows, so (unless listAgedTerminalJobs filters
    // pruned ones) the same aged rows come back on every sweep. `rm` with
    // `force: true` succeeds on a missing path, so skip absent dirs or they
    // are re-counted and re-logged each time.
    if (!fs.existsSync(row.jobDir)) continue;
    try {
      await fsp.rm(row.jobDir, { recursive: true, force: true });
      removed++;
    } catch (e) {
      // eslint-disable-next-line no-console
      console.error(`[whisperjav] failed to prune ${row.jobDir}:`, e);
    }
  }
  if (removed > 0) {
    // eslint-disable-next-line no-console
    console.log(`[whisperjav] retention sweep removed ${removed} job dir(s)`);
  }
  return { removed };
}
|
|
|
|
/** Wipe every non-running job row (via `deleteAllJobs`) plus the temp dirs
 * from `listAllJobDirs`. The exception is the dir of the job registered as
 * running in this process, which is left alone. Used by the "Clear all job
 * history" Settings action.
 * @returns rows deleted and directories removed. */
export async function clearAllJobHistory(): Promise<{ rows: number; dirs: number }> {
  let dirsRemoved = 0;
  for (const dir of listAllJobDirs()) {
    // Re-checked per dir because the worker may start another job while we
    // await. The running job's log, stats file and CLI outputs live in its
    // dir, so removing it would break the in-progress run.
    const runningId = global.__whisperjavRunningId;
    const runningDir = runningId ? getJob(runningId)?.jobDir : undefined;
    if (runningDir && path.resolve(runningDir) === path.resolve(dir)) continue;
    try {
      await fsp.rm(dir, { recursive: true, force: true });
      dirsRemoved++;
    } catch { /* best effort */ }
  }
  const rows = deleteAllJobs();
  return { rows, dirs: dirsRemoved };
}
|
|
|
|
// Module-load side effect: resume any leftover queue and sweep aged job dirs.
bootstrapQueue();

// Re-exported helpers for consumers of this module.
export {
  activeJobForCode,
  getJob,
  jobBaseDir,
};
|