Files
ext-rclone-jav/host/rcjav-host.py
T
admin c17ac9e1e7 Step 10j (host + extension): cache contract three-state UX
Completes the two-tier cache contract from step 9 / docs/CACHE_CONTRACT.md
on the extension side. The Python side shipped in the Python repo at
33c495a.

Host (rcjav-host.py):
  - fetch_rules_info() memoizes per-script-path calls to
    `rc-jav.py --print-rules-info` so handle_cache_status doesn't pay
    the Python startup cost on every poll.
  - _cache_freshness_fields(data, rules_info) computes the new
    cache_schema / id_rules / id_rules_signature trio + their three
    *_match booleans + cache_state ('fresh' / 'stale_by_rules' /
    'schema_mismatch' / 'missing'). Legacy version:3 caches still on
    disk report as stale_by_rules with cache_schema_match=True (we'll
    migrate them at next load_cache).
  - New handle_reextract_ids() action forwards to
    `rc-jav.py --reextract --format json` with a 5-minute timeout.

background.js:
  - New `reextract-ids` message forwards to host with a 300s timeout.

options-cache.js + options-library-issues.js:
  - renderCacheContractBanner() paints a three-state banner above the
    per-remote list: green ✓ fresh / amber ! stale-by-rules (with
    "Re-extract IDs (fast, no rescan)" chip button) / red ✗ schema
    mismatch. Includes a snippet of the cache signature for diagnostics.
  - Delegated click handler in options-library-issues.js catches
    .cache-reextract, sends the message, shows transient
    "Re-extracting…" state, and replaces the button with a per-summary
    line ("Re-extracted N IDs · X changed · Y unchanged · Z dropped").
  - rules_info_error from the host surfaces as its own amber line above
    the banner.

node --check passes on background.js, options-cache.js,
options-library-issues.js individually and on the concatenation of all
four script files. python -m py_compile passes on rcjav-host.py.
Behavioral verification requires reloading the unpacked extension and
running through:
  - Check Cache → banner shows "stale by rules" amber (legacy v3 cache)
  - Click "Re-extract IDs" → fast path runs, summary appears
  - Check Cache again → banner now shows "Cache up to date" green

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-23 11:07:26 +02:00

2156 lines
86 KiB
Python

#!/usr/bin/env python3
"""Native messaging host for rclonex Brave extension.
Reads length-prefixed JSON messages on stdin, dispatches to rc-jav.py, writes
length-prefixed JSON responses on stdout. Long-lived: Brave keeps this process
alive for the lifetime of the extension's connectNative() port.
"""
from __future__ import annotations
import json
import os
import re
import struct
import subprocess
import sys
import threading
import time
import traceback
from datetime import datetime
from pathlib import Path
# Native messaging frames are binary (4-byte length prefix + JSON). In text
# mode Windows rewrites line endings on stdout, which corrupts the prefix, and
# may buffer -- so both pipes are switched to raw binary mode up front.
if os.name == "nt":
    import msvcrt

    msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
    msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)

# ----- config -----
VERSION = "0.1.0"
RC_JAV = Path(r"D:\DEV\Project\rclone-jav\rc-jav.py")
PYTHON = "python"  # or absolute path to python.exe

# Logs and persisted state live in sub-folders next to this script; both
# folders are created at import time if they do not exist yet.
HOST_DIR = Path(__file__).resolve().parent
LOG_DIR = HOST_DIR / "logs"
STATE_DIR = HOST_DIR / "state"
LOG_DIR.mkdir(exist_ok=True)
STATE_DIR.mkdir(exist_ok=True)

LOG_FILE = LOG_DIR / "rcjav-host.log"
EVENTS_LOG = LOG_DIR / "rcjav-host-events.log"
DELETE_LOG = LOG_DIR / "deletes.log"
EVENTS_MAX_BYTES = 5 * 1024 * 1024  # 5 MiB — rotate by truncating oldest half
SCAN_STATE_FILE = STATE_DIR / "scan-state.json"

# Scan background thread state
_scan_lock = threading.Lock()
_scan_thread: threading.Thread | None = None
_scan_script_path: Path | None = None  # set by handle_scan; used by handle_scan_cancel
_scan_proc: subprocess.Popen | None = None
_scan_cancel_requested = False

# Patterns for rc-jav's plain-text result lines ("[ID] N hit(s)" / "[ID] NOT FOUND").
HIT_RE = re.compile(r"\[(?P<id>[^\]]+)\] (?P<count>\d+) hit\(s\)")
MISS_RE = re.compile(r"\[(?P<id>[^\]]+)\] NOT FOUND")

# ID shapes tried by host_normalize_id: letters-digits, multi-part
# prefix-digits, and alphanumeric-prefix-digits.
PRIMARY_ID_RE = re.compile(r"^([A-Za-z]+)-(\d+)")
FALLBACK_ID_RE = re.compile(r"^([A-Za-z0-9]+)-(\d+)")
COMPOUND_ID_RE = re.compile(r"^([A-Za-z0-9]+(?:-[A-Za-z0-9]+)+)-(\d+)")
# A "[lo-hi]" numeric range inside a query.
HOST_RANGE_RE = re.compile(r"\[(\d+)-(\d+)\]")

# In-memory copy of parsed cache.json files, guarded by _cache_lock.
_cache_lock = threading.Lock()
_cache_mem: dict[str, dict] = {}
def log(msg: str) -> None:
    """Append `msg` plus a newline to LOG_FILE.

    Best-effort: an OSError while opening or writing is swallowed so that a
    logging problem never takes the host down.
    """
    try:
        with LOG_FILE.open("a", encoding="utf-8") as sink:
            sink.write(msg + "\n")
    except OSError:
        # Deliberately ignored: there is nowhere else to report it.
        pass
def log_event(action: str, **kwargs) -> None:
    """Append one structured JSON-lines record to EVENTS_LOG.

    The record carries an ISO timestamp, `action`, and every keyword argument.
    Once the file has reached EVENTS_MAX_BYTES, roughly its oldest half is
    dropped before appending. All OSErrors are swallowed (best-effort log).
    """
    record = {"ts": datetime.now().isoformat(), "action": action, **kwargs}
    serialized = json.dumps(record, ensure_ascii=False) + "\n"
    try:
        current_size = 0
        if EVENTS_LOG.exists():
            current_size = EVENTS_LOG.stat().st_size
        if current_size >= EVENTS_MAX_BYTES:
            try:
                blob = EVENTS_LOG.read_bytes()
                halfway = len(blob) // 2
                # Cut just after the next newline past the midpoint so the
                # surviving half starts on a whole JSON line.
                cut = blob.find(b"\n", halfway)
                kept = blob[halfway:] if cut < 0 else blob[cut + 1:]
                EVENTS_LOG.write_bytes(kept)
            except OSError:
                # Rotation failed; still try to append below.
                pass
        with EVENTS_LOG.open("a", encoding="utf-8") as sink:
            sink.write(serialized)
    except OSError:
        pass
# ----- stdio framing -----
def read_message():
    """Read one length-prefixed JSON message from stdin.

    The frame is a 4-byte little-endian unsigned length followed by that many
    bytes of UTF-8 JSON. Returns the decoded JSON value, or None when the
    stream ends before a complete header or body arrives.
    """
    stream = sys.stdin.buffer
    header = stream.read(4)
    if len(header) < 4:
        return None
    body_len = struct.unpack("<I", header)[0]
    body = stream.read(body_len)
    if len(body) < body_len:
        # Truncated frame: treat it like end-of-stream.
        return None
    return json.loads(body.decode("utf-8"))
# Chrome native messaging caps host->extension messages at 1 MiB. Stay below.
MAX_RESPONSE_BYTES = 900_000


def _shrink_response(obj: dict) -> dict:
    """Return `obj`, or a trimmed shallow copy that fits MAX_RESPONSE_BYTES.

    If the serialized payload is within budget, `obj` itself is returned
    untouched. Otherwise a copy is made and shrunk in two stages:

    1. The heavy optional fields `raw_json`, `stdout`, `stderr` are set to
       None one at a time until the payload fits.
    2. If that is not enough and `structured` is a non-empty list, it is cut
       down to the longest prefix that fits (binary search).

    The copy always carries `truncated: True` and a human-readable
    `truncated_reason`. Every size check is done with that metadata already in
    place, so the returned payload really is within budget whenever trimming
    can achieve it. When nothing more can be trimmed, the (still oversized)
    copy is returned with a reason saying so.
    """

    def fits(candidate: dict) -> bool:
        return len(json.dumps(candidate).encode("utf-8")) <= MAX_RESPONSE_BYTES

    if fits(obj):
        return obj

    shrunk = dict(obj)
    shrunk["truncated"] = True
    reasons: list[str] = []

    # Stage 1: drop heavy optional fields, re-checking after each one.
    for field in ("raw_json", "stdout", "stderr"):
        if shrunk.get(field):
            shrunk[field] = None
            reasons.append(f"dropped {field}")
            shrunk["truncated_reason"] = "; ".join(reasons)
            if fits(shrunk):
                return shrunk

    structured = shrunk.get("structured")
    if not isinstance(structured, list) or not structured:
        # Nothing left that we know how to trim.
        shrunk["truncated_reason"] = "; ".join(reasons + ["no structured entries to trim"])
        return shrunk

    # Stage 2: keep the longest prefix of `structured` that fits. Size with a
    # worst-case reason string (the final one can only be the same length or
    # shorter) so the budget accounts for the metadata we add afterwards.
    total = len(structured)
    shrunk["truncated_reason"] = "; ".join(reasons + [f"structured {total} -> {total}"])
    lo, hi = 0, total
    while lo < hi:
        mid = (lo + hi + 1) // 2
        shrunk["structured"] = structured[:mid]
        if fits(shrunk):
            lo = mid
        else:
            hi = mid - 1
    shrunk["structured"] = structured[:lo]
    shrunk["truncated_reason"] = "; ".join(reasons + [f"structured {total} -> {lo}"])
    return shrunk
def write_message(obj: dict) -> None:
    """Send one length-prefixed JSON message on stdout.

    The payload is first passed through _shrink_response, then written as a
    4-byte little-endian length followed by the UTF-8 JSON body, and flushed.
    """
    payload = json.dumps(_shrink_response(obj)).encode("utf-8")
    sys.stdout.buffer.write(struct.pack("<I", len(payload)))
    sys.stdout.buffer.write(payload)
    sys.stdout.buffer.flush()
_JSON_DECODER = json.JSONDecoder()


def extract_json_blob(text: str) -> dict | None:
    """Return the last top-level JSON object embedded in `text`, or None.

    Scans left to right: at every "{" it attempts to decode a JSON value
    starting there. A successful decode skips past the whole value (so nested
    objects are not revisited); a failed decode moves on by one character.
    This tolerates status lines before the JSON, pretty-printed multi-line
    JSON, and trailing noise.
    """
    if not text:
        return None
    found = None
    cursor = 0
    while True:
        brace = text.find("{", cursor)
        if brace < 0:
            return found
        try:
            value, cursor = _JSON_DECODER.raw_decode(text, brace)
        except json.JSONDecodeError:
            # Not valid JSON from this brace; retry from the next character.
            cursor = brace + 1
            continue
        if isinstance(value, dict):
            found = value
# ----- rc-jav invocation -----
def resolve_rcjav(rcjav_path: str | None) -> Path:
    """Turn the user-supplied rc-jav location into the script path.

    An empty/None value yields the built-in RC_JAV default. A value that is an
    existing directory, or that does not end in ".py" (case-insensitive), is
    treated as a folder and "rc-jav.py" is appended. Anything else is taken as
    the script path itself. Existence of the result is not checked here.
    """
    if not rcjav_path:
        return RC_JAV
    candidate = Path(rcjav_path)
    is_script_path = candidate.suffix.lower() == ".py" and not candidate.is_dir()
    return candidate if is_script_path else candidate / "rc-jav.py"
def run_rcjav(args: list[str], timeout: int = 120, extra_flags: list[str] | None = None, rcjav_path: str | None = None, stdin_data: str | None = None) -> tuple[int, str, str]:
    """Run rc-jav.py once and return `(returncode, stdout, stderr)`.

    Args:
        args: rc-jav arguments placed right after the script path.
        timeout: seconds before the child is abandoned.
        extra_flags: flags appended after `args`; None means the default
            ["--basic", "--no-color"], an empty list means no extra flags.
        rcjav_path: script or folder location, resolved via resolve_rcjav.
        stdin_data: optional text fed to the child's stdin.

    Failures are reported through the return value rather than raised:
    2 = script path not absolute or missing, 124 = timeout, 1 = any other
    error while spawning/running.
    """
    if extra_flags is None:
        flags = ["--basic", "--no-color"]
    else:
        flags = extra_flags
    script = resolve_rcjav(rcjav_path)
    # Reject up-front instead of resolving a relative path against the host's
    # CWD (whatever the browser hands it) and surfacing a confusing
    # "file not found" from subprocess.
    if not (script.is_absolute() and script.exists()):
        return 2, "", (
            f"rc-jav.py not found at {script!s} — set the 'rc-jav path' in the "
            f"extension options to an absolute path that points to rc-jav.py "
            f"or its parent folder."
        )
    cmd = [PYTHON, str(script), *args, *flags]
    # CREATE_NO_WINDOW keeps a console from flashing up on Windows.
    creationflags = 0x08000000 if os.name == "nt" else 0
    try:
        proc = subprocess.run(
            cmd,
            input=stdin_data,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=timeout,
            creationflags=creationflags,
        )
    except subprocess.TimeoutExpired:
        return 124, "", "timeout after %ds" % timeout
    except Exception as e:
        return 1, "", f"spawn error: {e}"
    return proc.returncode, proc.stdout, proc.stderr
