ba57b7fd21
Carves the first slice out of the monolithic rc-jav.py (now 2017
lines, was 2230). Two new modules:
rcjav/model.py FileEntry dataclass — the one shared shape that
every other submodule will need.
rcjav/ids.py Single source of truth for everything that
influences a FileEntry.jav_id: PRIMARY_ID_RE,
FALLBACK_ID_RE, COMPOUND_ID_RE, BUILTIN_PART_RES,
configure_part_patterns, detect_part,
detect_part_from_stem, part_key, extract_id,
normalize_id, describe_id_match, expand_range,
plus the supporting "private" regexes
(_BRACKET_ID_RE, _RESOLUTION_TAG_RE, etc.) that
other code in rc-jav.py still reads.
rcjav/__init__.py re-exports the public surface so future external
consumers can `from rcjav import extract_id` without caring which
submodule it lives in.
rc-jav.py drops the inline ID block and pulls everything from
rcjav.ids via a single import statement. PART_RES is intentionally
NOT imported — it's mutated by configure_part_patterns at runtime, so
a captured top-level reference would go stale. A small helper
`_current_part_res()` reads it dynamically via `_rcjav_ids.PART_RES`.
fixtures/run.py fix: synthesized importlib module name changed from
"rcjav" (which now collides with the real package directory) to
"rcjav_script". Also prepends ROOT to sys.path so rc-jav.py's
`from rcjav.model import …` resolves when run as
`python fixtures/run.py`.
Verified:
- python rc-jav.py --help → usage banner prints
- python fixtures/run.py → 17/17 cases pass
- python -m unittest tests.test_rules → 5/5 OK
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2020 lines
86 KiB
Python
2020 lines
86 KiB
Python
#!/usr/bin/env python3
|
|
"""Scan rclone remotes for duplicate JAV files grouped by ID."""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import csv
|
|
import fnmatch
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
import xml.etree.ElementTree as ET
|
|
from dataclasses import dataclass, asdict
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Iterable
|
|
|
|
from rich.console import Console
|
|
from rich.panel import Panel
|
|
from rich.progress import (
|
|
BarColumn,
|
|
MofNCompleteColumn,
|
|
Progress,
|
|
SpinnerColumn,
|
|
TextColumn,
|
|
TimeElapsedColumn,
|
|
TimeRemainingColumn,
|
|
)
|
|
from rich.table import Table
|
|
from rich.text import Text
|
|
|
|
from rcjav.model import FileEntry
|
|
from rcjav import ids as _rcjav_ids
|
|
from rcjav.ids import (
|
|
PRIMARY_ID_RE,
|
|
FALLBACK_ID_RE,
|
|
COMPOUND_ID_RE,
|
|
RANGE_RE,
|
|
BUILTIN_PART_RES,
|
|
configure_part_patterns,
|
|
detect_part,
|
|
detect_part_from_stem,
|
|
part_key,
|
|
extract_id,
|
|
normalize_id,
|
|
describe_id_match,
|
|
expand_range,
|
|
_VARIANT_SUFFIX_RE,
|
|
_RES_LABEL_RE,
|
|
_RESOLUTION_TAG_RE,
|
|
_BRACKET_ID_RE,
|
|
_NOHYPHEN_ID_RE,
|
|
_VIDEO_EXTS,
|
|
_LOWEST_KEEP_PRIORITY_EXTS,
|
|
)
|
|
|
|
|
|
# PART_RES is rebound by configure_part_patterns(); always read it dynamically
|
|
# from the rcjav.ids module rather than capturing a stale binding at import time.
|
|
def _current_part_res():
    """Return whatever ``PART_RES`` is bound to on ``rcjav.ids`` right now.

    The attribute is read through the module object on every call so that a
    rebinding performed by ``configure_part_patterns()`` is always observed.
    """
    live_patterns = _rcjav_ids.PART_RES
    return live_patterns
|
|
|
|
|
|
def human_size(n: int) -> str:
    """Format a byte count using binary (1024-based) units.

    Negative inputs are clamped to zero. Values below 1024 are shown as whole
    bytes; everything else is shown with two decimals, topping out at PiB.
    """
    amount = float(max(0, n))
    if amount < 1024:
        return f"{int(amount)} B"
    for unit in ("KiB", "MiB", "GiB", "TiB"):
        amount /= 1024
        if amount < 1024:
            return f"{amount:.2f} {unit}"
    # Anything that overflowed TiB is reported in PiB, however large.
    amount /= 1024
    return f"{amount:.2f} PiB"
|
|
|
|
|
|
# Executable invoked for every rclone call in this script.
RCLONE_BIN = "rclone"
BASIC = False    # set by --basic
USE_ANSI = True  # disabled by --no-color

# Pre-rich ANSI codes (used in --basic mode for color).
_CSI = "\033["  # Control Sequence Introducer shared by every code below
ANSI_RESET = _CSI + "0m"
ANSI_GREEN = _CSI + "32m"
ANSI_RED = _CSI + "31m"
ANSI_YELLOW = _CSI + "33m"
ANSI_CYAN = _CSI + "36m"
ANSI_DIM = _CSI + "2m"
ANSI_BOLD = _CSI + "1m"
|
|
|
|
|
|
def ansi(s: str, code: str) -> str:
    """Wrap *s* in the escape sequence *code* followed by ``ANSI_RESET``.

    When the module-level ``USE_ANSI`` flag is off, *s* is returned untouched.
    """
    if not USE_ANSI:
        return s
    return f"{code}{s}{ANSI_RESET}"
|
|
# Module-wide rich console used by the loaders and renderers in this file.
console = Console() # replaced in main() if --no-color
|
|
|
|
|
|
# Matches any bracketed span such as "[cyan]" or "[/]" (rich markup tags).
_RICH_TAG_RE = re.compile(r"\[/?[^\]]*\]")


def strip_markup(s: str) -> str:
    """Return *s* with every ``[...]`` span removed.

    Used to turn rich-markup descriptions into plain text; note that any
    bracketed text is dropped, not only valid rich tags.
    """
    return re.sub(_RICH_TAG_RE, "", s)
|
|
|
|
|
|
class BasicProgress:
    """Minimal stand-in for rich.Progress used when --basic is set.

    Only the pieces this script calls are provided: the context-manager
    protocol plus ``add_task``, ``update`` and ``advance``. All output is
    written to stderr as plain (optionally ANSI-coloured) text.
    """

    def __init__(self):
        # task id -> {"desc": str, "total": int, "done": int}
        self._tasks: dict[int, dict] = {}
        # next task id to hand out
        self._next = 0
        # task id -> value of "done" when the counter line was last written
        self._last_print: dict[int, int] = {}

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        # Emit one summary line per task; False lets any exception propagate.
        done_tag = ansi('[done]', ANSI_GREEN)
        for task in self._tasks.values():
            sys.stderr.write(f"{done_tag} {task['desc']} {task['done']}/{task['total']}\n")
        return False

    def add_task(self, description: str, total: int = 1) -> int:
        """Register a task, announce it on stderr and return its id."""
        tid, self._next = self._next, self._next + 1
        desc = strip_markup(description)
        self._tasks[tid] = {"desc": desc, "total": total, "done": 0}
        self._last_print[tid] = 0
        sys.stderr.write(f"{ansi('[start]', ANSI_CYAN)} {desc}\n")
        return tid

    def update(self, tid, total=None, description=None, **_):
        """Change a task's total and/or description; other kwargs are ignored."""
        task = self._tasks[tid]
        if total is not None:
            task["total"] = total
        if description is not None:
            task["desc"] = strip_markup(description)

    def advance(self, tid, n: int = 1):
        """Add *n* to the task's counter and refresh the counter line if due."""
        task = self._tasks[tid]
        task["done"] += n
        done, total = task["done"], task["total"]
        finished = done == total
        # In-place refresh every 5 files (or every file if total small).
        step = 1 if not total > 50 else 5
        if not finished and done - self._last_print[tid] < step:
            return
        counter = ansi(f"{done}/{total}", ANSI_CYAN)
        line = f" {counter} {ansi(task['desc'], ANSI_DIM)}"
        if sys.stderr.isatty():
            # Carriage return + erase-line rewrites the same terminal row.
            sys.stderr.write(f"\r\033[K{line}")
            if finished:
                sys.stderr.write("\n")
            sys.stderr.flush()
        elif finished:
            # Non-TTY: only print final line, skip intermediate noise.
            sys.stderr.write(line + "\n")
        self._last_print[tid] = done
|
|
|
|
# Directory holding this script; catalog folder, cache, config and the cancel
# flag all live beside it.
_SCRIPT_DIR = Path(__file__).resolve().parent

# Default remotes used when --search is invoked without explicit --source/--target.
DEFAULT_SOURCE = ["cq:personal-files/ClearJAV"]
DEFAULT_TARGET = ["cq:personal-files/JAV/TMP"]

# Default WinCatalog export folder (or specific files). Folder entries auto-discover *.csv / *.xml.
DEFAULT_CATALOG: list[str] = [str(_SCRIPT_DIR / "wincatalog")]

# CSV column synonyms (lowercased) — first matching one wins.
CATALOG_COL_NAME = ("name", "file name", "filename", "title")
CATALOG_COL_PATH = ("path", "full path", "location", "folder")
CATALOG_COL_SIZE = ("size", "file size", "bytes", "size (bytes)")
CATALOG_COL_DISC = (
    "disc", "disc name", "disc label", "volume", "source", "catalog", "media",
)

CACHE_PATH = _SCRIPT_DIR / "cache.json"
CACHE_VERSION = 3  # bumped: extract_id handles bracket-wrapped IDs + no-hyphen fallback
CACHE_STALE_HOURS = 24

DEFAULT_KEEP_RANKING: dict = dict(
    priority_folders=["ClearJAV"],
    size_tolerance_mib=0,
    format_preference=["mkv", "mp4", "wmv", "avi"],
    tiebreak_res_tag=True,
    tiebreak_longer_name=True,
)
# Module-level ranking config; set from config.json in main() so all call sites pick it up.
_KEEP_RANKING: dict = {}

CONFIG_PATH = _SCRIPT_DIR / "config.json"

# Written by the native-messaging host when the user clicks Cancel in the
# extension popup. walk_remote checks for it every CANCEL_CHECK_INTERVAL files
# and exits cleanly if found.
CANCEL_FLAG = _SCRIPT_DIR / "scan-cancel.flag"
CANCEL_CHECK_INTERVAL = 100  # check / emit progress every N files
|
|
|
|
|
|
def load_config() -> dict:
    """Load the JSON config stored at ``CONFIG_PATH``.

    Returns:
        The parsed top-level JSON object, or ``{}`` when the file is missing,
        unreadable, not valid UTF-8, not valid JSON, or its top level is not
        an object.
    """
    try:
        # Read directly instead of checking exists() first: a missing file is
        # just another OSError, and there is no window between check and read.
        raw = CONFIG_PATH.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        # UnicodeDecodeError is not an OSError or a JSONDecodeError; without
        # it a config saved in another encoding would crash the script.
        return {}
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return {}
    return data if isinstance(data, dict) else {}
|
|
|
|
|
|
def save_config(cfg: dict) -> None:
    """Write *cfg* to ``CONFIG_PATH`` as 2-space-indented JSON.

    The text goes to a sibling ``<name>.tmp`` file first and is then moved
    over the real file with ``os.replace``.
    """
    payload = json.dumps(cfg, indent=2)
    staging = CONFIG_PATH.with_suffix(f"{CONFIG_PATH.suffix}.tmp")
    staging.write_text(payload, encoding="utf-8")
    os.replace(staging, CONFIG_PATH)
|
|
|
|
|
|
def load_cache() -> dict:
    """Load the scan cache stored at ``CACHE_PATH``.

    Returns:
        The cached dict when it is a JSON object whose ``version`` equals
        ``CACHE_VERSION`` and whose ``remotes`` is a dict. In every other case
        (missing/unreadable file, invalid UTF-8 or JSON, wrong shape, version
        mismatch) a fresh ``{"version": CACHE_VERSION, "remotes": {}}``.

    A version mismatch additionally prints a warning to stderr, since it
    forces a full rescan.
    """
    def _empty() -> dict:
        # Built per call so callers never share one mutable dict.
        return {"version": CACHE_VERSION, "remotes": {}}

    try:
        data = json.loads(CACHE_PATH.read_text(encoding="utf-8"))
    except (OSError, UnicodeDecodeError, json.JSONDecodeError):
        # UnicodeDecodeError covers a truncated/corrupt file whose bytes are
        # not valid UTF-8; it must degrade to an empty cache like bad JSON.
        return _empty()

    if not isinstance(data, dict):
        return _empty()
    if data.get("version") != CACHE_VERSION:
        if "version" in data:
            sys.stderr.write(
                f"[warn] cache version mismatch (got {data['version']}, "
                f"expected {CACHE_VERSION}); forcing full rescan.\n"
            )
        return _empty()
    if not isinstance(data.get("remotes"), dict):
        return _empty()
    return data
|
|
|
|
|
|
def save_cache(cache: dict) -> None:
    """Write *cache* to ``CACHE_PATH`` as indented JSON via a temp file.

    Raises:
        PermissionError: if replacing the destination still fails after the
            single retry.
    """
    # Write to a sibling tmp file then atomically replace, so a killed mid-write
    # (Ctrl-C, power loss, concurrent --scan) can't leave a half-written
    # cache.json — load_cache would otherwise see invalid JSON and fall back to
    # an empty cache, forcing a full re-scan.
    serialized = json.dumps(cache, indent=2)
    staging = CACHE_PATH.with_suffix(f"{CACHE_PATH.suffix}.tmp")
    staging.write_text(serialized, encoding="utf-8")
    try:
        os.replace(staging, CACHE_PATH)
        return
    except PermissionError:
        # Windows: destination may be briefly locked by antivirus or a concurrent reader.
        pass
    time.sleep(0.5)
    os.replace(staging, CACHE_PATH)
