Step 10d: extract dupes/keep-ranking into rcjav/dupes.py
Pulls the duplicate-detection and keep-ranking surface out of rc-jav.py: DEFAULT_KEEP_RANKING, _KEEP_RANKING (module global), decide_keep_with_reason, decide_keep, find_dupes, _SUSPICIOUS_MULTIPART_TAIL_RE, describe_dupe_risks, and find_variant_alerts. Same mutable-rebound pattern as PART_RES: `_KEEP_RANKING` is now configured via `set_keep_ranking(dict)` rather than a `global` write in rc-jav.py's main(). Reads happen only inside the module that owns the binding, so callers never see a stale snapshot. Line counts: rc-jav.py 1972 → 1763 lines (209 extracted); rcjav/dupes.py 264 lines. Verified: `python rc-jav.py --help` → ok; `python fixtures/run.py` → 17/17 cases pass; `python -m unittest tests.test_rules` → 5/5 OK. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -171,15 +171,15 @@ from rcjav.cache import (
|
||||
fmt_age,
|
||||
)
|
||||
|
||||
DEFAULT_KEEP_RANKING: dict = {
|
||||
"priority_folders": ["ClearJAV"],
|
||||
"size_tolerance_mib": 0,
|
||||
"format_preference": ["mkv", "mp4", "wmv", "avi"],
|
||||
"tiebreak_res_tag": True,
|
||||
"tiebreak_longer_name": True,
|
||||
}
|
||||
# Module-level ranking config; set from config.json in main() so all call sites pick it up.
|
||||
_KEEP_RANKING: dict = {}
|
||||
from rcjav.dupes import (
|
||||
DEFAULT_KEEP_RANKING,
|
||||
set_keep_ranking,
|
||||
decide_keep_with_reason,
|
||||
decide_keep,
|
||||
find_dupes,
|
||||
describe_dupe_risks,
|
||||
find_variant_alerts,
|
||||
)
|
||||
|
||||
CONFIG_PATH = Path(__file__).resolve().parent / "config.json"
|
||||
|
||||
@@ -950,215 +950,6 @@ def render_dupes(dupes: dict[str, list[FileEntry]],
|
||||
console.print()
|
||||
|
||||
|
||||
def decide_keep_with_reason(entries: list[FileEntry]) -> tuple[FileEntry, dict[str, str]]:
|
||||
"""Pick KEEP candidate and explain the first ranking rule that settled it.
|
||||
|
||||
Catalog entries are excluded — they are offline/informational.
|
||||
|
||||
Ranking (descending priority, configurable via keep_ranking in config.json):
|
||||
1. Video files in ordered priority folders outrank other rclone entries.
|
||||
2. Source entries outrank Target entries when no priority-folder video exists.
|
||||
3. Non-.ts files outrank .ts files when a duplicate group has both.
|
||||
4. Largest file size. If sizes are within size_tolerance_mib, treated as equal
|
||||
and format preference is consulted instead.
|
||||
5. Format preference: ordered list of extensions (e.g. mkv > mp4 > wmv > avi).
|
||||
6. Tie-break: has resolution tag in filename ([1080p], [2160p], [720p], [480p]).
|
||||
7. Tie-break: longer filename (more metadata = more descriptive).
|
||||
"""
|
||||
ranking = _KEEP_RANKING or {}
|
||||
tolerance_bytes = int(float(ranking.get("size_tolerance_mib") or 0) * 1024 * 1024)
|
||||
priority_folders: list[str] = [
|
||||
str(folder).strip() for folder in
|
||||
(ranking.get("priority_folders") or DEFAULT_KEEP_RANKING["priority_folders"])
|
||||
if str(folder).strip()
|
||||
]
|
||||
fmt_order: list[str] = list(
|
||||
ranking.get("format_preference") or DEFAULT_KEEP_RANKING["format_preference"]
|
||||
)
|
||||
use_res_tag: bool = ranking.get("tiebreak_res_tag", True)
|
||||
use_longer_name: bool = ranking.get("tiebreak_longer_name", True)
|
||||
|
||||
rclone = [e for e in entries if e.source != "Catalog"]
|
||||
|
||||
def _priority_folder_rank(e: FileEntry) -> int | None:
|
||||
if Path(e.path).suffix.lower() not in _VIDEO_EXTS:
|
||||
return None
|
||||
# A root can be cq:JAV while the favored folder is a child path, or the
|
||||
# supplied root can itself end in that folder. Match across full_path.
|
||||
full_path = e.full_path.replace("\\", "/").strip("/").lower()
|
||||
segments = [segment for segment in full_path.split("/") if segment]
|
||||
for index, raw_folder in enumerate(priority_folders):
|
||||
folder = raw_folder.replace("\\", "/").strip("/").lower()
|
||||
if not folder:
|
||||
continue
|
||||
if "/" in folder or ":" in folder:
|
||||
framed = f"/{full_path}/"
|
||||
if full_path == folder or full_path.startswith(folder + "/") or f"/{folder}/" in framed:
|
||||
return index
|
||||
elif folder in segments:
|
||||
return index
|
||||
return None
|
||||
|
||||
prioritized = [(rank, e) for e in rclone if (rank := _priority_folder_rank(e)) is not None]
|
||||
best_priority = min((rank for rank, _ in prioritized), default=None)
|
||||
priority_videos = [e for rank, e in prioritized if rank == best_priority]
|
||||
pool_priority = [e for e in rclone if e.source == "Source"]
|
||||
reason = {"code": "fallback", "summary": "First remaining duplicate candidate"}
|
||||
if priority_videos:
|
||||
pool = priority_videos
|
||||
reason = {
|
||||
"code": "vip_folder",
|
||||
"summary": f"VIP folder: {priority_folders[best_priority]}",
|
||||
}
|
||||
elif pool_priority:
|
||||
pool = pool_priority
|
||||
reason = {"code": "source", "summary": "Source copy outranks target copies"}
|
||||
else:
|
||||
pool = rclone if rclone else entries
|
||||
|
||||
# Transport streams often inflate size without being the better keeper.
|
||||
preferred_containers = [
|
||||
e for e in pool if Path(e.path).suffix.lower() not in _LOWEST_KEEP_PRIORITY_EXTS
|
||||
]
|
||||
if preferred_containers and len(preferred_containers) != len(pool):
|
||||
pool = preferred_containers
|
||||
reason = {"code": "container", "summary": "Non-TS video outranks transport stream"}
|
||||
|
||||
# Step 1: narrow to within size tolerance of the maximum
|
||||
max_size = max(e.size for e in pool)
|
||||
candidates = [e for e in pool if max_size - e.size <= tolerance_bytes]
|
||||
|
||||
if len(candidates) == 1:
|
||||
if len(pool) > 1 and reason["code"] not in {"vip_folder", "source", "container"}:
|
||||
reason = {"code": "size", "summary": "Largest file after ranking rules"}
|
||||
return candidates[0], reason
|
||||
|
||||
# Step 2: format preference (lower index in fmt_order = higher priority)
|
||||
def _fmt_rank(e: FileEntry) -> int:
|
||||
ext = Path(e.path).suffix.lower().lstrip(".")
|
||||
try:
|
||||
return fmt_order.index(ext) # lower = better
|
||||
except ValueError:
|
||||
return len(fmt_order) # unknown = lowest
|
||||
|
||||
best_fmt = min(_fmt_rank(e) for e in candidates)
|
||||
by_fmt = [e for e in candidates if _fmt_rank(e) == best_fmt]
|
||||
if len(by_fmt) != len(candidates):
|
||||
ext = Path(by_fmt[0].path).suffix.lower().lstrip(".").upper() or "preferred format"
|
||||
reason = {"code": "format", "summary": f"Format preference: {ext}"}
|
||||
candidates = by_fmt
|
||||
|
||||
if len(candidates) == 1:
|
||||
return candidates[0], reason
|
||||
|
||||
# Step 3: resolution tag tie-break
|
||||
if use_res_tag:
|
||||
tagged = [e for e in candidates if _RES_LABEL_RE.search(Path(e.path).name)]
|
||||
if tagged:
|
||||
if len(tagged) != len(candidates):
|
||||
reason = {"code": "resolution_tag", "summary": "Filename has a resolution tag"}
|
||||
candidates = tagged
|
||||
|
||||
if len(candidates) == 1:
|
||||
return candidates[0], reason
|
||||
|
||||
# Step 4: longer filename tie-break
|
||||
if use_longer_name:
|
||||
keep = max(candidates, key=lambda e: len(Path(e.path).name))
|
||||
return keep, {"code": "filename", "summary": "Longer filename tie-break"}
|
||||
|
||||
return candidates[0], reason
|
||||
|
||||
|
||||
def decide_keep(entries: list[FileEntry]) -> FileEntry:
|
||||
"""Pick KEEP candidate for duplicate output."""
|
||||
return decide_keep_with_reason(entries)[0]
|
||||
|
||||
|
||||
def find_dupes(entries: Iterable[FileEntry]) -> dict[str, list[FileEntry]]:
|
||||
"""Group entries by jav_id. A group is a dupe only if it has >=2 non-Catalog entries."""
|
||||
groups: dict[str, list[FileEntry]] = {}
|
||||
for e in entries:
|
||||
# Re-evaluate duplicate keys from the current filename rules. Cached
|
||||
# entries may predate a new part detector such as `.1of2`; treating those
|
||||
# stale base IDs as duplicate files would produce risky delete hints.
|
||||
key = extract_id(Path(e.path).name) or e.jav_id
|
||||
groups.setdefault(key, []).append(e)
|
||||
out: dict[str, list[FileEntry]] = {}
|
||||
for k, v in groups.items():
|
||||
rclone_count = sum(1 for e in v if e.source != "Catalog")
|
||||
if rclone_count >= 2:
|
||||
out[k] = v
|
||||
return out
|
||||
|
||||
|
||||
_SUSPICIOUS_MULTIPART_TAIL_RE = re.compile(
|
||||
r"(?:^|[-_.\s])(?:p|pt|part|cd|disc|ep|episode|vol|volume|scene)[-_.\s]*([a-d]|\d{1,2})(?:$|[-_.\s\[])"
|
||||
r"|(?:^|[-_.\s])([a-d]|\d{1,2})(?:$|\s*\[)",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
|
||||
def describe_dupe_risks(jav_id: str, entries: list[FileEntry]) -> list[dict[str, str]]:
|
||||
"""Flag duplicate groups that deserve manual review before deletion."""
|
||||
rclone = [e for e in entries if e.source != "Catalog"]
|
||||
risks: list[dict[str, str]] = []
|
||||
if "#part" not in jav_id and len(rclone) >= 3:
|
||||
risks.append({
|
||||
"code": "large_same_id_group",
|
||||
"summary": f"{len(rclone)} files share this base ID; review for unrecognized parts before deleting.",
|
||||
})
|
||||
|
||||
suspicious: list[str] = []
|
||||
for e in rclone:
|
||||
stem = Path(e.path).stem
|
||||
base_match = PRIMARY_ID_RE.match(stem) or COMPOUND_ID_RE.match(stem) or FALLBACK_ID_RE.match(stem)
|
||||
if not base_match:
|
||||
continue
|
||||
tail = _RESOLUTION_TAG_RE.sub("", stem[base_match.end():]).strip()
|
||||
if _SUSPICIOUS_MULTIPART_TAIL_RE.search(tail):
|
||||
suspicious.append(Path(e.path).name)
|
||||
if suspicious and "#part" not in jav_id:
|
||||
samples = ", ".join(suspicious[:3])
|
||||
more = " ..." if len(suspicious) > 3 else ""
|
||||
risks.append({
|
||||
"code": "part_like_suffix",
|
||||
"summary": f"Part-like suffixes still share the base ID: {samples}{more}",
|
||||
})
|
||||
return risks
|
||||
|
||||
|
||||
def find_variant_alerts(
|
||||
entries: Iterable[FileEntry],
|
||||
) -> dict[str, list[FileEntry]]:
|
||||
"""Detect IDs where a bare form and a lowercase-variant form coexist.
|
||||
|
||||
Example: both ``IBW-902.mp4`` and ``IBW-902z.mp4`` are present.
|
||||
They are different products — not dupes — but their coexistence is
|
||||
suspicious and warrants manual comparison.
|
||||
|
||||
Returns {bare_id: [all entries whose re-evaluated ID matches bare or variant]}.
|
||||
Only bare IDs that have at least one variant sibling are included.
|
||||
"""
|
||||
index: dict[str, list[FileEntry]] = {}
|
||||
for e in entries:
|
||||
key = extract_id(Path(e.path).name) or e.jav_id
|
||||
index.setdefault(key, []).append(e)
|
||||
|
||||
alerts: dict[str, list[FileEntry]] = {}
|
||||
for jav_id in index:
|
||||
if "#" in jav_id:
|
||||
continue # skip multipart IDs
|
||||
m = _VARIANT_SUFFIX_RE.match(jav_id)
|
||||
if not m:
|
||||
continue
|
||||
bare = m.group(1)
|
||||
if bare in index:
|
||||
# Merge bare + variant entries under the bare key.
|
||||
if bare not in alerts:
|
||||
alerts[bare] = list(index[bare])
|
||||
alerts[bare].extend(index[jav_id])
|
||||
return alerts
|
||||
|
||||
|
||||
# ---------- library issues (non-canonical filenames) ----------
|
||||
@@ -1552,8 +1343,7 @@ def main():
|
||||
DEFAULT_TARGET = list(cfg["default_target"])
|
||||
if "default_catalog" in cfg:
|
||||
DEFAULT_CATALOG = list(cfg["default_catalog"])
|
||||
global _KEEP_RANKING
|
||||
_KEEP_RANKING = cfg.get("keep_ranking") or {}
|
||||
set_keep_ranking(cfg.get("keep_ranking") or {})
|
||||
part_patterns = list(cfg.get("part_patterns") or []) + list(args.part_pattern)
|
||||
pattern_errors = configure_part_patterns(part_patterns)
|
||||
if pattern_errors:
|
||||
|
||||
@@ -6,6 +6,16 @@ find at the top level. Adding a new submodule does not change the
|
||||
public surface — only this file does.
|
||||
"""
|
||||
from rcjav.model import FileEntry # noqa: F401
|
||||
from rcjav.dupes import ( # noqa: F401
|
||||
DEFAULT_KEEP_RANKING,
|
||||
set_keep_ranking,
|
||||
get_keep_ranking,
|
||||
decide_keep_with_reason,
|
||||
decide_keep,
|
||||
find_dupes,
|
||||
describe_dupe_risks,
|
||||
find_variant_alerts,
|
||||
)
|
||||
from rcjav.cache import ( # noqa: F401
|
||||
CACHE_PATH,
|
||||
CACHE_VERSION,
|
||||
|
||||
+264
@@ -0,0 +1,264 @@
|
||||
"""Duplicate detection, keep-ranking, and variant-alert logic.
|
||||
|
||||
Operates on FileEntry lists already extracted from rclone listings or
|
||||
catalogs (see rcjav.rclone_io and rcjav.catalog). The "rules" that
|
||||
influence which file gets KEPT live here; the "rules" that influence
|
||||
how an entry's jav_id is derived live in rcjav.ids.
|
||||
|
||||
`_KEEP_RANKING` is a module-level dict rebound by `set_keep_ranking()`
|
||||
at startup from config.json. Read via the local `_KEEP_RANKING` binding
|
||||
inside `decide_keep_with_reason`. Callers outside this module should
|
||||
go through `set_keep_ranking()` rather than touching the global.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Iterable
|
||||
|
||||
from rcjav.ids import (
|
||||
PRIMARY_ID_RE,
|
||||
FALLBACK_ID_RE,
|
||||
COMPOUND_ID_RE,
|
||||
_RES_LABEL_RE,
|
||||
_VARIANT_SUFFIX_RE,
|
||||
_RESOLUTION_TAG_RE,
|
||||
_VIDEO_EXTS,
|
||||
_LOWEST_KEEP_PRIORITY_EXTS,
|
||||
extract_id,
|
||||
)
|
||||
from rcjav.model import FileEntry
|
||||
|
||||
|
||||
# Built-in keep-ranking settings. decide_keep_with_reason() falls back to the
# "priority_folders" and "format_preference" entries here whenever the
# configured ranking leaves them out or empty.
DEFAULT_KEEP_RANKING: dict = dict(
    priority_folders=["ClearJAV"],
    size_tolerance_mib=0,
    format_preference=["mkv", "mp4", "wmv", "avi"],
    tiebreak_res_tag=True,
    tiebreak_longer_name=True,
)
|
||||
# Module-level ranking config; set from config.json via set_keep_ranking() so
# all call sites pick it up.
_KEEP_RANKING: dict = {}


def set_keep_ranking(ranking: dict | None) -> None:
    """Rebind the module's effective keep-ranking config.

    ``ranking`` is the ``keep_ranking`` mapping from config.json. ``None`` or
    an empty mapping resets the config to ``{}``, which makes
    ``decide_keep_with_reason`` use its built-in defaults.

    A shallow copy is stored, so adding, removing, or reassigning keys on the
    caller's dict afterwards cannot silently change the ranking that is in
    effect.
    """
    global _KEEP_RANKING
    _KEEP_RANKING = dict(ranking) if ranking else {}


def get_keep_ranking() -> dict:
    """Return a shallow copy of the module's current keep-ranking config.

    Adding or removing keys on the returned dict does not affect the module
    state; use ``set_keep_ranking()`` to change it.
    """
    return dict(_KEEP_RANKING)
|
||||
|
||||
|
||||
def decide_keep_with_reason(entries: list[FileEntry]) -> tuple[FileEntry, dict[str, str]]:
    """Pick KEEP candidate and explain the first ranking rule that settled it.

    Catalog entries are excluded — they are offline/informational — unless the
    group contains nothing but Catalog entries.

    Ranking (descending priority, configurable via keep_ranking in config.json):
      1. Video files in ordered priority folders outrank other rclone entries.
      2. Source entries outrank Target entries when no priority-folder video exists.
      3. Non-.ts files outrank .ts files when a duplicate group has both.
      4. Largest file size. If sizes are within size_tolerance_mib, treated as equal
         and format preference is consulted instead.
      5. Format preference: ordered list of extensions (e.g. mkv > mp4 > wmv > avi).
         Entries are compared case-insensitively and a leading dot is ignored.
      6. Tie-break: has resolution tag in filename ([1080p], [2160p], [720p], [480p]).
      7. Tie-break: longer filename (more metadata = more descriptive).

    Returns the chosen entry and a ``{"code": ..., "summary": ...}`` dict
    naming the rule that decided it.

    Raises ValueError if ``entries`` is empty.
    """
    if not entries:
        # Without this guard the max() over an empty pool below fails with an
        # opaque "empty sequence" ValueError.
        raise ValueError("decide_keep_with_reason() needs at least one entry")

    ranking = _KEEP_RANKING or {}
    tolerance_bytes = int(float(ranking.get("size_tolerance_mib") or 0) * 1024 * 1024)
    priority_folders: list[str] = [
        str(folder).strip() for folder in
        (ranking.get("priority_folders") or DEFAULT_KEEP_RANKING["priority_folders"])
        if str(folder).strip()
    ]
    # Normalise the configured folders once instead of once per entry. The
    # index is kept so it still points into priority_folders for the summary;
    # folders that normalise to an empty string are skipped.
    folder_keys: list[tuple[int, str]] = [
        (index, key)
        for index, raw_folder in enumerate(priority_folders)
        if (key := raw_folder.replace("\\", "/").strip("/").lower())
    ]
    # File extensions are compared lower-cased and without the dot, so the
    # configured preferences are normalised the same way ("MKV", ".mkv").
    fmt_order: list[str] = [
        str(ext).strip().lower().lstrip(".")
        for ext in (ranking.get("format_preference") or DEFAULT_KEEP_RANKING["format_preference"])
    ]
    use_res_tag: bool = ranking.get("tiebreak_res_tag", True)
    use_longer_name: bool = ranking.get("tiebreak_longer_name", True)

    rclone = [e for e in entries if e.source != "Catalog"]

    def _priority_folder_rank(e: FileEntry) -> int | None:
        """Return the index of the first priority folder containing ``e``, else None."""
        if Path(e.path).suffix.lower() not in _VIDEO_EXTS:
            return None
        # A root can be cq:JAV while the favored folder is a child path, or the
        # supplied root can itself end in that folder. Match across full_path.
        full_path = e.full_path.replace("\\", "/").strip("/").lower()
        segments = [segment for segment in full_path.split("/") if segment]
        framed = f"/{full_path}/"
        for index, folder in folder_keys:
            if "/" in folder or ":" in folder:
                # Multi-segment (or remote-qualified) folder: match it as a
                # run of whole path segments. Framing both sides with "/"
                # also covers the exact-match and prefix cases.
                if f"/{folder}/" in framed:
                    return index
            elif folder in segments:
                return index
        return None

    prioritized = [(rank, e) for e in rclone if (rank := _priority_folder_rank(e)) is not None]
    best_priority = min((rank for rank, _ in prioritized), default=None)
    priority_videos = [e for rank, e in prioritized if rank == best_priority]
    pool_priority = [e for e in rclone if e.source == "Source"]
    reason = {"code": "fallback", "summary": "First remaining duplicate candidate"}
    if priority_videos:
        pool = priority_videos
        reason = {
            "code": "vip_folder",
            "summary": f"VIP folder: {priority_folders[best_priority]}",
        }
    elif pool_priority:
        pool = pool_priority
        reason = {"code": "source", "summary": "Source copy outranks target copies"}
    else:
        pool = rclone or entries

    # Transport streams often inflate size without being the better keeper.
    preferred_containers = [
        e for e in pool if Path(e.path).suffix.lower() not in _LOWEST_KEEP_PRIORITY_EXTS
    ]
    if preferred_containers and len(preferred_containers) != len(pool):
        pool = preferred_containers
        reason = {"code": "container", "summary": "Non-TS video outranks transport stream"}

    # Rule 4: narrow to within size tolerance of the maximum.
    max_size = max(e.size for e in pool)
    candidates = [e for e in pool if max_size - e.size <= tolerance_bytes]

    if len(candidates) == 1:
        # Size only gets the credit when no earlier rule already narrowed the pool.
        if len(pool) > 1 and reason["code"] not in {"vip_folder", "source", "container"}:
            reason = {"code": "size", "summary": "Largest file after ranking rules"}
        return candidates[0], reason

    # Rule 5: format preference (lower index in fmt_order = higher priority).
    def _fmt_rank(e: FileEntry) -> int:
        ext = Path(e.path).suffix.lower().lstrip(".")
        try:
            return fmt_order.index(ext)  # lower = better
        except ValueError:
            return len(fmt_order)  # unknown = lowest

    best_fmt = min(_fmt_rank(e) for e in candidates)
    by_fmt = [e for e in candidates if _fmt_rank(e) == best_fmt]
    if len(by_fmt) != len(candidates):
        ext = Path(by_fmt[0].path).suffix.lower().lstrip(".").upper() or "preferred format"
        reason = {"code": "format", "summary": f"Format preference: {ext}"}
        candidates = by_fmt

    if len(candidates) == 1:
        return candidates[0], reason

    # Rule 6: resolution tag tie-break.
    if use_res_tag:
        tagged = [e for e in candidates if _RES_LABEL_RE.search(Path(e.path).name)]
        if tagged:
            if len(tagged) != len(candidates):
                reason = {"code": "resolution_tag", "summary": "Filename has a resolution tag"}
            candidates = tagged

    if len(candidates) == 1:
        return candidates[0], reason

    # Rule 7: longer filename tie-break.
    if use_longer_name:
        keep = max(candidates, key=lambda e: len(Path(e.path).name))
        return keep, {"code": "filename", "summary": "Longer filename tie-break"}

    return candidates[0], reason
|
||||
|
||||
|
||||
def decide_keep(entries: list[FileEntry]) -> FileEntry:
    """Return only the KEEP entry chosen by decide_keep_with_reason()."""
    keeper, _reason = decide_keep_with_reason(entries)
    return keeper
|
||||
|
||||
|
||||
def find_dupes(entries: Iterable[FileEntry]) -> dict[str, list[FileEntry]]:
    """Bucket entries by ID and keep buckets holding two or more non-Catalog entries.

    Returned groups still contain their Catalog entries; only the count that
    qualifies a group as a duplicate ignores them.
    """
    by_id: dict[str, list[FileEntry]] = {}
    for entry in entries:
        # The key is recomputed from the filename with the current rules and
        # only falls back to the stored jav_id. Cached entries may predate a
        # newer part detector such as `.1of2`; trusting their stale base IDs
        # would report separate parts as duplicates and produce risky delete
        # hints.
        dupe_key = extract_id(Path(entry.path).name) or entry.jav_id
        by_id.setdefault(dupe_key, []).append(entry)

    def _live_count(members: list[FileEntry]) -> int:
        return len([member for member in members if member.source != "Catalog"])

    return {
        dupe_key: members
        for dupe_key, members in by_id.items()
        if _live_count(members) >= 2
    }
|
||||
|
||||
|
||||
# Applied by describe_dupe_risks() to the filename text that follows the base
# ID. Case-insensitive; either alternative may match.
_SUSPICIOUS_MULTIPART_TAIL_RE = re.compile(
    "|".join((
        # Keyword form: a part-style keyword followed by a-d or a 1-2 digit number.
        r"(?:^|[-_.\s])(?:p|pt|part|cd|disc|ep|episode|vol|volume|scene)[-_.\s]*([a-d]|\d{1,2})(?:$|[-_.\s\[])",
        # Bare form: a lone a-d or 1-2 digit number ending the tail or preceding "[".
        r"(?:^|[-_.\s])([a-d]|\d{1,2})(?:$|\s*\[)",
    )),
    flags=re.IGNORECASE,
)
|
||||
|
||||
|
||||
def describe_dupe_risks(jav_id: str, entries: list[FileEntry]) -> list[dict[str, str]]:
    """List reasons a duplicate group should be reviewed by hand before deleting.

    Catalog entries are ignored. Each finding is a ``{"code", "summary"}``
    dict; an empty list means nothing was flagged. Neither finding is reported
    when ``jav_id`` contains ``#part``.
    """
    base_id_only = "#part" not in jav_id
    live = [entry for entry in entries if entry.source != "Catalog"]
    findings: list[dict[str, str]] = []

    if base_id_only and len(live) >= 3:
        findings.append({
            "code": "large_same_id_group",
            "summary": f"{len(live)} files share this base ID; review for unrecognized parts before deleting.",
        })

    flagged_names: list[str] = []
    for entry in live:
        file_path = Path(entry.path)
        stem = file_path.stem
        id_match = (
            PRIMARY_ID_RE.match(stem)
            or COMPOUND_ID_RE.match(stem)
            or FALLBACK_ID_RE.match(stem)
        )
        if not id_match:
            continue
        # Inspect only what follows the matched ID, with resolution tags
        # removed first.
        remainder = _RESOLUTION_TAG_RE.sub("", stem[id_match.end():]).strip()
        if _SUSPICIOUS_MULTIPART_TAIL_RE.search(remainder):
            flagged_names.append(file_path.name)

    if flagged_names and base_id_only:
        # Show at most three filenames; mark the list as truncated beyond that.
        shown = ", ".join(flagged_names[:3])
        overflow = " ..." if len(flagged_names) > 3 else ""
        findings.append({
            "code": "part_like_suffix",
            "summary": f"Part-like suffixes still share the base ID: {shown}{overflow}",
        })
    return findings
|
||||
|
||||
|
||||
def find_variant_alerts(
    entries: Iterable[FileEntry],
) -> dict[str, list[FileEntry]]:
    """Report bare IDs that coexist with a suffixed variant of the same ID.

    Example: both ``IBW-902.mp4`` and ``IBW-902z.mp4`` are present. They are
    different products — not dupes — but their coexistence is suspicious and
    warrants manual comparison.

    Returns ``{bare_id: entries}`` where the list holds the bare ID's entries
    followed by the entries of each variant found. A bare ID appears only when
    at least one variant sibling exists.
    """
    by_id: dict[str, list[FileEntry]] = {}
    for entry in entries:
        # Same keying as duplicate detection: re-derive the ID from the
        # filename and fall back to the stored jav_id.
        current_id = extract_id(Path(entry.path).name) or entry.jav_id
        by_id.setdefault(current_id, []).append(entry)

    alerts: dict[str, list[FileEntry]] = {}
    for candidate_id, members in by_id.items():
        if "#" in candidate_id:
            continue  # skip multipart IDs
        suffix_match = _VARIANT_SUFFIX_RE.match(candidate_id)
        if not suffix_match:
            continue
        bare_id = suffix_match.group(1)
        if bare_id not in by_id:
            continue
        # Seed the alert with a copy of the bare ID's entries the first time
        # it is seen, then append this variant's entries.
        alerts.setdefault(bare_id, list(by_id[bare_id])).extend(members)
    return alerts
|
||||
Reference in New Issue
Block a user