# Memoize rules-info per script path so handle_cache_status doesn't pay
# the Python startup cost on every poll. A different rcjav_path resolves to a
# different key, so changing the path naturally bypasses the old entry.
_RULES_INFO_CACHE: dict[str, dict] = {}
# Failures are remembered only briefly (monotonic timestamp, result) so that a
# transient problem clears on its own, while a tight poll loop still doesn't
# re-spawn Python on every call.
_RULES_INFO_FAILURES: dict[str, tuple[float, dict]] = {}
_RULES_INFO_FAILURE_TTL_SEC = 30.0


def fetch_rules_info(rcjav_path: str | None) -> dict:
    """Get cache contract constants + current rules signature from rc-jav.py.

    Runs `rc-jav.py --print-rules-info` (no --basic / --no-color: the output
    is already JSON) and returns the decoded object.

    Returns:
        The JSON object printed by rc-jav on success, or
        `{"ok": False, "error": ...}` when the command fails, prints invalid
        JSON, or prints something other than a JSON object.

    Successful results are memoized per resolved script path for the lifetime
    of this host process. Failures are only held for
    _RULES_INFO_FAILURE_TTL_SEC seconds, so fixing the underlying problem does
    not require restarting the host.
    """
    key = str(resolve_rcjav(rcjav_path))
    cached = _RULES_INFO_CACHE.get(key)
    if cached is not None:
        return cached
    recent_failure = _RULES_INFO_FAILURES.get(key)
    if recent_failure is not None:
        failed_at, failure = recent_failure
        if time.monotonic() - failed_at < _RULES_INFO_FAILURE_TTL_SEC:
            return failure

    rc, stdout, stderr = run_rcjav(
        ["--print-rules-info"],
        extra_flags=[],  # NB: omit --basic / --no-color, --print-rules-info already JSON
        rcjav_path=rcjav_path,
        timeout=30,
    )
    if rc != 0:
        info = {"ok": False, "error": (stderr or stdout or "unknown").strip()}
    else:
        try:
            info = json.loads(stdout.strip())
        except json.JSONDecodeError as e:
            info = {"ok": False, "error": f"invalid rules-info JSON: {e}"}
        else:
            if not isinstance(info, dict):
                info = {
                    "ok": False,
                    "error": f"invalid rules-info JSON: expected an object, got {type(info).__name__}",
                }

    if info.get("ok") is False:
        _RULES_INFO_FAILURES[key] = (time.monotonic(), info)
    else:
        _RULES_INFO_FAILURES.pop(key, None)
        _RULES_INFO_CACHE[key] = info
    return info
def part_pattern_args(payload: dict) -> list[str]:
    """Build `--part-pattern <p>` argument pairs from `payload["part_patterns"]`.

    Entries that are not strings, or are blank after stripping, are skipped.
    A missing or null `part_patterns` yields an empty list.
    """
    stripped = (
        entry.strip()
        for entry in (payload.get("part_patterns") or [])
        if isinstance(entry, str)
    )
    return [token for pattern in stripped if pattern for token in ("--part-pattern", pattern)]
def parse_hits(stdout: str) -> tuple[int, list[str]]:
    """Scan rc-jav text output for "[ID] N hit(s)" lines.

    Returns `(total, ids)`: the sum of all N values and the IDs in the order
    their lines appear.
    """
    matches = list(HIT_RE.finditer(stdout))
    ids = [m.group("id") for m in matches]
    total = sum(int(m.group("count")) for m in matches)
    return total, ids
def host_human_size(n: int) -> str:
    """Format a byte count using binary units (B, KiB, MiB, GiB, TiB, PiB).

    Values below 1024 are shown as whole bytes; larger ones with two decimals.
    A falsy `n` (0 or None) is treated as 0. Anything of 1024 TiB or more is
    expressed in PiB.
    """
    value = float(n or 0)
    if value < 1024:
        return f"{int(value)} B"
    for unit in ("KiB", "MiB", "GiB", "TiB"):
        value /= 1024
        if value < 1024:
            return f"{value:.2f} {unit}"
    return f"{value / 1024:.2f} PiB"
def host_normalize_id(raw: str) -> str | None:
    """Normalize a query/filename into canonical `PREFIX-NNN` form.

    The final path component of `raw` is matched against PRIMARY_ID_RE, then
    COMPOUND_ID_RE, then FALLBACK_ID_RE. The prefix is upper-cased (a bare
    "FC2" becomes "FC2-PPV") and the number is zero-padded to at least three
    digits, never narrower than it was written. Returns None when no pattern
    matches.
    """
    # Appending a dummy ".x" suffix makes Path.stem strip only that suffix, so
    # dots inside `raw` survive while any directory part is still dropped.
    stem = Path(str(raw) + ".x").stem
    for pattern in (PRIMARY_ID_RE, COMPOUND_ID_RE, FALLBACK_ID_RE):
        found = pattern.match(stem)
        if found:
            break
    else:
        return None
    digits = found.group(2)
    prefix = found.group(1).upper()
    if prefix == "FC2":
        prefix = "FC2-PPV"
    width = max(3, len(digits))
    return f"{prefix}-{int(digits):0{width}d}"
