"""Library-issue detection (non-canonical filenames) + safe renaming.

Scans the cache (not the live remote) for files whose names violate the
canonical `{ID}[ - actress][ [resolution]].ext` shape:

  - Bracket-wrapped IDs: `[REAL-779].mp4` -> `REAL-779.mp4`
  - No-hyphen IDs: `MVSD312 [576p].avi` -> `MVSD-312 [576p].avi`

`rename_file_in_remote` performs the rclone moveto and patches the cache in place.
`rename_files_batch` writes the cache once after a batch of renames.
"""

from __future__ import annotations

import re
import subprocess
from collections import Counter
from pathlib import Path

from rcjav.cache import save_cache
from rcjav.ids import (
    _BRACKET_ID_RE,
    _NOHYPHEN_ID_RE,
    COMPOUND_ID_RE,
    FALLBACK_ID_RE,
    PRIMARY_ID_RE,
    extract_id,
)
from rcjav.output import human_size as _human_size

# Extensions (lower-case, with leading dot) treated as video files by the
# resolution scanners below.
VIDEO_EXTS = {".avi", ".m4v", ".mkv", ".mov", ".mp4", ".mpeg", ".mpg", ".ts", ".webm", ".wmv"}

# All patterns below are applied to the filename *stem* (extension removed) and
# are anchored at the end of it. The group names are read back by
# `classify_filename_hygiene` via `m.group("<name>")`, so they must not change.

# Stem ends with a bracketed resolution: `... [1080p]`.
CANONICAL_RESOLUTION_RE = re.compile(r"\[(?P<resolution>\d{3,4}[pi]|4k|8k)\]$", re.IGNORECASE)
# Bracketed resolution followed by a copy counter: `... [1080p] (1)`.
RESOLUTION_COPY_SUFFIX_RE = re.compile(
    r"\[(?P<resolution>\d{3,4}[pi]|4k|8k)\]\s*\((?P<copy>\d+)\)$",
    re.IGNORECASE,
)
# Bracketed resolution followed by a part marker: `... [1080p] part2`.
RESOLUTION_PART_SUFFIX_RE = re.compile(
    r"\[(?P<resolution>\d{3,4}[pi]|4k|8k)\][._ -]*(?P<part>\d+of\d+|part\d+|pt\d+)[.\s]*$",
    re.IGNORECASE,
)
# Resolution present but not bracketed: `... 1080p`.
BARE_RESOLUTION_SUFFIX_RE = re.compile(
    r"(?:^|[._ -])(?P<resolution>\d{3,4}[pi]|4k|8k)$",
    re.IGNORECASE,
)
# Empty placeholder brackets: `... []`.
EMPTY_BRACKETS_RE = re.compile(r"\[\s*\]$")
# Any trailing bracketed token: `... [something]`.
BRACKET_TOKEN_SUFFIX_RE = re.compile(r"\[(?P<token>[^\]]+)\]$")
# Quality word that is not an actual resolution: `... FHD`.
HD_QUALITY_SUFFIX_RE = re.compile(
    r"(?:^|[._ -])(?P<quality>hd|fhd|uhd|sd|fullhd)$",
    re.IGNORECASE,
)
# Trailing multipart marker: `... part2`, `... cd1`, `... A`.
MULTIPART_SUFFIX_RE = re.compile(
    r"(?:[._ -])(?P<part>\d+of\d+|part\d+|pt\d+|cd\d+|disc\d+|[ab])$",
    re.IGNORECASE,
)


def _issue(kind: str, *, source: str = "builtin", severity: str = "info", **extra) -> dict:
    """Build one issue record: `kind`, `source`, `severity` plus any extra fields."""
    return {"kind": kind, "source": source, "severity": severity, **extra}


def _compile_custom_filename_rules(config: dict | None) -> list[dict]:
    """Compile the user rules found at `config["filename_hygiene"]["custom_rules"]`.

    Each rule is a dict; the pattern is read from `pattern` (or `match`), the
    issue kind from `kind` (or `name`, or a generated `custom_rule_<n>`).
    Rules are skipped silently when they are not dicts, have `enabled` set to
    False, have no pattern, or have a pattern that cannot be compiled.

    Returns a list of dicts with keys `name`, `kind`, `severity`, `target`
    and `regex` (a compiled pattern).
    """
    rules = ((config or {}).get("filename_hygiene") or {}).get("custom_rules") or []
    compiled = []
    for i, rule in enumerate(rules):
        if not isinstance(rule, dict) or rule.get("enabled", True) is False:
            continue
        pattern = rule.get("pattern") or rule.get("match")
        kind = rule.get("kind") or rule.get("name") or f"custom_rule_{i + 1}"
        if not pattern:
            continue
        flags = re.IGNORECASE if rule.get("ignore_case", True) else 0
        try:
            regex = re.compile(pattern, flags)
        except (re.error, TypeError):
            # Invalid regex syntax, or a non-string pattern in the config:
            # a single bad rule must not abort the whole scan.
            continue
        compiled.append({
            "name": rule.get("name") or kind,
            "kind": kind,
            "severity": rule.get("severity") or "info",
            "target": rule.get("target") or "filename",
            "regex": regex,
        })
    return compiled


