8d636ec633
Pulls the duplicate-detection and keep-ranking surface out of rc-jav.py: DEFAULT_KEEP_RANKING _KEEP_RANKING (module global) decide_keep_with_reason decide_keep find_dupes _SUSPICIOUS_MULTIPART_TAIL_RE describe_dupe_risks find_variant_alerts Same mutable-rebound pattern as PART_RES: `_KEEP_RANKING` is now configured via `set_keep_ranking(dict)` rather than a `global` write in rc-jav.py's main(). Reads happen only inside the module that owns the binding, so callers never see a stale snapshot. rc-jav.py: 1972 → 1763 lines (209 extracted). rcjav/dupes.py: 244 lines. Verified: - python rc-jav.py --help → ok - python fixtures/run.py → 17/17 cases pass - python -m unittest tests.test_rules → 5/5 OK Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
265 lines
10 KiB
Python
265 lines
10 KiB
Python
"""Duplicate detection, keep-ranking, and variant-alert logic.
|
|
|
|
Operates on FileEntry lists already extracted from rclone listings or
|
|
catalogs (see rcjav.rclone_io and rcjav.catalog). The "rules" that
|
|
influence which file gets KEPT live here; the "rules" that influence
|
|
how an entry's jav_id is derived live in rcjav.ids.
|
|
|
|
`_KEEP_RANKING` is a module-level dict that `set_keep_ranking()` rebinds
at startup from config.json. `decide_keep_with_reason` reads the module
global at call time, so it always sees the most recently set value.
Callers outside this module should go through `set_keep_ranking()`
rather than touching the global.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Iterable
|
|
|
|
from rcjav.ids import (
|
|
PRIMARY_ID_RE,
|
|
FALLBACK_ID_RE,
|
|
COMPOUND_ID_RE,
|
|
_RES_LABEL_RE,
|
|
_VARIANT_SUFFIX_RE,
|
|
_RESOLUTION_TAG_RE,
|
|
_VIDEO_EXTS,
|
|
_LOWEST_KEEP_PRIORITY_EXTS,
|
|
extract_id,
|
|
)
|
|
from rcjav.model import FileEntry
|
|
|
|
|
|
# Built-in keep-ranking settings. decide_keep_with_reason() falls back to the
# list-valued entries below when the active config omits them or leaves them
# empty; the scalar entries mirror the inline defaults used there.
DEFAULT_KEEP_RANKING: dict = {
    # Ordered folder names/paths whose video files outrank everything else;
    # earlier entries win over later ones.
    "priority_folders": ["ClearJAV"],
    # Files whose size is within this many MiB of the largest are treated as
    # equal in size, and format preference decides instead.
    "size_tolerance_mib": 0,
    # Ordered extension preference; earlier entries are preferred.
    "format_preference": ["mkv", "mp4", "wmv", "avi"],
    # Tie-break: prefer filenames that carry a resolution tag.
    "tiebreak_res_tag": True,
    # Tie-break: prefer the longer filename.
    "tiebreak_longer_name": True,
}

# Module-level ranking config; set from config.json via set_keep_ranking() so
# all call sites pick it up.
_KEEP_RANKING: dict = {}
|
|
|
|
|
|
def set_keep_ranking(ranking: dict | None) -> None:
    """Rebind the module's effective keep-ranking config.

    Args:
        ranking: The keep-ranking settings to use from now on. ``None`` or
            an empty dict resets the config to ``{}``, which makes
            ``decide_keep_with_reason`` use its built-in defaults.

    The settings are deep-copied before being stored, so mutating the
    caller's dict (or its nested lists) afterwards does not silently change
    the ranking used by this module.
    """
    from copy import deepcopy

    global _KEEP_RANKING
    _KEEP_RANKING = deepcopy(ranking) if ranking else {}
|
|
|
|
|
|
def get_keep_ranking() -> dict:
    """Return a snapshot of the module's current effective ranking.

    The snapshot is a deep copy: changing it, including its nested lists
    such as ``priority_folders``, never alters the live configuration.
    """
    from copy import deepcopy

    return deepcopy(_KEEP_RANKING)
|
|
|
|
|
|
def decide_keep_with_reason(entries: list[FileEntry]) -> tuple[FileEntry, dict[str, str]]:
    """Pick the KEEP candidate and explain the ranking rule that settled it.

    Catalog entries are offline/informational; they are only considered
    when the group contains no non-Catalog entry at all.

    Ranking (descending priority, configurable via keep_ranking in config.json):
      1. Video files in ordered priority folders outrank other rclone entries.
      2. Source entries outrank Target entries when no priority-folder video exists.
      3. Files whose extension is not in _LOWEST_KEEP_PRIORITY_EXTS (transport
         streams) outrank those that are, when a group has both.
      4. Largest file size. Files within size_tolerance_mib of the largest are
         treated as equal and format preference is consulted instead.
      5. Format preference: ordered list of extensions (e.g. mkv > mp4 > wmv > avi).
         Entries are matched case-insensitively and with or without a leading dot.
      6. Tie-break: filename carries a resolution tag (matched by _RES_LABEL_RE).
      7. Tie-break: longer filename (more metadata = more descriptive).

    Args:
        entries: The duplicate group to rank; must not be empty.

    Returns:
        ``(keep, reason)`` where ``reason`` has a ``"code"`` (one of
        ``fallback``, ``vip_folder``, ``source``, ``container``, ``size``,
        ``format``, ``resolution_tag``, ``filename``) and a human-readable
        ``"summary"``.

    Raises:
        ValueError: If ``entries`` is empty, or if ``size_tolerance_mib`` in
            the active config cannot be converted to a number.
    """
    ranking = _KEEP_RANKING or {}
    # A negative tolerance would exclude even the largest file and leave no
    # candidates, so clamp it to "exact size match".
    tolerance_bytes = max(
        0, int(float(ranking.get("size_tolerance_mib") or 0) * 1024 * 1024)
    )
    priority_folders: list[str] = [
        str(folder).strip()
        for folder in (ranking.get("priority_folders") or DEFAULT_KEEP_RANKING["priority_folders"])
        if str(folder).strip()
    ]
    # Extensions are compared lowercased and without the dot, so normalize the
    # configured list the same way ("MKV" and ".mkv" both mean "mkv").
    fmt_order: list[str] = [
        fmt
        for raw_fmt in (ranking.get("format_preference") or DEFAULT_KEEP_RANKING["format_preference"])
        if (fmt := str(raw_fmt).strip().lower().lstrip("."))
    ]
    use_res_tag: bool = ranking.get("tiebreak_res_tag", True)
    use_longer_name: bool = ranking.get("tiebreak_longer_name", True)

    rclone = [e for e in entries if e.source != "Catalog"]

    # Normalize the configured folders once instead of once per entry. The
    # index into priority_folders is kept so the reason can name the folder
    # exactly as configured.
    folder_matchers = [
        (index, folder)
        for index, raw_folder in enumerate(priority_folders)
        if (folder := raw_folder.replace("\\", "/").strip("/").lower())
    ]

    def _priority_folder_rank(e: FileEntry) -> int | None:
        """Return the index of the first priority folder containing *e*, if any."""
        if Path(e.path).suffix.lower() not in _VIDEO_EXTS:
            return None
        # A root can be cq:JAV while the favored folder is a child path, or the
        # supplied root can itself end in that folder. Match across full_path.
        full_path = e.full_path.replace("\\", "/").strip("/").lower()
        segments = {segment for segment in full_path.split("/") if segment}
        framed = f"/{full_path}/"
        for index, folder in folder_matchers:
            if "/" in folder or ":" in folder:
                # Multi-segment or remote-qualified folder: match as a path run.
                if (
                    full_path == folder
                    or full_path.startswith(folder + "/")
                    or f"/{folder}/" in framed
                ):
                    return index
            elif folder in segments:
                # Plain folder name: match any single path segment.
                return index
        return None

    prioritized = [(rank, e) for e in rclone if (rank := _priority_folder_rank(e)) is not None]
    best_priority = min((rank for rank, _ in prioritized), default=None)
    priority_videos = [e for rank, e in prioritized if rank == best_priority]
    source_entries = [e for e in rclone if e.source == "Source"]

    reason = {"code": "fallback", "summary": "First remaining duplicate candidate"}
    if priority_videos:
        pool = priority_videos
        reason = {
            "code": "vip_folder",
            "summary": f"VIP folder: {priority_folders[best_priority]}",
        }
    elif source_entries:
        pool = source_entries
        reason = {"code": "source", "summary": "Source copy outranks target copies"}
    else:
        pool = rclone if rclone else entries

    if not pool:
        raise ValueError("decide_keep_with_reason() requires at least one entry")

    # Transport streams often inflate size without being the better keeper.
    preferred_containers = [
        e for e in pool if Path(e.path).suffix.lower() not in _LOWEST_KEEP_PRIORITY_EXTS
    ]
    if preferred_containers and len(preferred_containers) != len(pool):
        pool = preferred_containers
        reason = {"code": "container", "summary": "Non-TS video outranks transport stream"}

    # Step 1: narrow to within size tolerance of the maximum.
    max_size = max(e.size for e in pool)
    candidates = [e for e in pool if max_size - e.size <= tolerance_bytes]

    if len(candidates) == 1:
        # Size only becomes the stated reason when no earlier rule narrowed the pool.
        if len(pool) > 1 and reason["code"] not in {"vip_folder", "source", "container"}:
            reason = {"code": "size", "summary": "Largest file after ranking rules"}
        return candidates[0], reason

    # Step 2: format preference (lower position in fmt_order = higher priority).
    fmt_position: dict[str, int] = {}
    for position, fmt in enumerate(fmt_order):
        fmt_position.setdefault(fmt, position)  # first occurrence wins
    unknown_rank = len(fmt_order)  # unknown extensions rank lowest

    def _fmt_rank(e: FileEntry) -> int:
        ext = Path(e.path).suffix.lower().lstrip(".")
        return fmt_position.get(ext, unknown_rank)

    best_fmt = min(_fmt_rank(e) for e in candidates)
    by_fmt = [e for e in candidates if _fmt_rank(e) == best_fmt]
    if len(by_fmt) != len(candidates):
        ext = Path(by_fmt[0].path).suffix.lower().lstrip(".").upper() or "preferred format"
        reason = {"code": "format", "summary": f"Format preference: {ext}"}
    candidates = by_fmt

    if len(candidates) == 1:
        return candidates[0], reason

    # Step 3: resolution tag tie-break.
    if use_res_tag:
        tagged = [e for e in candidates if _RES_LABEL_RE.search(Path(e.path).name)]
        if tagged:
            if len(tagged) != len(candidates):
                reason = {"code": "resolution_tag", "summary": "Filename has a resolution tag"}
            candidates = tagged

    if len(candidates) == 1:
        return candidates[0], reason

    # Step 4: longer filename tie-break.
    if use_longer_name:
        keep = max(candidates, key=lambda e: len(Path(e.path).name))
        return keep, {"code": "filename", "summary": "Longer filename tie-break"}

    return candidates[0], reason
|
|
|
|
|
|
def decide_keep(entries: list[FileEntry]) -> FileEntry:
    """Return only the KEEP candidate chosen by ``decide_keep_with_reason``.

    Convenience wrapper for callers that do not need the explanation.
    """
    keep, _reason = decide_keep_with_reason(entries)
    return keep
|
|
|
|
|
|
def find_dupes(entries: Iterable[FileEntry]) -> dict[str, list[FileEntry]]:
    """Group entries by jav_id and return only the duplicate groups.

    A group counts as a dupe only if it has >=2 non-Catalog entries; Catalog
    entries in such a group are still returned alongside them.

    Args:
        entries: File entries to group.

    Returns:
        ``{jav_id: [entries...]}`` for every duplicate group, in first-seen
        order of the IDs.
    """
    groups: dict[str, list[FileEntry]] = {}
    for e in entries:
        # Re-evaluate duplicate keys from the current filename rules. Cached
        # entries may predate a new part detector such as `.1of2`; treating those
        # stale base IDs as duplicate files would produce risky delete hints.
        key = extract_id(Path(e.path).name) or e.jav_id
        if not key:
            # No ID could be derived. Lumping all such files under one empty
            # key would report unrelated files as duplicates of each other.
            continue
        groups.setdefault(key, []).append(e)

    return {
        key: members
        for key, members in groups.items()
        if sum(1 for e in members if e.source != "Catalog") >= 2
    }
|
|
|
|
|
|
# Matches a filename tail (the text after the base ID) that looks like a part
# marker. Case-insensitive. Two alternatives:
#   1. a part keyword (p, pt, part, cd, disc, ep, episode, vol, volume, scene)
#      followed by a letter a-d or a 1-2 digit number;
#   2. a bare letter a-d or 1-2 digit number that ends the tail or is
#      followed by "[".
_SUSPICIOUS_MULTIPART_TAIL_RE = re.compile(
    r"(?:^|[-_.\s])(?:p|pt|part|cd|disc|ep|episode|vol|volume|scene)[-_.\s]*([a-d]|\d{1,2})(?:$|[-_.\s\[])"
    r"|(?:^|[-_.\s])([a-d]|\d{1,2})(?:$|\s*\[)",
    re.IGNORECASE,
)
|
|
|
|
|
|
def describe_dupe_risks(jav_id: str, entries: list[FileEntry]) -> list[dict[str, str]]:
    """Flag duplicate groups that deserve manual review before deletion.

    Args:
        jav_id: The group's ID. IDs containing ``#part`` never produce risks.
        entries: The group's entries; Catalog entries are ignored.

    Returns:
        A list of ``{"code", "summary"}`` dicts, possibly empty. Codes:
        ``large_same_id_group`` (three or more non-Catalog files share the
        ID) and ``part_like_suffix`` (a filename tail after the base ID looks
        like a part marker).
    """
    # Both checks only apply to groups not already recognised as multipart.
    if "#part" in jav_id:
        return []

    live = [entry for entry in entries if entry.source != "Catalog"]
    findings: list[dict[str, str]] = []

    if len(live) >= 3:
        findings.append({
            "code": "large_same_id_group",
            "summary": f"{len(live)} files share this base ID; review for unrecognized parts before deleting.",
        })

    flagged: list[str] = []
    id_patterns = (PRIMARY_ID_RE, COMPOUND_ID_RE, FALLBACK_ID_RE)
    for entry in live:
        file_path = Path(entry.path)
        stem = file_path.stem
        # First pattern that matches at the start of the stem wins.
        base_match = next(
            (found for pattern in id_patterns if (found := pattern.match(stem))),
            None,
        )
        if base_match is None:
            continue
        # Drop resolution tags so they are not mistaken for part markers.
        remainder = stem[base_match.end():]
        tail = _RESOLUTION_TAG_RE.sub("", remainder).strip()
        if _SUSPICIOUS_MULTIPART_TAIL_RE.search(tail):
            flagged.append(file_path.name)

    if flagged:
        # Show at most three filenames and mark that more were found.
        samples = ", ".join(flagged[:3])
        more = " ..." if len(flagged) > 3 else ""
        findings.append({
            "code": "part_like_suffix",
            "summary": f"Part-like suffixes still share the base ID: {samples}{more}",
        })

    return findings
|
|
|
|
|
|
def find_variant_alerts(
    entries: Iterable[FileEntry],
) -> dict[str, list[FileEntry]]:
    """Detect IDs where a bare form and a lowercase-variant form coexist.

    Example: both ``IBW-902.mp4`` and ``IBW-902z.mp4`` are present.
    They are different products — not dupes — but their coexistence is
    suspicious and warrants manual comparison.

    Args:
        entries: File entries to inspect. Entries for which no ID can be
            derived (neither from the filename nor from ``jav_id``) are
            ignored.

    Returns:
        ``{bare_id: [all entries whose re-evaluated ID matches bare or variant]}``.
        Only bare IDs that have at least one variant sibling are included.
    """
    index: dict[str, list[FileEntry]] = {}
    for e in entries:
        key = extract_id(Path(e.path).name) or e.jav_id
        if not key:
            # Without an ID there is nothing to compare; a None key would
            # also break the string checks below.
            continue
        index.setdefault(key, []).append(e)

    alerts: dict[str, list[FileEntry]] = {}
    for jav_id, variant_entries in index.items():
        if "#" in jav_id:
            continue  # skip multipart IDs
        m = _VARIANT_SUFFIX_RE.match(jav_id)
        if not m:
            continue
        bare = m.group(1)
        if bare not in index:
            continue
        # Merge bare + variant entries under the bare key; the bare entries
        # are copied in once, then each variant's entries are appended.
        if bare not in alerts:
            alerts[bare] = list(index[bare])
        alerts[bare].extend(variant_entries)
    return alerts
|