def _load_rcjav_config(script: Path) -> dict:
    """Load `config.json` from the folder that contains `script`.

    Returns the parsed JSON object, or `{}` when the file is missing,
    unreadable, not valid UTF-8, not valid JSON, or valid JSON that is not an
    object. Callers can therefore always use `.get(...)` on the result.
    """
    cfg_path = script.parent / "config.json"
    try:
        loaded = json.loads(cfg_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return {}
    return loaded if isinstance(loaded, dict) else {}
def _cache_age_label(scanned_at: str, stale_hours: float = 24) -> tuple[str, bool]:
    """Describe how old an ISO-8601 `scanned_at` timestamp is.

    Returns `(label, stale)`. `label` is minutes ("12m") under one hour, hours
    with one decimal ("3.5h") under a day, otherwise days ("2.0d"). `stale` is
    True when the age exceeds `stale_hours`. A timestamp that cannot be parsed
    or compared yields `("?", False)`.
    """
    try:
        # A trailing "Z" is rewritten to an explicit UTC offset before parsing.
        stamp = datetime.fromisoformat(str(scanned_at).replace("Z", "+00:00"))
        # Compare aware with aware and naive with naive.
        reference = datetime.now(stamp.tzinfo) if stamp.tzinfo else datetime.now()
        hours = (reference - stamp).total_seconds() / 3600.0
    except Exception:
        return "?", False
    if hours >= 24:
        label = f"{hours / 24:.1f}d"
    elif hours >= 1:
        label = f"{hours:.1f}h"
    else:
        label = f"{int(hours * 60)}m"
    return label, hours > stale_hours
def _load_host_cache(script: Path) -> tuple[dict | None, dict]:
    """Load `cache.json` from the script's folder, reusing an in-memory copy.

    Returns `(data, timings)`. `data` is the parsed JSON, or None when the
    file is missing, cannot be stat'ed, or cannot be read/parsed. `timings`
    always has `host_cache_path`; it gains `host_cache_error` on failure, and
    `cache_load_ms` / `host_cache_reused` once a load was attempted or reused.

    The parsed file is kept in _cache_mem keyed by resolved path and is reused
    for as long as the file's (mtime_ns, size) stamp is unchanged.
    """
    cache_path = script.parent / "cache.json"
    timings: dict[str, int | str | bool | None] = {"host_cache_path": str(cache_path)}
    if not cache_path.exists():
        timings["host_cache_error"] = "cache.json not found"
        return None, timings
    try:
        st = cache_path.stat()
    except OSError as e:
        timings["host_cache_error"] = str(e)
        return None, timings

    key = str(cache_path.resolve())
    stamp = (st.st_mtime_ns, st.st_size)
    with _cache_lock:
        remembered = _cache_mem.get(key)
    if remembered and remembered.get("stamp") == stamp:
        timings["cache_load_ms"] = 0
        timings["host_cache_reused"] = True
        return remembered["data"], timings

    started = time.perf_counter()
    failure = None
    data = None
    try:
        data = json.loads(cache_path.read_text(encoding="utf-8"))
    except Exception as e:
        failure = str(e)
    timings["cache_load_ms"] = round((time.perf_counter() - started) * 1000)
    if failure is not None:
        timings["host_cache_error"] = failure
        return None, timings

    timings["host_cache_reused"] = False
    with _cache_lock:
        _cache_mem[key] = {"stamp": stamp, "data": data}
    return data, timings
def _entry_to_hit(source: str, remote: str, item: dict, match_trace: dict | None = None) -> dict:
    """Convert one cached file entry into the hit dict sent to the extension.

    Args:
        source: label for the root the hit came from (callers here pass
            "Source" or "Target").
        remote: the cache root the entry belongs to.
        item: the cached file entry; `path`, `size`, `mod_time` and `jav_id`
            are read, each with an empty/zero fallback.
        match_trace: optional extra keys merged over the hit.

    `full_path` is `remote` and `path` joined by "/", except that no separator
    is added when `remote` already ends in "/" or `path` is empty.
    """
    rel_path = item.get("path", "")
    byte_size = int(item.get("size") or 0)
    joiner = "/" if not remote.endswith("/") and rel_path else ""
    hit = {
        "source": source,
        "remote": remote,
        "path": rel_path,
        "full_path": f"{remote}{joiner}{rel_path}",
        "size": byte_size,
        "size_human": host_human_size(byte_size),
        "mod_time": item.get("mod_time", ""),
        "jav_id": item.get("jav_id", ""),
    }
    if match_trace:
        hit.update(match_trace)
    return hit
def _no_match_state(search_mode: str, cache_meta: dict) -> dict:
    """Explain an empty search result as `no_match_kind/title/detail` keys.

    Precedence: a "quick" (live) search always reports `live_miss`; otherwise
    any root whose meta has age "missing" gives `cache_missing`; otherwise any
    root flagged stale gives `cache_stale_miss`; otherwise `cache_miss`.
    """
    roots = list((cache_meta or {}).values())
    any_missing = bool([r for r in roots if r.get("age") == "missing"])
    any_stale = bool([r for r in roots if r.get("stale")])

    if search_mode == "quick":
        kind = "live_miss"
        title = "Live search found no library hit"
        detail = "The ID was extracted and searched against rclone directly."
    elif any_missing:
        kind = "cache_missing"
        title = "Cache search has missing roots"
        detail = "At least one requested root is not present in cache.json. Rebuild or refresh that cache root."
    elif any_stale:
        kind = "cache_stale_miss"
        title = "Stale cache found no library hit"
        detail = "The ID was searched in cache, but one or more cache roots are stale."
    else:
        kind = "cache_miss"
        title = "Cache search found no library hit"
        detail = "The ID was extracted and searched in the current cache."
    return {"no_match_kind": kind, "no_match_title": title, "no_match_detail": detail}
def handle_cached_search_fast(payload: dict) -> dict | None:
    """Answer a simple cached ID search in-process, skipping the rc-jav spawn.

    Returns None whenever the regular rc-jav CLI path should handle the
    request instead: no ID, a name or quick search, wildcard/range queries, an
    ID that does not normalize, an unusable cache.json, or no roots to search.
    Otherwise returns a response shaped like handle_search's, tagged with
    `fast_path: "host-cache"`.

    A cached file matches when its `jav_id` equals the normalized ID or starts
    with "<ID>#part". The staleness threshold comes from `_stale_hours`, which
    is defined elsewhere in this file.
    """
    query = payload.get("id") or payload.get("query")
    if not query or payload.get("name") or payload.get("quick"):
        return None
    raw = str(query)
    # Wildcards and "[lo-hi]" ranges need rc-jav's own expansion.
    if "*" in raw or "?" in raw or HOST_RANGE_RE.search(raw):
        return None
    norm = host_normalize_id(raw)
    if not norm:
        return None

    started = time.perf_counter()
    script = resolve_rcjav(payload.get("rcjav_path"))
    cache, timings = _load_host_cache(script)
    if not isinstance(cache, dict) or not isinstance(cache.get("remotes"), dict):
        return None
    cfg = _load_rcjav_config(script)

    source_roots = [r for r in (payload.get("source_override") or []) if isinstance(r, str)]
    target_roots = [r for r in (payload.get("target_override") or []) if isinstance(r, str)]
    if not source_roots and not target_roots:
        # No profile override: fall back to rc-jav's configured targets.
        target_roots = cfg.get("default_target") or []
    if not source_roots and not target_roots:
        return None
    wanted: list[tuple[str, str]] = [("Source", r) for r in source_roots]
    wanted += [("Target", r) for r in target_roots]

    # These depend only on the query, so compute them once up front.
    part_prefix = norm + "#part"
    was_normalized = raw.upper() != norm.upper()
    if was_normalized:
        whole_labels = ("normalized", "Normalized ID", "normalized")
    else:
        whole_labels = ("exact", "Exact ID", "high")
    part_labels = ("part", "Base ID + part", "related")

    cache_meta: dict[str, dict] = {}
    hits: list[dict] = []
    match_started = time.perf_counter()
    for source, remote in wanted:
        entry = cache["remotes"].get(remote)
        if not isinstance(entry, dict):
            cache_meta[remote] = {"cached": False, "age": "missing", "stale": True, "file_count": 0}
            continue
        files = entry.get("files", []) or []
        age, stale = _cache_age_label(entry.get("scanned_at", ""), _stale_hours(payload))
        cache_meta[remote] = {"cached": True, "age": age, "stale": stale, "file_count": len(files)}
        for item in files:
            jid = item.get("jav_id", "")
            is_part = str(jid).startswith(part_prefix)
            if not (jid == norm or is_part):
                continue
            kind, reason, confidence = part_labels if is_part else whole_labels
            hits.append(_entry_to_hit(source, remote, item, {
                "match_kind": kind,
                "match_reason": reason,
                "match_confidence": confidence,
                "matched_query": norm,
                "matched_id": jid,
            }))
    timings["cache_match_ms"] = round((time.perf_counter() - match_started) * 1000)

    hits.sort(key=lambda h: (h.get("jav_id", ""), h.get("path", "").lower()))
    timings.update({
        "host_cached_ms": round((time.perf_counter() - started) * 1000),
        "host_rcjav_ms": 0,
        "cli_elapsed_ms": None,
    })
    result = {
        "ok": True,
        "rc": 0 if hits else 1,
        "hits": len(hits),
        "found": bool(hits),
        "search_mode": "cached",
        "structured": hits,
        "cache_meta": cache_meta,
        "scanned_remotes": list(cache_meta.keys()),
        "timings": timings,
        "stderr": "",
        "id": query,
        "name": None,
        "message": f"{len(hits)} hit(s)" if hits else "NOT FOUND",
        "fast_path": "host-cache",
    }
    if not hits:
        result.update(_no_match_state("cached", cache_meta))
    return result
# ----- dispatch -----
def handle_search(payload: dict) -> dict:
    """Search the library for an ID and/or a name.

    Payload keys read: `id` (or `query`), `name`, `quick`, `part_patterns`,
    `source_override`, `target_override`, `rcjav_path`.

    Simple cached ID lookups are answered in-process by
    handle_cached_search_fast; everything else runs rc-jav with
    `--format json`.

    Returns a dict with `ok`. When neither an ID nor a name is supplied the
    result is `{"ok": False, "error": "no id or name"}` -- this holds even if
    `quick` or `part_patterns` are present, since those modify a search but do
    not constitute one. If rc-jav exits non-zero without any parseable output,
    the result is an error rather than a misleading "NOT FOUND".
    """
    query = payload.get("id") or payload.get("query")
    name = payload.get("name")
    quick = bool(payload.get("quick"))
    if not query and not name:
        return {"ok": False, "error": "no id or name"}

    fast = handle_cached_search_fast(payload)
    if fast is not None:
        return fast

    args: list[str] = []
    if query:
        args += ["--search", str(query)]
    if name:
        args += ["--name", str(name)]
    if quick:
        args.append("--quick")
    args += part_pattern_args(payload)
    # Optional profile overrides: --source / --target lists from the active profile.
    for flag, key in (("--source", "source_override"), ("--target", "target_override")):
        for root in payload.get(key) or []:
            if root and isinstance(root, str):
                args += [flag, root]

    # Always ask for JSON output — needed for per-hit Delete UI in the extension.
    # Also run --basic to suppress rich on stderr.
    t0 = time.perf_counter()
    rc, out, err = run_rcjav(
        args,
        extra_flags=["--basic", "--no-color", "--format", "json"],
        rcjav_path=payload.get("rcjav_path"),
    )
    rcjav_ms = round((time.perf_counter() - t0) * 1000)

    parsed = extract_json_blob(out)
    structured: list[dict] = []
    cache_meta: dict = {}
    if parsed:
        # `or` fallbacks tolerate explicit JSON nulls, matching handle_bulk_search.
        for q in parsed.get("queries") or []:
            structured.extend(q.get("hits") or [])
        structured.extend(parsed.get("name_matches") or [])
        cache_meta = parsed.get("cache_meta") or {}
        hits = len(structured)
    else:
        # Fallback: text count (also covers --basic non-JSON output)
        hits, _ = parse_hits(out)

    # If rc-jav failed and produced no parseable result, surface as an error
    # instead of an empty "NOT FOUND" — otherwise host crashes look like misses.
    if rc != 0 and parsed is None and hits == 0:
        return {
            "ok": False,
            "rc": rc,
            "error": (err or out or "rc-jav exited non-zero").strip()[:1000],
            "id": query,
            "name": name,
        }

    search_mode = "quick" if quick else "cached"
    if parsed:
        cli_elapsed_ms = round(float(parsed.get("elapsed_sec") or 0) * 1000)
        cli_timings = parsed.get("timings") or {}
    else:
        cli_elapsed_ms = None
        cli_timings = {}
    result = {
        "ok": True,
        "rc": rc,
        "hits": hits,
        "found": hits > 0,
        "search_mode": search_mode,
        "structured": structured,
        "cache_meta": cache_meta,
        "scanned_remotes": list(cache_meta.keys()),
        "timings": {
            "host_rcjav_ms": rcjav_ms,
            "cli_elapsed_ms": cli_elapsed_ms,
            **cli_timings,
        },
        # Full stdout is left out of the success payload — it duplicates
        # `structured` and can blow Chrome's 1 MiB message cap.
        "stderr": (err or "")[:2000],
        "id": query,
        "name": name,
        "message": f"{hits} hit(s)" if hits > 0 else "NOT FOUND",
    }
    if hits == 0:
        result.update(_no_match_state(search_mode, cache_meta))
    return result
def handle_bulk_search(payload: dict) -> dict:
    """Search for many IDs in a single rc-jav invocation.

    `payload["queries"]` is reduced to its non-blank string entries (stripped)
    and capped at the first 250. `quick`, `part_patterns`, `source_override`,
    `target_override` and `rcjav_path` are honoured as in a single search.

    Returns per-query rows under `queries` (each with its hit count, hits, and
    no-match explanation when empty) plus totals, or `{"ok": False, ...}` when
    no usable query was supplied or rc-jav printed no parseable JSON.
    """
    cleaned: list[str] = []
    for q in payload.get("queries") or []:
        if isinstance(q, str) and str(q).strip():
            cleaned.append(str(q).strip())
    queries = cleaned[:250]
    if not queries:
        return {"ok": False, "error": "bulk search needs one or more IDs"}

    quick = bool(payload.get("quick"))
    args: list[str] = []
    for q in queries:
        args.extend(("--search", q))
    if quick:
        args.append("--quick")
    args.extend(part_pattern_args(payload))
    for root in payload.get("source_override") or []:
        if root and isinstance(root, str):
            args.extend(("--source", root))
    for root in payload.get("target_override") or []:
        if root and isinstance(root, str):
            args.extend(("--target", root))

    started = time.perf_counter()
    rc, out, err = run_rcjav(
        args,
        extra_flags=["--basic", "--no-color", "--format", "json"],
        rcjav_path=payload.get("rcjav_path"),
    )
    rcjav_ms = round((time.perf_counter() - started) * 1000)

    parsed = extract_json_blob(out)
    if parsed is None:
        return {"ok": False, "rc": rc, "error": (err or out or "bulk search failed").strip()[:1000]}

    cache_meta = parsed.get("cache_meta", {}) or {}
    search_mode = "quick" if quick else "cached"
    rows = []
    hit_total = 0
    for item in parsed.get("queries", []):
        item_hits = item.get("hits", []) or []
        hit_total += len(item_hits)
        row = {"query": item.get("query", ""), "hits": len(item_hits), "structured": item_hits}
        if not item_hits:
            row.update(_no_match_state(search_mode, cache_meta))
        rows.append(row)

    timings = {
        "host_rcjav_ms": rcjav_ms,
        "cli_elapsed_ms": round(float(parsed.get("elapsed_sec", 0)) * 1000),
        **(parsed.get("timings", {}) or {}),
    }
    return {
        "ok": True,
        "rc": rc,
        "search_mode": search_mode,
        "queries": rows,
        "query_count": len(rows),
        "hits": hit_total,
        "cache_meta": cache_meta,
        "scanned_remotes": list(cache_meta.keys()),
        "timings": timings,
        "stderr": (err or "")[:2000],
    }
def handle_dupe_review(payload: dict) -> dict:
    """Run rc-jav's `--cache` duplicate review over the selected roots.

    Roots come from `source_override` / `target_override`; when both are empty
    the `default_target` list from rc-jav's config.json is used. With no roots
    at all, an error is returned and rc-jav is not run.

    Returns the duplicate groups, variant alerts, the roots used, and the
    total size of all `delete_candidates` (raw and human-readable), or
    `{"ok": False, ...}` when rc-jav printed no parseable JSON.
    """
    rcjav_path = payload.get("rcjav_path")
    cfg = _load_rcjav_config(resolve_rcjav(rcjav_path))
    source_roots = [r for r in (payload.get("source_override") or []) if isinstance(r, str)]
    target_roots = [r for r in (payload.get("target_override") or []) if isinstance(r, str)]
    if not (source_roots or target_roots):
        target_roots = cfg.get("default_target") or []

    args = ["--cache", *part_pattern_args(payload)]
    for root in source_roots:
        args.extend(("--source", root))
    for root in target_roots:
        args.extend(("--target", root))
    if not (source_roots or target_roots):
        return {"ok": False, "error": "no source or target roots configured for duplicate review"}

    started = time.perf_counter()
    rc, out, err = run_rcjav(
        args,
        extra_flags=["--basic", "--no-color", "--format", "json"],
        rcjav_path=rcjav_path,
    )
    elapsed = round((time.perf_counter() - started) * 1000)

    parsed = extract_json_blob(out)
    if parsed is None:
        return {"ok": False, "rc": rc, "error": (err or out or "duplicate review failed").strip()[:1000]}

    groups = parsed.get("groups", {}) or {}
    # Bytes that would be freed if every delete candidate were removed.
    reclaim = 0
    for group in groups.values():
        for candidate in group.get("delete_candidates", []) or []:
            reclaim += int(candidate.get("size") or 0)

    return {
        "ok": True,
        "groups": groups,
        "group_count": len(groups),
        "variant_alerts": parsed.get("variant_alerts", []) or [],
        "roots": {"source": source_roots, "target": target_roots},
        "potential_reclaim": reclaim,
        "potential_reclaim_human": host_human_size(reclaim),
        "timings": {"host_rcjav_ms": elapsed},
        "stderr": (err or "")[:2000],
    }
def handle_library_issues(payload: dict) -> dict:
    """Return non-canonical filenames from cache (bracket-wrapped IDs, no-hyphen IDs).

    Runs `rc-jav.py --library-issues --format json` and passes through its
    `bracket_names` and `nohyphen_names` lists (each defaulting to an empty
    list), plus the wall-clock time of the call. Returns `{"ok": False, ...}`
    when rc-jav printed no parseable JSON.
    """
    # run_rcjav resolves `rcjav_path` itself, so no separate resolve is needed.
    t0 = time.perf_counter()
    rc, out, err = run_rcjav(
        ["--library-issues"],
        extra_flags=["--basic", "--no-color", "--format", "json"],
        rcjav_path=payload.get("rcjav_path"),
    )
    elapsed = round((time.perf_counter() - t0) * 1000)
    parsed = extract_json_blob(out)
    if parsed is None:
        return {"ok": False, "rc": rc, "error": (err or out or "library-issues failed").strip()[:1000]}
    return {
        "ok": True,
        "bracket_names": parsed.get("bracket_names", []),
        "nohyphen_names": parsed.get("nohyphen_names", []),
        "timings": {"host_rcjav_ms": elapsed},
    }
def _patch_cache_remove_paths(rcjav_path: str | None, full_paths: list[str]) -> None:
    """Remove file entries from cache.json after successful deletion.

    Each full path is mapped back to `(remote, relative path)` for EVERY
    cached root it lives under, so a file listed under overlapping roots
    (e.g. both "r:lib" and "r:lib/sub") is dropped from all of them. The file
    is rewritten (via a temp file + os.replace) only when something was
    actually removed.

    Best-effort: any error is logged and swallowed -- a failed patch merely
    leaves the cache stale until the next scan.
    """
    try:
        cache_path = resolve_rcjav(rcjav_path).parent / "cache.json"
        if not cache_path.exists():
            return
        cache = json.loads(cache_path.read_text(encoding="utf-8"))
        remotes = cache.get("remotes") if isinstance(cache, dict) else None
        if not isinstance(remotes, dict):
            return

        # Build set of (remote, rel_path) to remove.
        to_remove: set[tuple[str, str]] = set()
        for remote in remotes:
            prefix = remote if remote.endswith("/") else remote + "/"
            for full_path in full_paths:
                if isinstance(full_path, str) and full_path.startswith(prefix):
                    to_remove.add((remote, full_path[len(prefix):]))
        if not to_remove:
            return

        modified = False
        for remote, remote_data in remotes.items():
            if not isinstance(remote_data, dict):
                continue
            files = remote_data.get("files") or []
            # Entries without a usable "path" are kept rather than aborting the patch.
            kept = [
                f for f in files
                if not (isinstance(f, dict) and (remote, f.get("path")) in to_remove)
            ]
            if len(kept) != len(files):
                remote_data["files"] = kept
                modified = True

        if modified:
            tmp = cache_path.with_suffix(cache_path.suffix + ".tmp")
            tmp.write_text(json.dumps(cache, indent=2), encoding="utf-8")
            os.replace(tmp, cache_path)
    except Exception as e:
        log(f"_patch_cache_remove_paths error: {e}")
def handle_delete_batch(payload: dict) -> dict:
    """Delete multiple files respecting existing delete settings, patch cache once.

    Each entry of `payload["paths"]` is passed to handle_delete together with
    the batch-wide `mode`, `trash_dir`, `allowed_remotes` and `rcjav_path`.
    An item that raises is recorded as a failed result instead of aborting the
    batch, so the cache is still patched for everything deleted before it.

    Returns `results` (one dict per path), `deleted_count`, `failed_count`,
    and `ok`, which is True when at least one delete succeeded.
    """
    paths = payload.get("paths") or []
    if not isinstance(paths, list) or not paths:
        return {"ok": False, "error": "paths must be a non-empty array"}
    base = {
        "mode": payload.get("mode", "trash"),
        "trash_dir": payload.get("trash_dir", ""),
        "allowed_remotes": payload.get("allowed_remotes", []),
        "rcjav_path": payload.get("rcjav_path", ""),
    }
    results = []
    deleted_paths = []
    for path in paths:
        try:
            outcome = handle_delete({**base, "path": path})
        except Exception as e:
            # Contain the failure to this item; earlier deletes already happened.
            outcome = {"ok": False, "error": f"delete failed: {e}"}
        results.append({"path": path, **outcome})
        if outcome.get("ok"):
            deleted_paths.append(path)
    if deleted_paths:
        _patch_cache_remove_paths(payload.get("rcjav_path"), deleted_paths)
    ok_count = len(deleted_paths)
    return {
        "ok": ok_count > 0,
        "results": results,
        "deleted_count": ok_count,
        "failed_count": len(results) - ok_count,
    }
def handle_rename_file(payload: dict) -> dict:
    """Rename one file on an rclone remote by delegating to rc-jav.

    Requires non-blank `remote`, `old_path` and `new_path` (each is stripped),
    and the two paths must differ. The rename itself is performed by
    `rc-jav.py --rename-file`; its JSON result is returned with a `timings`
    entry added. Returns `{"ok": False, ...}` on invalid input or when rc-jav
    printed no parseable JSON.
    """
    remote, old_path, new_path = (
        (payload.get(key) or "").strip() for key in ("remote", "old_path", "new_path")
    )
    if not (remote and old_path and new_path):
        return {"ok": False, "error": "remote, old_path, and new_path are required"}
    if old_path == new_path:
        return {"ok": False, "error": "old_path and new_path are identical"}

    started = time.perf_counter()
    rc, out, err = run_rcjav(
        ["--rename-file", "--remote", remote, "--old-path", old_path, "--new-path", new_path],
        extra_flags=["--basic", "--no-color"],
        rcjav_path=payload.get("rcjav_path"),
    )
    elapsed = round((time.perf_counter() - started) * 1000)

    parsed = extract_json_blob(out)
    if parsed is None:
        return {"ok": False, "rc": rc, "error": (err or out or "rename failed").strip()[:1000]}
    response = dict(parsed)
    response["timings"] = {"host_rcjav_ms": elapsed}
    return response
def handle_rename_files_batch(payload: dict) -> dict:
    """Rename several files with one rc-jav call.

    `payload["renames"]` must be a non-empty list; it is serialized to JSON
    and fed to `rc-jav.py --rename-files-batch` on stdin. rc-jav's JSON result
    is returned with a `timings` entry added, or `{"ok": False, ...}` on
    invalid input or when no parseable JSON came back.
    """
    renames = payload.get("renames") or []
    if not (isinstance(renames, list) and renames):
        return {"ok": False, "error": "renames must be a non-empty array"}

    started = time.perf_counter()
    rc, out, err = run_rcjav(
        ["--rename-files-batch"],
        extra_flags=["--basic", "--no-color"],
        rcjav_path=payload.get("rcjav_path"),
        stdin_data=json.dumps(renames),
    )
    elapsed = round((time.perf_counter() - started) * 1000)

    parsed = extract_json_blob(out)
    if parsed is None:
        return {"ok": False, "rc": rc, "error": (err or out or "batch rename failed").strip()[:1000]}
    response = dict(parsed)
    response["timings"] = {"host_rcjav_ms": elapsed}
    return response
def _remote_root(p: str) -> str:
    """Return everything up to and including the first ":" of `p`.

    For an rclone path such as "gdrive:dir/file" this is the "gdrive:" remote
    prefix. A string without any ":" yields "".
    """
    head, colon, _rest = p.partition(":")
    return head + ":" if colon else ""
def _normalize_rclone_prefix(p: str) -> str:
    """Canonicalize an allow-list prefix without widening it to the remote root.

    Whitespace is stripped, backslashes become "/", and trailing slashes are
    removed. A value that contains no ":" (so is not an rclone remote path),
    or is empty/None, yields "".
    """
    cleaned = (p or "").strip().replace("\\", "/").rstrip("/")
    if ":" not in cleaned:
        return ""
    return cleaned
def _path_in_allowed_prefixes(path: str, prefixes: list[str]) -> bool:
    """Return True when `path` is one of `prefixes` or lies beneath one.

    Backslashes in `path` are treated as "/". A prefix matches only on a whole
    path-segment boundary, so "r:lib" covers "r:lib/a" but not "r:library/a".

    Any path containing a ".." segment is rejected outright: the comparison is
    purely textual, and a path such as "r:lib/../other/x" would otherwise pass
    the prefix test while referring to something outside the allowed folder on
    backends that resolve "..".
    """
    path_norm = path.replace("\\", "/")
    # Split on both "/" and the remote separator so "r:../x" is caught too.
    if ".." in path_norm.replace(":", "/").split("/"):
        return False
    return any(
        path_norm == prefix or path_norm.startswith(prefix + "/")
        for prefix in prefixes
    )
def handle_delete(payload: dict) -> dict:
    """Delete or move-to-trash a single file via rclone.

    Payload keys read:
        path: the file to remove (required; must be a string).
        mode: "trash" (default) moves the file to
            `<trash_dir>/<YYYY-MM-DD>/<leaf>`; "permanent" deletes it.
        trash_dir: destination root for trash mode.
        allowed_remotes: path prefixes the extension permits deleting under.
        rcjav_path: used to locate rc-jav's config.json.

    The allow-list is the union of `allowed_remotes`, the `default_target` /
    `default_source` roots from rc-jav's config.json, and `trash_dir`, so the
    host protects itself even if the extension forgets to pass
    `allowed_remotes`. If the list ends up empty, or `path` is outside it, the
    request is refused.

    Every attempt that reaches rclone is appended to DELETE_LOG (best-effort).

    Returns a dict with `ok`; on an rclone run it also carries `rc`, `stdout`,
    `stderr`, `path`, `mode` and `dst`. Invalid input yields
    `{"ok": False, "error": ...}` rather than raising.
    """
    # --- validate the path before touching anything else ---
    raw_path = payload.get("path")
    if raw_path is not None and not isinstance(raw_path, str):
        return {"ok": False, "error": "path must be a string"}
    path = (raw_path or "").strip()
    if not path:
        return {"ok": False, "error": "no path"}
    # Reject directory-shaped paths — `rclone moveto` on a trailing-slash path
    # would move a whole tree and produce a destination with empty leaf.
    if path.endswith("/") or path.endswith("\\"):
        return {"ok": False, "error": "path looks like a directory (trailing slash)"}
    # Sanity: must look like an rclone remote path (contains ':') or a local file
    if ":" not in path and not Path(path).exists():
        return {"ok": False, "error": "path doesn't look like an rclone remote or local file"}

    mode = payload.get("mode", "trash")  # 'trash' or 'permanent'
    raw_trash = payload.get("trash_dir")
    trash_dir = raw_trash.strip() if isinstance(raw_trash, str) else ""

    # --- build the allow-list ---
    candidates: list = []
    extension_roots = payload.get("allowed_remotes")
    if isinstance(extension_roots, (list, tuple)):
        candidates.extend(extension_roots)
    cfg = _load_rcjav_config(resolve_rcjav(payload.get("rcjav_path")))
    if isinstance(cfg, dict):
        for key in ("default_target", "default_source"):
            roots = cfg.get(key)
            if isinstance(roots, (list, tuple)):
                candidates.extend(roots)
    candidates.append(trash_dir)

    allowed_prefixes: list[str] = []
    for candidate in candidates:
        if not isinstance(candidate, str):
            continue
        prefix = _normalize_rclone_prefix(candidate)
        if prefix and prefix not in allowed_prefixes:
            allowed_prefixes.append(prefix)

    # Refuse rather than fail-open when nothing is configured.
    if not allowed_prefixes:
        return {"ok": False, "error": "no allowed prefixes configured; refusing to delete"}
    if not _remote_root(path) or not _path_in_allowed_prefixes(path, allowed_prefixes):
        return {
            "ok": False,
            "error": f"path {path!r} not in allowed prefixes={allowed_prefixes}",
        }

    # --- build the rclone command ---
    # NOTE(review): RCLONE_BIN is not defined in the part of the file reviewed
    # here; presumably a module-level constant further down -- confirm.
    now = datetime.now()
    log_entry = {"ts": now.isoformat(), "path": path, "mode": mode, "trash_dir": trash_dir}
    if mode == "permanent":
        cmd = [RCLONE_BIN, "deletefile", path]
    elif mode == "trash":
        if not trash_dir:
            return {"ok": False, "error": "trash_dir required when mode=trash"}
        # Give `rclone moveto` a full destination path (dated folder + leaf).
        leaf = path.split("/")[-1]
        dst = f"{trash_dir.rstrip('/')}/{now.date().isoformat()}/{leaf}"
        cmd = [RCLONE_BIN, "moveto", path, dst]
        log_entry["dst"] = dst
    else:
        return {"ok": False, "error": f"unknown mode {mode!r}"}

    def _write_audit() -> None:
        # Best-effort: an unwritable audit log must not change the outcome.
        try:
            with DELETE_LOG.open("a", encoding="utf-8") as f:
                f.write(json.dumps(log_entry) + "\n")
        except OSError:
            pass

    creationflags = 0x08000000 if os.name == "nt" else 0  # CREATE_NO_WINDOW
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=300,
                              encoding="utf-8", errors="replace",
                              creationflags=creationflags)
        log_entry["rc"] = proc.returncode
        log_entry["stderr"] = proc.stderr.strip()[:500]
    except Exception as e:
        log_entry["error"] = str(e)
        _write_audit()
        return {"ok": False, "error": str(e)}
    _write_audit()
    return {
        "ok": proc.returncode == 0,
        "rc": proc.returncode,
        "stdout": proc.stdout,
        "stderr": proc.stderr,
        "path": path,
        "mode": mode,
        "dst": log_entry.get("dst"),
    }
