Initial commit
This commit is contained in:
@@ -0,0 +1,438 @@
|
||||
import "server-only";
|
||||
import path from "node:path";
|
||||
import fs from "node:fs";
|
||||
import fsp from "node:fs/promises";
|
||||
import readline from "node:readline";
|
||||
import { Readable } from "node:stream";
|
||||
import { getAppSetting } from "@/lib/db/appSettings";
|
||||
import { rawDb } from "@/lib/db/client";
|
||||
import {
|
||||
insertJob,
|
||||
getJob,
|
||||
setStatus,
|
||||
updateProgress,
|
||||
nextQueuedJob,
|
||||
recoverOrphanedJobs,
|
||||
activeJobForCode,
|
||||
listAgedTerminalJobs,
|
||||
listAllJobDirs,
|
||||
deleteAllJobs,
|
||||
} from "./db";
|
||||
import {
|
||||
buildJobArgs,
|
||||
parseStageLine,
|
||||
spawnJob,
|
||||
validateOutcome,
|
||||
moveFile,
|
||||
isDirWritable,
|
||||
jobBaseDir,
|
||||
jobDirFor,
|
||||
newJobId,
|
||||
} from "./spawn";
|
||||
import { findVideosForCode, rescanVideoIndex } from "@/lib/video";
|
||||
import { getStoredVideoMetadata } from "@/lib/video/metadata";
|
||||
import type { JobRow } from "./types";
|
||||
|
||||
declare global {
|
||||
// eslint-disable-next-line no-var
|
||||
var __whisperjavWorkerStarted: boolean | undefined;
|
||||
// eslint-disable-next-line no-var
|
||||
var __whisperjavRunningKill: ((reason: "cancel") => void) | undefined;
|
||||
// eslint-disable-next-line no-var
|
||||
var __whisperjavRunningId: string | null | undefined;
|
||||
}
|
||||
|
||||
if (!global.__whisperjavWorkerStarted) {
|
||||
global.__whisperjavWorkerStarted = true;
|
||||
global.__whisperjavRunningKill = undefined;
|
||||
global.__whisperjavRunningId = null;
|
||||
// Restart sweep: any rows in `running` from a prior process are dead.
|
||||
try {
|
||||
const recovered = recoverOrphanedJobs();
|
||||
if (recovered > 0) {
|
||||
// eslint-disable-next-line no-console
|
||||
console.log(`[whisperjav] recovered ${recovered} orphaned job(s) on bootstrap`);
|
||||
}
|
||||
} catch (e) {
|
||||
// eslint-disable-next-line no-console
|
||||
console.error("[whisperjav] orphan recovery failed:", e);
|
||||
}
|
||||
}
|
||||
|
||||
export interface EnqueueResult {
|
||||
jobId: string;
|
||||
}
|
||||
|
||||
export interface EnqueueAlreadyExists {
|
||||
alreadyExists: true;
|
||||
abs: string;
|
||||
}
|
||||
|
||||
const BANNER_CHARS = /^[╔╗╚╝═║│┌┐└┘─\s]*$/;
|
||||
const NOISE = [
|
||||
/RequestsDependencyWarning/i,
|
||||
/urllib3 \(\d+\.\d+/,
|
||||
/chardet|charset_normalizer/,
|
||||
/You are about to download and run code from an untrusted repository/,
|
||||
/^Downloading: /,
|
||||
/UserWarning:/,
|
||||
/^\s*warnings\.warn/,
|
||||
/^_check_repo_is_trusted/,
|
||||
];
|
||||
|
||||
function isNoiseLine(s: string): boolean {
|
||||
if (!s.trim()) return true;
|
||||
if (BANNER_CHARS.test(s)) return true;
|
||||
for (const re of NOISE) if (re.test(s)) return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
function generatedSubtitlesDir(code: string): string {
|
||||
return path.join(process.cwd(), "data", "generated-subtitles", code);
|
||||
}
|
||||
|
||||
/** Compute the destination dir for a given video, given settings.
|
||||
* Returns the dir absolute path. Creates it if needed. */
|
||||
async function resolveDestDir(videoAbs: string, code: string): Promise<string> {
|
||||
const s = getAppSetting("whisperjav");
|
||||
if (s.outputLocation === "beside-video") {
|
||||
const dir = path.dirname(videoAbs);
|
||||
if (await isDirWritable(dir)) return dir;
|
||||
}
|
||||
const fallback = generatedSubtitlesDir(code);
|
||||
await fsp.mkdir(fallback, { recursive: true });
|
||||
return fallback;
|
||||
}
|
||||
|
||||
/** Filename pattern WhisperJAV emits — derived from settings, used for
|
||||
* pre-flight idempotency checks. The CLI may add a suffix beyond this
|
||||
* pattern but the language tag is stable. */
|
||||
function expectedSubtitleStemPrefix(videoStem: string, langTag: string): string {
|
||||
return `${videoStem}.${langTag}`;
|
||||
}
|
||||
|
||||
function langTagForSettings(): "ja" | "ko" | "zh" | "en" {
|
||||
const s = getAppSetting("whisperjav");
|
||||
if (s.outputMode === "direct-to-english") return "en";
|
||||
switch (s.sourceLanguage) {
|
||||
case "japanese": return "ja";
|
||||
case "korean": return "ko";
|
||||
case "chinese": return "zh";
|
||||
case "english": return "en";
|
||||
}
|
||||
}
|
||||
|
||||
async function existingGeneratedFor(videoAbs: string, code: string): Promise<string | null> {
|
||||
const langTag = langTagForSettings();
|
||||
const stem = path.basename(videoAbs, path.extname(videoAbs));
|
||||
const prefix = expectedSubtitleStemPrefix(stem, langTag);
|
||||
const candidates = [
|
||||
path.dirname(videoAbs),
|
||||
generatedSubtitlesDir(code),
|
||||
];
|
||||
for (const dir of candidates) {
|
||||
let entries: import("node:fs").Dirent[];
|
||||
try {
|
||||
entries = await fsp.readdir(dir, { withFileTypes: true });
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
for (const e of entries) {
|
||||
if (!e.isFile()) continue;
|
||||
const lower = e.name.toLowerCase();
|
||||
if (!lower.endsWith(".srt")) continue;
|
||||
// Match `<stem>.<lang>` followed by ANY further token before .srt.
|
||||
// Catches both `<stem>.<lang>.srt` and `<stem>.<lang>.whisperjav.srt`.
|
||||
if (lower.startsWith(prefix.toLowerCase() + ".") || lower === `${prefix.toLowerCase()}.srt`) {
|
||||
return path.join(dir, e.name);
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
export async function enqueueJob(opts: { code: string; partIdx: number; overwrite?: boolean }): Promise<EnqueueResult | EnqueueAlreadyExists> {
|
||||
const settings = getAppSetting("whisperjav");
|
||||
if (!settings.cliPath) {
|
||||
throw new Error("WhisperJAV CLI path not configured");
|
||||
}
|
||||
let files = findVideosForCode(opts.code);
|
||||
if (files.length === 0) {
|
||||
// Index might be empty in dev — kick a rescan once.
|
||||
await rescanVideoIndex();
|
||||
files = findVideosForCode(opts.code);
|
||||
}
|
||||
const variant = files[opts.partIdx];
|
||||
if (!variant) throw new Error(`No video found for code=${opts.code} part=${opts.partIdx}`);
|
||||
|
||||
const existing = await existingGeneratedFor(variant.abs, opts.code);
|
||||
if (existing) {
|
||||
if (!opts.overwrite) {
|
||||
return { alreadyExists: true, abs: existing };
|
||||
}
|
||||
// User confirmed overwrite — remove the prior generated file so the
|
||||
// post-run move at the queue worker doesn't trip its collision guard.
|
||||
try {
|
||||
await fsp.unlink(existing);
|
||||
} catch (e) {
|
||||
throw new Error(`Could not remove existing subtitle: ${(e as Error).message}`);
|
||||
}
|
||||
}
|
||||
|
||||
const id = newJobId();
|
||||
const jobDir = jobDirFor(id);
|
||||
await fsp.mkdir(jobDir, { recursive: true });
|
||||
const statsPath = path.join(jobDir, "stats.json");
|
||||
const logPath = path.join(jobDir, "stderr.log");
|
||||
const args = buildJobArgs({
|
||||
videoAbs: variant.abs,
|
||||
outputDir: jobDir,
|
||||
statsPath,
|
||||
settings,
|
||||
});
|
||||
|
||||
// Capture duration + mode at enqueue so the running-job UI can show
|
||||
// an ETA. Duration may be missing if the file hasn't been probed yet
|
||||
// — that's fine, ETA is best-effort.
|
||||
const stored = getStoredVideoMetadata(variant.abs);
|
||||
const videoDurationSec = stored?.durationSec && stored.durationSec > 0 ? stored.durationSec : null;
|
||||
|
||||
insertJob({
|
||||
id,
|
||||
code: opts.code,
|
||||
videoAbs: variant.abs,
|
||||
jobDir,
|
||||
status: "queued",
|
||||
enqueuedAt: Date.now(),
|
||||
cliArgs: JSON.stringify(args),
|
||||
logPath,
|
||||
statsPath,
|
||||
videoDurationSec,
|
||||
mode: settings.quality,
|
||||
});
|
||||
|
||||
scheduleTick();
|
||||
return { jobId: id };
|
||||
}
|
||||
|
||||
export function cancelJob(id: string): boolean {
|
||||
const job = getJob(id);
|
||||
if (!job) return false;
|
||||
if (job.status === "queued") {
|
||||
// Atomic flip: if the worker just picked this job up between our
|
||||
// read and write, the WHERE clause matches zero rows and we fall
|
||||
// through to the running branch.
|
||||
const info = rawDb.prepare(
|
||||
`UPDATE whisperjav_jobs SET status = 'cancelled', ended_at = ? WHERE id = ? AND status = 'queued'`,
|
||||
).run(Date.now(), id);
|
||||
if (info.changes > 0) return true;
|
||||
}
|
||||
// Re-read after the failed conditional update — status may now be 'running'.
|
||||
const fresh = getJob(id) ?? job;
|
||||
if (fresh.status === "running" && global.__whisperjavRunningId === id && global.__whisperjavRunningKill) {
|
||||
global.__whisperjavRunningKill("cancel");
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
let tickPending = false;
|
||||
function scheduleTick(): void {
|
||||
if (tickPending) return;
|
||||
tickPending = true;
|
||||
// Defer to next tick so callers' DB writes are visible.
|
||||
setImmediate(() => {
|
||||
tickPending = false;
|
||||
void runOne();
|
||||
});
|
||||
}
|
||||
|
||||
let workerBusy = false;
|
||||
async function runOne(): Promise<void> {
|
||||
if (workerBusy) return;
|
||||
workerBusy = true;
|
||||
try {
|
||||
while (true) {
|
||||
const next = nextQueuedJob();
|
||||
if (!next) break;
|
||||
await processJob(next);
|
||||
}
|
||||
} finally {
|
||||
workerBusy = false;
|
||||
}
|
||||
}
|
||||
|
||||
async function processJob(job: JobRow): Promise<void> {
|
||||
const settings = getAppSetting("whisperjav");
|
||||
if (!settings.cliPath) {
|
||||
setStatus(job.id, "failed", { startedAt: Date.now(), endedAt: Date.now(), error: "WhisperJAV CLI path cleared while job was queued" });
|
||||
return;
|
||||
}
|
||||
|
||||
setStatus(job.id, "running", { startedAt: Date.now() });
|
||||
|
||||
let args: string[];
|
||||
try {
|
||||
args = JSON.parse(job.cliArgs) as string[];
|
||||
} catch {
|
||||
setStatus(job.id, "failed", { endedAt: Date.now(), error: "stored cli_args malformed" });
|
||||
return;
|
||||
}
|
||||
|
||||
const logStream = fs.createWriteStream(job.logPath, { flags: "a" });
|
||||
const spawned = spawnJob(settings.cliPath, args);
|
||||
global.__whisperjavRunningId = job.id;
|
||||
let cancelled = false;
|
||||
global.__whisperjavRunningKill = (reason) => {
|
||||
if (reason === "cancel") cancelled = true;
|
||||
spawned.kill();
|
||||
};
|
||||
|
||||
|
||||
// Stage parser — write-through on every match. WhisperJAV emits
|
||||
// at most a handful of "Step N/M:" lines per job; the prior
|
||||
// debounce raced the rl close drain and dropped the final stage.
|
||||
const rl = readline.createInterface({ input: spawned.proc.stderr ?? Readable.from([]) });
|
||||
rl.on("line", (raw: string) => {
|
||||
// Strip CSI escape sequences (optional ESC byte + "[...m").
|
||||
const line = raw.replace(/?\[[0-9;]*m/g, "");
|
||||
logStream.write(line + "\n");
|
||||
if (isNoiseLine(line)) return;
|
||||
const stage = parseStageLine(line);
|
||||
if (stage) {
|
||||
updateProgress(job.id, stage.stage, stage.index, stage.total);
|
||||
}
|
||||
});
|
||||
|
||||
// We don't currently expect anything on stdout; tee anyway.
|
||||
spawned.proc.stdout?.on("data", (b) => { logStream.write(b); });
|
||||
|
||||
const exitCode: number | null = await new Promise((resolve) => {
|
||||
spawned.proc.on("close", (code) => resolve(code));
|
||||
spawned.proc.on("error", () => resolve(null));
|
||||
});
|
||||
|
||||
rl.close();
|
||||
await new Promise<void>((res) => logStream.end(() => res()));
|
||||
|
||||
global.__whisperjavRunningKill = undefined;
|
||||
global.__whisperjavRunningId = null;
|
||||
|
||||
if (cancelled) {
|
||||
setStatus(job.id, "cancelled", { endedAt: Date.now(), exitCode });
|
||||
// Keep job dir for diagnosis. Could prune later via retention sweep.
|
||||
return;
|
||||
}
|
||||
|
||||
const result = await validateOutcome({
|
||||
exitCode,
|
||||
statsPath: job.statsPath ?? path.join(job.jobDir, "stats.json"),
|
||||
jobDir: job.jobDir,
|
||||
});
|
||||
|
||||
if (!result.success) {
|
||||
setStatus(job.id, "failed", { endedAt: Date.now(), exitCode, error: result.reason ?? "validation failed" });
|
||||
return;
|
||||
}
|
||||
|
||||
// Move the .srt to its final destination, fail loudly on collision.
|
||||
if (!result.finalSrtPath) {
|
||||
setStatus(job.id, "failed", { endedAt: Date.now(), exitCode, error: "no final_srt path resolved" });
|
||||
return;
|
||||
}
|
||||
let destDir: string;
|
||||
try {
|
||||
destDir = await resolveDestDir(job.videoAbs, job.code);
|
||||
} catch (e) {
|
||||
setStatus(job.id, "failed", { endedAt: Date.now(), exitCode, error: `dest dir resolve failed: ${(e as Error).message}` });
|
||||
return;
|
||||
}
|
||||
const destFile = path.join(destDir, path.basename(result.finalSrtPath));
|
||||
if (fs.existsSync(destFile)) {
|
||||
setStatus(job.id, "failed", { endedAt: Date.now(), exitCode, error: `Output already exists at ${destFile}` });
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await moveFile(result.finalSrtPath, destFile);
|
||||
} catch (e) {
|
||||
setStatus(job.id, "failed", { endedAt: Date.now(), exitCode, error: `move failed: ${(e as Error).message}` });
|
||||
return;
|
||||
}
|
||||
|
||||
// Cleanup temp dir on success/warning.
|
||||
try {
|
||||
await fsp.rm(job.jobDir, { recursive: true, force: true });
|
||||
} catch { /* best effort */ }
|
||||
|
||||
setStatus(job.id, result.warning ? "warning" : "completed", {
|
||||
endedAt: Date.now(),
|
||||
exitCode,
|
||||
targetSubtitlePath: destFile,
|
||||
cueCount: result.cueCount,
|
||||
error: result.warning ? result.reason : null,
|
||||
});
|
||||
|
||||
// Opportunistic retention sweep — keeps the disk tidy without a
|
||||
// separate scheduler. No-op when retention is 0.
|
||||
void runRetentionSweep();
|
||||
}
|
||||
|
||||
/** Mark every queued (not-yet-running) job cancelled. Used by the
|
||||
* "Stop Batch" button to drain the queue without touching the
|
||||
* currently-running job. Returns the count cancelled. */
|
||||
export function cancelAllQueued(): number {
|
||||
const result = rawDb.prepare(`
|
||||
UPDATE whisperjav_jobs SET status = 'cancelled', ended_at = ?
|
||||
WHERE status = 'queued'
|
||||
`).run(Date.now());
|
||||
return result.changes ?? 0;
|
||||
}
|
||||
|
||||
/** Module-load side effect: make sure leftover queued jobs from a
|
||||
* prior process get picked up. Safe to call repeatedly. */
|
||||
export function bootstrapQueue(): void {
|
||||
scheduleTick();
|
||||
void runRetentionSweep();
|
||||
}
|
||||
|
||||
/** Delete failed/cancelled job dirs older than `retentionDays`. Always
|
||||
* safe to call — no-ops when retention is 0 or no aged rows exist. */
|
||||
export async function runRetentionSweep(): Promise<{ removed: number }> {
|
||||
const settings = getAppSetting("whisperjav");
|
||||
const days = Number(settings.retentionDays);
|
||||
if (!Number.isFinite(days) || days <= 0) return { removed: 0 };
|
||||
const cutoff = Date.now() - days * 24 * 60 * 60 * 1000;
|
||||
const aged = listAgedTerminalJobs(cutoff);
|
||||
let removed = 0;
|
||||
for (const row of aged) {
|
||||
try {
|
||||
await fsp.rm(row.jobDir, { recursive: true, force: true });
|
||||
removed++;
|
||||
} catch (e) {
|
||||
console.error(`[whisperjav] failed to prune ${row.jobDir}:`, e);
|
||||
}
|
||||
}
|
||||
if (removed > 0) {
|
||||
console.log(`[whisperjav] retention sweep removed ${removed} job dir(s)`);
|
||||
}
|
||||
return { removed };
|
||||
}
|
||||
|
||||
/** Wipe every non-running job row + every temp dir on disk. Used by
|
||||
* the "Clear all job history" Settings action. Returns counts. */
|
||||
export async function clearAllJobHistory(): Promise<{ rows: number; dirs: number }> {
|
||||
const dirs = listAllJobDirs();
|
||||
let dirsRemoved = 0;
|
||||
for (const dir of dirs) {
|
||||
try {
|
||||
await fsp.rm(dir, { recursive: true, force: true });
|
||||
dirsRemoved++;
|
||||
} catch { /* best effort */ }
|
||||
}
|
||||
const rows = deleteAllJobs();
|
||||
return { rows, dirs: dirsRemoved };
|
||||
}
|
||||
|
||||
bootstrapQueue();
|
||||
|
||||
export { activeJobForCode, getJob, jobBaseDir };
|
||||
Reference in New Issue
Block a user