Files

496 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""All terminal rendering, plain-text formatting, and file outputs.
Owns the singleton `console` (rich.Console) plus the ANSI constants
used in --basic mode. `BASIC` is mirrored from rcjav.rclone_io so
both modules answer the same question (the setter here proxies).
"""
from __future__ import annotations

import csv
import json
import re
import sys
from dataclasses import asdict
from pathlib import Path

from rich.console import Console
from rich.markup import escape
from rich.panel import Panel
from rich.progress import (
    BarColumn,
    MofNCompleteColumn,
    Progress,
    SpinnerColumn,
    TextColumn,
    TimeElapsedColumn,
    TimeRemainingColumn,
)
from rich.table import Table
from rich.text import Text

from rcjav import rclone_io as _rclone_io
from rcjav.dupes import (
    decide_keep,
    decide_keep_with_reason,
    describe_dupe_risks,
)
from rcjav.ids import extract_id
from rcjav.model import FileEntry

# ---------- ANSI / plain-mode toggles ----------
USE_ANSI = True  # disabled by --no-color

# Escape sequences wrapped around text by ansi().
ANSI_RESET = "\033[0m"
ANSI_GREEN = "\033[32m"
ANSI_RED = "\033[31m"
ANSI_YELLOW = "\033[33m"
ANSI_CYAN = "\033[36m"
ANSI_DIM = "\033[2m"
ANSI_BOLD = "\033[1m"


def set_use_ansi(value: bool) -> None:
    """Store the truthiness of *value* in the module-level USE_ANSI flag."""
    global USE_ANSI
    USE_ANSI = True if value else False


def ansi(s: str, code: str) -> str:
    """Return *s* prefixed by *code* and suffixed by ANSI_RESET.

    When USE_ANSI is false, *s* is returned unchanged.
    """
    if not USE_ANSI:
        return s
    return f"{code}{s}{ANSI_RESET}"


# Singleton rich console. Replaced in set_console_no_color() when --no-color
# is passed (rich respects no_color=True everywhere).
console = Console()


def set_console_no_color() -> None:
    """Rebind the module-level `console` to a fresh `Console(no_color=True)`.

    NOTE(review): code that captured the previous `console` object before
    this call keeps the old instance; confirm callers read it via this module.
    """
    global console
    console = Console(no_color=True)


# Matches any "[...]" or "[/...]" span that contains no "]" inside it.
_RICH_TAG_RE = re.compile(r"\[/?[^\]]*\]")


def strip_markup(s: str) -> str:
    """Return *s* with every bracketed `[...]` span removed.

    The pattern is purely syntactic: any closed bracket pair is dropped,
    whether or not it is a real rich style tag.
    """
    # Splitting on the tag pattern and re-joining drops exactly the matches.
    return "".join(_RICH_TAG_RE.split(s))


# ---------- --basic mode flag (mirrored with rcjav.rclone_io) ----------
# Read dynamically as _rclone_io.BASIC so a single set_basic() call updates
# both this module's renderers and walk_remote's progress emission.
def set_basic(value: bool) -> None:
    """Toggle --basic mode for both renderers and rclone progress.

    Pure proxy: the flag itself is stored by `rcjav.rclone_io.set_basic`.
    """
    _rclone_io.set_basic(value)


def _basic() -> bool:
    """Return the current value of `rcjav.rclone_io.BASIC` (read on every call)."""
    return _rclone_io.BASIC


# ---------- size formatting ----------
def human_size(n: int) -> str:
    """Format a byte count with binary (1024-based) units.

    Negative inputs are clamped to zero. Values below 1024 are shown as a
    whole number of bytes; larger values use two decimals, with PiB as the
    largest unit.
    """
    value = float(n) if n > 0 else 0.0
    if value < 1024:
        return f"{int(value)} B"
    for suffix in ("KiB", "MiB", "GiB", "TiB"):
        value /= 1024
        if value < 1024:
            return f"{value:.2f} {suffix}"
    # Anything that is still >= 1024 TiB is reported in PiB.
    value /= 1024
    return f"{value:.2f} PiB"


# ---------- progress UI ----------
class BasicProgress:
    """Minimal stand-in for rich.Progress used when --basic is set.

    Supports the context-manager protocol plus `add_task`, `update` and
    `advance`. All output goes to stderr: a `[start]` line per task, a
    counter line while advancing (refreshed in place on a TTY), and one
    `[done]` summary line per task on exit.
    """

    def __init__(self):
        # task id -> {"desc": str, "total": int | None, "done": int}
        self._tasks: dict[int, dict] = {}
        self._next = 0
        # task id -> value of "done" at the last emitted counter line
        self._last_print: dict[int, int] = {}
        # True while a "\r"-refreshed counter line has no trailing newline.
        self._inline_open = False

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        """Emit one `[done]` line per task; never suppresses exceptions."""
        # An unfinished in-place line must be terminated first, otherwise
        # the summary would be appended to it.
        self._close_inline()
        for t in self._tasks.values():
            sys.stderr.write(f"{ansi('[done]', ANSI_GREEN)} {t['desc']} {t['done']}/{t['total']}\n")
        return False

    def _close_inline(self) -> None:
        """Terminate a pending in-place counter line, if there is one."""
        if self._inline_open:
            sys.stderr.write("\n")
            self._inline_open = False

    def add_task(self, description: str, total: int = 1) -> int:
        """Register a task, announce it with a `[start]` line, return its id."""
        tid = self._next
        self._next += 1
        desc = strip_markup(description)
        self._tasks[tid] = {"desc": desc, "total": total, "done": 0}
        self._last_print[tid] = 0
        self._close_inline()
        sys.stderr.write(f"{ansi('[start]', ANSI_CYAN)} {desc}\n")
        return tid

    def update(self, tid, total=None, description=None, **_):
        """Update a task's total and/or description; other keywords are ignored."""
        t = self._tasks[tid]
        if total is not None:
            t["total"] = total
        if description is not None:
            t["desc"] = strip_markup(description)

    def advance(self, tid, n: int = 1):
        """Add *n* to the task's counter and print it when a refresh is due."""
        t = self._tasks[tid]
        t["done"] += n
        total = t["total"]
        finished = t["done"] == total
        # In-place refresh every 5 files (or every file if total small).
        # A total of None is treated as "small" instead of raising TypeError.
        step = 5 if (total or 0) > 50 else 1
        if t["done"] - self._last_print[tid] >= step or finished:
            counter = ansi(f"{t['done']}/{t['total']}", ANSI_CYAN)
            line = f" {counter} {ansi(t['desc'], ANSI_DIM)}"
            if sys.stderr.isatty():
                # "\r\033[K": return to column 0 and erase the previous counter.
                sys.stderr.write(f"\r\033[K{line}")
                if finished:
                    sys.stderr.write("\n")
                self._inline_open = not finished
                sys.stderr.flush()
            elif finished:
                # Non-TTY output only gets the final counter line.
                sys.stderr.write(line + "\n")
            self._last_print[tid] = t["done"]


