Files
rclone-jav/rcjav/ids.py
T
admin ba57b7fd21 Step 10a + 10b: scaffold rcjav/ package, extract ID rules
Carves the first slice out of the monolithic rc-jav.py (now 2017
lines, was 2230). Two new modules:

  rcjav/model.py    FileEntry dataclass — the one shared shape that
                    every other submodule will need.
  rcjav/ids.py      Single source of truth for everything that
                    influences a FileEntry.jav_id: PRIMARY_ID_RE,
                    FALLBACK_ID_RE, COMPOUND_ID_RE, BUILTIN_PART_RES,
                    configure_part_patterns, detect_part,
                    detect_part_from_stem, part_key, extract_id,
                    normalize_id, describe_id_match, expand_range,
                    plus the supporting "private" regexes
                    (_BRACKET_ID_RE, _RESOLUTION_TAG_RE, etc.) that
                    other code in rc-jav.py still reads.

rcjav/__init__.py re-exports the public surface so future external
consumers can `from rcjav import extract_id` without caring which
submodule it lives in.

rc-jav.py drops the inline ID block and pulls everything from
rcjav.ids via a single import statement. PART_RES is intentionally
NOT imported — it's mutated by configure_part_patterns at runtime, so
a captured top-level reference would go stale. A small helper
`_current_part_res()` reads it dynamically via `_rcjav_ids.PART_RES`.

fixtures/run.py fix: synthesized importlib module name changed from
"rcjav" (which now collides with the real package directory) to
"rcjav_script". Also prepends ROOT to sys.path so rc-jav.py's
`from rcjav.model import …` resolves when run as
`python fixtures/run.py`.

Verified:
  - python rc-jav.py --help              → usage banner prints
  - python fixtures/run.py               → 17/17 cases pass
  - python -m unittest tests.test_rules  → 5/5 OK

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-22 21:43:57 +02:00

244 lines
9.6 KiB
Python