|
|
|
|
|
|
def cache_age_hours(scanned_at: str) -> float | None:
    """Return how many hours ago the ISO-8601 timestamp *scanned_at* was.

    A trailing ``Z`` is accepted as UTC. Timezone-aware stamps are compared
    against "now" in the same zone, naive stamps against naive local time.
    Returns ``None`` when the string cannot be parsed.
    """
    try:
        stamp = datetime.fromisoformat(scanned_at.replace("Z", "+00:00"))
    except ValueError:
        return None
    if stamp.tzinfo:
        reference = datetime.now(stamp.tzinfo)
    else:
        reference = datetime.now()
    elapsed = reference - stamp
    return elapsed.total_seconds() / 3600.0
|
|
|
|
|
|
def fmt_age(hours: float) -> str:
    """Render an age in hours as whole minutes, hours or days.

    Under one hour gives ``"<n>m"`` (truncated), under a day ``"<x.y>h"``,
    anything else ``"<x.y>d"``.
    """
    if hours < 1:
        minutes = int(hours * 60)
        return f"{minutes}m"
    if hours < 24:
        return f"{hours:.1f}h"
    days = hours / 24
    return f"{days:.1f}d"
|
|
|
|
|
|
# ---------- WinCatalog ingest ----------
|
|
|
|
def _pick_col(headers_lower: list[str], synonyms: tuple[str, ...]) -> str | None:
    """Return the first entry of *synonyms* present in *headers_lower*.

    Priority follows the order of *synonyms*, not the order of the headers.
    Returns ``None`` when no synonym is present.
    """
    return next((candidate for candidate in synonyms if candidate in headers_lower), None)
|
|
|
|
|
|
def normalize_catalog_path(path: str) -> str:
    """Keep catalog paths display-compatible with rclone-style path consumers.

    Backslashes become forward slashes and runs of slashes collapse to one.
    A leading ``//`` (UNC-style prefix) is kept as-is and only the remainder
    is collapsed. A falsy *path* yields ``""``.
    """
    forward = (path or "").replace("\\", "/")
    is_unc = forward.startswith("//")
    remainder = forward[2:] if is_unc else forward
    collapsed = re.sub(r"/+", "/", remainder)
    return f"//{collapsed}" if is_unc else collapsed
|
|
|
|
|
|
def load_catalog_csv(path: Path, skipped: list[tuple[str, str]]) -> list[FileEntry]:
    """Load a WinCatalog CSV export. Lenient about column names.

    The delimiter is sniffed from the first 4 KiB (falling back to the Excel
    dialect). Headers are matched case-insensitively, ignoring surrounding
    whitespace, against the ``CATALOG_COL_*`` synonym tuples.

    Args:
        path: CSV file to read (decoded as UTF-8, BOM tolerated).
        skipped: Receives ``("catalog:<file name>", path-or-name)`` for every
            row whose name yields no ID.

    Returns:
        One ``FileEntry`` (``source="Catalog"``) per row with a recognizable
        ID. Empty when the file has no header or neither a name nor a path
        column.
    """
    entries: list[FileEntry] = []
    with path.open("r", encoding="utf-8-sig", newline="") as f:
        # Sniff delimiter
        sample = f.read(4096)
        f.seek(0)
        try:
            dialect = csv.Sniffer().sniff(sample, delimiters=",;\t|")
        except csv.Error:
            dialect = csv.excel
        reader = csv.DictReader(f, dialect=dialect)
        if not reader.fieldnames:
            return entries

        # normalized header -> header exactly as written (first one wins), so
        # rows can still be indexed by the original key.
        headers: dict[str, str] = {}
        for h in reader.fieldnames:
            headers.setdefault(h.strip().lower(), h)
        normalized = list(headers)
        col_name = _pick_col(normalized, CATALOG_COL_NAME)
        col_path = _pick_col(normalized, CATALOG_COL_PATH)
        col_size = _pick_col(normalized, CATALOG_COL_SIZE)
        col_disc = _pick_col(normalized, CATALOG_COL_DISC)
        if not col_name and not col_path:
            console.print(f"[yellow]WARN: catalog CSV {path} has no Name/Path columns; skipping.[/]")
            return entries

        for row in reader:
            name = (row.get(headers[col_name]) if col_name else "") or ""
            raw_path = (row.get(headers[col_path]) if col_path else "") or ""
            full_path = normalize_catalog_path(raw_path)
            if not name and full_path:
                # Take the last component of the *normalized* path. Using
                # Path(raw).name here would treat a backslash path as a single
                # component on POSIX and feed the whole path to extract_id.
                name = full_path.rstrip("/").rsplit("/", 1)[-1]
            if not name:
                continue
            jav_id = extract_id(name)
            if not jav_id:
                skipped.append((f"catalog:{path.name}", full_path or name))
                continue
            try:
                size = int(row.get(headers[col_size], 0)) if col_size else 0
            except (ValueError, TypeError):
                # Blank, formatted ("1,234") or missing (None) size cells.
                size = 0
            disc = (row.get(headers[col_disc]) if col_disc else "") or ""
            # Encode disc label into "remote" so it surfaces in output.
            remote_label = f"catalog:{disc}" if disc else f"catalog:{path.name}"
            entries.append(FileEntry(
                source="Catalog", remote=remote_label,
                path=full_path or name, size=size, mod_time="",
                jav_id=jav_id,
            ))
    return entries
|
|
|
|
|
|
def _strip_xml_ns(tag: str) -> str:
    """Remove Clark-notation namespace {uri}local → local.

    Tags without a ``}`` are returned unchanged.
    """
    _, brace, local = tag.rpartition("}")
    return local if brace else tag
|
|
|
|
|
|
def load_catalog_xml(path: Path, skipped: list[tuple[str, str]]) -> list[FileEntry]:
    """Load a WinCatalog XML export. Walks for any element with file-like attrs.

    Element tags are compared case-insensitively with any namespace removed:
    disc-like containers set the disc label, folder-like elements extend the
    current path, and ``file``/``f`` elements become entries.

    Args:
        path: XML file to parse.
        skipped: Receives ``("catalog:<disc or file name>", full_path)`` for
            every file element whose name yields no ID.

    Returns:
        One ``FileEntry`` (``source="Catalog"``) per file element with a
        recognizable ID. Empty, after a warning, when the document is not
        well-formed XML.
    """
    entries: list[FileEntry] = []
    try:
        root = ET.parse(str(path)).getroot()
    except ET.ParseError as exc:
        # Catalog folders are auto-discovered, so one broken or unrelated .xml
        # must not abort the whole run; warn and skip, like the CSV loader.
        console.print(f"[yellow]WARN: catalog XML {path} could not be parsed ({exc}); skipping.[/]")
        return entries

    def walk(node, disc_label: str, parent_path: str, _depth: int = 0):
        # Hard stop on absurdly deep documents.
        if _depth > 500:
            return
        tag = _strip_xml_ns(node.tag).lower()
        # Heuristics: disc/catalog/source containers reset disc_label
        if tag in ("disc", "catalog", "source", "volume", "media"):
            disc_label = node.get("name") or node.get("Name") or disc_label
        # File-like nodes
        if tag in ("file", "f"):
            name = node.get("name") or node.get("Name") or node.findtext("Name") or ""
            size_raw = node.get("size") or node.get("Size") or node.findtext("Size") or "0"
            try:
                size = int(size_raw)
            except ValueError:
                size = 0
            full_path = normalize_catalog_path(f"{parent_path}/{name}" if parent_path else name)
            jav_id = extract_id(name)
            if jav_id:
                entries.append(FileEntry(
                    source="Catalog",
                    remote=f"catalog:{disc_label}" if disc_label else f"catalog:{path.name}",
                    path=full_path, size=size, mod_time="", jav_id=jav_id,
                ))
            else:
                skipped.append((f"catalog:{disc_label or path.name}", full_path))
            # Children of a file element are not descended into.
            return
        # Folder-like: extend parent_path
        if tag in ("folder", "dir", "directory"):
            folder_name = node.get("name") or node.get("Name") or ""
            parent_path = normalize_catalog_path(f"{parent_path}/{folder_name}" if parent_path else folder_name)
        for child in node:
            walk(child, disc_label, parent_path, _depth + 1)

    walk(root, "", "")
    return entries
|
|
|
|
|
|
def _expand_catalog_paths(paths: list[str]) -> list[Path]:
    """Expand any directories to their *.csv / *.xml children. Files passed through.

    Directory children are taken in sorted order, one level deep, and only
    regular files count. A missing path produces a warning unless it is one
    of the ``DEFAULT_CATALOG`` locations.
    """
    catalog_suffixes = (".csv", ".xml")
    found: list[Path] = []
    for raw in paths:
        candidate = Path(raw)
        if candidate.is_dir():
            found.extend(
                child
                for child in sorted(candidate.iterdir())
                if child.suffix.lower() in catalog_suffixes and child.is_file()
            )
            continue
        if candidate.exists():
            found.append(candidate)
            continue
        # silently skip missing default dir; warn for everything else
        default_locations = {Path(d).resolve() for d in DEFAULT_CATALOG}
        if candidate.resolve() not in default_locations:
            console.print(f"[yellow]WARN: catalog path not found: {raw}[/]")
    return found
|
|
|
|
|
|
def load_catalogs(paths: list[str], skipped: list[tuple[str, str]]) -> list[FileEntry]:
    """Load every catalog file reachable from *paths*.

    Paths are expanded with ``_expand_catalog_paths``; each resulting file is
    handed to the CSV or XML loader according to its suffix. Files with any
    other suffix are reported and ignored.
    """
    loaders = {".csv": load_catalog_csv, ".xml": load_catalog_xml}
    collected: list[FileEntry] = []
    for catalog_file in _expand_catalog_paths(paths):
        ext = catalog_file.suffix.lower()
        loader = loaders.get(ext)
        if loader is None:
            console.print(f"[yellow]WARN: unknown catalog format '{ext}' for {catalog_file}; skipping.[/]")
            continue
        collected.extend(loader(catalog_file, skipped))
    return collected
|
|
|
|
|
|
# ---------- quick search (no cache) ----------
|
|
|
|
def quick_search_remote(remote: str, source_label: str,
                        patterns: list[str],
                        skipped: list[tuple[str, str]]) -> list[FileEntry]:
    """Run `rclone lsjson --include <pattern>` once per pattern. Bypass cache.

    Filters are applied with ``--ignore-case``: rclone patterns are otherwise
    case-sensitive, so a pattern built from a normalized ID would miss files
    whose names differ only in letter case.

    Args:
        remote: rclone remote (or remote:path) to list recursively.
        source_label: Value stored in ``FileEntry.source``.
        patterns: rclone ``--include`` globs; one rclone call is made per glob.
        skipped: Receives ``(remote, path)`` for listed files with no ID.

    Returns:
        Entries for every distinct listed path that yields an ID.

    Exits the process with rclone's return code if any listing fails.
    """
    out: list[FileEntry] = []
    # Paths already handled, so overlapping patterns don't double-report.
    seen_paths: set[str] = set()
    for pat in patterns:
        cmd = [RCLONE_BIN, "lsjson", remote, "--files-only", "-R",
               "--ignore-case", "--include", pat]
        proc = subprocess.run(cmd, capture_output=True, text=True,
                              encoding="utf-8", errors="replace")
        if proc.returncode != 0:
            console.print(f"[red]rclone lsjson --include failed for {remote}:[/]\n{proc.stderr}")
            sys.exit(proc.returncode)
        for item in json.loads(proc.stdout or "[]"):
            if item.get("IsDir"):
                continue
            path = item["Path"]
            if path in seen_paths:
                continue
            seen_paths.add(path)
            jav_id = extract_id(Path(path).name)
            if not jav_id:
                skipped.append((remote, path))
                continue
            out.append(FileEntry(
                source=source_label, remote=remote, path=path,
                size=int(item.get("Size", 0)),
                mod_time=item.get("ModTime", ""), jav_id=jav_id,
            ))
    return out
|
|
|
|
|
|
def choose_search_mode(raw_queries: list[str], force_quick: bool, force_cache: bool) -> tuple[str, str]:
    """Decide quick vs cached. Returns (mode, reason).

    Explicit flags win (``--cache`` beats ``--quick`` when both are given).
    Otherwise only a single query that is neither a range nor a wildcard
    takes the quick path.
    """
    if force_cache:
        if force_quick:
            return ("cached", "both --quick and --cache passed; preferring --cache (safer)")
        return ("cached", "forced via --cache")
    if force_quick:
        return ("quick", "forced via --quick")

    if len(raw_queries) > 1:
        return ("cached", f"multi-query ({len(raw_queries)} IDs) — cache batches them for free")
    if not raw_queries:
        return ("cached", "no queries")

    only_query = raw_queries[0]
    if RANGE_RE.search(only_query):
        return ("cached", "range [N-M] — too many rclone calls otherwise")
    has_wildcard = any(mark in only_query for mark in ("*", "?"))
    if has_wildcard:
        return ("cached", "wildcard — cache match semantics are more reliable")
    return ("quick", "single exact ID — live lookup is fastest")
|
|
|
|
|
|
def _escape_rclone_glob(s: str) -> str:
    """Backslash-escape rclone filter meta-characters in a literal token.

    Each of ``*``, ``?``, ``[``, ``]``, ``{``, ``}`` and ``\\`` is prefixed
    with a backslash so the token is matched literally rather than as a glob;
    every other character passes through unchanged.
    """
    meta_chars = frozenset("*?[]{}\\")
    return "".join(f"\\{ch}" if ch in meta_chars else ch for ch in s)
|
|
|
|
|
|
def name_to_include_patterns(tokens: list[str]) -> list[str]:
    """Build one rclone --include glob per name token.

    A token that already contains ``*`` or ``?`` is passed through verbatim.
    Any other token becomes a substring search: its glob meta-characters are
    escaped and it is wrapped in ``*...*``.
    """
    def to_glob(token: str) -> str:
        if "*" in token or "?" in token:
            # Caller-supplied wildcard — assume they meant it.
            return token
        # Literal substring search: escape glob meta inside the token so
        # `--name "[BD]"` searches for the literal "[BD]" not a char class.
        return f"*{_escape_rclone_glob(token)}*"

    return [to_glob(token) for token in tokens]
