From fb5700cdab71641d405b531de255f3389b85f958 Mon Sep 17 00:00:00 2001 From: admin Date: Fri, 22 May 2026 22:00:22 +0200 Subject: [PATCH] Step 10h: extract renderers + file outputs into rcjav/output.py --- rc-jav.py | 463 ++++--------------------------------------- rcjav/__init__.py | 32 +++ rcjav/output.py | 495 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 566 insertions(+), 424 deletions(-) create mode 100644 rcjav/output.py diff --git a/rc-jav.py b/rc-jav.py index 4b7c893..d9a30ed 100644 --- a/rc-jav.py +++ b/rc-jav.py @@ -64,15 +64,6 @@ def _current_part_res(): return _rcjav_ids.PART_RES -def human_size(n: int) -> str: - nf = float(max(0, n)) - for unit in ("B", "KiB", "MiB", "GiB", "TiB"): - if nf < 1024: - return f"{int(nf)} B" if unit == "B" else f"{nf:.2f} {unit}" - nf /= 1024 - return f"{nf:.2f} PiB" - - from rcjav.rclone_io import ( RCLONE_BIN, DURATION_RE, @@ -87,85 +78,51 @@ from rcjav.rclone_io import ( parse_duration, walk_remote, ) +from rcjav import output as _output +from rcjav.output import ( + human_size, + ansi, + ANSI_RESET, + ANSI_GREEN, + ANSI_RED, + ANSI_YELLOW, + ANSI_CYAN, + ANSI_DIM, + ANSI_BOLD, + strip_markup, + BasicProgress, + make_progress, + render_banner, + render_search, + render_name_matches, + render_name_matches_plain, + render_dupes, + render_banner_plain, + render_search_plain, + render_dupes_plain, + write_txt, + write_csv, + write_json, + describe_skipped_id, + dupes_to_obj, + set_use_ansi as _set_output_use_ansi, + set_basic as _set_output_basic, + set_console_no_color as _set_output_no_color, +) + +# rc-jav.py keeps its own local rich Console for the prints that haven't +# moved to rcjav.output yet (collectors, main()). When --no-color is in +# play we rebind both this and rcjav.output's console. +console = Console() # Mirror of rcjav.rclone_io.BASIC for in-tree readers that haven't been # updated yet (output renderers, BasicProgress checks in main()). Set in # main() via both this name and _set_rclone_basic(). BASIC = False # set by --basic -USE_ANSI = True # disabled by --no-color - -# Pre-rich ANSI codes (used in --basic mode for color). -ANSI_RESET = "\033[0m" -ANSI_GREEN = "\033[32m" -ANSI_RED = "\033[31m" -ANSI_YELLOW = "\033[33m" -ANSI_CYAN = "\033[36m" -ANSI_DIM = "\033[2m" -ANSI_BOLD = "\033[1m" - - -def ansi(s: str, code: str) -> str: - return f"{code}{s}{ANSI_RESET}" if USE_ANSI else s console = Console() # replaced in main() if --no-color -_RICH_TAG_RE = re.compile(r"\[/?[^\]]*\]") - - -def strip_markup(s: str) -> str: - return _RICH_TAG_RE.sub("", s) - - -class BasicProgress: - """Minimal stand-in for rich.Progress used when --basic is set.""" - def __init__(self): - self._tasks: dict[int, dict] = {} - self._next = 0 - self._last_print: dict[int, int] = {} - - def __enter__(self): - return self - - def __exit__(self, *exc): - for tid, t in self._tasks.items(): - sys.stderr.write(f"{ansi('[done]', ANSI_GREEN)} {t['desc']} {t['done']}/{t['total']}\n") - return False - - def add_task(self, description: str, total: int = 1) -> int: - tid = self._next - self._next += 1 - desc = strip_markup(description) - self._tasks[tid] = {"desc": desc, "total": total, "done": 0} - self._last_print[tid] = 0 - sys.stderr.write(f"{ansi('[start]', ANSI_CYAN)} {desc}\n") - return tid - - def update(self, tid, total=None, description=None, **_): - t = self._tasks[tid] - if total is not None: - t["total"] = total - if description is not None: - t["desc"] = strip_markup(description) - - def advance(self, tid, n: int = 1): - t = self._tasks[tid] - t["done"] += n - # In-place refresh every 5 files (or every file if total small). - step = 5 if t["total"] > 50 else 1 - if t["done"] - self._last_print[tid] >= step or t["done"] == t["total"]: - counter = ansi(f"{t['done']}/{t['total']}", ANSI_CYAN) - line = f" {counter} {ansi(t['desc'], ANSI_DIM)}" - if sys.stderr.isatty(): - sys.stderr.write(f"\r\033[K{line}") - if t["done"] == t["total"]: - sys.stderr.write("\n") - sys.stderr.flush() - elif t["done"] == t["total"]: - # Non-TTY: only print final line, skip intermediate noise. - sys.stderr.write(line + "\n") - self._last_print[tid] = t["done"] - # Default remotes used when --search is invoked without explicit --source/--target. DEFAULT_SOURCE = ["cq:personal-files/ClearJAV"] DEFAULT_TARGET = ["cq:personal-files/JAV/TMP"] @@ -231,22 +188,6 @@ def save_config(cfg: dict) -> None: os.replace(tmp, CONFIG_PATH) -def make_progress(): - if BASIC: - return BasicProgress() - return Progress( - SpinnerColumn(), - TextColumn("{task.description}"), - BarColumn(), - MofNCompleteColumn(), - TimeElapsedColumn(), - TextColumn("eta"), - TimeRemainingColumn(), - console=console, - transient=False, - ) - - # ---------- collectors ---------- def collect_with_progress(remotes_by_label: list[tuple[str, str]], @@ -399,332 +340,6 @@ def cached_collect(remotes: list[str], source_label: str, return out -# ---------- renderers ---------- - -def render_banner(cache_meta: dict[str, dict], mode: str) -> Panel: - lines: list[Text] = [] - lines.append(Text.from_markup(f"[bold]mode:[/] {mode}")) - if cache_meta: - for r, m in cache_meta.items(): - if m["cached"]: - tag = f"CACHED {m['age']}" + (" STALE" if m["stale"] else "") - style = "yellow" if m["stale"] else "dim" - else: - tag = "FRESH SCAN" - style = "green" - lines.append(Text.from_markup( - f" [white]{r}[/] [{style}]{tag}[/] [dim]({m['file_count']} files)[/]" - )) - body = Text("\n").join(lines) - return Panel(body, title="rc-jav", title_align="left", border_style="blue") - - -def render_search(matches: dict[str, list[FileEntry]], queries: list[str], - cache_meta: dict[str, dict]) -> None: - console.print(render_banner(cache_meta, mode="search")) - for q in queries: - hits = matches.get(q, []) - if not hits: - console.print(f"[bold red][{q}] NOT FOUND[/]") - console.print() - continue - title = f"[bold green][{q}] {len(hits)} hit(s)[/]" - tbl = Table(title=title, title_justify="left", show_lines=False, - border_style="green", expand=True) - tbl.add_column("Source", style="yellow", no_wrap=True) - tbl.add_column("Cache", no_wrap=True) - tbl.add_column("File", style="bold", overflow="fold") - tbl.add_column("Size", justify="right", no_wrap=True) - tbl.add_column("Path", style="dim", overflow="fold") - for e in sorted(hits, key=lambda x: (x.jav_id, x.path.lower())): - meta = cache_meta.get(e.remote, {}) - if meta.get("cached"): - cache_tag = "[yellow][CACHED-STALE][/]" if meta.get("stale") else "[dim][CACHED][/]" - else: - cache_tag = "[green][FRESH][/]" - tbl.add_row( - e.source, cache_tag, Path(e.path).name, - f"{human_size(e.size)}\n[dim]({e.size:,} B)[/]", - e.full_path, - ) - console.print(tbl) - console.print() - - -def render_name_matches(hits: list[FileEntry], tokens: list[str], - cache_meta: dict[str, dict]) -> None: - title = f"[bold green]Name match {tokens} — {len(hits)} hit(s)[/]" - if not hits: - console.print(f"[bold red]Name match {tokens} — NOT FOUND[/]") - return - tbl = Table(title=title, title_justify="left", show_lines=False, - border_style="green", expand=True) - tbl.add_column("Source", style="yellow", no_wrap=True) - tbl.add_column("Cache", no_wrap=True) - tbl.add_column("ID", style="bold cyan", no_wrap=True) - tbl.add_column("File", style="bold", overflow="fold") - tbl.add_column("Size", justify="right", no_wrap=True) - tbl.add_column("Path", style="dim", overflow="fold") - for e in sorted(hits, key=lambda x: (x.jav_id, x.path.lower())): - meta = cache_meta.get(e.remote, {}) - if meta.get("cached"): - cache_tag = "[yellow][CACHED-STALE][/]" if meta.get("stale") else "[dim][CACHED][/]" - else: - cache_tag = "[green][FRESH][/]" - tbl.add_row( - e.source, cache_tag, e.jav_id, Path(e.path).name, - f"{human_size(e.size)}\n[dim]({e.size:,} B)[/]", - e.full_path, - ) - console.print(tbl) - console.print() - - -def render_name_matches_plain(hits: list[FileEntry], tokens: list[str], - cache_meta: dict[str, dict]) -> str: - lines: list[str] = [] - if not hits: - lines.append(ansi(f"Name match {tokens} — NOT FOUND", ANSI_RED)) - return "\n".join(lines) - lines.append(ansi(f"Name match {tokens} — {len(hits)} hit(s)", ANSI_GREEN + ANSI_BOLD)) - for e in sorted(hits, key=lambda x: (x.jav_id, x.path.lower())): - meta = cache_meta.get(e.remote, {}) - if meta.get("cached"): - tag = ansi("[CACHED-STALE]", ANSI_YELLOW) if meta.get("stale") else ansi("[CACHED]", ANSI_DIM) - else: - tag = ansi("[FRESH]", ANSI_GREEN) - src = ansi(e.source, ANSI_YELLOW) - lines.append(f" {src} {tag} {ansi(e.jav_id, ANSI_CYAN)}") - lines.append(ansi(f" file: {Path(e.path).name}", ANSI_BOLD)) - lines.append(f" size: {human_size(e.size)} ({e.size:,} bytes)") - lines.append(ansi(f" path: {e.full_path}", ANSI_DIM)) - return "\n".join(lines) - - -def render_dupes(dupes: dict[str, list[FileEntry]], - skipped: list[tuple[str, str]], - variant_alerts: dict[str, list[FileEntry]] | None = None) -> None: - if not dupes: - console.print(Panel("[bold green]No duplicates found.[/]", - border_style="green")) - else: - console.print(f"[bold]Found {len(dupes)} duplicate ID group(s):[/]") - console.print() - total_reclaim = 0 - for jav_id in sorted(dupes): - entries = dupes[jav_id] - keep = decide_keep(entries) - tbl = Table(title=f"[bold][{jav_id}][/]", title_justify="left", - show_lines=False, border_style="magenta", expand=True) - tbl.add_column("Action", no_wrap=True) - tbl.add_column("Source", style="yellow", no_wrap=True) - tbl.add_column("Size", justify="right", no_wrap=True) - tbl.add_column("Path", overflow="fold") - for e in sorted(entries, key=lambda x: (x.source != "Source", x.source == "Catalog", -x.size)): - if e.source == "Catalog": - action = "[cyan]CATALOG[/]" - elif e is keep: - action = "[green]KEEP[/]" - else: - action = "[red]DELETE?[/]" - total_reclaim += e.size - tbl.add_row(action, e.source, - f"{human_size(e.size)}\n[dim]({e.size:,} B)[/]", - e.full_path) - console.print(tbl) - console.print() - console.print(Panel( - f"[bold]Potential space reclaim if all DELETE? removed: " - f"[red]{human_size(total_reclaim)}[/][/]", - border_style="red")) - if skipped: - console.print() - tbl = Table(title=f"[dim]Skipped {len(skipped)} file(s) with no parseable ID[/]", - title_justify="left", show_lines=False, border_style="dim", expand=True) - tbl.add_column("Remote", style="dim", no_wrap=True) - tbl.add_column("Path", style="dim", overflow="fold") - for remote, path in skipped[:50]: - tbl.add_row(remote, path) - if len(skipped) > 50: - tbl.add_row("[dim]…[/]", f"[dim]+{len(skipped) - 50} more[/]") - console.print(tbl) - if variant_alerts: - console.print() - console.print(Panel( - f"[bold yellow]⚠ {len(variant_alerts)} variant alert(s) — manual review recommended[/]", - border_style="yellow")) - for bare_id, entries in sorted(variant_alerts.items()): - tbl = Table(title=f"[bold yellow][{bare_id}] — bare + variant coexist[/]", - title_justify="left", show_lines=False, border_style="yellow", expand=True) - tbl.add_column("ID", style="yellow", no_wrap=True) - tbl.add_column("Size", justify="right", no_wrap=True) - tbl.add_column("Path", overflow="fold") - for e in sorted(entries, key=lambda x: x.full_path): - eid = extract_id(Path(e.path).name) or e.jav_id - tbl.add_row(eid, human_size(e.size), e.full_path) - console.print(tbl) - console.print() - -# ---------- plain renderers (--basic) ---------- - -def render_banner_plain(cache_meta: dict[str, dict], mode: str) -> str: - lines = [ansi(f"=== rc-jav ({mode}) ===", ANSI_BOLD)] - for r, m in cache_meta.items(): - if m["cached"]: - tag = f"CACHED {m['age']}" + (" STALE" if m["stale"] else "") - tag_c = ansi(tag, ANSI_YELLOW if m["stale"] else ANSI_DIM) - else: - tag_c = ansi("FRESH SCAN", ANSI_GREEN) - count_str = ansi(f"({m['file_count']} files)", ANSI_DIM) - lines.append(f" {r} {tag_c} {count_str}") - return "\n".join(lines) - - -def render_search_plain(matches: dict[str, list[FileEntry]], queries: list[str], - cache_meta: dict[str, dict]) -> str: - lines: list[str] = [] - if cache_meta: - lines.append(render_banner_plain(cache_meta, "search")) - lines.append("") - for q in queries: - hits = matches.get(q, []) - if not hits: - lines.append(ansi(f"[{q}] NOT FOUND", ANSI_RED)) - lines.append("") - continue - lines.append(ansi(f"[{q}] {len(hits)} hit(s)", ANSI_GREEN + ANSI_BOLD)) - for e in sorted(hits, key=lambda x: (x.jav_id, x.path.lower())): - meta = cache_meta.get(e.remote, {}) - if meta.get("cached"): - tag = ansi("[CACHED-STALE]", ANSI_YELLOW) if meta.get("stale") else ansi("[CACHED]", ANSI_DIM) - else: - tag = ansi("[FRESH]", ANSI_GREEN) - src = ansi(e.source, ANSI_YELLOW) - lines.append(f" {src} {tag}") - lines.append(ansi(f" file: {Path(e.path).name}", ANSI_BOLD)) - lines.append(f" size: {human_size(e.size)} ({e.size:,} bytes)") - lines.append(ansi(f" path: {e.full_path}", ANSI_DIM)) - lines.append("") - return "\n".join(lines) - - -# ---------- file outputs ---------- - -def render_dupes_plain(dupes, skipped, variant_alerts=None) -> str: - lines: list[str] = [] - if not dupes: - lines.append(ansi("No duplicates found.", ANSI_GREEN)) - else: - lines.append(ansi(f"Found {len(dupes)} duplicate ID group(s):", ANSI_BOLD)) - lines.append("") - total_reclaim = 0 - for jav_id in sorted(dupes): - entries = dupes[jav_id] - keep = decide_keep(entries) - lines.append(ansi(f"[{jav_id}]", ANSI_BOLD)) - for e in sorted(entries, key=lambda x: (x.source != "Source", x.source == "Catalog", -x.size)): - if e.source == "Catalog": - mark = ansi("CATALOG ", ANSI_CYAN) - elif e is keep: - mark = ansi("KEEP ", ANSI_GREEN) - else: - mark = ansi("DELETE? ", ANSI_RED) - total_reclaim += e.size - src = ansi(f"{e.source:>8}", ANSI_YELLOW) - size_str = f"{human_size(e.size)} ({e.size:,} B)" - lines.append(f" {mark} {src} {size_str:>26} {e.full_path}") - lines.append("") - lines.append(ansi(f"Potential space reclaim if all DELETE? removed: {human_size(total_reclaim)}", ANSI_BOLD)) - if skipped: - lines.append("") - lines.append(ansi(f"Skipped {len(skipped)} file(s) with no parseable ID:", ANSI_DIM)) - for remote, path in skipped[:50]: - lines.append(ansi(f" {remote} {path}", ANSI_DIM)) - if len(skipped) > 50: - lines.append(ansi(f" ... +{len(skipped) - 50} more", ANSI_DIM)) - if variant_alerts: - lines.append("") - lines.append(ansi(f"⚠ {len(variant_alerts)} variant alert(s) — manual review required:", ANSI_YELLOW + ANSI_BOLD)) - for bare_id, entries in sorted(variant_alerts.items()): - lines.append(ansi(f" [{bare_id}] bare + variant coexist", ANSI_YELLOW)) - for e in sorted(entries, key=lambda x: x.full_path): - eid = extract_id(Path(e.path).name) or e.jav_id - lines.append(f" {ansi(eid, ANSI_YELLOW)} {human_size(e.size):>10} {e.full_path}") - return "\n".join(lines) - - -def write_txt(path: Path, dupes, skipped): - path.write_text(render_dupes_plain(dupes, skipped), encoding="utf-8") - - -def write_csv(path: Path, dupes): - with path.open("w", newline="", encoding="utf-8") as f: - w = csv.writer(f) - w.writerow(["jav_id", "action", "source", "remote", "path", "full_path", - "size_bytes", "size_human", "mod_time"]) - for jav_id in sorted(dupes): - entries = dupes[jav_id] - keep = decide_keep(entries) - for e in entries: - if e.source == "Catalog": - action = "CATALOG" - elif e is keep: - action = "KEEP" - else: - action = "DELETE?" - w.writerow([jav_id, action, e.source, - e.remote, e.path, e.full_path, e.size, human_size(e.size), e.mod_time]) - - -def describe_skipped_id(remote: str, path: str) -> dict[str, str]: - """Explain a common reason a path did not yield an ID.""" - name = Path((path or "").replace("\\", "/")).name - reason = "No supported JAV ID at filename start" - hint = "Rename with a leading ID such as ABC-123 or add an ID normalizer/site-specific source." - if re.match(r"^\[[A-Za-z0-9-]+-\d+\]", name): - reason = "ID is wrapped in leading brackets" - hint = "Remove the leading brackets so the filename starts with the ID." - elif re.match(r"^[A-Za-z][A-Za-z0-9]+[\u2010-\u2015]\d+", name): - reason = "ID uses a non-ASCII dash" - hint = "Replace the separator with a normal hyphen." - elif re.match(r"^[A-Za-z][A-Za-z0-9]+\d+", name): - reason = "ID prefix and number have no hyphen" - hint = "Insert the ID hyphen, for example ABC-123." - return {"remote": remote, "path": path, "name": name, "reason": reason, "hint": hint} - - -def dupes_to_obj(dupes, skipped, variant_alerts=None) -> dict: - out = {"groups": {}, "skipped": [describe_skipped_id(r, p) for r, p in skipped], - "variant_alerts": []} - for jav_id in sorted(dupes): - entries = dupes[jav_id] - keep, keep_reason = decide_keep_with_reason(entries) - out["groups"][jav_id] = { - "keep": asdict(keep) | {"full_path": keep.full_path, "size_human": human_size(keep.size)}, - "keep_reason": keep_reason, - "risks": describe_dupe_risks(jav_id, entries), - "delete_candidates": [asdict(e) | {"full_path": e.full_path, "size_human": human_size(e.size)} - for e in entries - if e is not keep and e.source != "Catalog"], - "catalog": [asdict(e) | {"full_path": e.full_path, "size_human": human_size(e.size)} - for e in entries if e.source == "Catalog"], - } - for bare_id, entries in sorted((variant_alerts or {}).items()): - out["variant_alerts"].append({ - "bare_id": bare_id, - "files": [ - asdict(e) | {"full_path": e.full_path, "size_human": human_size(e.size), - "detected_id": extract_id(Path(e.path).name) or e.jav_id} - for e in sorted(entries, key=lambda x: x.full_path) - ], - }) - return out - - -def write_json(path: Path, dupes, skipped, variant_alerts=None): - path.write_text(json.dumps(dupes_to_obj(dupes, skipped, variant_alerts), indent=2), encoding="utf-8") - - # ---------- main ---------- def main(): @@ -801,7 +416,7 @@ def main(): global console, BASIC, DEFAULT_SOURCE, DEFAULT_TARGET, DEFAULT_CATALOG _set_rclone_bin(args.rclone_bin) BASIC = args.basic or args.format == "json" - _set_rclone_basic(BASIC) + _set_output_basic(BASIC) # Apply persisted config overrides BEFORE defaults are consulted. cfg = load_config() @@ -839,10 +454,10 @@ def main(): if k in new_cfg: console.print(f" {k} = {new_cfg[k]}") sys.exit(0) - global USE_ANSI - USE_ANSI = not args.no_color + _set_output_use_ansi(not args.no_color) if args.no_color or BASIC: console = Console(no_color=True, color_system=None, highlight=False) + _set_output_no_color() # Search mode: defaults kick in if no remotes specified. if args.clearjav: diff --git a/rcjav/__init__.py b/rcjav/__init__.py index 8a58c10..bd7d98c 100644 --- a/rcjav/__init__.py +++ b/rcjav/__init__.py @@ -6,6 +6,38 @@ find at the top level. Adding a new submodule does not change the public surface — only this file does. """ from rcjav.model import FileEntry # noqa: F401 +from rcjav.output import ( # noqa: F401 + USE_ANSI, + ANSI_RESET, + ANSI_GREEN, + ANSI_RED, + ANSI_YELLOW, + ANSI_CYAN, + ANSI_DIM, + ANSI_BOLD, + set_use_ansi, + set_basic, + ansi, + console, + set_console_no_color, + strip_markup, + human_size, + BasicProgress, + make_progress, + render_banner, + render_search, + render_name_matches, + render_name_matches_plain, + render_dupes, + render_banner_plain, + render_search_plain, + render_dupes_plain, + write_txt, + write_csv, + describe_skipped_id, + dupes_to_obj, + write_json, +) from rcjav.library import ( # noqa: F401 find_library_issues, rename_file_in_remote, diff --git a/rcjav/output.py b/rcjav/output.py new file mode 100644 index 0000000..d9fe5ad --- /dev/null +++ b/rcjav/output.py @@ -0,0 +1,495 @@ +"""All terminal rendering, plain-text formatting, and file outputs. + +Owns the singleton `console` (rich.Console) plus the ANSI constants +used in --basic mode. `BASIC` is mirrored from rcjav.rclone_io so +both modules answer the same question (the setter here proxies). +""" +from __future__ import annotations + +import csv +import json +import re +import sys +from dataclasses import asdict +from pathlib import Path + +from rich.console import Console +from rich.panel import Panel +from rich.progress import ( + BarColumn, + MofNCompleteColumn, + Progress, + SpinnerColumn, + TextColumn, + TimeElapsedColumn, + TimeRemainingColumn, +) +from rich.table import Table +from rich.text import Text + +from rcjav import rclone_io as _rclone_io +from rcjav.dupes import ( + decide_keep, + decide_keep_with_reason, + describe_dupe_risks, +) +from rcjav.ids import extract_id +from rcjav.model import FileEntry + + +# ---------- ANSI / plain-mode toggles ---------- + +USE_ANSI = True # disabled by --no-color + +ANSI_RESET = "\033[0m" +ANSI_GREEN = "\033[32m" +ANSI_RED = "\033[31m" +ANSI_YELLOW = "\033[33m" +ANSI_CYAN = "\033[36m" +ANSI_DIM = "\033[2m" +ANSI_BOLD = "\033[1m" + + +def set_use_ansi(value: bool) -> None: + global USE_ANSI + USE_ANSI = bool(value) + + +def ansi(s: str, code: str) -> str: + return f"{code}{s}{ANSI_RESET}" if USE_ANSI else s + + +# Singleton rich console. Replaced in set_console_no_color() when --no-color +# is passed (rich respects no_color=True everywhere). +console = Console() + + +def set_console_no_color() -> None: + global console + console = Console(no_color=True) + + +_RICH_TAG_RE = re.compile(r"\[/?[^\]]*\]") + + +def strip_markup(s: str) -> str: + return _RICH_TAG_RE.sub("", s) + + +# ---------- --basic mode flag (mirrored with rcjav.rclone_io) ---------- + +# Read dynamically as _rclone_io.BASIC so a single set_basic() call updates +# both this module's renderers and walk_remote's progress emission. + +def set_basic(value: bool) -> None: + """Toggle --basic mode for both renderers and rclone progress.""" + _rclone_io.set_basic(value) + + +def _basic() -> bool: + return _rclone_io.BASIC + + +# ---------- size formatting ---------- + +def human_size(n: int) -> str: + nf = float(max(0, n)) + for unit in ("B", "KiB", "MiB", "GiB", "TiB"): + if nf < 1024: + return f"{int(nf)} B" if unit == "B" else f"{nf:.2f} {unit}" + nf /= 1024 + return f"{nf:.2f} PiB" + + +# ---------- progress UI ---------- + +class BasicProgress: + """Minimal stand-in for rich.Progress used when --basic is set.""" + def __init__(self): + self._tasks: dict[int, dict] = {} + self._next = 0 + self._last_print: dict[int, int] = {} + + def __enter__(self): + return self + + def __exit__(self, *exc): + for tid, t in self._tasks.items(): + sys.stderr.write(f"{ansi('[done]', ANSI_GREEN)} {t['desc']} {t['done']}/{t['total']}\n") + return False + + def add_task(self, description: str, total: int = 1) -> int: + tid = self._next + self._next += 1 + desc = strip_markup(description) + self._tasks[tid] = {"desc": desc, "total": total, "done": 0} + self._last_print[tid] = 0 + sys.stderr.write(f"{ansi('[start]', ANSI_CYAN)} {desc}\n") + return tid + + def update(self, tid, total=None, description=None, **_): + t = self._tasks[tid] + if total is not None: + t["total"] = total + if description is not None: + t["desc"] = strip_markup(description) + + def advance(self, tid, n: int = 1): + t = self._tasks[tid] + t["done"] += n + # In-place refresh every 5 files (or every file if total small). + step = 5 if t["total"] > 50 else 1 + if t["done"] - self._last_print[tid] >= step or t["done"] == t["total"]: + counter = ansi(f"{t['done']}/{t['total']}", ANSI_CYAN) + line = f" {counter} {ansi(t['desc'], ANSI_DIM)}" + if sys.stderr.isatty(): + sys.stderr.write(f"\r\033[K{line}") + if t["done"] == t["total"]: + sys.stderr.write("\n") + sys.stderr.flush() + elif t["done"] == t["total"]: + sys.stderr.write(line + "\n") + self._last_print[tid] = t["done"] + + +def make_progress(): + if _basic(): + return BasicProgress() + return Progress( + SpinnerColumn(), + TextColumn("{task.description}"), + BarColumn(), + MofNCompleteColumn(), + TimeElapsedColumn(), + TextColumn("eta"), + TimeRemainingColumn(), + console=console, + transient=False, + ) + + +# ---------- rich renderers ---------- + +def render_banner(cache_meta: dict[str, dict], mode: str) -> Panel: + lines: list[Text] = [] + lines.append(Text.from_markup(f"[bold]mode:[/] {mode}")) + if cache_meta: + for r, m in cache_meta.items(): + if m["cached"]: + tag = f"CACHED {m['age']}" + (" STALE" if m["stale"] else "") + style = "yellow" if m["stale"] else "dim" + else: + tag = "FRESH SCAN" + style = "green" + lines.append(Text.from_markup( + f" [white]{r}[/] [{style}]{tag}[/] [dim]({m['file_count']} files)[/]" + )) + body = Text("\n").join(lines) + return Panel(body, title="rc-jav", title_align="left", border_style="blue") + + +def render_search(matches: dict[str, list[FileEntry]], queries: list[str], + cache_meta: dict[str, dict]) -> None: + console.print(render_banner(cache_meta, mode="search")) + for q in queries: + hits = matches.get(q, []) + if not hits: + console.print(f"[bold red][{q}] NOT FOUND[/]") + console.print() + continue + title = f"[bold green][{q}] {len(hits)} hit(s)[/]" + tbl = Table(title=title, title_justify="left", show_lines=False, + border_style="green", expand=True) + tbl.add_column("Source", style="yellow", no_wrap=True) + tbl.add_column("Cache", no_wrap=True) + tbl.add_column("File", style="bold", overflow="fold") + tbl.add_column("Size", justify="right", no_wrap=True) + tbl.add_column("Path", style="dim", overflow="fold") + for e in sorted(hits, key=lambda x: (x.jav_id, x.path.lower())): + meta = cache_meta.get(e.remote, {}) + if meta.get("cached"): + cache_tag = "[yellow][CACHED-STALE][/]" if meta.get("stale") else "[dim][CACHED][/]" + else: + cache_tag = "[green][FRESH][/]" + tbl.add_row( + e.source, cache_tag, Path(e.path).name, + f"{human_size(e.size)}\n[dim]({e.size:,} B)[/]", + e.full_path, + ) + console.print(tbl) + console.print() + + +def render_name_matches(hits: list[FileEntry], tokens: list[str], + cache_meta: dict[str, dict]) -> None: + title = f"[bold green]Name match {tokens} — {len(hits)} hit(s)[/]" + if not hits: + console.print(f"[bold red]Name match {tokens} — NOT FOUND[/]") + return + tbl = Table(title=title, title_justify="left", show_lines=False, + border_style="green", expand=True) + tbl.add_column("Source", style="yellow", no_wrap=True) + tbl.add_column("Cache", no_wrap=True) + tbl.add_column("ID", style="bold cyan", no_wrap=True) + tbl.add_column("File", style="bold", overflow="fold") + tbl.add_column("Size", justify="right", no_wrap=True) + tbl.add_column("Path", style="dim", overflow="fold") + for e in sorted(hits, key=lambda x: (x.jav_id, x.path.lower())): + meta = cache_meta.get(e.remote, {}) + if meta.get("cached"): + cache_tag = "[yellow][CACHED-STALE][/]" if meta.get("stale") else "[dim][CACHED][/]" + else: + cache_tag = "[green][FRESH][/]" + tbl.add_row( + e.source, cache_tag, e.jav_id, Path(e.path).name, + f"{human_size(e.size)}\n[dim]({e.size:,} B)[/]", + e.full_path, + ) + console.print(tbl) + console.print() + + +def render_name_matches_plain(hits: list[FileEntry], tokens: list[str], + cache_meta: dict[str, dict]) -> str: + lines: list[str] = [] + if not hits: + lines.append(ansi(f"Name match {tokens} — NOT FOUND", ANSI_RED)) + return "\n".join(lines) + lines.append(ansi(f"Name match {tokens} — {len(hits)} hit(s)", ANSI_GREEN + ANSI_BOLD)) + for e in sorted(hits, key=lambda x: (x.jav_id, x.path.lower())): + meta = cache_meta.get(e.remote, {}) + if meta.get("cached"): + tag = ansi("[CACHED-STALE]", ANSI_YELLOW) if meta.get("stale") else ansi("[CACHED]", ANSI_DIM) + else: + tag = ansi("[FRESH]", ANSI_GREEN) + src = ansi(e.source, ANSI_YELLOW) + lines.append(f" {src} {tag} {ansi(e.jav_id, ANSI_CYAN)}") + lines.append(ansi(f" file: {Path(e.path).name}", ANSI_BOLD)) + lines.append(f" size: {human_size(e.size)} ({e.size:,} bytes)") + lines.append(ansi(f" path: {e.full_path}", ANSI_DIM)) + return "\n".join(lines) + + +def render_dupes(dupes: dict[str, list[FileEntry]], + skipped: list[tuple[str, str]], + variant_alerts: dict[str, list[FileEntry]] | None = None) -> None: + if not dupes: + console.print(Panel("[bold green]No duplicates found.[/]", + border_style="green")) + else: + console.print(f"[bold]Found {len(dupes)} duplicate ID group(s):[/]") + console.print() + total_reclaim = 0 + for jav_id in sorted(dupes): + entries = dupes[jav_id] + keep = decide_keep(entries) + tbl = Table(title=f"[bold][{jav_id}][/]", title_justify="left", + show_lines=False, border_style="magenta", expand=True) + tbl.add_column("Action", no_wrap=True) + tbl.add_column("Source", style="yellow", no_wrap=True) + tbl.add_column("Size", justify="right", no_wrap=True) + tbl.add_column("Path", overflow="fold") + for e in sorted(entries, key=lambda x: (x.source != "Source", x.source == "Catalog", -x.size)): + if e.source == "Catalog": + action = "[cyan]CATALOG[/]" + elif e is keep: + action = "[green]KEEP[/]" + else: + action = "[red]DELETE?[/]" + total_reclaim += e.size + tbl.add_row(action, e.source, + f"{human_size(e.size)}\n[dim]({e.size:,} B)[/]", + e.full_path) + console.print(tbl) + console.print() + console.print(Panel( + f"[bold]Potential space reclaim if all DELETE? removed: " + f"[red]{human_size(total_reclaim)}[/][/]", + border_style="red")) + if skipped: + console.print() + tbl = Table(title=f"[dim]Skipped {len(skipped)} file(s) with no parseable ID[/]", + title_justify="left", show_lines=False, border_style="dim", expand=True) + tbl.add_column("Remote", style="dim", no_wrap=True) + tbl.add_column("Path", style="dim", overflow="fold") + for remote, path in skipped[:50]: + tbl.add_row(remote, path) + if len(skipped) > 50: + tbl.add_row("[dim]…[/]", f"[dim]+{len(skipped) - 50} more[/]") + console.print(tbl) + if variant_alerts: + console.print() + console.print(Panel( + f"[bold yellow]⚠ {len(variant_alerts)} variant alert(s) — manual review recommended[/]", + border_style="yellow")) + for bare_id, entries in sorted(variant_alerts.items()): + tbl = Table(title=f"[bold yellow][{bare_id}] — bare + variant coexist[/]", + title_justify="left", show_lines=False, border_style="yellow", expand=True) + tbl.add_column("ID", style="yellow", no_wrap=True) + tbl.add_column("Size", justify="right", no_wrap=True) + tbl.add_column("Path", overflow="fold") + for e in sorted(entries, key=lambda x: x.full_path): + eid = extract_id(Path(e.path).name) or e.jav_id + tbl.add_row(eid, human_size(e.size), e.full_path) + console.print(tbl) + console.print() + + +# ---------- plain renderers (--basic) ---------- + +def render_banner_plain(cache_meta: dict[str, dict], mode: str) -> str: + lines = [ansi(f"=== rc-jav ({mode}) ===", ANSI_BOLD)] + for r, m in cache_meta.items(): + if m["cached"]: + tag = f"CACHED {m['age']}" + (" STALE" if m["stale"] else "") + tag_c = ansi(tag, ANSI_YELLOW if m["stale"] else ANSI_DIM) + else: + tag_c = ansi("FRESH SCAN", ANSI_GREEN) + count_str = ansi(f"({m['file_count']} files)", ANSI_DIM) + lines.append(f" {r} {tag_c} {count_str}") + return "\n".join(lines) + + +def render_search_plain(matches: dict[str, list[FileEntry]], queries: list[str], + cache_meta: dict[str, dict]) -> str: + lines: list[str] = [] + if cache_meta: + lines.append(render_banner_plain(cache_meta, "search")) + lines.append("") + for q in queries: + hits = matches.get(q, []) + if not hits: + lines.append(ansi(f"[{q}] NOT FOUND", ANSI_RED)) + lines.append("") + continue + lines.append(ansi(f"[{q}] {len(hits)} hit(s)", ANSI_GREEN + ANSI_BOLD)) + for e in sorted(hits, key=lambda x: (x.jav_id, x.path.lower())): + meta = cache_meta.get(e.remote, {}) + if meta.get("cached"): + tag = ansi("[CACHED-STALE]", ANSI_YELLOW) if meta.get("stale") else ansi("[CACHED]", ANSI_DIM) + else: + tag = ansi("[FRESH]", ANSI_GREEN) + src = ansi(e.source, ANSI_YELLOW) + lines.append(f" {src} {tag}") + lines.append(ansi(f" file: {Path(e.path).name}", ANSI_BOLD)) + lines.append(f" size: {human_size(e.size)} ({e.size:,} bytes)") + lines.append(ansi(f" path: {e.full_path}", ANSI_DIM)) + lines.append("") + return "\n".join(lines) + + +def render_dupes_plain(dupes, skipped, variant_alerts=None) -> str: + lines: list[str] = [] + if not dupes: + lines.append(ansi("No duplicates found.", ANSI_GREEN)) + else: + lines.append(ansi(f"Found {len(dupes)} duplicate ID group(s):", ANSI_BOLD)) + lines.append("") + total_reclaim = 0 + for jav_id in sorted(dupes): + entries = dupes[jav_id] + keep = decide_keep(entries) + lines.append(ansi(f"[{jav_id}]", ANSI_BOLD)) + for e in sorted(entries, key=lambda x: (x.source != "Source", x.source == "Catalog", -x.size)): + if e.source == "Catalog": + mark = ansi("CATALOG ", ANSI_CYAN) + elif e is keep: + mark = ansi("KEEP ", ANSI_GREEN) + else: + mark = ansi("DELETE? ", ANSI_RED) + total_reclaim += e.size + src = ansi(f"{e.source:>8}", ANSI_YELLOW) + size_str = f"{human_size(e.size)} ({e.size:,} B)" + lines.append(f" {mark} {src} {size_str:>26} {e.full_path}") + lines.append("") + lines.append(ansi(f"Potential space reclaim if all DELETE? removed: {human_size(total_reclaim)}", ANSI_BOLD)) + if skipped: + lines.append("") + lines.append(ansi(f"Skipped {len(skipped)} file(s) with no parseable ID:", ANSI_DIM)) + for remote, path in skipped[:50]: + lines.append(ansi(f" {remote} {path}", ANSI_DIM)) + if len(skipped) > 50: + lines.append(ansi(f" ... +{len(skipped) - 50} more", ANSI_DIM)) + if variant_alerts: + lines.append("") + lines.append(ansi(f"⚠ {len(variant_alerts)} variant alert(s) — manual review required:", ANSI_YELLOW + ANSI_BOLD)) + for bare_id, entries in sorted(variant_alerts.items()): + lines.append(ansi(f" [{bare_id}] bare + variant coexist", ANSI_YELLOW)) + for e in sorted(entries, key=lambda x: x.full_path): + eid = extract_id(Path(e.path).name) or e.jav_id + lines.append(f" {ansi(eid, ANSI_YELLOW)} {human_size(e.size):>10} {e.full_path}") + return "\n".join(lines) + + +# ---------- file outputs ---------- + +def write_txt(path: Path, dupes, skipped): + path.write_text(render_dupes_plain(dupes, skipped), encoding="utf-8") + + +def write_csv(path: Path, dupes): + with path.open("w", newline="", encoding="utf-8") as f: + w = csv.writer(f) + w.writerow(["jav_id", "action", "source", "remote", "path", "full_path", + "size_bytes", "size_human", "mod_time"]) + for jav_id in sorted(dupes): + entries = dupes[jav_id] + keep = decide_keep(entries) + for e in entries: + if e.source == "Catalog": + action = "CATALOG" + elif e is keep: + action = "KEEP" + else: + action = "DELETE?" + w.writerow([jav_id, action, e.source, + e.remote, e.path, e.full_path, e.size, human_size(e.size), e.mod_time]) + + +def describe_skipped_id(remote: str, path: str) -> dict[str, str]: + """Explain a common reason a path did not yield an ID.""" + name = Path((path or "").replace("\\", "/")).name + reason = "No supported JAV ID at filename start" + hint = "Rename with a leading ID such as ABC-123 or add an ID normalizer/site-specific source." + if re.match(r"^\[[A-Za-z0-9-]+-\d+\]", name): + reason = "ID is wrapped in leading brackets" + hint = "Remove the leading brackets so the filename starts with the ID." + elif re.match(r"^[A-Za-z][A-Za-z0-9]+[‐-―]\d+", name): + reason = "ID uses a non-ASCII dash" + hint = "Replace the separator with a normal hyphen." + elif re.match(r"^[A-Za-z][A-Za-z0-9]+\d+", name): + reason = "ID prefix and number have no hyphen" + hint = "Insert the ID hyphen, for example ABC-123." + return {"remote": remote, "path": path, "name": name, "reason": reason, "hint": hint} + + +def dupes_to_obj(dupes, skipped, variant_alerts=None) -> dict: + out = {"groups": {}, "skipped": [describe_skipped_id(r, p) for r, p in skipped], + "variant_alerts": []} + for jav_id in sorted(dupes): + entries = dupes[jav_id] + keep, keep_reason = decide_keep_with_reason(entries) + out["groups"][jav_id] = { + "keep": asdict(keep) | {"full_path": keep.full_path, "size_human": human_size(keep.size)}, + "keep_reason": keep_reason, + "risks": describe_dupe_risks(jav_id, entries), + "delete_candidates": [asdict(e) | {"full_path": e.full_path, "size_human": human_size(e.size)} + for e in entries + if e is not keep and e.source != "Catalog"], + "catalog": [asdict(e) | {"full_path": e.full_path, "size_human": human_size(e.size)} + for e in entries if e.source == "Catalog"], + } + for bare_id, entries in sorted((variant_alerts or {}).items()): + out["variant_alerts"].append({ + "bare_id": bare_id, + "files": [ + asdict(e) | {"full_path": e.full_path, "size_human": human_size(e.size), + "detected_id": extract_id(Path(e.path).name) or e.jav_id} + for e in sorted(entries, key=lambda x: x.full_path) + ], + }) + return out + + +def write_json(path: Path, dupes, skipped, variant_alerts=None): + path.write_text(json.dumps(dupes_to_obj(dupes, skipped, variant_alerts), indent=2), encoding="utf-8")