"""JAV ID extraction, normalization, and part-suffix detection.
This is the single source of truth for everything that influences the
`jav_id` field of a FileEntry. Any change here is a "rules" change in
the cache-contract sense (see docs/CACHE_CONTRACT.md in the extension
repo) and bumps `id_rules` once that contract is implemented.
Cross-side note: the browser extension's content.js mirrors a subset
of this logic (page-title surface, no part suffix). The shared
fixture corpus at fixtures/ pins the cases both sides must agree on.
"""
from __future__ import annotations
import re
from pathlib import Path
from typing import Iterable
# ID patterns, all anchored at the start of the stem. In each one group(1) is
# the prefix and group(2) the numeric component. extract_id() tries them in
# the order PRIMARY -> COMPOUND -> FALLBACK.
#
# Canonical shape: letters-only prefix, hyphen, digits (e.g. "SSIS-001").
PRIMARY_ID_RE = re.compile(r"^([A-Za-z]+)-(\d+)")
# Same shape, but the prefix may also contain digits (e.g. "FC2-1234567").
FALLBACK_ID_RE = re.compile(r"^([A-Za-z0-9]+)-(\d+)")
# Multi-segment prefix: two or more hyphen-joined alphanumeric segments
# followed by "-<digits>" (e.g. "FC2-PPV-1234567" -> prefix "FC2-PPV").
COMPOUND_ID_RE = re.compile(r"^([A-Za-z0-9]+(?:-[A-Za-z0-9]+)+)-(\d+)")
# Bracketed numeric range inside an ID query, e.g. the "[820-860]" in
# "IPZZ-[820-860]". group(1) and group(2) are the two endpoints; consumed by
# expand_range().
RANGE_RE = re.compile(r"\[(\d+)-(\d+)\]")
# Non-anchored XofY probe used in detect_part() to resolve the priority conflict
# between a trailing (N) copy-marker suffix and an embedded XofY part indicator.
# Example: "ENKI-031 [1080p].2of2 (1)" — the (1) is a filesystem collision suffix
# (rclone, Windows copy), not a part number; the 2of2 is the real part indicator.
# This pattern intentionally has no end-anchor so it matches anywhere in the stem.
_XOFY_PRIORITY_RE = re.compile(r"[._ -](\d+)\s*of\s*\d+", re.IGNORECASE)
# Part-suffix patterns: anchored at end of stem (after stripping extension).
# Each pattern's group(1) is the part number. Order matters: detect_part()
# walks the list front to back and stops at the first pattern that matches.
BUILTIN_PART_RES = [
    # Keyword suffix: separator, then pt/part/cd/disc, an optional separator
    # and the digits, e.g. `_PART1`, `-cd2`, ` disc 3`. Case-insensitive.
    re.compile(r"[-_ ](?:pt|part|cd|disc)[-_ ]?(\d+)$", re.IGNORECASE),
    # Parenthesised number at the very end, optionally written `(N of M)`.
    re.compile(r"\s*\((\d+)(?:\s*of\s*\d+)?\)$", re.IGNORECASE),
    # Exported multipart filenames often end in `.1of2` / `-2 of 4`.
    re.compile(r"[._ -](\d+)\s*of\s*\d+$", re.IGNORECASE),
    # Bare numeric suffixes (`_N`, ` N`) are only treated as part numbers when
    # the number is 1-2 digits. Wider patterns falsely matched resolution tags
    # (`_2160`, `_4K2160`) and dates/years (`SSIS-001 2023.mp4` -> `#part2023`),
    # corrupting cache keys.
    # Staged detection also retries after resolution/actress cleanup, so end
    # anchors can match both raw suffixes and metadata-blocked suffixes safely.
    re.compile(r"_(\d{1,2})$"),
    # Hyphen short-part suffix after the ID, e.g. OFJE-195-1 [480p].mp4.
    # Limit to 1-2 digits so the base ID's usual 3+ digit numeric component
    # does not make every canonical `ABC-123` filename look multipart.
    re.compile(r"-(\d{1,2})$"),
    # Lettered parts: separator (hyphen or underscore) followed by A-D.
    # Uppercase only — lowercase letters are variant designators (e.g. IBW-902z)
    # and are preserved as part of the base ID, not treated as part numbers.
    re.compile(r"[-_]([A-D])$"),
    # Bare uppercase letter directly after the ID digits with no separator,
    # e.g. BAK-052A, BAK-052B. Lookbehind ensures a digit precedes.
    re.compile(r"(?<=\d)([A-D])$"),
    # Whitespace-separated form of the bare 1-2 digit suffix (` N`); same
    # width limit as the `_N` rule above.
    re.compile(r"\s+(\d{1,2})$"),
]
# Active pattern list read by detect_part(). Starts as a copy of the built-ins
# and is rebound by configure_part_patterns(), so read it through the module
# at call time rather than capturing a reference at import time.
PART_RES = list(BUILTIN_PART_RES)
def configure_part_patterns(patterns: Iterable[str]) -> list[str]:
    """Rebuild PART_RES from the built-in rules plus user-supplied regexes.

    Each user regex is compiled case-insensitively and appended after the
    built-ins; its first capture group must hold the part number. Calling
    this function always discards previously configured user patterns.

    Args:
        patterns: Regex source strings. Blank/None entries are skipped. A
            bare string is treated as a single pattern (not iterated
            character by character) and None means "no user patterns".

    Returns:
        One human-readable message per rejected pattern (invalid regex or
        no capture group). An empty list means every pattern was accepted.
    """
    global PART_RES
    if patterns is None:
        patterns = ()
    elif isinstance(patterns, str):
        # A str is itself an Iterable[str]; iterating it would try to compile
        # every single character as its own pattern.
        patterns = (patterns,)

    active = list(BUILTIN_PART_RES)
    errors: list[str] = []
    for raw in patterns:
        pattern = str(raw or "").strip()
        if not pattern:
            continue
        try:
            compiled = re.compile(pattern, re.IGNORECASE)
        except re.error as e:
            errors.append(f"{pattern!r}: {e}")
            continue
        if compiled.groups < 1:
            errors.append(f"{pattern!r}: needs a capture group for the part number")
            continue
        active.append(compiled)

    # Bind once at the end so PART_RES never exposes a half-built list.
    PART_RES = active
    return errors