|
|
|
|
|
|
def name_match(stem: str, tokens: list[str]) -> bool:
    """Case-insensitive: True if ANY token matches stem (substring or fnmatch glob).

    Tokens containing ``*`` or ``?`` must match the whole stem as an fnmatch
    pattern; all other tokens only need to occur somewhere inside it.
    """
    haystack = stem.lower()

    def matches(token: str) -> bool:
        needle = token.lower()
        if "*" in needle or "?" in needle:
            return fnmatch.fnmatchcase(haystack, needle)
        return needle in haystack

    return any(matches(token) for token in tokens)
|
|
|
|
|
|
def query_to_include_patterns(raw: str) -> list[str]:
    """Turn a search query into one or more rclone --include globs.

    A range query is expanded and each member converted recursively. Wildcard
    queries and queries that do not normalize to an ID become ``<raw>*``.
    A normalized ID whose last hyphen-separated part is numeric becomes
    ``<prefix>-<number zero-padded to at least 3 digits>*``.
    """
    if RANGE_RE.search(raw):
        globs: list[str] = []
        for member in expand_range(raw) or []:
            globs += query_to_include_patterns(member)
        return globs

    if "*" in raw or "?" in raw:
        return [f"{raw}*"]

    canonical = normalize_id(raw)
    if not canonical:
        return [f"{raw}*"]

    prefix, _, digits = canonical.rpartition("-")
    if not digits.isdigit():
        return [f"{canonical}*"]

    # Re-render the number without extra leading zeros, padded to >= 3 digits.
    padded = str(int(digits)).zfill(3)
    return [f"{prefix}-{padded}*"]
|
|
|
|
|
|
# ---------- rclone wrappers ----------
|
|
|
|
def remote_file_count(remote: str) -> int:
    """Fast total file count via `rclone size --json`.

    Returns the ``count`` field of rclone's JSON output, or 0 when that output
    cannot be decoded or converted. Exits the process with rclone's return
    code if the command itself fails.
    """
    result = subprocess.run(
        [RCLONE_BIN, "size", "--json", remote],
        capture_output=True, text=True, encoding="utf-8", errors="replace",
    )
    if result.returncode:
        console.print(f"[red]rclone size failed for {remote}:[/]\n{result.stderr}")
        sys.exit(result.returncode)
    try:
        summary = json.loads(result.stdout)
        return int(summary.get("count", 0))
    except (json.JSONDecodeError, ValueError):
        return 0
|
|
|
|
|
|
# <digits><unit> with optional surrounding/inner whitespace; unit is s, m, h or d.
DURATION_RE = re.compile(r"^\s*(\d+)\s*([smhd])\s*$", re.IGNORECASE)


def parse_duration(s: str) -> str | None:
    """Validate a duration such as ``30m``, ``24h``, ``7d`` or ``90s``.

    Returns the compact form (digits followed by the lower-cased unit) meant
    to be handed to rclone's --max-age, or ``None`` if *s* is empty or does
    not match. No timedelta is computed.
    """
    matched = DURATION_RE.match(s) if s else None
    if matched is None:
        return None
    amount, unit = matched.groups()
    return f"{amount}{unit.lower()}"
|
|
|
|
|
|
def _stop_rclone(proc: subprocess.Popen) -> None:
    """Terminate *proc*; kill it if it has not exited within 3 seconds."""
    proc.terminate()
    try:
        proc.wait(timeout=3)
    except subprocess.TimeoutExpired:
        proc.kill()


def walk_remote(remote: str, source_label: str,
                skipped: list[tuple[str, str]],
                progress: Progress, task_id,
                max_age: str | None = None,
                _total_override: int | None = None) -> tuple[list[FileEntry], list[str]]:
    """Stream files from rclone lsf, ticking progress per file.

    Args:
        remote: rclone remote (or remote:path) to list recursively.
        source_label: Value stored in ``FileEntry.source``.
        skipped: Receives ``(remote, path)`` for every file with no ID.
        progress: Progress object (rich or BasicProgress) to update.
        task_id: Task within *progress* that tracks this walk.
        max_age: If set, passed to rclone as --max-age so only
            recently-modified files are returned (incremental scan). No file
            count is probed in that case.
        _total_override: Skip the internal remote_file_count probe (caller
            already did it).

    Returns:
        ``(entries, local_skipped)``: entries for files with an ID, and the
        relative paths of files without one.

    In --basic mode a ``SCAN_FILE_PROGRESS`` line is written to stderr every
    ``CANCEL_CHECK_INTERVAL`` files and the cancel flag is polled at the same
    rhythm; on cancel the flag is removed, rclone is stopped and the process
    exits with status 0 after writing ``SCAN_CANCELLED``. A non-zero rclone
    exit status terminates the process with that status.
    """
    if max_age:
        # Can't pre-count for an age-filtered walk — skip the size probe and
        # let progress run on a synthetic total.
        total = 0
        progress.update(task_id, total=1,
                        description=f"[cyan]{source_label}[/] {remote} (since {max_age})")
    else:
        if _total_override is not None:
            total = _total_override
        else:
            total = remote_file_count(remote)
            if BASIC:
                # Caller already emitted SCAN_REMOTE_START (without total) — now we know it.
                sys.stderr.write("SCAN_REMOTE_COUNTED " + json.dumps({
                    "remote": remote, "total": total,
                }) + "\n")
                sys.stderr.flush()
        progress.update(task_id, total=max(total, 1),
                        description=f"[cyan]{source_label}[/] {remote}")

    # One line per file: path <TAB> size <TAB> modification time.
    cmd = [RCLONE_BIN, "lsf", "--files-only", "-R",
           "--format", "pst", "--separator", "\t"]
    if max_age:
        cmd += ["--max-age", max_age]
    cmd.append(remote)
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            text=True, encoding="utf-8", errors="replace")
    entries: list[FileEntry] = []
    local_skipped: list[str] = []
    if proc.stdout is None:
        raise RuntimeError("rclone stdout pipe unexpectedly None")

    # Drain stderr on a side thread so a chatty rclone can't fill the pipe and
    # block while we are still reading stdout.
    _stderr_chunks: list[str] = []
    _stderr_thread = threading.Thread(
        target=lambda: _stderr_chunks.append(proc.stderr.read() if proc.stderr else ""),
        daemon=True,
    )
    _stderr_thread.start()

    _cancelled = False
    try:
        for line in proc.stdout:
            line = line.rstrip("\n").rstrip("\r")
            if not line:
                continue
            # The path comes first and is the only field that can contain a
            # tab, so split from the right to keep size/mtime aligned.
            parts = line.rsplit("\t", 2)
            if len(parts) < 2:
                continue
            rel = parts[0]
            try:
                size = int(parts[1])
            except ValueError:
                size = 0
            mod_time = parts[2] if len(parts) >= 3 else ""
            jav_id = extract_id(Path(rel).name)
            if not jav_id:
                local_skipped.append(rel)
                skipped.append((remote, rel))
            else:
                entries.append(FileEntry(
                    source=source_label, remote=remote, path=rel,
                    size=size, mod_time=mod_time, jav_id=jav_id,
                ))
            progress.advance(task_id)
            # Every CANCEL_CHECK_INTERVAL files: check cancel flag and emit progress.
            n = len(entries) + len(local_skipped)
            if BASIC and n > 0 and n % CANCEL_CHECK_INTERVAL == 0:
                if CANCEL_FLAG.exists():
                    try:
                        CANCEL_FLAG.unlink(missing_ok=True)
                    except OSError:
                        pass  # best effort; the cancel itself still proceeds
                    _stop_rclone(proc)
                    _cancelled = True
                    break
                sys.stderr.write("SCAN_FILE_PROGRESS " + json.dumps({
                    "remote": remote, "label": source_label,
                    "files": len(entries), "skipped": len(local_skipped),
                    "total": total,
                }) + "\n")
                sys.stderr.flush()
    except KeyboardInterrupt:
        # Don't leave rclone running behind a Ctrl-C.
        _stop_rclone(proc)
        raise

    if _cancelled:
        sys.stderr.write("SCAN_CANCELLED\n")
        sys.stderr.flush()
        sys.exit(0)

    proc.wait()
    _stderr_thread.join()
    if proc.returncode != 0:
        err = _stderr_chunks[0] if _stderr_chunks else ""
        console.print(f"[red]rclone lsf failed for {remote}:[/]\n{err}")
        sys.exit(proc.returncode)
    return entries, local_skipped
