# Source-view metadata (not part of the module):
#   commit f7fc15b17c
#   Includes: - cli.py path fix (parents[1]) for config/catalog resolution - Library cleanup feature design docs (TODO.md, mockup) - Audit + bug-queue markdowns from May 2026 reliability pass - .gitignore expanded for transient artifacts
#   317 lines, 11 KiB, Python
"""rclone subprocess wrappers — listing, sizing, quick search.

Owns RCLONE_BIN and the cancel-flag protocol used by the
native-messaging host to interrupt scans. Errors go to stderr (no
rich markup); rc-jav.py owns terminal styling.
"""
|
|
from __future__ import annotations
|
|
|
|
import fnmatch
|
|
import json
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
from pathlib import Path
|
|
|
|
from rcjav.ids import RANGE_RE, expand_range, extract_id, normalize_id
|
|
from rcjav.model import FileEntry
|
|
|
|
|
|
# Executable name/path used as argv[0] for every rclone subprocess started by
# this module. Defaults to "rclone" (resolved via PATH); set_rclone_bin()
# replaces it.
RCLONE_BIN = "rclone"
|
|
|
|
# Written by the native-messaging host when the user clicks Cancel in the
# extension popup. walk_remote checks for it every CANCEL_CHECK_INTERVAL files
# and exits cleanly if found. The check only runs while BASIC is enabled.
# The flag file lives in the parent of the directory containing this module.
CANCEL_FLAG = Path(__file__).resolve().parents[1] / "scan-cancel.flag"
CANCEL_CHECK_INTERVAL = 25
# Throttle for walk_remote's SCAN_FILE_PROGRESS stderr lines. A line is
# emitted once at least PROGRESS_EMIT_MIN_FILES files have been processed AND
# PROGRESS_EMIT_MIN_GAP_S seconds have passed since the previous line, or
# unconditionally once PROGRESS_EMIT_MAX_GAP_S seconds have passed.
PROGRESS_EMIT_MIN_FILES = 25
PROGRESS_EMIT_MIN_GAP_S = 0.25
PROGRESS_EMIT_MAX_GAP_S = 1.0
|
|
|
|
# Toggled from rc-jav.py main() when --basic is passed. Affects whether
# walk_remote emits machine-parseable progress lines on stderr. It also gates
# walk_remote's periodic CANCEL_FLAG check. Change it through set_basic().
BASIC = False
|
|
|
|
|
|
def set_basic(value: bool) -> None:
    """Enable or disable the plain, machine-readable output mode.

    The argument is reduced to a strict ``True``/``False`` and stored in the
    module-level ``BASIC`` flag that ``walk_remote`` consults.
    """
    global BASIC
    BASIC = True if value else False
|
|
|
|
|
|
def set_rclone_bin(path: str) -> None:
    """Point this module at a specific rclone executable.

    A falsy *path* (empty string, ``None``) restores the default ``"rclone"``,
    which is looked up on PATH when a subprocess is started.
    """
    global RCLONE_BIN
    RCLONE_BIN = path if path else "rclone"
|
|
|
|
|
|
def _err(msg: str) -> None:
|
|
sys.stderr.write(msg + "\n")
|
|
|
|
|
|
def quick_search_remote(remote: str, source_label: str,
                        patterns: list[str],
                        skipped: list[tuple[str, str]]) -> list[FileEntry]:
    """Look files up live on *remote*, one ``rclone lsjson`` call per pattern.

    Each pattern is handed to ``rclone lsjson <remote> --files-only -R
    --include <pattern>``; no cache is consulted. A path returned by more than
    one pattern is processed only once.

    Args:
        remote: rclone remote/path to list.
        source_label: stored as ``source`` on every returned entry.
        patterns: rclone ``--include`` globs.
        skipped: output list; ``(remote, path)`` is appended for every file
            whose name yields no ID from ``extract_id``.

    Returns:
        One ``FileEntry`` per distinct matching file with a recognisable ID.

    Exits the process with rclone's return code (after reporting rclone's
    stderr) as soon as any invocation fails.
    """
    matches: list[FileEntry] = []
    visited: set[tuple[str, str]] = set()
    for glob in patterns:
        result = subprocess.run(
            [RCLONE_BIN, "lsjson", remote, "--files-only", "-R", "--include", glob],
            capture_output=True, text=True,
            encoding="utf-8", errors="replace",
        )
        if result.returncode != 0:
            _err(f"rclone lsjson --include failed for {remote}:\n{result.stderr}")
            sys.exit(result.returncode)
        # Empty stdout is treated as an empty listing.
        listing = json.loads(result.stdout or "[]")
        for record in listing:
            if record.get("IsDir"):
                continue
            rel_path = record["Path"]
            dedupe_key = (remote, rel_path)
            if dedupe_key in visited:
                continue
            visited.add(dedupe_key)
            jav_id = extract_id(Path(rel_path).name)
            if jav_id:
                matches.append(FileEntry(
                    source=source_label, remote=remote, path=rel_path,
                    size=int(record.get("Size", 0)),
                    mod_time=record.get("ModTime", ""), jav_id=jav_id,
                ))
            else:
                skipped.append((remote, rel_path))
    return matches