def detect_part(stem: str) -> str | None:
    """Return the part token if *stem* carries a part suffix, else None.

    XofY (e.g. .2of2) anywhere in the stem takes unconditional priority over a
    trailing (N) suffix. A file named 'ENKI-031 [1080p].2of2 (1).mp4' is part 2;
    the trailing (1) is a filesystem copy-collision marker (rclone / Windows),
    not a part number. Without this pre-check the ordered PART_RES list would
    match (1) first and misclassify the file as part 1.

    Otherwise the patterns in PART_RES are tried in order and the first one
    that yields a non-empty group(1) wins. The returned token is the raw
    capture (digits or a letter); use part_key() to normalize it.
    """
    priority = _XOFY_PRIORITY_RE.search(stem)
    if priority:
        return priority.group(1)
    for pattern in PART_RES:
        m = pattern.search(stem)
        # A user-configured pattern can match while its first group is
        # optional/empty; that is not a part number, so keep scanning instead
        # of returning None/"" and hiding later patterns.
        if m and m.group(1):
            return m.group(1)
    return None
def part_key(part: str) -> str:
    """Normalize a raw part token into the key used after ``#part``.

    * Decimal digits lose leading zeros: ``"02"`` -> ``"2"``.
    * A single letter maps to its 1-based alphabet position, ignoring case:
      ``"A"`` -> ``"1"``, ``"d"`` -> ``"4"``.
    * Anything else is returned stripped and upper-cased; a falsy *part*
      yields ``""``.
    """
    token = str(part or "").strip()
    # isdecimal(), not isdigit(): isdigit() also accepts characters such as
    # "²" or "①" that int() rejects with ValueError.
    if token.isdecimal():
        return str(int(token))
    if len(token) == 1 and token.isalpha():
        return str(ord(token.upper()) - ord("A") + 1)
    return token.upper()
# Matches a trailing lowercase letter variant designator, e.g. the 'z' in IBW-902z.
# group(1) is everything before the letter, group(2) the letter itself.
# NOTE(review): not referenced inside this module; presumably read by
# rc-jav.py — confirm before removing.
_VARIANT_SUFFIX_RE = re.compile(r"^(.+?)([a-z])$")
# Strips `[resolution]` and ` - Actress Name` from a stem so that part-suffix
# patterns anchored at `$` fire correctly.
# Canonical naming: {ID}[-{part}][ - {actress}][ [{resolution}]]
# The pattern itself matches one trailing `[...]` token with any content,
# plus the whitespace around it; the ` - Actress` cut happens in
# _clean_stem_for_parts().
_RESOLUTION_TAG_RE = re.compile(r"\s*\[[^\]]*\]\s*$")
# Bracket-wrapped ID: [REAL-779] or [HODV-21076] Saki Hatsumi [1080p]
# group(1) is the text inside the leading pair of brackets.
_BRACKET_ID_RE = re.compile(r"^\[([^\]]+)\]")
# A `[2160p]` / `[1080p]` / `[720p]` / `[480p]` label anywhere in a string,
# case-insensitive.
# NOTE(review): not referenced inside this module — confirm usage in callers.
_RES_LABEL_RE = re.compile(r"\[(?:2160|1080|720|480)p\]", re.IGNORECASE)
# Lowercase, dot-prefixed file extensions.
# NOTE(review): presumably the set of extensions treated as video files by
# callers; not referenced inside this module — confirm.
_VIDEO_EXTS = {
    ".avi", ".flv", ".m2ts", ".m4v", ".mkv", ".mov", ".mp4", ".mpeg",
    ".mpg", ".ts", ".webm", ".wmv",
}
# NOTE(review): by its name, extensions that rank last when choosing which
# duplicate to keep; the ranking logic is not in this module — confirm.
_LOWEST_KEEP_PRIORITY_EXTS = {".ts"}
# No-hyphen ID fallback: MVSD312 -> MVSD-312 (letters-only prefix + digits, no hyphen)
# Prefix is limited to 2-8 letters and the number to 3-6 digits.
_NOHYPHEN_ID_RE = re.compile(r"^([A-Za-z]{2,8})(\d{3,6})")
def _clean_stem_for_parts(stem: str) -> str:
    """Strip the trailing ``[tag]`` and the `` - Actress`` tail from *stem*.

    One trailing bracketed token (normally the resolution) is removed first;
    then everything from the first `` - `` onwards is dropped. The result is
    whitespace-trimmed.
    """
    without_tag = _RESOLUTION_TAG_RE.sub("", stem).strip()
    # partition() returns the whole string as the head when " - " is absent,
    # so no separate membership test is needed.
    head, _sep, _actress = without_tag.partition(" - ")
    return head.strip()
