import "server-only";
import { rawDb } from "@/lib/db/client";
import type { JobRow, JobStatus } from "./types";

/** Shape of a `whisperjav_jobs` row as read from the database (snake_case columns). */
interface JobDbRow {
  id: string;
  code: string;
  video_abs: string;
  job_dir: string;
  target_subtitle_path: string | null;
  status: JobStatus;
  enqueued_at: number;
  started_at: number | null;
  ended_at: number | null;
  exit_code: number | null;
  error: string | null;
  stage: string | null;
  stage_index: number | null;
  stage_total: number | null;
  cue_count: number | null;
  cli_args: string;
  log_path: string;
  stats_path: string | null;
  video_duration_sec: number | null;
  mode: string | null;
}

/**
 * Fields a caller supplies when enqueueing a job. The omitted fields are the
 * ones `insertJob` always writes as NULL.
 */
type NewJobRow = Omit<
  JobRow,
  | "targetSubtitlePath"
  | "startedAt"
  | "endedAt"
  | "exitCode"
  | "error"
  | "stage"
  | "stageIndex"
  | "stageTotal"
  | "cueCount"
>;

/** Optional columns that `setStatus` can update alongside the status itself. */
type StatusFields = Partial<{
  startedAt: number | null;
  endedAt: number | null;
  exitCode: number | null;
  error: string | null;
  targetSubtitlePath: string | null;
  cueCount: number | null;
}>;

/**
 * Maps each `StatusFields` key to its database column. Column names only ever
 * come from this fixed table, never from caller input, so interpolating them
 * into the UPDATE statement is safe.
 */
const STATUS_FIELD_COLUMNS: Record<keyof StatusFields, string> = {
  startedAt: "started_at",
  endedAt: "ended_at",
  exitCode: "exit_code",
  error: "error",
  targetSubtitlePath: "target_subtitle_path",
  cueCount: "cue_count",
};

/** Converts a snake_case database row into the camelCase `JobRow` used by the app. */
function rowFromDb(r: JobDbRow): JobRow {
  return {
    id: r.id,
    code: r.code,
    videoAbs: r.video_abs,
    jobDir: r.job_dir,
    targetSubtitlePath: r.target_subtitle_path,
    status: r.status,
    enqueuedAt: r.enqueued_at,
    startedAt: r.started_at,
    endedAt: r.ended_at,
    exitCode: r.exit_code,
    error: r.error,
    stage: r.stage,
    stageIndex: r.stage_index,
    stageTotal: r.stage_total,
    cueCount: r.cue_count,
    cliArgs: r.cli_args,
    logPath: r.log_path,
    statsPath: r.stats_path,
    videoDurationSec: r.video_duration_sec,
    mode: r.mode,
  };
}

/** Seed realtime multiplier for a mode, used when history gives no usable sample. */
function seedRealtimeMultiplier(mode: string): number {
  if (mode === "fast") return 0.8;
  if (mode === "qwen") return 6.0;
  return 2.0; // balanced default
}

/**
 * Inserts a new job row. Result, timing and progress columns
 * (target_subtitle_path, started_at, ended_at, exit_code, error, stage,
 * stage_index, stage_total, cue_count) are always written as NULL.
 *
 * @param row Caller-supplied fields of the new job.
 */
export function insertJob(row: NewJobRow): void {
  rawDb
    .prepare(
      `
      INSERT INTO whisperjav_jobs (
        id, code, video_abs, job_dir, target_subtitle_path, status,
        enqueued_at, started_at, ended_at, exit_code, error,
        stage, stage_index, stage_total, cue_count,
        cli_args, log_path, stats_path, video_duration_sec, mode
      ) VALUES (?, ?, ?, ?, NULL, ?, ?, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, ?, ?, ?, ?, ?)
      `,
    )
    .run(
      row.id,
      row.code,
      row.videoAbs,
      row.jobDir,
      row.status,
      row.enqueuedAt,
      row.cliArgs,
      row.logPath,
      row.statsPath,
      row.videoDurationSec,
      row.mode,
    );
}

/**
 * Returns avg(elapsed_sec / video_duration_sec) over the 10 most recently
 * ended successful ('completed' or 'warning') jobs for the given mode. Used
 * to estimate remaining time for an in-flight job.
 *
 * Falls back to a per-mode seed when no usable history is available: either
 * no matching rows exist, or every matching row has a non-positive elapsed
 * time.
 *
 * @param mode Job mode to match against the `mode` column.
 * @returns Processing seconds per second of video.
 */
export function estimateRealtimeMultiplier(mode: string): number {
  const rows = rawDb
    .prepare(
      `SELECT started_at, ended_at, video_duration_sec
       FROM whisperjav_jobs
       WHERE status IN ('completed', 'warning')
         AND mode = ?
         AND started_at IS NOT NULL
         AND ended_at IS NOT NULL
         AND video_duration_sec IS NOT NULL
         AND video_duration_sec > 0
       ORDER BY ended_at DESC
       LIMIT 10`,
    )
    .all(mode) as Array<{ started_at: number; ended_at: number; video_duration_sec: number }>;

  let sum = 0;
  let n = 0;
  for (const r of rows) {
    // Timestamps are treated as milliseconds; convert to seconds to match video_duration_sec.
    const elapsed = (r.ended_at - r.started_at) / 1000;
    if (elapsed <= 0) continue; // skip rows with zero or negative elapsed time
    sum += elapsed / r.video_duration_sec;
    n++;
  }
  // Use the per-mode seed whenever there is no usable sample, so the fallback
  // is the same whether history is empty or merely unusable.
  return n > 0 ? sum / n : seedRealtimeMultiplier(mode);
}