def handle_recent_deletes(payload: dict) -> dict:
    """Return the last N undoable trash-mode deletes, newest first.

    The delete audit log (``DELETE_LOG``, line-delimited JSON) is replayed in
    file order. Every successful ``mode == "trash"`` record that has a ``dst``
    becomes a candidate; every successful ``kind == "undo"`` record cancels
    the candidates moved to the same ``dst`` *before* it. Replaying in order
    (rather than hiding every ``dst`` that was ever undone) keeps a file
    visible when it is trashed again to the same destination after an undo.
    Permanent deletes and failed operations are never offered.

    Args:
        payload: Request dict. ``limit`` (optional) caps the number of
            entries; it is clamped to 1..200 and falls back to 20 when it is
            missing or not a number.

    Returns:
        ``{"ok": True, "entries": [...]}`` where each entry has ``ts``,
        ``path``, ``dst`` and ``trash_dir``, or ``{"ok": False, "error": ...}``
        when the log exists but cannot be read.
    """
    try:
        limit = int(payload.get("limit") or 20)
    except (TypeError, ValueError, OverflowError):
        limit = 20
    limit = max(1, min(limit, 200))

    audit = DELETE_LOG
    if not audit.exists():
        return {"ok": True, "entries": []}

    # Only the tail of the log is read, so a runaway log cannot exhaust
    # memory. A record cut in half at the start of the tail fails to parse
    # and is skipped below.
    tail_bytes = 1_000_000
    try:
        with audit.open("rb") as fh:
            fh.seek(0, os.SEEK_END)
            fh.seek(max(0, fh.tell() - tail_bytes))
            raw = fh.read().decode("utf-8", errors="replace")
    except OSError as e:
        return {"ok": False, "error": f"can't read deletes.log: {e}"}

    pending: list[dict] = []
    for line in raw.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            obj = json.loads(line)
        except ValueError:  # json.JSONDecodeError is a ValueError
            continue
        if not isinstance(obj, dict):
            continue
        dst = obj.get("dst")
        # Failed operations and records without a destination change nothing.
        if obj.get("rc") != 0 or not dst:
            continue
        if obj.get("kind") == "undo":
            # A successful undo cancels the deletes that put a file at dst so far.
            pending = [entry for entry in pending if entry["dst"] != dst]
            continue
        if obj.get("mode") != "trash":
            continue  # permanent deletes cannot be undone
        pending.append({
            "ts": obj.get("ts"),
            "path": obj.get("path"),
            "dst": dst,
            "trash_dir": obj.get("trash_dir"),
        })

    pending.reverse()  # newest first
    return {"ok": True, "entries": pending[:limit]}
def handle_undo_delete(payload: dict) -> dict:
    """Restore a trashed file to its original location with `rclone moveto`.

    The caller supplies ``dst`` (where the file sits in the trash) and
    ``path`` (where it used to live). Both ends are validated against the
    allowlist before rclone runs, and every attempt is appended to the delete
    audit log as a ``kind: "undo"`` record.

    Allowlist sources, in order: ``payload["allowed_remotes"]``, the
    ``default_target`` / ``default_source`` lists of the ``config.json`` that
    sits next to the resolved rc-jav script, and ``payload["trash_dir"]``.

    Returns:
        A dict with ``ok``. When rclone ran it also carries ``rc``,
        ``stderr``, ``path`` and ``dst``; otherwise it carries ``error``.
    """
    trash_path = (payload.get("dst") or "").strip()
    original_path = (payload.get("path") or "").strip()
    if not (trash_path and original_path):
        return {"ok": False, "error": "undo needs both dst (trash location) and path (original)"}

    allowed_prefixes = []
    for remote in payload.get("allowed_remotes", []):
        normalized = _normalize_rclone_prefix(remote)
        if normalized:
            allowed_prefixes.append(normalized)

    def _allow(candidate) -> None:
        # Normalize and add to the allowlist unless it is empty or already present.
        normalized = _normalize_rclone_prefix(candidate)
        if normalized and normalized not in allowed_prefixes:
            allowed_prefixes.append(normalized)

    script = resolve_rcjav(payload.get("rcjav_path"))
    cfg_path = script.parent / "config.json"
    if cfg_path.exists():
        try:
            cfg = json.loads(cfg_path.read_text(encoding="utf-8"))
            for key in ("default_target", "default_source"):
                for configured_root in cfg.get(key, []) or []:
                    _allow(configured_root)
        except (OSError, json.JSONDecodeError):
            # An unreadable config simply contributes no prefixes.
            pass

    trash_dir = (payload.get("trash_dir") or "").strip()
    if trash_dir:
        _allow(trash_dir)

    if not allowed_prefixes:
        return {"ok": False, "error": "no allowed prefixes configured; refusing to undo"}

    # Both ends must be file-like rclone paths inside the allowlist.
    for label, candidate in (("dst", trash_path), ("path", original_path)):
        if candidate.endswith(("/", "\\")):
            return {"ok": False, "error": f"undo {label} looks like a directory"}
        if ":" not in candidate:
            return {"ok": False, "error": f"undo {label} not an rclone path"}
        if not _path_in_allowed_prefixes(candidate, allowed_prefixes):
            return {"ok": False, "error": f"undo {label} {candidate!r} not in allowed prefixes"}

    creationflags = 0x08000000 if os.name == "nt" else 0
    cmd = [RCLONE_BIN, "moveto", trash_path, original_path]
    log_entry = {
        "ts": datetime.now().isoformat(),
        "kind": "undo",
        "path": original_path,
        "dst": trash_path,
        "trash_dir": trash_dir,
    }

    def _record() -> None:
        # Best-effort append to the audit log; a logging failure never fails the undo.
        try:
            with DELETE_LOG.open("a", encoding="utf-8") as fh:
                fh.write(json.dumps(log_entry) + "\n")
        except OSError:
            pass

    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=300,
                              encoding="utf-8", errors="replace",
                              creationflags=creationflags)
    except Exception as e:
        log_entry["error"] = str(e)
        _record()
        return {"ok": False, "error": str(e)}

    log_entry["rc"] = proc.returncode
    log_entry["stderr"] = (proc.stderr or "").strip()[:500]
    _record()
    return {
        "ok": proc.returncode == 0,
        "rc": proc.returncode,
        "stderr": proc.stderr,
        "path": original_path,
        "dst": trash_path,
    }