def classify_filename_hygiene(filename: str, config: dict | None = None) -> dict:
    """Classify filename hygiene without proposing destructive changes.

    The stem of `filename` is matched against the resolution patterns in
    priority order (canonical, copy suffix, part suffix, bare). When none
    matches, a `missing_resolution` issue is recorded and the stem is further
    checked for placeholder brackets, quality words, other bracket tokens and
    multipart markers. Custom rules from `config` are evaluated last.

    Returns a dict with `has_resolution` (bool), `resolution_style`
    ("canonical", "noncanonical" or "missing") and `issues` (list of dicts
    built by `_issue`).
    """
    stem = Path(filename).stem
    issues: list[dict] = []
    has_resolution = False
    resolution_style = "missing"

    if m := CANONICAL_RESOLUTION_RE.search(stem):
        has_resolution = True
        resolution_style = "canonical"
        issues.append(_issue("resolution_canonical", resolution=m.group("resolution").lower()))
    elif m := RESOLUTION_COPY_SUFFIX_RE.search(stem):
        has_resolution = True
        resolution_style = "noncanonical"
        issues.append(_issue(
            "resolution_copy_suffix",
            severity="cleanup",
            resolution=m.group("resolution").lower(),
            copy=m.group("copy"),
        ))
    elif m := RESOLUTION_PART_SUFFIX_RE.search(stem):
        has_resolution = True
        resolution_style = "noncanonical"
        issues.append(_issue(
            "resolution_part_suffix",
            severity="cleanup",
            resolution=m.group("resolution").lower(),
            part=m.group("part"),
        ))
    elif m := BARE_RESOLUTION_SUFFIX_RE.search(stem):
        has_resolution = True
        resolution_style = "noncanonical"
        issues.append(_issue(
            "resolution_bare_suffix",
            severity="cleanup",
            resolution=m.group("resolution").lower(),
        ))

    if not has_resolution:
        issues.append(_issue("missing_resolution", severity="needs_probe"))
        # At most one of the trailing-token diagnostics is reported.
        if EMPTY_BRACKETS_RE.search(stem):
            issues.append(_issue("resolution_placeholder_empty", severity="needs_probe", token="[]"))
        elif m := HD_QUALITY_SUFFIX_RE.search(stem):
            issues.append(_issue(
                "quality_marker_not_resolution",
                severity="needs_probe",
                token=m.group("quality"),
            ))
        elif m := BRACKET_TOKEN_SUFFIX_RE.search(stem):
            issues.append(_issue(
                "suspicious_bracket_token",
                severity="needs_probe",
                token=m.group("token"),
            ))
        if m := MULTIPART_SUFFIX_RE.search(stem):
            issues.append(_issue(
                "multipart_without_resolution",
                severity="needs_probe",
                part=m.group("part"),
            ))

    for rule in _compile_custom_filename_rules(config):
        # Only the filename is available here, so every target other than
        # "stem" (including "path") is matched against the full filename.
        value = stem if rule["target"] == "stem" else filename
        match = rule["regex"].search(value)
        if match:
            issues.append(_issue(
                rule["kind"],
                source="custom",
                severity=rule["severity"],
                name=rule["name"],
                matched=match.group(0),
            ))

    return {
        "has_resolution": has_resolution,
        "resolution_style": resolution_style,
        "issues": issues,
    }


def _bracket_to_canonical(filename: str) -> str:
    """[REAL-779].mp4 -> REAL-779.mp4 | [HODV-21076] Saki [1080p].mkv -> HODV-21076 Saki [1080p].mkv

    Returns `filename` unchanged when the stem does not match `_BRACKET_ID_RE`.
    """
    stem = Path(filename).stem
    suffix = Path(filename).suffix
    bm = _BRACKET_ID_RE.match(stem)
    if not bm:
        return filename
    inner = bm.group(1).strip()
    rest = stem[bm.end():].strip()
    new_stem = f"{inner} {rest}".strip() if rest else inner
    return f"{new_stem}{suffix}"