|
|
|
|
|
|
def choose_search_mode(raw_queries: list[str], force_quick: bool, force_cache: bool) -> tuple[str, str]:
    """Pick between a live ("quick") and a cache-backed ("cached") search.

    Explicit flags win, with ``--cache`` taking precedence when both are set.
    Otherwise only a single query that is neither a range nor a wildcard is
    searched live.

    Returns:
        ``(mode, reason)`` where *mode* is ``"quick"`` or ``"cached"`` and
        *reason* is a human-readable explanation of the choice.
    """
    if force_cache:
        if force_quick:
            return ("cached", "both --quick and --cache passed; preferring --cache (safer)")
        return ("cached", "forced via --cache")
    if force_quick:
        return ("quick", "forced via --quick")

    count = len(raw_queries)
    if count > 1:
        return ("cached", f"multi-query ({count} IDs) — cache batches them for free")
    if count == 0:
        return ("cached", "no queries")

    query = raw_queries[0]
    if RANGE_RE.search(query):
        return ("cached", "range [N-M] — too many rclone calls otherwise")
    has_wildcard = "*" in query or "?" in query
    if has_wildcard:
        return ("cached", "wildcard — cache match semantics are more reliable")
    return ("quick", "single exact ID — live lookup is fastest")
|
|
|
|
|
|
def _escape_rclone_glob(s: str) -> str:
|
|
"""Escape rclone filter meta-chars so a literal token isn't interpreted as a
|
|
glob. rclone's filter syntax treats `*`, `?`, `[`, `{` specially; brackets
|
|
open a char-class that fails silently if the token contains `[` or `]`."""
|
|
out = []
|
|
for ch in s:
|
|
if ch in r"*?[]{}\\":
|
|
out.append("\\" + ch)
|
|
else:
|
|
out.append(ch)
|
|
return "".join(out)
|
|
|
|
|
|
def name_to_include_patterns(tokens: list[str]) -> list[str]:
    """Translate name tokens into rclone ``--include`` globs, one per token.

    A token that already contains ``*`` or ``?`` is used verbatim. Any other
    token is escaped with ``_escape_rclone_glob`` and wrapped as ``*token*`` so
    it matches as a substring.

    NOTE(review): the previous docstring described the match as
    case-insensitive, but nothing here alters case — presumably that depends
    on how rclone is invoked; confirm against the caller.
    """
    return [
        tok if ("*" in tok or "?" in tok) else f"*{_escape_rclone_glob(tok)}*"
        for tok in tokens
    ]
|
|
|
|
|
|
def name_match(stem: str, tokens: list[str]) -> bool:
    """Report whether at least one token matches *stem*, ignoring case.

    A token containing ``*`` or ``?`` is applied as an fnmatch glob against the
    whole lower-cased stem; any other token matches when it occurs anywhere
    inside the lower-cased stem. An empty token list never matches.
    """
    haystack = stem.lower()

    def _hits(token: str) -> bool:
        needle = token.lower()
        if "*" in needle or "?" in needle:
            return fnmatch.fnmatchcase(haystack, needle)
        return needle in haystack

    return any(_hits(token) for token in tokens)
