diff --git a/rc-jav.py b/rc-jav.py index 0c84afe..0c6d108 100644 --- a/rc-jav.py +++ b/rc-jav.py @@ -171,15 +171,15 @@ from rcjav.cache import ( fmt_age, ) -DEFAULT_KEEP_RANKING: dict = { - "priority_folders": ["ClearJAV"], - "size_tolerance_mib": 0, - "format_preference": ["mkv", "mp4", "wmv", "avi"], - "tiebreak_res_tag": True, - "tiebreak_longer_name": True, -} -# Module-level ranking config; set from config.json in main() so all call sites pick it up. -_KEEP_RANKING: dict = {} +from rcjav.dupes import ( + DEFAULT_KEEP_RANKING, + set_keep_ranking, + decide_keep_with_reason, + decide_keep, + find_dupes, + describe_dupe_risks, + find_variant_alerts, +) CONFIG_PATH = Path(__file__).resolve().parent / "config.json" @@ -950,215 +950,6 @@ def render_dupes(dupes: dict[str, list[FileEntry]], console.print() -def decide_keep_with_reason(entries: list[FileEntry]) -> tuple[FileEntry, dict[str, str]]: - """Pick KEEP candidate and explain the first ranking rule that settled it. - - Catalog entries are excluded — they are offline/informational. - - Ranking (descending priority, configurable via keep_ranking in config.json): - 1. Video files in ordered priority folders outrank other rclone entries. - 2. Source entries outrank Target entries when no priority-folder video exists. - 3. Non-.ts files outrank .ts files when a duplicate group has both. - 4. Largest file size. If sizes are within size_tolerance_mib, treated as equal - and format preference is consulted instead. - 5. Format preference: ordered list of extensions (e.g. mkv > mp4 > wmv > avi). - 6. Tie-break: has resolution tag in filename ([1080p], [2160p], [720p], [480p]). - 7. Tie-break: longer filename (more metadata = more descriptive). 
- """ - ranking = _KEEP_RANKING or {} - tolerance_bytes = int(float(ranking.get("size_tolerance_mib") or 0) * 1024 * 1024) - priority_folders: list[str] = [ - str(folder).strip() for folder in - (ranking.get("priority_folders") or DEFAULT_KEEP_RANKING["priority_folders"]) - if str(folder).strip() - ] - fmt_order: list[str] = list( - ranking.get("format_preference") or DEFAULT_KEEP_RANKING["format_preference"] - ) - use_res_tag: bool = ranking.get("tiebreak_res_tag", True) - use_longer_name: bool = ranking.get("tiebreak_longer_name", True) - - rclone = [e for e in entries if e.source != "Catalog"] - - def _priority_folder_rank(e: FileEntry) -> int | None: - if Path(e.path).suffix.lower() not in _VIDEO_EXTS: - return None - # A root can be cq:JAV while the favored folder is a child path, or the - # supplied root can itself end in that folder. Match across full_path. - full_path = e.full_path.replace("\\", "/").strip("/").lower() - segments = [segment for segment in full_path.split("/") if segment] - for index, raw_folder in enumerate(priority_folders): - folder = raw_folder.replace("\\", "/").strip("/").lower() - if not folder: - continue - if "/" in folder or ":" in folder: - framed = f"/{full_path}/" - if full_path == folder or full_path.startswith(folder + "/") or f"/{folder}/" in framed: - return index - elif folder in segments: - return index - return None - - prioritized = [(rank, e) for e in rclone if (rank := _priority_folder_rank(e)) is not None] - best_priority = min((rank for rank, _ in prioritized), default=None) - priority_videos = [e for rank, e in prioritized if rank == best_priority] - pool_priority = [e for e in rclone if e.source == "Source"] - reason = {"code": "fallback", "summary": "First remaining duplicate candidate"} - if priority_videos: - pool = priority_videos - reason = { - "code": "vip_folder", - "summary": f"VIP folder: {priority_folders[best_priority]}", - } - elif pool_priority: - pool = pool_priority - reason = {"code": "source", 
"summary": "Source copy outranks target copies"} - else: - pool = rclone if rclone else entries - - # Transport streams often inflate size without being the better keeper. - preferred_containers = [ - e for e in pool if Path(e.path).suffix.lower() not in _LOWEST_KEEP_PRIORITY_EXTS - ] - if preferred_containers and len(preferred_containers) != len(pool): - pool = preferred_containers - reason = {"code": "container", "summary": "Non-TS video outranks transport stream"} - - # Step 1: narrow to within size tolerance of the maximum - max_size = max(e.size for e in pool) - candidates = [e for e in pool if max_size - e.size <= tolerance_bytes] - - if len(candidates) == 1: - if len(pool) > 1 and reason["code"] not in {"vip_folder", "source", "container"}: - reason = {"code": "size", "summary": "Largest file after ranking rules"} - return candidates[0], reason - - # Step 2: format preference (lower index in fmt_order = higher priority) - def _fmt_rank(e: FileEntry) -> int: - ext = Path(e.path).suffix.lower().lstrip(".") - try: - return fmt_order.index(ext) # lower = better - except ValueError: - return len(fmt_order) # unknown = lowest - - best_fmt = min(_fmt_rank(e) for e in candidates) - by_fmt = [e for e in candidates if _fmt_rank(e) == best_fmt] - if len(by_fmt) != len(candidates): - ext = Path(by_fmt[0].path).suffix.lower().lstrip(".").upper() or "preferred format" - reason = {"code": "format", "summary": f"Format preference: {ext}"} - candidates = by_fmt - - if len(candidates) == 1: - return candidates[0], reason - - # Step 3: resolution tag tie-break - if use_res_tag: - tagged = [e for e in candidates if _RES_LABEL_RE.search(Path(e.path).name)] - if tagged: - if len(tagged) != len(candidates): - reason = {"code": "resolution_tag", "summary": "Filename has a resolution tag"} - candidates = tagged - - if len(candidates) == 1: - return candidates[0], reason - - # Step 4: longer filename tie-break - if use_longer_name: - keep = max(candidates, key=lambda e: 
len(Path(e.path).name)) - return keep, {"code": "filename", "summary": "Longer filename tie-break"} - - return candidates[0], reason - - -def decide_keep(entries: list[FileEntry]) -> FileEntry: - """Pick KEEP candidate for duplicate output.""" - return decide_keep_with_reason(entries)[0] - - -def find_dupes(entries: Iterable[FileEntry]) -> dict[str, list[FileEntry]]: - """Group entries by jav_id. A group is a dupe only if it has >=2 non-Catalog entries.""" - groups: dict[str, list[FileEntry]] = {} - for e in entries: - # Re-evaluate duplicate keys from the current filename rules. Cached - # entries may predate a new part detector such as `.1of2`; treating those - # stale base IDs as duplicate files would produce risky delete hints. - key = extract_id(Path(e.path).name) or e.jav_id - groups.setdefault(key, []).append(e) - out: dict[str, list[FileEntry]] = {} - for k, v in groups.items(): - rclone_count = sum(1 for e in v if e.source != "Catalog") - if rclone_count >= 2: - out[k] = v - return out - - -_SUSPICIOUS_MULTIPART_TAIL_RE = re.compile( - r"(?:^|[-_.\s])(?:p|pt|part|cd|disc|ep|episode|vol|volume|scene)[-_.\s]*([a-d]|\d{1,2})(?:$|[-_.\s\[])" - r"|(?:^|[-_.\s])([a-d]|\d{1,2})(?:$|\s*\[)", - re.IGNORECASE, -) - - -def describe_dupe_risks(jav_id: str, entries: list[FileEntry]) -> list[dict[str, str]]: - """Flag duplicate groups that deserve manual review before deletion.""" - rclone = [e for e in entries if e.source != "Catalog"] - risks: list[dict[str, str]] = [] - if "#part" not in jav_id and len(rclone) >= 3: - risks.append({ - "code": "large_same_id_group", - "summary": f"{len(rclone)} files share this base ID; review for unrecognized parts before deleting.", - }) - - suspicious: list[str] = [] - for e in rclone: - stem = Path(e.path).stem - base_match = PRIMARY_ID_RE.match(stem) or COMPOUND_ID_RE.match(stem) or FALLBACK_ID_RE.match(stem) - if not base_match: - continue - tail = _RESOLUTION_TAG_RE.sub("", stem[base_match.end():]).strip() - if 
_SUSPICIOUS_MULTIPART_TAIL_RE.search(tail): - suspicious.append(Path(e.path).name) - if suspicious and "#part" not in jav_id: - samples = ", ".join(suspicious[:3]) - more = " ..." if len(suspicious) > 3 else "" - risks.append({ - "code": "part_like_suffix", - "summary": f"Part-like suffixes still share the base ID: {samples}{more}", - }) - return risks - - -def find_variant_alerts( - entries: Iterable[FileEntry], -) -> dict[str, list[FileEntry]]: - """Detect IDs where a bare form and a lowercase-variant form coexist. - - Example: both ``IBW-902.mp4`` and ``IBW-902z.mp4`` are present. - They are different products — not dupes — but their coexistence is - suspicious and warrants manual comparison. - - Returns {bare_id: [all entries whose re-evaluated ID matches bare or variant]}. - Only bare IDs that have at least one variant sibling are included. - """ - index: dict[str, list[FileEntry]] = {} - for e in entries: - key = extract_id(Path(e.path).name) or e.jav_id - index.setdefault(key, []).append(e) - - alerts: dict[str, list[FileEntry]] = {} - for jav_id in index: - if "#" in jav_id: - continue # skip multipart IDs - m = _VARIANT_SUFFIX_RE.match(jav_id) - if not m: - continue - bare = m.group(1) - if bare in index: - # Merge bare + variant entries under the bare key. 
- if bare not in alerts: - alerts[bare] = list(index[bare]) - alerts[bare].extend(index[jav_id]) - return alerts # ---------- library issues (non-canonical filenames) ---------- @@ -1552,8 +1343,7 @@ def main(): DEFAULT_TARGET = list(cfg["default_target"]) if "default_catalog" in cfg: DEFAULT_CATALOG = list(cfg["default_catalog"]) - global _KEEP_RANKING - _KEEP_RANKING = cfg.get("keep_ranking") or {} + set_keep_ranking(cfg.get("keep_ranking") or {}) part_patterns = list(cfg.get("part_patterns") or []) + list(args.part_pattern) pattern_errors = configure_part_patterns(part_patterns) if pattern_errors: diff --git a/rcjav/__init__.py b/rcjav/__init__.py index 7e66a4f..240adbd 100644 --- a/rcjav/__init__.py +++ b/rcjav/__init__.py @@ -6,6 +6,16 @@ find at the top level. Adding a new submodule does not change the public surface — only this file does. """ from rcjav.model import FileEntry # noqa: F401 +from rcjav.dupes import ( # noqa: F401 + DEFAULT_KEEP_RANKING, + set_keep_ranking, + get_keep_ranking, + decide_keep_with_reason, + decide_keep, + find_dupes, + describe_dupe_risks, + find_variant_alerts, +) from rcjav.cache import ( # noqa: F401 CACHE_PATH, CACHE_VERSION, diff --git a/rcjav/dupes.py b/rcjav/dupes.py new file mode 100644 index 0000000..1b08799 --- /dev/null +++ b/rcjav/dupes.py @@ -0,0 +1,264 @@ +"""Duplicate detection, keep-ranking, and variant-alert logic. + +Operates on FileEntry lists already extracted from rclone listings or +catalogs (see rcjav.rclone_io and rcjav.catalog). The "rules" that +influence which file gets KEPT live here; the "rules" that influence +how an entry's jav_id is derived live in rcjav.ids. + +`_KEEP_RANKING` is a module-level dict mutated by `set_keep_ranking()` +at startup from config.json. Read via the local `_KEEP_RANKING` binding +inside `decide_keep_with_reason`. Callers outside this module should +go through `set_keep_ranking()` rather than touching the global. 
+"""
+from __future__ import annotations
+
+import re
+from pathlib import Path
+from typing import Iterable
+
+from rcjav.ids import (
+    PRIMARY_ID_RE,
+    FALLBACK_ID_RE,
+    COMPOUND_ID_RE,
+    _RES_LABEL_RE,
+    _VARIANT_SUFFIX_RE,
+    _RESOLUTION_TAG_RE,
+    _VIDEO_EXTS,
+    _LOWEST_KEEP_PRIORITY_EXTS,
+    extract_id,
+)
+from rcjav.model import FileEntry
+
+
+DEFAULT_KEEP_RANKING: dict = {
+    "priority_folders": ["ClearJAV"],
+    "size_tolerance_mib": 0,
+    "format_preference": ["mkv", "mp4", "wmv", "avi"],
+    "tiebreak_res_tag": True,
+    "tiebreak_longer_name": True,
+}
+# Module-level ranking config; set from config.json via set_keep_ranking() so
+# all call sites pick it up.
+_KEEP_RANKING: dict = {}
+
+
+def set_keep_ranking(ranking: dict | None) -> None:
+    """Rebind the module's keep-ranking config to a shallow copy of *ranking* (None/empty means defaults)."""
+    global _KEEP_RANKING
+    _KEEP_RANKING = dict(ranking or {})
+
+
+def get_keep_ranking() -> dict:
+    """Return a shallow copy of the module's current keep-ranking config."""
+    return dict(_KEEP_RANKING)
+
+
+def decide_keep_with_reason(entries: list[FileEntry]) -> tuple[FileEntry, dict[str, str]]:
+    """Pick KEEP candidate and explain the first ranking rule that settled it.
+
+    Catalog entries are excluded — they are offline/informational.
+
+    Ranking (descending priority, configurable via keep_ranking in config.json):
+      1. Video files in ordered priority folders outrank other rclone entries.
+      2. Source entries outrank Target entries when no priority-folder video exists.
+      3. Non-.ts files outrank .ts files when a duplicate group has both.
+      4. Largest file size. If sizes are within size_tolerance_mib, treated as equal
+         and format preference is consulted instead. A negative tolerance is clamped to 0.
+      5. Format preference: ordered list of extensions (e.g. mkv > mp4 > wmv > avi).
+      6. Tie-break: has resolution tag in filename ([1080p], [2160p], [720p], [480p]).
+      7. Tie-break: longer filename (more metadata = more descriptive).
+    """
+    ranking = _KEEP_RANKING or {}
+    tolerance_bytes = max(0, int(float(ranking.get("size_tolerance_mib") or 0) * 1024 * 1024))
+    priority_folders: list[str] = [
+        str(folder).strip() for folder in
+        (ranking.get("priority_folders") or DEFAULT_KEEP_RANKING["priority_folders"])
+        if str(folder).strip()
+    ]
+    fmt_order: list[str] = list(
+        ranking.get("format_preference") or DEFAULT_KEEP_RANKING["format_preference"]
+    )
+    use_res_tag: bool = ranking.get("tiebreak_res_tag", True)
+    use_longer_name: bool = ranking.get("tiebreak_longer_name", True)
+
+    rclone = [e for e in entries if e.source != "Catalog"]
+
+    def _priority_folder_rank(e: FileEntry) -> int | None:
+        if Path(e.path).suffix.lower() not in _VIDEO_EXTS:
+            return None
+        # A root can be cq:JAV while the favored folder is a child path, or the
+        # supplied root can itself end in that folder. Match across full_path.
+        full_path = e.full_path.replace("\\", "/").strip("/").lower()
+        segments = [segment for segment in full_path.split("/") if segment]
+        for index, raw_folder in enumerate(priority_folders):
+            folder = raw_folder.replace("\\", "/").strip("/").lower()
+            if not folder:
+                continue
+            if "/" in folder or ":" in folder:
+                framed = f"/{full_path}/"
+                if full_path == folder or full_path.startswith(folder + "/") or f"/{folder}/" in framed:
+                    return index
+            elif folder in segments:
+                return index
+        return None
+
+    prioritized = [(rank, e) for e in rclone if (rank := _priority_folder_rank(e)) is not None]
+    best_priority = min((rank for rank, _ in prioritized), default=None)
+    priority_videos = [e for rank, e in prioritized if rank == best_priority]
+    pool_priority = [e for e in rclone if e.source == "Source"]
+    reason = {"code": "fallback", "summary": "First remaining duplicate candidate"}
+    if priority_videos:
+        pool = priority_videos
+        reason = {
+            "code": "vip_folder",
+            "summary": f"VIP folder: {priority_folders[best_priority]}",
+        }
+    elif pool_priority:
+        pool = pool_priority
+        reason = {"code": "source", "summary": "Source copy outranks target copies"}
+    else:
+        pool = rclone if rclone else entries
+
+    # Transport streams often inflate size without being the better keeper.
+    preferred_containers = [
+        e for e in pool if Path(e.path).suffix.lower() not in _LOWEST_KEEP_PRIORITY_EXTS
+    ]
+    if preferred_containers and len(preferred_containers) != len(pool):
+        pool = preferred_containers
+        reason = {"code": "container", "summary": "Non-TS video outranks transport stream"}
+
+    # Step 1: narrow to within size tolerance of the maximum
+    max_size = max(e.size for e in pool)
+    candidates = [e for e in pool if max_size - e.size <= tolerance_bytes]
+
+    if len(candidates) == 1:
+        if len(pool) > 1 and reason["code"] not in {"vip_folder", "source", "container"}:
+            reason = {"code": "size", "summary": "Largest file after ranking rules"}
+        return candidates[0], reason
+
+    # Step 2: format preference (lower index in fmt_order = higher priority)
+    def _fmt_rank(e: FileEntry) -> int:
+        ext = Path(e.path).suffix.lower().lstrip(".")
+        try:
+            return fmt_order.index(ext)  # lower = better
+        except ValueError:
+            return len(fmt_order)  # unknown = lowest
+
+    best_fmt = min(_fmt_rank(e) for e in candidates)
+    by_fmt = [e for e in candidates if _fmt_rank(e) == best_fmt]
+    if len(by_fmt) != len(candidates):
+        ext = Path(by_fmt[0].path).suffix.lower().lstrip(".").upper() or "preferred format"
+        reason = {"code": "format", "summary": f"Format preference: {ext}"}
+    candidates = by_fmt
+
+    if len(candidates) == 1:
+        return candidates[0], reason
+
+    # Step 3: resolution tag tie-break
+    if use_res_tag:
+        tagged = [e for e in candidates if _RES_LABEL_RE.search(Path(e.path).name)]
+        if tagged:
+            if len(tagged) != len(candidates):
+                reason = {"code": "resolution_tag", "summary": "Filename has a resolution tag"}
+            candidates = tagged
+
+    if len(candidates) == 1:
+        return candidates[0], reason
+
+    # Step 4: longer filename tie-break
+    if use_longer_name:
+        keep = max(candidates, key=lambda e: len(Path(e.path).name))
+        return keep, {"code": "filename", "summary": "Longer filename tie-break"}
+
+    return candidates[0], reason
+
+
+def decide_keep(entries: list[FileEntry]) -> FileEntry:
+    """Pick KEEP candidate for duplicate output."""
+    return decide_keep_with_reason(entries)[0]
+
+
+def find_dupes(entries: Iterable[FileEntry]) -> dict[str, list[FileEntry]]:
+    """Group entries by jav_id. A group is a dupe only if it has >=2 non-Catalog entries."""
+    groups: dict[str, list[FileEntry]] = {}
+    for e in entries:
+        # Re-evaluate duplicate keys from the current filename rules. Cached
+        # entries may predate a new part detector such as `.1of2`; treating those
+        # stale base IDs as duplicate files would produce risky delete hints.
+        key = extract_id(Path(e.path).name) or e.jav_id
+        groups.setdefault(key, []).append(e)
+    out: dict[str, list[FileEntry]] = {}
+    for k, v in groups.items():
+        rclone_count = sum(1 for e in v if e.source != "Catalog")
+        if rclone_count >= 2:
+            out[k] = v
+    return out
+
+
+_SUSPICIOUS_MULTIPART_TAIL_RE = re.compile(
+    r"(?:^|[-_.\s])(?:p|pt|part|cd|disc|ep|episode|vol|volume|scene)[-_.\s]*([a-d]|\d{1,2})(?:$|[-_.\s\[])"
+    r"|(?:^|[-_.\s])([a-d]|\d{1,2})(?:$|\s*\[)",
+    re.IGNORECASE,
+)
+
+
+def describe_dupe_risks(jav_id: str, entries: list[FileEntry]) -> list[dict[str, str]]:
+    """Flag duplicate groups that deserve manual review before deletion."""
+    rclone = [e for e in entries if e.source != "Catalog"]
+    risks: list[dict[str, str]] = []
+    if "#part" not in jav_id and len(rclone) >= 3:
+        risks.append({
+            "code": "large_same_id_group",
+            "summary": f"{len(rclone)} files share this base ID; review for unrecognized parts before deleting.",
+        })
+
+    suspicious: list[str] = []
+    for e in rclone:
+        stem = Path(e.path).stem
+        base_match = PRIMARY_ID_RE.match(stem) or COMPOUND_ID_RE.match(stem) or FALLBACK_ID_RE.match(stem)
+        if not base_match:
+            continue
+        tail = _RESOLUTION_TAG_RE.sub("", stem[base_match.end():]).strip()
+        if _SUSPICIOUS_MULTIPART_TAIL_RE.search(tail):
+            suspicious.append(Path(e.path).name)
+    if suspicious and "#part" not in jav_id:
+        samples = ", ".join(suspicious[:3])
+        more = " ..." if len(suspicious) > 3 else ""
+        risks.append({
+            "code": "part_like_suffix",
+            "summary": f"Part-like suffixes still share the base ID: {samples}{more}",
+        })
+    return risks
+
+
+def find_variant_alerts(
+    entries: Iterable[FileEntry],
+) -> dict[str, list[FileEntry]]:
+    """Detect IDs where a bare form and a lowercase-variant form coexist.
+
+    Example: both ``IBW-902.mp4`` and ``IBW-902z.mp4`` are present.
+    They are different products — not dupes — but their coexistence is
+    suspicious and warrants manual comparison.
+
+    Returns {bare_id: [all entries whose re-evaluated ID matches bare or variant]}.
+    Only bare IDs that have at least one variant sibling are included.
+    """
+    index: dict[str, list[FileEntry]] = {}
+    for e in entries:
+        key = extract_id(Path(e.path).name) or e.jav_id
+        index.setdefault(key, []).append(e)
+
+    alerts: dict[str, list[FileEntry]] = {}
+    for jav_id in index:
+        if "#" in jav_id:
+            continue  # skip multipart IDs
+        m = _VARIANT_SUFFIX_RE.match(jav_id)
+        if not m:
+            continue
+        bare = m.group(1)
+        if bare in index:
+            # Merge bare + variant entries under the bare key.
+            if bare not in alerts:
+                alerts[bare] = list(index[bare])
+            alerts[bare].extend(index[jav_id])
+    return alerts