Step 10h: extract renderers + file outputs into rcjav/output.py

This commit is contained in:
admin
2026-05-22 22:00:22 +02:00
parent 550482a7a2
commit fb5700cdab
3 changed files with 566 additions and 424 deletions
+39 -424
View File
@@ -64,15 +64,6 @@ def _current_part_res():
return _rcjav_ids.PART_RES return _rcjav_ids.PART_RES
def human_size(n: int) -> str:
nf = float(max(0, n))
for unit in ("B", "KiB", "MiB", "GiB", "TiB"):
if nf < 1024:
return f"{int(nf)} B" if unit == "B" else f"{nf:.2f} {unit}"
nf /= 1024
return f"{nf:.2f} PiB"
from rcjav.rclone_io import ( from rcjav.rclone_io import (
RCLONE_BIN, RCLONE_BIN,
DURATION_RE, DURATION_RE,
@@ -87,85 +78,51 @@ from rcjav.rclone_io import (
parse_duration, parse_duration,
walk_remote, walk_remote,
) )
from rcjav import output as _output
from rcjav.output import (
human_size,
ansi,
ANSI_RESET,
ANSI_GREEN,
ANSI_RED,
ANSI_YELLOW,
ANSI_CYAN,
ANSI_DIM,
ANSI_BOLD,
strip_markup,
BasicProgress,
make_progress,
render_banner,
render_search,
render_name_matches,
render_name_matches_plain,
render_dupes,
render_banner_plain,
render_search_plain,
render_dupes_plain,
write_txt,
write_csv,
write_json,
describe_skipped_id,
dupes_to_obj,
set_use_ansi as _set_output_use_ansi,
set_basic as _set_output_basic,
set_console_no_color as _set_output_no_color,
)
# rc-jav.py keeps its own local rich Console for the prints that haven't
# moved to rcjav.output yet (collectors, main()). When --no-color is in
# play we rebind both this and rcjav.output's console.
console = Console()
# Mirror of rcjav.rclone_io.BASIC for in-tree readers that haven't been # Mirror of rcjav.rclone_io.BASIC for in-tree readers that haven't been
# updated yet (output renderers, BasicProgress checks in main()). Set in # updated yet (output renderers, BasicProgress checks in main()). Set in
# main() via both this name and _set_rclone_basic(). # main() via both this name and _set_rclone_basic().
BASIC = False # set by --basic BASIC = False # set by --basic
USE_ANSI = True # disabled by --no-color
# Pre-rich ANSI codes (used in --basic mode for color).
ANSI_RESET = "\033[0m"
ANSI_GREEN = "\033[32m"
ANSI_RED = "\033[31m"
ANSI_YELLOW = "\033[33m"
ANSI_CYAN = "\033[36m"
ANSI_DIM = "\033[2m"
ANSI_BOLD = "\033[1m"
def ansi(s: str, code: str) -> str:
return f"{code}{s}{ANSI_RESET}" if USE_ANSI else s
console = Console() # replaced in main() if --no-color console = Console() # replaced in main() if --no-color
_RICH_TAG_RE = re.compile(r"\[/?[^\]]*\]")
def strip_markup(s: str) -> str:
return _RICH_TAG_RE.sub("", s)
class BasicProgress:
"""Minimal stand-in for rich.Progress used when --basic is set."""
def __init__(self):
self._tasks: dict[int, dict] = {}
self._next = 0
self._last_print: dict[int, int] = {}
def __enter__(self):
return self
def __exit__(self, *exc):
for tid, t in self._tasks.items():
sys.stderr.write(f"{ansi('[done]', ANSI_GREEN)} {t['desc']} {t['done']}/{t['total']}\n")
return False
def add_task(self, description: str, total: int = 1) -> int:
tid = self._next
self._next += 1
desc = strip_markup(description)
self._tasks[tid] = {"desc": desc, "total": total, "done": 0}
self._last_print[tid] = 0
sys.stderr.write(f"{ansi('[start]', ANSI_CYAN)} {desc}\n")
return tid
def update(self, tid, total=None, description=None, **_):
t = self._tasks[tid]
if total is not None:
t["total"] = total
if description is not None:
t["desc"] = strip_markup(description)
def advance(self, tid, n: int = 1):
t = self._tasks[tid]
t["done"] += n
# In-place refresh every 5 files (or every file if total small).
step = 5 if t["total"] > 50 else 1
if t["done"] - self._last_print[tid] >= step or t["done"] == t["total"]:
counter = ansi(f"{t['done']}/{t['total']}", ANSI_CYAN)
line = f" {counter} {ansi(t['desc'], ANSI_DIM)}"
if sys.stderr.isatty():
sys.stderr.write(f"\r\033[K{line}")
if t["done"] == t["total"]:
sys.stderr.write("\n")
sys.stderr.flush()
elif t["done"] == t["total"]:
# Non-TTY: only print final line, skip intermediate noise.
sys.stderr.write(line + "\n")
self._last_print[tid] = t["done"]
# Default remotes used when --search is invoked without explicit --source/--target. # Default remotes used when --search is invoked without explicit --source/--target.
DEFAULT_SOURCE = ["cq:personal-files/ClearJAV"] DEFAULT_SOURCE = ["cq:personal-files/ClearJAV"]
DEFAULT_TARGET = ["cq:personal-files/JAV/TMP"] DEFAULT_TARGET = ["cq:personal-files/JAV/TMP"]
@@ -231,22 +188,6 @@ def save_config(cfg: dict) -> None:
os.replace(tmp, CONFIG_PATH) os.replace(tmp, CONFIG_PATH)
def make_progress():
if BASIC:
return BasicProgress()
return Progress(
SpinnerColumn(),
TextColumn("{task.description}"),
BarColumn(),
MofNCompleteColumn(),
TimeElapsedColumn(),
TextColumn("eta"),
TimeRemainingColumn(),
console=console,
transient=False,
)
# ---------- collectors ---------- # ---------- collectors ----------
def collect_with_progress(remotes_by_label: list[tuple[str, str]], def collect_with_progress(remotes_by_label: list[tuple[str, str]],
@@ -399,332 +340,6 @@ def cached_collect(remotes: list[str], source_label: str,
return out return out
# ---------- renderers ----------
def render_banner(cache_meta: dict[str, dict], mode: str) -> Panel:
lines: list[Text] = []
lines.append(Text.from_markup(f"[bold]mode:[/] {mode}"))
if cache_meta:
for r, m in cache_meta.items():
if m["cached"]:
tag = f"CACHED {m['age']}" + (" STALE" if m["stale"] else "")
style = "yellow" if m["stale"] else "dim"
else:
tag = "FRESH SCAN"
style = "green"
lines.append(Text.from_markup(
f" [white]{r}[/] [{style}]{tag}[/] [dim]({m['file_count']} files)[/]"
))
body = Text("\n").join(lines)
return Panel(body, title="rc-jav", title_align="left", border_style="blue")
def render_search(matches: dict[str, list[FileEntry]], queries: list[str],
cache_meta: dict[str, dict]) -> None:
console.print(render_banner(cache_meta, mode="search"))
for q in queries:
hits = matches.get(q, [])
if not hits:
console.print(f"[bold red][{q}] NOT FOUND[/]")
console.print()
continue
title = f"[bold green][{q}] {len(hits)} hit(s)[/]"
tbl = Table(title=title, title_justify="left", show_lines=False,
border_style="green", expand=True)
tbl.add_column("Source", style="yellow", no_wrap=True)
tbl.add_column("Cache", no_wrap=True)
tbl.add_column("File", style="bold", overflow="fold")
tbl.add_column("Size", justify="right", no_wrap=True)
tbl.add_column("Path", style="dim", overflow="fold")
for e in sorted(hits, key=lambda x: (x.jav_id, x.path.lower())):
meta = cache_meta.get(e.remote, {})
if meta.get("cached"):
cache_tag = "[yellow][CACHED-STALE][/]" if meta.get("stale") else "[dim][CACHED][/]"
else:
cache_tag = "[green][FRESH][/]"
tbl.add_row(
e.source, cache_tag, Path(e.path).name,
f"{human_size(e.size)}\n[dim]({e.size:,} B)[/]",
e.full_path,
)
console.print(tbl)
console.print()
def render_name_matches(hits: list[FileEntry], tokens: list[str],
cache_meta: dict[str, dict]) -> None:
title = f"[bold green]Name match {tokens}{len(hits)} hit(s)[/]"
if not hits:
console.print(f"[bold red]Name match {tokens} — NOT FOUND[/]")
return
tbl = Table(title=title, title_justify="left", show_lines=False,
border_style="green", expand=True)
tbl.add_column("Source", style="yellow", no_wrap=True)
tbl.add_column("Cache", no_wrap=True)
tbl.add_column("ID", style="bold cyan", no_wrap=True)
tbl.add_column("File", style="bold", overflow="fold")
tbl.add_column("Size", justify="right", no_wrap=True)
tbl.add_column("Path", style="dim", overflow="fold")
for e in sorted(hits, key=lambda x: (x.jav_id, x.path.lower())):
meta = cache_meta.get(e.remote, {})
if meta.get("cached"):
cache_tag = "[yellow][CACHED-STALE][/]" if meta.get("stale") else "[dim][CACHED][/]"
else:
cache_tag = "[green][FRESH][/]"
tbl.add_row(
e.source, cache_tag, e.jav_id, Path(e.path).name,
f"{human_size(e.size)}\n[dim]({e.size:,} B)[/]",
e.full_path,
)
console.print(tbl)
console.print()
def render_name_matches_plain(hits: list[FileEntry], tokens: list[str],
cache_meta: dict[str, dict]) -> str:
lines: list[str] = []
if not hits:
lines.append(ansi(f"Name match {tokens} — NOT FOUND", ANSI_RED))
return "\n".join(lines)
lines.append(ansi(f"Name match {tokens}{len(hits)} hit(s)", ANSI_GREEN + ANSI_BOLD))
for e in sorted(hits, key=lambda x: (x.jav_id, x.path.lower())):
meta = cache_meta.get(e.remote, {})
if meta.get("cached"):
tag = ansi("[CACHED-STALE]", ANSI_YELLOW) if meta.get("stale") else ansi("[CACHED]", ANSI_DIM)
else:
tag = ansi("[FRESH]", ANSI_GREEN)
src = ansi(e.source, ANSI_YELLOW)
lines.append(f" {src} {tag} {ansi(e.jav_id, ANSI_CYAN)}")
lines.append(ansi(f" file: {Path(e.path).name}", ANSI_BOLD))
lines.append(f" size: {human_size(e.size)} ({e.size:,} bytes)")
lines.append(ansi(f" path: {e.full_path}", ANSI_DIM))
return "\n".join(lines)
def render_dupes(dupes: dict[str, list[FileEntry]],
skipped: list[tuple[str, str]],
variant_alerts: dict[str, list[FileEntry]] | None = None) -> None:
if not dupes:
console.print(Panel("[bold green]No duplicates found.[/]",
border_style="green"))
else:
console.print(f"[bold]Found {len(dupes)} duplicate ID group(s):[/]")
console.print()
total_reclaim = 0
for jav_id in sorted(dupes):
entries = dupes[jav_id]
keep = decide_keep(entries)
tbl = Table(title=f"[bold][{jav_id}][/]", title_justify="left",
show_lines=False, border_style="magenta", expand=True)
tbl.add_column("Action", no_wrap=True)
tbl.add_column("Source", style="yellow", no_wrap=True)
tbl.add_column("Size", justify="right", no_wrap=True)
tbl.add_column("Path", overflow="fold")
for e in sorted(entries, key=lambda x: (x.source != "Source", x.source == "Catalog", -x.size)):
if e.source == "Catalog":
action = "[cyan]CATALOG[/]"
elif e is keep:
action = "[green]KEEP[/]"
else:
action = "[red]DELETE?[/]"
total_reclaim += e.size
tbl.add_row(action, e.source,
f"{human_size(e.size)}\n[dim]({e.size:,} B)[/]",
e.full_path)
console.print(tbl)
console.print()
console.print(Panel(
f"[bold]Potential space reclaim if all DELETE? removed: "
f"[red]{human_size(total_reclaim)}[/][/]",
border_style="red"))
if skipped:
console.print()
tbl = Table(title=f"[dim]Skipped {len(skipped)} file(s) with no parseable ID[/]",
title_justify="left", show_lines=False, border_style="dim", expand=True)
tbl.add_column("Remote", style="dim", no_wrap=True)
tbl.add_column("Path", style="dim", overflow="fold")
for remote, path in skipped[:50]:
tbl.add_row(remote, path)
if len(skipped) > 50:
tbl.add_row("[dim]…[/]", f"[dim]+{len(skipped) - 50} more[/]")
console.print(tbl)
if variant_alerts:
console.print()
console.print(Panel(
f"[bold yellow]⚠ {len(variant_alerts)} variant alert(s) — manual review recommended[/]",
border_style="yellow"))
for bare_id, entries in sorted(variant_alerts.items()):
tbl = Table(title=f"[bold yellow][{bare_id}] — bare + variant coexist[/]",
title_justify="left", show_lines=False, border_style="yellow", expand=True)
tbl.add_column("ID", style="yellow", no_wrap=True)
tbl.add_column("Size", justify="right", no_wrap=True)
tbl.add_column("Path", overflow="fold")
for e in sorted(entries, key=lambda x: x.full_path):
eid = extract_id(Path(e.path).name) or e.jav_id
tbl.add_row(eid, human_size(e.size), e.full_path)
console.print(tbl)
console.print()
# ---------- plain renderers (--basic) ----------
def render_banner_plain(cache_meta: dict[str, dict], mode: str) -> str:
lines = [ansi(f"=== rc-jav ({mode}) ===", ANSI_BOLD)]
for r, m in cache_meta.items():
if m["cached"]:
tag = f"CACHED {m['age']}" + (" STALE" if m["stale"] else "")
tag_c = ansi(tag, ANSI_YELLOW if m["stale"] else ANSI_DIM)
else:
tag_c = ansi("FRESH SCAN", ANSI_GREEN)
count_str = ansi(f"({m['file_count']} files)", ANSI_DIM)
lines.append(f" {r} {tag_c} {count_str}")
return "\n".join(lines)
def render_search_plain(matches: dict[str, list[FileEntry]], queries: list[str],
cache_meta: dict[str, dict]) -> str:
lines: list[str] = []
if cache_meta:
lines.append(render_banner_plain(cache_meta, "search"))
lines.append("")
for q in queries:
hits = matches.get(q, [])
if not hits:
lines.append(ansi(f"[{q}] NOT FOUND", ANSI_RED))
lines.append("")
continue
lines.append(ansi(f"[{q}] {len(hits)} hit(s)", ANSI_GREEN + ANSI_BOLD))
for e in sorted(hits, key=lambda x: (x.jav_id, x.path.lower())):
meta = cache_meta.get(e.remote, {})
if meta.get("cached"):
tag = ansi("[CACHED-STALE]", ANSI_YELLOW) if meta.get("stale") else ansi("[CACHED]", ANSI_DIM)
else:
tag = ansi("[FRESH]", ANSI_GREEN)
src = ansi(e.source, ANSI_YELLOW)
lines.append(f" {src} {tag}")
lines.append(ansi(f" file: {Path(e.path).name}", ANSI_BOLD))
lines.append(f" size: {human_size(e.size)} ({e.size:,} bytes)")
lines.append(ansi(f" path: {e.full_path}", ANSI_DIM))
lines.append("")
return "\n".join(lines)
# ---------- file outputs ----------
def render_dupes_plain(dupes, skipped, variant_alerts=None) -> str:
lines: list[str] = []
if not dupes:
lines.append(ansi("No duplicates found.", ANSI_GREEN))
else:
lines.append(ansi(f"Found {len(dupes)} duplicate ID group(s):", ANSI_BOLD))
lines.append("")
total_reclaim = 0
for jav_id in sorted(dupes):
entries = dupes[jav_id]
keep = decide_keep(entries)
lines.append(ansi(f"[{jav_id}]", ANSI_BOLD))
for e in sorted(entries, key=lambda x: (x.source != "Source", x.source == "Catalog", -x.size)):
if e.source == "Catalog":
mark = ansi("CATALOG ", ANSI_CYAN)
elif e is keep:
mark = ansi("KEEP ", ANSI_GREEN)
else:
mark = ansi("DELETE? ", ANSI_RED)
total_reclaim += e.size
src = ansi(f"{e.source:>8}", ANSI_YELLOW)
size_str = f"{human_size(e.size)} ({e.size:,} B)"
lines.append(f" {mark} {src} {size_str:>26} {e.full_path}")
lines.append("")
lines.append(ansi(f"Potential space reclaim if all DELETE? removed: {human_size(total_reclaim)}", ANSI_BOLD))
if skipped:
lines.append("")
lines.append(ansi(f"Skipped {len(skipped)} file(s) with no parseable ID:", ANSI_DIM))
for remote, path in skipped[:50]:
lines.append(ansi(f" {remote} {path}", ANSI_DIM))
if len(skipped) > 50:
lines.append(ansi(f" ... +{len(skipped) - 50} more", ANSI_DIM))
if variant_alerts:
lines.append("")
lines.append(ansi(f"{len(variant_alerts)} variant alert(s) — manual review required:", ANSI_YELLOW + ANSI_BOLD))
for bare_id, entries in sorted(variant_alerts.items()):
lines.append(ansi(f" [{bare_id}] bare + variant coexist", ANSI_YELLOW))
for e in sorted(entries, key=lambda x: x.full_path):
eid = extract_id(Path(e.path).name) or e.jav_id
lines.append(f" {ansi(eid, ANSI_YELLOW)} {human_size(e.size):>10} {e.full_path}")
return "\n".join(lines)
def write_txt(path: Path, dupes, skipped):
path.write_text(render_dupes_plain(dupes, skipped), encoding="utf-8")
def write_csv(path: Path, dupes):
with path.open("w", newline="", encoding="utf-8") as f:
w = csv.writer(f)
w.writerow(["jav_id", "action", "source", "remote", "path", "full_path",
"size_bytes", "size_human", "mod_time"])
for jav_id in sorted(dupes):
entries = dupes[jav_id]
keep = decide_keep(entries)
for e in entries:
if e.source == "Catalog":
action = "CATALOG"
elif e is keep:
action = "KEEP"
else:
action = "DELETE?"
w.writerow([jav_id, action, e.source,
e.remote, e.path, e.full_path, e.size, human_size(e.size), e.mod_time])
def describe_skipped_id(remote: str, path: str) -> dict[str, str]:
"""Explain a common reason a path did not yield an ID."""
name = Path((path or "").replace("\\", "/")).name
reason = "No supported JAV ID at filename start"
hint = "Rename with a leading ID such as ABC-123 or add an ID normalizer/site-specific source."
if re.match(r"^\[[A-Za-z0-9-]+-\d+\]", name):
reason = "ID is wrapped in leading brackets"
hint = "Remove the leading brackets so the filename starts with the ID."
elif re.match(r"^[A-Za-z][A-Za-z0-9]+[\u2010-\u2015]\d+", name):
reason = "ID uses a non-ASCII dash"
hint = "Replace the separator with a normal hyphen."
elif re.match(r"^[A-Za-z][A-Za-z0-9]+\d+", name):
reason = "ID prefix and number have no hyphen"
hint = "Insert the ID hyphen, for example ABC-123."
return {"remote": remote, "path": path, "name": name, "reason": reason, "hint": hint}
def dupes_to_obj(dupes, skipped, variant_alerts=None) -> dict:
out = {"groups": {}, "skipped": [describe_skipped_id(r, p) for r, p in skipped],
"variant_alerts": []}
for jav_id in sorted(dupes):
entries = dupes[jav_id]
keep, keep_reason = decide_keep_with_reason(entries)
out["groups"][jav_id] = {
"keep": asdict(keep) | {"full_path": keep.full_path, "size_human": human_size(keep.size)},
"keep_reason": keep_reason,
"risks": describe_dupe_risks(jav_id, entries),
"delete_candidates": [asdict(e) | {"full_path": e.full_path, "size_human": human_size(e.size)}
for e in entries
if e is not keep and e.source != "Catalog"],
"catalog": [asdict(e) | {"full_path": e.full_path, "size_human": human_size(e.size)}
for e in entries if e.source == "Catalog"],
}
for bare_id, entries in sorted((variant_alerts or {}).items()):
out["variant_alerts"].append({
"bare_id": bare_id,
"files": [
asdict(e) | {"full_path": e.full_path, "size_human": human_size(e.size),
"detected_id": extract_id(Path(e.path).name) or e.jav_id}
for e in sorted(entries, key=lambda x: x.full_path)
],
})
return out
def write_json(path: Path, dupes, skipped, variant_alerts=None):
path.write_text(json.dumps(dupes_to_obj(dupes, skipped, variant_alerts), indent=2), encoding="utf-8")
# ---------- main ---------- # ---------- main ----------
def main(): def main():
@@ -801,7 +416,7 @@ def main():
global console, BASIC, DEFAULT_SOURCE, DEFAULT_TARGET, DEFAULT_CATALOG global console, BASIC, DEFAULT_SOURCE, DEFAULT_TARGET, DEFAULT_CATALOG
_set_rclone_bin(args.rclone_bin) _set_rclone_bin(args.rclone_bin)
BASIC = args.basic or args.format == "json" BASIC = args.basic or args.format == "json"
_set_rclone_basic(BASIC) _set_output_basic(BASIC)
# Apply persisted config overrides BEFORE defaults are consulted. # Apply persisted config overrides BEFORE defaults are consulted.
cfg = load_config() cfg = load_config()
@@ -839,10 +454,10 @@ def main():
if k in new_cfg: if k in new_cfg:
console.print(f" {k} = {new_cfg[k]}") console.print(f" {k} = {new_cfg[k]}")
sys.exit(0) sys.exit(0)
global USE_ANSI _set_output_use_ansi(not args.no_color)
USE_ANSI = not args.no_color
if args.no_color or BASIC: if args.no_color or BASIC:
console = Console(no_color=True, color_system=None, highlight=False) console = Console(no_color=True, color_system=None, highlight=False)
_set_output_no_color()
# Search mode: defaults kick in if no remotes specified. # Search mode: defaults kick in if no remotes specified.
if args.clearjav: if args.clearjav:
+32
View File
@@ -6,6 +6,38 @@ find at the top level. Adding a new submodule does not change the
public surface — only this file does. public surface — only this file does.
""" """
from rcjav.model import FileEntry # noqa: F401 from rcjav.model import FileEntry # noqa: F401
from rcjav.output import ( # noqa: F401
USE_ANSI,
ANSI_RESET,
ANSI_GREEN,
ANSI_RED,
ANSI_YELLOW,
ANSI_CYAN,
ANSI_DIM,
ANSI_BOLD,
set_use_ansi,
set_basic,
ansi,
console,
set_console_no_color,
strip_markup,
human_size,
BasicProgress,
make_progress,
render_banner,
render_search,
render_name_matches,
render_name_matches_plain,
render_dupes,
render_banner_plain,
render_search_plain,
render_dupes_plain,
write_txt,
write_csv,
describe_skipped_id,
dupes_to_obj,
write_json,
)
from rcjav.library import ( # noqa: F401 from rcjav.library import ( # noqa: F401
find_library_issues, find_library_issues,
rename_file_in_remote, rename_file_in_remote,
+495
View File
@@ -0,0 +1,495 @@
"""All terminal rendering, plain-text formatting, and file outputs.
Owns the singleton `console` (rich.Console) plus the ANSI constants
used in --basic mode. `BASIC` is mirrored from rcjav.rclone_io so
both modules answer the same question (the setter here proxies).
"""
from __future__ import annotations
import csv
import json
import re
import sys
from dataclasses import asdict
from pathlib import Path
from rich.console import Console
from rich.panel import Panel
from rich.progress import (
BarColumn,
MofNCompleteColumn,
Progress,
SpinnerColumn,
TextColumn,
TimeElapsedColumn,
TimeRemainingColumn,
)
from rich.table import Table
from rich.text import Text
from rcjav import rclone_io as _rclone_io
from rcjav.dupes import (
decide_keep,
decide_keep_with_reason,
describe_dupe_risks,
)
from rcjav.ids import extract_id
from rcjav.model import FileEntry
# ---------- ANSI / plain-mode toggles ----------
USE_ANSI = True # disabled by --no-color
ANSI_RESET = "\033[0m"
ANSI_GREEN = "\033[32m"
ANSI_RED = "\033[31m"
ANSI_YELLOW = "\033[33m"
ANSI_CYAN = "\033[36m"
ANSI_DIM = "\033[2m"
ANSI_BOLD = "\033[1m"
def set_use_ansi(value: bool) -> None:
global USE_ANSI
USE_ANSI = bool(value)
def ansi(s: str, code: str) -> str:
return f"{code}{s}{ANSI_RESET}" if USE_ANSI else s
# Singleton rich console. Replaced in set_console_no_color() when --no-color
# is passed (rich respects no_color=True everywhere).
console = Console()
def set_console_no_color() -> None:
global console
console = Console(no_color=True)
_RICH_TAG_RE = re.compile(r"\[/?[^\]]*\]")
def strip_markup(s: str) -> str:
return _RICH_TAG_RE.sub("", s)
# ---------- --basic mode flag (mirrored with rcjav.rclone_io) ----------
# Read dynamically as _rclone_io.BASIC so a single set_basic() call updates
# both this module's renderers and walk_remote's progress emission.
def set_basic(value: bool) -> None:
"""Toggle --basic mode for both renderers and rclone progress."""
_rclone_io.set_basic(value)
def _basic() -> bool:
return _rclone_io.BASIC
# ---------- size formatting ----------
def human_size(n: int) -> str:
nf = float(max(0, n))
for unit in ("B", "KiB", "MiB", "GiB", "TiB"):
if nf < 1024:
return f"{int(nf)} B" if unit == "B" else f"{nf:.2f} {unit}"
nf /= 1024
return f"{nf:.2f} PiB"
# ---------- progress UI ----------
class BasicProgress:
"""Minimal stand-in for rich.Progress used when --basic is set."""
def __init__(self):
self._tasks: dict[int, dict] = {}
self._next = 0
self._last_print: dict[int, int] = {}
def __enter__(self):
return self
def __exit__(self, *exc):
for tid, t in self._tasks.items():
sys.stderr.write(f"{ansi('[done]', ANSI_GREEN)} {t['desc']} {t['done']}/{t['total']}\n")
return False
def add_task(self, description: str, total: int = 1) -> int:
tid = self._next
self._next += 1
desc = strip_markup(description)
self._tasks[tid] = {"desc": desc, "total": total, "done": 0}
self._last_print[tid] = 0
sys.stderr.write(f"{ansi('[start]', ANSI_CYAN)} {desc}\n")
return tid
def update(self, tid, total=None, description=None, **_):
t = self._tasks[tid]
if total is not None:
t["total"] = total
if description is not None:
t["desc"] = strip_markup(description)
def advance(self, tid, n: int = 1):
t = self._tasks[tid]
t["done"] += n
# In-place refresh every 5 files (or every file if total small).
step = 5 if t["total"] > 50 else 1
if t["done"] - self._last_print[tid] >= step or t["done"] == t["total"]:
counter = ansi(f"{t['done']}/{t['total']}", ANSI_CYAN)
line = f" {counter} {ansi(t['desc'], ANSI_DIM)}"
if sys.stderr.isatty():
sys.stderr.write(f"\r\033[K{line}")
if t["done"] == t["total"]:
sys.stderr.write("\n")
sys.stderr.flush()
elif t["done"] == t["total"]:
sys.stderr.write(line + "\n")
self._last_print[tid] = t["done"]
def make_progress():
if _basic():
return BasicProgress()
return Progress(
SpinnerColumn(),
TextColumn("{task.description}"),
BarColumn(),
MofNCompleteColumn(),
TimeElapsedColumn(),
TextColumn("eta"),
TimeRemainingColumn(),
console=console,
transient=False,
)
# ---------- rich renderers ----------
def render_banner(cache_meta: dict[str, dict], mode: str) -> Panel:
lines: list[Text] = []
lines.append(Text.from_markup(f"[bold]mode:[/] {mode}"))
if cache_meta:
for r, m in cache_meta.items():
if m["cached"]:
tag = f"CACHED {m['age']}" + (" STALE" if m["stale"] else "")
style = "yellow" if m["stale"] else "dim"
else:
tag = "FRESH SCAN"
style = "green"
lines.append(Text.from_markup(
f" [white]{r}[/] [{style}]{tag}[/] [dim]({m['file_count']} files)[/]"
))
body = Text("\n").join(lines)
return Panel(body, title="rc-jav", title_align="left", border_style="blue")
def render_search(matches: dict[str, list[FileEntry]], queries: list[str],
cache_meta: dict[str, dict]) -> None:
console.print(render_banner(cache_meta, mode="search"))
for q in queries:
hits = matches.get(q, [])
if not hits:
console.print(f"[bold red][{q}] NOT FOUND[/]")
console.print()
continue
title = f"[bold green][{q}] {len(hits)} hit(s)[/]"
tbl = Table(title=title, title_justify="left", show_lines=False,
border_style="green", expand=True)
tbl.add_column("Source", style="yellow", no_wrap=True)
tbl.add_column("Cache", no_wrap=True)
tbl.add_column("File", style="bold", overflow="fold")
tbl.add_column("Size", justify="right", no_wrap=True)
tbl.add_column("Path", style="dim", overflow="fold")
for e in sorted(hits, key=lambda x: (x.jav_id, x.path.lower())):
meta = cache_meta.get(e.remote, {})
if meta.get("cached"):
cache_tag = "[yellow][CACHED-STALE][/]" if meta.get("stale") else "[dim][CACHED][/]"
else:
cache_tag = "[green][FRESH][/]"
tbl.add_row(
e.source, cache_tag, Path(e.path).name,
f"{human_size(e.size)}\n[dim]({e.size:,} B)[/]",
e.full_path,
)
console.print(tbl)
console.print()
def render_name_matches(hits: list[FileEntry], tokens: list[str],
cache_meta: dict[str, dict]) -> None:
title = f"[bold green]Name match {tokens}{len(hits)} hit(s)[/]"
if not hits:
console.print(f"[bold red]Name match {tokens} — NOT FOUND[/]")
return
tbl = Table(title=title, title_justify="left", show_lines=False,
border_style="green", expand=True)
tbl.add_column("Source", style="yellow", no_wrap=True)
tbl.add_column("Cache", no_wrap=True)
tbl.add_column("ID", style="bold cyan", no_wrap=True)
tbl.add_column("File", style="bold", overflow="fold")
tbl.add_column("Size", justify="right", no_wrap=True)
tbl.add_column("Path", style="dim", overflow="fold")
for e in sorted(hits, key=lambda x: (x.jav_id, x.path.lower())):
meta = cache_meta.get(e.remote, {})
if meta.get("cached"):
cache_tag = "[yellow][CACHED-STALE][/]" if meta.get("stale") else "[dim][CACHED][/]"
else:
cache_tag = "[green][FRESH][/]"
tbl.add_row(
e.source, cache_tag, e.jav_id, Path(e.path).name,
f"{human_size(e.size)}\n[dim]({e.size:,} B)[/]",
e.full_path,
)
console.print(tbl)
console.print()
def render_name_matches_plain(hits: list[FileEntry], tokens: list[str],
cache_meta: dict[str, dict]) -> str:
lines: list[str] = []
if not hits:
lines.append(ansi(f"Name match {tokens} — NOT FOUND", ANSI_RED))
return "\n".join(lines)
lines.append(ansi(f"Name match {tokens}{len(hits)} hit(s)", ANSI_GREEN + ANSI_BOLD))
for e in sorted(hits, key=lambda x: (x.jav_id, x.path.lower())):
meta = cache_meta.get(e.remote, {})
if meta.get("cached"):
tag = ansi("[CACHED-STALE]", ANSI_YELLOW) if meta.get("stale") else ansi("[CACHED]", ANSI_DIM)
else:
tag = ansi("[FRESH]", ANSI_GREEN)
src = ansi(e.source, ANSI_YELLOW)
lines.append(f" {src} {tag} {ansi(e.jav_id, ANSI_CYAN)}")
lines.append(ansi(f" file: {Path(e.path).name}", ANSI_BOLD))
lines.append(f" size: {human_size(e.size)} ({e.size:,} bytes)")
lines.append(ansi(f" path: {e.full_path}", ANSI_DIM))
return "\n".join(lines)
def render_dupes(dupes: dict[str, list[FileEntry]],
skipped: list[tuple[str, str]],
variant_alerts: dict[str, list[FileEntry]] | None = None) -> None:
if not dupes:
console.print(Panel("[bold green]No duplicates found.[/]",
border_style="green"))
else:
console.print(f"[bold]Found {len(dupes)} duplicate ID group(s):[/]")
console.print()
total_reclaim = 0
for jav_id in sorted(dupes):
entries = dupes[jav_id]
keep = decide_keep(entries)
tbl = Table(title=f"[bold][{jav_id}][/]", title_justify="left",
show_lines=False, border_style="magenta", expand=True)
tbl.add_column("Action", no_wrap=True)
tbl.add_column("Source", style="yellow", no_wrap=True)
tbl.add_column("Size", justify="right", no_wrap=True)
tbl.add_column("Path", overflow="fold")
for e in sorted(entries, key=lambda x: (x.source != "Source", x.source == "Catalog", -x.size)):
if e.source == "Catalog":
action = "[cyan]CATALOG[/]"
elif e is keep:
action = "[green]KEEP[/]"
else:
action = "[red]DELETE?[/]"
total_reclaim += e.size
tbl.add_row(action, e.source,
f"{human_size(e.size)}\n[dim]({e.size:,} B)[/]",
e.full_path)
console.print(tbl)
console.print()
console.print(Panel(
f"[bold]Potential space reclaim if all DELETE? removed: "
f"[red]{human_size(total_reclaim)}[/][/]",
border_style="red"))
if skipped:
console.print()
tbl = Table(title=f"[dim]Skipped {len(skipped)} file(s) with no parseable ID[/]",
title_justify="left", show_lines=False, border_style="dim", expand=True)
tbl.add_column("Remote", style="dim", no_wrap=True)
tbl.add_column("Path", style="dim", overflow="fold")
for remote, path in skipped[:50]:
tbl.add_row(remote, path)
if len(skipped) > 50:
tbl.add_row("[dim]…[/]", f"[dim]+{len(skipped) - 50} more[/]")
console.print(tbl)
if variant_alerts:
console.print()
console.print(Panel(
f"[bold yellow]⚠ {len(variant_alerts)} variant alert(s) — manual review recommended[/]",
border_style="yellow"))
for bare_id, entries in sorted(variant_alerts.items()):
tbl = Table(title=f"[bold yellow][{bare_id}] — bare + variant coexist[/]",
title_justify="left", show_lines=False, border_style="yellow", expand=True)
tbl.add_column("ID", style="yellow", no_wrap=True)
tbl.add_column("Size", justify="right", no_wrap=True)
tbl.add_column("Path", overflow="fold")
for e in sorted(entries, key=lambda x: x.full_path):
eid = extract_id(Path(e.path).name) or e.jav_id
tbl.add_row(eid, human_size(e.size), e.full_path)
console.print(tbl)
console.print()
# ---------- plain renderers (--basic) ----------
def render_banner_plain(cache_meta: dict[str, dict], mode: str) -> str:
lines = [ansi(f"=== rc-jav ({mode}) ===", ANSI_BOLD)]
for r, m in cache_meta.items():
if m["cached"]:
tag = f"CACHED {m['age']}" + (" STALE" if m["stale"] else "")
tag_c = ansi(tag, ANSI_YELLOW if m["stale"] else ANSI_DIM)
else:
tag_c = ansi("FRESH SCAN", ANSI_GREEN)
count_str = ansi(f"({m['file_count']} files)", ANSI_DIM)
lines.append(f" {r} {tag_c} {count_str}")
return "\n".join(lines)
def render_search_plain(matches: dict[str, list[FileEntry]], queries: list[str],
cache_meta: dict[str, dict]) -> str:
lines: list[str] = []
if cache_meta:
lines.append(render_banner_plain(cache_meta, "search"))
lines.append("")
for q in queries:
hits = matches.get(q, [])
if not hits:
lines.append(ansi(f"[{q}] NOT FOUND", ANSI_RED))
lines.append("")
continue
lines.append(ansi(f"[{q}] {len(hits)} hit(s)", ANSI_GREEN + ANSI_BOLD))
for e in sorted(hits, key=lambda x: (x.jav_id, x.path.lower())):
meta = cache_meta.get(e.remote, {})
if meta.get("cached"):
tag = ansi("[CACHED-STALE]", ANSI_YELLOW) if meta.get("stale") else ansi("[CACHED]", ANSI_DIM)
else:
tag = ansi("[FRESH]", ANSI_GREEN)
src = ansi(e.source, ANSI_YELLOW)
lines.append(f" {src} {tag}")
lines.append(ansi(f" file: {Path(e.path).name}", ANSI_BOLD))
lines.append(f" size: {human_size(e.size)} ({e.size:,} bytes)")
lines.append(ansi(f" path: {e.full_path}", ANSI_DIM))
lines.append("")
return "\n".join(lines)
def render_dupes_plain(dupes, skipped, variant_alerts=None) -> str:
lines: list[str] = []
if not dupes:
lines.append(ansi("No duplicates found.", ANSI_GREEN))
else:
lines.append(ansi(f"Found {len(dupes)} duplicate ID group(s):", ANSI_BOLD))
lines.append("")
total_reclaim = 0
for jav_id in sorted(dupes):
entries = dupes[jav_id]
keep = decide_keep(entries)
lines.append(ansi(f"[{jav_id}]", ANSI_BOLD))
for e in sorted(entries, key=lambda x: (x.source != "Source", x.source == "Catalog", -x.size)):
if e.source == "Catalog":
mark = ansi("CATALOG ", ANSI_CYAN)
elif e is keep:
mark = ansi("KEEP ", ANSI_GREEN)
else:
mark = ansi("DELETE? ", ANSI_RED)
total_reclaim += e.size
src = ansi(f"{e.source:>8}", ANSI_YELLOW)
size_str = f"{human_size(e.size)} ({e.size:,} B)"
lines.append(f" {mark} {src} {size_str:>26} {e.full_path}")
lines.append("")
lines.append(ansi(f"Potential space reclaim if all DELETE? removed: {human_size(total_reclaim)}", ANSI_BOLD))
if skipped:
lines.append("")
lines.append(ansi(f"Skipped {len(skipped)} file(s) with no parseable ID:", ANSI_DIM))
for remote, path in skipped[:50]:
lines.append(ansi(f" {remote} {path}", ANSI_DIM))
if len(skipped) > 50:
lines.append(ansi(f" ... +{len(skipped) - 50} more", ANSI_DIM))
if variant_alerts:
lines.append("")
lines.append(ansi(f"{len(variant_alerts)} variant alert(s) — manual review required:", ANSI_YELLOW + ANSI_BOLD))
for bare_id, entries in sorted(variant_alerts.items()):
lines.append(ansi(f" [{bare_id}] bare + variant coexist", ANSI_YELLOW))
for e in sorted(entries, key=lambda x: x.full_path):
eid = extract_id(Path(e.path).name) or e.jav_id
lines.append(f" {ansi(eid, ANSI_YELLOW)} {human_size(e.size):>10} {e.full_path}")
return "\n".join(lines)
# ---------- file outputs ----------
def write_txt(path: Path, dupes, skipped):
path.write_text(render_dupes_plain(dupes, skipped), encoding="utf-8")
def write_csv(path: Path, dupes):
with path.open("w", newline="", encoding="utf-8") as f:
w = csv.writer(f)
w.writerow(["jav_id", "action", "source", "remote", "path", "full_path",
"size_bytes", "size_human", "mod_time"])
for jav_id in sorted(dupes):
entries = dupes[jav_id]
keep = decide_keep(entries)
for e in entries:
if e.source == "Catalog":
action = "CATALOG"
elif e is keep:
action = "KEEP"
else:
action = "DELETE?"
w.writerow([jav_id, action, e.source,
e.remote, e.path, e.full_path, e.size, human_size(e.size), e.mod_time])
def describe_skipped_id(remote: str, path: str) -> dict[str, str]:
"""Explain a common reason a path did not yield an ID."""
name = Path((path or "").replace("\\", "/")).name
reason = "No supported JAV ID at filename start"
hint = "Rename with a leading ID such as ABC-123 or add an ID normalizer/site-specific source."
if re.match(r"^\[[A-Za-z0-9-]+-\d+\]", name):
reason = "ID is wrapped in leading brackets"
hint = "Remove the leading brackets so the filename starts with the ID."
elif re.match(r"^[A-Za-z][A-Za-z0-9]+[-―]\d+", name):
reason = "ID uses a non-ASCII dash"
hint = "Replace the separator with a normal hyphen."
elif re.match(r"^[A-Za-z][A-Za-z0-9]+\d+", name):
reason = "ID prefix and number have no hyphen"
hint = "Insert the ID hyphen, for example ABC-123."
return {"remote": remote, "path": path, "name": name, "reason": reason, "hint": hint}
def dupes_to_obj(dupes, skipped, variant_alerts=None) -> dict:
out = {"groups": {}, "skipped": [describe_skipped_id(r, p) for r, p in skipped],
"variant_alerts": []}
for jav_id in sorted(dupes):
entries = dupes[jav_id]
keep, keep_reason = decide_keep_with_reason(entries)
out["groups"][jav_id] = {
"keep": asdict(keep) | {"full_path": keep.full_path, "size_human": human_size(keep.size)},
"keep_reason": keep_reason,
"risks": describe_dupe_risks(jav_id, entries),
"delete_candidates": [asdict(e) | {"full_path": e.full_path, "size_human": human_size(e.size)}
for e in entries
if e is not keep and e.source != "Catalog"],
"catalog": [asdict(e) | {"full_path": e.full_path, "size_human": human_size(e.size)}
for e in entries if e.source == "Catalog"],
}
for bare_id, entries in sorted((variant_alerts or {}).items()):
out["variant_alerts"].append({
"bare_id": bare_id,
"files": [
asdict(e) | {"full_path": e.full_path, "size_human": human_size(e.size),
"detected_id": extract_id(Path(e.path).name) or e.jav_id}
for e in sorted(entries, key=lambda x: x.full_path)
],
})
return out
def write_json(path: Path, dupes, skipped, variant_alerts=None):
path.write_text(json.dumps(dupes_to_obj(dupes, skipped, variant_alerts), indent=2), encoding="utf-8")