def _part_detection_stems(stem: str) -> list[str]:
    """List the distinct, non-empty cleanup stages of *stem*.

    Stages are ordered from least to most cleaned: the raw stem, the stem
    without its trailing ``[tag]``, and the stem with the `` - Actress`` tail
    removed as well.
    """
    stages = (
        stem,
        _RESOLUTION_TAG_RE.sub("", stem).strip(),
        _clean_stem_for_parts(stem),
    )
    # dict.fromkeys keeps first-seen order while dropping duplicates.
    return list(dict.fromkeys(stage for stage in stages if stage))
def detect_part_from_stem(stem: str) -> str | None:
    """Return the first part token found across the cleanup stages of *stem*.

    The part rules are applied to the raw stem first and then to each
    progressively cleaned stage; None means no stage produced a part.
    """
    found = (detect_part(stage) for stage in _part_detection_stems(stem))
    return next((part for part in found if part), None)
def _detect_part_after_id(stem: str, id_end: int) -> str | None:
    """Run staged part detection on *stem*, skipping stages that are only the ID.

    *stem* must start with the ID and *id_end* is the index just past the
    ID's digits. Every stage from _part_detection_stems() is a prefix of
    *stem*; a stage that ends at or before *id_end* holds nothing but the ID,
    so an end-anchored rule such as ``-(\\d{1,2})$`` could only match the ID's
    own number ("KV-12" would become part 12).
    """
    for candidate in _part_detection_stems(stem):
        if len(candidate) <= id_end:
            continue
        part = detect_part(candidate)
        if part:
            return part
    return None


def extract_id(name: str) -> str | None:
    """Extract the normalized JAV ID (plus optional part) from a filename.

    The extension is dropped via ``Path(name).stem``. The ID must sit at the
    start of the stem, or inside a leading ``[...]`` wrapper. Patterns are
    tried in the order PRIMARY, COMPOUND, FALLBACK, no-hyphen.

    Normalization: the prefix is upper-cased ("FC2" becomes "FC2-PPV"), the
    number is zero-padded to at least three digits, and a lowercase letter
    directly after the digits is kept as a variant designator.

    Examples::

        "SSIS-001.mp4"           -> "SSIS-001"
        "MVSD312.mp4"            -> "MVSD-312"
        "IBW-902z.mp4"           -> "IBW-902z"
        "OFJE-195-1 [480p].mp4"  -> "OFJE-195#part1"
        "BAK-052B.mp4"           -> "BAK-052#part2"
        "KV-12.mp4"              -> "KV-012"   (ID digits are never a part)
        "[REAL-779-1].mp4"       -> "REAL-779#part1"

    Returns:
        ``"<PREFIX>-<NUM>[variant]"`` with ``"#part<key>"`` appended when a
        part suffix is detected, or None when no ID pattern matches.
    """
    stem = Path(name).stem

    # Strip bracket wrapper: [REAL-779] -> REAL-779, [SCOP-297] [1080p] -> SCOP-297
    effective_stem = stem
    bracket_wrapped = False
    if stem.startswith("["):
        bm = _BRACKET_ID_RE.match(stem)
        if bm:
            effective_stem = bm.group(1).strip()
            bracket_wrapped = True

    m = (
        PRIMARY_ID_RE.match(effective_stem)
        or COMPOUND_ID_RE.match(effective_stem)
        or FALLBACK_ID_RE.match(effective_stem)
        # No-hyphen fallback: MVSD312 -> MVSD-312
        or _NOHYPHEN_ID_RE.match(effective_stem)
    )
    if not m:
        return None

    digits = m.group(2)
    num = int(digits)
    width = max(3, len(digits))
    prefix = m.group(1).upper()
    if prefix == "FC2":
        prefix = "FC2-PPV"

    # Check the character immediately after the matched digits.
    # Lowercase -> variant designator (e.g. IBW-902z): fold into the base ID.
    # Uppercase A-D -> part letter: handled below by part detection.
    # Anything else (space, '[', end-of-string) -> no variant.
    after = effective_stem[m.end():m.end() + 1]
    variant = after if after.islower() else ""
    base = f"{prefix}-{num:0{width}d}{variant}"

    # Part detection runs before and after metadata cleanup: raw suffixes such
    # as "KV-118 - Actress_PART1" must survive, while trailing [1080p] tags
    # still need cleanup before end-anchored detectors can match.
    #
    # For bracket-wrapped names the full stem is tried first (suffixes that
    # follow the wrapper, e.g. "[REAL-779] part 2"). A part written inside
    # the wrapper ("[REAL-779-1]") is invisible there because the stem ends
    # in "]", so the bracket content is tried next.
    part = detect_part_from_stem(stem) if bracket_wrapped else None
    if not part:
        # effective_stem starts with the ID here (it equals stem when there
        # is no wrapper), which is what _detect_part_after_id requires.
        part = _detect_part_after_id(effective_stem, m.end())
    return f"{base}#part{part_key(part)}" if part else base