|
|
|
|
|
|
def make_progress():
    """Return the progress reporter for the current output mode.

    ``BasicProgress`` when ``BASIC`` is set; otherwise a rich ``Progress``
    bound to the module console with spinner, description, bar, M-of-N,
    elapsed and remaining-time columns.
    """
    if BASIC:
        return BasicProgress()
    columns = (
        SpinnerColumn(),
        TextColumn("{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        TimeElapsedColumn(),
        TextColumn("eta"),
        TimeRemainingColumn(),
    )
    return Progress(*columns, console=console, transient=False)
|
|
|
|
|
|
# ---------- collectors ----------
|
|
|
|
def collect_with_progress(remotes_by_label: list[tuple[str, str]],
                          skipped: list[tuple[str, str]]
                          ) -> list[FileEntry]:
    """Dupe-mode collect — every remote freshly walked with progress.

    All progress tasks are registered up front, then each ``(label, remote)``
    pair is walked in turn and its entries concatenated.
    """
    gathered: list[FileEntry] = []
    if not remotes_by_label:
        return gathered
    with make_progress() as progress:
        task_ids = {}
        for label, remote in remotes_by_label:
            task_ids[(label, remote)] = progress.add_task(f"{label} {remote}", total=1)
        for (label, remote), tid in task_ids.items():
            found, _local_skipped = walk_remote(remote, label, skipped, progress, tid)
            gathered += found
    return gathered
|
|
|
|
|
|
def _cc_record(entry: FileEntry) -> dict:
    """Serialize a FileEntry into the dict shape stored in the cache."""
    return {"path": entry.path, "size": entry.size,
            "mod_time": entry.mod_time, "jav_id": entry.jav_id}


def _cc_entries(records: list[dict], source_label: str, remote: str) -> list[FileEntry]:
    """Rebuild FileEntry objects from cached file records."""
    return [
        FileEntry(source=source_label, remote=remote, path=f["path"],
                  size=f["size"], mod_time=f.get("mod_time", ""),
                  jav_id=f["jav_id"])
        for f in records
    ]


def _cc_emit(tag: str, payload: dict) -> None:
    """Write one ``TAG {json}`` line to stderr and flush it."""
    sys.stderr.write(tag + " " + json.dumps(payload) + "\n")
    sys.stderr.flush()


def cached_collect(remotes: list[str], source_label: str,
                   skipped: list[tuple[str, str]],
                   cache: dict, use_cache: bool, force_update: bool,
                   cache_meta: dict[str, dict],
                   scan_since: str | None = None) -> list[FileEntry]:
    """Search-mode collect with cache. Always recursive.

    Each remote is handled in one of three ways:

    * cache hit (``use_cache`` and not ``force_update``): entries and skips
      are replayed from the cache;
    * incremental (``scan_since`` + ``force_update`` + ``use_cache`` and a
      prior cache entry exists): only files modified within the window are
      walked and merged on top of the existing cache entry;
    * full scan: everything else, including a remote with no prior entry.

    Args:
        remotes: rclone remotes to collect.
        source_label: Value stored in ``FileEntry.source``.
        skipped: Receives ``(remote, path)`` for files without an ID.
        cache: Cache dict (``{"remotes": {...}}``); updated in place when
            ``use_cache`` is true.
        use_cache: Whether the cache may be read and written.
        force_update: Ignore cached entries and rescan.
        cache_meta: Filled per remote with cached/age/stale/file_count info.
        scan_since: rclone duration string (`24h`, `7d`) for incremental mode.

    Returns:
        The full set of entries for all remotes (cached, fresh and merged).

    Note: an incremental walk only sees files inside the window, so cached
    records for other paths are carried over unchanged.
    """
    out: list[FileEntry] = []
    to_scan: list[str] = []
    to_incremental: list[tuple[str, dict]] = []  # (remote, existing_entry)
    for r in remotes:
        if scan_since and force_update and use_cache:
            existing = cache["remotes"].get(r)
            if existing:
                to_incremental.append((r, existing))
                continue
            # No prior cache for this remote -> can't be incremental, fall back.
        entry = cache["remotes"].get(r) if use_cache and not force_update else None
        if not entry:
            to_scan.append(r)
            continue
        age = cache_age_hours(entry["scanned_at"])
        age_str = fmt_age(age) if age is not None else "?"
        stale = age is not None and age > CACHE_STALE_HOURS
        cache_meta[r] = {"cached": True, "age": age_str, "stale": stale,
                         "file_count": len(entry["files"])}
        out.extend(_cc_entries(entry["files"], source_label, r))
        skipped.extend((r, s) for s in entry.get("skipped", []))

    if to_scan:
        with make_progress() as progress:
            tids = {r: progress.add_task(f"{source_label} {r}", total=1) for r in to_scan}
            for r_idx, r in enumerate(to_scan):
                _total: int | None = None
                if BASIC:
                    # Emit SCAN_REMOTE_START immediately so the UI shows the remote name.
                    # Then probe the file count; once known, emit SCAN_REMOTE_COUNTED so
                    # the UI can show "N / total" without waiting for the first 100 files.
                    _cc_emit("SCAN_REMOTE_START", {
                        "remote": r, "label": source_label,
                        "index": r_idx + 1, "of": len(to_scan),
                        "total": None,
                    })
                    _total = remote_file_count(r)
                    _cc_emit("SCAN_REMOTE_COUNTED", {
                        "remote": r, "total": _total,
                    })
                fresh, local_skipped = walk_remote(r, source_label, skipped, progress, tids[r],
                                                   _total_override=_total)
                out.extend(fresh)
                cache_meta[r] = {"cached": False, "age": "fresh", "stale": False,
                                 "file_count": len(fresh)}
                if use_cache:
                    cache["remotes"][r] = {
                        "scanned_at": datetime.now().astimezone().isoformat(),
                        "recursive": True,
                        "files": [_cc_record(e) for e in fresh],
                        "skipped": local_skipped,
                    }
                if BASIC:
                    _cc_emit("SCAN_PROGRESS", {
                        "remote": r, "label": source_label,
                        "files": len(fresh), "files_total": len(out),
                    })

    if to_incremental:
        with make_progress() as progress:
            tids = {r: progress.add_task(f"{source_label} {r} (since {scan_since})", total=1)
                    for r, _ in to_incremental}
            for r_idx, (r, existing) in enumerate(to_incremental):
                if BASIC:
                    _cc_emit("SCAN_REMOTE_START", {
                        "remote": r, "label": source_label,
                        "index": r_idx + 1, "of": len(to_incremental),
                        "total": None, "incremental": True,
                    })
                fresh, local_skipped = walk_remote(
                    r, source_label, skipped, progress, tids[r], max_age=scan_since,
                )
                # Merge: replace entries at paths we just walked, keep all others.
                # "Walked" covers both outcomes, so a path that flipped between
                # "has ID" and "skipped" ends up in exactly one list.
                walked_paths = {e.path for e in fresh} | set(local_skipped)
                carried_files = [f for f in existing.get("files", [])
                                 if f["path"] not in walked_paths]
                merged_files = carried_files + [_cc_record(e) for e in fresh]
                carried_skipped = set(existing.get("skipped", [])) - walked_paths
                merged_skipped = sorted(carried_skipped | set(local_skipped))
                # Emit FileEntry for everything (old + new) so the caller sees the
                # full set, not just deltas.
                out.extend(_cc_entries(merged_files, source_label, r))
                # walk_remote already reported this walk's skips; add only the
                # carried-over ones, in stable order, to avoid duplicates.
                skipped.extend((r, s) for s in sorted(carried_skipped))
                cache_meta[r] = {
                    "cached": False, "age": f"incremental {scan_since}",
                    "stale": False, "file_count": len(merged_files),
                    "added_or_updated": len(fresh),
                }
                if use_cache:
                    cache["remotes"][r] = {
                        "scanned_at": datetime.now().astimezone().isoformat(),
                        "recursive": True,
                        "files": merged_files,
                        "skipped": merged_skipped,
                    }
                if BASIC:
                    _cc_emit("SCAN_PROGRESS", {
                        "remote": r, "label": source_label,
                        "files": len(fresh), "files_total": len(out),
                        "incremental": True,
                        "file_count": len(merged_files),
                    })
    return out
|
|
|
|
|
|
# ---------- renderers ----------
|
|
|
|
def render_banner(cache_meta: dict[str, dict], mode: str) -> Panel:
    """Build the header panel: the mode line plus one status line per remote.

    Each remote line shows whether its data came from the cache (with age and
    a STALE marker) or from a fresh scan, followed by the file count. Remote
    names are markup-escaped so brackets in a remote path are shown literally
    instead of being parsed as rich tags.
    """
    # Local import: rich is already a dependency of this file; this avoids
    # touching the module-level import block.
    from rich.markup import escape

    lines: list[Text] = [Text.from_markup(f"[bold]mode:[/] {mode}")]
    if cache_meta:
        for r, m in cache_meta.items():
            if m["cached"]:
                tag = f"CACHED {m['age']}" + (" STALE" if m["stale"] else "")
                style = "yellow" if m["stale"] else "dim"
            else:
                tag = "FRESH SCAN"
                style = "green"
            lines.append(Text.from_markup(
                f" [white]{escape(r)}[/] [{style}]{tag}[/] [dim]({m['file_count']} files)[/]"
            ))
    body = Text("\n").join(lines)
    return Panel(body, title="rc-jav", title_align="left", border_style="blue")
|
|
|
|
|
|
def render_search(matches: dict[str, list[FileEntry]], queries: list[str],
                  cache_meta: dict[str, dict]) -> None:
    """Print the banner, then one result table (or NOT FOUND line) per query.

    Args:
        matches: Query string -> entries that matched it.
        queries: Queries in the order they should be reported.
        cache_meta: Per-remote cache status, used for the Cache column.

    User- and file-derived text (the query, file name and full path) is
    markup-escaped before it reaches rich; otherwise a lowercase bracketed
    fragment such as ``[uncensored]`` would be consumed as a style tag and a
    stray ``[/x]`` could raise a markup error.
    """
    # Local import: rich is already a dependency of this file; this avoids
    # touching the module-level import block.
    from rich.markup import escape

    console.print(render_banner(cache_meta, mode="search"))
    for q in queries:
        hits = matches.get(q, [])
        label = escape(f"[{q}]")
        if not hits:
            console.print(f"[bold red]{label} NOT FOUND[/]")
            console.print()
            continue
        title = f"[bold green]{label} {len(hits)} hit(s)[/]"
        tbl = Table(title=title, title_justify="left", show_lines=False,
                    border_style="green", expand=True)
        tbl.add_column("Source", style="yellow", no_wrap=True)
        tbl.add_column("Cache", no_wrap=True)
        tbl.add_column("File", style="bold", overflow="fold")
        tbl.add_column("Size", justify="right", no_wrap=True)
        tbl.add_column("Path", style="dim", overflow="fold")
        for e in sorted(hits, key=lambda x: (x.jav_id, x.path.lower())):
            meta = cache_meta.get(e.remote, {})
            if meta.get("cached"):
                cache_tag = "[yellow][CACHED-STALE][/]" if meta.get("stale") else "[dim][CACHED][/]"
            else:
                cache_tag = "[green][FRESH][/]"
            tbl.add_row(
                e.source, cache_tag, escape(Path(e.path).name),
                f"{human_size(e.size)}\n[dim]({e.size:,} B)[/]",
                escape(e.full_path),
            )
        console.print(tbl)
        console.print()
|
|
|
|
|
|
def render_name_matches(hits: list[FileEntry], tokens: list[str],
                        cache_meta: dict[str, dict]) -> None:
    """Print filename-search hits as one rich table.

    Prints a red "NOT FOUND" line and returns when ``hits`` is empty.
    Otherwise rows are sorted by ``(jav_id, lowercased path)`` and a blank
    line follows the table.

    Args:
        hits: Entries whose filename matched any of ``tokens``.
        tokens: The search tokens, echoed in the heading.
        cache_meta: Per-remote cache metadata used for the per-row
            FRESH / CACHED / CACHED-STALE tag.
    """
    heading = f"[bold green]Name match {tokens} — {len(hits)} hit(s)[/]"
    if not hits:
        console.print(f"[bold red]Name match {tokens} — NOT FOUND[/]")
        return

    table = Table(
        title=heading,
        title_justify="left",
        show_lines=False,
        border_style="green",
        expand=True,
    )
    column_specs = (
        ("Source", {"style": "yellow", "no_wrap": True}),
        ("Cache", {"no_wrap": True}),
        ("ID", {"style": "bold cyan", "no_wrap": True}),
        ("File", {"style": "bold", "overflow": "fold"}),
        ("Size", {"justify": "right", "no_wrap": True}),
        ("Path", {"style": "dim", "overflow": "fold"}),
    )
    for header, options in column_specs:
        table.add_column(header, **options)

    for entry in sorted(hits, key=lambda item: (item.jav_id, item.path.lower())):
        remote_meta = cache_meta.get(entry.remote, {})
        if not remote_meta.get("cached"):
            badge = "[green][FRESH][/]"
        elif remote_meta.get("stale"):
            badge = "[yellow][CACHED-STALE][/]"
        else:
            badge = "[dim][CACHED][/]"
        table.add_row(
            entry.source,
            badge,
            entry.jav_id,
            Path(entry.path).name,
            f"{human_size(entry.size)}\n[dim]({entry.size:,} B)[/]",
            entry.full_path,
        )
    console.print(table)
    console.print()
|
|
|
|
|
|
def render_name_matches_plain(hits: list[FileEntry], tokens: list[str],
                              cache_meta: dict[str, dict]) -> str:
    """Render filename-search hits as plain (ANSI-decorated) text.

    Args:
        hits: Entries whose filename matched any of ``tokens``.
        tokens: The search tokens, echoed in the heading.
        cache_meta: Per-remote cache metadata used for the per-entry
            FRESH / CACHED / CACHED-STALE tag.

    Returns:
        A newline-joined report: a single "NOT FOUND" line when ``hits`` is
        empty, otherwise a heading followed by four lines per hit, sorted by
        ``(jav_id, lowercased path)``.
    """
    if not hits:
        return ansi(f"Name match {tokens} — NOT FOUND", ANSI_RED)

    report: list[str] = [
        ansi(f"Name match {tokens} — {len(hits)} hit(s)", ANSI_GREEN + ANSI_BOLD)
    ]
    for entry in sorted(hits, key=lambda item: (item.jav_id, item.path.lower())):
        remote_meta = cache_meta.get(entry.remote, {})
        if not remote_meta.get("cached"):
            badge = ansi("[FRESH]", ANSI_GREEN)
        elif remote_meta.get("stale"):
            badge = ansi("[CACHED-STALE]", ANSI_YELLOW)
        else:
            badge = ansi("[CACHED]", ANSI_DIM)
        origin = ansi(entry.source, ANSI_YELLOW)
        report.extend((
            f" {origin} {badge} {ansi(entry.jav_id, ANSI_CYAN)}",
            ansi(f" file: {Path(entry.path).name}", ANSI_BOLD),
            f" size: {human_size(entry.size)} ({entry.size:,} bytes)",
            ansi(f" path: {entry.full_path}", ANSI_DIM),
        ))
    return "\n".join(report)
|
|
|
|
|
|
def render_dupes(dupes: dict[str, list[FileEntry]],
                 skipped: list[tuple[str, str]],
                 variant_alerts: dict[str, list[FileEntry]] | None = None) -> None:
    """Print the duplicate report as rich tables.

    Three sections are printed in order:

    1. One table per duplicate ID with each file marked CATALOG, KEEP or
       DELETE?, followed by the total size of all DELETE? rows. When there
       are no duplicates a single green panel is printed instead.
    2. Up to 50 skipped files (no parseable ID), with a "+N more" row when
       the list is longer.
    3. One table per variant alert (bare ID and variant ID both present).

    Args:
        dupes: Duplicate groups keyed by ID.
        skipped: ``(remote, path)`` pairs that produced no ID.
        variant_alerts: Optional alert groups keyed by bare ID.
    """
    if dupes:
        console.print(f"[bold]Found {len(dupes)} duplicate ID group(s):[/]")
        console.print()
        reclaimable = 0
        for group_id in sorted(dupes):
            members = dupes[group_id]
            keeper = decide_keep(members)
            table = Table(title=f"[bold][{group_id}][/]", title_justify="left",
                          show_lines=False, border_style="magenta", expand=True)
            table.add_column("Action", no_wrap=True)
            table.add_column("Source", style="yellow", no_wrap=True)
            table.add_column("Size", justify="right", no_wrap=True)
            table.add_column("Path", overflow="fold")
            # Source rows first, Catalog rows last, largest first within each.
            ordered = sorted(
                members,
                key=lambda item: (item.source != "Source", item.source == "Catalog", -item.size),
            )
            for member in ordered:
                if member.source == "Catalog":
                    verdict = "[cyan]CATALOG[/]"
                elif member is keeper:
                    verdict = "[green]KEEP[/]"
                else:
                    verdict = "[red]DELETE?[/]"
                    reclaimable += member.size
                table.add_row(
                    verdict,
                    member.source,
                    f"{human_size(member.size)}\n[dim]({member.size:,} B)[/]",
                    member.full_path,
                )
            console.print(table)
            console.print()
        console.print(Panel(
            f"[bold]Potential space reclaim if all DELETE? removed: "
            f"[red]{human_size(reclaimable)}[/][/]",
            border_style="red"))
    else:
        console.print(Panel("[bold green]No duplicates found.[/]",
                            border_style="green"))

    if skipped:
        console.print()
        skipped_table = Table(
            title=f"[dim]Skipped {len(skipped)} file(s) with no parseable ID[/]",
            title_justify="left", show_lines=False, border_style="dim", expand=True)
        skipped_table.add_column("Remote", style="dim", no_wrap=True)
        skipped_table.add_column("Path", style="dim", overflow="fold")
        for remote, rel_path in skipped[:50]:
            skipped_table.add_row(remote, rel_path)
        if len(skipped) > 50:
            skipped_table.add_row("[dim]…[/]", f"[dim]+{len(skipped) - 50} more[/]")
        console.print(skipped_table)

    if variant_alerts:
        console.print()
        console.print(Panel(
            f"[bold yellow]⚠ {len(variant_alerts)} variant alert(s) — manual review recommended[/]",
            border_style="yellow"))
        for bare_id, members in sorted(variant_alerts.items()):
            alert_table = Table(
                title=f"[bold yellow][{bare_id}] — bare + variant coexist[/]",
                title_justify="left", show_lines=False, border_style="yellow", expand=True)
            alert_table.add_column("ID", style="yellow", no_wrap=True)
            alert_table.add_column("Size", justify="right", no_wrap=True)
            alert_table.add_column("Path", overflow="fold")
            for member in sorted(members, key=lambda item: item.full_path):
                # Show the ID the current rules detect, falling back to the stored one.
                shown_id = extract_id(Path(member.path).name) or member.jav_id
                alert_table.add_row(shown_id, human_size(member.size), member.full_path)
            console.print(alert_table)
            console.print()
|
|
|
|
|
|
def decide_keep_with_reason(entries: list[FileEntry]) -> tuple[FileEntry, dict[str, str]]:
    """Pick KEEP candidate and explain the first ranking rule that settled it.

    Catalog entries are excluded — they are offline/informational.

    Ranking (descending priority, configurable via keep_ranking in config.json):
      1. Video files in ordered priority folders outrank other rclone entries.
      2. Source entries outrank Target entries when no priority-folder video exists.
      3. Non-.ts files outrank .ts files when a duplicate group has both.
      4. Largest file size. If sizes are within size_tolerance_mib, treated as equal
         and format preference is consulted instead.
      5. Format preference: ordered list of extensions (e.g. mkv > mp4 > wmv > avi).
      6. Tie-break: has resolution tag in filename ([1080p], [2160p], [720p], [480p]).
      7. Tie-break: longer filename (more metadata = more descriptive).

    Args:
        entries: Members of one duplicate group. Must be non-empty.

    Returns:
        ``(keep, reason)`` where ``reason`` is a dict with ``code`` and
        ``summary`` keys describing the rule that decided the outcome.

    Raises:
        ValueError: If ``entries`` is empty (``max()`` has nothing to rank).
    """
    ranking = _KEEP_RANKING or {}
    # Clamp at zero: a negative tolerance would reject even the largest file,
    # leaving no candidates and making the format step's min() raise.
    tolerance_bytes = max(
        0, int(float(ranking.get("size_tolerance_mib") or 0) * 1024 * 1024)
    )
    priority_folders: list[str] = [
        str(folder).strip() for folder in
        (ranking.get("priority_folders") or DEFAULT_KEEP_RANKING["priority_folders"])
        if str(folder).strip()
    ]
    fmt_order: list[str] = list(
        ranking.get("format_preference") or DEFAULT_KEEP_RANKING["format_preference"]
    )
    use_res_tag: bool = ranking.get("tiebreak_res_tag", True)
    use_longer_name: bool = ranking.get("tiebreak_longer_name", True)

    rclone = [e for e in entries if e.source != "Catalog"]

    def _priority_folder_rank(e: FileEntry) -> int | None:
        """Return the index of the first priority folder containing ``e``, else None."""
        if Path(e.path).suffix.lower() not in _VIDEO_EXTS:
            return None
        # A root can be cq:JAV while the favored folder is a child path, or the
        # supplied root can itself end in that folder. Match across full_path.
        full_path = e.full_path.replace("\\", "/").strip("/").lower()
        segments = [segment for segment in full_path.split("/") if segment]
        for index, raw_folder in enumerate(priority_folders):
            folder = raw_folder.replace("\\", "/").strip("/").lower()
            if not folder:
                continue
            if "/" in folder or ":" in folder:
                # Multi-segment or remote-qualified folder: match it as a path
                # prefix or as a whole-segment run inside the path.
                framed = f"/{full_path}/"
                if full_path == folder or full_path.startswith(folder + "/") or f"/{folder}/" in framed:
                    return index
            elif folder in segments:
                # Bare folder name: must equal one whole path segment.
                return index
        return None

    prioritized = [(rank, e) for e in rclone if (rank := _priority_folder_rank(e)) is not None]
    best_priority = min((rank for rank, _ in prioritized), default=None)
    priority_videos = [e for rank, e in prioritized if rank == best_priority]
    pool_priority = [e for e in rclone if e.source == "Source"]
    reason = {"code": "fallback", "summary": "First remaining duplicate candidate"}
    if priority_videos:
        pool = priority_videos
        reason = {
            "code": "vip_folder",
            "summary": f"VIP folder: {priority_folders[best_priority]}",
        }
    elif pool_priority:
        pool = pool_priority
        reason = {"code": "source", "summary": "Source copy outranks target copies"}
    else:
        pool = rclone if rclone else entries

    # Transport streams often inflate size without being the better keeper.
    preferred_containers = [
        e for e in pool if Path(e.path).suffix.lower() not in _LOWEST_KEEP_PRIORITY_EXTS
    ]
    if preferred_containers and len(preferred_containers) != len(pool):
        pool = preferred_containers
        reason = {"code": "container", "summary": "Non-TS video outranks transport stream"}

    # Step 1: narrow to within size tolerance of the maximum
    max_size = max(e.size for e in pool)
    candidates = [e for e in pool if max_size - e.size <= tolerance_bytes]

    if len(candidates) == 1:
        # Only credit "size" when size actually chose among several files and
        # no earlier rule has already been recorded as the reason.
        if len(pool) > 1 and reason["code"] not in {"vip_folder", "source", "container"}:
            reason = {"code": "size", "summary": "Largest file after ranking rules"}
        return candidates[0], reason

    # Step 2: format preference (lower index in fmt_order = higher priority)
    def _fmt_rank(e: FileEntry) -> int:
        ext = Path(e.path).suffix.lower().lstrip(".")
        try:
            return fmt_order.index(ext)  # lower = better
        except ValueError:
            return len(fmt_order)  # unknown = lowest

    best_fmt = min(_fmt_rank(e) for e in candidates)
    by_fmt = [e for e in candidates if _fmt_rank(e) == best_fmt]
    if len(by_fmt) != len(candidates):
        ext = Path(by_fmt[0].path).suffix.lower().lstrip(".").upper() or "preferred format"
        reason = {"code": "format", "summary": f"Format preference: {ext}"}
    candidates = by_fmt

    if len(candidates) == 1:
        return candidates[0], reason

    # Step 3: resolution tag tie-break
    if use_res_tag:
        tagged = [e for e in candidates if _RES_LABEL_RE.search(Path(e.path).name)]
        if tagged:
            if len(tagged) != len(candidates):
                reason = {"code": "resolution_tag", "summary": "Filename has a resolution tag"}
            candidates = tagged

    if len(candidates) == 1:
        return candidates[0], reason

    # Step 4: longer filename tie-break
    if use_longer_name:
        keep = max(candidates, key=lambda e: len(Path(e.path).name))
        return keep, {"code": "filename", "summary": "Longer filename tie-break"}

    return candidates[0], reason
|
|
|
|
|
|
def decide_keep(entries: list[FileEntry]) -> FileEntry:
    """Return only the KEEP entry for a duplicate group, discarding the reason."""
    keeper, _reason = decide_keep_with_reason(entries)
    return keeper
|
|
|
|
|
|
def find_dupes(entries: Iterable[FileEntry]) -> dict[str, list[FileEntry]]:
    """Group entries by ID and keep only groups with two or more non-Catalog files.

    Args:
        entries: All file entries to consider, including Catalog entries.

    Returns:
        Mapping of ID to every entry sharing it (Catalog entries included),
        restricted to IDs with at least two non-Catalog entries.
    """
    grouped: dict[str, list[FileEntry]] = {}
    for entry in entries:
        # Re-derive the key from the filename under the current rules. Cached
        # entries may predate a new part detector such as `.1of2`; trusting
        # their stale base ID would produce risky delete hints.
        current_id = extract_id(Path(entry.path).name) or entry.jav_id
        bucket = grouped.get(current_id)
        if bucket is None:
            grouped[current_id] = [entry]
        else:
            bucket.append(entry)

    return {
        group_id: members
        for group_id, members in grouped.items()
        if sum(1 for member in members if member.source != "Catalog") >= 2
    }
|
|
|
|
|
|
# Matches text after a base ID that looks like an unrecognized part marker.
# First alternative: a part keyword (p, pt, part, cd, disc, ep, episode, vol,
# volume, scene) followed by a letter a-d or a 1-2 digit number.
# Second alternative: a lone a-d letter or 1-2 digit number that ends the text
# or directly precedes a "[" tag.
# Matching is case-insensitive.
_SUSPICIOUS_MULTIPART_TAIL_RE = re.compile(
    r"(?:^|[-_.\s])(?:p|pt|part|cd|disc|ep|episode|vol|volume|scene)[-_.\s]*([a-d]|\d{1,2})(?:$|[-_.\s\[])"
    r"|(?:^|[-_.\s])([a-d]|\d{1,2})(?:$|\s*\[)",
    re.IGNORECASE,
)
|
|
|
|
|
|
def describe_dupe_risks(jav_id: str, entries: list[FileEntry]) -> list[dict[str, str]]:
    """List reasons a duplicate group should be reviewed by hand before deleting.

    Two checks run against the non-Catalog entries, and both only report when
    ``jav_id`` carries no ``#part`` marker:

    * ``large_same_id_group`` — three or more files share the ID.
    * ``part_like_suffix`` — a filename has a part-looking tail after its ID.

    Args:
        jav_id: The group's ID.
        entries: Every entry in the group.

    Returns:
        A list of ``{"code", "summary"}`` dicts; empty when nothing is flagged.
    """
    is_base_id = "#part" not in jav_id
    live_files = [entry for entry in entries if entry.source != "Catalog"]
    findings: list[dict[str, str]] = []

    if is_base_id and len(live_files) >= 3:
        findings.append({
            "code": "large_same_id_group",
            "summary": f"{len(live_files)} files share this base ID; review for unrecognized parts before deleting.",
        })

    part_like_names: list[str] = []
    for entry in live_files:
        stem = Path(entry.path).stem
        id_match = PRIMARY_ID_RE.match(stem) or COMPOUND_ID_RE.match(stem) or FALLBACK_ID_RE.match(stem)
        if id_match:
            # Inspect only what follows the ID, ignoring resolution tags.
            remainder = _RESOLUTION_TAG_RE.sub("", stem[id_match.end():]).strip()
            if _SUSPICIOUS_MULTIPART_TAIL_RE.search(remainder):
                part_like_names.append(Path(entry.path).name)

    if part_like_names and is_base_id:
        shown = ", ".join(part_like_names[:3])
        ellipsis = " ..." if len(part_like_names) > 3 else ""
        findings.append({
            "code": "part_like_suffix",
            "summary": f"Part-like suffixes still share the base ID: {shown}{ellipsis}",
        })
    return findings
|
|
|
|
|
|
def find_variant_alerts(
    entries: Iterable[FileEntry],
) -> dict[str, list[FileEntry]]:
    """Find IDs present both in bare form and with a variant suffix.

    Example: both ``IBW-902.mp4`` and ``IBW-902z.mp4`` are present. They are
    different products — not dupes — but seeing both is suspicious and worth
    a manual comparison.

    Args:
        entries: All file entries to consider.

    Returns:
        ``{bare_id: [entries]}`` where the list holds the bare-ID entries
        followed by the entries of each variant. Bare IDs without any variant
        sibling are omitted.
    """
    by_id: dict[str, list[FileEntry]] = {}
    for entry in entries:
        current_id = extract_id(Path(entry.path).name) or entry.jav_id
        by_id.setdefault(current_id, []).append(entry)

    alerts: dict[str, list[FileEntry]] = {}
    for candidate_id, members in by_id.items():
        if "#" in candidate_id:
            continue  # skip multipart IDs
        suffix_match = _VARIANT_SUFFIX_RE.match(candidate_id)
        if not suffix_match:
            continue
        bare_id = suffix_match.group(1)
        if bare_id not in by_id:
            continue
        # Seed the alert with a copy of the bare entries once, then append
        # this variant's entries.
        if bare_id not in alerts:
            alerts[bare_id] = list(by_id[bare_id])
        alerts[bare_id].extend(members)
    return alerts
|
|
|
|
|
|
# ---------- library issues (non-canonical filenames) ----------
|
|
|
|
def _bracket_to_canonical(filename: str) -> str:
    """Strip the brackets around a leading ID.

    ``[REAL-779].mp4`` → ``REAL-779.mp4``
    ``[HODV-21076] Saki [1080p].mkv`` → ``HODV-21076 Saki [1080p].mkv``

    Returns ``filename`` unchanged when its stem does not match
    ``_BRACKET_ID_RE``.
    """
    parsed = Path(filename)
    stem = parsed.stem
    bracket_match = _BRACKET_ID_RE.match(stem)
    if bracket_match is None:
        return filename
    bare_id = bracket_match.group(1).strip()
    trailing = stem[bracket_match.end():].strip()
    # Join the non-empty pieces with a single space.
    canonical_stem = " ".join(piece for piece in (bare_id, trailing) if piece)
    return f"{canonical_stem}{parsed.suffix}"
|
|
|
|
|
|
def _nohyphen_to_canonical(filename: str) -> str:
    """Insert the missing hyphen into a leading ID and uppercase its prefix.

    ``MVSD312 [576p].avi`` → ``MVSD-312 [576p].avi``

    Returns ``filename`` unchanged when its stem does not match
    ``_NOHYPHEN_ID_RE``. Text after the ID is kept verbatim.
    """
    parsed = Path(filename)
    stem = parsed.stem
    id_match = _NOHYPHEN_ID_RE.match(stem)
    if id_match is None:
        return filename
    letters, digits = id_match.group(1), id_match.group(2)
    remainder = stem[id_match.end():]
    return f"{letters.upper()}-{digits}{remainder}{parsed.suffix}"
|
|
|
|
|
|
def find_library_issues(cache: dict) -> dict:
    """Scan the cache for files whose names are not in canonical ID form.

    Two issues are detected from each file's stem:

    * ``bracket_id`` — the stem starts with ``[`` and matches ``_BRACKET_ID_RE``.
    * ``nohyphen_id`` — none of the primary, compound or fallback ID patterns
      match, but ``_NOHYPHEN_ID_RE`` does.

    Args:
        cache: Cache object; reads ``cache["remotes"][remote]["files"]``.

    Returns:
        ``{"bracket_names": [...], "nohyphen_names": [...]}``. Each entry has
        ``remote, path, size, size_human, mod_time, jav_id, canonical_name,
        issue``.
    """
    def _issue_record(remote_name: str, file_info: dict, canonical: str, issue: str) -> dict:
        # Missing size / mod_time / jav_id fall back to 0 / "" / "".
        size = file_info.get("size", 0)
        return {
            "remote": remote_name,
            "path": file_info["path"],
            "size": size,
            "size_human": human_size(file_info.get("size", 0)),
            "mod_time": file_info.get("mod_time", ""),
            "jav_id": file_info.get("jav_id", ""),
            "canonical_name": canonical,
            "issue": issue,
        }

    bracket_hits: list[dict] = []
    nohyphen_hits: list[dict] = []
    for remote_name, remote_data in cache.get("remotes", {}).items():
        for file_info in remote_data.get("files", []):
            file_name = Path(file_info["path"]).name
            stem = Path(file_name).stem
            if stem.startswith("[") and _BRACKET_ID_RE.match(stem):
                bracket_hits.append(
                    _issue_record(remote_name, file_info,
                                  _bracket_to_canonical(file_name), "bracket_id")
                )
                continue
            already_canonical = (
                PRIMARY_ID_RE.match(stem)
                or COMPOUND_ID_RE.match(stem)
                or FALLBACK_ID_RE.match(stem)
            )
            if not already_canonical and _NOHYPHEN_ID_RE.match(stem):
                nohyphen_hits.append(
                    _issue_record(remote_name, file_info,
                                  _nohyphen_to_canonical(file_name), "nohyphen_id")
                )
    return {"bracket_names": bracket_hits, "nohyphen_names": nohyphen_hits}
|
|
|
|
|
|
def rename_file_in_remote(
    remote: str,
    old_rel_path: str,
    new_rel_path: str,
    cache: dict,
    rclone_bin: str = "rclone",
    save: bool = True,
) -> dict:
    """Rename one file via ``rclone moveto`` and patch the in-memory cache.

    Args:
        remote: Remote root (e.g. ``cq:JAV``); also the key looked up under
            ``cache["remotes"]``.
        old_rel_path: Current path of the file, relative to ``remote``.
        new_rel_path: Desired path of the file, relative to ``remote``.
        cache: Cache object; the matching file entry is updated in place.
        rclone_bin: rclone executable to invoke.
        save: When true, persist the cache with ``save_cache`` after patching.
            Pass ``save=False`` when batching — the caller must then call
            ``save_cache()`` once itself.

    Returns:
        ``{"ok": True, "old_path": ..., "new_path": ...}`` (full remote paths)
        or ``{"ok": False, "error": ..., "conflict": bool}``. ``conflict`` is
        true only when the target already exists.
    """
    sep = "" if remote.endswith("/") else "/"
    old_full = f"{remote}{sep}{old_rel_path}"
    new_full = f"{remote}{sep}{new_rel_path}"

    # Collision check — a successful listing with any output means the target exists.
    check = subprocess.run(
        [rclone_bin, "lsf", new_full],
        capture_output=True, text=True,
    )
    if check.returncode == 0 and check.stdout.strip():
        return {"ok": False, "error": f"Target already exists: {new_full}", "conflict": True}

    # Perform rename
    result = subprocess.run(
        [rclone_bin, "moveto", old_full, new_full],
        capture_output=True, text=True,
    )
    if result.returncode != 0:
        return {"ok": False, "error": (result.stderr or result.stdout).strip(), "conflict": False}

    # Patch cache — update path + jav_id for the renamed entry
    remote_data = cache.get("remotes", {}).get(remote)
    if remote_data:
        for f in remote_data.get("files", []):
            if f["path"] == old_rel_path:
                f["path"] = new_rel_path
                # The remote has already changed at this point, so a cache
                # entry without "jav_id" must not abort the patch with KeyError.
                f["jav_id"] = extract_id(Path(new_rel_path).name) or f.get("jav_id", "")
                break
        remote_data["skipped"] = [s for s in remote_data.get("skipped", []) if s != old_rel_path]
    if save:
        save_cache(cache)

    return {"ok": True, "old_path": old_full, "new_path": new_full}
|
|
|
|
|
|
def rename_files_batch(
    renames: list[dict],
    cache: dict,
    rclone_bin: str = "rclone",
) -> list[dict]:
    """Rename multiple files, writing the cache once at the end.

    A malformed item (not a dict, or ``remote``/``old_path``/``new_path``
    missing, empty or not a string) yields a failed result for that item
    instead of aborting the batch. The cache is saved even if a later item
    raises, so renames already performed on the remote are never lost.

    Args:
        renames: Items shaped ``{remote, old_path, new_path}``.
        cache: Cache object, patched in place by each successful rename.
        rclone_bin: rclone executable to invoke.

    Returns:
        One result dict per input item, in order, each with ``ok`` plus the
        item's ``old_path``/``new_path`` echoed back (``None`` when absent).
    """
    results: list[dict] = []
    cache_dirty = False
    required = ("remote", "old_path", "new_path")
    try:
        for item in renames:
            is_mapping = isinstance(item, dict)
            fields = [item.get(key) if is_mapping else None for key in required]
            if not all(isinstance(value, str) and value for value in fields):
                results.append({
                    "ok": False,
                    "error": "Invalid rename item: expected non-empty string "
                             "'remote', 'old_path' and 'new_path'",
                    "conflict": False,
                    "old_path": fields[1],
                    "new_path": fields[2],
                })
                continue
            remote, old_path, new_path = fields
            res = rename_file_in_remote(
                remote, old_path, new_path,
                cache, rclone_bin=rclone_bin, save=False,
            )
            # Echo the caller's relative paths rather than the full remote paths.
            res["old_path"] = old_path
            res["new_path"] = new_path
            results.append(res)
            if res["ok"]:
                cache_dirty = True
    finally:
        # Persist whatever was patched, even when an item raised part-way through.
        if cache_dirty:
            save_cache(cache)
    return results
|
|
|
|
|
|
# ---------- plain renderers (--basic) ----------
|
|
|
|
def render_banner_plain(cache_meta: dict[str, dict], mode: str) -> str:
    """Render the plain-text banner: a title line plus one line per remote.

    Args:
        cache_meta: Per-remote metadata. Each value is read for ``cached``
            and ``file_count``, and for ``age``/``stale`` when ``cached``
            is truthy.
        mode: Label for the active mode, shown in the title line.

    Returns:
        The banner as a newline-joined string.
    """
    banner = [ansi(f"=== rc-jav ({mode}) ===", ANSI_BOLD)]
    for remote_name, info in cache_meta.items():
        if info["cached"]:
            label = f"CACHED {info['age']}" + (" STALE" if info["stale"] else "")
            status = ansi(label, ANSI_YELLOW if info["stale"] else ANSI_DIM)
        else:
            status = ansi("FRESH SCAN", ANSI_GREEN)
        file_total = ansi(f"({info['file_count']} files)", ANSI_DIM)
        banner.append(f" {remote_name} {status} {file_total}")
    return "\n".join(banner)
|
|
|
|
|
|
def render_search_plain(matches: dict[str, list[FileEntry]], queries: list[str],
                        cache_meta: dict[str, dict]) -> str:
    """Render ID-search results as plain (ANSI-decorated) text.

    The banner is included only when ``cache_meta`` is non-empty. Each query
    contributes either a "NOT FOUND" line or a heading plus four lines per
    hit (sorted by ``(jav_id, lowercased path)``), followed by a blank line.

    Args:
        matches: Hits keyed by query string; a missing key means no hits.
        queries: Queries in the order they should be reported.
        cache_meta: Per-remote cache metadata.

    Returns:
        The report as a newline-joined string.
    """
    report: list[str] = []
    if cache_meta:
        report += [render_banner_plain(cache_meta, "search"), ""]

    for query in queries:
        found = matches.get(query, [])
        if found:
            report.append(ansi(f"[{query}] {len(found)} hit(s)", ANSI_GREEN + ANSI_BOLD))
            for entry in sorted(found, key=lambda item: (item.jav_id, item.path.lower())):
                remote_meta = cache_meta.get(entry.remote, {})
                if not remote_meta.get("cached"):
                    badge = ansi("[FRESH]", ANSI_GREEN)
                elif remote_meta.get("stale"):
                    badge = ansi("[CACHED-STALE]", ANSI_YELLOW)
                else:
                    badge = ansi("[CACHED]", ANSI_DIM)
                origin = ansi(entry.source, ANSI_YELLOW)
                report.extend((
                    f" {origin} {badge}",
                    ansi(f" file: {Path(entry.path).name}", ANSI_BOLD),
                    f" size: {human_size(entry.size)} ({entry.size:,} bytes)",
                    ansi(f" path: {entry.full_path}", ANSI_DIM),
                ))
        else:
            report.append(ansi(f"[{query}] NOT FOUND", ANSI_RED))
        # Blank spacer after every query, found or not.
        report.append("")
    return "\n".join(report)
|
|
|
|
|
|
# ---------- file outputs ----------
|
|
|
|
def render_dupes_plain(dupes, skipped, variant_alerts=None) -> str:
    """Render the duplicate report as plain (ANSI-decorated) text.

    Sections, in order: the duplicate groups with a CATALOG / KEEP / DELETE?
    mark per file and the total size of DELETE? rows (or a "No duplicates
    found." line); up to 50 skipped files with a "+N more" line beyond that;
    and the variant alerts.

    Args:
        dupes: Duplicate groups keyed by ID.
        skipped: ``(remote, path)`` pairs that produced no ID.
        variant_alerts: Optional alert groups keyed by bare ID.

    Returns:
        The report as a newline-joined string.
    """
    report: list[str] = []

    if dupes:
        report.append(ansi(f"Found {len(dupes)} duplicate ID group(s):", ANSI_BOLD))
        report.append("")
        reclaimable = 0
        for group_id in sorted(dupes):
            members = dupes[group_id]
            keeper = decide_keep(members)
            report.append(ansi(f"[{group_id}]", ANSI_BOLD))
            # Source rows first, Catalog rows last, largest first within each.
            ordered = sorted(
                members,
                key=lambda item: (item.source != "Source", item.source == "Catalog", -item.size),
            )
            for member in ordered:
                if member.source == "Catalog":
                    mark = ansi("CATALOG ", ANSI_CYAN)
                elif member is keeper:
                    mark = ansi("KEEP ", ANSI_GREEN)
                else:
                    mark = ansi("DELETE? ", ANSI_RED)
                    reclaimable += member.size
                origin = ansi(f"{member.source:>8}", ANSI_YELLOW)
                size_text = f"{human_size(member.size)} ({member.size:,} B)"
                report.append(f" {mark} {origin} {size_text:>26} {member.full_path}")
            report.append("")
        report.append(ansi(
            f"Potential space reclaim if all DELETE? removed: {human_size(reclaimable)}",
            ANSI_BOLD,
        ))
    else:
        report.append(ansi("No duplicates found.", ANSI_GREEN))

    if skipped:
        report.append("")
        report.append(ansi(f"Skipped {len(skipped)} file(s) with no parseable ID:", ANSI_DIM))
        report.extend(ansi(f" {remote} {rel_path}", ANSI_DIM) for remote, rel_path in skipped[:50])
        if len(skipped) > 50:
            report.append(ansi(f" ... +{len(skipped) - 50} more", ANSI_DIM))

    if variant_alerts:
        report.append("")
        report.append(ansi(
            f"⚠ {len(variant_alerts)} variant alert(s) — manual review required:",
            ANSI_YELLOW + ANSI_BOLD,
        ))
        for bare_id, members in sorted(variant_alerts.items()):
            report.append(ansi(f" [{bare_id}] bare + variant coexist", ANSI_YELLOW))
            for member in sorted(members, key=lambda item: item.full_path):
                # Show the ID the current rules detect, falling back to the stored one.
                shown_id = extract_id(Path(member.path).name) or member.jav_id
                report.append(
                    f" {ansi(shown_id, ANSI_YELLOW)} {human_size(member.size):>10} {member.full_path}"
                )
    return "\n".join(report)
|
|
|
|
|
|
def write_txt(path: Path, dupes, skipped, variant_alerts=None):
    """Write the plain-text duplicate report to ``path`` as UTF-8.

    Args:
        path: Destination file; overwritten if it exists.
        dupes: Duplicate groups keyed by ID.
        skipped: ``(remote, path)`` pairs that produced no ID.
        variant_alerts: Optional alert groups keyed by bare ID. Accepted so
            the text report can carry the same variant section as the console
            and JSON outputs; omitted by default, as before.
    """
    path.write_text(render_dupes_plain(dupes, skipped, variant_alerts), encoding="utf-8")
|
|
|
|
|
|
def write_csv(path: Path, dupes):
    """Write one CSV row per file in every duplicate group.

    Groups are written in sorted ID order and files in their stored order.
    The ``action`` column is CATALOG for Catalog entries, KEEP for the entry
    chosen by ``decide_keep``, and DELETE? for the rest.

    Args:
        path: Destination file; overwritten if it exists.
        dupes: Duplicate groups keyed by ID.
    """
    header = ["jav_id", "action", "source", "remote", "path", "full_path",
              "size_bytes", "size_human", "mod_time"]
    with path.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle)
        writer.writerow(header)
        for group_id in sorted(dupes):
            members = dupes[group_id]
            keeper = decide_keep(members)
            for member in members:
                if member.source == "Catalog":
                    verdict = "CATALOG"
                else:
                    verdict = "KEEP" if member is keeper else "DELETE?"
                writer.writerow([
                    group_id, verdict, member.source,
                    member.remote, member.path, member.full_path,
                    member.size, human_size(member.size), member.mod_time,
                ])