|
|
|
|
|
|
def query_to_include_patterns(raw: str) -> list[str]:
    """Turn a search query into one or more rclone --include globs.

    * A range query (matched by ``RANGE_RE``) is expanded with
      ``expand_range`` and every expanded item is converted recursively, so a
      range yields one glob per ID.
    * A query containing ``*`` or ``?`` is used as typed with ``*`` appended.
    * Anything else is passed through ``normalize_id``. When the result ends
      in ``-<digits>``, the number is re-rendered zero-padded to at least
      three digits and ``*`` is appended (``PREFIX-7`` -> ``PREFIX-007*``).
      If normalisation fails, the raw query plus ``*`` is used instead.

    Returns:
        A list of glob strings; empty only when a range expands to nothing.
    """
    if RANGE_RE.search(raw):
        patterns: list[str] = []
        for single in expand_range(raw) or []:
            patterns.extend(query_to_include_patterns(single))
        return patterns
    if "*" in raw or "?" in raw:
        return [f"{raw}*"]
    norm = normalize_id(raw)
    if not norm:
        return [f"{raw}*"]
    prefix, sep, digits = norm.rpartition("-")
    # Without a hyphen, rpartition returns ("", "", norm); formatting that
    # would produce a glob with a stray leading "-". isdecimal() (rather than
    # isdigit()) accepts exactly the characters int() can parse.
    if not sep or not digits.isdecimal():
        return [f"{norm}*"]
    n = int(digits)
    width = max(3, len(str(n)))
    return [f"{prefix}-{n:0{width}d}*"]
|
|
|
|
|
|
def remote_file_count(remote: str) -> int:
    """Fast total file count via `rclone size --json`.

    Args:
        remote: rclone remote/path to measure.

    Returns:
        The ``count`` field of rclone's JSON output, or 0 when the output is
        not a JSON object or carries no usable integer count.

    Exits the process with rclone's return code (after reporting rclone's
    stderr) when the command itself fails.
    """
    cmd = [RCLONE_BIN, "size", "--json", remote]
    proc = subprocess.run(cmd, capture_output=True, text=True,
                          encoding="utf-8", errors="replace")
    if proc.returncode != 0:
        _err(f"rclone size failed for {remote}:\n{proc.stderr}")
        sys.exit(proc.returncode)
    try:
        payload = json.loads(proc.stdout)
    except (json.JSONDecodeError, ValueError):
        return 0
    # Valid JSON that is not an object (null, a list, a bare number) has no
    # "count" to read; treat it like unparseable output.
    if not isinstance(payload, dict):
        return 0
    try:
        return int(payload.get("count", 0))
    except (TypeError, ValueError):
        # e.g. {"count": null} or a non-numeric string.
        return 0
|
|
|
|
|
|
# <digits><unit> with optional surrounding/inner whitespace; unit is one of
# s, m, h, d in either case.
DURATION_RE = re.compile(r"^\s*(\d+)\s*([smhd])\s*$", re.IGNORECASE)


def parse_duration(s: str) -> str | None:
    """Check a duration such as ``30m``, ``24h``, ``7d`` or ``90s``.

    No time arithmetic happens here: the value is only validated and tidied
    up (whitespace dropped, unit lower-cased) so it can be handed to
    ``rclone --max-age`` as-is.

    Returns:
        The compact ``<digits><unit>`` string, or ``None`` when *s* is empty
        or does not match ``DURATION_RE``.
    """
    matched = DURATION_RE.match(s) if s else None
    if matched is None:
        return None
    amount, unit = matched.groups()
    return amount + unit.lower()
|
|
|
|
|
|
def _stop_process(proc: subprocess.Popen) -> None:
    """Terminate *proc*, escalating to kill() if it has not exited after 3 s."""
    proc.terminate()
    try:
        proc.wait(timeout=3)
    except subprocess.TimeoutExpired:
        proc.kill()