def make_progress():
    """Return the progress UI for the current mode.

    In --basic mode this is a `BasicProgress`; otherwise a `rich.Progress`
    bound to the module-level `console` with spinner, description, bar,
    M/N counter, elapsed time and ETA columns.
    """
    if not _basic():
        columns = (
            SpinnerColumn(),
            TextColumn("{task.description}"),
            BarColumn(),
            MofNCompleteColumn(),
            TimeElapsedColumn(),
            TextColumn("eta"),
            TimeRemainingColumn(),
        )
        return Progress(*columns, console=console, transient=False)
    return BasicProgress()


# ---------- rich renderers ----------
def render_banner(cache_meta: dict[str, dict], mode: str) -> Panel:
    """Build the "rc-jav" header panel: the mode plus one line per remote.

    Each `cache_meta` value is read for the keys "cached", "stale", "age"
    (only when cached) and "file_count".

    Lines are assembled from (text, style) pairs rather than markup strings,
    so a remote name or mode containing square brackets or `:name:` emoji
    codes is shown literally instead of being interpreted by rich.
    """
    lines: list[Text] = [Text.assemble(("mode:", "bold"), f" {mode}")]
    if cache_meta:
        for r, m in cache_meta.items():
            if m["cached"]:
                tag = f"CACHED {m['age']}" + (" STALE" if m["stale"] else "")
                style = "yellow" if m["stale"] else "dim"
            else:
                tag = "FRESH SCAN"
                style = "green"
            lines.append(Text.assemble(
                " ", (str(r), "white"),
                " ", (tag, style),
                " ", (f"({m['file_count']} files)", "dim"),
            ))
    body = Text("\n").join(lines)
    return Panel(body, title="rc-jav", title_align="left", border_style="blue")