|
|
|
|
|
|
def describe_skipped_id(remote: str, path: str) -> dict[str, str]:
    """Explain a common reason a path did not yield an ID.

    The filename (last component, backslashes treated as separators) is
    tested against known near-miss shapes in order; the first match decides
    the reason. With no match, a generic reason and hint are returned.

    Args:
        remote: Remote the file lives on; echoed back unchanged.
        path: Path of the skipped file; echoed back unchanged. A falsy value
            is treated as an empty filename.

    Returns:
        Dict with ``remote``, ``path``, ``name``, ``reason`` and ``hint``.
    """
    name = Path((path or "").replace("\\", "/")).name

    # (pattern, reason, hint) — checked top to bottom, first match wins.
    near_misses = (
        (
            r"^\[[A-Za-z0-9-]+-\d+\]",
            "ID is wrapped in leading brackets",
            "Remove the leading brackets so the filename starts with the ID.",
        ),
        (
            r"^[A-Za-z][A-Za-z0-9]+[\u2010-\u2015]\d+",
            "ID uses a non-ASCII dash",
            "Replace the separator with a normal hyphen.",
        ),
        (
            r"^[A-Za-z][A-Za-z0-9]+\d+",
            "ID prefix and number have no hyphen",
            "Insert the ID hyphen, for example ABC-123.",
        ),
    )
    reason = "No supported JAV ID at filename start"
    hint = "Rename with a leading ID such as ABC-123 or add an ID normalizer/site-specific source."
    for pattern, matched_reason, matched_hint in near_misses:
        if re.match(pattern, name):
            reason, hint = matched_reason, matched_hint
            break

    return {"remote": remote, "path": path, "name": name, "reason": reason, "hint": hint}