/** Looks up a single job by id; returns null when no row matches. */
export function getJob(id: string): JobRow | null {
  const r = rawDb.prepare(`SELECT * FROM whisperjav_jobs WHERE id = ?`).get(id) as
    | JobDbRow
    | undefined;
  return r ? rowFromDb(r) : null;
}

/**
 * Lists jobs for a code, newest enqueued first.
 *
 * @param code Value matched against the `code` column.
 * @param limit Maximum number of rows to return (default 5).
 */
export function listJobsForCode(code: string, limit = 5): JobRow[] {
  const rows = rawDb
    .prepare(
      `
      SELECT * FROM whisperjav_jobs
      WHERE code = ?
      ORDER BY enqueued_at DESC
      LIMIT ?
      `,
    )
    .all(code, limit) as JobDbRow[];
  return rows.map(rowFromDb);
}

/** Earliest queued job, regardless of code. Returns null when nothing is queued. */
export function nextQueuedJob(): JobRow | null {
  const r = rawDb
    .prepare(
      `
      SELECT * FROM whisperjav_jobs
      WHERE status = 'queued'
      ORDER BY enqueued_at ASC
      LIMIT 1
      `,
    )
    .get() as JobDbRow | undefined;
  return r ? rowFromDb(r) : null;
}

/** Most recent non-terminal (queued/running) job for a code, if any. */
export function activeJobForCode(code: string): JobRow | null {
  const r = rawDb
    .prepare(
      `
      SELECT * FROM whisperjav_jobs
      WHERE code = ? AND status IN ('queued','running')
      ORDER BY enqueued_at DESC
      LIMIT 1
      `,
    )
    .get(code) as JobDbRow | undefined;
  return r ? rowFromDb(r) : null;
}

/**
 * Sets a job's status and, optionally, any of the related result columns.
 *
 * Only keys actually present in `fields` are written; a present key whose
 * value is `undefined` is stored as NULL.
 *
 * @param id Job id to update.
 * @param status New status value.
 * @param fields Extra columns to update in the same statement.
 */
export function setStatus(id: string, status: JobStatus, fields: StatusFields = {}): void {
  const sets: string[] = ["status = ?"];
  const args: (string | number | null)[] = [status];

  for (const key of Object.keys(STATUS_FIELD_COLUMNS) as Array<keyof StatusFields>) {
    if (key in fields) {
      sets.push(`${STATUS_FIELD_COLUMNS[key]} = ?`);
      args.push(fields[key] ?? null);
    }
  }

  args.push(id);
  rawDb.prepare(`UPDATE whisperjav_jobs SET ${sets.join(", ")} WHERE id = ?`).run(...args);
}

/**
 * Overwrites the progress columns (stage, stage_index, stage_total) of a job.
 *
 * @param id Job id to update.
 * @param stage Value for the `stage` column, or null.
 * @param idx Value for the `stage_index` column, or null.
 * @param total Value for the `stage_total` column, or null.
 */
export function updateProgress(
  id: string,
  stage: string | null,
  idx: number | null,
  total: number | null,
): void {
  rawDb
    .prepare(
      `
      UPDATE whisperjav_jobs
      SET stage = ?, stage_index = ?, stage_total = ?
      WHERE id = ?
      `,
    )
    .run(stage, idx, total, id);
}

/**
 * Rows older than `cutoffMs` whose status is one of the terminal
 * retention candidates (failed/cancelled). Used by the retention
 * sweep to find job dirs to delete.
 *
 * A row's age is taken from `ended_at`, falling back to `enqueued_at`
 * when `ended_at` is NULL.
 */
export function listAgedTerminalJobs(cutoffMs: number): Array<{ id: string; jobDir: string }> {
  const rows = rawDb
    .prepare(
      `
      SELECT id, job_dir FROM whisperjav_jobs
      WHERE status IN ('failed', 'cancelled')
        AND COALESCE(ended_at, enqueued_at) < ?
      `,
    )
    .all(cutoffMs) as Array<{ id: string; job_dir: string }>;
  return rows.map((r) => ({ id: r.id, jobDir: r.job_dir }));
}

/**
 * Used by the "Clear all job history" Settings action.
 *
 * Returns the job dir of every row, whatever its status.
 * NOTE(review): this includes queued/running rows, which `deleteAllJobs`
 * keeps. Confirm the caller does not remove directories of active jobs.
 */
export function listAllJobDirs(): string[] {
  const rows = rawDb.prepare(`SELECT job_dir FROM whisperjav_jobs`).all() as Array<{
    job_dir: string;
  }>;
  return rows.map((r) => r.job_dir);
}

/**
 * Deletes every job row that is not queued or running.
 *
 * @returns Number of rows deleted.
 */
export function deleteAllJobs(): number {
  const result = rawDb
    .prepare(`DELETE FROM whisperjav_jobs WHERE status NOT IN ('queued', 'running')`)
    .run();
  return result.changes ?? 0;
}

/**
 * Mark any running rows as failed (their child processes are dead).
 * Queued rows remain queued — they're still waiting their turn.
 *
 * @returns Number of rows that were marked failed.
 */
export function recoverOrphanedJobs(): number {
  const result = rawDb
    .prepare(
      `
      UPDATE whisperjav_jobs
      SET status = 'failed', error = 'process did not survive restart', ended_at = ?
      WHERE status = 'running'
      `,
    )
    .run(Date.now());
  return result.changes ?? 0;
}