Step 10f: extract rclone subprocess wrappers into rcjav/rclone_io.py

This commit is contained in:
admin
2026-05-22 21:53:36 +02:00
parent 41f7c80f1b
commit 90054e4d0b
3 changed files with 336 additions and 269 deletions
+22 -269
View File
@@ -73,7 +73,25 @@ def human_size(n: int) -> str:
return f"{nf:.2f} PiB"
RCLONE_BIN = "rclone"
from rcjav.rclone_io import (
RCLONE_BIN,
DURATION_RE,
set_basic as _set_rclone_basic,
set_rclone_bin as _set_rclone_bin,
quick_search_remote,
choose_search_mode,
name_to_include_patterns,
name_match,
query_to_include_patterns,
remote_file_count,
parse_duration,
walk_remote,
)
# Mirror of rcjav.rclone_io.BASIC for in-tree readers that haven't been
# updated yet (output renderers, BasicProgress checks in main()). Set in
# main() via both this name and _set_rclone_basic().
BASIC = False # set by --basic
USE_ANSI = True # disabled by --no-color
@@ -188,13 +206,6 @@ from rcjav.dupes import (
CONFIG_PATH = Path(__file__).resolve().parent / "config.json"
# Written by the native-messaging host when the user clicks Cancel in the
# extension popup. walk_remote checks for it every CANCEL_CHECK_INTERVAL files
# and exits cleanly if found.
CANCEL_FLAG = Path(__file__).resolve().parent / "scan-cancel.flag"
CANCEL_CHECK_INTERVAL = 100 # check / emit progress every N files
def load_config() -> dict:
if not CONFIG_PATH.exists():
return {}
@@ -213,265 +224,6 @@ def save_config(cfg: dict) -> None:
os.replace(tmp, CONFIG_PATH)
# ---------- quick search (no cache) ----------
def quick_search_remote(remote: str, source_label: str,
patterns: list[str],
skipped: list[tuple[str, str]]) -> list[FileEntry]:
"""Run `rclone lsjson --include <pattern>` once per pattern. Bypass cache."""
out: list[FileEntry] = []
seen: set[tuple[str, str]] = set()
for pat in patterns:
cmd = [RCLONE_BIN, "lsjson", remote, "--files-only", "-R", "--include", pat]
proc = subprocess.run(cmd, capture_output=True, text=True,
encoding="utf-8", errors="replace")
if proc.returncode != 0:
console.print(f"[red]rclone lsjson --include failed for {remote}:[/]\n{proc.stderr}")
sys.exit(proc.returncode)
for item in json.loads(proc.stdout or "[]"):
if item.get("IsDir"):
continue
path = item["Path"]
key = (remote, path)
if key in seen:
continue
seen.add(key)
jav_id = extract_id(Path(path).name)
if not jav_id:
skipped.append((remote, path))
continue
out.append(FileEntry(
source=source_label, remote=remote, path=path,
size=int(item.get("Size", 0)),
mod_time=item.get("ModTime", ""), jav_id=jav_id,
))
return out
def choose_search_mode(raw_queries: list[str], force_quick: bool, force_cache: bool) -> tuple[str, str]:
"""Decide quick vs cached. Returns (mode, reason)."""
if force_quick and force_cache:
return ("cached", "both --quick and --cache passed; preferring --cache (safer)")
if force_quick:
return ("quick", "forced via --quick")
if force_cache:
return ("cached", "forced via --cache")
if len(raw_queries) > 1:
return ("cached", f"multi-query ({len(raw_queries)} IDs) — cache batches them for free")
if not raw_queries:
return ("cached", "no queries")
q = raw_queries[0]
if RANGE_RE.search(q):
return ("cached", "range [N-M] — too many rclone calls otherwise")
if "*" in q or "?" in q:
return ("cached", "wildcard — cache match semantics are more reliable")
return ("quick", "single exact ID — live lookup is fastest")
def _escape_rclone_glob(s: str) -> str:
"""Escape rclone filter meta-chars so a literal token isn't interpreted as a
glob. rclone's filter syntax treats `*`, `?`, `[`, `{` specially; brackets
open a char-class that fails silently if the token contains `[` or `]`."""
out = []
for ch in s:
if ch in r"*?[]{}\\":
out.append("\\" + ch)
else:
out.append(ch)
return "".join(out)
def name_to_include_patterns(tokens: list[str]) -> list[str]:
"""Build rclone --include globs for each name token (case-insensitive substring)."""
pats: list[str] = []
for t in tokens:
if "*" in t or "?" in t:
# Caller-supplied wildcard — assume they meant it.
pats.append(t)
else:
# Literal substring search: escape glob meta inside the token so
# `--name "[BD]"` searches for the literal "[BD]" not a char class.
pats.append(f"*{_escape_rclone_glob(t)}*")
return pats
def name_match(stem: str, tokens: list[str]) -> bool:
"""Case-insensitive: True if ANY token matches stem (substring or fnmatch glob)."""
low = stem.lower()
for t in tokens:
tl = t.lower()
if "*" in tl or "?" in tl:
if fnmatch.fnmatchcase(low, tl):
return True
elif tl in low:
return True
return False
def query_to_include_patterns(raw: str) -> list[str]:
"""Turn a search query into one or more rclone --include globs.
Ranges expand to individual IDs; wildcards and exact IDs map to single glob."""
if RANGE_RE.search(raw):
expanded = expand_range(raw) or []
out: list[str] = []
for e in expanded:
out.extend(query_to_include_patterns(e))
return out
if "*" in raw or "?" in raw:
return [f"{raw}*"]
norm = normalize_id(raw)
if not norm:
return [f"{raw}*"]
prefix, _, digits = norm.rpartition("-")
if not digits.isdigit():
return [f"{norm}*"]
n = int(digits)
width = max(3, len(str(n)))
return [f"{prefix}-{n:0{width}d}*"]
# ---------- rclone wrappers ----------
def remote_file_count(remote: str) -> int:
"""Fast total file count via `rclone size --json`."""
cmd = [RCLONE_BIN, "size", "--json", remote]
proc = subprocess.run(cmd, capture_output=True, text=True,
encoding="utf-8", errors="replace")
if proc.returncode != 0:
console.print(f"[red]rclone size failed for {remote}:[/]\n{proc.stderr}")
sys.exit(proc.returncode)
try:
return int(json.loads(proc.stdout).get("count", 0))
except (json.JSONDecodeError, ValueError):
return 0
DURATION_RE = re.compile(r"^\s*(\d+)\s*([smhd])\s*$", re.IGNORECASE)
def parse_duration(s: str) -> str | None:
"""Validate a duration suffix (`30m`, `24h`, `7d`, `90s`). Returns the
normalized form rclone accepts, or None if invalid. We don't compute a
timedelta — we pass the suffix straight to rclone --max-age."""
if not s:
return None
m = DURATION_RE.match(s)
if not m:
return None
return f"{m.group(1)}{m.group(2).lower()}"
def walk_remote(remote: str, source_label: str,
skipped: list[tuple[str, str]],
progress: Progress, task_id,
max_age: str | None = None,
_total_override: int | None = None) -> tuple[list[FileEntry], list[str]]:
"""Stream files from rclone lsf, ticking progress per file.
If max_age is set, pass --max-age to rclone so only recently-modified files
are returned (incremental scan).
_total_override: skip the internal remote_file_count probe (caller already did it)."""
if max_age:
# Can't pre-count for an age-filtered walk — skip the size probe and
# let progress run on a synthetic total.
total = 0
progress.update(task_id, total=1,
description=f"[cyan]{source_label}[/] {remote} (since {max_age})")
else:
if _total_override is not None:
total = _total_override
else:
total = remote_file_count(remote)
if BASIC:
# Caller already emitted SCAN_REMOTE_START (without total) — now we know it.
sys.stderr.write("SCAN_REMOTE_COUNTED " + json.dumps({
"remote": remote, "total": total,
}) + "\n")
sys.stderr.flush()
progress.update(task_id, total=max(total, 1),
description=f"[cyan]{source_label}[/] {remote}")
cmd = [RCLONE_BIN, "lsf", "--files-only", "-R",
"--format", "pst", "--separator", "\t"]
if max_age:
cmd += ["--max-age", max_age]
cmd.append(remote)
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
text=True, encoding="utf-8", errors="replace")
entries: list[FileEntry] = []
local_skipped: list[str] = []
if proc.stdout is None:
raise RuntimeError("rclone stdout pipe unexpectedly None")
_stderr_chunks: list[str] = []
_stderr_thread = threading.Thread(
target=lambda: _stderr_chunks.append(proc.stderr.read() if proc.stderr else ""),
daemon=True,
)
_stderr_thread.start()
_cancelled = False
try:
for line in proc.stdout:
line = line.rstrip("\n").rstrip("\r")
if not line:
continue
parts = line.split("\t")
if len(parts) < 2:
continue
rel = parts[0]
try:
size = int(parts[1])
except ValueError:
size = 0
mod_time = parts[2] if len(parts) >= 3 else ""
jav_id = extract_id(Path(rel).name)
if not jav_id:
local_skipped.append(rel)
skipped.append((remote, rel))
else:
entries.append(FileEntry(
source=source_label, remote=remote, path=rel,
size=size, mod_time=mod_time, jav_id=jav_id,
))
progress.advance(task_id)
# Every CANCEL_CHECK_INTERVAL files: check cancel flag and emit progress.
n = len(entries) + len(local_skipped)
if BASIC and n > 0 and n % CANCEL_CHECK_INTERVAL == 0:
if CANCEL_FLAG.exists():
try:
CANCEL_FLAG.unlink(missing_ok=True)
except OSError:
pass
proc.terminate()
try:
proc.wait(timeout=3)
except subprocess.TimeoutExpired:
proc.kill()
_cancelled = True
break
sys.stderr.write("SCAN_FILE_PROGRESS " + json.dumps({
"remote": remote, "label": source_label,
"files": len(entries), "skipped": len(local_skipped),
"total": total,
}) + "\n")
sys.stderr.flush()
except KeyboardInterrupt:
proc.terminate()
try:
proc.wait(timeout=3)
except subprocess.TimeoutExpired:
proc.kill()
raise
if _cancelled:
sys.stderr.write("SCAN_CANCELLED\n")
sys.stderr.flush()
sys.exit(0)
proc.wait()
_stderr_thread.join()
if proc.returncode != 0:
err = _stderr_chunks[0] if _stderr_chunks else ""
console.print(f"[red]rclone lsf failed for {remote}:[/]\n{err}")
sys.exit(proc.returncode)
return entries, local_skipped
def make_progress():
if BASIC:
return BasicProgress()
@@ -1188,9 +940,10 @@ def main():
"`--source cq:personal-files/ClearJAV --target cq:personal-files/JAV/TMP`.")
args = ap.parse_args()
global RCLONE_BIN, console, BASIC, DEFAULT_SOURCE, DEFAULT_TARGET, DEFAULT_CATALOG
RCLONE_BIN = args.rclone_bin
global console, BASIC, DEFAULT_SOURCE, DEFAULT_TARGET, DEFAULT_CATALOG
_set_rclone_bin(args.rclone_bin)
BASIC = args.basic or args.format == "json"
_set_rclone_basic(BASIC)
# Apply persisted config overrides BEFORE defaults are consulted.
cfg = load_config()