# rclone executable name passed as argv[0] to subprocess by the delete/undo
# handlers; a bare name like this is resolved through PATH.
# NOTE(review): no environment-variable override is implemented on this line —
# to use a different binary, change this constant (or add an os.environ lookup).
RCLONE_BIN = "rclone"
def handle_ping(payload: dict) -> dict:
    """Report host liveness plus which rc-jav script and Python it will use.

    ``payload["rcjav_path"]`` (optional) is the extension-side override; the
    reply says whether one was supplied and whether the resolved script exists.
    """
    requested = payload.get("rcjav_path", "")
    script = resolve_rcjav(requested)
    reply = {"ok": True, "version": VERSION}
    reply["rc_jav"] = str(script)
    reply["rc_jav_exists"] = script.exists()
    reply["rc_jav_default"] = str(RC_JAV)
    reply["rc_jav_overridden"] = bool(requested)
    reply["python"] = PYTHON
    return reply
def handle_diagnostics(payload: dict) -> dict:
    """Run a battery of environment checks and report them in display order.

    Checks cover the Python interpreter, the resolved rc-jav script, the
    extension settings echoed in ``payload["extension_settings"]``, whether
    rc-jav and the ``rich`` package can be started, the rclone binary and its
    remotes, rc-jav's ``config.json`` / ``cache.json``, overall cache health
    and the optional ``wincatalog`` folder.

    Args:
        payload: Request dict; ``rcjav_path`` optionally overrides the script
            location and ``extension_settings`` is reported back verbatim.

    Returns:
        ``{"ok": True, "checks": [...]}`` where each check is
        ``{"name", "status", "detail"}`` with status ``ok`` / ``warn`` /
        ``fail`` / ``info``.
    """
    import shutil

    checks: list[dict] = []
    # 0x08000000 is CREATE_NO_WINDOW: keeps probe subprocesses from flashing a
    # console window on Windows.
    no_window = 0x08000000 if os.name == "nt" else 0

    def add(name, status, detail=""):
        checks.append({"name": name, "status": status, "detail": str(detail)})

    def probe(cmd):
        # Run one short diagnostic command and capture its text output.
        return subprocess.run(cmd, capture_output=True, text=True, timeout=10,
                              encoding="utf-8", errors="replace",
                              creationflags=no_window)

    # Python
    add("Python interpreter", "ok", f"{sys.version.split()[0]} — {sys.executable}")

    # rc-jav script
    override = payload.get("rcjav_path", "")
    script = resolve_rcjav(override)
    add("Host version", "ok", VERSION)
    add("rc-jav path source", "ok",
        "extension override: " + override if override else f"host default: {RC_JAV}")
    if script.exists():
        add("rc-jav.py", "ok", str(script))
    else:
        add("rc-jav.py", "fail", f"not found at {script}")

    ext_settings = payload.get("extension_settings") or {}
    if isinstance(ext_settings, dict) and ext_settings:
        mode = "quick/live" if ext_settings.get("quick_mode") else "cache"
        profile = ext_settings.get("active_profile") or "config.json defaults"
        profile_count = ext_settings.get("profile_count", 0)
        triggers = []
        if ext_settings.get("auto_every_page"):
            triggers.append("every page")
        if ext_settings.get("auto_known_sites"):
            triggers.append("known sites")
        add("Extension settings", "ok",
            f"mode={mode}; profile={profile}; profiles={profile_count}; auto={', '.join(triggers) or 'off'}")

    # rc-jav can run --help
    if script.exists():
        try:
            r = probe([PYTHON, str(script), "--help"])
            if r.returncode == 0:
                add("rc-jav --help", "ok", "exits 0")
            else:
                add("rc-jav --help", "fail", f"exit {r.returncode}: {r.stderr.strip()[:200]}")
        except Exception as e:  # a diagnostics probe reports failures, never raises
            add("rc-jav --help", "fail", str(e))
    else:
        add("rc-jav --help", "warn", "skipped (script missing)")

    # rich library present?
    try:
        if script.exists():
            r = probe([PYTHON, "-c", "import rich; print(rich.__file__)"])
            if r.returncode == 0:
                add("rich (Python pkg)", "ok", r.stdout.strip())
            else:
                add("rich (Python pkg)", "fail", "import failed — run: pip install rich")
    except Exception as e:
        add("rich (Python pkg)", "fail", str(e))

    # rclone on PATH
    rclone = shutil.which("rclone") or shutil.which("rclone.exe")
    if rclone:
        try:
            r = probe([rclone, "version"])
            first = r.stdout.splitlines()[0] if r.stdout else "?"
            add("rclone binary", "ok", f"{rclone} — {first}")
        except Exception as e:
            add("rclone binary", "warn", f"found at {rclone} but version check failed: {e}")
    else:
        add("rclone binary", "fail", "not on PATH")

    # rclone configuration — listremotes
    if rclone:
        try:
            r = probe([rclone, "listremotes"])
            if r.returncode == 0:
                remotes = [s.strip() for s in r.stdout.splitlines() if s.strip()]
                add("rclone remotes", "ok" if remotes else "warn",
                    ", ".join(remotes) if remotes else "no remotes configured")
            else:
                add("rclone remotes", "fail", r.stderr.strip()[:200])
        except Exception as e:
            add("rclone remotes", "fail", str(e))

    # rc-jav config.json
    if script.exists():
        cfg = script.parent / "config.json"
        if cfg.exists():
            try:
                data = json.loads(cfg.read_text(encoding="utf-8"))
                tgt = data.get("default_target", [])
                src = data.get("default_source", [])
                add("rc-jav config.json", "ok",
                    f"source={src or '(empty)'} target={tgt or '(empty)'}")
            except Exception as e:
                add("rc-jav config.json", "fail", str(e))
        else:
            add("rc-jav config.json", "warn", f"not present at {cfg}")

    # cache.json status
    cache = script.parent / "cache.json"
    if cache.exists():
        try:
            data = json.loads(cache.read_text(encoding="utf-8"))
            rmap = data.get("remotes", {})
            summary = [f"{r}={len(v.get('files', []))} files" for r, v in rmap.items()]
            add("rc-jav cache.json", "ok", "; ".join(summary) if summary else "empty")
        except Exception as e:
            add("rc-jav cache.json", "warn", str(e))
    else:
        add("rc-jav cache.json", "warn", "no cache yet (run --scan to build)")

    # Cache health: reuse the cache-status handler's warnings and staleness.
    try:
        cache_status = handle_cache_status(payload)
        warnings = cache_status.get("warnings") or []
        stale = [r["remote"] for r in cache_status.get("remotes", []) if r.get("stale")]
        if warnings:
            add("Cache health", "warn",
                "; ".join((w.get("message") or w.get("code") or "?") for w in warnings[:3]))
        elif stale:
            add("Cache health", "warn", "stale remotes: " + ", ".join(stale))
        elif cache_status.get("cache_exists"):
            add("Cache health", "ok", "no mismatch warnings")
    except Exception as e:
        add("Cache health", "warn", str(e))

    # WinCatalog folder
    wc = script.parent / "wincatalog"
    if wc.exists() and wc.is_dir():
        files = [f.name for f in wc.iterdir() if f.suffix.lower() in (".csv", ".xml")]
        add("WinCatalog folder", "ok" if files else "info",
            f"{wc} — {len(files)} CSV/XML file(s)" if files else f"optional: no CSV/XML files in {wc}")
    else:
        add("WinCatalog folder", "info", f"optional: not present at {wc}")

    return {"ok": True, "checks": checks}
# Windows registry locations where a browser may look up the com.rcjav.host
# native-messaging manifest. Each row is (display label, hive name, subkey):
# the label is what gets reported back, the hive name selects the winreg
# root, and the subkey is opened relative to that root.
REGISTRY_HOST_KEYS = [
    (r"HKLM\Software\BraveSoftware\Brave-Browser\NativeMessagingHosts\com.rcjav.host", "HKEY_LOCAL_MACHINE", r"Software\BraveSoftware\Brave-Browser\NativeMessagingHosts\com.rcjav.host"),
    (r"HKLM\Software\Google\Chrome\NativeMessagingHosts\com.rcjav.host", "HKEY_LOCAL_MACHINE", r"Software\Google\Chrome\NativeMessagingHosts\com.rcjav.host"),
    (r"HKLM\Software\WOW6432Node\Google\Chrome\NativeMessagingHosts\com.rcjav.host", "HKEY_LOCAL_MACHINE", r"Software\WOW6432Node\Google\Chrome\NativeMessagingHosts\com.rcjav.host"),
    (r"HKLM\Software\Chromium\NativeMessagingHosts\com.rcjav.host", "HKEY_LOCAL_MACHINE", r"Software\Chromium\NativeMessagingHosts\com.rcjav.host"),
    (r"HKCU\Software\BraveSoftware\Brave-Browser\NativeMessagingHosts\com.rcjav.host", "HKEY_CURRENT_USER", r"Software\BraveSoftware\Brave-Browser\NativeMessagingHosts\com.rcjav.host"),
    (r"HKCU\Software\Google\Chrome\NativeMessagingHosts\com.rcjav.host", "HKEY_CURRENT_USER", r"Software\Google\Chrome\NativeMessagingHosts\com.rcjav.host"),
    (r"HKCU\Software\Chromium\NativeMessagingHosts\com.rcjav.host", "HKEY_CURRENT_USER", r"Software\Chromium\NativeMessagingHosts\com.rcjav.host"),
]
def _registry_host_entries() -> list[dict]:
if os.name != "nt":
return []
try:
import winreg
except Exception:
return [{"key": "registry", "status": "fail", "detail": "winreg unavailable"}]
hives = {
"HKEY_LOCAL_MACHINE": winreg.HKEY_LOCAL_MACHINE,
"HKEY_CURRENT_USER": winreg.HKEY_CURRENT_USER,
}
entries = []
for label, hive_name, subkey in REGISTRY_HOST_KEYS:
try:
with winreg.OpenKey(hives[hive_name], subkey) as key:
value, _ = winreg.QueryValueEx(key, None)
entries.append({"key": label, "status": "ok", "manifest": str(value)})
except FileNotFoundError:
entries.append({"key": label, "status": "missing", "manifest": ""})
except PermissionError as e:
entries.append({"key": label, "status": "warn", "manifest": "", "detail": str(e)})
except OSError as e:
entries.append({"key": label, "status": "warn", "manifest": "", "detail": str(e)})
return entries
def handle_host_status(payload: dict) -> dict:
    """Inspect the native-messaging registration; never writes anything.

    Looks at the manifest file next to this script, the launcher path and
    allowed origin it declares, the registry entries pointing at it and the
    batch launcher, then appends the command that would repair the setup.

    Args:
        payload: Request dict; ``extension_id`` (optional) is the extension
            whose ``chrome-extension://<id>/`` origin should be allowed.

    Returns:
        ``{"ok": True, "checks", "registry", "manifest_path", "install_script"}``.
    """
    extension_id = (payload.get("extension_id") or "").strip()
    expected_origin = f"chrome-extension://{extension_id}/" if extension_id else ""

    host_dir = Path(__file__).resolve().parent
    manifest_path = host_dir / "com.rcjav.host.json"
    install_script = host_dir / "install-host.ps1"
    batch_path = host_dir / "rcjav-host.bat"

    checks: list[dict] = []

    def add(name, status, detail=""):
        checks.append({"name": name, "status": status, "detail": str(detail)})

    # Manifest presence and parseability.
    manifest = {}
    if manifest_path.exists():
        add("Manifest file", "ok", str(manifest_path))
        try:
            # utf-8-sig tolerates a BOM at the start of the manifest.
            manifest = json.loads(manifest_path.read_text(encoding="utf-8-sig"))
            add("Manifest JSON", "ok", "valid JSON")
        except Exception as e:
            add("Manifest JSON", "fail", str(e))
    else:
        add("Manifest file", "fail", f"not found at {manifest_path}")

    manifest_is_dict = isinstance(manifest, dict)

    # Launcher path recorded inside the manifest.
    manifest_host_path = manifest.get("path", "") if manifest_is_dict else ""
    if not manifest_host_path:
        add("Manifest host path", "fail", "missing path")
    elif Path(manifest_host_path).exists():
        add("Manifest host path", "ok", manifest_host_path)
    else:
        add("Manifest host path", "fail", manifest_host_path)

    # Allowed origin for the calling extension.
    origins = manifest.get("allowed_origins", []) if manifest_is_dict else []
    if not expected_origin:
        add("Extension origin", "warn", "extension id not supplied")
    elif expected_origin in origins:
        add("Extension origin", "ok", expected_origin)
    else:
        add("Extension origin", "fail", f"expected {expected_origin}; manifest has {origins}")

    # Registry entries: the ones pointing at this manifest vs. any readable entry.
    registry_entries = _registry_host_entries()
    matching_registry = [
        entry for entry in registry_entries
        if entry.get("manifest") and Path(entry["manifest"]).resolve() == manifest_path.resolve()
    ]
    registered = [entry for entry in registry_entries if entry.get("status") == "ok"]
    if matching_registry:
        count = len(matching_registry)
        plural = "y" if count == 1 else "ies"
        add("Registry registration", "ok",
            f"{count} matching registry entr{plural}\n"
            + "\n".join(entry["key"] for entry in matching_registry))
    elif registered:
        listing = "; ".join(f"{entry['key']} -> {entry['manifest']}" for entry in registered)
        add("Registry registration", "fail", "registered path differs: " + listing)
    else:
        add("Registry registration", "fail" if os.name == "nt" else "warn",
            "no matching native host registry entries found")

    # Batch launcher next to this script.
    add("Host launcher", "ok" if batch_path.exists() else "fail", str(batch_path))

    # The repair hint becomes a warning as soon as any check above failed.
    has_problem = any(check.get("status") == "fail" for check in checks)
    add("Repair command", "warn" if has_problem else "info",
        f"pwsh -ExecutionPolicy Bypass -File \"{install_script}\" -ExtensionId {extension_id or '<extension-id>'}")

    return {
        "ok": True,
        "checks": checks,
        "registry": registry_entries,
        "manifest_path": str(manifest_path),
        "install_script": str(install_script),
    }
