"""Duplicate detection, keep-ranking, and variant-alert logic. Operates on FileEntry lists already extracted from rclone listings or catalogs (see rcjav.rclone_io and rcjav.catalog). The "rules" that influence which file gets KEPT live here; the "rules" that influence how an entry's jav_id is derived live in rcjav.ids. `_KEEP_RANKING` is a module-level dict mutated by `set_keep_ranking()` at startup from config.json. Read via the local `_KEEP_RANKING` binding inside `decide_keep_with_reason`. Callers outside this module should go through `set_keep_ranking()` rather than touching the global. """ from __future__ import annotations import re from pathlib import Path from typing import Iterable from rcjav.ids import ( PRIMARY_ID_RE, FALLBACK_ID_RE, COMPOUND_ID_RE, _RES_LABEL_RE, _VARIANT_SUFFIX_RE, _RESOLUTION_TAG_RE, _VIDEO_EXTS, _LOWEST_KEEP_PRIORITY_EXTS, extract_id, ) from rcjav.model import FileEntry DEFAULT_KEEP_RANKING: dict = { "priority_folders": ["ClearJAV"], "size_tolerance_mib": 0, "format_preference": ["mkv", "mp4", "wmv", "avi"], "tiebreak_res_tag": True, "tiebreak_longer_name": True, } # Module-level ranking config; set from config.json via set_keep_ranking() so # all call sites pick it up. _KEEP_RANKING: dict = {} def set_keep_ranking(ranking: dict | None) -> None: """Replace the module's effective keep-ranking config in place.""" global _KEEP_RANKING _KEEP_RANKING = ranking or {} def get_keep_ranking() -> dict: """Return the module's current effective ranking (read-only snapshot).""" return dict(_KEEP_RANKING) def decide_keep_with_reason(entries: list[FileEntry]) -> tuple[FileEntry, dict[str, str]]: """Pick KEEP candidate and explain the first ranking rule that settled it. Catalog entries are excluded — they are offline/informational. Ranking (descending priority, configurable via keep_ranking in config.json): 1. Video files in ordered priority folders outrank other rclone entries. 2. Source entries outrank Target entries when no priority-folder video exists. 3. Non-.ts files outrank .ts files when a duplicate group has both. 4. Largest file size. If sizes are within size_tolerance_mib, treated as equal and format preference is consulted instead. 5. Format preference: ordered list of extensions (e.g. mkv > mp4 > wmv > avi). 6. Tie-break: has resolution tag in filename ([1080p], [2160p], [720p], [480p]). 7. Tie-break: longer filename (more metadata = more descriptive). """ ranking = _KEEP_RANKING or {} tolerance_bytes = int(float(ranking.get("size_tolerance_mib") or 0) * 1024 * 1024) priority_folders: list[str] = [ str(folder).strip() for folder in (ranking.get("priority_folders") or DEFAULT_KEEP_RANKING["priority_folders"]) if str(folder).strip() ] fmt_order: list[str] = list( ranking.get("format_preference") or DEFAULT_KEEP_RANKING["format_preference"] ) use_res_tag: bool = ranking.get("tiebreak_res_tag", True) use_longer_name: bool = ranking.get("tiebreak_longer_name", True) rclone = [e for e in entries if e.source != "Catalog"] def _priority_folder_rank(e: FileEntry) -> int | None: if Path(e.path).suffix.lower() not in _VIDEO_EXTS: return None # A root can be cq:JAV while the favored folder is a child path, or the # supplied root can itself end in that folder. Match across full_path. full_path = e.full_path.replace("\\", "/").strip("/").lower() segments = [segment for segment in full_path.split("/") if segment] for index, raw_folder in enumerate(priority_folders): folder = raw_folder.replace("\\", "/").strip("/").lower() if not folder: continue if "/" in folder or ":" in folder: framed = f"/{full_path}/" if full_path == folder or full_path.startswith(folder + "/") or f"/{folder}/" in framed: return index elif folder in segments: return index return None prioritized = [(rank, e) for e in rclone if (rank := _priority_folder_rank(e)) is not None] best_priority = min((rank for rank, _ in prioritized), default=None) priority_videos = [e for rank, e in prioritized if rank == best_priority] pool_priority = [e for e in rclone if e.source == "Source"] reason = {"code": "fallback", "summary": "First remaining duplicate candidate"} if priority_videos: pool = priority_videos reason = { "code": "vip_folder", "summary": f"VIP folder: {priority_folders[best_priority]}", } elif pool_priority: pool = pool_priority reason = {"code": "source", "summary": "Source copy outranks target copies"} else: pool = rclone if rclone else entries # Transport streams often inflate size without being the better keeper. preferred_containers = [ e for e in pool if Path(e.path).suffix.lower() not in _LOWEST_KEEP_PRIORITY_EXTS ] if preferred_containers and len(preferred_containers) != len(pool): pool = preferred_containers reason = {"code": "container", "summary": "Non-TS video outranks transport stream"} # Step 1: narrow to within size tolerance of the maximum max_size = max(e.size for e in pool) candidates = [e for e in pool if max_size - e.size <= tolerance_bytes] if len(candidates) == 1: if len(pool) > 1 and reason["code"] not in {"vip_folder", "source", "container"}: reason = {"code": "size", "summary": "Largest file after ranking rules"} return candidates[0], reason # Step 2: format preference (lower index in fmt_order = higher priority) def _fmt_rank(e: FileEntry) -> int: ext = Path(e.path).suffix.lower().lstrip(".") try: return fmt_order.index(ext) # lower = better except ValueError: return len(fmt_order) # unknown = lowest best_fmt = min(_fmt_rank(e) for e in candidates) by_fmt = [e for e in candidates if _fmt_rank(e) == best_fmt] if len(by_fmt) != len(candidates): ext = Path(by_fmt[0].path).suffix.lower().lstrip(".").upper() or "preferred format" reason = {"code": "format", "summary": f"Format preference: {ext}"} candidates = by_fmt if len(candidates) == 1: return candidates[0], reason # Step 3: resolution tag tie-break if use_res_tag: tagged = [e for e in candidates if _RES_LABEL_RE.search(Path(e.path).name)] if tagged: if len(tagged) != len(candidates): reason = {"code": "resolution_tag", "summary": "Filename has a resolution tag"} candidates = tagged if len(candidates) == 1: return candidates[0], reason # Step 4: longer filename tie-break if use_longer_name: keep = max(candidates, key=lambda e: len(Path(e.path).name)) return keep, {"code": "filename", "summary": "Longer filename tie-break"} return candidates[0], reason def decide_keep(entries: list[FileEntry]) -> FileEntry: """Pick KEEP candidate for duplicate output.""" return decide_keep_with_reason(entries)[0] def find_dupes(entries: Iterable[FileEntry]) -> dict[str, list[FileEntry]]: """Group entries by jav_id. A group is a dupe only if it has >=2 non-Catalog entries.""" groups: dict[str, list[FileEntry]] = {} for e in entries: # Re-evaluate duplicate keys from the current filename rules. Cached # entries may predate a new part detector such as `.1of2`; treating those # stale base IDs as duplicate files would produce risky delete hints. key = extract_id(Path(e.path).name) or e.jav_id groups.setdefault(key, []).append(e) out: dict[str, list[FileEntry]] = {} for k, v in groups.items(): rclone_count = sum(1 for e in v if e.source != "Catalog") if rclone_count >= 2: out[k] = v return out _SUSPICIOUS_MULTIPART_TAIL_RE = re.compile( r"(?:^|[-_.\s])(?:p|pt|part|cd|disc|ep|episode|vol|volume|scene)[-_.\s]*([a-d]|\d{1,2})(?:$|[-_.\s\[])" r"|(?:^|[-_.\s])([a-d]|\d{1,2})(?:$|\s*\[)", re.IGNORECASE, ) def describe_dupe_risks(jav_id: str, entries: list[FileEntry]) -> list[dict[str, str]]: """Flag duplicate groups that deserve manual review before deletion.""" rclone = [e for e in entries if e.source != "Catalog"] risks: list[dict[str, str]] = [] if "#part" not in jav_id and len(rclone) >= 3: risks.append({ "code": "large_same_id_group", "summary": f"{len(rclone)} files share this base ID; review for unrecognized parts before deleting.", }) suspicious: list[str] = [] for e in rclone: stem = Path(e.path).stem base_match = PRIMARY_ID_RE.match(stem) or COMPOUND_ID_RE.match(stem) or FALLBACK_ID_RE.match(stem) if not base_match: continue tail = _RESOLUTION_TAG_RE.sub("", stem[base_match.end():]).strip() if _SUSPICIOUS_MULTIPART_TAIL_RE.search(tail): suspicious.append(Path(e.path).name) if suspicious and "#part" not in jav_id: samples = ", ".join(suspicious[:3]) more = " ..." if len(suspicious) > 3 else "" risks.append({ "code": "part_like_suffix", "summary": f"Part-like suffixes still share the base ID: {samples}{more}", }) return risks def find_variant_alerts( entries: Iterable[FileEntry], ) -> dict[str, list[FileEntry]]: """Detect IDs where a bare form and a lowercase-variant form coexist. Example: both ``IBW-902.mp4`` and ``IBW-902z.mp4`` are present. They are different products — not dupes — but their coexistence is suspicious and warrants manual comparison. Returns {bare_id: [all entries whose re-evaluated ID matches bare or variant]}. Only bare IDs that have at least one variant sibling are included. """ index: dict[str, list[FileEntry]] = {} for e in entries: key = extract_id(Path(e.path).name) or e.jav_id index.setdefault(key, []).append(e) alerts: dict[str, list[FileEntry]] = {} for jav_id in index: if "#" in jav_id: continue # skip multipart IDs m = _VARIANT_SUFFIX_RE.match(jav_id) if not m: continue bare = m.group(1) if bare in index: # Merge bare + variant entries under the bare key. if bare not in alerts: alerts[bare] = list(index[bare]) alerts[bare].extend(index[jav_id]) return alerts