def render_search(matches: dict[str, list[FileEntry]], queries: list[str],
                  cache_meta: dict[str, dict]) -> None:
    """Print the banner, then one table (or a NOT FOUND line) per query.

    `matches` maps each query to its hits; a query missing from the mapping
    is treated as having no hits. Rows are sorted by (jav_id, lower-cased
    path). `cache_meta` is looked up by each hit's remote to pick the
    FRESH / CACHED / CACHED-STALE tag.

    File names and paths are added as `Text` cells and the query label is
    escaped, so brackets (or `:name:` emoji codes) in user data are printed
    literally rather than parsed as rich markup.
    """
    console.print(render_banner(cache_meta, mode="search"))
    for q in queries:
        hits = matches.get(q, [])
        # escape() leaves ordinary IDs untouched and only neutralises a
        # bracketed label that rich would otherwise read as a style tag.
        label = escape(f"[{q}]")
        if not hits:
            console.print(f"[bold red]{label} NOT FOUND[/]")
            console.print()
            continue
        title = f"[bold green]{label} {len(hits)} hit(s)[/]"
        tbl = Table(title=title, title_justify="left", show_lines=False,
                    border_style="green", expand=True)
        tbl.add_column("Source", style="yellow", no_wrap=True)
        tbl.add_column("Cache", no_wrap=True)
        tbl.add_column("File", style="bold", overflow="fold")
        tbl.add_column("Size", justify="right", no_wrap=True)
        tbl.add_column("Path", style="dim", overflow="fold")
        for e in sorted(hits, key=lambda x: (x.jav_id, x.path.lower())):
            meta = cache_meta.get(e.remote, {})
            if meta.get("cached"):
                cache_tag = "[yellow][CACHED-STALE][/]" if meta.get("stale") else "[dim][CACHED][/]"
            else:
                cache_tag = "[green][FRESH][/]"
            tbl.add_row(
                e.source, cache_tag, Text(Path(e.path).name),
                f"{human_size(e.size)}\n[dim]({e.size:,} B)[/]",
                Text(e.full_path),
            )
        console.print(tbl)
        console.print()


def render_name_matches(hits: list[FileEntry], tokens: list[str],
                        cache_meta: dict[str, dict]) -> None:
    """Print a table of name-match hits, or a NOT FOUND line when empty.

    Rows are sorted by (jav_id, lower-cased path). `cache_meta` is looked up
    by each hit's remote to pick the FRESH / CACHED / CACHED-STALE tag.

    The heading separates the token list from the hit count with " — ",
    matching the NOT FOUND message. File names and paths are added as `Text`
    cells and the token list is escaped, so user data is never parsed as
    rich markup.
    """
    token_label = escape(str(tokens))
    if not hits:
        console.print(f"[bold red]Name match {token_label} — NOT FOUND[/]")
        return
    title = f"[bold green]Name match {token_label} — {len(hits)} hit(s)[/]"
    tbl = Table(title=title, title_justify="left", show_lines=False,
                border_style="green", expand=True)
    tbl.add_column("Source", style="yellow", no_wrap=True)
    tbl.add_column("Cache", no_wrap=True)
    tbl.add_column("ID", style="bold cyan", no_wrap=True)
    tbl.add_column("File", style="bold", overflow="fold")
    tbl.add_column("Size", justify="right", no_wrap=True)
    tbl.add_column("Path", style="dim", overflow="fold")
    for e in sorted(hits, key=lambda x: (x.jav_id, x.path.lower())):
        meta = cache_meta.get(e.remote, {})
        if meta.get("cached"):
            cache_tag = "[yellow][CACHED-STALE][/]" if meta.get("stale") else "[dim][CACHED][/]"
        else:
            cache_tag = "[green][FRESH][/]"
        tbl.add_row(
            e.source, cache_tag, e.jav_id, Text(Path(e.path).name),
            f"{human_size(e.size)}\n[dim]({e.size:,} B)[/]",
            Text(e.full_path),
        )
    console.print(tbl)
    console.print()


