#!/usr/bin/env python3 """Native messaging host for rclonex Brave extension. Reads length-prefixed JSON messages on stdin, dispatches to rc-jav.py, writes length-prefixed JSON responses on stdout. Long-lived: Brave keeps this process alive for the lifetime of the extension's connectNative() port. """ from __future__ import annotations import json import os import re import struct import subprocess import sys import threading import time import traceback from datetime import datetime from pathlib import Path # Native messaging requires raw binary stdio. Without this, Windows translates # CRLF on stdout (corrupting the 4-byte length prefix) and may buffer. if os.name == "nt": import msvcrt msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY) msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) # ----- config ----- RC_JAV = Path(r"D:\DEV\Project\rclone-jav\rc-jav.py") PYTHON = "python" # or absolute path to python.exe HOST_DIR = Path(__file__).resolve().parent LOG_DIR = HOST_DIR / "logs" STATE_DIR = HOST_DIR / "state" LOG_DIR.mkdir(exist_ok=True) STATE_DIR.mkdir(exist_ok=True) LOG_FILE = LOG_DIR / "rcjav-host.log" EVENTS_LOG = LOG_DIR / "rcjav-host-events.log" DELETE_LOG = LOG_DIR / "deletes.log" EVENTS_MAX_BYTES = 5 * 1024 * 1024 # 5 MiB — rotate by truncating oldest half SCAN_STATE_FILE = STATE_DIR / "scan-state.json" VERSION = "0.1.0" # Scan background thread state _scan_lock = threading.Lock() _scan_thread: threading.Thread | None = None _scan_script_path: Path | None = None # set by handle_scan; used by handle_scan_cancel _scan_proc: subprocess.Popen | None = None _scan_cancel_requested = False HIT_RE = re.compile(r"\[(?P[^\]]+)\] (?P\d+) hit\(s\)") MISS_RE = re.compile(r"\[(?P[^\]]+)\] NOT FOUND") PRIMARY_ID_RE = re.compile(r"^([A-Za-z]+)-(\d+)") FALLBACK_ID_RE = re.compile(r"^([A-Za-z0-9]+)-(\d+)") COMPOUND_ID_RE = re.compile(r"^([A-Za-z0-9]+(?:-[A-Za-z0-9]+)+)-(\d+)") HOST_RANGE_RE = re.compile(r"\[(\d+)-(\d+)\]") _cache_lock = threading.Lock() _cache_mem: dict[str, dict] = {} def log(msg: str) -> None: try: with LOG_FILE.open("a", encoding="utf-8") as f: f.write(msg + "\n") except OSError: pass def log_event(action: str, **kwargs) -> None: """Append a structured JSON-lines event to rcjav-host-events.log. Rotates by dropping the oldest half when the file exceeds EVENTS_MAX_BYTES.""" entry = {"ts": datetime.now().isoformat(), "action": action, **kwargs} line = json.dumps(entry, ensure_ascii=False) + "\n" try: size = EVENTS_LOG.stat().st_size if EVENTS_LOG.exists() else 0 if size >= EVENTS_MAX_BYTES: try: raw = EVENTS_LOG.read_bytes() mid = len(raw) // 2 # Seek forward to the next newline boundary so we don't split a JSON line. nl = raw.find(b"\n", mid) raw = raw[nl + 1:] if nl >= 0 else raw[mid:] EVENTS_LOG.write_bytes(raw) except OSError: pass with EVENTS_LOG.open("a", encoding="utf-8") as f: f.write(line) except OSError: pass # ----- stdio framing ----- def read_message(): raw_len = sys.stdin.buffer.read(4) if len(raw_len) < 4: return None (msg_len,) = struct.unpack("extension messages at 1 MiB. Stay below. MAX_RESPONSE_BYTES = 900_000 def _shrink_response(obj: dict) -> dict: """If the JSON payload would exceed Chrome's native-messaging cap, drop the heaviest optional fields and add a `truncated` flag so the extension can report it. Last resort: cap `structured` to the first N entries.""" data = json.dumps(obj).encode("utf-8") if len(data) <= MAX_RESPONSE_BYTES: return obj shrunk = dict(obj) truncated_reason = [] for field in ("raw_json", "stdout", "stderr"): if field in shrunk and shrunk.get(field): shrunk[field] = None truncated_reason.append(f"dropped {field}") if len(json.dumps(shrunk).encode("utf-8")) <= MAX_RESPONSE_BYTES: shrunk["truncated"] = True shrunk["truncated_reason"] = "; ".join(truncated_reason) return shrunk structured = shrunk.get("structured") or [] # Set the truncated metadata before sizing so the budget includes it. shrunk["truncated"] = True shrunk["truncated_reason"] = "; ".join(truncated_reason + ["structured TBD"]) if structured: lo, hi = 0, len(structured) while lo < hi: mid = (lo + hi + 1) // 2 shrunk["structured"] = structured[:mid] if len(json.dumps(shrunk).encode("utf-8")) <= MAX_RESPONSE_BYTES: lo = mid else: hi = mid - 1 shrunk["structured"] = structured[:lo] truncated_reason.append(f"structured {len(structured)} -> {lo}") shrunk["truncated_reason"] = "; ".join(truncated_reason) return shrunk def write_message(obj: dict) -> None: obj = _shrink_response(obj) data = json.dumps(obj).encode("utf-8") sys.stdout.buffer.write(struct.pack(" dict | None: """Find the last top-level JSON object in `text`. Tolerates leading status lines printed by rc-jav, pretty-printed multi-line JSON, and trailing noise.""" if not text: return None last = None i = 0 n = len(text) while i < n: j = text.find("{", i) if j < 0: break try: obj, end = _JSON_DECODER.raw_decode(text, j) except json.JSONDecodeError: i = j + 1 continue if isinstance(obj, dict): last = obj i = end return last # ----- rc-jav invocation ----- def resolve_rcjav(rcjav_path: str | None) -> Path: """Accept either a folder (auto-append rc-jav.py) or a full .py path.""" if not rcjav_path: return RC_JAV p = Path(rcjav_path) if p.is_dir() or p.suffix.lower() != ".py": p = p / "rc-jav.py" return p def run_rcjav(args: list[str], timeout: int = 120, extra_flags: list[str] | None = None, rcjav_path: str | None = None, stdin_data: str | None = None) -> tuple[int, str, str]: flags = extra_flags if extra_flags is not None else ["--basic", "--no-color"] script = resolve_rcjav(rcjav_path) # Reject up-front instead of resolving a relative path against the host's # CWD (which is whatever Brave hands it — usually its install dir) and # surfacing a confusing "file not found" from subprocess. if not script.is_absolute() or not script.exists(): return 2, "", ( f"rc-jav.py not found at {script!s} — set the 'rc-jav path' in the " f"extension options to an absolute path that points to rc-jav.py " f"or its parent folder." ) cmd = [PYTHON, str(script), *args, *flags] # Hide console window on Windows creationflags = 0 if os.name == "nt": creationflags = 0x08000000 # CREATE_NO_WINDOW try: proc = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout, encoding="utf-8", errors="replace", creationflags=creationflags, input=stdin_data, ) return proc.returncode, proc.stdout, proc.stderr except subprocess.TimeoutExpired: return 124, "", "timeout after %ds" % timeout except Exception as e: return 1, "", f"spawn error: {e}" # Memoize rules-info per script path so handle_cache_status doesn't pay # the Python startup cost on every poll. Invalidate on rcjav_path change. _RULES_INFO_CACHE: dict[str, dict] = {} def fetch_rules_info(rcjav_path: str | None) -> dict: """Get cache contract constants + current rules signature from rc-jav.py. Returns {ok, cache_schema, id_rules, id_rules_signature} on success, or {ok: False, error: ...} when the lookup fails. Memoized per script path; the result is cached for the lifetime of this host process. """ script = resolve_rcjav(rcjav_path) key = str(script) cached = _RULES_INFO_CACHE.get(key) if cached is not None: return cached rc, stdout, stderr = run_rcjav( ["--print-rules-info"], extra_flags=[], # NB: omit --basic / --no-color, --print-rules-info already JSON rcjav_path=rcjav_path, timeout=30, ) if rc != 0: info = {"ok": False, "error": (stderr or stdout or "unknown").strip()} else: try: info = json.loads(stdout.strip()) except json.JSONDecodeError as e: info = {"ok": False, "error": f"invalid rules-info JSON: {e}"} _RULES_INFO_CACHE[key] = info return info def part_pattern_args(payload: dict) -> list[str]: args: list[str] = [] for pattern in payload.get("part_patterns") or []: if isinstance(pattern, str) and pattern.strip(): args += ["--part-pattern", pattern.strip()] return args def parse_hits(stdout: str) -> tuple[int, list[str]]: """Sum 'N hit(s)' lines, collect IDs.""" total = 0 ids: list[str] = [] for m in HIT_RE.finditer(stdout): total += int(m.group("count")) ids.append(m.group("id")) return total, ids def host_human_size(n: int) -> str: nf = float(n or 0) for unit in ("B", "KiB", "MiB", "GiB", "TiB"): if nf < 1024: return f"{int(nf)} B" if unit == "B" else f"{nf:.2f} {unit}" nf /= 1024 return f"{nf:.2f} PiB" def host_normalize_id(raw: str) -> str | None: stem = Path(str(raw) + ".x").stem m = PRIMARY_ID_RE.match(stem) or COMPOUND_ID_RE.match(stem) or FALLBACK_ID_RE.match(stem) if not m: return None num = int(m.group(2)) width = max(3, len(m.group(2))) prefix = m.group(1).upper() if prefix == "FC2": prefix = "FC2-PPV" return f"{prefix}-{num:0{width}d}" def _load_rcjav_config(script: Path) -> dict: cfg_path = script.parent / "config.json" if not cfg_path.exists(): return {} try: return json.loads(cfg_path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): return {} def _cache_age_label(scanned_at: str, stale_hours: float = 24) -> tuple[str, bool]: try: dt = datetime.fromisoformat(str(scanned_at).replace("Z", "+00:00")) now = datetime.now(dt.tzinfo) if dt.tzinfo else datetime.now() hours = (now - dt).total_seconds() / 3600.0 except Exception: return "?", False if hours < 1: age = f"{int(hours * 60)}m" elif hours < 24: age = f"{hours:.1f}h" else: age = f"{hours / 24:.1f}d" return age, hours > stale_hours def _load_host_cache(script: Path) -> tuple[dict | None, dict]: cache_path = script.parent / "cache.json" timings: dict[str, int | str | bool | None] = {"host_cache_path": str(cache_path)} if not cache_path.exists(): timings["host_cache_error"] = "cache.json not found" return None, timings try: stat = cache_path.stat() except OSError as e: timings["host_cache_error"] = str(e) return None, timings key = str(cache_path.resolve()) stamp = (stat.st_mtime_ns, stat.st_size) with _cache_lock: cached = _cache_mem.get(key) if cached and cached.get("stamp") == stamp: timings["cache_load_ms"] = 0 timings["host_cache_reused"] = True return cached["data"], timings t0 = time.perf_counter() try: data = json.loads(cache_path.read_text(encoding="utf-8")) except Exception as e: timings["cache_load_ms"] = round((time.perf_counter() - t0) * 1000) timings["host_cache_error"] = str(e) return None, timings timings["cache_load_ms"] = round((time.perf_counter() - t0) * 1000) timings["host_cache_reused"] = False with _cache_lock: _cache_mem[key] = {"stamp": stamp, "data": data} return data, timings def _entry_to_hit(source: str, remote: str, item: dict, match_trace: dict | None = None) -> dict: path = item.get("path", "") sep = "" if remote.endswith("/") or not path else "/" size = int(item.get("size") or 0) hit = { "source": source, "remote": remote, "path": path, "full_path": f"{remote}{sep}{path}", "size": size, "size_human": host_human_size(size), "mod_time": item.get("mod_time", ""), "jav_id": item.get("jav_id", ""), } if match_trace: hit.update(match_trace) return hit def _no_match_state(search_mode: str, cache_meta: dict) -> dict: meta = list((cache_meta or {}).values()) missing = [m for m in meta if m.get("age") == "missing"] stale = [m for m in meta if m.get("stale")] if search_mode == "quick": return { "no_match_kind": "live_miss", "no_match_title": "Live search found no library hit", "no_match_detail": "The ID was extracted and searched against rclone directly.", } if missing: return { "no_match_kind": "cache_missing", "no_match_title": "Cache search has missing roots", "no_match_detail": "At least one requested root is not present in cache.json. Rebuild or refresh that cache root.", } if stale: return { "no_match_kind": "cache_stale_miss", "no_match_title": "Stale cache found no library hit", "no_match_detail": "The ID was searched in cache, but one or more cache roots are stale.", } return { "no_match_kind": "cache_miss", "no_match_title": "Cache search found no library hit", "no_match_detail": "The ID was extracted and searched in the current cache.", } def handle_cached_search_fast(payload: dict) -> dict | None: """Answer simple cached ID searches in-process, avoiding per-query Python startup. Returns None when the regular rc-jav CLI path should handle the request.""" query = payload.get("id") or payload.get("query") if not query or payload.get("name") or payload.get("quick"): return None raw = str(query) if "*" in raw or "?" in raw or HOST_RANGE_RE.search(raw): return None norm = host_normalize_id(raw) if not norm: return None t0 = time.perf_counter() script = resolve_rcjav(payload.get("rcjav_path")) cache, timings = _load_host_cache(script) if not isinstance(cache, dict) or not isinstance(cache.get("remotes"), dict): return None cfg = _load_rcjav_config(script) source_roots = [r for r in (payload.get("source_override") or []) if isinstance(r, str)] target_roots = [r for r in (payload.get("target_override") or []) if isinstance(r, str)] if not source_roots and not target_roots: target_roots = cfg.get("default_target") or [] if not source_roots and not target_roots: return None wanted: list[tuple[str, str]] = [("Source", r) for r in source_roots] + [("Target", r) for r in target_roots] cache_meta: dict[str, dict] = {} hits: list[dict] = [] index_start = time.perf_counter() for source, remote in wanted: entry = cache["remotes"].get(remote) if not isinstance(entry, dict): cache_meta[remote] = {"cached": False, "age": "missing", "stale": True, "file_count": 0} continue files = entry.get("files", []) or [] age, stale = _cache_age_label(entry.get("scanned_at", ""), _stale_hours(payload)) cache_meta[remote] = {"cached": True, "age": age, "stale": stale, "file_count": len(files)} for item in files: jid = item.get("jav_id", "") if jid == norm or str(jid).startswith(norm + "#part"): is_part = str(jid).startswith(norm + "#part") normalized = str(raw).upper() != norm.upper() hits.append(_entry_to_hit(source, remote, item, { "match_kind": "part" if is_part else "normalized" if normalized else "exact", "match_reason": "Base ID + part" if is_part else "Normalized ID" if normalized else "Exact ID", "match_confidence": "related" if is_part else "normalized" if normalized else "high", "matched_query": norm, "matched_id": jid, })) timings["cache_match_ms"] = round((time.perf_counter() - index_start) * 1000) hits.sort(key=lambda h: (h.get("jav_id", ""), h.get("path", "").lower())) host_ms = round((time.perf_counter() - t0) * 1000) timings.update({ "host_cached_ms": host_ms, "host_rcjav_ms": 0, "cli_elapsed_ms": None, }) result = { "ok": True, "rc": 0 if hits else 1, "hits": len(hits), "found": bool(hits), "search_mode": "cached", "structured": hits, "cache_meta": cache_meta, "scanned_remotes": list(cache_meta.keys()), "timings": timings, "stderr": "", "id": query, "name": None, "message": f"{len(hits)} hit(s)" if hits else "NOT FOUND", "fast_path": "host-cache", } if not hits: result.update(_no_match_state("cached", cache_meta)) return result # ----- dispatch ----- def handle_search(payload: dict) -> dict: query = payload.get("id") or payload.get("query") name = payload.get("name") quick = bool(payload.get("quick")) fast = handle_cached_search_fast(payload) if fast is not None: return fast args: list[str] = [] if query: args += ["--search", str(query)] if name: args += ["--name", str(name)] if quick: args.append("--quick") args += part_pattern_args(payload) if not args: return {"ok": False, "error": "no id or name"} # Optional profile overrides: --source / --target lists from the active profile. for r in payload.get("source_override") or []: if r and isinstance(r, str): args += ["--source", r] for r in payload.get("target_override") or []: if r and isinstance(r, str): args += ["--target", r] # Always ask for JSON output — needed for per-hit Delete UI in the extension. # Also run --basic to suppress rich on stderr. t0 = time.perf_counter() rc, out, err = run_rcjav(args, extra_flags=["--basic", "--no-color", "--format", "json"], rcjav_path=payload.get("rcjav_path")) rcjav_ms = round((time.perf_counter() - t0) * 1000) parsed = extract_json_blob(out) hits = 0 structured: list[dict] = [] cache_meta = {} if parsed: for q in parsed.get("queries", []): for h in q.get("hits", []): structured.append(h) structured.extend(parsed.get("name_matches", [])) cache_meta = parsed.get("cache_meta", {}) or {} hits = len(structured) else: # Fallback: text count (also covers --basic non-JSON output) hits_count_only, _ = parse_hits(out) hits = hits_count_only # If rc-jav failed and produced no parseable result, surface as an error # instead of an empty "NOT FOUND" — otherwise host crashes look like misses. if rc != 0 and parsed is None and hits == 0: return { "ok": False, "rc": rc, "error": (err or out or "rc-jav exited non-zero").strip()[:1000], "id": query, "name": name, } result = { "ok": True, "rc": rc, "hits": hits, "found": hits > 0, "search_mode": "quick" if quick else "cached", "structured": structured, "cache_meta": cache_meta, "scanned_remotes": list(cache_meta.keys()), "timings": { "host_rcjav_ms": rcjav_ms, "cli_elapsed_ms": round(float(parsed.get("elapsed_sec", 0)) * 1000) if parsed else None, **(parsed.get("timings", {}) if parsed else {}), }, # Drop full stdout from the success payload — it duplicates `structured` # (the parsed JSON blob) and can blow Chrome's 1 MiB message cap. "stderr": (err or "")[:2000], "id": query, "name": name, "message": f"{hits} hit(s)" if hits > 0 else "NOT FOUND", } if hits == 0: result.update(_no_match_state("quick" if quick else "cached", cache_meta)) return result def handle_bulk_search(payload: dict) -> dict: queries = [ str(q).strip() for q in (payload.get("queries") or []) if isinstance(q, str) and str(q).strip() ][:250] if not queries: return {"ok": False, "error": "bulk search needs one or more IDs"} quick = bool(payload.get("quick")) args: list[str] = [] for query in queries: args += ["--search", query] if quick: args.append("--quick") args += part_pattern_args(payload) for r in payload.get("source_override") or []: if r and isinstance(r, str): args += ["--source", r] for r in payload.get("target_override") or []: if r and isinstance(r, str): args += ["--target", r] t0 = time.perf_counter() rc, out, err = run_rcjav( args, extra_flags=["--basic", "--no-color", "--format", "json"], rcjav_path=payload.get("rcjav_path"), ) rcjav_ms = round((time.perf_counter() - t0) * 1000) parsed = extract_json_blob(out) if parsed is None: return {"ok": False, "rc": rc, "error": (err or out or "bulk search failed").strip()[:1000]} cache_meta = parsed.get("cache_meta", {}) or {} grouped = [] total_hits = 0 search_mode = "quick" if quick else "cached" for item in parsed.get("queries", []): hits = item.get("hits", []) or [] total_hits += len(hits) row = {"query": item.get("query", ""), "hits": len(hits), "structured": hits} if not hits: row.update(_no_match_state(search_mode, cache_meta)) grouped.append(row) return { "ok": True, "rc": rc, "search_mode": search_mode, "queries": grouped, "query_count": len(grouped), "hits": total_hits, "cache_meta": cache_meta, "scanned_remotes": list(cache_meta.keys()), "timings": { "host_rcjav_ms": rcjav_ms, "cli_elapsed_ms": round(float(parsed.get("elapsed_sec", 0)) * 1000), **(parsed.get("timings", {}) or {}), }, "stderr": (err or "")[:2000], } def handle_dupe_review(payload: dict) -> dict: script = resolve_rcjav(payload.get("rcjav_path")) cfg = _load_rcjav_config(script) source_roots = [r for r in (payload.get("source_override") or []) if isinstance(r, str)] target_roots = [r for r in (payload.get("target_override") or []) if isinstance(r, str)] if not source_roots and not target_roots: target_roots = cfg.get("default_target") or [] args = ["--cache"] args += part_pattern_args(payload) for root in source_roots: args += ["--source", root] for root in target_roots: args += ["--target", root] if not source_roots and not target_roots: return {"ok": False, "error": "no source or target roots configured for duplicate review"} t0 = time.perf_counter() rc, out, err = run_rcjav( args, extra_flags=["--basic", "--no-color", "--format", "json"], rcjav_path=payload.get("rcjav_path"), ) elapsed = round((time.perf_counter() - t0) * 1000) parsed = extract_json_blob(out) if parsed is None: return {"ok": False, "rc": rc, "error": (err or out or "duplicate review failed").strip()[:1000]} groups = parsed.get("groups", {}) or {} reclaim = sum( int(item.get("size") or 0) for group in groups.values() for item in group.get("delete_candidates", []) or [] ) return { "ok": True, "groups": groups, "group_count": len(groups), "variant_alerts": parsed.get("variant_alerts", []) or [], "roots": {"source": source_roots, "target": target_roots}, "potential_reclaim": reclaim, "potential_reclaim_human": host_human_size(reclaim), "timings": {"host_rcjav_ms": elapsed}, "stderr": (err or "")[:2000], } def handle_library_issues(payload: dict) -> dict: """Return non-canonical filenames from cache (bracket-wrapped IDs, no-hyphen IDs).""" script = resolve_rcjav(payload.get("rcjav_path")) t0 = time.perf_counter() rc, out, err = run_rcjav( ["--library-issues"], extra_flags=["--basic", "--no-color", "--format", "json"], rcjav_path=payload.get("rcjav_path"), ) elapsed = round((time.perf_counter() - t0) * 1000) parsed = extract_json_blob(out) if parsed is None: return {"ok": False, "rc": rc, "error": (err or out or "library-issues failed").strip()[:1000]} return { "ok": True, "bracket_names": parsed.get("bracket_names", []), "nohyphen_names": parsed.get("nohyphen_names", []), "timings": {"host_rcjav_ms": elapsed}, } def _patch_cache_remove_paths(rcjav_path: str | None, full_paths: list[str]) -> None: """Remove file entries from cache.json after successful deletion.""" try: script = resolve_rcjav(rcjav_path) cache_path = script.parent / "cache.json" if not cache_path.exists(): return cache = json.loads(cache_path.read_text(encoding="utf-8")) # Build set of (remote, rel_path) to remove to_remove: set[tuple[str, str]] = set() for full_path in full_paths: for remote in cache.get("remotes", {}): sep = "" if remote.endswith("/") else "/" prefix = remote + sep if full_path.startswith(prefix): to_remove.add((remote, full_path[len(prefix):])) break if not to_remove: return modified = False for remote, remote_data in cache.get("remotes", {}).items(): before = len(remote_data.get("files", [])) remote_data["files"] = [ f for f in remote_data.get("files", []) if (remote, f["path"]) not in to_remove ] if len(remote_data["files"]) != before: modified = True if modified: tmp = cache_path.with_suffix(cache_path.suffix + ".tmp") tmp.write_text(json.dumps(cache, indent=2), encoding="utf-8") os.replace(tmp, cache_path) except Exception as e: log(f"_patch_cache_remove_paths error: {e}") def handle_delete_batch(payload: dict) -> dict: """Delete multiple files respecting existing delete settings, patch cache once.""" paths = payload.get("paths") or [] if not isinstance(paths, list) or not paths: return {"ok": False, "error": "paths must be a non-empty array"} base = { "mode": payload.get("mode", "trash"), "trash_dir": payload.get("trash_dir", ""), "allowed_remotes": payload.get("allowed_remotes", []), "rcjav_path": payload.get("rcjav_path", ""), } results = [] deleted_paths = [] for path in paths: r = handle_delete({**base, "path": path}) results.append({"path": path, **r}) if r.get("ok"): deleted_paths.append(path) if deleted_paths: _patch_cache_remove_paths(payload.get("rcjav_path"), deleted_paths) ok_count = sum(1 for r in results if r.get("ok")) return { "ok": ok_count > 0, "results": results, "deleted_count": ok_count, "failed_count": len(results) - ok_count, } def handle_rename_file(payload: dict) -> dict: """Rename one file in an rclone remote and patch cache. Collision-safe.""" remote = (payload.get("remote") or "").strip() old_path = (payload.get("old_path") or "").strip() new_path = (payload.get("new_path") or "").strip() if not remote or not old_path or not new_path: return {"ok": False, "error": "remote, old_path, and new_path are required"} if old_path == new_path: return {"ok": False, "error": "old_path and new_path are identical"} t0 = time.perf_counter() rc, out, err = run_rcjav( ["--rename-file", "--remote", remote, "--old-path", old_path, "--new-path", new_path], extra_flags=["--basic", "--no-color"], rcjav_path=payload.get("rcjav_path"), ) elapsed = round((time.perf_counter() - t0) * 1000) parsed = extract_json_blob(out) if parsed is None: return {"ok": False, "rc": rc, "error": (err or out or "rename failed").strip()[:1000]} return {**parsed, "timings": {"host_rcjav_ms": elapsed}} def handle_rename_files_batch(payload: dict) -> dict: """Rename multiple files in one call — single cache write at the end.""" renames = payload.get("renames") or [] if not isinstance(renames, list) or not renames: return {"ok": False, "error": "renames must be a non-empty array"} t0 = time.perf_counter() rc, out, err = run_rcjav( ["--rename-files-batch"], extra_flags=["--basic", "--no-color"], rcjav_path=payload.get("rcjav_path"), stdin_data=json.dumps(renames), ) elapsed = round((time.perf_counter() - t0) * 1000) parsed = extract_json_blob(out) if parsed is None: return {"ok": False, "rc": rc, "error": (err or out or "batch rename failed").strip()[:1000]} return {**parsed, "timings": {"host_rcjav_ms": elapsed}} def _remote_root(p: str) -> str: """Return the `remote:` prefix of an rclone path. For a local path, return ''.""" if ":" not in p: return "" return p.split(":", 1)[0] + ":" def _normalize_rclone_prefix(p: str) -> str: """Normalize an allowed rclone path prefix without broadening it to the remote root.""" p = (p or "").strip().replace("\\", "/").rstrip("/") return p if ":" in p else "" def _path_in_allowed_prefixes(path: str, prefixes: list[str]) -> bool: path_norm = path.replace("\\", "/") for prefix in prefixes: if path_norm == prefix or path_norm.startswith(prefix + "/"): return True return False def handle_delete(payload: dict) -> dict: """Delete or move-to-trash a single file via rclone.""" path = payload.get("path", "").strip() mode = payload.get("mode", "trash") # 'trash' or 'permanent' trash_dir = payload.get("trash_dir", "").strip() allowed_prefixes = [ p for p in (_normalize_rclone_prefix(r) for r in payload.get("allowed_remotes", [])) if p ] # Derive additional allowlist from rc-jav's config.json (default_target + # default_source) so the host is self-protective even if the extension forgets # to pass allowed_remotes. Plus the trash_dir itself. script = resolve_rcjav(payload.get("rcjav_path")) cfg_path = script.parent / "config.json" if cfg_path.exists(): try: cfg = json.loads(cfg_path.read_text(encoding="utf-8")) for key in ("default_target", "default_source"): for r in cfg.get(key, []) or []: prefix = _normalize_rclone_prefix(r) if prefix and prefix not in allowed_prefixes: allowed_prefixes.append(prefix) except (OSError, json.JSONDecodeError): pass if trash_dir: prefix = _normalize_rclone_prefix(trash_dir) if prefix and prefix not in allowed_prefixes: allowed_prefixes.append(prefix) if not path: return {"ok": False, "error": "no path"} # Reject directory-shaped paths — `rclone moveto` on a trailing-slash path # would move a whole tree and produce a destination with empty leaf. if path.endswith("/") or path.endswith("\\"): return {"ok": False, "error": "path looks like a directory (trailing slash)"} # Sanity: must look like an rclone remote path (contains ':') or a local file if ":" not in path and not Path(path).exists(): return {"ok": False, "error": "path doesn't look like an rclone remote or local file"} # Allowlist: refuse to delete anything outside the configured library paths. # Defaults derive from rc-jav config.json + trash_dir above. # If still empty, no config was found — refuse rather than fail-open. if not allowed_prefixes: return {"ok": False, "error": "no allowed prefixes configured; refusing to delete"} root = _remote_root(path) if not root or not _path_in_allowed_prefixes(path, allowed_prefixes): return { "ok": False, "error": f"path {path!r} not in allowed prefixes={allowed_prefixes}", } audit = DELETE_LOG log_entry = {"ts": __import__("datetime").datetime.now().isoformat(), "path": path, "mode": mode, "trash_dir": trash_dir} if mode == "permanent": cmd = [RCLONE_BIN, "deletefile", path] elif mode == "trash": if not trash_dir: return {"ok": False, "error": "trash_dir required when mode=trash"} # rclone moveto src dst — preserves filename if dst is dir-like, but for a single # file we should give it a full destination path. from datetime import date date_prefix = date.today().isoformat() leaf = path.split("/")[-1] dst = f"{trash_dir.rstrip('/')}/{date_prefix}/{leaf}" cmd = [RCLONE_BIN, "moveto", path, dst] log_entry["dst"] = dst else: return {"ok": False, "error": f"unknown mode {mode!r}"} creationflags = 0x08000000 if os.name == "nt" else 0 try: proc = subprocess.run(cmd, capture_output=True, text=True, timeout=300, encoding="utf-8", errors="replace", creationflags=creationflags) log_entry["rc"] = proc.returncode log_entry["stderr"] = proc.stderr.strip()[:500] except Exception as e: log_entry["error"] = str(e) try: with audit.open("a", encoding="utf-8") as f: f.write(json.dumps(log_entry) + "\n") except OSError: pass return {"ok": False, "error": str(e)} try: with audit.open("a", encoding="utf-8") as f: f.write(json.dumps(log_entry) + "\n") except OSError: pass return { "ok": proc.returncode == 0, "rc": proc.returncode, "stdout": proc.stdout, "stderr": proc.stderr, "path": path, "mode": mode, "dst": log_entry.get("dst"), } def handle_recent_deletes(payload: dict) -> dict: """Return the last N successful trash-mode deletes, newest first, so the extension can offer Undo. Permanent deletes are excluded — they can't be undone.""" limit = max(1, min(int(payload.get("limit") or 20), 200)) audit = DELETE_LOG if not audit.exists(): return {"ok": True, "entries": []} # Read tail. deletes.log is line-delimited JSON; small enough to read in full # in practice, but cap defensively at 1 MiB so a runaway log doesn't OOM us. try: raw = audit.read_text(encoding="utf-8", errors="replace") if len(raw) > 1_000_000: raw = raw[-1_000_000:] except OSError as e: return {"ok": False, "error": f"can't read deletes.log: {e}"} out: list[dict] = [] for line in raw.splitlines(): line = line.strip() if not line: continue try: obj = json.loads(line) except json.JSONDecodeError: continue # Skip undo records and permanent deletes — neither is undoable. if obj.get("kind") == "undo": continue if obj.get("mode") != "trash": continue if obj.get("rc") != 0: continue if not obj.get("dst"): continue out.append({ "ts": obj.get("ts"), "path": obj.get("path"), "dst": obj.get("dst"), "trash_dir": obj.get("trash_dir"), }) # Build the set of dst paths already undone so we hide them. undone_dst = set() for line in raw.splitlines(): try: obj = json.loads(line) except (json.JSONDecodeError, ValueError): continue if obj.get("kind") == "undo" and obj.get("rc") == 0 and obj.get("dst"): undone_dst.add(obj["dst"]) out = [e for e in out if e["dst"] not in undone_dst] out.reverse() # newest first return {"ok": True, "entries": out[:limit]} def handle_undo_delete(payload: dict) -> dict: """Move a file from trash back to its original path via `rclone moveto`. The extension passes the {dst, path} pair captured in the delete audit log; we re-validate both ends against the configured allowlist before acting.""" dst = (payload.get("dst") or "").strip() orig = (payload.get("path") or "").strip() if not dst or not orig: return {"ok": False, "error": "undo needs both dst (trash location) and path (original)"} # Reuse the same allowlist derivation handle_delete uses. allowed_prefixes = [ p for p in (_normalize_rclone_prefix(r) for r in payload.get("allowed_remotes", [])) if p ] script = resolve_rcjav(payload.get("rcjav_path")) cfg_path = script.parent / "config.json" if cfg_path.exists(): try: cfg = json.loads(cfg_path.read_text(encoding="utf-8")) for key in ("default_target", "default_source"): for r in cfg.get(key, []) or []: prefix = _normalize_rclone_prefix(r) if prefix and prefix not in allowed_prefixes: allowed_prefixes.append(prefix) except (OSError, json.JSONDecodeError): pass trash_dir = (payload.get("trash_dir") or "").strip() if trash_dir: prefix = _normalize_rclone_prefix(trash_dir) if prefix and prefix not in allowed_prefixes: allowed_prefixes.append(prefix) if not allowed_prefixes: return {"ok": False, "error": "no allowed prefixes configured; refusing to undo"} for end, label in ((dst, "dst"), (orig, "path")): if end.endswith("/") or end.endswith("\\"): return {"ok": False, "error": f"undo {label} looks like a directory"} if ":" not in end: return {"ok": False, "error": f"undo {label} not an rclone path"} if not _path_in_allowed_prefixes(end, allowed_prefixes): return {"ok": False, "error": f"undo {label} {end!r} not in allowed prefixes"} creationflags = 0x08000000 if os.name == "nt" else 0 cmd = [RCLONE_BIN, "moveto", dst, orig] audit = DELETE_LOG log_entry = { "ts": datetime.now().isoformat(), "kind": "undo", "path": orig, "dst": dst, "trash_dir": trash_dir, } try: proc = subprocess.run(cmd, capture_output=True, text=True, timeout=300, encoding="utf-8", errors="replace", creationflags=creationflags) log_entry["rc"] = proc.returncode log_entry["stderr"] = (proc.stderr or "").strip()[:500] except Exception as e: log_entry["error"] = str(e) try: with audit.open("a", encoding="utf-8") as f: f.write(json.dumps(log_entry) + "\n") except OSError: pass return {"ok": False, "error": str(e)} try: with audit.open("a", encoding="utf-8") as f: f.write(json.dumps(log_entry) + "\n") except OSError: pass return { "ok": proc.returncode == 0, "rc": proc.returncode, "stderr": proc.stderr, "path": orig, "dst": dst, } # rclone binary path — assumes on PATH. Override via env if needed. RCLONE_BIN = "rclone" def handle_ping(payload: dict) -> dict: override = payload.get("rcjav_path", "") effective = resolve_rcjav(override) return { "ok": True, "version": VERSION, "rc_jav": str(effective), "rc_jav_exists": effective.exists(), "rc_jav_default": str(RC_JAV), "rc_jav_overridden": bool(override), "python": PYTHON, } def handle_diagnostics(payload: dict) -> dict: """Run a battery of environment checks. Returns ordered list of {name, status, detail}.""" import shutil checks: list[dict] = [] def add(name, status, detail=""): checks.append({"name": name, "status": status, "detail": str(detail)}) # Python add("Python interpreter", "ok", sys.version.split()[0] + " — " + sys.executable) # rc-jav script override = payload.get("rcjav_path", "") script = resolve_rcjav(override) add("Host version", "ok", VERSION) add("rc-jav path source", "ok", "extension override: " + override if override else f"host default: {RC_JAV}") if script.exists(): add("rc-jav.py", "ok", str(script)) else: add("rc-jav.py", "fail", f"not found at {script}") ext_settings = payload.get("extension_settings") or {} if isinstance(ext_settings, dict) and ext_settings: mode = "quick/live" if ext_settings.get("quick_mode") else "cache" profile = ext_settings.get("active_profile") or "config.json defaults" profile_count = ext_settings.get("profile_count", 0) triggers = [] if ext_settings.get("auto_every_page"): triggers.append("every page") if ext_settings.get("auto_known_sites"): triggers.append("known sites") add("Extension settings", "ok", f"mode={mode}; profile={profile}; profiles={profile_count}; auto={', '.join(triggers) or 'off'}") # rc-jav can run --help if script.exists(): try: cf = 0x08000000 if os.name == "nt" else 0 r = subprocess.run([PYTHON, str(script), "--help"], capture_output=True, text=True, timeout=10, encoding="utf-8", errors="replace", creationflags=cf) if r.returncode == 0: add("rc-jav --help", "ok", "exits 0") else: add("rc-jav --help", "fail", f"exit {r.returncode}: {r.stderr.strip()[:200]}") except Exception as e: add("rc-jav --help", "fail", str(e)) else: add("rc-jav --help", "warn", "skipped (script missing)") # rich library present? try: if script.exists(): cf = 0x08000000 if os.name == "nt" else 0 r = subprocess.run([PYTHON, "-c", "import rich; print(rich.__file__)"], capture_output=True, text=True, timeout=10, encoding="utf-8", errors="replace", creationflags=cf) if r.returncode == 0: add("rich (Python pkg)", "ok", r.stdout.strip()) else: add("rich (Python pkg)", "fail", "import failed — run: pip install rich") except Exception as e: add("rich (Python pkg)", "fail", str(e)) # rclone on PATH rclone = shutil.which("rclone") or shutil.which("rclone.exe") if rclone: try: cf = 0x08000000 if os.name == "nt" else 0 r = subprocess.run([rclone, "version"], capture_output=True, text=True, timeout=10, encoding="utf-8", errors="replace", creationflags=cf) first = r.stdout.splitlines()[0] if r.stdout else "?" add("rclone binary", "ok", f"{rclone} — {first}") except Exception as e: add("rclone binary", "warn", f"found at {rclone} but version check failed: {e}") else: add("rclone binary", "fail", "not on PATH") # Brave config — rclone listremotes if rclone: try: cf = 0x08000000 if os.name == "nt" else 0 r = subprocess.run([rclone, "listremotes"], capture_output=True, text=True, timeout=10, encoding="utf-8", errors="replace", creationflags=cf) if r.returncode == 0: remotes = [s.strip() for s in r.stdout.splitlines() if s.strip()] add("rclone remotes", "ok" if remotes else "warn", ", ".join(remotes) if remotes else "no remotes configured") else: add("rclone remotes", "fail", r.stderr.strip()[:200]) except Exception as e: add("rclone remotes", "fail", str(e)) # rc-jav config.json if script.exists(): cfg = script.parent / "config.json" if cfg.exists(): try: data = json.loads(cfg.read_text(encoding="utf-8")) tgt = data.get("default_target", []) src = data.get("default_source", []) add("rc-jav config.json", "ok", f"source={src or '(empty)'} target={tgt or '(empty)'}") except Exception as e: add("rc-jav config.json", "fail", str(e)) else: add("rc-jav config.json", "warn", f"not present at {cfg}") # cache.json status cache = script.parent / "cache.json" if cache.exists(): try: data = json.loads(cache.read_text(encoding="utf-8")) rmap = data.get("remotes", {}) summary = [f"{r}={len(v.get('files', []))} files" for r, v in rmap.items()] add("rc-jav cache.json", "ok", "; ".join(summary) if summary else "empty") except Exception as e: add("rc-jav cache.json", "warn", str(e)) else: add("rc-jav cache.json", "warn", "no cache yet (run --scan to build)") try: cache_status = handle_cache_status(payload) warnings = cache_status.get("warnings") or [] stale = [r["remote"] for r in cache_status.get("remotes", []) if r.get("stale")] if warnings: add("Cache health", "warn", "; ".join((w.get("message") or w.get("code") or "?") for w in warnings[:3])) elif stale: add("Cache health", "warn", "stale remotes: " + ", ".join(stale)) elif cache_status.get("cache_exists"): add("Cache health", "ok", "no mismatch warnings") except Exception as e: add("Cache health", "warn", str(e)) # WinCatalog folder wc = script.parent / "wincatalog" if wc.exists() and wc.is_dir(): files = [f.name for f in wc.iterdir() if f.suffix.lower() in (".csv", ".xml")] add("WinCatalog folder", "ok" if files else "info", f"{wc} — {len(files)} CSV/XML file(s)" if files else f"optional: no CSV/XML files in {wc}") else: add("WinCatalog folder", "info", f"optional: not present at {wc}") return {"ok": True, "checks": checks} REGISTRY_HOST_KEYS = [ (r"HKLM\Software\BraveSoftware\Brave-Browser\NativeMessagingHosts\com.rcjav.host", "HKEY_LOCAL_MACHINE", r"Software\BraveSoftware\Brave-Browser\NativeMessagingHosts\com.rcjav.host"), (r"HKLM\Software\Google\Chrome\NativeMessagingHosts\com.rcjav.host", "HKEY_LOCAL_MACHINE", r"Software\Google\Chrome\NativeMessagingHosts\com.rcjav.host"), (r"HKLM\Software\WOW6432Node\Google\Chrome\NativeMessagingHosts\com.rcjav.host", "HKEY_LOCAL_MACHINE", r"Software\WOW6432Node\Google\Chrome\NativeMessagingHosts\com.rcjav.host"), (r"HKLM\Software\Chromium\NativeMessagingHosts\com.rcjav.host", "HKEY_LOCAL_MACHINE", r"Software\Chromium\NativeMessagingHosts\com.rcjav.host"), (r"HKCU\Software\BraveSoftware\Brave-Browser\NativeMessagingHosts\com.rcjav.host", "HKEY_CURRENT_USER", r"Software\BraveSoftware\Brave-Browser\NativeMessagingHosts\com.rcjav.host"), (r"HKCU\Software\Google\Chrome\NativeMessagingHosts\com.rcjav.host", "HKEY_CURRENT_USER", r"Software\Google\Chrome\NativeMessagingHosts\com.rcjav.host"), (r"HKCU\Software\Chromium\NativeMessagingHosts\com.rcjav.host", "HKEY_CURRENT_USER", r"Software\Chromium\NativeMessagingHosts\com.rcjav.host"), ] def _registry_host_entries() -> list[dict]: if os.name != "nt": return [] try: import winreg except Exception: return [{"key": "registry", "status": "fail", "detail": "winreg unavailable"}] hives = { "HKEY_LOCAL_MACHINE": winreg.HKEY_LOCAL_MACHINE, "HKEY_CURRENT_USER": winreg.HKEY_CURRENT_USER, } entries = [] for label, hive_name, subkey in REGISTRY_HOST_KEYS: try: with winreg.OpenKey(hives[hive_name], subkey) as key: value, _ = winreg.QueryValueEx(key, None) entries.append({"key": label, "status": "ok", "manifest": str(value)}) except FileNotFoundError: entries.append({"key": label, "status": "missing", "manifest": ""}) except PermissionError as e: entries.append({"key": label, "status": "warn", "manifest": "", "detail": str(e)}) except OSError as e: entries.append({"key": label, "status": "warn", "manifest": "", "detail": str(e)}) return entries def handle_host_status(payload: dict) -> dict: """Check native-messaging registration without writing registry or files.""" extension_id = (payload.get("extension_id") or "").strip() expected_origin = f"chrome-extension://{extension_id}/" if extension_id else "" host_dir = Path(__file__).resolve().parent manifest_path = host_dir / "com.rcjav.host.json" install_script = host_dir / "install-host.ps1" batch_path = host_dir / "rcjav-host.bat" checks: list[dict] = [] def add(name, status, detail=""): checks.append({"name": name, "status": status, "detail": str(detail)}) if manifest_path.exists(): add("Manifest file", "ok", str(manifest_path)) else: add("Manifest file", "fail", f"not found at {manifest_path}") manifest = {} if manifest_path.exists(): try: manifest = json.loads(manifest_path.read_text(encoding="utf-8-sig")) add("Manifest JSON", "ok", "valid JSON") except Exception as e: add("Manifest JSON", "fail", str(e)) manifest_host_path = manifest.get("path", "") if isinstance(manifest, dict) else "" if manifest_host_path: hp = Path(manifest_host_path) status = "ok" if hp.exists() else "fail" add("Manifest host path", status, manifest_host_path) else: add("Manifest host path", "fail", "missing path") origins = manifest.get("allowed_origins", []) if isinstance(manifest, dict) else [] if expected_origin: add( "Extension origin", "ok" if expected_origin in origins else "fail", expected_origin if expected_origin in origins else f"expected {expected_origin}; manifest has {origins}", ) else: add("Extension origin", "warn", "extension id not supplied") registry_entries = _registry_host_entries() matching_registry = [ e for e in registry_entries if e.get("manifest") and Path(e["manifest"]).resolve() == manifest_path.resolve() ] registered = [e for e in registry_entries if e.get("status") == "ok"] if matching_registry: add("Registry registration", "ok", f"{len(matching_registry)} matching registry entr{'y' if len(matching_registry) == 1 else 'ies'}\n" + "\n".join(e["key"] for e in matching_registry)) elif registered: add("Registry registration", "fail", "registered path differs: " + "; ".join(f"{e['key']} -> {e['manifest']}" for e in registered)) else: add("Registry registration", "fail" if os.name == "nt" else "warn", "no matching native host registry entries found") if batch_path.exists(): add("Host launcher", "ok", str(batch_path)) else: add("Host launcher", "fail", str(batch_path)) has_problem = any(c.get("status") == "fail" for c in checks) add("Repair command", "warn" if has_problem else "info", f"pwsh -ExecutionPolicy Bypass -File \"{install_script}\" -ExtensionId {extension_id or ''}") return { "ok": True, "checks": checks, "registry": registry_entries, "manifest_path": str(manifest_path), "install_script": str(install_script), } def handle_host_repair(payload: dict) -> dict: """Repair the current host manifest and user-level registry entries. This is callable only when Brave can already launch this host. Blocked forbidden/not-found states still need the external install script because no native host process exists for the extension to call. """ extension_id = (payload.get("extension_id") or "").strip() if not extension_id: return {"ok": False, "error": "extension id is required"} host_dir = Path(__file__).resolve().parent batch_path = host_dir / "rcjav-host.bat" manifest_path = host_dir / "com.rcjav.host.json" if not batch_path.exists(): return {"ok": False, "error": f"host launcher not found at {batch_path}"} manifest = { "name": "com.rcjav.host", "description": "rclonex native messaging host (rc-jav bridge)", "path": str(batch_path), "type": "stdio", "allowed_origins": [f"chrome-extension://{extension_id}/"], } try: manifest_path.write_text(json.dumps(manifest, indent=2) + "\n", encoding="utf-8") except OSError as e: return {"ok": False, "error": f"failed to write manifest: {e}"} registrations = [] if os.name == "nt": try: import winreg hkcu_keys = [subkey for _, hive_name, subkey in REGISTRY_HOST_KEYS if hive_name == "HKEY_CURRENT_USER"] for subkey in hkcu_keys: try: with winreg.CreateKey(winreg.HKEY_CURRENT_USER, subkey) as key: winreg.SetValueEx(key, None, 0, winreg.REG_SZ, str(manifest_path)) registrations.append({"key": "HKCU\\" + subkey, "status": "ok"}) except OSError as e: registrations.append({"key": "HKCU\\" + subkey, "status": "fail", "detail": str(e)}) except Exception as e: registrations.append({"key": "HKCU registry", "status": "fail", "detail": str(e)}) verify = handle_host_status({**payload, "extension_id": extension_id}) failed = [r for r in registrations if r.get("status") != "ok"] return { "ok": not failed, "manifest_path": str(manifest_path), "registrations": registrations, "verification": verify, "restart_required": True, "message": "manifest and user-level registrations repaired; restart Brave to relaunch the host with the repaired origin", } def _cache_age_hours(scanned_at: str) -> float | None: try: dt = datetime.fromisoformat(scanned_at.replace("Z", "+00:00")) except (ValueError, AttributeError): return None now = datetime.now(dt.tzinfo) if dt.tzinfo else datetime.now() return (now - dt).total_seconds() / 3600.0 def _stale_hours(payload: dict) -> float: try: hours = float(payload.get("stale_hours", 24)) except (TypeError, ValueError): return 24 return min(max(hours, 1), 24 * 365) def _read_scan_state() -> dict: if not SCAN_STATE_FILE.exists(): return {} try: state = json.loads(SCAN_STATE_FILE.read_text(encoding="utf-8")) return state if isinstance(state, dict) else {} except (OSError, json.JSONDecodeError): return {} def _apply_scan_status(remotes: list[dict], state: dict) -> None: current = state.get("current_remote") if not current: return status = "" if state.get("scanning") and not state.get("done"): status = "scanning" elif state.get("done") and state.get("scan_ok") is False: status = "failed" if not status: return for remote in remotes: if remote.get("remote") == current: remote["status"] = status remote["stale"] = status == "failed" or remote.get("stale", False) return def _describe_skipped_id(path: str, remote: str = "") -> dict: name = Path(str(path or "").replace("\\", "/")).name ext = Path(name).suffix.lower().lstrip(".") reason = "no supported ID at filename start" if re.match(r"^\[[A-Za-z0-9-]+-\d+\]", name): reason = "leading brackets hide the ID" elif re.match(r"^[A-Za-z][A-Za-z0-9]+[\u2010-\u2015]\d+", name): reason = "non-ASCII dash in ID" elif re.match(r"^[A-Za-z][A-Za-z0-9]+\d+", name): reason = "missing hyphen between prefix and number" sep = "" if not remote or remote.endswith("/") or not path else "/" full_path = f"{remote}{sep}{path}" if remote else path return {"path": path, "name": name, "ext": ext, "reason": reason, "full_path": full_path} def _cache_freshness_fields(data: dict | None, rules_info: dict) -> dict: """Build the cache-contract fields surfaced to the extension. `data` is the parsed cache.json (or None when the file is missing). `rules_info` is the dict returned by fetch_rules_info; when its `ok` flag is False we report match-flags as None and let the UI decide whether to show a "rules lookup failed" state. """ out: dict = { # Legacy field — preserved for any consumer that still reads it. "version": (data or {}).get("version") if data else None, # New two-tier contract: "cache_schema": (data or {}).get("cache_schema") if data else None, "id_rules": (data or {}).get("id_rules") if data else None, "id_rules_signature": (data or {}).get("id_rules_signature") if data else None, "expected_cache_schema": None, "expected_id_rules": None, "expected_id_rules_signature": None, "cache_schema_match": None, "id_rules_match": None, "id_rules_signature_match": None, "cache_state": None, # 'fresh' | 'stale_by_rules' | 'schema_mismatch' | 'missing' "rules_info_error": None, } if not rules_info.get("ok"): out["rules_info_error"] = rules_info.get("error") or "rules lookup failed" return out out["expected_cache_schema"] = rules_info.get("cache_schema") out["expected_id_rules"] = rules_info.get("id_rules") out["expected_id_rules_signature"] = rules_info.get("id_rules_signature") if data is None: out["cache_state"] = "missing" return out # Legacy version:3 cache (pre-migration on disk): treat as stale_by_rules. if "cache_schema" not in data and data.get("version") == 3: out["cache_state"] = "stale_by_rules" out["cache_schema_match"] = True # we'll migrate at next load_cache out["id_rules_match"] = False out["id_rules_signature_match"] = False return out schema_match = data.get("cache_schema") == out["expected_cache_schema"] rules_match = data.get("id_rules") == out["expected_id_rules"] sig_match = data.get("id_rules_signature") == out["expected_id_rules_signature"] out["cache_schema_match"] = schema_match out["id_rules_match"] = rules_match out["id_rules_signature_match"] = sig_match if not schema_match: out["cache_state"] = "schema_mismatch" elif rules_match and sig_match: out["cache_state"] = "fresh" else: out["cache_state"] = "stale_by_rules" return out def handle_reextract_ids(payload: dict) -> dict: """Trigger a fast re-extract of jav_ids against the current rule set. No rclone calls — walks the on-disk cache.json. Stamps current rules + signature into the cache on success so the next cache_status call reports `cache_state: "fresh"`. """ rc, stdout, stderr = run_rcjav( ["--reextract", "--format", "json"], extra_flags=[], # --reextract already JSON; --basic would prepend noise rcjav_path=payload.get("rcjav_path"), timeout=300, ) if rc != 0: return {"ok": False, "error": (stderr or stdout or "unknown").strip()} try: result = json.loads(stdout.strip()) except json.JSONDecodeError as e: return {"ok": False, "error": f"invalid reextract JSON: {e}"} return result def handle_cache_status(payload: dict) -> dict: script = resolve_rcjav(payload.get("rcjav_path", "")) cache_path = script.parent / "cache.json" config_path = script.parent / "config.json" configured = {"default_source": [], "default_target": []} if config_path.exists(): try: cfg = json.loads(config_path.read_text(encoding="utf-8")) configured["default_source"] = cfg.get("default_source", []) or [] configured["default_target"] = cfg.get("default_target", []) or [] except Exception: pass stale_hours = _stale_hours(payload) scan_state = _read_scan_state() configured_roots = set((configured.get("default_source") or []) + (configured.get("default_target") or [])) rules_info = fetch_rules_info(payload.get("rcjav_path", "")) if not cache_path.exists(): remotes = [{ "remote": remote, "status": "never_scanned", "stale": True, "file_count": 0, "skipped_count": 0, "issues": [], } for remote in sorted(configured_roots)] _apply_scan_status(remotes, scan_state) return { "ok": True, "cache_exists": False, "cache_path": str(cache_path), "configured": configured, "stale_hours": stale_hours, "scan_state": scan_state, "remotes": remotes, **_cache_freshness_fields(None, rules_info), } try: data = json.loads(cache_path.read_text(encoding="utf-8")) except Exception as e: return {"ok": False, "error": str(e), "cache_path": str(cache_path)} remotes = [] warnings = [] remote_map = data.get("remotes", {}) or {} configured_remote_prefixes = {r.split(":", 1)[0] + ":" for r in configured_roots if isinstance(r, str) and ":" in r} cached_remote_prefixes = {r.split(":", 1)[0] + ":" for r in remote_map if isinstance(r, str) and ":" in r} for remote, entry in remote_map.items(): files = entry.get("files", []) or [] skipped = entry.get("skipped", []) or [] scanned_at = entry.get("scanned_at", "") age = _cache_age_hours(scanned_at) bare_fc2 = 0 short_number = 0 missing_jav_id = 0 for f in files: jid = str(f.get("jav_id") or "") if not jid: missing_jav_id += 1 continue base = jid.split("#", 1)[0] if re.match(r"^FC2-\d{4,}$", base, re.I): bare_fc2 += 1 if re.match(r"^[A-Z][A-Z0-9]{1,}-\d{1,2}$", base, re.I): short_number += 1 issues = [] if bare_fc2: issues.append({"code": "bare_fc2", "count": bare_fc2, "message": "bare FC2 IDs should now be FC2-PPV"}) if short_number: issues.append({"code": "short_number", "count": short_number, "message": "IDs with fewer than 3 digits suggest an old cache"}) if missing_jav_id: issues.append({"code": "missing_jav_id", "count": missing_jav_id, "message": "cache rows missing jav_id"}) if issues: warnings.append({ "remote": remote, "code": "stale_id_shape", "message": f"{remote} contains IDs from older normalization rules; rebuild cache recommended", "issues": issues, }) is_stale = bool(age is not None and age > stale_hours) remotes.append({ "remote": remote, "status": "stale" if is_stale else "fresh", "scanned_at": scanned_at, "age_hours": round(age, 3) if age is not None else None, "stale": is_stale, "file_count": len(files), "skipped_count": len(skipped), "skipped_items": [_describe_skipped_id(path, remote) for path in skipped], "skipped_samples": [_describe_skipped_id(path, remote) for path in skipped[:5]], "issues": issues, }) cached_roots = set(remote_map.keys()) for remote in sorted(configured_roots - cached_roots): remotes.append({ "remote": remote, "status": "never_scanned", "scanned_at": "", "age_hours": None, "stale": True, "file_count": 0, "skipped_count": 0, "issues": [], }) warnings.append({ "remote": remote, "code": "remote_not_scanned", "message": f"{remote} is configured but has not been scanned into cache.json", }) if configured_remote_prefixes and cached_remote_prefixes and not cached_remote_prefixes.intersection(configured_remote_prefixes): warnings.append({ "code": "remote_prefix_mismatch", "message": "cache remotes do not share a remote prefix with config.json defaults", "configured": sorted(configured_roots), "cached": sorted(remote_map.keys()), }) for remote in remote_map: if configured_roots and remote not in configured_roots: same_parent = any(remote.startswith(root.rstrip("/") + "/") or root.startswith(remote.rstrip("/") + "/") for root in configured_roots) if not same_parent and remote not in configured_roots: warnings.append({ "remote": remote, "code": "remote_not_configured", "message": f"{remote} is in cache.json but is not one of config.json's default source/target roots", }) _apply_scan_status(remotes, scan_state) remotes.sort(key=lambda r: r["remote"].lower()) return { "ok": True, "cache_exists": True, "cache_path": str(cache_path), "stale_hours": stale_hours, "configured": configured, "scan_state": scan_state, "remotes": remotes, "warnings": warnings, **_cache_freshness_fields(data, rules_info), } def _scan_worker(cmd: list[str], creationflags: int, scan_since: str = "", scan_roots: list[str] | None = None) -> None: """Background thread: runs rc-jav --scan via Popen, reads stderr line-by-line to parse SCAN_START / SCAN_PROGRESS / 'Scan complete:' lines, and writes live progress to SCAN_STATE_FILE so handle_scan_progress can return it.""" global _scan_thread, _scan_proc, _scan_cancel_requested state: dict = { "scanning": True, "done": False, "started_at": datetime.now().isoformat(), "scan_since": scan_since, "scope": list(scan_roots or []), "remotes": [], "total_remotes": 0, "remote_jobs": [], "current_remote": None, "current_label": None, "files_this_remote": 0, "files_total": 0, "file_count": None, "elapsed_s": None, } def _write() -> None: try: SCAN_STATE_FILE.write_text(json.dumps(state), encoding="utf-8") except OSError: pass def _job(remote: str, label: str = "") -> dict: for item in state["remote_jobs"]: if item.get("remote") == remote: if label: item["label"] = label return item item = { "remote": remote, "label": label, "status": "queued", "files": 0, "skipped": 0, "total": None, } state["remote_jobs"].append(item) return item _write() try: proc = subprocess.Popen( cmd, stdout=subprocess.DEVNULL, # scan mode outputs nothing useful to stdout stderr=subprocess.PIPE, text=True, encoding="utf-8", errors="replace", creationflags=creationflags, ) with _scan_lock: _scan_proc = proc for raw in proc.stderr: line = raw.rstrip("\n") if line.startswith("SCAN_START "): try: d = json.loads(line[11:]) state["remotes"] = d.get("remotes", []) state["total_remotes"] = d.get("total", 0) for remote in state["remotes"]: _job(remote) _write() except (json.JSONDecodeError, KeyError): pass elif line.startswith("SCAN_REMOTE_START "): # Emitted immediately before the file-count probe starts. try: d = json.loads(line[18:]) state["current_remote"] = d.get("remote") state["current_label"] = d.get("label") state["current_index"] = d.get("index", 0) state["files_this_remote"] = 0 state["files_remote_total"] = None # unknown until SCAN_REMOTE_COUNTED job = _job(d.get("remote"), d.get("label")) job.update({ "status": "running", "index": d.get("index", 0), "of": d.get("of", state.get("total_remotes", 0)), "files": 0, "skipped": 0, "total": None, "incremental": bool(d.get("incremental")), "started_at": datetime.now().isoformat(), }) _write() except (json.JSONDecodeError, KeyError): pass elif line.startswith("SCAN_REMOTE_COUNTED "): # Emitted after remote_file_count() returns — total is now known. try: d = json.loads(line[20:]) state["files_remote_total"] = d.get("total") _job(d.get("remote"))["total"] = d.get("total") _write() except (json.JSONDecodeError, KeyError): pass elif line.startswith("SCAN_FILE_PROGRESS "): # Emitted every CANCEL_CHECK_INTERVAL files from inside walk_remote. try: d = json.loads(line[19:]) state["files_this_remote"] = d.get("files", 0) if d.get("total") is not None: state["files_remote_total"] = d["total"] job = _job(d.get("remote"), d.get("label")) job["files"] = d.get("files", 0) job["skipped"] = d.get("skipped", 0) if d.get("total") is not None: job["total"] = d["total"] _write() except (json.JSONDecodeError, KeyError): pass elif line.startswith("SCAN_PROGRESS "): # Emitted after each remote completes — cumulative total. try: d = json.loads(line[14:]) state["current_remote"] = d.get("remote") state["current_label"] = d.get("label") state["files_this_remote"] = d.get("files", 0) state["files_total"] = d.get("files_total", 0) job = _job(d.get("remote"), d.get("label")) job.update({ "status": "completed", "files": d.get("files", 0), "file_count": d.get("file_count"), "incremental": bool(d.get("incremental", job.get("incremental"))), "finished_at": datetime.now().isoformat(), }) _write() except (json.JSONDecodeError, KeyError): pass elif line == "SCAN_CANCELLED": state["cancelled"] = True _write() elif line.startswith("Scan complete:"): m = re.search(r"(\d+)\s*files\s*in\s*([\d.]+)s", line) if m: state["file_count"] = int(m.group(1)) state["elapsed_s"] = float(m.group(2)) _write() proc.wait() state["done"] = True state["scanning"] = False state["rc"] = proc.returncode state["finished_at"] = datetime.now().isoformat() if state.get("cancelled") or _scan_cancel_requested: state["cancelled"] = True state["scan_ok"] = True # clean cancel — not an error else: state["scan_ok"] = proc.returncode == 0 if proc.returncode != 0: state["error"] = f"rc-jav exited {proc.returncode}" for job in state["remote_jobs"]: if job.get("status") == "running": job["status"] = "cancelled" if state.get("cancelled") else "failed" if not state.get("scan_ok") else "completed" job["finished_at"] = state["finished_at"] elif job.get("status") == "queued" and (state.get("cancelled") or not state.get("scan_ok")): job["status"] = "cancelled" if state.get("cancelled") else "failed" job["finished_at"] = state["finished_at"] _write() log_event("scan_complete", ok=state["scan_ok"], rc=proc.returncode, cancelled=state.get("cancelled", False), file_count=state.get("file_count"), elapsed_s=state.get("elapsed_s")) except Exception as e: state.update({ "done": True, "scanning": False, "scan_ok": False, "error": str(e), "finished_at": datetime.now().isoformat(), }) for job in state.get("remote_jobs", []): if job.get("status") in ("queued", "running"): job["status"] = "failed" job["finished_at"] = state["finished_at"] _write() log(f"_scan_worker error: {e}") finally: with _scan_lock: _scan_thread = None _scan_proc = None _scan_cancel_requested = False def handle_scan(payload: dict) -> dict: """Start rc-jav.py --scan in a background thread and return immediately. The caller should poll scan_progress until done=true. scan_since: optional rclone duration string (e.g. '24h').""" global _scan_thread, _scan_script_path, _scan_cancel_requested with _scan_lock: if _scan_thread is not None and _scan_thread.is_alive(): return {"ok": True, "scanning": True, "already_running": True} _scan_cancel_requested = False script = resolve_rcjav(payload.get("rcjav_path", "")) scan_since = (payload.get("scan_since") or "").strip() cmd = [sys.executable, str(script), "--scan", "--basic"] cmd += part_pattern_args(payload) scan_roots = [ str(root).strip() for root in (payload.get("scan_roots") or []) if isinstance(root, str) and str(root).strip() ] for root in scan_roots: cmd += ["--target", root] if scan_since: cmd += ["--scan-since", scan_since] creationflags = 0x08000000 if os.name == "nt" else 0 _scan_script_path = script t = threading.Thread(target=_scan_worker, args=(cmd, creationflags, scan_since, scan_roots), daemon=True, name="scan-worker") with _scan_lock: _scan_thread = t t.start() return {"ok": True, "scanning": True, "started": True, "scan_roots": scan_roots} def _deferred_kill(proc: subprocess.Popen, delay_s: float = 5.0) -> None: """Wait up to delay_s for proc to exit via the cancel flag, then terminate. Runs in a daemon thread so it never blocks the message loop.""" time.sleep(delay_s) try: if proc.poll() is None: proc.terminate() except Exception: pass def handle_scan_cancel(payload: dict) -> dict: """Write a cancel flag file that rc-jav.py's walk_remote checks every CANCEL_CHECK_INTERVAL files. rc-jav exits cleanly when it sees the flag. A deferred kill fires after 5 s if rc-jav has not exited on its own — this avoids terminating it mid-write (which would corrupt cache.json or prevent SCAN_CANCELLED from reaching the scan-state parser).""" global _scan_cancel_requested with _scan_lock: running = _scan_thread is not None and _scan_thread.is_alive() proc = _scan_proc if not running: return {"ok": True, "cancelled": False, "message": "no scan running"} _scan_cancel_requested = True # Resolve the rc-jav directory so we write the flag next to rc-jav.py # (where rc-jav.py looks for it via Path(__file__).parent / "scan-cancel.flag"). script = _scan_script_path or resolve_rcjav(payload.get("rcjav_path", "")) cancel_flag = script.parent / "scan-cancel.flag" try: cancel_flag.write_text("cancel", encoding="utf-8") if proc is not None and proc.poll() is None: t = threading.Thread( target=_deferred_kill, args=(proc, 5.0), daemon=True, name="cancel-kill", ) t.start() return {"ok": True, "cancelled": True} except Exception as e: return {"ok": False, "error": str(e)} def handle_scan_progress(payload: dict) -> dict: """Return current scan progress from SCAN_STATE_FILE.""" with _scan_lock: running = _scan_thread is not None and _scan_thread.is_alive() if not SCAN_STATE_FILE.exists(): return {"ok": True, "scanning": running, "done": not running, "no_state": True} try: state = json.loads(SCAN_STATE_FILE.read_text(encoding="utf-8")) # Thread may have just finished — ensure state is consistent. if not running and state.get("scanning"): state["scanning"] = False state["done"] = True state.pop("ok", None) return {"ok": True, **state} except Exception as e: return {"ok": False, "error": str(e)} def handle_scan_clear(payload: dict) -> dict: """Clear persisted scan job UI history without touching cache.json.""" with _scan_lock: running = _scan_thread is not None and _scan_thread.is_alive() if running: return {"ok": False, "error": "scan is still running; cancel it before clearing job history"} try: if SCAN_STATE_FILE.exists(): SCAN_STATE_FILE.unlink() return {"ok": True, "cleared": True} except Exception as e: return {"ok": False, "error": str(e)} def handle_delete_skipped(payload: dict) -> dict: """Delete one or more skipped (non-JAV) files by full_path and patch cache.""" paths = payload.get("paths") or [] if not isinstance(paths, list) or not paths: return {"ok": False, "error": "paths must be a non-empty array"} base = { "mode": payload.get("mode", "trash"), "trash_dir": payload.get("trash_dir", ""), "allowed_remotes": payload.get("allowed_remotes", []), "rcjav_path": payload.get("rcjav_path", ""), } results = [] deleted_paths = [] for path in paths: r = handle_delete({**base, "path": path}) results.append({"path": path, **r}) if r.get("ok"): deleted_paths.append(path) # Patch skipped list in cache.json too if deleted_paths: try: script = resolve_rcjav(payload.get("rcjav_path")) cache_path = script.parent / "cache.json" if cache_path.exists(): cache = json.loads(cache_path.read_text(encoding="utf-8")) modified = False for remote_key, remote_data in cache.get("remotes", {}).items(): sep = "" if remote_key.endswith("/") else "/" prefix = remote_key + sep before = remote_data.get("skipped", []) or [] after = [ p for p in before if f"{prefix}{p}" not in deleted_paths ] if len(after) != len(before): remote_data["skipped"] = after modified = True if modified: tmp = cache_path.with_suffix(cache_path.suffix + ".tmp") tmp.write_text(json.dumps(cache, indent=2), encoding="utf-8") os.replace(tmp, cache_path) except Exception as e: log(f"handle_delete_skipped cache patch error: {e}") _patch_cache_remove_paths(payload.get("rcjav_path"), deleted_paths) ok_count = sum(1 for r in results if r.get("ok")) return { "ok": ok_count > 0, "results": results, "deleted_count": ok_count, "failed_count": len(results) - ok_count, } def handle_get_keep_ranking(payload: dict) -> dict: """Return the current keep_ranking config from rc-jav's config.json.""" script = resolve_rcjav(payload.get("rcjav_path", "")) cfg_path = script.parent / "config.json" cfg = {} if cfg_path.exists(): try: cfg = json.loads(cfg_path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): pass ranking = cfg.get("keep_ranking") or {} defaults = { "priority_folders": ["ClearJAV"], "size_tolerance_mib": 0, "format_preference": ["mkv", "mp4", "wmv", "avi"], "tiebreak_res_tag": True, "tiebreak_longer_name": True, } merged = {**defaults, **ranking} return {"ok": True, "keep_ranking": merged, "is_default": not bool(cfg.get("keep_ranking"))} def handle_save_keep_ranking(payload: dict) -> dict: """Write keep_ranking config to rc-jav's config.json.""" script = resolve_rcjav(payload.get("rcjav_path", "")) cfg_path = script.parent / "config.json" ranking = payload.get("keep_ranking") if not isinstance(ranking, dict): return {"ok": False, "error": "keep_ranking must be an object"} # Validate fields tolerance = ranking.get("size_tolerance_mib") if tolerance is not None: try: tolerance = float(tolerance) if tolerance < 0: return {"ok": False, "error": "size_tolerance_mib must be >= 0"} except (TypeError, ValueError): return {"ok": False, "error": "size_tolerance_mib must be a number"} fmt = ranking.get("format_preference") if fmt is not None and not isinstance(fmt, list): return {"ok": False, "error": "format_preference must be an array"} priority_folders = ranking.get("priority_folders") if priority_folders is not None and not isinstance(priority_folders, list): return {"ok": False, "error": "priority_folders must be an array"} cfg = {} if cfg_path.exists(): try: cfg = json.loads(cfg_path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): pass cfg["keep_ranking"] = ranking try: tmp = cfg_path.with_suffix(cfg_path.suffix + ".tmp") tmp.write_text(json.dumps(cfg, indent=2), encoding="utf-8") os.replace(tmp, cfg_path) except OSError as e: return {"ok": False, "error": f"failed to write config: {e}"} return {"ok": True, "keep_ranking": ranking} def handle_list_remotes(payload: dict) -> dict: """Return all rclone remotes (from `rclone listremotes`) plus the default_source / default_target paths from rc-jav's config.json so the options page can offer autocomplete when building profiles.""" creationflags = 0x08000000 if os.name == "nt" else 0 try: proc = subprocess.run( [RCLONE_BIN, "listremotes"], capture_output=True, text=True, timeout=15, encoding="utf-8", errors="replace", creationflags=creationflags, ) remotes = [r.strip() for r in (proc.stdout or "").splitlines() if r.strip()] except Exception as e: remotes = [] log(f"list_remotes: rclone listremotes failed: {e}") # Also read config.json so the user sees their configured defaults. script = resolve_rcjav(payload.get("rcjav_path", "")) cfg_path = script.parent / "config.json" default_source: list[str] = [] default_target: list[str] = [] if cfg_path.exists(): try: cfg = json.loads(cfg_path.read_text(encoding="utf-8")) default_source = cfg.get("default_source") or [] default_target = cfg.get("default_target") or [] except Exception: pass return { "ok": True, "remotes": remotes, "default_source": default_source, "default_target": default_target, } DISPATCH = { "search": handle_search, "bulk_search": handle_bulk_search, "dupe_review": handle_dupe_review, "delete_batch": handle_delete_batch, "library_issues": handle_library_issues, "rename_file": handle_rename_file, "rename_files_batch": handle_rename_files_batch, "ping": handle_ping, "delete": handle_delete, "diagnostics": handle_diagnostics, "host_status": handle_host_status, "host_repair": handle_host_repair, "cache_status": handle_cache_status, "reextract_ids": handle_reextract_ids, "recent_deletes": handle_recent_deletes, "undo_delete": handle_undo_delete, "scan": handle_scan, "scan_progress": handle_scan_progress, "scan_cancel": handle_scan_cancel, "scan_clear": handle_scan_clear, "list_remotes": handle_list_remotes, "delete_skipped": handle_delete_skipped, "get_keep_ranking": handle_get_keep_ranking, "save_keep_ranking": handle_save_keep_ranking, } def main(): log(f"--- host start pid={os.getpid()} ---") while True: try: msg = read_message() except Exception as e: log(f"read error: {e}\n{traceback.format_exc()}") break if msg is None: log("stdin closed, exiting") break req_id = msg.get("req_id") action = msg.get("action", "search") handler = DISPATCH.get(action) if not handler: write_message({"req_id": req_id, "ok": False, "error": f"unknown action {action}"}) continue t0 = time.monotonic() try: resp = handler(msg) except Exception as e: log(f"handler error: {e}\n{traceback.format_exc()}") resp = {"ok": False, "error": str(e)} elapsed_ms = round((time.monotonic() - t0) * 1000) # Structured event log — include search-specific fields for analytics. event_extra: dict = {"ok": resp.get("ok"), "elapsed_ms": elapsed_ms} if action == "search": event_extra["id"] = msg.get("id") event_extra["hits"] = resp.get("hits") event_extra["search_mode"] = resp.get("search_mode") elif action in ("delete", "undo_delete", "scan"): if not resp.get("ok"): event_extra["error"] = (resp.get("error") or "")[:200] # Don't log every scan_progress poll — it'd flood the events log. if action not in ("scan_progress",): log_event(action, **event_extra) resp["req_id"] = req_id try: write_message(resp) except Exception as e: log(f"write error: {e}") break if __name__ == "__main__": main()