def walk_remote(remote: str, source_label: str,
                skipped: list[tuple[str, str]],
                progress, task_id,
                max_age: str | None = None,
                _total_override: int | None = None) -> tuple[list[FileEntry], list[str]]:
    """Stream files from rclone lsf, ticking progress per file.

    Runs ``rclone lsf --files-only -R --format pst --separator <TAB>`` on
    *remote* and turns each output line (path, size, modification time) into a
    ``FileEntry`` when ``extract_id`` finds an ID in the file name.

    Args:
        remote: rclone remote/path to walk.
        source_label: stored as ``source`` on every entry and shown in the
            progress description.
        skipped: output list; ``(remote, path)`` is appended for every file
            without a recognisable ID.
        progress: a rich.Progress (or BasicProgress) instance owned by the
            caller; only ``update`` and ``advance`` are called on it.
        task_id: the task within *progress* to update.
        max_age: if set, passed to rclone as ``--max-age`` so only
            recently-modified files are returned (incremental scan). No file
            count is probed in this mode; the reported total is 0.
        _total_override: skip the internal remote_file_count probe (caller
            already did it).

    Returns:
        ``(entries, local_skipped)``: the parsed entries and the relative
        paths skipped for *this* remote.

    Exits the process with status 0 after writing ``SCAN_CANCELLED`` when the
    cancel flag is seen (checked only in BASIC mode), and with rclone's return
    code when ``rclone lsf`` fails. Any exception raised while streaming
    (including KeyboardInterrupt) stops the rclone child and is re-raised.
    """
    if max_age:
        total = 0
        # Total is unknown for an incremental scan; 1 is a placeholder.
        progress.update(task_id, total=1,
                        description=f"[cyan]{source_label}[/] {remote} (since {max_age})")
    else:
        if _total_override is not None:
            total = _total_override
        else:
            total = remote_file_count(remote)
        # NOTE(review): emitted whether the total was probed here or supplied
        # by the caller — confirm that matches the intended protocol.
        if BASIC:
            sys.stderr.write("SCAN_REMOTE_COUNTED " + json.dumps({
                "remote": remote, "total": total,
            }) + "\n")
            sys.stderr.flush()
        progress.update(task_id, total=max(total, 1),
                        description=f"[cyan]{source_label}[/] {remote}")

    cmd = [RCLONE_BIN, "lsf", "--files-only", "-R",
           "--format", "pst", "--separator", "\t"]
    if max_age:
        cmd += ["--max-age", max_age]
    cmd.append(remote)
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            text=True, encoding="utf-8", errors="replace")
    entries: list[FileEntry] = []
    local_skipped: list[str] = []
    if proc.stdout is None:
        raise RuntimeError("rclone stdout pipe unexpectedly None")

    # Drain stderr on a side thread so a chatty rclone cannot block on a full
    # stderr pipe while stdout is being consumed here.
    _stderr_chunks: list[str] = []
    _stderr_thread = threading.Thread(
        target=lambda: _stderr_chunks.append(proc.stderr.read() if proc.stderr else ""),
        daemon=True,
    )
    _stderr_thread.start()

    _cancelled = False
    last_emit_n = 0
    last_emit_ts = time.monotonic()
    try:
        for line in proc.stdout:
            line = line.rstrip("\n").rstrip("\r")
            if not line:
                continue
            parts = line.split("\t")
            if len(parts) < 2:
                continue
            rel = parts[0]
            try:
                size = int(parts[1])
            except ValueError:
                size = 0
            mod_time = parts[2] if len(parts) >= 3 else ""
            jav_id = extract_id(Path(rel).name)
            if not jav_id:
                local_skipped.append(rel)
                skipped.append((remote, rel))
            else:
                entries.append(FileEntry(
                    source=source_label, remote=remote, path=rel,
                    size=size, mod_time=mod_time, jav_id=jav_id,
                ))
            progress.advance(task_id)
            n = len(entries) + len(local_skipped)

            # Cancel protocol: poll the flag file every CANCEL_CHECK_INTERVAL
            # files, consume it, stop rclone and leave the loop.
            if BASIC and n > 0 and n % CANCEL_CHECK_INTERVAL == 0:
                if CANCEL_FLAG.exists():
                    try:
                        CANCEL_FLAG.unlink(missing_ok=True)
                    except OSError:
                        # Best effort: failing to remove the flag must not
                        # prevent the cancellation itself.
                        pass
                    _stop_process(proc)
                    _cancelled = True
                    break

            # Throttled machine-readable progress line.
            if BASIC and n > 0:
                now = time.monotonic()
                files_since_emit = n - last_emit_n
                elapsed_since_emit = now - last_emit_ts
                should_emit_progress = (
                    files_since_emit >= PROGRESS_EMIT_MIN_FILES
                    and elapsed_since_emit >= PROGRESS_EMIT_MIN_GAP_S
                ) or elapsed_since_emit >= PROGRESS_EMIT_MAX_GAP_S
                if should_emit_progress:
                    sys.stderr.write("SCAN_FILE_PROGRESS " + json.dumps({
                        "remote": remote, "label": source_label,
                        "files": len(entries), "skipped": len(local_skipped),
                        "total": total,
                    }) + "\n")
                    sys.stderr.flush()
                    last_emit_n = n
                    last_emit_ts = now
    except BaseException:
        # Covers KeyboardInterrupt as before, plus any error raised by the
        # progress object or the parsing helpers: never leave the rclone
        # child running behind an exception.
        _stop_process(proc)
        raise

    if _cancelled:
        sys.stderr.write("SCAN_CANCELLED\n")
        sys.stderr.flush()
        sys.exit(0)

    proc.wait()
    _stderr_thread.join()
    if proc.returncode != 0:
        err = _stderr_chunks[0] if _stderr_chunks else ""
        _err(f"rclone lsf failed for {remote}:\n{err}")
        sys.exit(proc.returncode)
    return entries, local_skipped
|