def handle_host_repair(payload: dict) -> dict:
    """Rewrite the host manifest and the per-user registry registrations.

    Only reachable when the browser can already launch this host. When the
    host is blocked (forbidden / not found) no host process exists for the
    extension to call, so the external install script is still required.

    Args:
        payload: Request dict; ``extension_id`` is mandatory and becomes the
            single allowed origin written to the manifest.

    Returns:
        On success or partial success: ``ok`` (False if any registry write
        failed), ``manifest_path``, ``registrations``, ``verification`` (a
        fresh ``handle_host_status`` result), ``restart_required`` and
        ``message``. On early failure: ``{"ok": False, "error": ...}``.
    """
    extension_id = (payload.get("extension_id") or "").strip()
    if not extension_id:
        return {"ok": False, "error": "extension id is required"}

    host_dir = Path(__file__).resolve().parent
    launcher = host_dir / "rcjav-host.bat"
    manifest_path = host_dir / "com.rcjav.host.json"
    if not launcher.exists():
        return {"ok": False, "error": f"host launcher not found at {launcher}"}

    manifest_text = json.dumps({
        "name": "com.rcjav.host",
        "description": "rclonex native messaging host (rc-jav bridge)",
        "path": str(launcher),
        "type": "stdio",
        "allowed_origins": [f"chrome-extension://{extension_id}/"],
    }, indent=2) + "\n"
    try:
        manifest_path.write_text(manifest_text, encoding="utf-8")
    except OSError as e:
        return {"ok": False, "error": f"failed to write manifest: {e}"}

    registrations = []
    if os.name == "nt":
        try:
            import winreg
            # Only HKEY_CURRENT_USER keys are written here.
            for _label, hive_name, subkey in REGISTRY_HOST_KEYS:
                if hive_name != "HKEY_CURRENT_USER":
                    continue
                key_label = "HKCU\\" + subkey
                try:
                    with winreg.CreateKey(winreg.HKEY_CURRENT_USER, subkey) as key:
                        winreg.SetValueEx(key, None, 0, winreg.REG_SZ, str(manifest_path))
                    registrations.append({"key": key_label, "status": "ok"})
                except OSError as e:
                    registrations.append({"key": key_label, "status": "fail", "detail": str(e)})
        except Exception as e:
            registrations.append({"key": "HKCU registry", "status": "fail", "detail": str(e)})

    verification = handle_host_status({**payload, "extension_id": extension_id})
    all_registered = all(item.get("status") == "ok" for item in registrations)
    return {
        "ok": all_registered,
        "manifest_path": str(manifest_path),
        "registrations": registrations,
        "verification": verification,
        "restart_required": True,
        "message": "manifest and user-level registrations repaired; restart Brave to relaunch the host with the repaired origin",
    }
