"""Library-issue detection (non-canonical filenames) + safe renaming. Scans the cache (not the live remote) for files whose names violate the canonical `{ID}[ - actress][ [resolution]].ext` shape: - Bracket-wrapped IDs: `[REAL-779].mp4` -> `REAL-779.mp4` - No-hyphen IDs: `MVSD312 [576p].avi` -> `MVSD-312 [576p].avi` `rename_file_in_remote` performs the rclone moveto and patches the cache in place. `rename_files_batch` writes the cache once after a batch of renames. """ from __future__ import annotations import subprocess from pathlib import Path from rcjav.cache import save_cache from rcjav.ids import ( _BRACKET_ID_RE, _NOHYPHEN_ID_RE, COMPOUND_ID_RE, FALLBACK_ID_RE, PRIMARY_ID_RE, extract_id, ) def _human_size(n: int) -> str: nf = float(max(0, n)) for unit in ("B", "KiB", "MiB", "GiB", "TiB"): if nf < 1024: return f"{int(nf)} B" if unit == "B" else f"{nf:.2f} {unit}" nf /= 1024 return f"{nf:.2f} PiB" def _bracket_to_canonical(filename: str) -> str: """[REAL-779].mp4 -> REAL-779.mp4 | [HODV-21076] Saki [1080p].mkv -> HODV-21076 Saki [1080p].mkv""" stem = Path(filename).stem suffix = Path(filename).suffix bm = _BRACKET_ID_RE.match(stem) if not bm: return filename inner = bm.group(1).strip() rest = stem[bm.end():].strip() new_stem = f"{inner} {rest}".strip() if rest else inner return f"{new_stem}{suffix}" def _nohyphen_to_canonical(filename: str) -> str: """MVSD312 [576p].avi -> MVSD-312 [576p].avi""" stem = Path(filename).stem suffix = Path(filename).suffix m = _NOHYPHEN_ID_RE.match(stem) if not m: return filename prefix = m.group(1).upper() num_str = m.group(2) rest = stem[m.end():] return f"{prefix}-{num_str}{rest}{suffix}" def find_library_issues(cache: dict) -> dict: """Scan cache for files with non-canonical names. Returns: {"bracket_names": [...], "nohyphen_names": [...]} Each entry: {remote, path, size, mod_time, jav_id, canonical_name, issue} """ bracket: list[dict] = [] nohyphen: list[dict] = [] for remote, remote_data in cache.get("remotes", {}).items(): for f in remote_data.get("files", []): fname = Path(f["path"]).name stem = Path(fname).stem if stem.startswith("[") and _BRACKET_ID_RE.match(stem): bracket.append({ "remote": remote, "path": f["path"], "size": f.get("size", 0), "size_human": _human_size(f.get("size", 0)), "mod_time": f.get("mod_time", ""), "jav_id": f.get("jav_id", ""), "canonical_name": _bracket_to_canonical(fname), "issue": "bracket_id", }) elif (not PRIMARY_ID_RE.match(stem) and not COMPOUND_ID_RE.match(stem) and not FALLBACK_ID_RE.match(stem) and _NOHYPHEN_ID_RE.match(stem)): nohyphen.append({ "remote": remote, "path": f["path"], "size": f.get("size", 0), "size_human": _human_size(f.get("size", 0)), "mod_time": f.get("mod_time", ""), "jav_id": f.get("jav_id", ""), "canonical_name": _nohyphen_to_canonical(fname), "issue": "nohyphen_id", }) return {"bracket_names": bracket, "nohyphen_names": nohyphen} def rename_file_in_remote( remote: str, old_rel_path: str, new_rel_path: str, cache: dict, rclone_bin: str = "rclone", save: bool = True, ) -> dict: """Rename one file via rclone moveto and patch cache.json. Returns {"ok": True, "old_path": ..., "new_path": ...} or {"ok": False, "error": ..., "conflict": bool} Pass save=False when batching — caller is responsible for calling save_cache() once. """ sep = "" if remote.endswith("/") else "/" old_full = f"{remote}{sep}{old_rel_path}" new_full = f"{remote}{sep}{new_rel_path}" check = subprocess.run( [rclone_bin, "lsf", new_full], capture_output=True, text=True, ) if check.returncode == 0 and check.stdout.strip(): return {"ok": False, "error": f"Target already exists: {new_full}", "conflict": True} result = subprocess.run( [rclone_bin, "moveto", old_full, new_full], capture_output=True, text=True, ) if result.returncode != 0: return {"ok": False, "error": (result.stderr or result.stdout).strip(), "conflict": False} remote_data = cache.get("remotes", {}).get(remote) if remote_data: for f in remote_data.get("files", []): if f["path"] == old_rel_path: f["path"] = new_rel_path f["jav_id"] = extract_id(Path(new_rel_path).name) or f["jav_id"] break remote_data["skipped"] = [s for s in remote_data.get("skipped", []) if s != old_rel_path] if save: save_cache(cache) return {"ok": True, "old_path": old_full, "new_path": new_full} def rename_files_batch( renames: list[dict], cache: dict, rclone_bin: str = "rclone", ) -> list[dict]: """Rename multiple files, writing cache once at the end. Each item in renames: {remote, old_path, new_path} Returns list of per-file results with old_path/new_path echoed back. """ results = [] cache_dirty = False for r in renames: res = rename_file_in_remote( r["remote"], r["old_path"], r["new_path"], cache, rclone_bin=rclone_bin, save=False, ) res["old_path"] = r["old_path"] res["new_path"] = r["new_path"] results.append(res) if res["ok"]: cache_dirty = True if cache_dirty: save_cache(cache) return results