#!/usr/bin/env python3 """Scan rclone remotes for duplicate JAV files grouped by ID.""" from __future__ import annotations import argparse import csv import fnmatch import json import os import re import subprocess import sys import threading import time import xml.etree.ElementTree as ET from dataclasses import dataclass, asdict from datetime import datetime from pathlib import Path from typing import Iterable from rich.console import Console from rich.panel import Panel from rich.progress import ( BarColumn, MofNCompleteColumn, Progress, SpinnerColumn, TextColumn, TimeElapsedColumn, TimeRemainingColumn, ) from rich.table import Table from rich.text import Text from rcjav.model import FileEntry from rcjav import ids as _rcjav_ids from rcjav.ids import ( PRIMARY_ID_RE, FALLBACK_ID_RE, COMPOUND_ID_RE, RANGE_RE, BUILTIN_PART_RES, configure_part_patterns, detect_part, detect_part_from_stem, part_key, extract_id, normalize_id, describe_id_match, expand_range, _VARIANT_SUFFIX_RE, _RES_LABEL_RE, _RESOLUTION_TAG_RE, _BRACKET_ID_RE, _NOHYPHEN_ID_RE, _VIDEO_EXTS, _LOWEST_KEEP_PRIORITY_EXTS, ) # PART_RES is rebound by configure_part_patterns(); always read it dynamically # from the rcjav.ids module rather than capturing a stale binding at import time. 
def _current_part_res():
    """Return the part-detection patterns currently bound on ``rcjav.ids``.

    The attribute is looked up on the module object at call time rather than
    imported by name, so whatever ``rcjav.ids.PART_RES`` refers to at the
    moment of the call is what the caller receives.
    """
    return _rcjav_ids.PART_RES


from rcjav.rclone_io import (
    RCLONE_BIN,
    DURATION_RE,
    set_basic as _set_rclone_basic,
    set_rclone_bin as _set_rclone_bin,
    quick_search_remote,
    choose_search_mode,
    name_to_include_patterns,
    name_match,
    query_to_include_patterns,
    remote_file_count,
    parse_duration,
    walk_remote,
)
from rcjav import output as _output
from rcjav.output import (
    human_size,
    ansi,
    ANSI_RESET,
    ANSI_GREEN,
    ANSI_RED,
    ANSI_YELLOW,
    ANSI_CYAN,
    ANSI_DIM,
    ANSI_BOLD,
    strip_markup,
    BasicProgress,
    make_progress,
    render_banner,
    render_search,
    render_name_matches,
    render_name_matches_plain,
    render_dupes,
    render_banner_plain,
    render_search_plain,
    render_dupes_plain,
    write_txt,
    write_csv,
    write_json,
    describe_skipped_id,
    dupes_to_obj,
    set_use_ansi as _set_output_use_ansi,
    set_basic as _set_output_basic,
    set_console_no_color as _set_output_no_color,
)

# rc-jav.py keeps its own local rich Console for the prints that haven't
# moved to rcjav.output yet (collectors, main()). When --no-color is in
# play we rebind both this and rcjav.output's console.
# (Bound exactly once here; the file previously created two Console objects
# and immediately discarded the first.)
console = Console()  # replaced in main() if --no-color

# Mirror of rcjav.rclone_io.BASIC for in-tree readers that haven't been
# updated yet (output renderers, BasicProgress checks in main()).
# NOTE(review): this comment used to say the flag is set in main() via both
# this name and _set_rclone_basic(); in this file main() assigns BASIC and
# calls _set_output_basic(), but _set_rclone_basic() is imported and never
# called -- confirm whether rcjav.rclone_io needs the flag as well.
BASIC = False  # set by --basic

# Default remotes used when --search is invoked without explicit --source/--target.
DEFAULT_SOURCE = ["cq:personal-files/ClearJAV"]
DEFAULT_TARGET = ["cq:personal-files/JAV/TMP"]

# Default WinCatalog export folder (or specific files). Folder entries auto-discover *.csv / *.xml.
DEFAULT_CATALOG: list[str] = [str(Path(__file__).resolve().parent / "wincatalog")]

from rcjav.catalog import (
    CATALOG_COL_NAME,
    CATALOG_COL_PATH,
    CATALOG_COL_SIZE,
    CATALOG_COL_DISC,
    normalize_catalog_path,
    load_catalog_csv,
    load_catalog_xml,
    load_catalogs,
    _expand_catalog_paths,
)
from rcjav.cache import (
    CACHE_PATH,
    CACHE_VERSION,
    CACHE_STALE_HOURS,
    load_cache,
    save_cache,
    cache_age_hours,
    fmt_age,
)
from rcjav.dupes import (
    DEFAULT_KEEP_RANKING,
    set_keep_ranking,
    decide_keep_with_reason,
    decide_keep,
    find_dupes,
    describe_dupe_risks,
    find_variant_alerts,
)
from rcjav.library import (
    find_library_issues,
    rename_file_in_remote,
    rename_files_batch,
    _bracket_to_canonical,
    _nohyphen_to_canonical,
)

CONFIG_PATH = Path(__file__).resolve().parent / "config.json"