|
|
|
|
|
|
def dupes_to_obj(dupes, skipped, variant_alerts=None) -> dict:
    """Build the JSON-serializable form of the duplicate report.

    Args:
        dupes: Duplicate groups keyed by ID.
        skipped: ``(remote, path)`` pairs that produced no ID.
        variant_alerts: Optional alert groups keyed by bare ID.

    Returns:
        Dict with three keys:

        * ``groups`` — per ID (sorted): ``keep``, ``keep_reason``, ``risks``,
          ``delete_candidates`` and ``catalog``.
        * ``skipped`` — one ``describe_skipped_id`` result per skipped file.
        * ``variant_alerts`` — per bare ID (sorted): ``bare_id`` and ``files``,
          each file carrying its ``detected_id``.
    """
    def _entry_payload(entry) -> dict:
        # Dataclass fields plus the two derived values every consumer needs.
        return asdict(entry) | {
            "full_path": entry.full_path,
            "size_human": human_size(entry.size),
        }

    report = {
        "groups": {},
        "skipped": [describe_skipped_id(remote, rel_path) for remote, rel_path in skipped],
        "variant_alerts": [],
    }

    for group_id in sorted(dupes):
        members = dupes[group_id]
        keeper, why = decide_keep_with_reason(members)
        report["groups"][group_id] = {
            "keep": _entry_payload(keeper),
            "keep_reason": why,
            "risks": describe_dupe_risks(group_id, members),
            "delete_candidates": [
                _entry_payload(member)
                for member in members
                if member is not keeper and member.source != "Catalog"
            ],
            "catalog": [
                _entry_payload(member)
                for member in members
                if member.source == "Catalog"
            ],
        }

    for bare_id, members in sorted((variant_alerts or {}).items()):
        files = [
            _entry_payload(member)
            | {"detected_id": extract_id(Path(member.path).name) or member.jav_id}
            for member in sorted(members, key=lambda item: item.full_path)
        ]
        report["variant_alerts"].append({"bare_id": bare_id, "files": files})
    return report
