import "server-only";
import path from "node:path";
import fs from "node:fs";
import fsp from "node:fs/promises";
import readline from "node:readline";
import { Readable } from "node:stream";
import { getAppSetting } from "@/lib/db/appSettings";
import { rawDb } from "@/lib/db/client";
import {
  insertJob,
  getJob,
  setStatus,
  updateProgress,
  nextQueuedJob,
  recoverOrphanedJobs,
  activeJobForCode,
  listAgedTerminalJobs,
  listAllJobDirs,
  deleteAllJobs,
} from "./db";
import {
  buildJobArgs,
  parseStageLine,
  spawnJob,
  validateOutcome,
  moveFile,
  isDirWritable,
  jobBaseDir,
  jobDirFor,
  newJobId,
} from "./spawn";
import { findVideosForCode, rescanVideoIndex } from "@/lib/video";
import { getStoredVideoMetadata } from "@/lib/video/metadata";
import type { JobRow } from "./types";

// Worker state lives on `global` rather than in module scope, so the
// "already started" guard below still sees it if this module is evaluated
// more than once in the same process.
declare global {
  // eslint-disable-next-line no-var
  var __whisperjavWorkerStarted: boolean | undefined;
  // eslint-disable-next-line no-var
  var __whisperjavRunningKill: ((reason: "cancel") => void) | undefined;
  // eslint-disable-next-line no-var
  var __whisperjavRunningId: string | null | undefined;
}

if (!global.__whisperjavWorkerStarted) {
  global.__whisperjavWorkerStarted = true;
  global.__whisperjavRunningKill = undefined;
  global.__whisperjavRunningId = null;
  // Restart sweep: any rows in `running` from a prior process are dead.
  try {
    const recovered = recoverOrphanedJobs();
    if (recovered > 0) {
      // eslint-disable-next-line no-console
      console.log(`[whisperjav] recovered ${recovered} orphaned job(s) on bootstrap`);
    }
  } catch (e) {
    // eslint-disable-next-line no-console
    console.error("[whisperjav] orphan recovery failed:", e);
  }
}

/** Returned by `enqueueJob` when a new job row was inserted. */
export interface EnqueueResult {
  jobId: string;
}

/** Returned by `enqueueJob` when a matching subtitle already exists on disk
 *  and the caller did not ask to overwrite it. `abs` is that file's path. */
export interface EnqueueAlreadyExists {
  alreadyExists: true;
  abs: string;
}

/** Lines made up solely of box-drawing characters and whitespace. */
const BANNER_CHARS = /^[╔╗╚╝═║│┌┐└┘─\s]*$/;