def _nohyphen_to_canonical(filename: str) -> str:
    """MVSD312 [576p].avi -> MVSD-312 [576p].avi

    Returns `filename` unchanged when the stem does not match `_NOHYPHEN_ID_RE`.
    """
    stem = Path(filename).stem
    suffix = Path(filename).suffix
    m = _NOHYPHEN_ID_RE.match(stem)
    if not m:
        return filename
    prefix = m.group(1).upper()
    num_str = m.group(2)
    rest = stem[m.end():]
    return f"{prefix}-{num_str}{rest}{suffix}"


def _cache_entry(remote: str, f: dict, issue: str, **extra) -> dict:
    """Build a report entry for cached file record `f` on `remote`.

    Missing keys in `f` fall back to empty/zero values. `extra` is merged
    last, so it can add to or override the standard fields.
    """
    path = f.get("path", "")
    filename = Path(path).name
    ext = Path(filename).suffix.lower()
    # No separator when the remote already ends with "/" or the path is empty.
    sep = "" if remote.endswith("/") or not path else "/"
    return {
        "remote": remote,
        "path": path,
        "full_path": f"{remote}{sep}{path}",
        "filename": filename,
        "extension": ext,
        "size": f.get("size", 0),
        "size_human": _human_size(f.get("size", 0)),
        "mod_time": f.get("mod_time", ""),
        "jav_id": f.get("jav_id", ""),
        "issue": issue,
        **extra,
    }


def find_missing_resolution(cache: dict, config: dict | None = None) -> dict:
    """Return cached video files missing a final bracketed [resolution] tag.

    "Missing" means `classify_filename_hygiene` reports `has_resolution` as
    False. The result carries the matching `items` plus `count` and counters
    keyed by extension and by remote.
    """
    items: list[dict] = []
    by_extension: Counter[str] = Counter()
    by_remote: Counter[str] = Counter()
    for remote, remote_data in cache.get("remotes", {}).items():
        for f in remote_data.get("files", []):
            fname = Path(f.get("path", "")).name
            ext = Path(fname).suffix.lower()
            if ext not in VIDEO_EXTS:
                continue
            classification = classify_filename_hygiene(fname, config)
            if classification["has_resolution"]:
                continue
            entry = _cache_entry(remote, f, "missing_resolution", **classification)
            items.append(entry)
            by_extension[ext] += 1
            by_remote[remote] += 1
    return {
        "issue": "missing_resolution",
        "source": "cache",
        "count": len(items),
        "by_extension": dict(sorted(by_extension.items())),
        "by_remote": dict(sorted(by_remote.items())),
        "items": items,
    }


def find_resolution_noncanonical(cache: dict, config: dict | None = None) -> dict:
    """Return cached video files with resolution present but not in final [resolution] form.

    The result carries the matching `items` plus `count` and counters keyed
    by issue kind (every issue on a matching file is counted) and by extension.
    """
    items: list[dict] = []
    by_kind: Counter[str] = Counter()
    by_extension: Counter[str] = Counter()
    for remote, remote_data in cache.get("remotes", {}).items():
        for f in remote_data.get("files", []):
            fname = Path(f.get("path", "")).name
            ext = Path(fname).suffix.lower()
            if ext not in VIDEO_EXTS:
                continue
            classification = classify_filename_hygiene(fname, config)
            if classification["resolution_style"] != "noncanonical":
                continue
            entry = _cache_entry(remote, f, "resolution_noncanonical", **classification)
            items.append(entry)
            by_extension[ext] += 1
            for issue in classification["issues"]:
                by_kind[issue["kind"]] += 1
    return {
        "issue": "resolution_noncanonical",
        "source": "cache",
        "count": len(items),
        "by_kind": dict(sorted(by_kind.items())),
        "by_extension": dict(sorted(by_extension.items())),
        "items": items,
    }