def render_name_matches_plain(hits: list[FileEntry], tokens: list[str],
                              cache_meta: dict[str, dict]) -> str:
    """Return the --basic rendering of name-match hits as one string.

    With no hits the result is a single NOT FOUND line. Otherwise it is a
    heading followed by four lines per hit (source/tag/ID, file, size,
    path), sorted by (jav_id, lower-cased path).

    The heading separates the token list from the hit count with " — ",
    matching the NOT FOUND message.
    """
    if not hits:
        return ansi(f"Name match {tokens} — NOT FOUND", ANSI_RED)
    lines: list[str] = [
        ansi(f"Name match {tokens} — {len(hits)} hit(s)", ANSI_GREEN + ANSI_BOLD),
    ]
    for e in sorted(hits, key=lambda x: (x.jav_id, x.path.lower())):
        meta = cache_meta.get(e.remote, {})
        if meta.get("cached"):
            tag = ansi("[CACHED-STALE]", ANSI_YELLOW) if meta.get("stale") else ansi("[CACHED]", ANSI_DIM)
        else:
            tag = ansi("[FRESH]", ANSI_GREEN)
        src = ansi(e.source, ANSI_YELLOW)
        lines.append(f" {src} {tag} {ansi(e.jav_id, ANSI_CYAN)}")
        lines.append(ansi(f" file: {Path(e.path).name}", ANSI_BOLD))
        lines.append(f" size: {human_size(e.size)} ({e.size:,} bytes)")
        lines.append(ansi(f" path: {e.full_path}", ANSI_DIM))
    return "\n".join(lines)


def render_dupes(dupes: dict[str, list[FileEntry]],
                 skipped: list[tuple[str, str]],
                 variant_alerts: dict[str, list[FileEntry]] | None = None) -> None:
    """Print the duplicate report to the rich console.

    Sections, in order:
      * one table per duplicate ID group (or a "No duplicates found" panel),
        followed by a panel with the summed size of all DELETE? rows;
      * a table of the first 50 `skipped` (remote, path) pairs, if any;
      * one table per entry of `variant_alerts`, if any.

    Within a group, the entry returned by `decide_keep` is marked KEEP,
    entries whose source is "Catalog" are marked CATALOG, and the rest are
    marked DELETE?.

    Paths and skipped entries are added as `Text` cells and bracketed IDs in
    titles are escaped, so brackets (or `:name:` emoji codes) in file names
    are printed literally rather than parsed as rich markup.
    """
    if not dupes:
        console.print(Panel("[bold green]No duplicates found.[/]",
                            border_style="green"))
    else:
        console.print(f"[bold]Found {len(dupes)} duplicate ID group(s):[/]")
        console.print()
        total_reclaim = 0
        for jav_id in sorted(dupes):
            entries = dupes[jav_id]
            keep = decide_keep(entries)
            group_label = escape(f"[{jav_id}]")
            tbl = Table(title=f"[bold]{group_label}[/]", title_justify="left",
                        show_lines=False, border_style="magenta", expand=True)
            tbl.add_column("Action", no_wrap=True)
            tbl.add_column("Source", style="yellow", no_wrap=True)
            tbl.add_column("Size", justify="right", no_wrap=True)
            tbl.add_column("Path", overflow="fold")
            # Order: "Source" entries first, "Catalog" entries last, then
            # largest file first within each bucket.
            for e in sorted(entries, key=lambda x: (x.source != "Source", x.source == "Catalog", -x.size)):
                if e.source == "Catalog":
                    action = "[cyan]CATALOG[/]"
                elif e is keep:
                    action = "[green]KEEP[/]"
                else:
                    action = "[red]DELETE?[/]"
                    total_reclaim += e.size
                tbl.add_row(action, e.source,
                            f"{human_size(e.size)}\n[dim]({e.size:,} B)[/]",
                            Text(e.full_path))
            console.print(tbl)
            console.print()
        console.print(Panel(
            f"[bold]Potential space reclaim if all DELETE? removed: "
            f"[red]{human_size(total_reclaim)}[/][/]",
            border_style="red"))
    if skipped:
        console.print()
        tbl = Table(title=f"[dim]Skipped {len(skipped)} file(s) with no parseable ID[/]",
                    title_justify="left", show_lines=False, border_style="dim", expand=True)
        tbl.add_column("Remote", style="dim", no_wrap=True)
        tbl.add_column("Path", style="dim", overflow="fold")
        for remote, path in skipped[:50]:
            tbl.add_row(Text(remote), Text(path))
        if len(skipped) > 50:
            tbl.add_row("[dim]…[/]", f"[dim]+{len(skipped) - 50} more[/]")
        console.print(tbl)
    if variant_alerts:
        console.print()
        console.print(Panel(
            f"[bold yellow]⚠ {len(variant_alerts)} variant alert(s) — manual review recommended[/]",
            border_style="yellow"))
        for bare_id, entries in sorted(variant_alerts.items()):
            alert_label = escape(f"[{bare_id}]")
            tbl = Table(title=f"[bold yellow]{alert_label} — bare + variant coexist[/]",
                        title_justify="left", show_lines=False, border_style="yellow", expand=True)
            tbl.add_column("ID", style="yellow", no_wrap=True)
            tbl.add_column("Size", justify="right", no_wrap=True)
            tbl.add_column("Path", overflow="fold")
            for e in sorted(entries, key=lambda x: x.full_path):
                # Prefer the ID re-parsed from the file name; fall back to
                # the stored jav_id when extract_id returns a falsy value.
                eid = extract_id(Path(e.path).name) or e.jav_id
                tbl.add_row(eid, human_size(e.size), Text(e.full_path))
            console.print(tbl)
            console.print()