|
|
|
|
|
|
def write_json(path: Path, dupes, skipped, variant_alerts=None):
    """Write the duplicate report to ``path`` as 2-space-indented UTF-8 JSON.

    Args:
        path: Destination file; overwritten if it exists.
        dupes: Duplicate groups keyed by ID.
        skipped: ``(remote, path)`` pairs that produced no ID.
        variant_alerts: Optional alert groups keyed by bare ID.
    """
    payload = dupes_to_obj(dupes, skipped, variant_alerts)
    serialized = json.dumps(payload, indent=2)
    path.write_text(serialized, encoding="utf-8")
|
|
|
|
|
|
# ---------- main ----------
|
|
|
|
def main():
|
|
ap = argparse.ArgumentParser(description="Report duplicate JAV files across rclone remotes (read-only).")
|
|
ap.add_argument("--source", "-s", action="append", default=[], metavar="REMOTE",
|
|
help="Source remote path (priority — wins dupes regardless of size). Repeatable.")
|
|
ap.add_argument("--target", "-t", action="append", default=[], metavar="REMOTE",
|
|
help="Target remote path (non-priority — largest size wins among targets). Repeatable.")
|
|
ap.add_argument("--format", choices=["console", "txt", "csv", "json", "all"],
|
|
default="console")
|
|
ap.add_argument("--output-dir", default="./reports", help="Where to write txt/csv/json.")
|
|
ap.add_argument("--no-color", action="store_true")
|
|
ap.add_argument("--rclone-bin", default="rclone",
|
|
help="Path to rclone executable (default: 'rclone' on PATH).")
|
|
ap.add_argument("--search", action="append", default=[], metavar="ID",
|
|
help="Search mode: look up a JAV ID (e.g. SSIS-001). Repeatable. "
|
|
"If no --source/--target given, default target is used.")
|
|
ap.add_argument("--name", action="append", default=[], metavar="STR",
|
|
help="Substring/glob search against filename. Case-insensitive. "
|
|
"Repeatable; OR semantics (any token match = hit). "
|
|
"Supports * and ? wildcards. Use quotes for spaces.")
|
|
ap.add_argument("--update", "-u", action="store_true",
|
|
help="Search mode: force re-scan and overwrite cache for requested remotes.")
|
|
ap.add_argument("--no-cache", action="store_true",
|
|
help="Search mode: bypass cache entirely (no read, no write).")
|
|
ap.add_argument("--quick", "-q", action="store_true",
|
|
help="Force quick mode: skip cache, query rclone directly with --include glob. "
|
|
"Default is auto: single exact IDs use quick, wildcards/ranges/multi use cached.")
|
|
ap.add_argument("--cache", action="store_true",
|
|
help="Force cached mode (opposite of --quick).")
|
|
ap.add_argument("--save", action="store_true",
|
|
help="Persist the --source / --target / --catalog values you passed "
|
|
"as new defaults in config.json next to the script. "
|
|
"Only keys you explicitly passed are saved.")
|
|
ap.add_argument("--scan", action="store_true",
|
|
help="Walk configured remotes, refresh cache, exit. No search/dupe output. "
|
|
"Default scope: DEFAULT_TARGET. Override with --source/--target. "
|
|
"Always overwrites cache. Suitable for Task Scheduler / cron.")
|
|
ap.add_argument("--scan-since", metavar="DURATION",
|
|
help="Incremental scan: only walk files modified within DURATION "
|
|
"(e.g. 24h, 7d, 30m, 90s). Merges new/changed entries on top of "
|
|
"the existing cache; old entries are preserved. Falls back to a "
|
|
"full scan if there's no prior cache for a remote. Requires --scan.")
|
|
ap.add_argument("--catalog", action="append", default=[], metavar="PATH",
|
|
help="Path to a WinCatalog CSV or XML export. Repeatable. "
|
|
"Listed under 'Catalog' in results (informational, never KEEP/DELETE?).")
|
|
ap.add_argument("--part-pattern", action="append", default=[], metavar="REGEX",
|
|
help="Extra multipart filename regex. Repeatable; first capture group must be the part number. "
|
|
"Patterns run against the filename stem after built-in part detectors.")
|
|
ap.add_argument("--library-issues", action="store_true",
|
|
help="Report non-canonical filenames (bracket-wrapped IDs, no-hyphen IDs). "
|
|
"Reads from cache. Outputs JSON when --format json, plain otherwise.")
|
|
ap.add_argument("--rename-file", action="store_true",
|
|
help="Rename one file in a remote and patch cache. "
|
|
"Requires --remote, --old-path, --new-path. Outputs JSON.")
|
|
ap.add_argument("--rename-files-batch", action="store_true",
|
|
help="Rename multiple files in one call, writing cache once. "
|
|
"Reads JSON array of {remote, old_path, new_path} from stdin. Outputs JSON.")
|
|
ap.add_argument("--remote", metavar="REMOTE",
|
|
help="Remote path root for --rename-file (e.g. cq:JAV).")
|
|
ap.add_argument("--old-path", metavar="PATH",
|
|
help="Relative path of the file to rename (within --remote).")
|
|
ap.add_argument("--new-path", metavar="PATH",
|
|
help="New relative path after rename (within --remote).")
|
|
ap.add_argument("--basic", action="store_true",
|
|
help="Plain text output, no rich tables/panels/progress bars. "
|
|
"Useful for piping or low-bandwidth terminals.")
|
|
ap.add_argument("--clearjav", action="store_true",
|
|
help="Shortcut: use DEFAULT_SOURCE as --source and DEFAULT_TARGET as --target, "
|
|
"Equivalent to "
|
|
"`--source cq:personal-files/ClearJAV --target cq:personal-files/JAV/TMP`.")
|
|
args = ap.parse_args()
|
|
|
|
global RCLONE_BIN, console, BASIC, DEFAULT_SOURCE, DEFAULT_TARGET, DEFAULT_CATALOG
|
|
RCLONE_BIN = args.rclone_bin
|
|
BASIC = args.basic or args.format == "json"
|
|
|
|
# Apply persisted config overrides BEFORE defaults are consulted.
|
|
cfg = load_config()
|
|
if "default_source" in cfg:
|
|
DEFAULT_SOURCE = list(cfg["default_source"])
|
|
if "default_target" in cfg:
|
|
DEFAULT_TARGET = list(cfg["default_target"])
|
|
if "default_catalog" in cfg:
|
|
DEFAULT_CATALOG = list(cfg["default_catalog"])
|
|
global _KEEP_RANKING
|
|
_KEEP_RANKING = cfg.get("keep_ranking") or {}
|
|
part_patterns = list(cfg.get("part_patterns") or []) + list(args.part_pattern)
|
|
pattern_errors = configure_part_patterns(part_patterns)
|
|
if pattern_errors:
|
|
for err in pattern_errors:
|
|
console.print(f"[red]invalid part pattern:[/] {err}")
|
|
sys.exit(2)
|
|
|
|
# --save: persist explicitly-passed values, exit.
|
|
if args.save:
|
|
if not (args.source or args.target or args.catalog or args.part_pattern):
|
|
console.print("[red]--save needs at least one --source/--target/--catalog/--part-pattern value to persist.[/]")
|
|
sys.exit(2)
|
|
new_cfg = dict(cfg)
|
|
if args.source:
|
|
new_cfg["default_source"] = list(args.source)
|
|
if args.target:
|
|
new_cfg["default_target"] = list(args.target)
|
|
if args.catalog:
|
|
new_cfg["default_catalog"] = list(args.catalog)
|
|
if args.part_pattern:
|
|
new_cfg["part_patterns"] = list(args.part_pattern)
|
|
save_config(new_cfg)
|
|
console.print(f"[green]Saved to {CONFIG_PATH}:[/]")
|
|
for k in ("default_source", "default_target", "default_catalog", "part_patterns"):
|
|
if k in new_cfg:
|
|
console.print(f" {k} = {new_cfg[k]}")
|
|
sys.exit(0)
|
|
global USE_ANSI
|
|
USE_ANSI = not args.no_color
|
|
if args.no_color or BASIC:
|
|
console = Console(no_color=True, color_system=None, highlight=False)
|
|
|
|
# Search mode: defaults kick in if no remotes specified.
|
|
if args.clearjav:
|
|
if not args.source:
|
|
args.source = list(DEFAULT_SOURCE)
|
|
if not args.target:
|
|
args.target = list(DEFAULT_TARGET)
|
|
|
|
if args.search and not args.source and not args.target:
|
|
args.target = list(DEFAULT_TARGET)
|
|
|
|
# --scan: default to DEFAULT_TARGET only, always overwrite cache.
|
|
if args.scan:
|
|
if not args.source and not args.target:
|
|
args.target = list(DEFAULT_TARGET)
|
|
args.update = True
|
|
|
|
# Use default catalog(s) if user passed none.
|
|
if not args.catalog and DEFAULT_CATALOG:
|
|
args.catalog = list(DEFAULT_CATALOG)
|
|
|
|
# --library-issues: read-only cache scan for non-canonical filenames.
|
|
if args.library_issues:
|
|
cache = load_cache()
|
|
issues = find_library_issues(cache)
|
|
if args.format == "json" or BASIC:
|
|
print(json.dumps({"ok": True, **issues}))
|
|
else:
|
|
bracket = issues["bracket_names"]
|
|
nohyphen = issues["nohyphen_names"]
|
|
total = len(bracket) + len(nohyphen)
|
|
if not total:
|
|
console.print(Panel("[bold green]No library issues found.[/]", title="Library Issues"))
|
|
else:
|
|
from rich.table import Table
|
|
t = Table(title=f"Library Issues ({total} file(s))", show_lines=True)
|
|
t.add_column("Issue", style="yellow", width=14)
|
|
t.add_column("Current Name")
|
|
t.add_column("Canonical Name", style="green")
|
|
t.add_column("Remote", style="dim")
|
|
for e in bracket:
|
|
t.add_row("bracket ID", Path(e["path"]).name,
|
|
e["canonical_name"], e["remote"])
|
|
for e in nohyphen:
|
|
t.add_row("no hyphen", Path(e["path"]).name,
|
|
e["canonical_name"], e["remote"])
|
|
console.print(t)
|
|
sys.exit(0)
|
|
|
|
# --rename-files-batch: rename multiple files, single cache write.
|
|
if args.rename_files_batch:
|
|
try:
|
|
renames = json.loads(sys.stdin.read())
|
|
except json.JSONDecodeError as e:
|
|
print(json.dumps({"ok": False, "error": f"Invalid JSON on stdin: {e}"}))
|
|
sys.exit(1)
|
|
if not isinstance(renames, list):
|
|
print(json.dumps({"ok": False, "error": "stdin must be a JSON array"}))
|
|
sys.exit(1)
|
|
cache = load_cache()
|
|
results = rename_files_batch(renames, cache, rclone_bin=RCLONE_BIN)
|
|
ok = any(r["ok"] for r in results)
|
|
print(json.dumps({"ok": ok, "results": results}))
|
|
sys.exit(0 if ok else 1)
|
|
|
|
# --rename-file: rename one file in a remote and patch cache.
|
|
if args.rename_file:
|
|
if not args.remote or not args.old_path or not args.new_path:
|
|
ap.error("--rename-file requires --remote, --old-path, and --new-path.")
|
|
cache = load_cache()
|
|
result = rename_file_in_remote(
|
|
args.remote, args.old_path, args.new_path, cache, rclone_bin=RCLONE_BIN
|
|
)
|
|
print(json.dumps(result))
|
|
sys.exit(0 if result["ok"] else 1)
|
|
|
|
if not args.source and not args.target and not args.catalog:
|
|
ap.error("Provide at least one --source, --target, or --catalog.")
|
|
|
|
# Scan-only mode: walk remotes, write cache, summary, exit.
|
|
if args.scan:
|
|
scan_since = None
|
|
if args.scan_since:
|
|
scan_since = parse_duration(args.scan_since)
|
|
if not scan_since:
|
|
console.print(f"[red]invalid --scan-since value: {args.scan_since!r} "
|
|
f"(expected e.g. 24h, 7d, 30m, 90s)[/]")
|
|
sys.exit(2)
|
|
cache = load_cache()
|
|
cache_meta: dict[str, dict] = {}
|
|
skipped: list[tuple[str, str]] = []
|
|
t0 = time.perf_counter()
|
|
if BASIC:
|
|
# `--scan` resolves its default target above. Report only the
|
|
# remotes that this scan will actually walk; falling back here to
|
|
# DEFAULT_SOURCE would resurrect retired source roots in job UI.
|
|
_all_remotes = list(args.source) + list(args.target)
|
|
sys.stderr.write("SCAN_START " + json.dumps({
|
|
"remotes": _all_remotes, "total": len(_all_remotes),
|
|
}) + "\n")
|
|
sys.stderr.flush()
|
|
entries = (cached_collect(args.source, "Source", skipped, cache,
|
|
use_cache=not args.no_cache, force_update=True,
|
|
cache_meta=cache_meta, scan_since=scan_since)
|
|
+ cached_collect(args.target, "Target", skipped, cache,
|
|
use_cache=not args.no_cache, force_update=True,
|
|
cache_meta=cache_meta, scan_since=scan_since))
|
|
if not args.no_cache:
|
|
save_cache(cache)
|
|
elapsed = time.perf_counter() - t0
|
|
if BASIC:
|
|
sys.stderr.write(f"Scan complete: {len(entries)} files in {elapsed:.2f}s\n")
|
|
sys.stderr.write(f"Cache: {CACHE_PATH}\n" if not args.no_cache
|
|
else "Cache: (skipped, --no-cache)\n")
|
|
else:
|
|
console.print(f"[bold green]Scan complete:[/] {len(entries)} files in {elapsed:.2f}s")
|
|
if not args.no_cache:
|
|
console.print(f"[dim]Cache: {CACHE_PATH}[/]")
|
|
else:
|
|
console.print("[dim]Cache: (skipped, --no-cache)[/]")
|
|
sys.exit(0)
|
|
|
|
skipped: list[tuple[str, str]] = []
|
|
t0 = time.perf_counter()
|
|
|
|
if args.search or args.name:
|
|
search_timings: dict[str, int] = {}
|
|
# If --name was passed without explicit remotes, fall back to default target
|
|
# (catalog default already injected earlier; don't let it suppress remote defaulting).
|
|
if args.name and not args.search and not args.source and not args.target:
|
|
args.target = list(DEFAULT_TARGET)
|
|
# Substring name search can't be server-side filtered on most backends — cache wins.
|
|
# Only the ID search shape benefits from quick (server-side prefix glob).
|
|
if args.name and not args.quick:
|
|
mode, reason = "cached", "name substring search — cache is faster than rclone --include"
|
|
else:
|
|
combined = list(args.search) + list(args.name)
|
|
mode, reason = choose_search_mode(combined, args.quick, args.cache)
|
|
if BASIC:
|
|
sys.stderr.write(f"Mode: {mode} ({reason})\n")
|
|
else:
|
|
mode_color = "green" if mode == "quick" else "cyan"
|
|
console.print(f"[{mode_color}]Mode: {mode}[/] [dim]({reason})[/]")
|
|
|
|
phase_t0 = time.perf_counter()
|
|
cache = load_cache()
|
|
search_timings["cache_load_ms"] = round((time.perf_counter() - phase_t0) * 1000)
|
|
use_cache = not args.no_cache and mode == "cached"
|
|
cache_meta: dict[str, dict] = {}
|
|
phase_t0 = time.perf_counter()
|
|
if mode == "quick":
|
|
all_patterns: list[str] = []
|
|
for raw in args.search:
|
|
all_patterns.extend(query_to_include_patterns(raw))
|
|
all_patterns.extend(name_to_include_patterns(args.name))
|
|
entries = []
|
|
for r in args.source:
|
|
cache_meta[r] = {"cached": False, "age": "quick", "stale": False, "file_count": 0}
|
|
got = quick_search_remote(r, "Source", all_patterns, skipped)
|
|
entries.extend(got)
|
|
cache_meta[r]["file_count"] = len(got)
|
|
for r in args.target:
|
|
cache_meta[r] = {"cached": False, "age": "quick", "stale": False, "file_count": 0}
|
|
got = quick_search_remote(r, "Target", all_patterns, skipped)
|
|
entries.extend(got)
|
|
cache_meta[r]["file_count"] = len(got)
|
|
else:
|
|
entries = (cached_collect(args.source, "Source", skipped, cache,
|
|
use_cache, args.update, cache_meta)
|
|
+ cached_collect(args.target, "Target", skipped, cache,
|
|
use_cache, args.update, cache_meta))
|
|
search_timings["entry_collect_ms"] = round((time.perf_counter() - phase_t0) * 1000)
|
|
# Load each catalog separately so cache_meta gets the per-catalog count
|
|
# (was global total — every catalog reported the sum across all).
|
|
catalog_entries: list[FileEntry] = []
|
|
phase_t0 = time.perf_counter()
|
|
for cp_str in args.catalog:
|
|
for cp in _expand_catalog_paths([cp_str]):
|
|
ext = cp.suffix.lower()
|
|
if ext == ".csv":
|
|
one = load_catalog_csv(cp, skipped)
|
|
elif ext == ".xml":
|
|
one = load_catalog_xml(cp, skipped)
|
|
else:
|
|
console.print(f"[yellow]WARN: unknown catalog format '{ext}' for {cp}; skipping.[/]")
|
|
continue
|
|
catalog_entries.extend(one)
|
|
cache_meta[f"catalog:{cp.name}"] = {
|
|
"cached": False, "age": "loaded", "stale": False,
|
|
"file_count": len(one),
|
|
}
|
|
entries.extend(catalog_entries)
|
|
search_timings["catalog_load_ms"] = round((time.perf_counter() - phase_t0) * 1000)
|
|
if use_cache and args.update:
|
|
save_cache(cache)
|
|
else:
|
|
if args.cache and not args.no_cache:
|
|
cache = load_cache()
|
|
cache_meta: dict[str, dict] = {}
|
|
entries = (cached_collect(args.source, "Source", skipped, cache,
|
|
use_cache=True, force_update=False,
|
|
cache_meta=cache_meta)
|
|
+ cached_collect(args.target, "Target", skipped, cache,
|
|
use_cache=True, force_update=False,
|
|
cache_meta=cache_meta))
|
|
else:
|
|
remotes_by_label = ([("Source", r) for r in args.source]
|
|
+ [("Target", r) for r in args.target])
|
|
entries = collect_with_progress(remotes_by_label, skipped)
|
|
entries.extend(load_catalogs(args.catalog, skipped))
|
|
|
|
elapsed = time.perf_counter() - t0
|
|
if BASIC:
|
|
sys.stderr.write(f"Scanned/loaded {len(entries)} file(s) in {elapsed:.2f}s\n")
|
|
else:
|
|
console.print(f"[dim]Scanned/loaded {len(entries)} file(s) in {elapsed:.2f}s[/]")
|
|
|
|
if args.search or args.name:
|
|
# query_expansions: original_raw -> list of normalized IDs / wildcard patterns to look up
|
|
query_expansions: dict[str, list[str]] = {}
|
|
queries: list[str] = []
|
|
for raw in args.search:
|
|
if RANGE_RE.search(raw):
|
|
expanded = expand_range(raw) or []
|
|
normed: list[str] = []
|
|
for r in expanded:
|
|
n = normalize_id(r)
|
|
if n:
|
|
normed.append(n)
|
|
if not normed:
|
|
console.print(f"[yellow]WARN: range '{raw}' produced no valid IDs.[/]")
|
|
continue
|
|
queries.append(raw)
|
|
query_expansions[raw] = normed
|
|
continue
|
|
if "*" in raw or "?" in raw:
|
|
q = raw.upper()
|
|
queries.append(q)
|
|
query_expansions[q] = [q]
|
|
continue
|
|
norm = normalize_id(raw)
|
|
if not norm:
|
|
console.print(f"[yellow]WARN: cannot parse '{raw}' as a JAV ID, skipping.[/]")
|
|
continue
|
|
# Use the raw (upper-cased) form for display so leading zeros are preserved
|
|
# (e.g. user types PRTD-027 — keep it, don't show PRTD-27). Lookup still uses
|
|
# the normalized form internally.
|
|
display = raw.upper()
|
|
queries.append(display)
|
|
query_expansions[display] = [norm]
|
|
phase_t0 = time.perf_counter()
|
|
index: dict[str, list[FileEntry]] = {}
|
|
for e in entries:
|
|
index.setdefault(e.jav_id, []).append(e)
|
|
search_timings["index_ms"] = round((time.perf_counter() - phase_t0) * 1000)
|
|
phase_t0 = time.perf_counter()
|
|
matches: dict[str, list[FileEntry]] = {}
|
|
match_traces: dict[str, dict[int, dict[str, str]]] = {}
|
|
for q in queries:
|
|
expansions = query_expansions.get(q, [q])
|
|
hits: list[FileEntry] = []
|
|
seen: set[int] = set()
|
|
traces: dict[int, dict[str, str]] = {}
|
|
|
|
def add_hit(entry: FileEntry, matched_query: str) -> None:
    """Record *entry* as a hit for the query currently being processed.

    Entries are de-duplicated by object identity (``id(entry)``), so the
    same FileEntry object reached through several expansions is stored
    only once; the first expansion that reaches it supplies the trace.

    Args:
        entry: Candidate file entry pulled from the ID index.
        matched_query: The expansion string that led to this entry.
    """
    ident = id(entry)
    if ident not in seen:
        seen.add(ident)
        hits.append(entry)
        # Keep the match explanation next to the hit, keyed by the same
        # identity value used for de-duplication.
        traces[ident] = describe_id_match(q, matched_query, entry.jav_id, len(expansions))