def find_library_issues(cache: dict, config: dict | None = None) -> dict:
    """Scan cache for files with non-canonical names.

    Returns:
        {"bracket_names": [...], "nohyphen_names": [...],
         "missing_resolution": [...], "missing_resolution_summary": {...},
         "resolution_noncanonical": [...], "resolution_noncanonical_summary": {...}}
        Each bracket/nohyphen entry is a `_cache_entry` dict (remote, path,
        size, mod_time, jav_id, issue, ...) with an added `canonical_name`.
    """
    bracket: list[dict] = []
    nohyphen: list[dict] = []

    for remote, remote_data in cache.get("remotes", {}).items():
        for f in remote_data.get("files", []):
            # `.get` keeps this scan consistent with the resolution scanners:
            # an entry without a path yields an empty stem and is skipped.
            fname = Path(f.get("path", "")).name
            stem = Path(fname).stem

            if stem.startswith("[") and _BRACKET_ID_RE.match(stem):
                bracket.append(_cache_entry(
                    remote,
                    f,
                    "bracket_id",
                    canonical_name=_bracket_to_canonical(fname),
                ))
            elif (
                not PRIMARY_ID_RE.match(stem)
                and not COMPOUND_ID_RE.match(stem)
                and not FALLBACK_ID_RE.match(stem)
                and _NOHYPHEN_ID_RE.match(stem)
            ):
                # Only flagged when none of the regular ID patterns match.
                nohyphen.append(_cache_entry(
                    remote,
                    f,
                    "nohyphen_id",
                    canonical_name=_nohyphen_to_canonical(fname),
                ))

    missing_resolution = find_missing_resolution(cache, config)
    resolution_noncanonical = find_resolution_noncanonical(cache, config)
    return {
        "bracket_names": bracket,
        "nohyphen_names": nohyphen,
        "missing_resolution": missing_resolution["items"],
        "missing_resolution_summary": {
            "count": missing_resolution["count"],
            "by_extension": missing_resolution["by_extension"],
            "by_remote": missing_resolution["by_remote"],
        },
        "resolution_noncanonical": resolution_noncanonical["items"],
        "resolution_noncanonical_summary": {
            "count": resolution_noncanonical["count"],
            "by_kind": resolution_noncanonical["by_kind"],
            "by_extension": resolution_noncanonical["by_extension"],
        },
    }


def rename_file_in_remote(
    remote: str,
    old_rel_path: str,
    new_rel_path: str,
    cache: dict,
    rclone_bin: str = "rclone",
    save: bool = True,
) -> dict:
    """Rename one file via rclone moveto and patch cache.json.

    Returns {"ok": True, "old_path": ..., "new_path": ...}
         or {"ok": False, "error": ..., "conflict": bool}

    `conflict` is True when `rclone lsf` reports that the target already
    exists; in that case nothing is moved. On success the matching cache
    record gets the new path and a re-extracted `jav_id` (the previous value
    is kept when extraction yields nothing), and the old path is removed from
    the remote's `skipped` list.

    Pass save=False when batching — caller is responsible for calling save_cache() once.
    """
    sep = "" if remote.endswith("/") else "/"
    old_full = f"{remote}{sep}{old_rel_path}"
    new_full = f"{remote}{sep}{new_rel_path}"

    # Refuse to overwrite: a successful, non-empty listing means the target exists.
    check = subprocess.run(
        [rclone_bin, "lsf", new_full],
        capture_output=True,
        text=True,
    )
    if check.returncode == 0 and check.stdout.strip():
        return {"ok": False, "error": f"Target already exists: {new_full}", "conflict": True}

    result = subprocess.run(
        [rclone_bin, "moveto", old_full, new_full],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        return {"ok": False, "error": (result.stderr or result.stdout).strip(), "conflict": False}

    remote_data = cache.get("remotes", {}).get(remote)
    if remote_data:
        for f in remote_data.get("files", []):
            # `.get` throughout: the remote move has already happened, so a
            # partial cache record must not raise and leave the cache unsaved.
            if f.get("path") == old_rel_path:
                f["path"] = new_rel_path
                f["jav_id"] = extract_id(Path(new_rel_path).name) or f.get("jav_id", "")
                break
        remote_data["skipped"] = [s for s in remote_data.get("skipped", []) if s != old_rel_path]

    if save:
        save_cache(cache)

    return {"ok": True, "old_path": old_full, "new_path": new_full}


def rename_files_batch(
    renames: list[dict],
    cache: dict,
    rclone_bin: str = "rclone",
) -> list[dict]:
    """Rename multiple files, writing cache once at the end.

    Each item in renames: {remote, old_path, new_path}
    Returns list of per-file results with old_path/new_path echoed back
    (the relative paths from the request, replacing the full paths that
    `rename_file_in_remote` reports). The cache is saved only when at least
    one rename succeeded.
    """
    results = []
    cache_dirty = False
    for r in renames:
        res = rename_file_in_remote(
            r["remote"],
            r["old_path"],
            r["new_path"],
            cache,
            rclone_bin=rclone_bin,
            save=False,
        )
        res["old_path"] = r["old_path"]
        res["new_path"] = r["new_path"]
        results.append(res)
        if res["ok"]:
            cache_dirty = True
    if cache_dirty:
        save_cache(cache)
    return results