diff --git a/rc-jav.py b/rc-jav.py index 8701e29..cf8d242 100644 --- a/rc-jav.py +++ b/rc-jav.py @@ -73,7 +73,25 @@ def human_size(n: int) -> str: return f"{nf:.2f} PiB" -RCLONE_BIN = "rclone" +from rcjav.rclone_io import ( + RCLONE_BIN, + DURATION_RE, + set_basic as _set_rclone_basic, + set_rclone_bin as _set_rclone_bin, + quick_search_remote, + choose_search_mode, + name_to_include_patterns, + name_match, + query_to_include_patterns, + remote_file_count, + parse_duration, + walk_remote, +) + + +# Mirror of rcjav.rclone_io.BASIC for in-tree readers that haven't been +# updated yet (output renderers, BasicProgress checks in main()). Set in +# main() via both this name and _set_rclone_basic(). BASIC = False # set by --basic USE_ANSI = True # disabled by --no-color @@ -188,13 +206,6 @@ from rcjav.dupes import ( CONFIG_PATH = Path(__file__).resolve().parent / "config.json" -# Written by the native-messaging host when the user clicks Cancel in the -# extension popup. walk_remote checks for it every CANCEL_CHECK_INTERVAL files -# and exits cleanly if found. -CANCEL_FLAG = Path(__file__).resolve().parent / "scan-cancel.flag" -CANCEL_CHECK_INTERVAL = 100 # check / emit progress every N files - - def load_config() -> dict: if not CONFIG_PATH.exists(): return {} @@ -213,265 +224,6 @@ def save_config(cfg: dict) -> None: os.replace(tmp, CONFIG_PATH) -# ---------- quick search (no cache) ---------- - -def quick_search_remote(remote: str, source_label: str, - patterns: list[str], - skipped: list[tuple[str, str]]) -> list[FileEntry]: - """Run `rclone lsjson --include ` once per pattern. Bypass cache.""" - out: list[FileEntry] = [] - seen: set[tuple[str, str]] = set() - for pat in patterns: - cmd = [RCLONE_BIN, "lsjson", remote, "--files-only", "-R", "--include", pat] - proc = subprocess.run(cmd, capture_output=True, text=True, - encoding="utf-8", errors="replace") - if proc.returncode != 0: - console.print(f"[red]rclone lsjson --include failed for {remote}:[/]\n{proc.stderr}") - sys.exit(proc.returncode) - for item in json.loads(proc.stdout or "[]"): - if item.get("IsDir"): - continue - path = item["Path"] - key = (remote, path) - if key in seen: - continue - seen.add(key) - jav_id = extract_id(Path(path).name) - if not jav_id: - skipped.append((remote, path)) - continue - out.append(FileEntry( - source=source_label, remote=remote, path=path, - size=int(item.get("Size", 0)), - mod_time=item.get("ModTime", ""), jav_id=jav_id, - )) - return out - - -def choose_search_mode(raw_queries: list[str], force_quick: bool, force_cache: bool) -> tuple[str, str]: - """Decide quick vs cached. Returns (mode, reason).""" - if force_quick and force_cache: - return ("cached", "both --quick and --cache passed; preferring --cache (safer)") - if force_quick: - return ("quick", "forced via --quick") - if force_cache: - return ("cached", "forced via --cache") - if len(raw_queries) > 1: - return ("cached", f"multi-query ({len(raw_queries)} IDs) — cache batches them for free") - if not raw_queries: - return ("cached", "no queries") - q = raw_queries[0] - if RANGE_RE.search(q): - return ("cached", "range [N-M] — too many rclone calls otherwise") - if "*" in q or "?" in q: - return ("cached", "wildcard — cache match semantics are more reliable") - return ("quick", "single exact ID — live lookup is fastest") - - -def _escape_rclone_glob(s: str) -> str: - """Escape rclone filter meta-chars so a literal token isn't interpreted as a - glob. rclone's filter syntax treats `*`, `?`, `[`, `{` specially; brackets - open a char-class that fails silently if the token contains `[` or `]`.""" - out = [] - for ch in s: - if ch in r"*?[]{}\\": - out.append("\\" + ch) - else: - out.append(ch) - return "".join(out) - - -def name_to_include_patterns(tokens: list[str]) -> list[str]: - """Build rclone --include globs for each name token (case-insensitive substring).""" - pats: list[str] = [] - for t in tokens: - if "*" in t or "?" in t: - # Caller-supplied wildcard — assume they meant it. - pats.append(t) - else: - # Literal substring search: escape glob meta inside the token so - # `--name "[BD]"` searches for the literal "[BD]" not a char class. - pats.append(f"*{_escape_rclone_glob(t)}*") - return pats - - -def name_match(stem: str, tokens: list[str]) -> bool: - """Case-insensitive: True if ANY token matches stem (substring or fnmatch glob).""" - low = stem.lower() - for t in tokens: - tl = t.lower() - if "*" in tl or "?" in tl: - if fnmatch.fnmatchcase(low, tl): - return True - elif tl in low: - return True - return False - - -def query_to_include_patterns(raw: str) -> list[str]: - """Turn a search query into one or more rclone --include globs. - Ranges expand to individual IDs; wildcards and exact IDs map to single glob.""" - if RANGE_RE.search(raw): - expanded = expand_range(raw) or [] - out: list[str] = [] - for e in expanded: - out.extend(query_to_include_patterns(e)) - return out - if "*" in raw or "?" in raw: - return [f"{raw}*"] - norm = normalize_id(raw) - if not norm: - return [f"{raw}*"] - prefix, _, digits = norm.rpartition("-") - if not digits.isdigit(): - return [f"{norm}*"] - n = int(digits) - width = max(3, len(str(n))) - return [f"{prefix}-{n:0{width}d}*"] - - -# ---------- rclone wrappers ---------- - -def remote_file_count(remote: str) -> int: - """Fast total file count via `rclone size --json`.""" - cmd = [RCLONE_BIN, "size", "--json", remote] - proc = subprocess.run(cmd, capture_output=True, text=True, - encoding="utf-8", errors="replace") - if proc.returncode != 0: - console.print(f"[red]rclone size failed for {remote}:[/]\n{proc.stderr}") - sys.exit(proc.returncode) - try: - return int(json.loads(proc.stdout).get("count", 0)) - except (json.JSONDecodeError, ValueError): - return 0 - - -DURATION_RE = re.compile(r"^\s*(\d+)\s*([smhd])\s*$", re.IGNORECASE) - - -def parse_duration(s: str) -> str | None: - """Validate a duration suffix (`30m`, `24h`, `7d`, `90s`). Returns the - normalized form rclone accepts, or None if invalid. We don't compute a - timedelta — we pass the suffix straight to rclone --max-age.""" - if not s: - return None - m = DURATION_RE.match(s) - if not m: - return None - return f"{m.group(1)}{m.group(2).lower()}" - - -def walk_remote(remote: str, source_label: str, - skipped: list[tuple[str, str]], - progress: Progress, task_id, - max_age: str | None = None, - _total_override: int | None = None) -> tuple[list[FileEntry], list[str]]: - """Stream files from rclone lsf, ticking progress per file. - If max_age is set, pass --max-age to rclone so only recently-modified files - are returned (incremental scan). - _total_override: skip the internal remote_file_count probe (caller already did it).""" - if max_age: - # Can't pre-count for an age-filtered walk — skip the size probe and - # let progress run on a synthetic total. - total = 0 - progress.update(task_id, total=1, - description=f"[cyan]{source_label}[/] {remote} (since {max_age})") - else: - if _total_override is not None: - total = _total_override - else: - total = remote_file_count(remote) - if BASIC: - # Caller already emitted SCAN_REMOTE_START (without total) — now we know it. - sys.stderr.write("SCAN_REMOTE_COUNTED " + json.dumps({ - "remote": remote, "total": total, - }) + "\n") - sys.stderr.flush() - progress.update(task_id, total=max(total, 1), - description=f"[cyan]{source_label}[/] {remote}") - cmd = [RCLONE_BIN, "lsf", "--files-only", "-R", - "--format", "pst", "--separator", "\t"] - if max_age: - cmd += ["--max-age", max_age] - cmd.append(remote) - proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, - text=True, encoding="utf-8", errors="replace") - entries: list[FileEntry] = [] - local_skipped: list[str] = [] - if proc.stdout is None: - raise RuntimeError("rclone stdout pipe unexpectedly None") - _stderr_chunks: list[str] = [] - _stderr_thread = threading.Thread( - target=lambda: _stderr_chunks.append(proc.stderr.read() if proc.stderr else ""), - daemon=True, - ) - _stderr_thread.start() - _cancelled = False - try: - for line in proc.stdout: - line = line.rstrip("\n").rstrip("\r") - if not line: - continue - parts = line.split("\t") - if len(parts) < 2: - continue - rel = parts[0] - try: - size = int(parts[1]) - except ValueError: - size = 0 - mod_time = parts[2] if len(parts) >= 3 else "" - jav_id = extract_id(Path(rel).name) - if not jav_id: - local_skipped.append(rel) - skipped.append((remote, rel)) - else: - entries.append(FileEntry( - source=source_label, remote=remote, path=rel, - size=size, mod_time=mod_time, jav_id=jav_id, - )) - progress.advance(task_id) - # Every CANCEL_CHECK_INTERVAL files: check cancel flag and emit progress. - n = len(entries) + len(local_skipped) - if BASIC and n > 0 and n % CANCEL_CHECK_INTERVAL == 0: - if CANCEL_FLAG.exists(): - try: - CANCEL_FLAG.unlink(missing_ok=True) - except OSError: - pass - proc.terminate() - try: - proc.wait(timeout=3) - except subprocess.TimeoutExpired: - proc.kill() - _cancelled = True - break - sys.stderr.write("SCAN_FILE_PROGRESS " + json.dumps({ - "remote": remote, "label": source_label, - "files": len(entries), "skipped": len(local_skipped), - "total": total, - }) + "\n") - sys.stderr.flush() - except KeyboardInterrupt: - proc.terminate() - try: - proc.wait(timeout=3) - except subprocess.TimeoutExpired: - proc.kill() - raise - if _cancelled: - sys.stderr.write("SCAN_CANCELLED\n") - sys.stderr.flush() - sys.exit(0) - proc.wait() - _stderr_thread.join() - if proc.returncode != 0: - err = _stderr_chunks[0] if _stderr_chunks else "" - console.print(f"[red]rclone lsf failed for {remote}:[/]\n{err}") - sys.exit(proc.returncode) - return entries, local_skipped - - def make_progress(): if BASIC: return BasicProgress() @@ -1188,9 +940,10 @@ def main(): "`--source cq:personal-files/ClearJAV --target cq:personal-files/JAV/TMP`.") args = ap.parse_args() - global RCLONE_BIN, console, BASIC, DEFAULT_SOURCE, DEFAULT_TARGET, DEFAULT_CATALOG - RCLONE_BIN = args.rclone_bin + global console, BASIC, DEFAULT_SOURCE, DEFAULT_TARGET, DEFAULT_CATALOG + _set_rclone_bin(args.rclone_bin) BASIC = args.basic or args.format == "json" + _set_rclone_basic(BASIC) # Apply persisted config overrides BEFORE defaults are consulted. cfg = load_config() diff --git a/rcjav/__init__.py b/rcjav/__init__.py index 72c7fa6..344e496 100644 --- a/rcjav/__init__.py +++ b/rcjav/__init__.py @@ -6,6 +6,22 @@ find at the top level. Adding a new submodule does not change the public surface — only this file does. """ from rcjav.model import FileEntry # noqa: F401 +from rcjav.rclone_io import ( # noqa: F401 + RCLONE_BIN, + CANCEL_FLAG, + CANCEL_CHECK_INTERVAL, + DURATION_RE, + set_basic, + set_rclone_bin, + quick_search_remote, + choose_search_mode, + name_to_include_patterns, + name_match, + query_to_include_patterns, + remote_file_count, + parse_duration, + walk_remote, +) from rcjav.catalog import ( # noqa: F401 CATALOG_COL_NAME, CATALOG_COL_PATH, diff --git a/rcjav/rclone_io.py b/rcjav/rclone_io.py new file mode 100644 index 0000000..8bd539f --- /dev/null +++ b/rcjav/rclone_io.py @@ -0,0 +1,298 @@ +"""rclone subprocess wrappers — listing, sizing, quick search. + +Owns RCLONE_BIN and the cancel-flag protocol used by the +native-messaging host to interrupt scans. Errors go to stderr (no +rich markup); rc-jav.py owns terminal styling. +""" +from __future__ import annotations + +import fnmatch +import json +import re +import subprocess +import sys +import threading +from pathlib import Path + +from rcjav.ids import RANGE_RE, expand_range, extract_id, normalize_id +from rcjav.model import FileEntry + + +RCLONE_BIN = "rclone" + +# Written by the native-messaging host when the user clicks Cancel in the +# extension popup. walk_remote checks for it every CANCEL_CHECK_INTERVAL files +# and exits cleanly if found. +CANCEL_FLAG = Path(__file__).resolve().parents[1] / "scan-cancel.flag" +CANCEL_CHECK_INTERVAL = 100 # check / emit progress every N files + +# Toggled from rc-jav.py main() when --basic is passed. Affects whether +# walk_remote emits machine-parseable progress lines on stderr. +BASIC = False + + +def set_basic(value: bool) -> None: + """Toggle plain/machine-readable progress output for rclone walks.""" + global BASIC + BASIC = bool(value) + + +def set_rclone_bin(path: str) -> None: + """Override the rclone binary (default 'rclone' on PATH).""" + global RCLONE_BIN + RCLONE_BIN = path or "rclone" + + +def _err(msg: str) -> None: + sys.stderr.write(msg + "\n") + + +def quick_search_remote(remote: str, source_label: str, + patterns: list[str], + skipped: list[tuple[str, str]]) -> list[FileEntry]: + """Run `rclone lsjson --include ` once per pattern. Bypass cache.""" + out: list[FileEntry] = [] + seen: set[tuple[str, str]] = set() + for pat in patterns: + cmd = [RCLONE_BIN, "lsjson", remote, "--files-only", "-R", "--include", pat] + proc = subprocess.run(cmd, capture_output=True, text=True, + encoding="utf-8", errors="replace") + if proc.returncode != 0: + _err(f"rclone lsjson --include failed for {remote}:\n{proc.stderr}") + sys.exit(proc.returncode) + for item in json.loads(proc.stdout or "[]"): + if item.get("IsDir"): + continue + path = item["Path"] + key = (remote, path) + if key in seen: + continue + seen.add(key) + jav_id = extract_id(Path(path).name) + if not jav_id: + skipped.append((remote, path)) + continue + out.append(FileEntry( + source=source_label, remote=remote, path=path, + size=int(item.get("Size", 0)), + mod_time=item.get("ModTime", ""), jav_id=jav_id, + )) + return out + + +def choose_search_mode(raw_queries: list[str], force_quick: bool, force_cache: bool) -> tuple[str, str]: + """Decide quick vs cached. Returns (mode, reason).""" + if force_quick and force_cache: + return ("cached", "both --quick and --cache passed; preferring --cache (safer)") + if force_quick: + return ("quick", "forced via --quick") + if force_cache: + return ("cached", "forced via --cache") + if len(raw_queries) > 1: + return ("cached", f"multi-query ({len(raw_queries)} IDs) — cache batches them for free") + if not raw_queries: + return ("cached", "no queries") + q = raw_queries[0] + if RANGE_RE.search(q): + return ("cached", "range [N-M] — too many rclone calls otherwise") + if "*" in q or "?" in q: + return ("cached", "wildcard — cache match semantics are more reliable") + return ("quick", "single exact ID — live lookup is fastest") + + +def _escape_rclone_glob(s: str) -> str: + """Escape rclone filter meta-chars so a literal token isn't interpreted as a + glob. rclone's filter syntax treats `*`, `?`, `[`, `{` specially; brackets + open a char-class that fails silently if the token contains `[` or `]`.""" + out = [] + for ch in s: + if ch in r"*?[]{}\\": + out.append("\\" + ch) + else: + out.append(ch) + return "".join(out) + + +def name_to_include_patterns(tokens: list[str]) -> list[str]: + """Build rclone --include globs for each name token (case-insensitive substring).""" + pats: list[str] = [] + for t in tokens: + if "*" in t or "?" in t: + pats.append(t) + else: + pats.append(f"*{_escape_rclone_glob(t)}*") + return pats + + +def name_match(stem: str, tokens: list[str]) -> bool: + """Case-insensitive: True if ANY token matches stem (substring or fnmatch glob).""" + low = stem.lower() + for t in tokens: + tl = t.lower() + if "*" in tl or "?" in tl: + if fnmatch.fnmatchcase(low, tl): + return True + elif tl in low: + return True + return False + + +def query_to_include_patterns(raw: str) -> list[str]: + """Turn a search query into one or more rclone --include globs. + Ranges expand to individual IDs; wildcards and exact IDs map to single glob.""" + if RANGE_RE.search(raw): + expanded = expand_range(raw) or [] + out: list[str] = [] + for e in expanded: + out.extend(query_to_include_patterns(e)) + return out + if "*" in raw or "?" in raw: + return [f"{raw}*"] + norm = normalize_id(raw) + if not norm: + return [f"{raw}*"] + prefix, _, digits = norm.rpartition("-") + if not digits.isdigit(): + return [f"{norm}*"] + n = int(digits) + width = max(3, len(str(n))) + return [f"{prefix}-{n:0{width}d}*"] + + +def remote_file_count(remote: str) -> int: + """Fast total file count via `rclone size --json`.""" + cmd = [RCLONE_BIN, "size", "--json", remote] + proc = subprocess.run(cmd, capture_output=True, text=True, + encoding="utf-8", errors="replace") + if proc.returncode != 0: + _err(f"rclone size failed for {remote}:\n{proc.stderr}") + sys.exit(proc.returncode) + try: + return int(json.loads(proc.stdout).get("count", 0)) + except (json.JSONDecodeError, ValueError): + return 0 + + +DURATION_RE = re.compile(r"^\s*(\d+)\s*([smhd])\s*$", re.IGNORECASE) + + +def parse_duration(s: str) -> str | None: + """Validate a duration suffix (`30m`, `24h`, `7d`, `90s`). Returns the + normalized form rclone accepts, or None if invalid. We don't compute a + timedelta — we pass the suffix straight to rclone --max-age.""" + if not s: + return None + m = DURATION_RE.match(s) + if not m: + return None + return f"{m.group(1)}{m.group(2).lower()}" + + +def walk_remote(remote: str, source_label: str, + skipped: list[tuple[str, str]], + progress, task_id, + max_age: str | None = None, + _total_override: int | None = None) -> tuple[list[FileEntry], list[str]]: + """Stream files from rclone lsf, ticking progress per file. + If max_age is set, pass --max-age to rclone so only recently-modified files + are returned (incremental scan). + _total_override: skip the internal remote_file_count probe (caller already did it). + + `progress` is a rich.Progress (or BasicProgress) instance owned by the caller. + """ + if max_age: + total = 0 + progress.update(task_id, total=1, + description=f"[cyan]{source_label}[/] {remote} (since {max_age})") + else: + if _total_override is not None: + total = _total_override + else: + total = remote_file_count(remote) + if BASIC: + sys.stderr.write("SCAN_REMOTE_COUNTED " + json.dumps({ + "remote": remote, "total": total, + }) + "\n") + sys.stderr.flush() + progress.update(task_id, total=max(total, 1), + description=f"[cyan]{source_label}[/] {remote}") + cmd = [RCLONE_BIN, "lsf", "--files-only", "-R", + "--format", "pst", "--separator", "\t"] + if max_age: + cmd += ["--max-age", max_age] + cmd.append(remote) + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + text=True, encoding="utf-8", errors="replace") + entries: list[FileEntry] = [] + local_skipped: list[str] = [] + if proc.stdout is None: + raise RuntimeError("rclone stdout pipe unexpectedly None") + _stderr_chunks: list[str] = [] + _stderr_thread = threading.Thread( + target=lambda: _stderr_chunks.append(proc.stderr.read() if proc.stderr else ""), + daemon=True, + ) + _stderr_thread.start() + _cancelled = False + try: + for line in proc.stdout: + line = line.rstrip("\n").rstrip("\r") + if not line: + continue + parts = line.split("\t") + if len(parts) < 2: + continue + rel = parts[0] + try: + size = int(parts[1]) + except ValueError: + size = 0 + mod_time = parts[2] if len(parts) >= 3 else "" + jav_id = extract_id(Path(rel).name) + if not jav_id: + local_skipped.append(rel) + skipped.append((remote, rel)) + else: + entries.append(FileEntry( + source=source_label, remote=remote, path=rel, + size=size, mod_time=mod_time, jav_id=jav_id, + )) + progress.advance(task_id) + n = len(entries) + len(local_skipped) + if BASIC and n > 0 and n % CANCEL_CHECK_INTERVAL == 0: + if CANCEL_FLAG.exists(): + try: + CANCEL_FLAG.unlink(missing_ok=True) + except OSError: + pass + proc.terminate() + try: + proc.wait(timeout=3) + except subprocess.TimeoutExpired: + proc.kill() + _cancelled = True + break + sys.stderr.write("SCAN_FILE_PROGRESS " + json.dumps({ + "remote": remote, "label": source_label, + "files": len(entries), "skipped": len(local_skipped), + "total": total, + }) + "\n") + sys.stderr.flush() + except KeyboardInterrupt: + proc.terminate() + try: + proc.wait(timeout=3) + except subprocess.TimeoutExpired: + proc.kill() + raise + if _cancelled: + sys.stderr.write("SCAN_CANCELLED\n") + sys.stderr.flush() + sys.exit(0) + proc.wait() + _stderr_thread.join() + if proc.returncode != 0: + err = _stderr_chunks[0] if _stderr_chunks else "" + _err(f"rclone lsf failed for {remote}:\n{err}") + sys.exit(proc.returncode) + return entries, local_skipped