# ---------- plain renderers (--basic) ----------
def render_banner_plain(cache_meta: dict[str, dict], mode: str) -> str:
    """Return the --basic banner: a title line plus one line per remote.

    Each `cache_meta` value is read for the keys "cached", "stale", "age"
    (only when cached) and "file_count".
    """
    out = [ansi(f"=== rc-jav ({mode}) ===", ANSI_BOLD)]
    for remote, info in cache_meta.items():
        if not info["cached"]:
            status = ansi("FRESH SCAN", ANSI_GREEN)
        else:
            label = f"CACHED {info['age']}"
            if info["stale"]:
                label += " STALE"
            colour = ANSI_YELLOW if info["stale"] else ANSI_DIM
            status = ansi(label, colour)
        files = ansi(f"({info['file_count']} files)", ANSI_DIM)
        out.append(f" {remote} {status} {files}")
    return "\n".join(out)


def render_search_plain(matches: dict[str, list[FileEntry]], queries: list[str],
                        cache_meta: dict[str, dict]) -> str:
    """Return the --basic rendering of search results as one string.

    The banner is included only when `cache_meta` is non-empty. Every query
    contributes either a NOT FOUND line or a heading plus four lines per hit
    (sorted by jav_id, then lower-cased path), followed by a blank line.
    """
    out: list[str] = []
    if cache_meta:
        out += [render_banner_plain(cache_meta, "search"), ""]
    for q in queries:
        found = matches.get(q, [])
        if found:
            out.append(ansi(f"[{q}] {len(found)} hit(s)", ANSI_GREEN + ANSI_BOLD))
            for entry in sorted(found, key=lambda x: (x.jav_id, x.path.lower())):
                info = cache_meta.get(entry.remote, {})
                if not info.get("cached"):
                    tag = ansi("[FRESH]", ANSI_GREEN)
                elif info.get("stale"):
                    tag = ansi("[CACHED-STALE]", ANSI_YELLOW)
                else:
                    tag = ansi("[CACHED]", ANSI_DIM)
                src = ansi(entry.source, ANSI_YELLOW)
                out.extend((
                    f" {src} {tag}",
                    ansi(f" file: {Path(entry.path).name}", ANSI_BOLD),
                    f" size: {human_size(entry.size)} ({entry.size:,} bytes)",
                    ansi(f" path: {entry.full_path}", ANSI_DIM),
                ))
        else:
            out.append(ansi(f"[{q}] NOT FOUND", ANSI_RED))
        # Blank separator after every query, found or not.
        out.append("")
    return "\n".join(out)