def _cache_age_hours(scanned_at: str) -> float | None:
try:
dt = datetime.fromisoformat(scanned_at.replace("Z", "+00:00"))
except (ValueError, AttributeError):
return None
now = datetime.now(dt.tzinfo) if dt.tzinfo else datetime.now()
return (now - dt).total_seconds() / 3600.0
def _stale_hours(payload: dict) -> float:
try:
hours = float(payload.get("stale_hours", 24))
except (TypeError, ValueError):
return 24
return min(max(hours, 1), 24 * 365)
def _read_scan_state() -> dict:
    """Load the scan-progress snapshot from ``SCAN_STATE_FILE``.

    Returns an empty dict when the file is absent, unreadable, not valid JSON
    or does not contain a JSON object.
    """
    if not SCAN_STATE_FILE.exists():
        return {}
    try:
        parsed = json.loads(SCAN_STATE_FILE.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        return {}
    if isinstance(parsed, dict):
        return parsed
    return {}
def _apply_scan_status(remotes: list[dict], state: dict) -> None:
current = state.get("current_remote")
if not current:
return
status = ""
if state.get("scanning") and not state.get("done"):
status = "scanning"
elif state.get("done") and state.get("scan_ok") is False:
status = "failed"
if not status:
return
for remote in remotes:
if remote.get("remote") == current:
remote["status"] = status
remote["stale"] = status == "failed" or remote.get("stale", False)
return
def _describe_skipped_id(path: str, remote: str = "") -> dict:
name = Path(str(path or "").replace("\\", "/")).name
ext = Path(name).suffix.lower().lstrip(".")
reason = "no supported ID at filename start"
if re.match(r"^\[[A-Za-z0-9-]+-\d+\]", name):
reason = "leading brackets hide the ID"
elif re.match(r"^[A-Za-z][A-Za-z0-9]+[\u2010-\u2015]\d+", name):
reason = "non-ASCII dash in ID"
elif re.match(r"^[A-Za-z][A-Za-z0-9]+\d+", name):
reason = "missing hyphen between prefix and number"
sep = "" if not remote or remote.endswith("/") or not path else "/"
full_path = f"{remote}{sep}{path}" if remote else path
return {"path": path, "name": name, "ext": ext, "reason": reason, "full_path": full_path}
def _cache_freshness_fields(data: dict | None, rules_info: dict) -> dict:
"""Build the cache-contract fields surfaced to the extension.
`data` is the parsed cache.json (or None when the file is missing).
`rules_info` is the dict returned by fetch_rules_info; when its
`ok` flag is False we report match-flags as None and let the UI
decide whether to show a "rules lookup failed" state.
"""
out: dict = {
# Legacy field — preserved for any consumer that still reads it.
"version": (data or {}).get("version") if data else None,
# New two-tier contract:
"cache_schema": (data or {}).get("cache_schema") if data else None,
"id_rules": (data or {}).get("id_rules") if data else None,
"id_rules_signature": (data or {}).get("id_rules_signature") if data else None,
"expected_cache_schema": None,
"expected_id_rules": None,
"expected_id_rules_signature": None,
"cache_schema_match": None,
"id_rules_match": None,
"id_rules_signature_match": None,
"cache_state": None, # 'fresh' | 'stale_by_rules' | 'schema_mismatch' | 'missing'
"rules_info_error": None,
}
if not rules_info.get("ok"):
out["rules_info_error"] = rules_info.get("error") or "rules lookup failed"
return out
out["expected_cache_schema"] = rules_info.get("cache_schema")
out["expected_id_rules"] = rules_info.get("id_rules")
out["expected_id_rules_signature"] = rules_info.get("id_rules_signature")
if data is None:
out["cache_state"] = "missing"
return out
# Legacy version:3 cache (pre-migration on disk): treat as stale_by_rules.
if "cache_schema" not in data and data.get("version") == 3:
out["cache_state"] = "stale_by_rules"
out["cache_schema_match"] = True # we'll migrate at next load_cache
out["id_rules_match"] = False
out["id_rules_signature_match"] = False
return out
schema_match = data.get("cache_schema") == out["expected_cache_schema"]
rules_match = data.get("id_rules") == out["expected_id_rules"]
sig_match = data.get("id_rules_signature") == out["expected_id_rules_signature"]
out["cache_schema_match"] = schema_match
out["id_rules_match"] = rules_match
out["id_rules_signature_match"] = sig_match
if not schema_match:
out["cache_state"] = "schema_mismatch"
elif rules_match and sig_match:
out["cache_state"] = "fresh"
else:
out["cache_state"] = "stale_by_rules"
return out
def handle_reextract_ids(payload: dict) -> dict:
    """Ask rc-jav to re-extract jav_ids from the existing cache.

    Runs ``rc-jav.py --reextract --format json`` with a 300-second timeout
    and hands its JSON output back unchanged. Per the rc-jav contract this
    works from the on-disk cache.json without rclone calls and stamps the
    current rules and signature into it, so the next cache-status call
    reports ``cache_state: "fresh"``.

    Returns:
        The parsed JSON from rc-jav, or ``{"ok": False, "error": ...}`` when
        the process exits non-zero or prints something that is not JSON.
    """
    exit_code, out_text, err_text = run_rcjav(
        ["--reextract", "--format", "json"],
        # No extra flags: --reextract already emits JSON and --basic would
        # prepend noise to it.
        extra_flags=[],
        rcjav_path=payload.get("rcjav_path"),
        timeout=300,
    )
    if exit_code != 0:
        message = (err_text or out_text or "unknown").strip()
        return {"ok": False, "error": message}
    try:
        return json.loads(out_text.strip())
    except json.JSONDecodeError as e:
        return {"ok": False, "error": f"invalid reextract JSON: {e}"}
def _cache_id_shape_issues(files: list) -> list[dict]:
    """Summarise cached rows whose ``jav_id`` looks like output of older ID rules."""
    bare_fc2_re = re.compile(r"^FC2-\d{4,}$", re.I)
    short_number_re = re.compile(r"^[A-Z][A-Z0-9]{1,}-\d{1,2}$", re.I)
    counts = {"bare_fc2": 0, "short_number": 0, "missing_jav_id": 0}
    for row in files:
        jid = str(row.get("jav_id") or "")
        if not jid:
            counts["missing_jav_id"] += 1
            continue
        # Only the part before '#' takes part in the shape checks.
        base = jid.split("#", 1)[0]
        if bare_fc2_re.match(base):
            counts["bare_fc2"] += 1
        if short_number_re.match(base):
            counts["short_number"] += 1
    messages = (
        ("bare_fc2", "bare FC2 IDs should now be FC2-PPV"),
        ("short_number", "IDs with fewer than 3 digits suggest an old cache"),
        ("missing_jav_id", "cache rows missing jav_id"),
    )
    return [
        {"code": code, "count": counts[code], "message": message}
        for code, message in messages
        if counts[code]
    ]


def handle_cache_status(payload: dict) -> dict:
    """Describe rc-jav's ``cache.json`` for the extension's cache view.

    Combines four sources: the ``default_source`` / ``default_target`` roots
    from ``config.json``, the per-remote entries of ``cache.json`` (both files
    sit next to the resolved rc-jav script), the live scan state, and the
    rule/schema contract from ``fetch_rules_info``.

    Args:
        payload: Request dict; ``rcjav_path`` optionally overrides the script
            location and ``stale_hours`` sets the staleness threshold.

    Returns:
        ``ok``, ``cache_exists``, ``cache_path``, ``configured``,
        ``stale_hours``, ``scan_state``, ``remotes`` (one dict per cached or
        configured remote) and the cache-contract fields; when the cache
        exists also ``warnings``. If ``cache.json`` cannot be parsed the
        result is ``{"ok": False, "error", "cache_path"}``.
    """
    script = resolve_rcjav(payload.get("rcjav_path", ""))
    cache_path = script.parent / "cache.json"
    config_path = script.parent / "config.json"

    configured = {"default_source": [], "default_target": []}
    if config_path.exists():
        try:
            cfg = json.loads(config_path.read_text(encoding="utf-8"))
            configured["default_source"] = cfg.get("default_source", []) or []
            configured["default_target"] = cfg.get("default_target", []) or []
        except (OSError, ValueError, AttributeError):
            # Best effort: an unreadable, malformed or non-object config is
            # treated as "nothing configured".
            pass

    stale_hours = _stale_hours(payload)
    scan_state = _read_scan_state()
    configured_roots = set(configured["default_source"] + configured["default_target"])
    rules_info = fetch_rules_info(payload.get("rcjav_path", ""))

    if not cache_path.exists():
        remotes = [{
            "remote": remote,
            "status": "never_scanned",
            "stale": True,
            "file_count": 0,
            "skipped_count": 0,
            "issues": [],
        } for remote in sorted(configured_roots)]
        _apply_scan_status(remotes, scan_state)
        return {
            "ok": True,
            "cache_exists": False,
            "cache_path": str(cache_path),
            "configured": configured,
            "stale_hours": stale_hours,
            "scan_state": scan_state,
            "remotes": remotes,
            **_cache_freshness_fields(None, rules_info),
        }

    try:
        data = json.loads(cache_path.read_text(encoding="utf-8"))
    except Exception as e:
        return {"ok": False, "error": str(e), "cache_path": str(cache_path)}

    remotes = []
    warnings = []
    remote_map = data.get("remotes", {}) or {}
    # "name:" part of every configured / cached root, for the prefix-mismatch check.
    configured_remote_prefixes = {
        r.split(":", 1)[0] + ":" for r in configured_roots if isinstance(r, str) and ":" in r
    }
    cached_remote_prefixes = {
        r.split(":", 1)[0] + ":" for r in remote_map if isinstance(r, str) and ":" in r
    }

    for remote, entry in remote_map.items():
        files = entry.get("files", []) or []
        skipped = entry.get("skipped", []) or []
        scanned_at = entry.get("scanned_at", "")
        age = _cache_age_hours(scanned_at)

        issues = _cache_id_shape_issues(files)
        if issues:
            warnings.append({
                "remote": remote,
                "code": "stale_id_shape",
                "message": f"{remote} contains IDs from older normalization rules; rebuild cache recommended",
                "issues": issues,
            })

        # An entry whose scan time is unknown is not reported as stale.
        is_stale = age is not None and age > stale_hours
        # Describe each skipped path once; the samples are the first five.
        skipped_items = [_describe_skipped_id(path, remote) for path in skipped]
        remotes.append({
            "remote": remote,
            "status": "stale" if is_stale else "fresh",
            "scanned_at": scanned_at,
            "age_hours": round(age, 3) if age is not None else None,
            "stale": is_stale,
            "file_count": len(files),
            "skipped_count": len(skipped),
            "skipped_items": skipped_items,
            "skipped_samples": skipped_items[:5],
            "issues": issues,
        })

    # Configured roots that have no cache entry yet.
    cached_roots = set(remote_map.keys())
    for remote in sorted(configured_roots - cached_roots):
        remotes.append({
            "remote": remote,
            "status": "never_scanned",
            "scanned_at": "",
            "age_hours": None,
            "stale": True,
            "file_count": 0,
            "skipped_count": 0,
            "issues": [],
        })
        warnings.append({
            "remote": remote,
            "code": "remote_not_scanned",
            "message": f"{remote} is configured but has not been scanned into cache.json",
        })

    if (configured_remote_prefixes and cached_remote_prefixes
            and not cached_remote_prefixes.intersection(configured_remote_prefixes)):
        warnings.append({
            "code": "remote_prefix_mismatch",
            "message": "cache remotes do not share a remote prefix with config.json defaults",
            "configured": sorted(configured_roots),
            "cached": sorted(remote_map.keys()),
        })

    # Cached remotes that are neither a configured root nor a parent/child of one.
    if configured_roots:
        for remote in remote_map:
            if remote in configured_roots:
                continue
            same_parent = any(
                remote.startswith(root.rstrip("/") + "/") or root.startswith(remote.rstrip("/") + "/")
                for root in configured_roots
            )
            if not same_parent:
                warnings.append({
                    "remote": remote,
                    "code": "remote_not_configured",
                    "message": f"{remote} is in cache.json but is not one of config.json's default source/target roots",
                })

    _apply_scan_status(remotes, scan_state)
    remotes.sort(key=lambda r: r["remote"].lower())
    return {
        "ok": True,
        "cache_exists": True,
        "cache_path": str(cache_path),
        "stale_hours": stale_hours,
        "configured": configured,
        "scan_state": scan_state,
        "remotes": remotes,
        "warnings": warnings,
        **_cache_freshness_fields(data, rules_info),
    }
def _scan_worker(cmd: list[str], creationflags: int, scan_since: str = "",
                 scan_roots: list[str] | None = None) -> None:
    """Background thread: runs rc-jav --scan via Popen, reads stderr line-by-line
    to parse SCAN_START / SCAN_PROGRESS / 'Scan complete:' lines, and writes
    live progress to SCAN_STATE_FILE so handle_scan_progress can return it.

    Args:
        cmd: Full argv for the scan subprocess.
        creationflags: Passed straight to subprocess.Popen.
        scan_since: Recorded in the state file as ``scan_since``.
        scan_roots: Recorded in the state file as ``scope``.

    The state dict is rewritten to SCAN_STATE_FILE after every recognised
    progress line and once more when the process ends. On exit (success,
    failure or exception) the module-level _scan_thread / _scan_proc /
    _scan_cancel_requested globals are reset under _scan_lock.
    """
    global _scan_thread, _scan_proc, _scan_cancel_requested
    # Snapshot that is serialised to SCAN_STATE_FILE; mutated as lines arrive.
    state: dict = {
        "scanning": True, "done": False,
        "started_at": datetime.now().isoformat(),
        "scan_since": scan_since,
        "scope": list(scan_roots or []),
        "remotes": [], "total_remotes": 0,
        "remote_jobs": [],
        "current_remote": None, "current_label": None,
        "files_this_remote": 0, "files_total": 0,
        "file_count": None, "elapsed_s": None,
    }

    def _write() -> None:
        # Best-effort persist of the current snapshot; write errors are ignored.
        try:
            SCAN_STATE_FILE.write_text(json.dumps(state), encoding="utf-8")
        except OSError:
            pass

    def _job(remote: str, label: str = "") -> dict:
        # Return the per-remote job record, creating a "queued" one on first use.
        for item in state["remote_jobs"]:
            if item.get("remote") == remote:
                if label:
                    item["label"] = label
                return item
        item = {
            "remote": remote, "label": label, "status": "queued",
            "files": 0, "skipped": 0, "total": None,
        }
        state["remote_jobs"].append(item)
        return item

    _write()
    try:
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.DEVNULL,  # scan mode outputs nothing useful to stdout
            stderr=subprocess.PIPE,
            text=True, encoding="utf-8", errors="replace",
            creationflags=creationflags,
        )
        # Publish the process handle for other handlers that use _scan_lock.
        with _scan_lock:
            _scan_proc = proc
        # Each recognised line is "<TAG> <json>"; the slice offsets below are
        # the tag length plus the separating space.
        for raw in proc.stderr:
            line = raw.rstrip("\n")
            if line.startswith("SCAN_START "):
                try:
                    d = json.loads(line[11:])
                    state["remotes"] = d.get("remotes", [])
                    state["total_remotes"] = d.get("total", 0)
                    for remote in state["remotes"]:
                        _job(remote)
                    _write()
                except (json.JSONDecodeError, KeyError):
                    pass
            elif line.startswith("SCAN_REMOTE_START "):
                # Emitted immediately before the file-count probe starts.
                try:
                    d = json.loads(line[18:])
                    state["current_remote"] = d.get("remote")
                    state["current_label"] = d.get("label")
                    state["current_index"] = d.get("index", 0)
                    state["files_this_remote"] = 0
                    state["files_remote_total"] = None  # unknown until SCAN_REMOTE_COUNTED
                    job = _job(d.get("remote"), d.get("label"))
                    job.update({
                        "status": "running",
                        "index": d.get("index", 0),
                        "of": d.get("of", state.get("total_remotes", 0)),
                        "files": 0,
                        "skipped": 0,
                        "total": None,
                        "incremental": bool(d.get("incremental")),
                        "started_at": datetime.now().isoformat(),
                    })
                    _write()
                except (json.JSONDecodeError, KeyError):
                    pass
            elif line.startswith("SCAN_REMOTE_COUNTED "):
                # Emitted after remote_file_count() returns — total is now known.
                try:
                    d = json.loads(line[20:])
                    state["files_remote_total"] = d.get("total")
                    _job(d.get("remote"))["total"] = d.get("total")
                    _write()
                except (json.JSONDecodeError, KeyError):
                    pass
            elif line.startswith("SCAN_FILE_PROGRESS "):
                # Emitted every CANCEL_CHECK_INTERVAL files from inside walk_remote.
                try:
                    d = json.loads(line[19:])
                    state["files_this_remote"] = d.get("files", 0)
                    if d.get("total") is not None:
                        state["files_remote_total"] = d["total"]
                    job = _job(d.get("remote"), d.get("label"))
                    job["files"] = d.get("files", 0)
                    job["skipped"] = d.get("skipped", 0)
                    if d.get("total") is not None:
                        job["total"] = d["total"]
                    _write()
                except (json.JSONDecodeError, KeyError):
                    pass
            elif line.startswith("SCAN_PROGRESS "):
                # Emitted after each remote completes — cumulative total.
                try:
                    d = json.loads(line[14:])
                    state["current_remote"] = d.get("remote")
                    state["current_label"] = d.get("label")
                    state["files_this_remote"] = d.get("files", 0)
                    state["files_total"] = d.get("files_total", 0)
                    job = _job(d.get("remote"), d.get("label"))
                    job.update({
                        "status": "completed",
                        "files": d.get("files", 0),
                        "file_count": d.get("file_count"),
                        "incremental": bool(d.get("incremental", job.get("incremental"))),
                        "finished_at": datetime.now().isoformat(),
                    })
                    _write()
                except (json.JSONDecodeError, KeyError):
                    pass
            elif line == "SCAN_CANCELLED":
                state["cancelled"] = True
                _write()
            elif line.startswith("Scan complete:"):
                # Human-readable summary line: "<N> files in <S>s".
                m = re.search(r"(\d+)\s*files\s*in\s*([\d.]+)s", line)
                if m:
                    state["file_count"] = int(m.group(1))
                    state["elapsed_s"] = float(m.group(2))
                    _write()
        # stderr hit EOF: collect the exit code and finalise the snapshot.
        proc.wait()
        state["done"] = True
        state["scanning"] = False
        state["rc"] = proc.returncode
        state["finished_at"] = datetime.now().isoformat()
        if state.get("cancelled") or _scan_cancel_requested:
            state["cancelled"] = True
            state["scan_ok"] = True  # clean cancel — not an error
        else:
            state["scan_ok"] = proc.returncode == 0
            if proc.returncode != 0:
                state["error"] = f"rc-jav exited {proc.returncode}"
        # Close out jobs that never reported completion.
        for job in state["remote_jobs"]:
            if job.get("status") == "running":
                job["status"] = "cancelled" if state.get("cancelled") else "failed" if not state.get("scan_ok") else "completed"
                job["finished_at"] = state["finished_at"]
            elif job.get("status") == "queued" and (state.get("cancelled") or not state.get("scan_ok")):
                job["status"] = "cancelled" if state.get("cancelled") else "failed"
                job["finished_at"] = state["finished_at"]
        _write()
        log_event("scan_complete", ok=state["scan_ok"], rc=proc.returncode,
                  cancelled=state.get("cancelled", False),
                  file_count=state.get("file_count"), elapsed_s=state.get("elapsed_s"))
    except Exception as e:
        # Any failure (spawn error, unexpected payload shape, ...) is recorded
        # as a failed scan so pollers see done=true instead of waiting forever.
        state.update({
            "done": True, "scanning": False, "scan_ok": False,
            "error": str(e), "finished_at": datetime.now().isoformat(),
        })
        for job in state.get("remote_jobs", []):
            if job.get("status") in ("queued", "running"):
                job["status"] = "failed"
                job["finished_at"] = state["finished_at"]
        _write()
        log(f"_scan_worker error: {e}")
    finally:
        # Always clear the shared scan globals so a new scan can be started.
        with _scan_lock:
            _scan_thread = None
            _scan_proc = None
            _scan_cancel_requested = False
def handle_scan(payload: dict) -> dict:
    """Launch ``rc-jav.py --scan --basic`` on a background worker thread.

    Returns immediately; the caller should poll ``scan_progress`` until it
    reports ``done=true``.

    Payload keys read here:
      rcjav_path -- handed to resolve_rcjav() to locate the script.
      scan_since -- optional rclone duration string (e.g. '24h'), forwarded
                    as ``--scan-since`` when non-empty.
      scan_roots -- optional list; every string entry that is non-blank
                    after trimming is forwarded as a ``--target`` argument.

    Returns ``already_running`` instead of starting a second worker when a
    scan thread is still alive.
    """
    global _scan_thread, _scan_script_path, _scan_cancel_requested

    with _scan_lock:
        worker_alive = _scan_thread is not None and _scan_thread.is_alive()
        if worker_alive:
            return {"ok": True, "scanning": True, "already_running": True}
        # A fresh scan must not inherit a cancel request from the previous one.
        _scan_cancel_requested = False

    script = resolve_rcjav(payload.get("rcjav_path", ""))
    scan_since = (payload.get("scan_since") or "").strip()

    cmd = [sys.executable, str(script), "--scan", "--basic"]
    cmd.extend(part_pattern_args(payload))

    # Keep only string roots that still have content once trimmed.
    scan_roots = []
    for candidate in payload.get("scan_roots") or []:
        if not isinstance(candidate, str):
            continue
        trimmed = candidate.strip()
        if trimmed:
            scan_roots.append(trimmed)

    for root in scan_roots:
        cmd.extend(["--target", root])
    if scan_since:
        cmd.extend(["--scan-since", scan_since])

    # 0x08000000 is passed as creationflags on Windows only (the value of
    # subprocess.CREATE_NO_WINDOW); other platforms get 0.
    creationflags = 0x08000000 if os.name == "nt" else 0

    # Remembered so a later cancel can find the directory of this script.
    _scan_script_path = script

    worker = threading.Thread(
        target=_scan_worker,
        args=(cmd, creationflags, scan_since, scan_roots),
        daemon=True,
        name="scan-worker",
    )
    with _scan_lock:
        _scan_thread = worker
    worker.start()
    return {"ok": True, "scanning": True, "started": True, "scan_roots": scan_roots}
def _deferred_kill(proc: subprocess.Popen, delay_s: float = 5.0) -> None:
"""Wait up to delay_s for proc to exit via the cancel flag, then terminate.
Runs in a daemon thread so it never blocks the message loop."""
time.sleep(delay_s)
try:
if proc.poll() is None:
proc.terminate()
except Exception:
pass
def handle_scan_cancel(payload: dict) -> dict:
    """Ask a running scan to stop by dropping a cancel flag file.

    rc-jav.py's walk_remote checks for the flag every CANCEL_CHECK_INTERVAL
    files and exits cleanly when it sees it. As a fallback, a deferred kill
    fires after 5 s if rc-jav has not exited on its own; terminating at once
    could interrupt it mid-write (corrupting cache.json or preventing
    SCAN_CANCELLED from reaching the scan-state parser).

    Returns ``cancelled: False`` when no scan thread is alive, and
    ``ok: False`` with the error text if the flag cannot be written.
    """
    global _scan_cancel_requested

    with _scan_lock:
        proc = _scan_proc
        running = _scan_thread is not None and _scan_thread.is_alive()
    if not running:
        return {"ok": True, "cancelled": False, "message": "no scan running"}

    _scan_cancel_requested = True

    # The flag goes next to rc-jav.py, which looks for it via
    # Path(__file__).parent / "scan-cancel.flag". Prefer the path recorded
    # when the scan was started; otherwise resolve it from the payload.
    script = _scan_script_path or resolve_rcjav(payload.get("rcjav_path", ""))
    flag_path = script.parent / "scan-cancel.flag"

    try:
        flag_path.write_text("cancel", encoding="utf-8")
        still_running = proc is not None and proc.poll() is None
        if still_running:
            threading.Thread(
                target=_deferred_kill,
                args=(proc, 5.0),
                daemon=True,
                name="cancel-kill",
            ).start()
        return {"ok": True, "cancelled": True}
    except Exception as exc:
        return {"ok": False, "error": str(exc)}