def load_config() -> dict:
    """Load ``config.json`` from next to the script.

    Returns:
        The parsed JSON object, or ``{}`` when the file is missing,
        unreadable, not valid UTF-8, not valid JSON, or not a JSON object.
    """
    try:
        data = json.loads(CONFIG_PATH.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError (each subclasses it).
        return {}
    return data if isinstance(data, dict) else {}


def save_config(cfg: dict) -> None:
    """Write ``cfg`` to ``config.json`` via a temp file and ``os.replace``.

    Args:
        cfg: JSON-serialisable mapping to persist.

    Raises:
        OSError: if writing or replacing fails; the temp file is removed
            (best effort) before the error propagates.
    """
    tmp = CONFIG_PATH.with_suffix(CONFIG_PATH.suffix + ".tmp")
    try:
        tmp.write_text(json.dumps(cfg, indent=2), encoding="utf-8")
        os.replace(tmp, CONFIG_PATH)
    except OSError:
        # Don't leave a stray config.json.tmp behind on failure.
        try:
            tmp.unlink()
        except OSError:
            pass
        raise


# ---------- collectors ----------

def _emit_event(tag: str, payload: dict) -> None:
    """Write one ``TAG {json}`` line to stderr and flush it immediately."""
    sys.stderr.write(tag + " " + json.dumps(payload) + "\n")
    sys.stderr.flush()


def _file_record(entry: FileEntry) -> dict:
    """Convert a FileEntry into the per-file dict stored in the cache."""
    return {"path": entry.path, "size": entry.size,
            "mod_time": entry.mod_time, "jav_id": entry.jav_id}


def _entry_from_record(record: dict, source_label: str, remote: str) -> FileEntry:
    """Rebuild a FileEntry from a cached per-file dict."""
    return FileEntry(source=source_label, remote=remote, path=record["path"],
                     size=record["size"], mod_time=record.get("mod_time", ""),
                     jav_id=record["jav_id"])


def collect_with_progress(remotes_by_label: list[tuple[str, str]],
                          skipped: list[tuple[str, str]]
                          ) -> list[FileEntry]:
    """Dupe-mode collect — every remote freshly walked with progress.

    Args:
        remotes_by_label: ``(label, remote)`` pairs to walk.
        skipped: list handed through to ``walk_remote``.

    Returns:
        All entries returned by ``walk_remote`` across the remotes, in order.
    """
    out: list[FileEntry] = []
    if not remotes_by_label:
        return out
    with make_progress() as progress:
        # Keyed by (label, remote): a pair listed twice is walked only once.
        tasks = {(label, r): progress.add_task(f"{label} {r}", total=1)
                 for label, r in remotes_by_label}
        for (label, r), tid in tasks.items():
            entries, _ = walk_remote(r, label, skipped, progress, tid)
            out.extend(entries)
    return out


def cached_collect(remotes: list[str], source_label: str,
                   skipped: list[tuple[str, str]],
                   cache: dict, use_cache: bool, force_update: bool,
                   cache_meta: dict[str, dict],
                   scan_since: str | None = None) -> list[FileEntry]:
    """Search-mode collect with cache. Always recursive.

    Each remote is served one of three ways:

    * from its cache entry (``use_cache`` and not ``force_update``),
    * by a full walk (no usable cache entry, or a forced update),
    * by an incremental walk merged onto its cache entry (``scan_since`` set
      during a forced update with caching on, and a prior entry exists).

    Args:
        remotes: remote paths to collect.
        source_label: label stored on every produced FileEntry.
        skipped: receives ``(remote, item)`` tuples for cached/merged skipped
            items; also handed through to ``walk_remote``.
        cache: cache dict, updated in place when ``use_cache`` is true. This
            function does not save it.
        use_cache: whether the cache may be read and updated.
        force_update: ignore existing entries and re-walk.
        cache_meta: receives one status dict per remote.
        scan_since: rclone duration string (`24h`, `7d`). When set during a
            forced update, only files modified within the window are walked
            and merged on top of the existing cache entry; files older than
            the window keep their cached record. If there's no prior cache
            entry for a remote, falls through to a full scan.

    Returns:
        FileEntry objects for every file known for the requested remotes.
    """
    out: list[FileEntry] = []
    to_scan: list[str] = []
    to_incremental: list[tuple[str, dict]] = []  # (remote, existing_entry)
    # Tolerate a cache dict that has no "remotes" mapping yet.
    cached_remotes = cache.setdefault("remotes", {})

    for r in remotes:
        if scan_since and force_update and use_cache:
            existing = cached_remotes.get(r)
            if existing:
                to_incremental.append((r, existing))
                continue
            # No prior cache for this remote -> can't be incremental, fall back.
        entry = cached_remotes.get(r) if use_cache and not force_update else None
        if not entry:
            to_scan.append(r)
            continue
        age = cache_age_hours(entry["scanned_at"])
        age_str = fmt_age(age) if age is not None else "?"
        stale = age is not None and age > CACHE_STALE_HOURS
        cache_meta[r] = {"cached": True, "age": age_str, "stale": stale,
                         "file_count": len(entry["files"])}
        out.extend(_entry_from_record(f, source_label, r) for f in entry["files"])
        skipped.extend((r, s) for s in entry.get("skipped", []))

    if to_scan:
        with make_progress() as progress:
            tids = {r: progress.add_task(f"{source_label} {r}", total=1)
                    for r in to_scan}
            for r_idx, r in enumerate(to_scan):
                _total: int | None = None
                if BASIC:
                    # Emit SCAN_REMOTE_START immediately so the UI shows the remote name.
                    # Then probe the file count; once known, emit SCAN_REMOTE_COUNTED so
                    # the UI can show "N / total" without waiting for the first 100 files.
                    _emit_event("SCAN_REMOTE_START", {
                        "remote": r,
                        "label": source_label,
                        "index": r_idx + 1,
                        "of": len(to_scan),
                        "total": None,
                    })
                    _total = remote_file_count(r)
                    _emit_event("SCAN_REMOTE_COUNTED", {
                        "remote": r,
                        "total": _total,
                    })
                fresh, local_skipped = walk_remote(r, source_label, skipped, progress,
                                                   tids[r], _total_override=_total)
                out.extend(fresh)
                cache_meta[r] = {"cached": False, "age": "fresh", "stale": False,
                                 "file_count": len(fresh)}
                if use_cache:
                    cached_remotes[r] = {
                        "scanned_at": datetime.now().astimezone().isoformat(),
                        "recursive": True,
                        "files": [_file_record(e) for e in fresh],
                        "skipped": local_skipped,
                    }
                if BASIC:
                    _emit_event("SCAN_PROGRESS", {
                        "remote": r,
                        "label": source_label,
                        "files": len(fresh),
                        "files_total": len(out),
                    })

    if to_incremental:
        with make_progress() as progress:
            tids = {r: progress.add_task(f"{source_label} {r} (since {scan_since})", total=1)
                    for r, _ in to_incremental}
            for r_idx, (r, existing) in enumerate(to_incremental):
                if BASIC:
                    _emit_event("SCAN_REMOTE_START", {
                        "remote": r,
                        "label": source_label,
                        "index": r_idx + 1,
                        "of": len(to_incremental),
                        "total": None,
                        "incremental": True,
                    })
                fresh, local_skipped = walk_remote(
                    r, source_label, skipped, progress, tids[r],
                    max_age=scan_since,
                )
                # Merge: replace entries at paths we just walked, keep all others.
                new_paths = {e.path for e in fresh}
                merged_files = [f for f in existing.get("files", [])
                                if f["path"] not in new_paths]
                merged_files.extend(_file_record(e) for e in fresh)
                # Merge skipped lists (de-dupe). Sorted once so the order appended
                # to `skipped` is deterministic and matches what the cache stores.
                merged_skipped = sorted(set(existing.get("skipped", [])) | set(local_skipped))
                # Emit FileEntry for everything (old + new) so the caller sees the
                # full set, not just deltas.
                out.extend(_entry_from_record(f, source_label, r) for f in merged_files)
                # NOTE(review): walk_remote() was also handed `skipped`; if it
                # appends there itself, the newly skipped items land twice -- confirm.
                skipped.extend((r, s) for s in merged_skipped)
                cache_meta[r] = {
                    "cached": False,
                    "age": f"incremental {scan_since}",
                    "stale": False,
                    "file_count": len(merged_files),
                    "added_or_updated": len(fresh),
                }
                if use_cache:
                    cached_remotes[r] = {
                        "scanned_at": datetime.now().astimezone().isoformat(),
                        "recursive": True,
                        "files": merged_files,
                        "skipped": merged_skipped,
                    }
                if BASIC:
                    _emit_event("SCAN_PROGRESS", {
                        "remote": r,
                        "label": source_label,
                        "files": len(fresh),
                        "files_total": len(out),
                        "incremental": True,
                        "file_count": len(merged_files),
                    })
    return out


# ---------- main ----------

def main():
    """Command-line entry point.

    Parses arguments, applies ``config.json`` overrides, then runs exactly one
    mode: ``--save``, ``--reextract``, ``--library-issues``,
    ``--rename-files-batch``, ``--rename-file``, ``--scan``, search
    (``--search`` / ``--name``), or the default duplicate report. Most modes
    finish through ``sys.exit``; the duplicate report returns normally unless
    ``--format json`` is used.
    """
    global console, BASIC, DEFAULT_SOURCE, DEFAULT_TARGET, DEFAULT_CATALOG

    ap = argparse.ArgumentParser(description="Report duplicate JAV files across rclone remotes (read-only).")
    ap.add_argument("--source", "-s", action="append", default=[], metavar="REMOTE",
                    help="Source remote path (priority — wins dupes regardless of size). Repeatable.")
    ap.add_argument("--target", "-t", action="append", default=[], metavar="REMOTE",
                    help="Target remote path (non-priority — largest size wins among targets). Repeatable.")
    ap.add_argument("--format", choices=["console", "txt", "csv", "json", "all"], default="console")
    ap.add_argument("--output-dir", default="./reports", help="Where to write txt/csv/json.")
    ap.add_argument("--no-color", action="store_true")
    ap.add_argument("--rclone-bin", default="rclone",
                    help="Path to rclone executable (default: 'rclone' on PATH).")
    ap.add_argument("--search", action="append", default=[], metavar="ID",
                    help="Search mode: look up a JAV ID (e.g. SSIS-001). Repeatable. "
                         "If no --source/--target given, default target is used.")
    ap.add_argument("--name", action="append", default=[], metavar="STR",
                    help="Substring/glob search against filename. Case-insensitive. "
                         "Repeatable; OR semantics (any token match = hit). "
                         "Supports * and ? wildcards. Use quotes for spaces.")
    ap.add_argument("--update", "-u", action="store_true",
                    help="Search mode: force re-scan and overwrite cache for requested remotes.")
    ap.add_argument("--no-cache", action="store_true",
                    help="Search mode: bypass cache entirely (no read, no write).")
    ap.add_argument("--quick", "-q", action="store_true",
                    help="Force quick mode: skip cache, query rclone directly with --include glob. "
                         "Default is auto: single exact IDs use quick, wildcards/ranges/multi use cached.")
    ap.add_argument("--cache", action="store_true",
                    help="Force cached mode (opposite of --quick).")
    ap.add_argument("--save", action="store_true",
                    help="Persist the --source / --target / --catalog values you passed "
                         "as new defaults in config.json next to the script. "
                         "Only keys you explicitly passed are saved.")
    ap.add_argument("--scan", action="store_true",
                    help="Walk configured remotes, refresh cache, exit. No search/dupe output. "
                         "Default scope: DEFAULT_TARGET. Override with --source/--target. "
                         "Always overwrites cache. Suitable for Task Scheduler / cron.")
    ap.add_argument("--scan-since", metavar="DURATION",
                    help="Incremental scan: only walk files modified within DURATION "
                         "(e.g. 24h, 7d, 30m, 90s). Merges new/changed entries on top of "
                         "the existing cache; old entries are preserved. Falls back to a "
                         "full scan if there's no prior cache for a remote. Requires --scan.")
    ap.add_argument("--catalog", action="append", default=[], metavar="PATH",
                    help="Path to a WinCatalog CSV or XML export. Repeatable. "
                         "Listed under 'Catalog' in results (informational, never KEEP/DELETE?).")
    ap.add_argument("--part-pattern", action="append", default=[], metavar="REGEX",
                    help="Extra multipart filename regex. Repeatable; first capture group must be the part number. "
                         "Patterns run against the filename stem after built-in part detectors.")
    ap.add_argument("--library-issues", action="store_true",
                    help="Report non-canonical filenames (bracket-wrapped IDs, no-hyphen IDs). "
                         "Reads from cache. Outputs JSON when --format json, plain otherwise.")
    ap.add_argument("--rename-file", action="store_true",
                    help="Rename one file in a remote and patch cache. "
                         "Requires --remote, --old-path, --new-path. Outputs JSON.")
    ap.add_argument("--rename-files-batch", action="store_true",
                    help="Rename multiple files in one call, writing cache once. "
                         "Reads JSON array of {remote, old_path, new_path} from stdin. Outputs JSON.")
    ap.add_argument("--remote", metavar="REMOTE",
                    help="Remote path root for --rename-file (e.g. cq:JAV).")
    ap.add_argument("--old-path", metavar="PATH",
                    help="Relative path of the file to rename (within --remote).")
    ap.add_argument("--new-path", metavar="PATH",
                    help="New relative path after rename (within --remote).")
    ap.add_argument("--reextract", action="store_true",
                    help="Walk cache.json and recompute jav_id on every file entry "
                         "using the current ID extraction rules. No rclone calls — "
                         "fast path for picking up rule changes without re-scanning. "
                         "Outputs JSON when --format json, plain otherwise.")
    ap.add_argument("--basic", action="store_true",
                    help="Plain text output, no rich tables/panels/progress bars. "
                         "Useful for piping or low-bandwidth terminals.")
    ap.add_argument("--clearjav", action="store_true",
                    help="Shortcut: use DEFAULT_SOURCE as --source and DEFAULT_TARGET as --target, "
                         "Equivalent to "
                         "`--source cq:personal-files/ClearJAV --target cq:personal-files/JAV/TMP`.")
    args = ap.parse_args()

    _set_rclone_bin(args.rclone_bin)
    BASIC = args.basic or args.format == "json"
    _set_output_basic(BASIC)

    # Apply colour settings before anything is printed, so the early-exit
    # paths below (invalid part pattern, --save) honour --no-color / --basic.
    _set_output_use_ansi(not args.no_color)
    if args.no_color or BASIC:
        console = Console(no_color=True, color_system=None, highlight=False)
        _set_output_no_color()

    # Apply persisted config overrides BEFORE defaults are consulted.
    cfg = load_config()
    if "default_source" in cfg:
        DEFAULT_SOURCE = list(cfg["default_source"])
    if "default_target" in cfg:
        DEFAULT_TARGET = list(cfg["default_target"])
    if "default_catalog" in cfg:
        DEFAULT_CATALOG = list(cfg["default_catalog"])
    set_keep_ranking(cfg.get("keep_ranking") or {})
    part_patterns = list(cfg.get("part_patterns") or []) + list(args.part_pattern)
    pattern_errors = configure_part_patterns(part_patterns)
    if pattern_errors:
        for err in pattern_errors:
            console.print(f"[red]invalid part pattern:[/] {err}")
        sys.exit(2)

    # --save: persist explicitly-passed values, exit.
    if args.save:
        if not (args.source or args.target or args.catalog or args.part_pattern):
            console.print("[red]--save needs at least one --source/--target/--catalog/--part-pattern value to persist.[/]")
            sys.exit(2)
        new_cfg = dict(cfg)
        if args.source:
            new_cfg["default_source"] = list(args.source)
        if args.target:
            new_cfg["default_target"] = list(args.target)
        if args.catalog:
            new_cfg["default_catalog"] = list(args.catalog)
        if args.part_pattern:
            new_cfg["part_patterns"] = list(args.part_pattern)
        save_config(new_cfg)
        console.print(f"[green]Saved to {CONFIG_PATH}:[/]")
        for k in ("default_source", "default_target", "default_catalog", "part_patterns"):
            if k in new_cfg:
                console.print(f" {k} = {new_cfg[k]}")
        sys.exit(0)

    # Search mode: defaults kick in if no remotes specified.
    if args.clearjav:
        if not args.source:
            args.source = list(DEFAULT_SOURCE)
        if not args.target:
            args.target = list(DEFAULT_TARGET)
    if args.search and not args.source and not args.target:
        args.target = list(DEFAULT_TARGET)

    # --scan: default to DEFAULT_TARGET only, always overwrite cache.
    if args.scan:
        if not args.source and not args.target:
            args.target = list(DEFAULT_TARGET)
        args.update = True

    # Use default catalog(s) if user passed none.
    if not args.catalog and DEFAULT_CATALOG:
        args.catalog = list(DEFAULT_CATALOG)

    # --reextract: rebuild jav_id values from current rules without re-scanning.
    if args.reextract:
        from rcjav.ids import current_rules_signature
        from rcjav.cache import stamp_current_rules
        sig = current_rules_signature()
        cache = load_cache(sig)
        changed = 0
        unchanged = 0
        dropped = 0
        per_remote = []
        for remote, entry in (cache.get("remotes") or {}).items():
            r_changed = 0
            r_unchanged = 0
            r_dropped = 0
            files = entry.get("files") or []
            for f in files:
                old_id = f.get("jav_id") or ""
                new_id = extract_id(Path(f.get("path", "")).name)
                if new_id is None:
                    # Only a file that previously had an ID counts as dropped.
                    if old_id:
                        f["jav_id"] = ""
                        r_dropped += 1
                    continue
                if new_id != old_id:
                    f["jav_id"] = new_id
                    r_changed += 1
                else:
                    r_unchanged += 1
            changed += r_changed
            unchanged += r_unchanged
            dropped += r_dropped
            per_remote.append({
                "remote": remote,
                "changed": r_changed,
                "unchanged": r_unchanged,
                "dropped": r_dropped,
                "files": len(files),
            })
        stamp_current_rules(cache, sig)
        save_cache(cache)
        summary = {
            "ok": True,
            "changed": changed,
            "unchanged": unchanged,
            "dropped": dropped,
            "total": changed + unchanged + dropped,
            "id_rules_signature": sig,
            "remotes": per_remote,
        }
        if args.format == "json" or BASIC:
            print(json.dumps(summary))
        else:
            console.print(Panel(
                f"[bold]Re-extracted IDs against current rules[/]\n"
                f" changed: [yellow]{changed:,}[/]\n"
                f" unchanged: [dim]{unchanged:,}[/]\n"
                f" dropped: [red]{dropped:,}[/]\n"
                f" total: {summary['total']:,}",
                title="Re-extract", border_style="green"))
            if per_remote:
                from rich.table import Table as _Tbl
                t = _Tbl(title="Per-remote", show_lines=False)
                t.add_column("Remote", style="cyan")
                t.add_column("Changed", justify="right", style="yellow")
                t.add_column("Unchanged", justify="right", style="dim")
                t.add_column("Dropped", justify="right", style="red")
                t.add_column("Files", justify="right")
                for r in per_remote:
                    t.add_row(r["remote"], f"{r['changed']:,}",
                              f"{r['unchanged']:,}", f"{r['dropped']:,}",
                              f"{r['files']:,}")
                console.print(t)
        sys.exit(0)

    # --library-issues: read-only cache scan for non-canonical filenames.
    if args.library_issues:
        cache = load_cache()
        issues = find_library_issues(cache)
        if args.format == "json" or BASIC:
            print(json.dumps({"ok": True, **issues}))
        else:
            bracket = issues["bracket_names"]
            nohyphen = issues["nohyphen_names"]
            total = len(bracket) + len(nohyphen)
            if not total:
                console.print(Panel("[bold green]No library issues found.[/]", title="Library Issues"))
            else:
                from rich.table import Table
                t = Table(title=f"Library Issues ({total} file(s))", show_lines=True)
                t.add_column("Issue", style="yellow", width=14)
                t.add_column("Current Name")
                t.add_column("Canonical Name", style="green")
                t.add_column("Remote", style="dim")
                for e in bracket:
                    t.add_row("bracket ID", Path(e["path"]).name, e["canonical_name"], e["remote"])
                for e in nohyphen:
                    t.add_row("no hyphen", Path(e["path"]).name, e["canonical_name"], e["remote"])
                console.print(t)
        sys.exit(0)

    # --rename-files-batch: rename multiple files, single cache write.
    if args.rename_files_batch:
        try:
            renames = json.loads(sys.stdin.read())
        except json.JSONDecodeError as e:
            print(json.dumps({"ok": False, "error": f"Invalid JSON on stdin: {e}"}))
            sys.exit(1)
        if not isinstance(renames, list):
            print(json.dumps({"ok": False, "error": "stdin must be a JSON array"}))
            sys.exit(1)
        cache = load_cache()
        results = rename_files_batch(renames, cache, rclone_bin=RCLONE_BIN)
        # Overall "ok" (and exit code 0) as soon as at least one rename succeeded.
        ok = any(r["ok"] for r in results)
        print(json.dumps({"ok": ok, "results": results}))
        sys.exit(0 if ok else 1)

    # --rename-file: rename one file in a remote and patch cache.
    if args.rename_file:
        if not args.remote or not args.old_path or not args.new_path:
            ap.error("--rename-file requires --remote, --old-path, and --new-path.")
        cache = load_cache()
        result = rename_file_in_remote(
            args.remote, args.old_path, args.new_path, cache, rclone_bin=RCLONE_BIN
        )
        print(json.dumps(result))
        sys.exit(0 if result["ok"] else 1)

    if not args.source and not args.target and not args.catalog:
        ap.error("Provide at least one --source, --target, or --catalog.")

    # Scan-only mode: walk remotes, write cache, summary, exit.
    if args.scan:
        scan_since = None
        if args.scan_since:
            scan_since = parse_duration(args.scan_since)
            if not scan_since:
                console.print(f"[red]invalid --scan-since value: {args.scan_since!r} "
                              f"(expected e.g. 24h, 7d, 30m, 90s)[/]")
                sys.exit(2)
        from rcjav.ids import current_rules_signature
        from rcjav.cache import stamp_current_rules
        _scan_sig = current_rules_signature()
        cache = load_cache(_scan_sig)
        cache_meta: dict[str, dict] = {}
        skipped: list[tuple[str, str]] = []
        t0 = time.perf_counter()
        if BASIC:
            # `--scan` resolves its default target above. Report only the
            # remotes that this scan will actually walk; falling back here to
            # DEFAULT_SOURCE would resurrect retired source roots in job UI.
            _all_remotes = list(args.source) + list(args.target)
            _emit_event("SCAN_START", {
                "remotes": _all_remotes,
                "total": len(_all_remotes),
            })
        entries = (cached_collect(args.source, "Source", skipped, cache,
                                  use_cache=not args.no_cache, force_update=True,
                                  cache_meta=cache_meta, scan_since=scan_since)
                   + cached_collect(args.target, "Target", skipped, cache,
                                    use_cache=not args.no_cache, force_update=True,
                                    cache_meta=cache_meta, scan_since=scan_since))
        if not args.no_cache:
            # Stamp current rules only on a FULL scan. An incremental
            # (--scan-since) only re-walked some files; older files in the
            # cache may still have jav_ids from the previous rule set, so the
            # cache remains "stale by rules" until a full scan or --reextract.
            if not scan_since:
                stamp_current_rules(cache, _scan_sig)
            save_cache(cache)
        elapsed = time.perf_counter() - t0
        if BASIC:
            sys.stderr.write(f"Scan complete: {len(entries)} files in {elapsed:.2f}s\n")
            sys.stderr.write(f"Cache: {CACHE_PATH}\n" if not args.no_cache
                             else "Cache: (skipped, --no-cache)\n")
        else:
            console.print(f"[bold green]Scan complete:[/] {len(entries)} files in {elapsed:.2f}s")
            if not args.no_cache:
                console.print(f"[dim]Cache: {CACHE_PATH}[/]")
            else:
                console.print("[dim]Cache: (skipped, --no-cache)[/]")
        sys.exit(0)

    skipped: list[tuple[str, str]] = []
    t0 = time.perf_counter()

    if args.search or args.name:
        search_timings: dict[str, int] = {}
        # If --name was passed without explicit remotes, fall back to default target
        # (catalog default already injected earlier; don't let it suppress remote defaulting).
        if args.name and not args.search and not args.source and not args.target:
            args.target = list(DEFAULT_TARGET)
        # Substring name search can't be server-side filtered on most backends — cache wins.
        # Only the ID search shape benefits from quick (server-side prefix glob).
        if args.name and not args.quick:
            mode, reason = "cached", "name substring search — cache is faster than rclone --include"
        else:
            combined = list(args.search) + list(args.name)
            mode, reason = choose_search_mode(combined, args.quick, args.cache)
        if BASIC:
            sys.stderr.write(f"Mode: {mode} ({reason})\n")
        else:
            mode_color = "green" if mode == "quick" else "cyan"
            console.print(f"[{mode_color}]Mode: {mode}[/] [dim]({reason})[/]")

        phase_t0 = time.perf_counter()
        cache = load_cache()
        search_timings["cache_load_ms"] = round((time.perf_counter() - phase_t0) * 1000)
        use_cache = not args.no_cache and mode == "cached"
        cache_meta: dict[str, dict] = {}

        phase_t0 = time.perf_counter()
        if mode == "quick":
            all_patterns: list[str] = []
            for raw in args.search:
                all_patterns.extend(query_to_include_patterns(raw))
            all_patterns.extend(name_to_include_patterns(args.name))
            entries = []
            for r in args.source:
                cache_meta[r] = {"cached": False, "age": "quick", "stale": False, "file_count": 0}
                got = quick_search_remote(r, "Source", all_patterns, skipped)
                entries.extend(got)
                cache_meta[r]["file_count"] = len(got)
            for r in args.target:
                cache_meta[r] = {"cached": False, "age": "quick", "stale": False, "file_count": 0}
                got = quick_search_remote(r, "Target", all_patterns, skipped)
                entries.extend(got)
                cache_meta[r]["file_count"] = len(got)
        else:
            entries = (cached_collect(args.source, "Source", skipped, cache,
                                      use_cache, args.update, cache_meta)
                       + cached_collect(args.target, "Target", skipped, cache,
                                        use_cache, args.update, cache_meta))
        search_timings["entry_collect_ms"] = round((time.perf_counter() - phase_t0) * 1000)

        # Load each catalog separately so cache_meta gets the per-catalog count
        # (was global total — every catalog reported the sum across all).
        catalog_entries: list[FileEntry] = []
        phase_t0 = time.perf_counter()
        for cp_str in args.catalog:
            for cp in _expand_catalog_paths([cp_str], default_paths=DEFAULT_CATALOG):
                ext = cp.suffix.lower()
                if ext == ".csv":
                    one = load_catalog_csv(cp, skipped)
                elif ext == ".xml":
                    one = load_catalog_xml(cp, skipped)
                else:
                    console.print(f"[yellow]WARN: unknown catalog format '{ext}' for {cp}; skipping.[/]")
                    continue
                catalog_entries.extend(one)
                cache_meta[f"catalog:{cp.name}"] = {
                    "cached": False,
                    "age": "loaded",
                    "stale": False,
                    "file_count": len(one),
                }
        entries.extend(catalog_entries)
        search_timings["catalog_load_ms"] = round((time.perf_counter() - phase_t0) * 1000)
        if use_cache and args.update:
            save_cache(cache)
    else:
        if args.cache and not args.no_cache:
            cache = load_cache()
            cache_meta: dict[str, dict] = {}
            entries = (cached_collect(args.source, "Source", skipped, cache,
                                      use_cache=True, force_update=False, cache_meta=cache_meta)
                       + cached_collect(args.target, "Target", skipped, cache,
                                        use_cache=True, force_update=False, cache_meta=cache_meta))
        else:
            remotes_by_label = ([("Source", r) for r in args.source]
                                + [("Target", r) for r in args.target])
            entries = collect_with_progress(remotes_by_label, skipped)
        entries.extend(load_catalogs(args.catalog, skipped, default_paths=DEFAULT_CATALOG))

    elapsed = time.perf_counter() - t0
    if BASIC:
        sys.stderr.write(f"Scanned/loaded {len(entries)} file(s) in {elapsed:.2f}s\n")
    else:
        console.print(f"[dim]Scanned/loaded {len(entries)} file(s) in {elapsed:.2f}s[/]")

    if args.search or args.name:
        # query_expansions: original_raw -> list of normalized IDs / wildcard patterns to look up
        query_expansions: dict[str, list[str]] = {}
        queries: list[str] = []
        for raw in args.search:
            if RANGE_RE.search(raw):
                expanded = expand_range(raw) or []
                normed: list[str] = []
                for r in expanded:
                    n = normalize_id(r)
                    if n:
                        normed.append(n)
                if not normed:
                    console.print(f"[yellow]WARN: range '{raw}' produced no valid IDs.[/]")
                    continue
                queries.append(raw)
                query_expansions[raw] = normed
                continue
            if "*" in raw or "?" in raw:
                q = raw.upper()
                queries.append(q)
                query_expansions[q] = [q]
                continue
            norm = normalize_id(raw)
            if not norm:
                console.print(f"[yellow]WARN: cannot parse '{raw}' as a JAV ID, skipping.[/]")
                continue
            # Use the raw (upper-cased) form for display so leading zeros are preserved
            # (e.g. user types PRTD-027 — keep it, don't show PRTD-27). Lookup still uses
            # the normalized form internally.
            display = raw.upper()
            queries.append(display)
            query_expansions[display] = [norm]

        phase_t0 = time.perf_counter()
        index: dict[str, list[FileEntry]] = {}
        for e in entries:
            index.setdefault(e.jav_id, []).append(e)
        search_timings["index_ms"] = round((time.perf_counter() - phase_t0) * 1000)

        phase_t0 = time.perf_counter()
        matches: dict[str, list[FileEntry]] = {}
        match_traces: dict[str, dict[int, dict[str, str]]] = {}
        for q in queries:
            expansions = query_expansions.get(q, [q])
            hits: list[FileEntry] = []
            seen: set[int] = set()
            traces: dict[int, dict[str, str]] = {}

            def add_hit(entry: FileEntry, matched_query: str) -> None:
                # De-dupe by object identity so an entry reached through several
                # expansions is reported once, with the trace of its first match.
                key = id(entry)
                if key in seen:
                    return
                seen.add(key)
                hits.append(entry)
                traces[key] = describe_id_match(q, matched_query, entry.jav_id, len(expansions))

            for sub in expansions:
                if "*" in sub or "?" in sub:
                    pat = sub if "#PART" in sub.upper() else sub + "*"
                    for k, v in index.items():
                        if fnmatch.fnmatchcase(k, pat):
                            for e in v:
                                add_hit(e, sub)
                elif "#part" in sub:
                    for e in index.get(sub, []):
                        add_hit(e, sub)
                else:
                    for e in index.get(sub, []):
                        add_hit(e, sub)
                    # A bare ID also matches its "<id>#part..." index keys.
                    for k, v in index.items():
                        if k.startswith(sub + "#part"):
                            for e in v:
                                add_hit(e, sub)
            matches[q] = hits
            match_traces[q] = traces
        search_timings["match_ms"] = round((time.perf_counter() - phase_t0) * 1000)

        if args.format == "json":
            # Structured output for tools that consume search results (e.g. the rclonex
            # Brave extension). Includes everything needed to drive a UI: per-query hits
            # with source/remote/path/size/mod_time, plus name-match block + skipped.
            name_hits_json: list[FileEntry] = []
            if args.name:
                for e in entries:
                    if name_match(Path(e.path).stem, args.name):
                        name_hits_json.append(e)
            out_obj = {
                "queries": [
                    {
                        "query": q,
                        "hits": [
                            {"source": e.source, "remote": e.remote, "path": e.path,
                             "full_path": e.full_path, "size": e.size,
                             "size_human": human_size(e.size),
                             "mod_time": e.mod_time, "jav_id": e.jav_id,
                             **match_traces.get(q, {}).get(id(e), {})}
                            for e in sorted(matches.get(q, []),
                                            key=lambda x: (x.jav_id, x.path.lower()))
                        ],
                    }
                    for q in queries
                ],
                "name_matches": [
                    {"source": e.source, "remote": e.remote, "path": e.path,
                     "full_path": e.full_path, "size": e.size,
                     "size_human": human_size(e.size),
                     "mod_time": e.mod_time, "jav_id": e.jav_id,
                     "match_kind": "name", "match_reason": "Filename search",
                     "match_confidence": "broad",
                     "matched_query": ", ".join(args.name),
                     "matched_id": e.jav_id}
                    for e in sorted(name_hits_json,
                                    key=lambda x: (x.jav_id, x.path.lower()))
                ],
                "name_tokens": list(args.name),
                "cache_meta": cache_meta,
                "skipped_count": len(skipped),
                "elapsed_sec": round(time.perf_counter() - t0, 3),
                "timings": search_timings,
            }
            print(json.dumps(out_obj))
            id_ok = (not queries) or all(matches.values())
            name_ok = (not args.name) or bool(name_hits_json)
            sys.exit(0 if (id_ok and name_ok) else 1)

        if queries:
            if BASIC:
                print(render_search_plain(matches, queries, cache_meta))
            else:
                render_search(matches, queries, cache_meta)

        # --name results as a separate block
        name_hits: list[FileEntry] = []
        if args.name:
            for e in entries:
                if name_match(Path(e.path).stem, args.name):
                    name_hits.append(e)
            if BASIC:
                print(render_name_matches_plain(name_hits, args.name, cache_meta))
            else:
                render_name_matches(name_hits, args.name, cache_meta)

        # Exit code: 0 if every search query had hits AND name-search (if used) returned hits.
        id_ok = (not queries) or all(matches.values())
        name_ok = (not args.name) or bool(name_hits)
        sys.exit(0 if (id_ok and name_ok) else 1)

    dupes = find_dupes(entries)
    variant_alerts = find_variant_alerts(entries)
    if args.format == "json" and BASIC:
        print(json.dumps(dupes_to_obj(dupes, skipped, variant_alerts)))
        sys.exit(0)
    if BASIC:
        print(render_dupes_plain(dupes, skipped, variant_alerts))
    else:
        render_dupes(dupes, skipped, variant_alerts)

    if args.format != "console":
        out_dir = Path(args.output_dir)
        out_dir.mkdir(parents=True, exist_ok=True)
        stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        targets = {"txt", "csv", "json"} if args.format == "all" else {args.format}
        if "txt" in targets:
            write_txt(out_dir / f"dupes-{stamp}.txt", dupes, skipped)
        if "csv" in targets:
            write_csv(out_dir / f"dupes-{stamp}.csv", dupes)
        if "json" in targets:
            write_json(out_dir / f"dupes-{stamp}.json", dupes, skipped, variant_alerts)
        console.print(f"[dim]Reports written to {out_dir}[/]")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        console.print("\n[yellow]Aborted by user (Ctrl+C). Cache not written for in-flight scans.[/]")
        sys.exit(130)