#!/usr/bin/env python3 """Native messaging host for rclonex Brave extension. Reads length-prefixed JSON messages on stdin, dispatches to rc-jav.py, writes length-prefixed JSON responses on stdout. Long-lived: Brave keeps this process alive for the lifetime of the extension's connectNative() port. """ from __future__ import annotations import json import os import re import struct import subprocess import sys import threading import time import traceback import urllib.request import urllib.error from datetime import datetime from pathlib import Path # Native messaging requires raw binary stdio. Without this, Windows translates # CRLF on stdout (corrupting the 4-byte length prefix) and may buffer. if os.name == "nt": import msvcrt msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY) msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) # Redirect stderr fd2 to a shared-access append handle BEFORE any code can # emit a traceback. The previous design redirected stderr via cmd.exe's `2>>` # in rcjav-host.bat, which opens the log with exclusive write access. When # two host processes spawned near-simultaneously (extension reload + tabvault # check_tabs, or two extensions firing in parallel) the second cmd.exe got # SHARING VIOLATION and the spawn failed — Brave reported the generic "Error # when communicating with the native messaging host" to the calling extension. # Doing the redirect inside Python uses msvcrt's default FILE_SHARE_READ | # FILE_SHARE_WRITE so concurrent appenders coexist cleanly. def _redirect_stderr_shared(log_dir: Path) -> None: try: log_dir.mkdir(exist_ok=True) path = log_dir / "rcjav-host-stderr.log" flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND if hasattr(os, "O_BINARY"): flags |= os.O_BINARY fd = os.open(str(path), flags, 0o644) os.dup2(fd, sys.stderr.fileno()) os.close(fd) except OSError: # If we can't open the log, stderr stays as whatever cmd.exe gave us. # Better than failing the whole host on log setup. pass # ----- config ----- RC_JAV = Path(r"D:\DEV\Project\rclone-jav\rc-jav.py") PYTHON = "python" # or absolute path to python.exe HOST_DIR = Path(__file__).resolve().parent LOG_DIR = HOST_DIR / "logs" STATE_DIR = HOST_DIR / "state" LOG_DIR.mkdir(exist_ok=True) STATE_DIR.mkdir(exist_ok=True) # Take over stderr capture from cmd.exe's `2>>` (which couldn't be shared # across concurrent host spawns). Has to happen BEFORE any code can throw. _redirect_stderr_shared(LOG_DIR) LOG_FILE = LOG_DIR / "rcjav-host.log" EVENTS_LOG = LOG_DIR / "rcjav-host-events.log" DELETE_LOG = LOG_DIR / "deletes.log" EVENTS_MAX_BYTES = 5 * 1024 * 1024 # 5 MiB — rotate by truncating oldest half SCAN_STATE_FILE = STATE_DIR / "scan-state.json" ALERTS_CONFIG_FILE = HOST_DIR / "alerts-config.json" LAST_ALERT_FILE = STATE_DIR / "last-alert-ts.json" ALERT_MIN_INTERVAL_S = 10 * 60 # 10 minutes — matches extension-side rate limit VERSION = "0.1.0" # Scan background thread state _scan_lock = threading.Lock() _scan_thread: threading.Thread | None = None _scan_script_path: Path | None = None # set by handle_scan; used by handle_scan_cancel _scan_proc: subprocess.Popen | None = None _scan_cancel_requested = False HIT_RE = re.compile(r"\[(?P[^\]]+)\] (?P\d+) hit\(s\)") MISS_RE = re.compile(r"\[(?P[^\]]+)\] NOT FOUND") PRIMARY_ID_RE = re.compile(r"^([A-Za-z]+)-(\d+)") FALLBACK_ID_RE = re.compile(r"^([A-Za-z0-9]+)-(\d+)") COMPOUND_ID_RE = re.compile(r"^([A-Za-z0-9]+(?:-[A-Za-z0-9]+)+)-(\d+)") HOST_RANGE_RE = re.compile(r"\[(\d+)-(\d+)\]") TEXT_FC2PPV_RE = re.compile(r"\bFC2-?PPV-?(\d{4,})\b", re.IGNORECASE) TEXT_FC2_RE = re.compile(r"\bFC2-(\d{4,})\b", re.IGNORECASE) TEXT_ID_DASHED_RE = re.compile(r"\b([A-Za-z][A-Za-z0-9]*(?:-[A-Za-z0-9]+)*)-(\d{2,7})[a-zA-Z]?\b") TEXT_ID_UNDASHED_RE = re.compile(r"\b([A-Za-z][A-Za-z0-9]{1,})(\d{3,5})[a-zA-Z]?\b") _cache_lock = threading.Lock() _cache_mem: dict[str, dict] = {} def log(msg: str) -> None: try: with LOG_FILE.open("a", encoding="utf-8") as f: f.write(msg + "\n") except OSError: pass # ----- Discord webhook alerts ----- # The extension writes alerts-config.json via the save_alerts_config RPC. We # read it here so the host can post alerts that the browser-side webhook would # miss (host crashes mid-write, partial reads = port died, handler exceptions). # Rate-limited via LAST_ALERT_FILE so a sustained outage doesn't carpet-bomb. _alerts_cache: dict | None = None _alerts_lock = threading.Lock() def _read_alerts_config() -> dict: global _alerts_cache with _alerts_lock: if _alerts_cache is not None: return _alerts_cache try: _alerts_cache = json.loads(ALERTS_CONFIG_FILE.read_text(encoding="utf-8")) if not isinstance(_alerts_cache, dict): _alerts_cache = {} except (OSError, json.JSONDecodeError): _alerts_cache = {} return _alerts_cache def _invalidate_alerts_cache() -> None: global _alerts_cache with _alerts_lock: _alerts_cache = None def _alert_rate_limited() -> bool: """Return True if an alert was sent within ALERT_MIN_INTERVAL_S. Reads/writes LAST_ALERT_FILE — survives across host process spawns so per-spawn lifetime can't be used to bypass the limit.""" try: if LAST_ALERT_FILE.exists(): data = json.loads(LAST_ALERT_FILE.read_text(encoding="utf-8")) last = float(data.get("ts", 0)) if time.time() - last < ALERT_MIN_INTERVAL_S: return True except (OSError, json.JSONDecodeError, ValueError): pass try: LAST_ALERT_FILE.write_text(json.dumps({"ts": time.time()}), encoding="utf-8") except OSError: pass return False # Mirror of the extension's DISCORD_COLOR_BY_KIND so the embed color matches # whichever side fires the alert. _DISCORD_COLOR_BY_KIND = { "disconnected": 0xff5050, "exception": 0xff5050, "post_failed": 0xff5050, "write_error": 0xff5050, "partial_payload": 0xff5050, "partial_length": 0xff5050, "read_error": 0xff5050, "timeout": 0xffa500, "host_error": 0xffd400, } def _md_code_inline(s: str) -> str: """Wrap text in inline backticks so Discord doesn't markdown-format __id__ or *bold* in raw identifiers. Backticks escaped to a similar-looking glyph.""" return "`" + str(s).replace("`", "ˋ") + "`" def _md_code_block(s: str) -> str: return "```\n" + str(s).replace("```", "``​`") + "\n```" def _discord_post_worker(url: str, body: dict, alert_kind: str, alert_source: str, result_event: "threading.Event | None" = None, result_holder: dict | None = None) -> None: """The actual urlopen call. Runs on a background thread spawned by either post_discord_alert (fire-and-forget) or handle_test_alerts_config (wait briefly for result). Logs outcome to events.log with alert_kind + alert_source so future analytics can distinguish alert types. Never logs the webhook URL or full payload (only error reason text, capped). When result_holder/result_event are provided, signals them after completion so the test RPC can return a synchronous pass/fail.""" started = time.monotonic() outcome: dict = {"ok": False, "status": None, "error": None, "alert_kind": alert_kind, "alert_source": alert_source} try: req = urllib.request.Request( url, data=json.dumps(body).encode("utf-8"), headers={"Content-Type": "application/json", "User-Agent": "rclone-jav-host/1.0"}, method="POST", ) with urllib.request.urlopen(req, timeout=5) as resp: _ = resp.read(64) # drain outcome["ok"] = 200 <= resp.status < 300 outcome["status"] = resp.status if not outcome["ok"]: outcome["error"] = f"HTTP {resp.status}" except urllib.error.HTTPError as e: outcome["status"] = getattr(e, "code", None) outcome["error"] = f"HTTP {outcome['status']}"[:120] except urllib.error.URLError as e: outcome["error"] = str(getattr(e, "reason", e) or e)[:120] except Exception as e: outcome["error"] = f"{type(e).__name__}: {e}"[:120] outcome["elapsed_ms"] = round((time.monotonic() - started) * 1000) try: log_event("discord_post", ok=outcome["ok"], status=outcome["status"], error=outcome["error"], alert_kind=alert_kind, alert_source=alert_source, elapsed_ms=outcome["elapsed_ms"]) except Exception: pass if result_holder is not None: result_holder.update(outcome) if result_event is not None: result_event.set() def _build_discord_body(kind: str, summary: str, detail: str, fields: list[dict] | None, pc_label: str) -> dict: """Build the Discord embed payload. Kept separate so test and real paths produce identical embeds.""" embed_fields = list(fields or []) if pc_label: embed_fields.append({"name": "PC", "value": _md_code_inline(pc_label[:60]), "inline": True}) embed_fields.append({"name": "Source", "value": "host", "inline": True}) return { "username": "rclone-jav", "embeds": [{ "title": f"Native host {kind}", "description": (_md_code_block(detail[:1800]) if detail else summary[:1800] or "(no detail)"), "color": _DISCORD_COLOR_BY_KIND.get(kind, 0xff5050), "timestamp": datetime.utcnow().isoformat() + "Z", "fields": embed_fields, "footer": {"text": "rclone-jav host"}, }], } def post_discord_alert(kind: str, summary: str, detail: str = "", fields: list[dict] | None = None, alert_source: str = "unknown") -> None: """Fire a Discord webhook from host-side. Honors rate limit + URL config. Never raises — alert failures must not break the host's main loop. Threaded fire-and-forget so the main message loop isn't blocked waiting for Discord; outcome is logged to events.log via log_event('discord_post', ...) so failures remain visible despite being async.""" try: cfg = _read_alerts_config() url = (cfg.get("discord_webhook_url") or "").strip() if not url: return if not re.match(r"^https://(?:discord\.com|discordapp\.com)/api/webhooks/", url): return if _alert_rate_limited(): return body = _build_discord_body(kind, summary, detail, fields, (cfg.get("pc_label") or "").strip()) threading.Thread( target=_discord_post_worker, args=(url, body, kind, alert_source), daemon=True, name="discord-post", ).start() except Exception: # Catch-all because this runs on already-failing paths; never crash main loop. pass def handle_save_alerts_config(payload: dict) -> dict: """Persist Discord webhook URL + PC label so the host can read them on subsequent spawns. Called from the extension when Setup saves.""" url = (payload.get("discord_webhook_url") or "").strip() pc_label = (payload.get("pc_label") or "").strip() if url and not re.match(r"^https://(?:discord\.com|discordapp\.com)/api/webhooks/", url): return {"ok": False, "error": "URL must be a Discord webhook"} cfg = {"discord_webhook_url": url, "pc_label": pc_label} try: ALERTS_CONFIG_FILE.write_text(json.dumps(cfg, indent=2), encoding="utf-8") except OSError as e: return {"ok": False, "error": str(e)} _invalidate_alerts_cache() return {"ok": True, "path": str(ALERTS_CONFIG_FILE)} def handle_get_alerts_config(payload: dict) -> dict: """Return the saved Discord webhook URL + PC label. Lets sibling extensions (tabvault) read the same config without duplicating Setup UI — the rcjav extension's Setup pane is the single source of truth, persisted via host.""" cfg = _read_alerts_config() return { "ok": True, "discord_webhook_url": cfg.get("discord_webhook_url", ""), "pc_label": cfg.get("pc_label", ""), } def handle_test_alerts_config(payload: dict) -> dict: """Force a webhook post from host-side using current config, bypassing the rate limit. Lets the extension verify host-side path independently of extension-side path. Waits up to 6 s for the urlopen result so the user's Test button shows a clear pass/fail; on timeout returns ok:false with an explicit timeout message (the background post may still complete and its outcome lands in events.log either way).""" cfg = _read_alerts_config() url = (cfg.get("discord_webhook_url") or "").strip() if not url: return {"ok": False, "error": "no webhook URL configured on host"} # Bypass rate limit for explicit test — temporarily clear LAST_ALERT_FILE. try: if LAST_ALERT_FILE.exists(): LAST_ALERT_FILE.unlink() except OSError: pass body = _build_discord_body( kind="host_error", summary="Test alert from host-side handler", detail="Triggered via Options → Setup → Alerts → Test Host Webhook. If this arrives but extension-side test does not, the extension's chrome.runtime path is broken (or vice versa).", fields=[{"name": "Action", "value": _md_code_inline("test-host-alert"), "inline": True}], pc_label=(cfg.get("pc_label") or "").strip(), ) result_event = threading.Event() result_holder: dict = {} threading.Thread( target=_discord_post_worker, args=(url, body, "host_error", "test_alert", result_event, result_holder), daemon=True, name="discord-post-test", ).start() # 6s slack on the worker's 5s urlopen timeout. if not result_event.wait(timeout=6): return {"ok": False, "error": "Discord webhook timed out after 6s; background post may still complete (see events.log)", "elapsed_ms": None} if result_holder.get("ok"): return {"ok": True, "status": result_holder.get("status"), "elapsed_ms": result_holder.get("elapsed_ms")} return {"ok": False, "error": result_holder.get("error", "unknown failure"), "status": result_holder.get("status"), "elapsed_ms": result_holder.get("elapsed_ms")} def log_event(action: str, **kwargs) -> None: """Append a structured JSON-lines event to rcjav-host-events.log. Rotates by dropping the oldest half when the file exceeds EVENTS_MAX_BYTES.""" entry = {"ts": datetime.now().isoformat(), "action": action, **kwargs} line = json.dumps(entry, ensure_ascii=False) + "\n" try: size = EVENTS_LOG.stat().st_size if EVENTS_LOG.exists() else 0 if size >= EVENTS_MAX_BYTES: try: raw = EVENTS_LOG.read_bytes() mid = len(raw) // 2 # Seek forward to the next newline boundary so we don't split a JSON line. nl = raw.find(b"\n", mid) raw = raw[nl + 1:] if nl >= 0 else raw[mid:] EVENTS_LOG.write_bytes(raw) except OSError: pass with EVENTS_LOG.open("a", encoding="utf-8") as f: f.write(line) except OSError: pass # ----- stdio framing ----- class StdinClosed(Exception): """Raised when Brave closes our stdin. Carries enough state to distinguish a clean EOF (port intentionally disconnected) from a partial read mid-frame (port died unexpectedly — usually means the extension's SW was recycled or the browser killed us).""" def __init__(self, kind: str, got: int = 0, expected: int = 0): super().__init__(f"stdin closed kind={kind} got={got} expected={expected}") self.kind = kind self.got = got self.expected = expected def read_message(): raw_len = sys.stdin.buffer.read(4) if len(raw_len) == 0: raise StdinClosed("clean_eof") if len(raw_len) < 4: raise StdinClosed("partial_length", got=len(raw_len), expected=4) (msg_len,) = struct.unpack("extension messages at 1 MiB. Stay below. MAX_RESPONSE_BYTES = 900_000 def _shrink_response(obj: dict) -> dict: """If the JSON payload would exceed Chrome's native-messaging cap, drop the heaviest optional fields and add a `truncated` flag so the extension can report it. Last resort: cap `structured` to the first N entries.""" data = json.dumps(obj).encode("utf-8") if len(data) <= MAX_RESPONSE_BYTES: return obj shrunk = dict(obj) truncated_reason = [] for field in ("raw_json", "stdout", "stderr"): if field in shrunk and shrunk.get(field): shrunk[field] = None truncated_reason.append(f"dropped {field}") if len(json.dumps(shrunk).encode("utf-8")) <= MAX_RESPONSE_BYTES: shrunk["truncated"] = True shrunk["truncated_reason"] = "; ".join(truncated_reason) return shrunk structured = shrunk.get("structured") or [] # Set the truncated metadata before sizing so the budget includes it. shrunk["truncated"] = True shrunk["truncated_reason"] = "; ".join(truncated_reason + ["structured TBD"]) if structured: lo, hi = 0, len(structured) while lo < hi: mid = (lo + hi + 1) // 2 shrunk["structured"] = structured[:mid] if len(json.dumps(shrunk).encode("utf-8")) <= MAX_RESPONSE_BYTES: lo = mid else: hi = mid - 1 shrunk["structured"] = structured[:lo] truncated_reason.append(f"structured {len(structured)} -> {lo}") shrunk["truncated_reason"] = "; ".join(truncated_reason) return shrunk def write_message(obj: dict) -> None: obj = _shrink_response(obj) data = json.dumps(obj).encode("utf-8") sys.stdout.buffer.write(struct.pack(" dict | None: """Find the last top-level JSON object in `text`. Tolerates leading status lines printed by rc-jav, pretty-printed multi-line JSON, and trailing noise.""" if not text: return None last = None i = 0 n = len(text) while i < n: j = text.find("{", i) if j < 0: break try: obj, end = _JSON_DECODER.raw_decode(text, j) except json.JSONDecodeError: i = j + 1 continue if isinstance(obj, dict): last = obj i = end return last # ----- rc-jav invocation ----- def resolve_rcjav(rcjav_path: str | None) -> Path: """Accept either a folder (auto-append rc-jav.py) or a full .py path.""" if not rcjav_path: return RC_JAV p = Path(rcjav_path) if p.is_dir() or p.suffix.lower() != ".py": p = p / "rc-jav.py" return p def run_rcjav(args: list[str], timeout: int = 120, extra_flags: list[str] | None = None, rcjav_path: str | None = None, stdin_data: str | None = None) -> tuple[int, str, str]: flags = extra_flags if extra_flags is not None else ["--basic", "--no-color"] script = resolve_rcjav(rcjav_path) # Reject up-front instead of resolving a relative path against the host's # CWD (which is whatever Brave hands it — usually its install dir) and # surfacing a confusing "file not found" from subprocess. if not script.is_absolute() or not script.exists(): return 2, "", ( f"rc-jav.py not found at {script!s} — set the 'rc-jav path' in the " f"extension options to an absolute path that points to rc-jav.py " f"or its parent folder." ) cmd = [PYTHON, str(script), *args, *flags] # Hide console window on Windows creationflags = 0 if os.name == "nt": creationflags = 0x08000000 # CREATE_NO_WINDOW try: proc = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout, encoding="utf-8", errors="replace", creationflags=creationflags, input=stdin_data, ) return proc.returncode, proc.stdout, proc.stderr except subprocess.TimeoutExpired: return 124, "", "timeout after %ds" % timeout except Exception as e: return 1, "", f"spawn error: {e}" # Memoize rules-info per script path so handle_cache_status doesn't pay # the Python startup cost on every poll. Invalidate on rcjav_path change. _RULES_INFO_CACHE: dict[str, dict] = {} def fetch_rules_info(rcjav_path: str | None) -> dict: """Get cache contract constants + current rules signature from rc-jav.py. Returns {ok, cache_schema, id_rules, id_rules_signature} on success, or {ok: False, error: ...} when the lookup fails. Memoized per script path; the result is cached for the lifetime of this host process. """ script = resolve_rcjav(rcjav_path) key = str(script) cached = _RULES_INFO_CACHE.get(key) if cached is not None: return cached rc, stdout, stderr = run_rcjav( ["--print-rules-info"], extra_flags=[], # NB: omit --basic / --no-color, --print-rules-info already JSON rcjav_path=rcjav_path, timeout=30, ) if rc != 0: info = {"ok": False, "error": (stderr or stdout or "unknown").strip()} else: try: info = json.loads(stdout.strip()) except json.JSONDecodeError as e: info = {"ok": False, "error": f"invalid rules-info JSON: {e}"} _RULES_INFO_CACHE[key] = info return info def part_pattern_args(payload: dict) -> list[str]: args: list[str] = [] for pattern in payload.get("part_patterns") or []: if isinstance(pattern, str) and pattern.strip(): args += ["--part-pattern", pattern.strip()] return args def parse_hits(stdout: str) -> tuple[int, list[str]]: """Sum 'N hit(s)' lines, collect IDs.""" total = 0 ids: list[str] = [] for m in HIT_RE.finditer(stdout): total += int(m.group("count")) ids.append(m.group("id")) return total, ids def host_human_size(n: int) -> str: nf = float(n or 0) for unit in ("B", "KiB", "MiB", "GiB", "TiB"): if nf < 1024: return f"{int(nf)} B" if unit == "B" else f"{nf:.2f} {unit}" nf /= 1024 return f"{nf:.2f} PiB" def host_normalize_id(raw: str) -> str | None: stem = Path(str(raw) + ".x").stem fc2ppv = TEXT_FC2PPV_RE.search(stem) if fc2ppv: return f"FC2-PPV-{int(fc2ppv.group(1))}" fc2 = TEXT_FC2_RE.search(stem) if fc2: return f"FC2-PPV-{int(fc2.group(1))}" m = PRIMARY_ID_RE.match(stem) or COMPOUND_ID_RE.match(stem) or FALLBACK_ID_RE.match(stem) if not m: m = re.match(r"^([A-Za-z][A-Za-z0-9]{1,})(\d{3,5})[a-zA-Z]?$", stem) if not m: return None num = int(m.group(2)) width = max(3, len(m.group(2))) prefix = m.group(1).upper() if prefix == "FC2": prefix = "FC2-PPV" return f"{prefix}-{num:0{width}d}" def host_extract_id_from_text(*parts: object) -> dict: """Extract a canonical JAV ID from saved-tab text using the host's rules. This mirrors the page-title surface used by the extension, then normalizes through host_normalize_id so cache lookups use the same padded key shape as rc-jav. It intentionally returns only the base ID; cached lookup expands to #partN matches below. """ for source, raw in parts: text = str(raw or "") if not text: continue fc2ppv = TEXT_FC2PPV_RE.search(text) if fc2ppv: return {"id": f"FC2-PPV-{int(fc2ppv.group(1))}", "source": source, "raw": fc2ppv.group(0)} fc2 = TEXT_FC2_RE.search(text) if fc2: return {"id": f"FC2-PPV-{int(fc2.group(1))}", "source": source, "raw": fc2.group(0)} m = TEXT_ID_DASHED_RE.search(text) if not m: m = TEXT_ID_UNDASHED_RE.search(text) if not m: continue candidate = m.group(0) norm = host_normalize_id(candidate) if norm: return {"id": norm, "source": source, "raw": candidate} return {"id": None, "source": "none", "raw": ""} def _load_rcjav_config(script: Path) -> dict: cfg_path = script.parent / "config.json" if not cfg_path.exists(): return {} try: return json.loads(cfg_path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): return {} def _cache_age_label(scanned_at: str, stale_hours: float = 24) -> tuple[str, bool]: try: dt = datetime.fromisoformat(str(scanned_at).replace("Z", "+00:00")) now = datetime.now(dt.tzinfo) if dt.tzinfo else datetime.now() hours = (now - dt).total_seconds() / 3600.0 except Exception: return "?", False if hours < 1: age = f"{int(hours * 60)}m" elif hours < 24: age = f"{hours:.1f}h" else: age = f"{hours / 24:.1f}d" return age, hours > stale_hours def _load_host_cache(script: Path) -> tuple[dict | None, dict]: cache_path = script.parent / "cache.json" timings: dict[str, int | str | bool | None] = {"host_cache_path": str(cache_path)} if not cache_path.exists(): timings["host_cache_error"] = "cache.json not found" return None, timings try: stat = cache_path.stat() except OSError as e: timings["host_cache_error"] = str(e) return None, timings key = str(cache_path.resolve()) stamp = (stat.st_mtime_ns, stat.st_size) with _cache_lock: cached = _cache_mem.get(key) if cached and cached.get("stamp") == stamp: timings["cache_load_ms"] = 0 timings["host_cache_reused"] = True return cached["data"], timings t0 = time.perf_counter() try: data = json.loads(cache_path.read_text(encoding="utf-8")) except Exception as e: timings["cache_load_ms"] = round((time.perf_counter() - t0) * 1000) timings["host_cache_error"] = str(e) return None, timings timings["cache_load_ms"] = round((time.perf_counter() - t0) * 1000) timings["host_cache_reused"] = False with _cache_lock: _cache_mem[key] = {"stamp": stamp, "data": data} return data, timings def _entry_to_hit(source: str, remote: str, item: dict, match_trace: dict | None = None) -> dict: path = item.get("path", "") sep = "" if remote.endswith("/") or not path else "/" size = int(item.get("size") or 0) hit = { "source": source, "remote": remote, "path": path, "full_path": f"{remote}{sep}{path}", "size": size, "size_human": host_human_size(size), "mod_time": item.get("mod_time", ""), "jav_id": item.get("jav_id", ""), } if match_trace: hit.update(match_trace) return hit def _no_match_state(search_mode: str, cache_meta: dict) -> dict: meta = list((cache_meta or {}).values()) missing = [m for m in meta if m.get("age") == "missing"] stale = [m for m in meta if m.get("stale")] if search_mode == "quick": return { "no_match_kind": "live_miss", "no_match_title": "Live search found no library hit", "no_match_detail": "The ID was extracted and searched against rclone directly.", } if missing: return { "no_match_kind": "cache_missing", "no_match_title": "Cache search has missing roots", "no_match_detail": "At least one requested root is not present in cache.json. Rebuild or refresh that cache root.", } if stale: return { "no_match_kind": "cache_stale_miss", "no_match_title": "Stale cache found no library hit", "no_match_detail": "The ID was searched in cache, but one or more cache roots are stale.", } return { "no_match_kind": "cache_miss", "no_match_title": "Cache search found no library hit", "no_match_detail": "The ID was extracted and searched in the current cache.", } def handle_cached_search_fast(payload: dict) -> dict | None: """Answer simple cached ID searches in-process, avoiding per-query Python startup. Returns None when the regular rc-jav CLI path should handle the request.""" query = payload.get("id") or payload.get("query") if not query or payload.get("name") or payload.get("quick"): return None raw = str(query) if "*" in raw or "?" in raw or HOST_RANGE_RE.search(raw): return None norm = host_normalize_id(raw) if not norm: return None t0 = time.perf_counter() script = resolve_rcjav(payload.get("rcjav_path")) cache, timings = _load_host_cache(script) if not isinstance(cache, dict) or not isinstance(cache.get("remotes"), dict): return None cfg = _load_rcjav_config(script) source_roots = [r for r in (payload.get("source_override") or []) if isinstance(r, str)] target_roots = [r for r in (payload.get("target_override") or []) if isinstance(r, str)] if not source_roots and not target_roots: target_roots = cfg.get("default_target") or [] if not source_roots and not target_roots: return None wanted: list[tuple[str, str]] = [("Source", r) for r in source_roots] + [("Target", r) for r in target_roots] cache_meta: dict[str, dict] = {} hits: list[dict] = [] index_start = time.perf_counter() for source, remote in wanted: entry = cache["remotes"].get(remote) if not isinstance(entry, dict): cache_meta[remote] = {"cached": False, "age": "missing", "stale": True, "file_count": 0} continue files = entry.get("files", []) or [] age, stale = _cache_age_label(entry.get("scanned_at", ""), _stale_hours(payload)) cache_meta[remote] = {"cached": True, "age": age, "stale": stale, "file_count": len(files)} for item in files: jid = item.get("jav_id", "") if jid == norm or str(jid).startswith(norm + "#part"): is_part = str(jid).startswith(norm + "#part") normalized = str(raw).upper() != norm.upper() hits.append(_entry_to_hit(source, remote, item, { "match_kind": "part" if is_part else "normalized" if normalized else "exact", "match_reason": "Base ID + part" if is_part else "Normalized ID" if normalized else "Exact ID", "match_confidence": "related" if is_part else "normalized" if normalized else "high", "matched_query": norm, "matched_id": jid, })) timings["cache_match_ms"] = round((time.perf_counter() - index_start) * 1000) hits.sort(key=lambda h: (h.get("jav_id", ""), h.get("path", "").lower())) host_ms = round((time.perf_counter() - t0) * 1000) timings.update({ "host_cached_ms": host_ms, "host_rcjav_ms": 0, "cli_elapsed_ms": None, }) result = { "ok": True, "rc": 0 if hits else 1, "hits": len(hits), "found": bool(hits), "search_mode": "cached", "structured": hits, "cache_meta": cache_meta, "scanned_remotes": list(cache_meta.keys()), "timings": timings, "stderr": "", "id": query, "name": None, "message": f"{len(hits)} hit(s)" if hits else "NOT FOUND", "fast_path": "host-cache", } if not hits: result.update(_no_match_state("cached", cache_meta)) return result # ----- dispatch ----- def handle_search(payload: dict) -> dict: query = payload.get("id") or payload.get("query") name = payload.get("name") quick = bool(payload.get("quick")) fast = handle_cached_search_fast(payload) if fast is not None: return fast args: list[str] = [] if query: args += ["--search", str(query)] if name: args += ["--name", str(name)] if quick: args.append("--quick") args += part_pattern_args(payload) if not args: return {"ok": False, "error": "no id or name"} # Optional profile overrides: --source / --target lists from the active profile. for r in payload.get("source_override") or []: if r and isinstance(r, str): args += ["--source", r] for r in payload.get("target_override") or []: if r and isinstance(r, str): args += ["--target", r] # Always ask for JSON output — needed for per-hit Delete UI in the extension. # Also run --basic to suppress rich on stderr. t0 = time.perf_counter() rc, out, err = run_rcjav(args, extra_flags=["--basic", "--no-color", "--format", "json"], rcjav_path=payload.get("rcjav_path")) rcjav_ms = round((time.perf_counter() - t0) * 1000) parsed = extract_json_blob(out) hits = 0 structured: list[dict] = [] cache_meta = {} if parsed: for q in parsed.get("queries", []): for h in q.get("hits", []): structured.append(h) structured.extend(parsed.get("name_matches", [])) cache_meta = parsed.get("cache_meta", {}) or {} hits = len(structured) else: # Fallback: text count (also covers --basic non-JSON output) hits_count_only, _ = parse_hits(out) hits = hits_count_only # If rc-jav failed and produced no parseable result, surface as an error # instead of an empty "NOT FOUND" — otherwise host crashes look like misses. if rc != 0 and parsed is None and hits == 0: return { "ok": False, "rc": rc, "error": (err or out or "rc-jav exited non-zero").strip()[:1000], "id": query, "name": name, } result = { "ok": True, "rc": rc, "hits": hits, "found": hits > 0, "search_mode": "quick" if quick else "cached", "structured": structured, "cache_meta": cache_meta, "scanned_remotes": list(cache_meta.keys()), "timings": { "host_rcjav_ms": rcjav_ms, "cli_elapsed_ms": round(float(parsed.get("elapsed_sec", 0)) * 1000) if parsed else None, **(parsed.get("timings", {}) if parsed else {}), }, # Drop full stdout from the success payload — it duplicates `structured` # (the parsed JSON blob) and can blow Chrome's 1 MiB message cap. "stderr": (err or "")[:2000], "id": query, "name": name, "message": f"{hits} hit(s)" if hits > 0 else "NOT FOUND", } if hits == 0: result.update(_no_match_state("quick" if quick else "cached", cache_meta)) return result def handle_bulk_search(payload: dict) -> dict: queries = [ str(q).strip() for q in (payload.get("queries") or []) if isinstance(q, str) and str(q).strip() ][:250] if not queries: return {"ok": False, "error": "bulk search needs one or more IDs"} quick = bool(payload.get("quick")) args: list[str] = [] for query in queries: args += ["--search", query] if quick: args.append("--quick") args += part_pattern_args(payload) for r in payload.get("source_override") or []: if r and isinstance(r, str): args += ["--source", r] for r in payload.get("target_override") or []: if r and isinstance(r, str): args += ["--target", r] t0 = time.perf_counter() rc, out, err = run_rcjav( args, extra_flags=["--basic", "--no-color", "--format", "json"], rcjav_path=payload.get("rcjav_path"), ) rcjav_ms = round((time.perf_counter() - t0) * 1000) parsed = extract_json_blob(out) if parsed is None: return {"ok": False, "rc": rc, "error": (err or out or "bulk search failed").strip()[:1000]} cache_meta = parsed.get("cache_meta", {}) or {} grouped = [] total_hits = 0 search_mode = "quick" if quick else "cached" for item in parsed.get("queries", []): hits = item.get("hits", []) or [] total_hits += len(hits) row = {"query": item.get("query", ""), "hits": len(hits), "structured": hits} if not hits: row.update(_no_match_state(search_mode, cache_meta)) grouped.append(row) return { "ok": True, "rc": rc, "search_mode": search_mode, "queries": grouped, "query_count": len(grouped), "hits": total_hits, "cache_meta": cache_meta, "scanned_remotes": list(cache_meta.keys()), "timings": { "host_rcjav_ms": rcjav_ms, "cli_elapsed_ms": round(float(parsed.get("elapsed_sec", 0)) * 1000), **(parsed.get("timings", {}) or {}), }, "stderr": (err or "")[:2000], } def _compact_tab_hit(hit: dict) -> dict: return { "source": hit.get("source", ""), "remote": hit.get("remote", ""), "path": hit.get("path", ""), "full_path": hit.get("full_path", ""), "size": hit.get("size", 0), "size_human": hit.get("size_human", ""), "mod_time": hit.get("mod_time", ""), "jav_id": hit.get("jav_id", ""), "match_kind": hit.get("match_kind", ""), "match_reason": hit.get("match_reason", ""), "matched_id": hit.get("matched_id", ""), } def handle_check_tabs(payload: dict) -> dict: items = [ item for item in (payload.get("items") or []) if isinstance(item, dict) ][:2000] if not items: return {"ok": False, "error": "check_tabs needs one or more tab items"} t0 = time.perf_counter() script = resolve_rcjav(payload.get("rcjav_path")) cache, timings = _load_host_cache(script) if not isinstance(cache, dict) or not isinstance(cache.get("remotes"), dict): return {"ok": False, "error": timings.get("host_cache_error") or "cache.json is missing or malformed", "timings": timings} cfg = _load_rcjav_config(script) source_roots = [r for r in (payload.get("source_override") or []) if isinstance(r, str)] target_roots = [r for r in (payload.get("target_override") or []) if isinstance(r, str)] if not source_roots and not target_roots: target_roots = cfg.get("default_target") or [] if not source_roots and not target_roots: return {"ok": False, "error": "no source or target roots configured"} wanted: list[tuple[str, str]] = [("Source", r) for r in source_roots] + [("Target", r) for r in target_roots] cache_meta: dict[str, dict] = {} index: dict[str, list[dict]] = {} index_start = time.perf_counter() for source, remote in wanted: entry = cache["remotes"].get(remote) if not isinstance(entry, dict): cache_meta[remote] = {"cached": False, "age": "missing", "stale": True, "file_count": 0} continue files = entry.get("files", []) or [] age, stale = _cache_age_label(entry.get("scanned_at", ""), _stale_hours(payload)) cache_meta[remote] = {"cached": True, "age": age, "stale": stale, "file_count": len(files)} for item in files: jid = str(item.get("jav_id") or "") if not jid: continue hit = _entry_to_hit(source, remote, item) index.setdefault(jid, []).append(hit) timings["cache_index_ms"] = round((time.perf_counter() - index_start) * 1000) unique_ids: dict[str, list[dict]] = {} extracted_by_key: dict[str, dict] = {} for pos, item in enumerate(items): key = str(item.get("key") or pos) extracted = host_extract_id_from_text( ("title", item.get("title", "")), ("url", item.get("url", "")), ) extracted_by_key[key] = extracted if extracted.get("id"): unique_ids.setdefault(extracted["id"], []).append(item) id_hits: dict[str, list[dict]] = {} for jav_id in unique_ids: hits: list[dict] = [] seen: set[tuple[str, str, str]] = set() for jid, bucket in index.items(): if jid == jav_id or jid.startswith(jav_id + "#part"): for hit in bucket: key = (hit.get("remote", ""), hit.get("path", ""), hit.get("jav_id", "")) if key in seen: continue seen.add(key) is_part = str(hit.get("jav_id", "")).startswith(jav_id + "#part") annotated = dict(hit) annotated.update({ "match_kind": "part" if is_part else "exact", "match_reason": "Base ID + part" if is_part else "Exact ID", "match_confidence": "related" if is_part else "high", "matched_query": jav_id, "matched_id": hit.get("jav_id", ""), }) hits.append(annotated) hits.sort(key=lambda h: (h.get("jav_id", ""), h.get("path", "").lower())) id_hits[jav_id] = hits results: list[dict] = [] found = missing = no_id = 0 for pos, item in enumerate(items): key = str(item.get("key") or pos) extracted = extracted_by_key.get(key) or {"id": None, "source": "none", "raw": ""} jav_id = extracted.get("id") if not jav_id: no_id += 1 results.append({ "key": key, "status": "no_id", "jav_id": None, "extracted_from": extracted.get("source", "none"), "raw_match": extracted.get("raw", ""), "hits": 0, "sample": None, }) continue hits = id_hits.get(jav_id, []) if hits: found += 1 results.append({ "key": key, "status": "found", "jav_id": jav_id, "extracted_from": extracted.get("source", "none"), "raw_match": extracted.get("raw", ""), "hits": len(hits), "sample": _compact_tab_hit(hits[0]), "truncated_hits": max(0, len(hits) - 1), }) else: missing += 1 row = { "key": key, "status": "missing", "jav_id": jav_id, "extracted_from": extracted.get("source", "none"), "raw_match": extracted.get("raw", ""), "hits": 0, "sample": None, } row.update(_no_match_state("cached", cache_meta)) results.append(row) timings["host_check_tabs_ms"] = round((time.perf_counter() - t0) * 1000) return { "ok": True, "search_mode": "cached", "item_count": len(items), "unique_id_count": len(unique_ids), "found": found, "missing": missing, "no_id": no_id, "results": results, "cache_meta": cache_meta, "scanned_remotes": list(cache_meta.keys()), "timings": timings, } def handle_dupe_review(payload: dict) -> dict: script = resolve_rcjav(payload.get("rcjav_path")) cfg = _load_rcjav_config(script) source_roots = [r for r in (payload.get("source_override") or []) if isinstance(r, str)] target_roots = [r for r in (payload.get("target_override") or []) if isinstance(r, str)] if not source_roots and not target_roots: target_roots = cfg.get("default_target") or [] args = ["--cache"] args += part_pattern_args(payload) for root in source_roots: args += ["--source", root] for root in target_roots: args += ["--target", root] if not source_roots and not target_roots: return {"ok": False, "error": "no source or target roots configured for duplicate review"} t0 = time.perf_counter() rc, out, err = run_rcjav( args, extra_flags=["--basic", "--no-color", "--format", "json"], rcjav_path=payload.get("rcjav_path"), ) elapsed = round((time.perf_counter() - t0) * 1000) parsed = extract_json_blob(out) if parsed is None: return {"ok": False, "rc": rc, "error": (err or out or "duplicate review failed").strip()[:1000]} groups = parsed.get("groups", {}) or {} reclaim = sum( int(item.get("size") or 0) for group in groups.values() for item in group.get("delete_candidates", []) or [] ) return { "ok": True, "groups": groups, "group_count": len(groups), "variant_alerts": parsed.get("variant_alerts", []) or [], "roots": {"source": source_roots, "target": target_roots}, "potential_reclaim": reclaim, "potential_reclaim_human": host_human_size(reclaim), "timings": {"host_rcjav_ms": elapsed}, "stderr": (err or "")[:2000], } def handle_library_issues(payload: dict) -> dict: """Return cache-only filename hygiene issues.""" script = resolve_rcjav(payload.get("rcjav_path")) t0 = time.perf_counter() rc, out, err = run_rcjav( ["--library-issues"], extra_flags=["--basic", "--no-color", "--format", "json"], rcjav_path=payload.get("rcjav_path"), ) elapsed = round((time.perf_counter() - t0) * 1000) parsed = extract_json_blob(out) if parsed is None: return {"ok": False, "rc": rc, "error": (err or out or "library-issues failed").strip()[:1000]} return { "ok": True, "bracket_names": parsed.get("bracket_names", []), "nohyphen_names": parsed.get("nohyphen_names", []), "missing_resolution": parsed.get("missing_resolution", []), "missing_resolution_summary": parsed.get("missing_resolution_summary", {}), "resolution_noncanonical": parsed.get("resolution_noncanonical", []), "resolution_noncanonical_summary": parsed.get("resolution_noncanonical_summary", {}), "timings": {"host_rcjav_ms": elapsed}, } def _patch_cache_remove_paths(rcjav_path: str | None, full_paths: list[str]) -> None: """Remove file entries from cache.json after successful deletion.""" try: script = resolve_rcjav(rcjav_path) cache_path = script.parent / "cache.json" if not cache_path.exists(): return cache = json.loads(cache_path.read_text(encoding="utf-8")) # Build set of (remote, rel_path) to remove to_remove: set[tuple[str, str]] = set() for full_path in full_paths: for remote in cache.get("remotes", {}): sep = "" if remote.endswith("/") else "/" prefix = remote + sep if full_path.startswith(prefix): to_remove.add((remote, full_path[len(prefix):])) break if not to_remove: return modified = False for remote, remote_data in cache.get("remotes", {}).items(): before = len(remote_data.get("files", [])) remote_data["files"] = [ f for f in remote_data.get("files", []) if (remote, f["path"]) not in to_remove ] if len(remote_data["files"]) != before: modified = True if modified: tmp = cache_path.with_suffix(cache_path.suffix + ".tmp") tmp.write_text(json.dumps(cache, indent=2), encoding="utf-8") os.replace(tmp, cache_path) except Exception as e: log(f"_patch_cache_remove_paths error: {e}") def handle_delete_batch(payload: dict) -> dict: """Delete multiple files respecting existing delete settings, patch cache once.""" paths = payload.get("paths") or [] if not isinstance(paths, list) or not paths: return {"ok": False, "error": "paths must be a non-empty array"} base = { "mode": payload.get("mode", "trash"), "trash_dir": payload.get("trash_dir", ""), "allowed_remotes": payload.get("allowed_remotes", []), "rcjav_path": payload.get("rcjav_path", ""), } results = [] deleted_paths = [] for path in paths: r = handle_delete({**base, "path": path}) results.append({"path": path, **r}) if r.get("ok"): deleted_paths.append(path) if deleted_paths: _patch_cache_remove_paths(payload.get("rcjav_path"), deleted_paths) ok_count = sum(1 for r in results if r.get("ok")) return { "ok": ok_count > 0, "results": results, "deleted_count": ok_count, "failed_count": len(results) - ok_count, } def handle_rename_file(payload: dict) -> dict: """Rename one file in an rclone remote and patch cache. Collision-safe.""" remote = (payload.get("remote") or "").strip() old_path = (payload.get("old_path") or "").strip() new_path = (payload.get("new_path") or "").strip() if not remote or not old_path or not new_path: return {"ok": False, "error": "remote, old_path, and new_path are required"} if old_path == new_path: return {"ok": False, "error": "old_path and new_path are identical"} t0 = time.perf_counter() rc, out, err = run_rcjav( ["--rename-file", "--remote", remote, "--old-path", old_path, "--new-path", new_path], extra_flags=["--basic", "--no-color"], rcjav_path=payload.get("rcjav_path"), ) elapsed = round((time.perf_counter() - t0) * 1000) parsed = extract_json_blob(out) if parsed is None: return {"ok": False, "rc": rc, "error": (err or out or "rename failed").strip()[:1000]} return {**parsed, "timings": {"host_rcjav_ms": elapsed}} def handle_rename_files_batch(payload: dict) -> dict: """Rename multiple files in one call — single cache write at the end.""" renames = payload.get("renames") or [] if not isinstance(renames, list) or not renames: return {"ok": False, "error": "renames must be a non-empty array"} t0 = time.perf_counter() rc, out, err = run_rcjav( ["--rename-files-batch"], extra_flags=["--basic", "--no-color"], rcjav_path=payload.get("rcjav_path"), stdin_data=json.dumps(renames), ) elapsed = round((time.perf_counter() - t0) * 1000) parsed = extract_json_blob(out) if parsed is None: return {"ok": False, "rc": rc, "error": (err or out or "batch rename failed").strip()[:1000]} return {**parsed, "timings": {"host_rcjav_ms": elapsed}} def _remote_root(p: str) -> str: """Return the `remote:` prefix of an rclone path. For a local path, return ''.""" if ":" not in p: return "" return p.split(":", 1)[0] + ":" def _normalize_rclone_prefix(p: str) -> str: """Normalize an allowed rclone path prefix without broadening it to the remote root.""" p = (p or "").strip().replace("\\", "/").rstrip("/") return p if ":" in p else "" def _path_in_allowed_prefixes(path: str, prefixes: list[str]) -> bool: path_norm = path.replace("\\", "/") for prefix in prefixes: if path_norm == prefix or path_norm.startswith(prefix + "/"): return True return False def handle_delete(payload: dict) -> dict: """Delete or move-to-trash a single file via rclone.""" path = payload.get("path", "").strip() mode = payload.get("mode", "trash") # 'trash' or 'permanent' trash_dir = payload.get("trash_dir", "").strip() allowed_prefixes = [ p for p in (_normalize_rclone_prefix(r) for r in payload.get("allowed_remotes", [])) if p ] # Derive additional allowlist from rc-jav's config.json (default_target + # default_source) so the host is self-protective even if the extension forgets # to pass allowed_remotes. Plus the trash_dir itself. script = resolve_rcjav(payload.get("rcjav_path")) cfg_path = script.parent / "config.json" if cfg_path.exists(): try: cfg = json.loads(cfg_path.read_text(encoding="utf-8")) for key in ("default_target", "default_source"): for r in cfg.get(key, []) or []: prefix = _normalize_rclone_prefix(r) if prefix and prefix not in allowed_prefixes: allowed_prefixes.append(prefix) except (OSError, json.JSONDecodeError): pass if trash_dir: prefix = _normalize_rclone_prefix(trash_dir) if prefix and prefix not in allowed_prefixes: allowed_prefixes.append(prefix) if not path: return {"ok": False, "error": "no path"} # Reject directory-shaped paths — `rclone moveto` on a trailing-slash path # would move a whole tree and produce a destination with empty leaf. if path.endswith("/") or path.endswith("\\"): return {"ok": False, "error": "path looks like a directory (trailing slash)"} # Sanity: must look like an rclone remote path (contains ':') or a local file if ":" not in path and not Path(path).exists(): return {"ok": False, "error": "path doesn't look like an rclone remote or local file"} # Allowlist: refuse to delete anything outside the configured library paths. # Defaults derive from rc-jav config.json + trash_dir above. # If still empty, no config was found — refuse rather than fail-open. if not allowed_prefixes: return {"ok": False, "error": "no allowed prefixes configured; refusing to delete"} root = _remote_root(path) if not root or not _path_in_allowed_prefixes(path, allowed_prefixes): return { "ok": False, "error": f"path {path!r} not in allowed prefixes={allowed_prefixes}", } audit = DELETE_LOG log_entry = {"ts": __import__("datetime").datetime.now().isoformat(), "path": path, "mode": mode, "trash_dir": trash_dir} if mode == "permanent": cmd = [RCLONE_BIN, "deletefile", path] elif mode == "trash": if not trash_dir: return {"ok": False, "error": "trash_dir required when mode=trash"} # rclone moveto src dst — preserves filename if dst is dir-like, but for a single # file we should give it a full destination path. from datetime import date date_prefix = date.today().isoformat() leaf = path.split("/")[-1] dst = f"{trash_dir.rstrip('/')}/{date_prefix}/{leaf}" cmd = [RCLONE_BIN, "moveto", path, dst] log_entry["dst"] = dst else: return {"ok": False, "error": f"unknown mode {mode!r}"} creationflags = 0x08000000 if os.name == "nt" else 0 try: proc = subprocess.run(cmd, capture_output=True, text=True, timeout=300, encoding="utf-8", errors="replace", creationflags=creationflags) log_entry["rc"] = proc.returncode log_entry["stderr"] = proc.stderr.strip()[:500] except Exception as e: log_entry["error"] = str(e) try: with audit.open("a", encoding="utf-8") as f: f.write(json.dumps(log_entry) + "\n") except OSError: pass return {"ok": False, "error": str(e)} try: with audit.open("a", encoding="utf-8") as f: f.write(json.dumps(log_entry) + "\n") except OSError: pass return { "ok": proc.returncode == 0, "rc": proc.returncode, "stdout": proc.stdout, "stderr": proc.stderr, "path": path, "mode": mode, "dst": log_entry.get("dst"), } def handle_recent_deletes(payload: dict) -> dict: """Return the last N successful trash-mode deletes, newest first, so the extension can offer Undo. Permanent deletes are excluded — they can't be undone.""" limit = max(1, min(int(payload.get("limit") or 20), 200)) audit = DELETE_LOG if not audit.exists(): return {"ok": True, "entries": []} # Read tail. deletes.log is line-delimited JSON; small enough to read in full # in practice, but cap defensively at 1 MiB so a runaway log doesn't OOM us. try: raw = audit.read_text(encoding="utf-8", errors="replace") if len(raw) > 1_000_000: raw = raw[-1_000_000:] except OSError as e: return {"ok": False, "error": f"can't read deletes.log: {e}"} out: list[dict] = [] for line in raw.splitlines(): line = line.strip() if not line: continue try: obj = json.loads(line) except json.JSONDecodeError: continue # Skip undo records and permanent deletes — neither is undoable. if obj.get("kind") == "undo": continue if obj.get("mode") != "trash": continue if obj.get("rc") != 0: continue if not obj.get("dst"): continue out.append({ "ts": obj.get("ts"), "path": obj.get("path"), "dst": obj.get("dst"), "trash_dir": obj.get("trash_dir"), }) # Build the set of dst paths already undone so we hide them. undone_dst = set() for line in raw.splitlines(): try: obj = json.loads(line) except (json.JSONDecodeError, ValueError): continue if obj.get("kind") == "undo" and obj.get("rc") == 0 and obj.get("dst"): undone_dst.add(obj["dst"]) out = [e for e in out if e["dst"] not in undone_dst] out.reverse() # newest first return {"ok": True, "entries": out[:limit]} def handle_undo_delete(payload: dict) -> dict: """Move a file from trash back to its original path via `rclone moveto`. The extension passes the {dst, path} pair captured in the delete audit log; we re-validate both ends against the configured allowlist before acting.""" dst = (payload.get("dst") or "").strip() orig = (payload.get("path") or "").strip() if not dst or not orig: return {"ok": False, "error": "undo needs both dst (trash location) and path (original)"} # Reuse the same allowlist derivation handle_delete uses. allowed_prefixes = [ p for p in (_normalize_rclone_prefix(r) for r in payload.get("allowed_remotes", [])) if p ] script = resolve_rcjav(payload.get("rcjav_path")) cfg_path = script.parent / "config.json" if cfg_path.exists(): try: cfg = json.loads(cfg_path.read_text(encoding="utf-8")) for key in ("default_target", "default_source"): for r in cfg.get(key, []) or []: prefix = _normalize_rclone_prefix(r) if prefix and prefix not in allowed_prefixes: allowed_prefixes.append(prefix) except (OSError, json.JSONDecodeError): pass trash_dir = (payload.get("trash_dir") or "").strip() if trash_dir: prefix = _normalize_rclone_prefix(trash_dir) if prefix and prefix not in allowed_prefixes: allowed_prefixes.append(prefix) if not allowed_prefixes: return {"ok": False, "error": "no allowed prefixes configured; refusing to undo"} for end, label in ((dst, "dst"), (orig, "path")): if end.endswith("/") or end.endswith("\\"): return {"ok": False, "error": f"undo {label} looks like a directory"} if ":" not in end: return {"ok": False, "error": f"undo {label} not an rclone path"} if not _path_in_allowed_prefixes(end, allowed_prefixes): return {"ok": False, "error": f"undo {label} {end!r} not in allowed prefixes"} creationflags = 0x08000000 if os.name == "nt" else 0 cmd = [RCLONE_BIN, "moveto", dst, orig] audit = DELETE_LOG log_entry = { "ts": datetime.now().isoformat(), "kind": "undo", "path": orig, "dst": dst, "trash_dir": trash_dir, } try: proc = subprocess.run(cmd, capture_output=True, text=True, timeout=300, encoding="utf-8", errors="replace", creationflags=creationflags) log_entry["rc"] = proc.returncode log_entry["stderr"] = (proc.stderr or "").strip()[:500] except Exception as e: log_entry["error"] = str(e) try: with audit.open("a", encoding="utf-8") as f: f.write(json.dumps(log_entry) + "\n") except OSError: pass return {"ok": False, "error": str(e)} try: with audit.open("a", encoding="utf-8") as f: f.write(json.dumps(log_entry) + "\n") except OSError: pass return { "ok": proc.returncode == 0, "rc": proc.returncode, "stderr": proc.stderr, "path": orig, "dst": dst, } # rclone binary path — assumes on PATH. Override via env if needed. RCLONE_BIN = "rclone" def handle_ping(payload: dict) -> dict: override = payload.get("rcjav_path", "") effective = resolve_rcjav(override) return { "ok": True, "version": VERSION, "rc_jav": str(effective), "rc_jav_exists": effective.exists(), "rc_jav_default": str(RC_JAV), "rc_jav_overridden": bool(override), "python": PYTHON, } def handle_diagnostics(payload: dict) -> dict: """Run a battery of environment checks. Returns ordered list of {name, status, detail}.""" import shutil checks: list[dict] = [] def add(name, status, detail=""): checks.append({"name": name, "status": status, "detail": str(detail)}) # Python add("Python interpreter", "ok", sys.version.split()[0] + " — " + sys.executable) # rc-jav script override = payload.get("rcjav_path", "") script = resolve_rcjav(override) add("Host version", "ok", VERSION) add("rc-jav path source", "ok", "extension override: " + override if override else f"host default: {RC_JAV}") if script.exists(): add("rc-jav.py", "ok", str(script)) else: add("rc-jav.py", "fail", f"not found at {script}") ext_settings = payload.get("extension_settings") or {} if isinstance(ext_settings, dict) and ext_settings: mode = "quick/live" if ext_settings.get("quick_mode") else "cache" profile = ext_settings.get("active_profile") or "config.json defaults" profile_count = ext_settings.get("profile_count", 0) triggers = [] if ext_settings.get("auto_every_page"): triggers.append("every page") if ext_settings.get("auto_known_sites"): triggers.append("known sites") add("Extension settings", "ok", f"mode={mode}; profile={profile}; profiles={profile_count}; auto={', '.join(triggers) or 'off'}") # rc-jav can run --help if script.exists(): try: cf = 0x08000000 if os.name == "nt" else 0 r = subprocess.run([PYTHON, str(script), "--help"], capture_output=True, text=True, timeout=10, encoding="utf-8", errors="replace", creationflags=cf) if r.returncode == 0: add("rc-jav --help", "ok", "exits 0") else: add("rc-jav --help", "fail", f"exit {r.returncode}: {r.stderr.strip()[:200]}") except Exception as e: add("rc-jav --help", "fail", str(e)) else: add("rc-jav --help", "warn", "skipped (script missing)") # rich library present? try: if script.exists(): cf = 0x08000000 if os.name == "nt" else 0 r = subprocess.run([PYTHON, "-c", "import rich; print(rich.__file__)"], capture_output=True, text=True, timeout=10, encoding="utf-8", errors="replace", creationflags=cf) if r.returncode == 0: add("rich (Python pkg)", "ok", r.stdout.strip()) else: add("rich (Python pkg)", "fail", "import failed — run: pip install rich") except Exception as e: add("rich (Python pkg)", "fail", str(e)) # rclone on PATH rclone = shutil.which("rclone") or shutil.which("rclone.exe") if rclone: try: cf = 0x08000000 if os.name == "nt" else 0 r = subprocess.run([rclone, "version"], capture_output=True, text=True, timeout=10, encoding="utf-8", errors="replace", creationflags=cf) first = r.stdout.splitlines()[0] if r.stdout else "?" add("rclone binary", "ok", f"{rclone} — {first}") except Exception as e: add("rclone binary", "warn", f"found at {rclone} but version check failed: {e}") else: add("rclone binary", "fail", "not on PATH") # Brave config — rclone listremotes if rclone: try: cf = 0x08000000 if os.name == "nt" else 0 r = subprocess.run([rclone, "listremotes"], capture_output=True, text=True, timeout=10, encoding="utf-8", errors="replace", creationflags=cf) if r.returncode == 0: remotes = [s.strip() for s in r.stdout.splitlines() if s.strip()] add("rclone remotes", "ok" if remotes else "warn", ", ".join(remotes) if remotes else "no remotes configured") else: add("rclone remotes", "fail", r.stderr.strip()[:200]) except Exception as e: add("rclone remotes", "fail", str(e)) # rc-jav config.json if script.exists(): cfg = script.parent / "config.json" if cfg.exists(): try: data = json.loads(cfg.read_text(encoding="utf-8")) tgt = data.get("default_target", []) src = data.get("default_source", []) add("rc-jav config.json", "ok", f"source={src or '(empty)'} target={tgt or '(empty)'}") except Exception as e: add("rc-jav config.json", "fail", str(e)) else: add("rc-jav config.json", "warn", f"not present at {cfg}") # cache.json status cache = script.parent / "cache.json" if cache.exists(): try: data = json.loads(cache.read_text(encoding="utf-8")) rmap = data.get("remotes", {}) summary = [f"{r}={len(v.get('files', []))} files" for r, v in rmap.items()] add("rc-jav cache.json", "ok", "; ".join(summary) if summary else "empty") except Exception as e: add("rc-jav cache.json", "warn", str(e)) else: add("rc-jav cache.json", "warn", "no cache yet (run --scan to build)") try: cache_status = handle_cache_status(payload) warnings = cache_status.get("warnings") or [] stale = [r["remote"] for r in cache_status.get("remotes", []) if r.get("stale")] if warnings: add("Cache health", "warn", "; ".join((w.get("message") or w.get("code") or "?") for w in warnings[:3])) elif stale: add("Cache health", "warn", "stale remotes: " + ", ".join(stale)) elif cache_status.get("cache_exists"): add("Cache health", "ok", "no mismatch warnings") except Exception as e: add("Cache health", "warn", str(e)) # WinCatalog folder wc = script.parent / "wincatalog" if wc.exists() and wc.is_dir(): files = [f.name for f in wc.iterdir() if f.suffix.lower() in (".csv", ".xml")] add("WinCatalog folder", "ok" if files else "info", f"{wc} — {len(files)} CSV/XML file(s)" if files else f"optional: no CSV/XML files in {wc}") else: add("WinCatalog folder", "info", f"optional: not present at {wc}") return {"ok": True, "checks": checks} REGISTRY_HOST_KEYS = [ (r"HKLM\Software\BraveSoftware\Brave-Browser\NativeMessagingHosts\com.rcjav.host", "HKEY_LOCAL_MACHINE", r"Software\BraveSoftware\Brave-Browser\NativeMessagingHosts\com.rcjav.host"), (r"HKLM\Software\Google\Chrome\NativeMessagingHosts\com.rcjav.host", "HKEY_LOCAL_MACHINE", r"Software\Google\Chrome\NativeMessagingHosts\com.rcjav.host"), (r"HKLM\Software\WOW6432Node\Google\Chrome\NativeMessagingHosts\com.rcjav.host", "HKEY_LOCAL_MACHINE", r"Software\WOW6432Node\Google\Chrome\NativeMessagingHosts\com.rcjav.host"), (r"HKLM\Software\Chromium\NativeMessagingHosts\com.rcjav.host", "HKEY_LOCAL_MACHINE", r"Software\Chromium\NativeMessagingHosts\com.rcjav.host"), (r"HKCU\Software\BraveSoftware\Brave-Browser\NativeMessagingHosts\com.rcjav.host", "HKEY_CURRENT_USER", r"Software\BraveSoftware\Brave-Browser\NativeMessagingHosts\com.rcjav.host"), (r"HKCU\Software\Google\Chrome\NativeMessagingHosts\com.rcjav.host", "HKEY_CURRENT_USER", r"Software\Google\Chrome\NativeMessagingHosts\com.rcjav.host"), (r"HKCU\Software\Chromium\NativeMessagingHosts\com.rcjav.host", "HKEY_CURRENT_USER", r"Software\Chromium\NativeMessagingHosts\com.rcjav.host"), ] def _registry_host_entries() -> list[dict]: if os.name != "nt": return [] try: import winreg except Exception: return [{"key": "registry", "status": "fail", "detail": "winreg unavailable"}] hives = { "HKEY_LOCAL_MACHINE": winreg.HKEY_LOCAL_MACHINE, "HKEY_CURRENT_USER": winreg.HKEY_CURRENT_USER, } entries = [] for label, hive_name, subkey in REGISTRY_HOST_KEYS: try: with winreg.OpenKey(hives[hive_name], subkey) as key: value, _ = winreg.QueryValueEx(key, None) entries.append({"key": label, "status": "ok", "manifest": str(value)}) except FileNotFoundError: entries.append({"key": label, "status": "missing", "manifest": ""}) except PermissionError as e: entries.append({"key": label, "status": "warn", "manifest": "", "detail": str(e)}) except OSError as e: entries.append({"key": label, "status": "warn", "manifest": "", "detail": str(e)}) return entries def handle_host_status(payload: dict) -> dict: """Check native-messaging registration without writing registry or files.""" extension_id = (payload.get("extension_id") or "").strip() expected_origin = f"chrome-extension://{extension_id}/" if extension_id else "" host_dir = Path(__file__).resolve().parent manifest_path = host_dir / "com.rcjav.host.json" install_script = host_dir / "install-host.ps1" batch_path = host_dir / "rcjav-host.bat" checks: list[dict] = [] def add(name, status, detail=""): checks.append({"name": name, "status": status, "detail": str(detail)}) if manifest_path.exists(): add("Manifest file", "ok", str(manifest_path)) else: add("Manifest file", "fail", f"not found at {manifest_path}") manifest = {} if manifest_path.exists(): try: manifest = json.loads(manifest_path.read_text(encoding="utf-8-sig")) add("Manifest JSON", "ok", "valid JSON") except Exception as e: add("Manifest JSON", "fail", str(e)) manifest_host_path = manifest.get("path", "") if isinstance(manifest, dict) else "" if manifest_host_path: hp = Path(manifest_host_path) status = "ok" if hp.exists() else "fail" add("Manifest host path", status, manifest_host_path) else: add("Manifest host path", "fail", "missing path") origins = manifest.get("allowed_origins", []) if isinstance(manifest, dict) else [] if expected_origin: add( "Extension origin", "ok" if expected_origin in origins else "fail", expected_origin if expected_origin in origins else f"expected {expected_origin}; manifest has {origins}", ) else: add("Extension origin", "warn", "extension id not supplied") registry_entries = _registry_host_entries() matching_registry = [ e for e in registry_entries if e.get("manifest") and Path(e["manifest"]).resolve() == manifest_path.resolve() ] registered = [e for e in registry_entries if e.get("status") == "ok"] if matching_registry: add("Registry registration", "ok", f"{len(matching_registry)} matching registry entr{'y' if len(matching_registry) == 1 else 'ies'}\n" + "\n".join(e["key"] for e in matching_registry)) elif registered: add("Registry registration", "fail", "registered path differs: " + "; ".join(f"{e['key']} -> {e['manifest']}" for e in registered)) else: add("Registry registration", "fail" if os.name == "nt" else "warn", "no matching native host registry entries found") if batch_path.exists(): add("Host launcher", "ok", str(batch_path)) else: add("Host launcher", "fail", str(batch_path)) has_problem = any(c.get("status") == "fail" for c in checks) add("Repair command", "warn" if has_problem else "info", f"pwsh -ExecutionPolicy Bypass -File \"{install_script}\" -ExtensionId {extension_id or ''}") return { "ok": True, "checks": checks, "registry": registry_entries, "manifest_path": str(manifest_path), "install_script": str(install_script), } def handle_host_repair(payload: dict) -> dict: """Launch install-host.ps1 to (re)register the native host on this PC. install-host.ps1 self-elevates via UAC, writes com.rcjav.host.json with the correct local batch path, registers both HKLM and HKCU registry entries across Brave/Chrome/Chromium hives, and pulls the extension ID from allowed-extension-ids.json (so the caller no longer has to provide it — ext-fixer keeps the ID stable across PCs). The elevated child runs in its own PowerShell window with a Read-Host pause at the end, so this RPC returns as soon as the launcher fires. """ # extension_id arg kept for backwards-compatible payloads but unused. host_dir = Path(__file__).resolve().parent ps1 = host_dir / "install-host.ps1" if not ps1.exists(): return {"ok": False, "error": f"install-host.ps1 not found at {ps1}"} if os.name != "nt": return {"ok": False, "error": "host repair script is Windows-only"} try: # DETACHED_PROCESS (0x08) + CREATE_NEW_PROCESS_GROUP (0x200) — keep the # ps1 alive independent of this RPC's lifetime. creationflags = 0x00000008 | 0x00000200 subprocess.Popen( ["powershell.exe", "-NoProfile", "-ExecutionPolicy", "Bypass", "-File", str(ps1)], creationflags=creationflags, cwd=str(host_dir), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) except Exception as e: return {"ok": False, "error": f"failed to launch install-host.ps1: {e}"} return { "ok": True, "launched": True, "script_path": str(ps1), "manifest_path": str(host_dir / "com.rcjav.host.json"), "restart_required": True, "message": ( "install-host.ps1 launched. Approve the UAC prompt, wait for " "'Press Enter to close', then fully restart Brave and click " "Verify Registration." ), } def _cache_age_hours(scanned_at: str) -> float | None: try: dt = datetime.fromisoformat(scanned_at.replace("Z", "+00:00")) except (ValueError, AttributeError): return None now = datetime.now(dt.tzinfo) if dt.tzinfo else datetime.now() return (now - dt).total_seconds() / 3600.0 def _stale_hours(payload: dict) -> float: try: hours = float(payload.get("stale_hours", 24)) except (TypeError, ValueError): return 24 return min(max(hours, 1), 24 * 365) def _read_scan_state() -> dict: if not SCAN_STATE_FILE.exists(): return {} try: state = json.loads(SCAN_STATE_FILE.read_text(encoding="utf-8")) return state if isinstance(state, dict) else {} except (OSError, json.JSONDecodeError): return {} def _apply_scan_status(remotes: list[dict], state: dict) -> None: current = state.get("current_remote") if not current: return status = "" if state.get("scanning") and not state.get("done"): status = "scanning" elif state.get("done") and state.get("scan_ok") is False: status = "failed" if not status: return for remote in remotes: if remote.get("remote") == current: remote["status"] = status remote["stale"] = status == "failed" or remote.get("stale", False) return def _describe_skipped_id(path: str, remote: str = "") -> dict: name = Path(str(path or "").replace("\\", "/")).name ext = Path(name).suffix.lower().lstrip(".") reason = "no supported ID at filename start" if re.match(r"^\[[A-Za-z0-9-]+-\d+\]", name): reason = "leading brackets hide the ID" elif re.match(r"^[A-Za-z][A-Za-z0-9]+[\u2010-\u2015]\d+", name): reason = "non-ASCII dash in ID" elif re.match(r"^[A-Za-z][A-Za-z0-9]+\d+", name): reason = "missing hyphen between prefix and number" sep = "" if not remote or remote.endswith("/") or not path else "/" full_path = f"{remote}{sep}{path}" if remote else path return {"path": path, "name": name, "ext": ext, "reason": reason, "full_path": full_path} def _cache_freshness_fields(data: dict | None, rules_info: dict) -> dict: """Build the cache-contract fields surfaced to the extension. `data` is the parsed cache.json (or None when the file is missing). `rules_info` is the dict returned by fetch_rules_info; when its `ok` flag is False we report match-flags as None and let the UI decide whether to show a "rules lookup failed" state. """ out: dict = { # Legacy field — preserved for any consumer that still reads it. "version": (data or {}).get("version") if data else None, # New two-tier contract: "cache_schema": (data or {}).get("cache_schema") if data else None, "id_rules": (data or {}).get("id_rules") if data else None, "id_rules_signature": (data or {}).get("id_rules_signature") if data else None, "expected_cache_schema": None, "expected_id_rules": None, "expected_id_rules_signature": None, "cache_schema_match": None, "id_rules_match": None, "id_rules_signature_match": None, "cache_state": None, # 'fresh' | 'stale_by_rules' | 'schema_mismatch' | 'missing' "rules_info_error": None, } if not rules_info.get("ok"): out["rules_info_error"] = rules_info.get("error") or "rules lookup failed" return out out["expected_cache_schema"] = rules_info.get("cache_schema") out["expected_id_rules"] = rules_info.get("id_rules") out["expected_id_rules_signature"] = rules_info.get("id_rules_signature") if data is None: out["cache_state"] = "missing" return out # Legacy version:3 cache (pre-migration on disk): treat as stale_by_rules. if "cache_schema" not in data and data.get("version") == 3: out["cache_state"] = "stale_by_rules" out["cache_schema_match"] = True # we'll migrate at next load_cache out["id_rules_match"] = False out["id_rules_signature_match"] = False return out schema_match = data.get("cache_schema") == out["expected_cache_schema"] rules_match = data.get("id_rules") == out["expected_id_rules"] sig_match = data.get("id_rules_signature") == out["expected_id_rules_signature"] out["cache_schema_match"] = schema_match out["id_rules_match"] = rules_match out["id_rules_signature_match"] = sig_match if not schema_match: out["cache_state"] = "schema_mismatch" elif rules_match and sig_match: out["cache_state"] = "fresh" else: out["cache_state"] = "stale_by_rules" return out def handle_reextract_ids(payload: dict) -> dict: """Trigger a fast re-extract of jav_ids against the current rule set. No rclone calls — walks the on-disk cache.json. Stamps current rules + signature into the cache on success so the next cache_status call reports `cache_state: "fresh"`. """ rc, stdout, stderr = run_rcjav( ["--reextract", "--format", "json"], extra_flags=[], # --reextract already JSON; --basic would prepend noise rcjav_path=payload.get("rcjav_path"), timeout=300, ) if rc != 0: return {"ok": False, "error": (stderr or stdout or "unknown").strip()} try: result = json.loads(stdout.strip()) except json.JSONDecodeError as e: return {"ok": False, "error": f"invalid reextract JSON: {e}"} return result def handle_cache_status(payload: dict) -> dict: script = resolve_rcjav(payload.get("rcjav_path", "")) cache_path = script.parent / "cache.json" config_path = script.parent / "config.json" configured = {"default_source": [], "default_target": []} if config_path.exists(): try: cfg = json.loads(config_path.read_text(encoding="utf-8")) configured["default_source"] = cfg.get("default_source", []) or [] configured["default_target"] = cfg.get("default_target", []) or [] except Exception: pass stale_hours = _stale_hours(payload) scan_state = _read_scan_state() configured_roots = set((configured.get("default_source") or []) + (configured.get("default_target") or [])) rules_info = fetch_rules_info(payload.get("rcjav_path", "")) if not cache_path.exists(): remotes = [{ "remote": remote, "status": "never_scanned", "stale": True, "file_count": 0, "skipped_count": 0, "issues": [], } for remote in sorted(configured_roots)] _apply_scan_status(remotes, scan_state) return { "ok": True, "cache_exists": False, "cache_path": str(cache_path), "configured": configured, "stale_hours": stale_hours, "scan_state": scan_state, "remotes": remotes, **_cache_freshness_fields(None, rules_info), } try: data = json.loads(cache_path.read_text(encoding="utf-8")) except Exception as e: return {"ok": False, "error": str(e), "cache_path": str(cache_path)} remotes = [] warnings = [] remote_map = data.get("remotes", {}) or {} configured_remote_prefixes = {r.split(":", 1)[0] + ":" for r in configured_roots if isinstance(r, str) and ":" in r} cached_remote_prefixes = {r.split(":", 1)[0] + ":" for r in remote_map if isinstance(r, str) and ":" in r} for remote, entry in remote_map.items(): files = entry.get("files", []) or [] skipped = entry.get("skipped", []) or [] scanned_at = entry.get("scanned_at", "") age = _cache_age_hours(scanned_at) bare_fc2 = 0 short_number = 0 missing_jav_id = 0 for f in files: jid = str(f.get("jav_id") or "") if not jid: missing_jav_id += 1 continue base = jid.split("#", 1)[0] if re.match(r"^FC2-\d{4,}$", base, re.I): bare_fc2 += 1 if re.match(r"^[A-Z][A-Z0-9]{1,}-\d{1,2}$", base, re.I): short_number += 1 issues = [] if bare_fc2: issues.append({"code": "bare_fc2", "count": bare_fc2, "message": "bare FC2 IDs should now be FC2-PPV"}) if short_number: issues.append({"code": "short_number", "count": short_number, "message": "IDs with fewer than 3 digits suggest an old cache"}) if missing_jav_id: issues.append({"code": "missing_jav_id", "count": missing_jav_id, "message": "cache rows missing jav_id"}) if issues: warnings.append({ "remote": remote, "code": "stale_id_shape", "message": f"{remote} contains IDs from older normalization rules; rebuild cache recommended", "issues": issues, }) is_stale = bool(age is not None and age > stale_hours) remotes.append({ "remote": remote, "status": "stale" if is_stale else "fresh", "scanned_at": scanned_at, "age_hours": round(age, 3) if age is not None else None, "stale": is_stale, "file_count": len(files), "skipped_count": len(skipped), "skipped_items": [_describe_skipped_id(path, remote) for path in skipped], "skipped_samples": [_describe_skipped_id(path, remote) for path in skipped[:5]], "issues": issues, }) cached_roots = set(remote_map.keys()) for remote in sorted(configured_roots - cached_roots): remotes.append({ "remote": remote, "status": "never_scanned", "scanned_at": "", "age_hours": None, "stale": True, "file_count": 0, "skipped_count": 0, "issues": [], }) warnings.append({ "remote": remote, "code": "remote_not_scanned", "message": f"{remote} is configured but has not been scanned into cache.json", }) if configured_remote_prefixes and cached_remote_prefixes and not cached_remote_prefixes.intersection(configured_remote_prefixes): warnings.append({ "code": "remote_prefix_mismatch", "message": "cache remotes do not share a remote prefix with config.json defaults", "configured": sorted(configured_roots), "cached": sorted(remote_map.keys()), }) for remote in remote_map: if configured_roots and remote not in configured_roots: same_parent = any(remote.startswith(root.rstrip("/") + "/") or root.startswith(remote.rstrip("/") + "/") for root in configured_roots) if not same_parent and remote not in configured_roots: warnings.append({ "remote": remote, "code": "remote_not_configured", "message": f"{remote} is in cache.json but is not one of config.json's default source/target roots", }) _apply_scan_status(remotes, scan_state) remotes.sort(key=lambda r: r["remote"].lower()) return { "ok": True, "cache_exists": True, "cache_path": str(cache_path), "stale_hours": stale_hours, "configured": configured, "scan_state": scan_state, "remotes": remotes, "warnings": warnings, **_cache_freshness_fields(data, rules_info), } def _scan_worker(cmd: list[str], creationflags: int, scan_since: str = "", scan_roots: list[str] | None = None, spawn_event: "threading.Event | None" = None, spawn_result: dict | None = None) -> None: """Background thread: runs rc-jav --scan via Popen, reads stderr line-by-line to parse SCAN_START / SCAN_PROGRESS / 'Scan complete:' lines, and writes live progress to SCAN_STATE_FILE so handle_scan_progress can return it. `spawn_event` + `spawn_result` (both per-invocation, owned by handle_scan) let the caller block briefly for Popen's success/failure before returning its RPC response — closes the race where handle_scan returned ok:true while Popen later raised in the worker, surfacing the failure only via the next scan-progress poll. Worker sets spawn_result["spawn_ok"] = True immediately after Popen returns, OR False + "error" on exception, THEN sets spawn_event.""" global _scan_thread, _scan_proc, _scan_cancel_requested state: dict = { "scanning": True, "done": False, "started_at": datetime.now().isoformat(), "scan_since": scan_since, "scope": list(scan_roots or []), "remotes": [], "total_remotes": 0, "remote_jobs": [], "current_remote": None, "current_label": None, "files_this_remote": 0, "files_total": 0, "file_count": None, "elapsed_s": None, } def _write() -> None: try: tmp = SCAN_STATE_FILE.with_name(SCAN_STATE_FILE.name + ".tmp") tmp.write_text(json.dumps(state), encoding="utf-8") os.replace(tmp, SCAN_STATE_FILE) except OSError: pass def _job(remote: str, label: str = "") -> dict: for item in state["remote_jobs"]: if item.get("remote") == remote: if label: item["label"] = label return item item = { "remote": remote, "label": label, "status": "queued", "files": 0, "skipped": 0, "total": None, } state["remote_jobs"].append(item) return item _write() try: proc = subprocess.Popen( cmd, stdout=subprocess.DEVNULL, # scan mode outputs nothing useful to stdout stderr=subprocess.PIPE, text=True, encoding="utf-8", errors="replace", creationflags=creationflags, ) # Signal spawn success to handle_scan BEFORE doing any more work. # Per-invocation holder so we never mix signals across scans. if spawn_result is not None: spawn_result["spawn_ok"] = True # IMPORTANT: assign _scan_proc UNDER the lock BEFORE signaling spawn_event. # If we signaled first, handle_scan would return ok:true to the extension # while _scan_proc was still None. A cancel-scan arriving in that # window would read _scan_proc as None under _scan_lock, return # "no scan running", and never write the cancel flag — scan would # then continue uninterruptable until completion. Reordering closes # the cancel-race window introduced by the M-3 spawn-handoff fix. with _scan_lock: _scan_proc = proc if spawn_event is not None: spawn_event.set() for raw in proc.stderr: line = raw.rstrip("\n") if line.startswith("SCAN_START "): try: d = json.loads(line[11:]) state["remotes"] = d.get("remotes", []) state["total_remotes"] = d.get("total", 0) for remote in state["remotes"]: _job(remote) _write() except (json.JSONDecodeError, KeyError): pass elif line.startswith("SCAN_REMOTE_START "): # Emitted immediately before the file-count probe starts. try: d = json.loads(line[18:]) state["current_remote"] = d.get("remote") state["current_label"] = d.get("label") state["current_index"] = d.get("index", 0) state["files_this_remote"] = 0 state["files_remote_total"] = None # unknown until SCAN_REMOTE_COUNTED job = _job(d.get("remote"), d.get("label")) job.update({ "status": "running", "index": d.get("index", 0), "of": d.get("of", state.get("total_remotes", 0)), "files": 0, "skipped": 0, "total": None, "incremental": bool(d.get("incremental")), "started_at": datetime.now().isoformat(), }) _write() except (json.JSONDecodeError, KeyError): pass elif line.startswith("SCAN_REMOTE_COUNTED "): # Emitted after remote_file_count() returns — total is now known. try: d = json.loads(line[20:]) state["files_remote_total"] = d.get("total") _job(d.get("remote"))["total"] = d.get("total") _write() except (json.JSONDecodeError, KeyError): pass elif line.startswith("SCAN_FILE_PROGRESS "): # Emitted every CANCEL_CHECK_INTERVAL files from inside walk_remote. try: d = json.loads(line[19:]) state["files_this_remote"] = d.get("files", 0) if d.get("total") is not None: state["files_remote_total"] = d["total"] job = _job(d.get("remote"), d.get("label")) job["files"] = d.get("files", 0) job["skipped"] = d.get("skipped", 0) if d.get("total") is not None: job["total"] = d["total"] _write() except (json.JSONDecodeError, KeyError): pass elif line.startswith("SCAN_PROGRESS "): # Emitted after each remote completes — cumulative total. try: d = json.loads(line[14:]) state["current_remote"] = d.get("remote") state["current_label"] = d.get("label") state["files_this_remote"] = d.get("files", 0) state["files_total"] = d.get("files_total", 0) job = _job(d.get("remote"), d.get("label")) job.update({ "status": "completed", "files": d.get("files", 0), "file_count": d.get("file_count"), "incremental": bool(d.get("incremental", job.get("incremental"))), "finished_at": datetime.now().isoformat(), }) _write() except (json.JSONDecodeError, KeyError): pass elif line == "SCAN_CANCELLED": state["cancelled"] = True _write() elif line.startswith("Scan complete:"): m = re.search(r"(\d+)\s*files\s*in\s*([\d.]+)s", line) if m: state["file_count"] = int(m.group(1)) state["elapsed_s"] = float(m.group(2)) _write() proc.wait() state["done"] = True state["scanning"] = False state["rc"] = proc.returncode state["finished_at"] = datetime.now().isoformat() if state.get("cancelled") or _scan_cancel_requested: state["cancelled"] = True state["scan_ok"] = True # clean cancel — not an error else: state["scan_ok"] = proc.returncode == 0 if proc.returncode != 0: state["error"] = f"rc-jav exited {proc.returncode}" for job in state["remote_jobs"]: if job.get("status") == "running": job["status"] = "cancelled" if state.get("cancelled") else "failed" if not state.get("scan_ok") else "completed" job["finished_at"] = state["finished_at"] elif job.get("status") == "queued" and (state.get("cancelled") or not state.get("scan_ok")): job["status"] = "cancelled" if state.get("cancelled") else "failed" job["finished_at"] = state["finished_at"] _write() log_event("scan_complete", ok=state["scan_ok"], rc=proc.returncode, cancelled=state.get("cancelled", False), file_count=state.get("file_count"), elapsed_s=state.get("elapsed_s")) except Exception as e: # Signal spawn failure synchronously to handle_scan caller (so it can # return ok:false with the real error instead of misleading ok:true + # extension polling for the failure 1-2s later). if spawn_result is not None and "spawn_ok" not in spawn_result: spawn_result["spawn_ok"] = False spawn_result["error"] = f"{type(e).__name__}: {e}"[:200] if spawn_event is not None and not spawn_event.is_set(): spawn_event.set() state.update({ "done": True, "scanning": False, "scan_ok": False, "error": str(e), "finished_at": datetime.now().isoformat(), }) for job in state.get("remote_jobs", []): if job.get("status") in ("queued", "running"): job["status"] = "failed" job["finished_at"] = state["finished_at"] _write() log(f"_scan_worker error: {e}") finally: with _scan_lock: _scan_thread = None _scan_proc = None _scan_cancel_requested = False def handle_scan(payload: dict) -> dict: """Start rc-jav.py --scan in a background thread and return immediately. The caller should poll scan_progress until done=true. scan_since: optional rclone duration string (e.g. '24h').""" global _scan_thread, _scan_script_path, _scan_cancel_requested with _scan_lock: if _scan_thread is not None and _scan_thread.is_alive(): return {"ok": True, "scanning": True, "already_running": True} _scan_cancel_requested = False script = resolve_rcjav(payload.get("rcjav_path", "")) # Clear any stale cancel flag left over from a previous scan that finished # before walk_remote's periodic flag check ran (e.g. a remote with 0 files). try: (script.parent / "scan-cancel.flag").unlink(missing_ok=True) except OSError: pass scan_since = (payload.get("scan_since") or "").strip() cmd = [sys.executable, str(script), "--scan", "--basic"] cmd += part_pattern_args(payload) scan_roots = [ str(root).strip() for root in (payload.get("scan_roots") or []) if isinstance(root, str) and str(root).strip() ] for root in scan_roots: cmd += ["--target", root] if scan_since: cmd += ["--scan-since", scan_since] creationflags = 0x08000000 if os.name == "nt" else 0 _scan_script_path = script # Per-invocation handoff so handle_scan can wait briefly for Popen's # success/failure before returning its RPC response. Keeps the spawn # decision local to THIS scan call — never reads global state or shared # files for the truth (avoids cross-invocation contamination and file-I/O # races). 500 ms is the upper bound; typical Popen on Windows completes # in <50 ms, so most scans still feel instant. On timeout, return # `started: true` with `startup_pending: true` so existing UIs that # ignore the new key keep working. spawn_event = threading.Event() spawn_result: dict = {} t = threading.Thread( target=_scan_worker, args=(cmd, creationflags, scan_since, scan_roots, spawn_event, spawn_result), daemon=True, name="scan-worker", ) with _scan_lock: _scan_thread = t t.start() spawn_event.wait(timeout=0.5) if spawn_result.get("spawn_ok") is True: return {"ok": True, "scanning": True, "started": True, "scan_roots": scan_roots} if spawn_result.get("spawn_ok") is False: return {"ok": False, "scanning": False, "started": False, "error": spawn_result.get("error", "spawn failed"), "scan_roots": scan_roots} # Timeout — Popen hasn't returned either way yet. Defer to scan-progress # polling for the eventual outcome; flag the response so diagnostics can # tell this apart from a clean fast start. return {"ok": True, "scanning": True, "started": True, "startup_pending": True, "scan_roots": scan_roots} def _deferred_kill(proc: subprocess.Popen, delay_s: float = 5.0) -> None: """Wait up to delay_s for proc to exit via the cancel flag, then terminate. Runs in a daemon thread so it never blocks the message loop.""" time.sleep(delay_s) try: if proc.poll() is None: proc.terminate() except Exception: pass def handle_scan_cancel(payload: dict) -> dict: """Write a cancel flag file that rc-jav.py's walk_remote checks every CANCEL_CHECK_INTERVAL files. rc-jav exits cleanly when it sees the flag. A deferred kill fires after 5 s if rc-jav has not exited on its own — this avoids terminating it mid-write (which would corrupt cache.json or prevent SCAN_CANCELLED from reaching the scan-state parser).""" global _scan_cancel_requested with _scan_lock: running = _scan_thread is not None and _scan_thread.is_alive() proc = _scan_proc if not running: return {"ok": True, "cancelled": False, "message": "no scan running"} _scan_cancel_requested = True # Resolve the rc-jav directory so we write the flag next to rc-jav.py # (where rc-jav.py looks for it via Path(__file__).parent / "scan-cancel.flag"). script = _scan_script_path or resolve_rcjav(payload.get("rcjav_path", "")) cancel_flag = script.parent / "scan-cancel.flag" try: cancel_flag.write_text("cancel", encoding="utf-8") if proc is not None and proc.poll() is None: t = threading.Thread( target=_deferred_kill, args=(proc, 5.0), daemon=True, name="cancel-kill", ) t.start() return {"ok": True, "cancelled": True} except Exception as e: return {"ok": False, "error": str(e)} def _add_scan_estimates(state: dict) -> None: """Add conservative percent/ETA fields based on currently-known totals.""" jobs = state.get("remote_jobs") if isinstance(state.get("remote_jobs"), list) else [] known_total = 0.0 processed = 0.0 known_count = 0 for job in jobs: if not isinstance(job, dict): continue total = job.get("total") total_num = float(total) if isinstance(total, (int, float)) and total >= 0 else None files = job.get("files") skipped = job.get("skipped") files_num = int(files) if isinstance(files, (int, float)) and files >= 0 else 0 skipped_num = int(skipped) if isinstance(skipped, (int, float)) and skipped >= 0 else 0 processed_num = files_num + skipped_num if total_num is not None: known_total += total_num known_count += 1 processed += min(processed_num, total_num) else: processed += processed_num if known_total <= 0: return percent = min(100.0, max(0.0, (processed / known_total) * 100.0)) state["scan_files_done"] = int(processed) state["scan_files_total_known"] = int(known_total) state["scan_total_known_complete"] = bool(jobs) and known_count == len(jobs) state["scan_percent"] = round(percent, 1) if not state.get("scanning"): return try: started = datetime.fromisoformat(state.get("started_at") or "") elapsed_s = max(0.0, (datetime.now() - started).total_seconds()) except (TypeError, ValueError): elapsed_s = 0.0 if processed > 0 and elapsed_s > 0: rate = processed / elapsed_s remaining = max(0.0, known_total - processed) state["scan_rate_files_per_s"] = round(rate, 2) state["scan_eta_s"] = round(remaining / rate, 1) if rate > 0 else None def handle_scan_progress(payload: dict) -> dict: """Return current scan progress from SCAN_STATE_FILE.""" with _scan_lock: running = _scan_thread is not None and _scan_thread.is_alive() if not SCAN_STATE_FILE.exists(): return {"ok": True, "scanning": running, "done": not running, "no_state": True} try: state = json.loads(SCAN_STATE_FILE.read_text(encoding="utf-8")) # Thread may have just finished — ensure state is consistent. if not running and state.get("scanning"): state["scanning"] = False state["done"] = True _add_scan_estimates(state) state.pop("ok", None) return {"ok": True, **state} except json.JSONDecodeError: # scan-state.json is a live polling file. If a reader catches it during # a replace edge or external scanner hiccup, treat it as transient # state-pending instead of surfacing a host_error alert. return {"ok": True, "scanning": running, "done": not running, "state_pending": True} except Exception as e: return {"ok": False, "error": str(e)} def handle_scan_clear(payload: dict) -> dict: """Clear persisted scan job UI history without touching cache.json.""" with _scan_lock: running = _scan_thread is not None and _scan_thread.is_alive() if running: return {"ok": False, "error": "scan is still running; cancel it before clearing job history"} try: if SCAN_STATE_FILE.exists(): SCAN_STATE_FILE.unlink() return {"ok": True, "cleared": True} except Exception as e: return {"ok": False, "error": str(e)} def handle_delete_skipped(payload: dict) -> dict: """Delete one or more skipped (non-JAV) files by full_path and patch cache.""" paths = payload.get("paths") or [] if not isinstance(paths, list) or not paths: return {"ok": False, "error": "paths must be a non-empty array"} base = { "mode": payload.get("mode", "trash"), "trash_dir": payload.get("trash_dir", ""), "allowed_remotes": payload.get("allowed_remotes", []), "rcjav_path": payload.get("rcjav_path", ""), } results = [] deleted_paths = [] for path in paths: r = handle_delete({**base, "path": path}) results.append({"path": path, **r}) if r.get("ok"): deleted_paths.append(path) # Patch skipped list in cache.json too if deleted_paths: try: script = resolve_rcjav(payload.get("rcjav_path")) cache_path = script.parent / "cache.json" if cache_path.exists(): cache = json.loads(cache_path.read_text(encoding="utf-8")) modified = False for remote_key, remote_data in cache.get("remotes", {}).items(): sep = "" if remote_key.endswith("/") else "/" prefix = remote_key + sep before = remote_data.get("skipped", []) or [] after = [ p for p in before if f"{prefix}{p}" not in deleted_paths ] if len(after) != len(before): remote_data["skipped"] = after modified = True if modified: tmp = cache_path.with_suffix(cache_path.suffix + ".tmp") tmp.write_text(json.dumps(cache, indent=2), encoding="utf-8") os.replace(tmp, cache_path) except Exception as e: log(f"handle_delete_skipped cache patch error: {e}") _patch_cache_remove_paths(payload.get("rcjav_path"), deleted_paths) ok_count = sum(1 for r in results if r.get("ok")) return { "ok": ok_count > 0, "results": results, "deleted_count": ok_count, "failed_count": len(results) - ok_count, } def handle_get_keep_ranking(payload: dict) -> dict: """Return the current keep_ranking config from rc-jav's config.json.""" script = resolve_rcjav(payload.get("rcjav_path", "")) cfg_path = script.parent / "config.json" cfg = {} if cfg_path.exists(): try: cfg = json.loads(cfg_path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): pass ranking = cfg.get("keep_ranking") or {} defaults = { "priority_folders": ["ClearJAV"], "size_tolerance_mib": 0, "format_preference": ["mkv", "mp4", "wmv", "avi"], "tiebreak_res_tag": True, "tiebreak_longer_name": True, } merged = {**defaults, **ranking} return {"ok": True, "keep_ranking": merged, "is_default": not bool(cfg.get("keep_ranking"))} def handle_save_keep_ranking(payload: dict) -> dict: """Write keep_ranking config to rc-jav's config.json.""" script = resolve_rcjav(payload.get("rcjav_path", "")) cfg_path = script.parent / "config.json" ranking = payload.get("keep_ranking") if not isinstance(ranking, dict): return {"ok": False, "error": "keep_ranking must be an object"} # Validate fields tolerance = ranking.get("size_tolerance_mib") if tolerance is not None: try: tolerance = float(tolerance) if tolerance < 0: return {"ok": False, "error": "size_tolerance_mib must be >= 0"} except (TypeError, ValueError): return {"ok": False, "error": "size_tolerance_mib must be a number"} fmt = ranking.get("format_preference") if fmt is not None and not isinstance(fmt, list): return {"ok": False, "error": "format_preference must be an array"} priority_folders = ranking.get("priority_folders") if priority_folders is not None and not isinstance(priority_folders, list): return {"ok": False, "error": "priority_folders must be an array"} cfg = {} if cfg_path.exists(): try: cfg = json.loads(cfg_path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): pass cfg["keep_ranking"] = ranking try: tmp = cfg_path.with_suffix(cfg_path.suffix + ".tmp") tmp.write_text(json.dumps(cfg, indent=2), encoding="utf-8") os.replace(tmp, cfg_path) except OSError as e: return {"ok": False, "error": f"failed to write config: {e}"} return {"ok": True, "keep_ranking": ranking} def handle_list_remotes(payload: dict) -> dict: """Return all rclone remotes (from `rclone listremotes`) plus the default_source / default_target paths from rc-jav's config.json so the options page can offer autocomplete when building profiles.""" creationflags = 0x08000000 if os.name == "nt" else 0 try: proc = subprocess.run( [RCLONE_BIN, "listremotes"], capture_output=True, text=True, timeout=15, encoding="utf-8", errors="replace", creationflags=creationflags, ) remotes = [r.strip() for r in (proc.stdout or "").splitlines() if r.strip()] except Exception as e: remotes = [] log(f"list_remotes: rclone listremotes failed: {e}") # Also read config.json so the user sees their configured defaults. script = resolve_rcjav(payload.get("rcjav_path", "")) cfg_path = script.parent / "config.json" default_source: list[str] = [] default_target: list[str] = [] if cfg_path.exists(): try: cfg = json.loads(cfg_path.read_text(encoding="utf-8")) default_source = cfg.get("default_source") or [] default_target = cfg.get("default_target") or [] except Exception: pass return { "ok": True, "remotes": remotes, "default_source": default_source, "default_target": default_target, } DISPATCH = { "search": handle_search, "bulk_search": handle_bulk_search, "check_tabs": handle_check_tabs, "dupe_review": handle_dupe_review, "delete_batch": handle_delete_batch, "library_issues": handle_library_issues, "rename_file": handle_rename_file, "rename_files_batch": handle_rename_files_batch, "ping": handle_ping, "delete": handle_delete, "diagnostics": handle_diagnostics, "host_status": handle_host_status, "host_repair": handle_host_repair, "cache_status": handle_cache_status, "reextract_ids": handle_reextract_ids, "recent_deletes": handle_recent_deletes, "undo_delete": handle_undo_delete, "scan": handle_scan, "scan_progress": handle_scan_progress, "scan_cancel": handle_scan_cancel, "scan_clear": handle_scan_clear, "list_remotes": handle_list_remotes, "delete_skipped": handle_delete_skipped, "get_keep_ranking": handle_get_keep_ranking, "save_keep_ranking": handle_save_keep_ranking, "clear_events_log": lambda payload: _handle_clear_events_log(), "save_alerts_config": handle_save_alerts_config, "test_alerts_config": handle_test_alerts_config, "get_alerts_config": handle_get_alerts_config, } def _handle_clear_events_log() -> dict: """Truncate the structured events log. Called from the extension's Diagnostics pane to reset connection lifecycle and per-RPC trace data without restarting the host process.""" try: EVENTS_LOG.write_text("", encoding="utf-8") except OSError as e: return {"ok": False, "error": str(e)} return {"ok": True, "path": str(EVENTS_LOG)} def main(): pid = os.getpid() started_at = time.monotonic() msg_count = 0 first_action: str | None = None actions_seen: dict[str, int] = {} log(f"--- host start pid={pid} ---") log_event("conn_open", pid=pid) while True: try: msg = read_message() except StdinClosed as sc: uptime_s = round(time.monotonic() - started_at, 2) top = sorted(actions_seen.items(), key=lambda kv: -kv[1])[:5] top_str = ", ".join(f"{k}={v}" for k, v in top) or "none" log(f"stdin closed kind={sc.kind} pid={pid} uptime={uptime_s}s msgs={msg_count} first={first_action!r} top={top_str}") log_event("conn_close", pid=pid, kind=sc.kind, uptime_s=uptime_s, msg_count=msg_count, first_action=first_action, top_actions=dict(top), partial_got=sc.got if sc.kind != "clean_eof" else None, partial_expected=sc.expected if sc.kind != "clean_eof" else None) # Alert only on ABNORMAL closes. clean_eof = browser intentionally # disconnected (SW recycle, extension unloaded) — not interesting. if sc.kind != "clean_eof": post_discord_alert( kind=sc.kind, alert_source=f"conn_close:{sc.kind}", summary=f"stdin closed mid-message (got {sc.got}/{sc.expected} bytes)", detail=( f"pid={pid}\nuptime={uptime_s}s\nmsgs={msg_count}\n" f"first_action={first_action}\ntop_actions={top_str}" ), fields=[ {"name": "PID", "value": str(pid), "inline": True}, {"name": "Uptime", "value": f"{uptime_s}s", "inline": True}, {"name": "Msgs", "value": str(msg_count), "inline": True}, ], ) break except Exception as e: log(f"read error pid={pid} msgs={msg_count}: {e}\n{traceback.format_exc()}") log_event("conn_close", pid=pid, kind="read_error", error=str(e), msg_count=msg_count, first_action=first_action) post_discord_alert( kind="read_error", alert_source="read_message_exception", summary="Host stdin read raised an exception", detail=f"{type(e).__name__}: {e}\nfirst_action={first_action}\nmsgs={msg_count}", fields=[{"name": "PID", "value": str(pid), "inline": True}], ) break if msg is None: # Belt-and-suspenders — old code path. read_message now raises instead. log("stdin closed, exiting") break msg_count += 1 msg_action = msg.get("action", "?") # Caller-supplied correlation key (independent of native req_id; survives # process spawn since sendNativeMessage opens a fresh host each call). client_req_id = msg.get("client_req_id") or None if first_action is None: first_action = msg_action log_event("conn_first_msg", pid=pid, first_action=first_action, extension_id=msg.get("extension_id"), client_req_id=client_req_id) actions_seen[msg_action] = actions_seen.get(msg_action, 0) + 1 req_id = msg.get("req_id") action = msg.get("action", "search") handler = DISPATCH.get(action) if not handler: write_message({"req_id": req_id, "ok": False, "error": f"unknown action {action}"}) continue t0 = time.monotonic() exc_kind: str | None = None try: resp = handler(msg) except Exception as e: log(f"handler error: action={action} req_id={req_id}: {e}\n{traceback.format_exc()}") exc_kind = type(e).__name__ resp = {"ok": False, "error": str(e), "error_kind": "exception", "exception_type": exc_kind} post_discord_alert( kind="exception", alert_source=f"handler_exception:{action}", summary=f"Handler raised {exc_kind}", detail=f"action={action}\nerror={e}\n\n{traceback.format_exc()[-1500:]}", fields=[ {"name": "Action", "value": _md_code_inline(action), "inline": True}, {"name": "Exception", "value": _md_code_inline(exc_kind), "inline": True}, ], ) elapsed_ms = round((time.monotonic() - t0) * 1000) # Stamp req_id + client_req_id BEFORE sizing so resp_bytes reflects the # actual wire size, including correlation fields. Without this, a # response just under the 1 MiB cap could pass the size check here but # exceed it after stamping — and the log would mislead. resp["req_id"] = req_id if client_req_id: resp["client_req_id"] = client_req_id try: raw_size = len(json.dumps(resp).encode("utf-8")) except Exception: raw_size = -1 shrunk_resp = _shrink_response(resp) was_truncated = bool(shrunk_resp.get("truncated") and not resp.get("truncated")) # Structured event log — include search-specific fields for analytics. event_extra: dict = {"ok": resp.get("ok"), "elapsed_ms": elapsed_ms, "resp_bytes": raw_size} if was_truncated: event_extra["truncated"] = True event_extra["truncated_reason"] = shrunk_resp.get("truncated_reason", "") if exc_kind: event_extra["exception"] = exc_kind if client_req_id: event_extra["client_req_id"] = client_req_id if action == "search": event_extra["id"] = msg.get("id") event_extra["hits"] = resp.get("hits") event_extra["search_mode"] = resp.get("search_mode") elif action in ("delete", "undo_delete", "scan"): if not resp.get("ok"): event_extra["error"] = (resp.get("error") or "")[:200] elif action == "check_tabs": event_extra["items"] = len(msg.get("items") or []) if not resp.get("ok"): event_extra["error"] = (resp.get("error") or "")[:200] # Don't log every scan_progress poll — it'd flood the events log. if action not in ("scan_progress",): log_event(action, **event_extra) # Bracket the write so we can prove whether the response actually left # the host. write_start = about to flush; write_ok = flush returned. # If a sample shows write_start but no write_ok → write itself failed. # If write_ok exists but caller never got it → Chromium NM plumbing lost it. log_event("write_start", rpc_action=action, req_id=req_id, client_req_id=client_req_id, size=raw_size) try: write_message(shrunk_resp) # Wire size = what actually went out (post-shrink, post-stamp). If # shrink fired, this differs from raw_size; both useful for diagnosing # cap-related disconnects. try: wire_size = len(json.dumps(shrunk_resp).encode("utf-8")) except Exception: wire_size = -1 log_event("write_ok", rpc_action=action, req_id=req_id, client_req_id=client_req_id, raw_size=raw_size, wire_size=wire_size, shrunk=(wire_size != raw_size)) except Exception as e: log(f"write error: action={action} req_id={req_id} size={raw_size}: {e}\n{traceback.format_exc()}") log_event("write_error", rpc_action=action, req_id=req_id, client_req_id=client_req_id, size=raw_size, error=str(e)) post_discord_alert( kind="write_error", alert_source=f"write_message_exception:{action}", summary="Failed to write response to extension", detail=f"action={action}\nsize={raw_size}\nerror={e}", fields=[ {"name": "Action", "value": _md_code_inline(action), "inline": True}, {"name": "Bytes", "value": str(raw_size), "inline": True}, ], ) break if __name__ == "__main__": main()