/** Patterns for CLI stderr lines that are skipped before stage parsing. */
const NOISE = [
  /RequestsDependencyWarning/i,
  /urllib3 \(\d+\.\d+/,
  /chardet|charset_normalizer/,
  /You are about to download and run code from an untrusted repository/,
  /^Downloading: /,
  /UserWarning:/,
  /^\s*warnings\.warn/,
  /^_check_repo_is_trusted/,
];

/** Colour/style escape sequences: an optional ESC byte followed by "[...m".
 *  The ESC is optional so sequences whose ESC byte was already lost are
 *  stripped as well. */
// eslint-disable-next-line no-control-regex
const ANSI_SGR = /\x1b?\[[0-9;]*m/g;

/** Best-effort human-readable message for a caught value of unknown type. */
function errorMessage(e: unknown): string {
  return e instanceof Error ? e.message : String(e);
}

/** True for blank lines, banner-only lines and lines matching `NOISE`. */
function isNoiseLine(s: string): boolean {
  if (!s.trim()) return true;
  if (BANNER_CHARS.test(s)) return true;
  return NOISE.some((re) => re.test(s));
}

/** Fallback output directory for `code`, under `<cwd>/data/generated-subtitles`. */
function generatedSubtitlesDir(code: string): string {
  return path.join(process.cwd(), "data", "generated-subtitles", code);
}

/** Compute the destination dir for a given video, given settings.
 *  Returns the dir absolute path. The video's own directory is used when
 *  `outputLocation` is "beside-video" and that directory is writable;
 *  otherwise the fallback directory is created (if needed) and returned. */
async function resolveDestDir(videoAbs: string, code: string): Promise<string> {
  const s = getAppSetting("whisperjav");
  if (s.outputLocation === "beside-video") {
    const dir = path.dirname(videoAbs);
    if (await isDirWritable(dir)) return dir;
  }
  const fallback = generatedSubtitlesDir(code);
  await fsp.mkdir(fallback, { recursive: true });
  return fallback;
}

/** Filename pattern WhisperJAV emits — derived from settings, used for
 *  pre-flight idempotency checks. The CLI may add a suffix beyond this
 *  pattern but the language tag is stable. */
function expectedSubtitleStemPrefix(videoStem: string, langTag: string): string {
  return `${videoStem}.${langTag}`;
}

/** Language tag implied by the current settings: "en" when the output mode
 *  is direct-to-english, otherwise the tag of the configured source language. */
function langTagForSettings(): "ja" | "ko" | "zh" | "en" {
  const s = getAppSetting("whisperjav");
  if (s.outputMode === "direct-to-english") return "en";
  switch (s.sourceLanguage) {
    case "japanese":
      return "ja";
    case "korean":
      return "ko";
    case "chinese":
      return "zh";
    case "english":
      return "en";
  }
}

/** Look for an already-generated `.srt` for this video, first beside the
 *  video and then in the fallback directory. Matching is case-insensitive.
 *  Returns the absolute path of the first match, or `null` if there is none.
 *  Directories that cannot be read are skipped. */
async function existingGeneratedFor(videoAbs: string, code: string): Promise<string | null> {
  const langTag = langTagForSettings();
  const stem = path.basename(videoAbs, path.extname(videoAbs));
  // Hoisted: the lowered "<stem>.<lang>." prefix is the same for every entry.
  const wanted = `${expectedSubtitleStemPrefix(stem, langTag).toLowerCase()}.`;
  const candidates = [path.dirname(videoAbs), generatedSubtitlesDir(code)];

  for (const dir of candidates) {
    let entries: import("node:fs").Dirent[];
    try {
      entries = await fsp.readdir(dir, { withFileTypes: true });
    } catch {
      continue;
    }
    for (const e of entries) {
      if (!e.isFile()) continue;
      const lower = e.name.toLowerCase();
      if (!lower.endsWith(".srt")) continue;
      // Match `<stem>.<lang>.` followed by ANY further token ending in .srt.
      // Catches both `<stem>.<lang>.srt` and `<stem>.<lang>.whisperjav.srt`
      // (the bare `.srt` case is already covered by the prefix test).
      if (lower.startsWith(wanted)) {
        return path.join(dir, e.name);
      }
    }
  }
  return null;
}

/**
 * Queue a subtitle-generation job for one video part of `code`.
 *
 * @param opts.code      Code used to look up the video files.
 * @param opts.partIdx   Index into the list returned by `findVideosForCode`.
 * @param opts.overwrite When true, an existing generated subtitle is deleted
 *                       and a new job is queued; otherwise its path is returned.
 * @returns `{ jobId }` for a newly queued job, or `{ alreadyExists, abs }`.
 * @throws If the CLI path is not configured, no video matches, or an existing
 *         subtitle cannot be removed.
 */
export async function enqueueJob(opts: {
  code: string;
  partIdx: number;
  overwrite?: boolean;
}): Promise<EnqueueResult | EnqueueAlreadyExists> {
  const settings = getAppSetting("whisperjav");
  if (!settings.cliPath) {
    throw new Error("WhisperJAV CLI path not configured");
  }

  let files = findVideosForCode(opts.code);
  if (files.length === 0) {
    // Index might be empty in dev — kick a rescan once.
    await rescanVideoIndex();
    files = findVideosForCode(opts.code);
  }
  const variant = files[opts.partIdx];
  if (!variant) throw new Error(`No video found for code=${opts.code} part=${opts.partIdx}`);

  const existing = await existingGeneratedFor(variant.abs, opts.code);
  if (existing) {
    if (!opts.overwrite) {
      return { alreadyExists: true, abs: existing };
    }
    // User confirmed overwrite — remove the prior generated file so the
    // post-run move at the queue worker doesn't trip its collision guard.
    try {
      await fsp.unlink(existing);
    } catch (e) {
      throw new Error(`Could not remove existing subtitle: ${errorMessage(e)}`);
    }
  }

  const id = newJobId();
  const jobDir = jobDirFor(id);
  await fsp.mkdir(jobDir, { recursive: true });
  const statsPath = path.join(jobDir, "stats.json");
  const logPath = path.join(jobDir, "stderr.log");
  const args = buildJobArgs({
    videoAbs: variant.abs,
    outputDir: jobDir,
    statsPath,
    settings,
  });

  // Capture duration + mode at enqueue so the running-job UI can show
  // an ETA. Duration may be missing if the file hasn't been probed yet
  // — that's fine, ETA is best-effort.
  const stored = getStoredVideoMetadata(variant.abs);
  const videoDurationSec =
    stored?.durationSec && stored.durationSec > 0 ? stored.durationSec : null;

  insertJob({
    id,
    code: opts.code,
    videoAbs: variant.abs,
    jobDir,
    status: "queued",
    enqueuedAt: Date.now(),
    cliArgs: JSON.stringify(args),
    logPath,
    statsPath,
    videoDurationSec,
    mode: settings.quality,
  });
  scheduleTick();
  return { jobId: id };
}

/**
 * Cancel a job by id.
 *
 * A queued job is flipped to `cancelled` directly in the database. A running
 * job is cancelled by invoking the kill handle registered by the worker.
 *
 * @returns `true` if the job was cancelled or a kill was requested; `false`
 *          if the job is unknown or not in a cancellable state.
 */
export function cancelJob(id: string): boolean {
  const job = getJob(id);
  if (!job) return false;

  if (job.status === "queued") {
    // Atomic flip: if the worker just picked this job up between our
    // read and write, the WHERE clause matches zero rows and we fall
    // through to the running branch.
    const info = rawDb
      .prepare(
        `UPDATE whisperjav_jobs SET status = 'cancelled', ended_at = ? WHERE id = ? AND status = 'queued'`,
      )
      .run(Date.now(), id);
    if (info.changes > 0) return true;
  }

  // Re-read after the failed conditional update — status may now be 'running'.
  const fresh = getJob(id) ?? job;
  if (
    fresh.status === "running" &&
    global.__whisperjavRunningId === id &&
    global.__whisperjavRunningKill
  ) {
    global.__whisperjavRunningKill("cancel");
    return true;
  }
  return false;
}

let tickPending = false;

/** Request a worker pass. Multiple calls before the pass starts coalesce. */
function scheduleTick(): void {
  if (tickPending) return;
  tickPending = true;
  // Defer to next tick so callers' DB writes are visible.
  setImmediate(() => {
    tickPending = false;
    // Fire-and-forget, but never leave the rejection unhandled.
    runOne().catch((e: unknown) => {
      // eslint-disable-next-line no-console
      console.error("[whisperjav] queue worker pass failed:", e);
    });
  });
}

let workerBusy = false;

/** Drain the queue one job at a time. Re-entrant calls return immediately
 *  while a pass is in progress. */
async function runOne(): Promise<void> {
  if (workerBusy) return;
  workerBusy = true;
  try {
    for (let next = nextQueuedJob(); next; next = nextQueuedJob()) {
      try {
        await processJob(next);
      } catch (e) {
        // An unexpected throw must not strand the row in a non-terminal
        // state or block the jobs queued behind it.
        // eslint-disable-next-line no-console
        console.error(`[whisperjav] job ${next.id} crashed:`, e);
        try {
          setStatus(next.id, "failed", {
            endedAt: Date.now(),
            error: `worker error: ${errorMessage(e)}`,
          });
        } catch (statusErr) {
          // If the failure can't even be recorded, stop this pass rather
          // than risk picking the same row up again in a tight loop.
          // eslint-disable-next-line no-console
          console.error(`[whisperjav] could not mark job ${next.id} failed:`, statusErr);
          break;
        }
      }
    }
  } finally {
    workerBusy = false;
  }
}

/**
 * Run one job to completion: spawn the CLI, tee its output into the job log,
 * record stage progress, validate the outcome and move the resulting `.srt`
 * to its destination. Every exit path records a terminal status on the row.
 */
async function processJob(job: JobRow): Promise<void> {
  const settings = getAppSetting("whisperjav");
  if (!settings.cliPath) {
    const now = Date.now();
    setStatus(job.id, "failed", {
      startedAt: now,
      endedAt: now,
      error: "WhisperJAV CLI path cleared while job was queued",
    });
    return;
  }

  setStatus(job.id, "running", { startedAt: Date.now() });

  let args: string[];
  try {
    args = JSON.parse(job.cliArgs) as string[];
  } catch {
    setStatus(job.id, "failed", { endedAt: Date.now(), error: "stored cli_args malformed" });
    return;
  }

  const logStream = fs.createWriteStream(job.logPath, { flags: "a" });
  let cancelled = false;
  let exitCode: number | null = null;

  try {
    const spawned = spawnJob(settings.cliPath, args);
    global.__whisperjavRunningId = job.id;
    global.__whisperjavRunningKill = (reason) => {
      if (reason === "cancel") cancelled = true;
      spawned.kill();
    };

    // Stage parser — write-through on every match. WhisperJAV emits
    // at most a handful of "Step N/M:" lines per job; the prior
    // debounce raced the rl close drain and dropped the final stage.
    const rl = readline.createInterface({ input: spawned.proc.stderr ?? Readable.from([]) });
    rl.on("line", (raw: string) => {
      // Strip CSI escape sequences (optional ESC byte + "[...m").
      const line = raw.replace(ANSI_SGR, "");
      logStream.write(line + "\n");
      if (isNoiseLine(line)) return;
      const stage = parseStageLine(line);
      if (stage) {
        updateProgress(job.id, stage.stage, stage.index, stage.total);
      }
    });

    // We don't currently expect anything on stdout; tee anyway.
    spawned.proc.stdout?.on("data", (chunk) => {
      logStream.write(chunk);
    });

    // `null` stands for "no exit code": either the process emitted 'error'
    // or 'close' reported a null code.
    exitCode = await new Promise<number | null>((resolve) => {
      spawned.proc.on("close", (code) => resolve(code));
      spawned.proc.on("error", () => resolve(null));
    });
    rl.close();
  } finally {
    // Runs on the error path too, so a throw from spawnJob cannot leave a
    // stale kill handle behind or the log stream open.
    await new Promise<void>((res) => logStream.end(() => res()));
    global.__whisperjavRunningKill = undefined;
    global.__whisperjavRunningId = null;
  }

  if (cancelled) {
    setStatus(job.id, "cancelled", { endedAt: Date.now(), exitCode });
    // Keep job dir for diagnosis. Could prune later via retention sweep.
    return;
  }

  const result = await validateOutcome({
    exitCode,
    statsPath: job.statsPath ?? path.join(job.jobDir, "stats.json"),
    jobDir: job.jobDir,
  });

  if (!result.success) {
    setStatus(job.id, "failed", {
      endedAt: Date.now(),
      exitCode,
      error: result.reason ?? "validation failed",
    });
    return;
  }

  // Move the .srt to its final destination, fail loudly on collision.
  if (!result.finalSrtPath) {
    setStatus(job.id, "failed", {
      endedAt: Date.now(),
      exitCode,
      error: "no final_srt path resolved",
    });
    return;
  }

  let destDir: string;
  try {
    destDir = await resolveDestDir(job.videoAbs, job.code);
  } catch (e) {
    setStatus(job.id, "failed", {
      endedAt: Date.now(),
      exitCode,
      error: `dest dir resolve failed: ${errorMessage(e)}`,
    });
    return;
  }

  const destFile = path.join(destDir, path.basename(result.finalSrtPath));
  if (fs.existsSync(destFile)) {
    setStatus(job.id, "failed", {
      endedAt: Date.now(),
      exitCode,
      error: `Output already exists at ${destFile}`,
    });
    return;
  }

  try {
    await moveFile(result.finalSrtPath, destFile);
  } catch (e) {
    setStatus(job.id, "failed", {
      endedAt: Date.now(),
      exitCode,
      error: `move failed: ${errorMessage(e)}`,
    });
    return;
  }

  // Cleanup temp dir on success/warning.
  try {
    await fsp.rm(job.jobDir, { recursive: true, force: true });
  } catch {
    /* best effort */
  }

  setStatus(job.id, result.warning ? "warning" : "completed", {
    endedAt: Date.now(),
    exitCode,
    targetSubtitlePath: destFile,
    cueCount: result.cueCount,
    error: result.warning ? result.reason : null,
  });

  // Opportunistic retention sweep — keeps the disk tidy without a
  // separate scheduler. No-op when retention is 0.
  sweepInBackground();
}

/** Start a retention sweep without awaiting it; failures are logged. */
function sweepInBackground(): void {
  runRetentionSweep().catch((e: unknown) => {
    // eslint-disable-next-line no-console
    console.error("[whisperjav] retention sweep failed:", e);
  });
}

/** Mark every queued (not-yet-running) job cancelled. Used by the
 *  "Stop Batch" button to drain the queue without touching the
 *  currently-running job. Returns the count cancelled. */
export function cancelAllQueued(): number {
  const result = rawDb
    .prepare(`
    UPDATE whisperjav_jobs
    SET status = 'cancelled', ended_at = ?
    WHERE status = 'queued'
  `)
    .run(Date.now());
  return result.changes ?? 0;
}

/** Module-load side effect: make sure leftover queued jobs from a
 *  prior process get picked up. Safe to call repeatedly. */
export function bootstrapQueue(): void {
  scheduleTick();
  sweepInBackground();
}

/** Delete failed/cancelled job dirs older than `retentionDays`. Always
 *  safe to call — no-ops when retention is 0 or no aged rows exist.
 *  Directories that cannot be removed are logged and skipped.
 *  @returns the number of job directories removed. */
export async function runRetentionSweep(): Promise<{ removed: number }> {
  const settings = getAppSetting("whisperjav");
  const days = Number(settings.retentionDays);
  if (!Number.isFinite(days) || days <= 0) return { removed: 0 };

  const cutoff = Date.now() - days * 24 * 60 * 60 * 1000;
  const aged = listAgedTerminalJobs(cutoff);
  let removed = 0;
  for (const row of aged) {
    try {
      await fsp.rm(row.jobDir, { recursive: true, force: true });
      removed++;
    } catch (e) {
      console.error(`[whisperjav] failed to prune ${row.jobDir}:`, e);
    }
  }
  if (removed > 0) {
    console.log(`[whisperjav] retention sweep removed ${removed} job dir(s)`);
  }
  return { removed };
}

/** Wipe every non-running job row + every temp dir on disk. Used by
 *  the "Clear all job history" Settings action. Returns counts.
 *  Directories come from `listAllJobDirs()` and are removed best-effort;
 *  rows are then removed via `deleteAllJobs()`.
 *  NOTE(review): "non-running" depends on `deleteAllJobs` in ./db — confirm. */
export async function clearAllJobHistory(): Promise<{ rows: number; dirs: number }> {
  const dirs = listAllJobDirs();
  let dirsRemoved = 0;
  for (const dir of dirs) {
    try {
      await fsp.rm(dir, { recursive: true, force: true });
      dirsRemoved++;
    } catch {
      /* best effort */
    }
  }
  const rows = deleteAllJobs();
  return { rows, dirs: dirsRemoved };
}

bootstrapQueue();

export { activeJobForCode, getJob, jobBaseDir };