def render_dupes_plain(dupes, skipped, variant_alerts=None) -> str:
    """Return the --basic duplicate report as one newline-joined string.

    Sections, in order: the duplicate groups (or "No duplicates found.")
    with a reclaim total, the first 50 skipped (remote, path) pairs, and the
    variant alerts. The last two appear only when non-empty.
    """
    out: list[str] = []
    if dupes:
        out.append(ansi(f"Found {len(dupes)} duplicate ID group(s):", ANSI_BOLD))
        out.append("")
        reclaimable = 0
        for jav_id in sorted(dupes):
            group = dupes[jav_id]
            keeper = decide_keep(group)
            out.append(ansi(f"[{jav_id}]", ANSI_BOLD))
            # "Source" entries first, "Catalog" entries last, biggest first.
            ordered = sorted(
                group,
                key=lambda x: (x.source != "Source", x.source == "Catalog", -x.size),
            )
            for entry in ordered:
                if entry.source == "Catalog":
                    mark = ansi("CATALOG ", ANSI_CYAN)
                elif entry is keeper:
                    mark = ansi("KEEP ", ANSI_GREEN)
                else:
                    mark = ansi("DELETE? ", ANSI_RED)
                    reclaimable += entry.size
                src = ansi(f"{entry.source:>8}", ANSI_YELLOW)
                size_str = f"{human_size(entry.size)} ({entry.size:,} B)"
                out.append(f" {mark} {src} {size_str:>26} {entry.full_path}")
            out.append("")
        out.append(ansi(
            f"Potential space reclaim if all DELETE? removed: {human_size(reclaimable)}",
            ANSI_BOLD,
        ))
    else:
        out.append(ansi("No duplicates found.", ANSI_GREEN))

    if skipped:
        out.append("")
        out.append(ansi(f"Skipped {len(skipped)} file(s) with no parseable ID:", ANSI_DIM))
        out.extend(ansi(f" {remote} {path}", ANSI_DIM) for remote, path in skipped[:50])
        overflow = len(skipped) - 50
        if overflow > 0:
            out.append(ansi(f" ... +{overflow} more", ANSI_DIM))

    if variant_alerts:
        out.append("")
        out.append(ansi(
            f"{len(variant_alerts)} variant alert(s) — manual review required:",
            ANSI_YELLOW + ANSI_BOLD,
        ))
        for bare_id, members in sorted(variant_alerts.items()):
            out.append(ansi(f" [{bare_id}] bare + variant coexist", ANSI_YELLOW))
            for entry in sorted(members, key=lambda x: x.full_path):
                # ID re-parsed from the file name, else the stored jav_id.
                shown_id = extract_id(Path(entry.path).name) or entry.jav_id
                out.append(f" {ansi(shown_id, ANSI_YELLOW)} {human_size(entry.size):>10} {entry.full_path}")
    return "\n".join(out)


# ---------- file outputs ----------
def write_txt(path: Path, dupes, skipped, variant_alerts=None):
    """Write the plain duplicate report to *path* as UTF-8 text.

    The report is produced by `render_dupes_plain`. ANSI colouring is
    switched off while it is rendered, so the file never contains terminal
    escape sequences; the previous USE_ANSI value is restored afterwards,
    even if rendering raises.

    `variant_alerts` is optional and is forwarded to the renderer; omitting
    it gives the same sections as before.
    """
    global USE_ANSI
    saved = USE_ANSI
    USE_ANSI = False
    try:
        report = render_dupes_plain(dupes, skipped, variant_alerts)
    finally:
        USE_ANSI = saved
    path.write_text(report, encoding="utf-8")