|
|
|
|
for sub in expansions:
|
|
if "*" in sub or "?" in sub:
|
|
pat = sub if "#PART" in sub.upper() else sub + "*"
|
|
for k, v in index.items():
|
|
if fnmatch.fnmatchcase(k, pat):
|
|
for e in v:
|
|
add_hit(e, sub)
|
|
elif "#part" in sub:
|
|
for e in index.get(sub, []):
|
|
add_hit(e, sub)
|
|
else:
|
|
for e in index.get(sub, []):
|
|
add_hit(e, sub)
|
|
for k, v in index.items():
|
|
if k.startswith(sub + "#part"):
|
|
for e in v:
|
|
add_hit(e, sub)
|
|
matches[q] = hits
|
|
match_traces[q] = traces
|
|
search_timings["match_ms"] = round((time.perf_counter() - phase_t0) * 1000)
|
|
if args.format == "json":
|
|
# Structured output for tools that consume search results (e.g. the rclonex
|
|
# Brave extension). Includes everything needed to drive a UI: per-query hits
|
|
# with source/remote/path/size/mod_time, plus name-match block + skipped.
|
|
name_hits_json: list[FileEntry] = []
|
|
if args.name:
|
|
for e in entries:
|
|
if name_match(Path(e.path).stem, args.name):
|
|
name_hits_json.append(e)
|
|
out_obj = {
|
|
"queries": [
|
|
{
|
|
"query": q,
|
|
"hits": [
|
|
{"source": e.source, "remote": e.remote, "path": e.path,
|
|
"full_path": e.full_path, "size": e.size,
|
|
"size_human": human_size(e.size),
|
|
"mod_time": e.mod_time, "jav_id": e.jav_id,
|
|
**match_traces.get(q, {}).get(id(e), {})}
|
|
for e in sorted(matches.get(q, []), key=lambda x: (x.jav_id, x.path.lower()))
|
|
],
|
|
}
|
|
for q in queries
|
|
],
|
|
"name_matches": [
|
|
{"source": e.source, "remote": e.remote, "path": e.path,
|
|
"full_path": e.full_path, "size": e.size,
|
|
"size_human": human_size(e.size), "mod_time": e.mod_time,
|
|
"jav_id": e.jav_id, "match_kind": "name",
|
|
"match_reason": "Filename search", "match_confidence": "broad",
|
|
"matched_query": ", ".join(args.name), "matched_id": e.jav_id}
|
|
for e in sorted(name_hits_json, key=lambda x: (x.jav_id, x.path.lower()))
|
|
],
|
|
"name_tokens": list(args.name),
|
|
"cache_meta": cache_meta,
|
|
"skipped_count": len(skipped),
|
|
"elapsed_sec": round(time.perf_counter() - t0, 3),
|
|
"timings": search_timings,
|
|
}
|
|
print(json.dumps(out_obj))
|
|
id_ok = (not queries) or all(matches.values())
|
|
name_ok = (not args.name) or bool(name_hits_json)
|
|
sys.exit(0 if (id_ok and name_ok) else 1)
|
|
if queries:
|
|
if BASIC:
|
|
print(render_search_plain(matches, queries, cache_meta))
|
|
else:
|
|
render_search(matches, queries, cache_meta)
|
|
# --name results as a separate block
|
|
name_hits: list[FileEntry] = []
|
|
if args.name:
|
|
for e in entries:
|
|
if name_match(Path(e.path).stem, args.name):
|
|
name_hits.append(e)
|
|
if BASIC:
|
|
print(render_name_matches_plain(name_hits, args.name, cache_meta))
|
|
else:
|
|
render_name_matches(name_hits, args.name, cache_meta)
|
|
# Exit code: 0 if every search query had hits AND name-search (if used) returned hits.
|
|
id_ok = (not queries) or all(matches.values())
|
|
name_ok = (not args.name) or bool(name_hits)
|
|
sys.exit(0 if (id_ok and name_ok) else 1)
|
|
|
|
dupes = find_dupes(entries)
|
|
variant_alerts = find_variant_alerts(entries)
|
|
if args.format == "json" and BASIC:
|
|
print(json.dumps(dupes_to_obj(dupes, skipped, variant_alerts)))
|
|
sys.exit(0)
|
|
if BASIC:
|
|
print(render_dupes_plain(dupes, skipped, variant_alerts))
|
|
else:
|
|
render_dupes(dupes, skipped, variant_alerts)
|
|
|
|
if args.format != "console":
|
|
out_dir = Path(args.output_dir)
|
|
out_dir.mkdir(parents=True, exist_ok=True)
|
|
stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
|
|
targets = {"txt", "csv", "json"} if args.format == "all" else {args.format}
|
|
if "txt" in targets:
|
|
write_txt(out_dir / f"dupes-{stamp}.txt", dupes, skipped)
|
|
if "csv" in targets:
|
|
write_csv(out_dir / f"dupes-{stamp}.csv", dupes)
|
|
if "json" in targets:
|
|
write_json(out_dir / f"dupes-{stamp}.json", dupes, skipped, variant_alerts)
|
|
console.print(f"[dim]Reports written to {out_dir}[/]")
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the CLI. A Ctrl+C anywhere inside main() is
    # reported with a short notice and exit status 130 rather than being
    # left to propagate as an unhandled KeyboardInterrupt.
    try:
        main()
    except KeyboardInterrupt:
        _abort_notice = "\n[yellow]Aborted by user (Ctrl+C). Cache not written for in-flight scans.[/]"
        console.print(_abort_notice)
        sys.exit(130)
|