def handle_scan_progress(payload: dict) -> dict:
    """Report the scan state persisted in SCAN_STATE_FILE.

    When no state file exists, the reply is derived solely from whether the
    scan thread is alive and carries ``no_state: True``. Otherwise the file's
    contents are returned with ``ok: True``; read or parse failures yield
    ``ok: False`` with the error text.
    """
    with _scan_lock:
        worker_alive = _scan_thread is not None and _scan_thread.is_alive()

    if not SCAN_STATE_FILE.exists():
        return {
            "ok": True,
            "scanning": worker_alive,
            "done": not worker_alive,
            "no_state": True,
        }

    try:
        snapshot = json.loads(SCAN_STATE_FILE.read_text(encoding="utf-8"))
        # The thread may have just finished while the file still says
        # "scanning" -- report a consistent final state.
        if not worker_alive and snapshot.get("scanning"):
            snapshot.update(scanning=False, done=True)
        # A stored "ok" must not override the reply's own status flag.
        snapshot.pop("ok", None)
        reply = {"ok": True}
        reply.update(snapshot)
        return reply
    except Exception as exc:
        return {"ok": False, "error": str(exc)}
def handle_scan_clear(payload: dict) -> dict:
    """Remove the persisted scan job history (SCAN_STATE_FILE).

    cache.json is not touched. Refuses with an error while a scan thread is
    still alive.
    """
    with _scan_lock:
        worker_alive = _scan_thread is not None and _scan_thread.is_alive()
    if worker_alive:
        return {"ok": False, "error": "scan is still running; cancel it before clearing job history"}

    try:
        if SCAN_STATE_FILE.exists():
            SCAN_STATE_FILE.unlink()
    except Exception as exc:
        return {"ok": False, "error": str(exc)}
    return {"ok": True, "cleared": True}
def handle_delete_skipped(payload: dict) -> dict:
    """Delete skipped (non-JAV) files by full path and patch cache.json.

    Each entry of ``payload["paths"]`` is deleted through handle_delete()
    using the shared ``mode`` / ``trash_dir`` / ``allowed_remotes`` /
    ``rcjav_path`` settings from the payload. Paths deleted successfully are
    then removed from every remote's ``skipped`` list in cache.json and
    passed to _patch_cache_remove_paths().

    Returns:
        ``ok`` (True when at least one delete succeeded), per-path
        ``results``, ``deleted_count`` and ``failed_count``. A missing,
        empty or non-list ``paths`` yields an error reply.

    Cache patching is best-effort: failures are logged, not reported.
    """
    paths = payload.get("paths") or []
    if not isinstance(paths, list) or not paths:
        return {"ok": False, "error": "paths must be a non-empty array"}

    base = {
        "mode": payload.get("mode", "trash"),
        "trash_dir": payload.get("trash_dir", ""),
        "allowed_remotes": payload.get("allowed_remotes", []),
        "rcjav_path": payload.get("rcjav_path", ""),
    }

    results = []
    deleted_paths = []
    for path in paths:
        outcome = handle_delete({**base, "path": path})
        results.append({"path": path, **outcome})
        if outcome.get("ok"):
            deleted_paths.append(path)

    # Patch skipped list in cache.json too
    if deleted_paths:
        # The keys compared below are always str, so only string paths can
        # ever match; a set of them gives the same result as scanning the
        # list, without the per-entry linear search.
        deleted_lookup = {p for p in deleted_paths if isinstance(p, str)}
        try:
            # Same "" default as the other handlers use for rcjav_path.
            script = resolve_rcjav(base["rcjav_path"])
            cache_path = script.parent / "cache.json"
            if cache_path.exists():
                cache = json.loads(cache_path.read_text(encoding="utf-8"))
                modified = False
                for remote_key, remote_data in cache.get("remotes", {}).items():
                    # Skipped entries are stored relative to the remote key;
                    # rebuild the full path to compare with what was deleted.
                    sep = "" if remote_key.endswith("/") else "/"
                    prefix = remote_key + sep
                    before = remote_data.get("skipped", []) or []
                    after = [
                        p for p in before
                        if f"{prefix}{p}" not in deleted_lookup
                    ]
                    if len(after) != len(before):
                        remote_data["skipped"] = after
                        modified = True
                if modified:
                    # Write to a sibling temp file, then swap it into place.
                    tmp = cache_path.with_suffix(cache_path.suffix + ".tmp")
                    tmp.write_text(json.dumps(cache, indent=2), encoding="utf-8")
                    os.replace(tmp, cache_path)
        except Exception as e:
            log(f"handle_delete_skipped cache patch error: {e}")
        _patch_cache_remove_paths(payload.get("rcjav_path"), deleted_paths)

    ok_count = sum(1 for r in results if r.get("ok"))
    return {
        "ok": ok_count > 0,
        "results": results,
        "deleted_count": ok_count,
        "failed_count": len(results) - ok_count,
    }
def handle_get_keep_ranking(payload: dict) -> dict:
    """Return the effective keep_ranking settings from rc-jav's config.json.

    config.json is looked up next to the script resolved from
    ``payload["rcjav_path"]``. Stored values are layered over built-in
    defaults; ``is_default`` is True when the config holds no (or an empty)
    ``keep_ranking`` entry. An unreadable or unparsable config is treated as
    empty.
    """
    script = resolve_rcjav(payload.get("rcjav_path", ""))
    cfg_path = script.parent / "config.json"

    cfg = {}
    if cfg_path.exists():
        try:
            cfg = json.loads(cfg_path.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError):
            pass  # fall back to defaults

    stored = cfg.get("keep_ranking")
    overrides = stored or {}
    merged = {
        "priority_folders": ["ClearJAV"],
        "size_tolerance_mib": 0,
        "format_preference": ["mkv", "mp4", "wmv", "avi"],
        "tiebreak_res_tag": True,
        "tiebreak_longer_name": True,
        **overrides,
    }
    return {"ok": True, "keep_ranking": merged, "is_default": not stored}
def handle_save_keep_ranking(payload: dict) -> dict:
    """Validate and write the keep_ranking section of rc-jav's config.json.

    ``payload["keep_ranking"]`` must be an object. When present,
    ``size_tolerance_mib`` must be a non-negative number and
    ``format_preference`` / ``priority_folders`` must be arrays. The value is
    stored as received; other keys of config.json are preserved.

    Returns ``{"ok": True, "keep_ranking": ...}`` on success, otherwise
    ``{"ok": False, "error": ...}``.

    An existing config.json that cannot be read, is not valid JSON, or does
    not hold a JSON object is never overwritten: falling back to an empty
    config there would silently discard every other setting in the file. An
    empty file is treated as an empty config.
    """
    # Validate first so bad input is rejected before any filesystem access.
    ranking = payload.get("keep_ranking")
    if not isinstance(ranking, dict):
        return {"ok": False, "error": "keep_ranking must be an object"}

    tolerance = ranking.get("size_tolerance_mib")
    if tolerance is not None:
        try:
            tolerance = float(tolerance)
        except (TypeError, ValueError):
            return {"ok": False, "error": "size_tolerance_mib must be a number"}
        if tolerance < 0:
            return {"ok": False, "error": "size_tolerance_mib must be >= 0"}

    for field in ("format_preference", "priority_folders"):
        value = ranking.get(field)
        if value is not None and not isinstance(value, list):
            return {"ok": False, "error": f"{field} must be an array"}

    script = resolve_rcjav(payload.get("rcjav_path", ""))
    cfg_path = script.parent / "config.json"

    cfg = {}
    if cfg_path.exists():
        try:
            raw = cfg_path.read_text(encoding="utf-8")
            if raw.strip():
                cfg = json.loads(raw)
        except (OSError, ValueError) as e:
            # ValueError covers both malformed JSON and undecodable bytes.
            return {
                "ok": False,
                "error": f"existing config.json is unreadable; refusing to overwrite it: {e}",
            }
        if not isinstance(cfg, dict):
            return {
                "ok": False,
                "error": "existing config.json is not a JSON object; refusing to overwrite it",
            }

    cfg["keep_ranking"] = ranking
    try:
        # Write to a sibling temp file, then swap it into place.
        tmp = cfg_path.with_suffix(cfg_path.suffix + ".tmp")
        tmp.write_text(json.dumps(cfg, indent=2), encoding="utf-8")
        os.replace(tmp, cfg_path)
    except OSError as e:
        return {"ok": False, "error": f"failed to write config: {e}"}
    return {"ok": True, "keep_ranking": ranking}
def handle_list_remotes(payload: dict) -> dict:
    """List rclone remotes plus the configured default source/target paths.

    Remotes come from ``rclone listremotes``; ``default_source`` and
    ``default_target`` come from rc-jav's config.json, so the options page
    can offer autocomplete when building profiles. Both lookups are
    best-effort: a failed rclone call is logged and yields no remotes, and
    an unreadable config yields empty defaults.
    """
    remotes = []
    try:
        completed = subprocess.run(
            [RCLONE_BIN, "listremotes"],
            capture_output=True,
            text=True,
            timeout=15,
            encoding="utf-8",
            errors="replace",
            # Windows-only flag value (subprocess.CREATE_NO_WINDOW); 0 elsewhere.
            creationflags=0x08000000 if os.name == "nt" else 0,
        )
        for raw_line in (completed.stdout or "").splitlines():
            name = raw_line.strip()
            if name:
                remotes.append(name)
    except Exception as exc:
        remotes = []
        log(f"list_remotes: rclone listremotes failed: {exc}")

    # Also read config.json so the user sees their configured defaults.
    script = resolve_rcjav(payload.get("rcjav_path", ""))
    cfg_path = script.parent / "config.json"
    default_source: list[str] = []
    default_target: list[str] = []
    if cfg_path.exists():
        try:
            cfg = json.loads(cfg_path.read_text(encoding="utf-8"))
            default_source = cfg.get("default_source") or []
            default_target = cfg.get("default_target") or []
        except Exception:
            pass  # keep whatever defaults were established so far

    return {
        "ok": True,
        "remotes": remotes,
        "default_source": default_source,
        "default_target": default_target,
    }
# Maps the "action" field of an incoming message to its handler function.
# main() looks the action up here and calls the handler with the whole
# message dict; an action missing from this table is answered with an
# "unknown action" error.
DISPATCH = {
    "search": handle_search,
    "bulk_search": handle_bulk_search,
    "dupe_review": handle_dupe_review,
    "delete_batch": handle_delete_batch,
    "library_issues": handle_library_issues,
    "rename_file": handle_rename_file,
    "rename_files_batch": handle_rename_files_batch,
    "ping": handle_ping,
    "delete": handle_delete,
    "diagnostics": handle_diagnostics,
    "host_status": handle_host_status,
    "host_repair": handle_host_repair,
    "cache_status": handle_cache_status,
    "reextract_ids": handle_reextract_ids,
    "recent_deletes": handle_recent_deletes,
    "undo_delete": handle_undo_delete,
    "scan": handle_scan,
    "scan_progress": handle_scan_progress,
    "scan_cancel": handle_scan_cancel,
    "scan_clear": handle_scan_clear,
    "list_remotes": handle_list_remotes,
    "delete_skipped": handle_delete_skipped,
    "get_keep_ranking": handle_get_keep_ranking,
    "save_keep_ranking": handle_save_keep_ranking,
}
def main():
    """Run the host's request/response loop until the input stream ends.

    Each message read via read_message() is routed through DISPATCH by its
    ``action`` (defaulting to "search"), timed, recorded with log_event(),
    and answered via write_message() with the request's ``req_id`` attached.
    The loop ends on a read error, when read_message() returns None, or when
    a reply cannot be written.
    """
    log(f"--- host start pid={os.getpid()} ---")

    while True:
        try:
            request = read_message()
        except Exception as exc:
            log(f"read error: {exc}\n{traceback.format_exc()}")
            break
        if request is None:
            log("stdin closed, exiting")
            break

        req_id = request.get("req_id")
        action = request.get("action", "search")
        handler = DISPATCH.get(action)
        if not handler:
            write_message({"req_id": req_id, "ok": False, "error": f"unknown action {action}"})
            continue

        started = time.monotonic()
        try:
            reply = handler(request)
        except Exception as exc:
            # A failing handler must not take the host down; report it instead.
            log(f"handler error: {exc}\n{traceback.format_exc()}")
            reply = {"ok": False, "error": str(exc)}
        elapsed_ms = round((time.monotonic() - started) * 1000)

        # Structured event log -- include search-specific fields for analytics.
        fields: dict = {"ok": reply.get("ok"), "elapsed_ms": elapsed_ms}
        if action == "search":
            fields.update(
                id=request.get("id"),
                hits=reply.get("hits"),
                search_mode=reply.get("search_mode"),
            )
        elif action in ("delete", "undo_delete", "scan") and not reply.get("ok"):
            fields["error"] = (reply.get("error") or "")[:200]

        # Don't log every scan_progress poll -- it'd flood the events log.
        if action != "scan_progress":
            log_event(action, **fields)

        reply["req_id"] = req_id
        try:
            write_message(reply)
        except Exception as exc:
            log(f"write error: {exc}")
            break
# Start the message loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()