def normalize_id(raw: str) -> str | None:
    """Normalize a bare ID string by running it through extract_id().

    extract_id() drops the final extension of its input, so a throwaway
    ``.x`` extension is appended first; otherwise a dot inside *raw* could be
    mistaken for the start of an extension and truncate the ID.
    """
    dummy_filename = raw + ".x"
    return extract_id(dummy_filename)
def describe_id_match(display_query: str, matched_query: str, matched_id: str,
                      expansion_count: int) -> dict[str, str]:
    """Describe which matcher path produced one ID hit, for JSON output.

    The first rule that applies wins, in this order: wildcard query, range
    expansion (more than one expanded query), query that already names a
    part, base-ID query that hit a part, query changed by normalization,
    and finally a plain exact match.

    Returns a dict with the keys ``match_kind``, ``match_reason``,
    ``match_confidence``, ``matched_query`` and ``matched_id``.
    """

    def classify() -> tuple[str, str, str]:
        # Guard clauses in priority order; each returns (kind, reason, confidence).
        if "*" in matched_query or "?" in matched_query:
            return "wildcard", "Wildcard ID", "broad"
        if expansion_count > 1:
            return "range", "Range member", "expanded"
        if "#part" in matched_query:
            return "exact_part", "Exact part ID", "high"
        if matched_id.startswith(matched_query + "#part"):
            return "part", "Base ID + part", "related"
        if display_query.upper() != matched_query.upper():
            return "normalized", "Normalized ID", "normalized"
        return "exact", "Exact ID", "high"

    match_kind, match_reason, match_confidence = classify()
    return {
        "match_kind": match_kind,
        "match_reason": match_reason,
        "match_confidence": match_confidence,
        "matched_query": matched_query,
        "matched_id": matched_id,
    }
def expand_range(raw: str) -> list[str] | None:
    """Expand a bracket range such as 'IPZZ-[820-860]' into one string per number.

    Only the first ``[a-b]`` marker in *raw* is expanded; the text around it
    is copied unchanged. The endpoints may be given in either order and both
    are included. Numbers are zero-padded to the width of the longer
    endpoint. Returns None when *raw* contains no range marker.
    """
    found = RANGE_RE.search(raw)
    if found is None:
        return None

    first_text, second_text = found.groups()
    low, high = sorted((int(first_text), int(second_text)))
    pad = max(len(first_text), len(second_text))  # preserve zero-padding
    head, tail = raw[:found.start()], raw[found.end():]

    expanded: list[str] = []
    for number in range(low, high + 1):
        expanded.append(f"{head}{number:0{pad}d}{tail}")
    return expanded