def write_csv(path: Path, dupes):
    """Write one CSV row per file in every duplicate group to *path*.

    Groups are written in sorted ID order, entries in their stored order.
    The action column is CATALOG for entries whose source is "Catalog",
    KEEP for the entry chosen by `decide_keep`, and DELETE? otherwise.
    """
    header = ["jav_id", "action", "source", "remote", "path", "full_path",
              "size_bytes", "size_human", "mod_time"]

    def action_for(entry, keeper) -> str:
        if entry.source == "Catalog":
            return "CATALOG"
        return "KEEP" if entry is keeper else "DELETE?"

    with path.open("w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(header)
        for jav_id in sorted(dupes):
            group = dupes[jav_id]
            keeper = decide_keep(group)
            w.writerows(
                [jav_id, action_for(e, keeper), e.source,
                 e.remote, e.path, e.full_path, e.size, human_size(e.size), e.mod_time]
                for e in group
            )


def describe_skipped_id(remote: str, path: str) -> dict[str, str]:
    """Explain a common reason a path did not yield an ID.

    Only the final path component is inspected (backslashes are treated as
    path separators). The first matching rule wins:

      1. the name starts with a bracketed ID such as ``[ABC-123]``;
      2. prefix and number are joined by a Unicode dash (U+2010..U+2015,
         U+2212, U+FE58, U+FE63, U+FF0D) instead of an ASCII hyphen;
      3. prefix and number are joined with no separator at all.

    Anything else gets the generic "no supported ID" reason. Names that use
    a plain ASCII hyphen are never reported as having a non-ASCII dash.

    Returns a dict with the keys "remote", "path", "name", "reason", "hint".
    """
    name = Path((path or "").replace("\\", "/")).name
    reason = "No supported JAV ID at filename start"
    hint = "Rename with a leading ID such as ABC-123 or add an ID normalizer/site-specific source."
    if re.match(r"^\[[A-Za-z0-9-]+-\d+\]", name):
        reason = "ID is wrapped in leading brackets"
        hint = "Remove the leading brackets so the filename starts with the ID."
    # Dashes are written as escapes so look-alike characters stay unambiguous.
    elif re.match(r"^[A-Za-z][A-Za-z0-9]+[\u2010-\u2015\u2212\uFE58\uFE63\uFF0D]\d+", name):
        reason = "ID uses a non-ASCII dash"
        hint = "Replace the separator with a normal hyphen."
    elif re.match(r"^[A-Za-z][A-Za-z0-9]+\d+", name):
        reason = "ID prefix and number have no hyphen"
        hint = "Insert the ID hyphen, for example ABC-123."
    return {"remote": remote, "path": path, "name": name, "reason": reason, "hint": hint}


def dupes_to_obj(dupes, skipped, variant_alerts=None) -> dict:
    """Build the JSON-serialisable report object.

    The result has three keys:
      * "groups": per ID, the kept entry, the keep reason, the risks from
        `describe_dupe_risks`, the delete candidates and the catalog entries;
      * "skipped": one `describe_skipped_id` dict per skipped (remote, path);
      * "variant_alerts": one {"bare_id", "files"} dict per alert, sorted by
        ID, with files sorted by full path.

    Every entry is its dataclass dict plus "full_path" and "size_human".
    """
    def enriched(entry, **extra) -> dict:
        # Dataclass fields first, then the derived fields, then any extras.
        derived = {"full_path": entry.full_path, "size_human": human_size(entry.size)}
        return asdict(entry) | derived | extra

    skipped_info = [describe_skipped_id(r, p) for r, p in skipped]

    groups: dict = {}
    for jav_id in sorted(dupes):
        members = dupes[jav_id]
        keep, keep_reason = decide_keep_with_reason(members)
        groups[jav_id] = {
            "keep": enriched(keep),
            "keep_reason": keep_reason,
            "risks": describe_dupe_risks(jav_id, members),
            "delete_candidates": [
                enriched(e) for e in members
                if not (e is keep or e.source == "Catalog")
            ],
            "catalog": [enriched(e) for e in members if e.source == "Catalog"],
        }

    alerts = [
        {
            "bare_id": bare_id,
            "files": [
                enriched(e, detected_id=extract_id(Path(e.path).name) or e.jav_id)
                for e in sorted(members, key=lambda x: x.full_path)
            ],
        }
        for bare_id, members in sorted((variant_alerts or {}).items())
    ]
    return {"groups": groups, "skipped": skipped_info, "variant_alerts": alerts}


def write_json(path: Path, dupes, skipped, variant_alerts=None):
    """Serialise `dupes_to_obj(...)` with a 2-space indent and write it to *path* as UTF-8."""
    payload = dupes_to_obj(dupes, skipped, variant_alerts)
    document = json.dumps(payload, indent=2)
    path.write_text(document, encoding="utf-8")