33c495ad57
Implements the two-tier contract from docs/CACHE_CONTRACT.md (extension
repo, locked at step 9):
cache_schema: on-disk shape; mismatch -> force rebuild
id_rules: bumps when extraction rules change
id_rules_signature: sha256 over canonical rule text; catches drift
when the integer bump is forgotten
New constants in rcjav/cache.py:
CACHE_SCHEMA_VERSION = 1
ID_RULES_VERSION = 1 (the legacy "version: 3" cache reads as
id_rules: 0 after in-place migration)
New helpers:
rcjav.ids.current_rules_signature()
Sha256 over the canonical text of every rule that influences
a jav_id: built-in regexes, BUILTIN_PART_RES, PART_RES (which
captures user-added part patterns), FC2 handling.
rcjav.cache.load_cache(signature=None)
Reads cache.json. Legacy `version: 3` headers get an in-place
header upgrade with no forced rescan; the cache is stamped as
`id_rules: 0` + signature "legacy" so it surfaces as
"stale by rules" in cache_state. Schema mismatch on the new
header still forces a rebuild.
rcjav.cache.cache_state(cache, signature)
Classifies a cache as "fresh" / "stale_by_rules" /
"schema_mismatch". Drives the three-state extension UX.
rcjav.cache.stamp_current_rules(cache, signature)
Updates id_rules and id_rules_signature in place. Called after
a successful full scan or --reextract.
New CLI command:
rc-jav.py --reextract
Walks `cache["remotes"][r]["files"]` against the live rule set and
updates `jav_id` in place. No rclone calls — fast path (seconds on
a 7k-file cache). Reports changed/unchanged/dropped per remote.
Stamps current rules into the saved cache.
--scan (full, no --scan-since) now also stamps current rules.
--scan --scan-since deliberately does NOT stamp: it only re-walks
recently-modified files, so older entries may still carry jav_ids
from previous rules; cache stays "stale by rules" until a full scan
or --reextract.
Verified:
- python rc-jav.py --reextract --format json on the live 7124-file
cache → 0 changes (existing IDs already canonical), cache.json
rewritten with new header
- cache_state on the post-migration cache → "fresh"
- tests + fixtures + --help all pass
Extension-side (host's cache_status response + options-cache.js
three-state UX + Re-extract IDs button) ships in a separate commit
in the extension repo.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
276 lines
11 KiB
Python
276 lines
11 KiB
Python
"""JAV ID extraction, normalization, and part-suffix detection.
|
|
|
|
This is the single source of truth for everything that influences the
|
|
`jav_id` field of a FileEntry. Any change here is a "rules" change in
|
|
the cache-contract sense (see docs/CACHE_CONTRACT.md in the extension
|
|
repo) and requires a bump of `id_rules` (ID_RULES_VERSION in rcjav/cache.py).
|
|
|
|
Cross-side note: the browser extension's content.js mirrors a subset
|
|
of this logic (page-title surface, no part suffix). The shared
|
|
fixture corpus at fixtures/ pins the cases both sides must agree on.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Iterable
|
|
|
|
|
|
# ID matchers, all anchored at the start of the stem. In every pattern
# group(1) is the prefix and group(2) the numeric component. extract_id()
# tries them in the order PRIMARY -> COMPOUND -> FALLBACK.

# Letters-only prefix, e.g. ABC-123.
PRIMARY_ID_RE = re.compile(r"^([A-Za-z]+)-(\d+)")
# Single alphanumeric prefix (digits allowed), e.g. FC2-123456.
FALLBACK_ID_RE = re.compile(r"^([A-Za-z0-9]+)-(\d+)")
# Hyphenated multi-segment prefix, e.g. FC2-PPV-123456 (prefix "FC2-PPV").
COMPOUND_ID_RE = re.compile(r"^([A-Za-z0-9]+(?:-[A-Za-z0-9]+)+)-(\d+)")
|
|
|
|
# Bracketed numeric range inside a query string, e.g. the "[820-860]" in
# "IPZZ-[820-860]". group(1) and group(2) are the two bounds; consumed by
# expand_range().
RANGE_RE = re.compile(r"\[(\d+)-(\d+)\]")

# Non-anchored XofY probe used in detect_part() to resolve the priority conflict
# between a trailing (N) copy-marker suffix and an embedded XofY part indicator.
# Example: "ENKI-031 [1080p].2of2 (1)" — the (1) is a filesystem collision suffix
# (rclone, Windows copy), not a part number; the 2of2 is the real part indicator.
# This pattern intentionally has no end-anchor so it matches anywhere in the stem.
_XOFY_PRIORITY_RE = re.compile(r"[._ -](\d+)\s*of\s*\d+", re.IGNORECASE)

# Part-suffix patterns (BUILTIN_PART_RES): anchored at end of stem (after
# stripping extension). Each pattern's group(1) is the part number.
|
|
|
|
# Order matters: detect_part() returns the first pattern that matches.
BUILTIN_PART_RES = [
    # Keyword parts: separator + pt/part/cd/disc + optional separator + digits,
    # e.g. `-pt1`, `_PART2`, ` cd 3` (case-insensitive).
    re.compile(r"[-_ ](?:pt|part|cd|disc)[-_ ]?(\d+)$", re.IGNORECASE),
    # Parenthesised number, optionally `N of M`: `(2)`, ` (2 of 3)`.
    re.compile(r"\s*\((\d+)(?:\s*of\s*\d+)?\)$", re.IGNORECASE),
    # Exported multipart filenames often end in `.1of2` / `-2 of 4`.
    re.compile(r"[._ -](\d+)\s*of\s*\d+$", re.IGNORECASE),
    # Bare numeric suffixes (`_N`, ` N`) are only treated as part numbers when
    # the number is 1-2 digits. Wider patterns falsely matched resolution tags
    # (`_2160`, `_4K2160`) and dates/years (`SSIS-001 2023.mp4` -> `#part2023`),
    # corrupting cache keys.
    # Staged detection also retries after resolution/actress cleanup, so end
    # anchors can match both raw suffixes and metadata-blocked suffixes safely.
    re.compile(r"_(\d{1,2})$"),
    # Hyphen short-part suffix after the ID, e.g. OFJE-195-1 [480p].mp4.
    # Limit to 1-2 digits so the base ID's usual 3+ digit numeric component
    # does not make every canonical `ABC-123` filename look multipart.
    re.compile(r"-(\d{1,2})$"),
    # Lettered parts: separator (hyphen or underscore) followed by A-D.
    # Uppercase only — lowercase letters are variant designators (e.g. IBW-902z)
    # and are preserved as part of the base ID, not treated as part numbers.
    re.compile(r"[-_]([A-D])$"),
    # Bare uppercase letter directly after the ID digits with no separator,
    # e.g. BAK-052A, BAK-052B. Lookbehind ensures a digit precedes.
    re.compile(r"(?<=\d)([A-D])$"),
    # Whitespace-separated bare number, 1-2 digits (the ` N` case described above).
    re.compile(r"\s+(\d{1,2})$"),
]
# Active pattern list consulted by detect_part(): starts as a copy of the
# built-ins and is rebuilt (built-ins + user patterns) by configure_part_patterns().
PART_RES = list(BUILTIN_PART_RES)
|
|
|
|
|
|
def configure_part_patterns(patterns: Iterable[str]) -> list[str]:
    """Rebuild PART_RES from the built-ins plus user-supplied regexes.

    Each entry of *patterns* is stringified and stripped; blank entries are
    ignored. The rest are compiled case-insensitively and appended after the
    built-in patterns, provided they compile and define at least one capture
    group (group 1 is read as the part number).

    Returns one human-readable message per rejected pattern; an empty list
    means every non-blank pattern was accepted.
    """
    global PART_RES
    # Reset first so repeated calls never accumulate stale user patterns.
    PART_RES = active = list(BUILTIN_PART_RES)
    problems: list[str] = []
    for raw in patterns:
        source = str(raw or "").strip()
        if not source:
            continue
        try:
            candidate = re.compile(source, re.IGNORECASE)
        except re.error as exc:
            problems.append(f"{source!r}: {exc}")
        else:
            if candidate.groups >= 1:
                active.append(candidate)
            else:
                problems.append(f"{source!r}: needs a capture group for the part number")
    return problems
|
|
|
|
|
|
def detect_part(stem: str) -> str | None:
    """Return the part token found in *stem*, or None when there is none.

    An XofY marker (e.g. `.2of2`) anywhere in the stem wins over everything
    else: in 'ENKI-031 [1080p].2of2 (1)' the trailing (1) is a copy-collision
    marker (rclone / Windows), not a part number, yet the ordered PART_RES
    list would otherwise match it first and report part 1.

    When no XofY marker is present, the first PART_RES pattern that matches
    supplies the result via its group(1).
    """
    xofy = _XOFY_PRIORITY_RE.search(stem)
    if xofy is not None:
        return xofy.group(1)
    # Lazily probe the ordered pattern list and stop at the first hit.
    hits = (pattern.search(stem) for pattern in PART_RES)
    first = next((hit for hit in hits if hit), None)
    return first.group(1) if first else None
|
|
|
|
|
|
def part_key(part: str) -> str:
    """Normalise a raw part token into the key used after `#part`.

    - Decimal strings lose leading zeros: "01" -> "1".
    - A single letter maps to its 1-based alphabet position, ignoring case:
      "A" -> "1", "d" -> "4".
    - Anything else is returned upper-cased; a falsy *part* yields "".

    Tokens can come from user-configured patterns, so arbitrary text must not
    crash: digit-like characters that `int()` rejects (e.g. "²") and letters
    whose upper-case form is more than one character (e.g. "ß" -> "SS") fall
    through to the upper-cased form instead of raising.
    """
    token = str(part or "").strip()
    # isdecimal() (not isdigit()) is exactly the set of characters int() accepts.
    if token.isdecimal():
        return str(int(token))
    upper = token.upper()
    # ord() needs exactly one character; some letters expand when upper-cased.
    if len(token) == 1 and token.isalpha() and len(upper) == 1:
        return str(ord(upper) - ord("A") + 1)
    return upper
|
|
|
|
|
|
# Matches a trailing lowercase letter variant designator, e.g. the 'z' in IBW-902z.
# NOTE(review): in the visible code this pattern only feeds
# current_rules_signature(); extract_id() finds the variant letter with
# str.islower() instead — confirm whether another module still uses it.
_VARIANT_SUFFIX_RE = re.compile(r"^(.+?)([a-z])$")

# Matches one trailing `[...]` tag (typically the resolution) with surrounding
# whitespace, so it can be stripped before part-suffix patterns anchored at `$`
# run. The ` - Actress Name` segment is removed separately in
# _clean_stem_for_parts().
# Canonical naming: {ID}[-{part}][ - {actress}][ [{resolution}]]
_RESOLUTION_TAG_RE = re.compile(r"\s*\[[^\]]*\]\s*$")

# Bracket-wrapped ID: [REAL-779] or [HODV-21076] Saki Hatsumi [1080p]
# group(1) is the text inside the leading bracket pair.
_BRACKET_ID_RE = re.compile(r"^\[([^\]]+)\]")
# Explicit resolution label such as [1080p] (case-insensitive, unanchored).
# NOTE(review): not referenced in the visible part of this module.
_RES_LABEL_RE = re.compile(r"\[(?:2160|1080|720|480)p\]", re.IGNORECASE)
# Lowercase, dot-prefixed extensions; presumably the set of suffixes treated
# as video files — the consumer is not visible here, confirm against callers.
_VIDEO_EXTS = {
    ".avi", ".flv", ".m2ts", ".m4v", ".mkv", ".mov", ".mp4", ".mpeg",
    ".mpg", ".ts", ".webm", ".wmv",
}
# NOTE(review): presumably extensions that rank last when choosing which
# duplicate to keep; the consumer is not visible here — confirm.
_LOWEST_KEEP_PRIORITY_EXTS = {".ts"}

# No-hyphen ID fallback: MVSD312 -> MVSD-312 (letters-only prefix + digits, no hyphen)
# Prefix is limited to 2-8 letters and the number to 3-6 digits.
_NOHYPHEN_ID_RE = re.compile(r"^([A-Za-z]{2,8})(\d{3,6})")
|
|
|
|
|
|
def _clean_stem_for_parts(stem: str) -> str:
    """Strip the trailing `[tag]` and everything from the first ' - ' onward.

    The canonical layout puts the actress after ' - ' and the bracketed
    resolution tag last, so removing both leaves the ID plus any part suffix.
    """
    without_tag = _RESOLUTION_TAG_RE.sub("", stem).strip()
    # partition() returns the whole string when ' - ' is absent.
    head, _, _ = without_tag.partition(" - ")
    return head.strip()
|
|
|
|
|
|
def _part_detection_stems(stem: str) -> list[str]:
    """List the stem variants to probe for a part suffix, rawest first.

    Stages: the untouched stem, the stem without its trailing `[tag]`, and the
    stem with both the tag and the ' - Actress' segment removed. Empty stages
    and repeats of an earlier stage are dropped; order is preserved.
    """
    stages = (
        stem,
        _RESOLUTION_TAG_RE.sub("", stem).strip(),
        _clean_stem_for_parts(stem),
    )
    # dict.fromkeys() de-duplicates while keeping first-seen order.
    return list(dict.fromkeys(stage for stage in stages if stage))
|
|
|
|
|
|
def detect_part_from_stem(stem: str) -> str | None:
    """Probe each cleanup stage of *stem* and return the first part found.

    Stages come from _part_detection_stems() (raw stem first, most-cleaned
    last); the first stage where detect_part() yields a non-empty token wins.
    Returns None when no stage produces one.
    """
    found = (detect_part(stage) for stage in _part_detection_stems(stem))
    return next((token for token in found if token), None)
|
|
|
|
|
|
def extract_id(name: str) -> str | None:
    """Return the canonical jav_id for a filename, or None if no ID is found.

    Steps:
      1. Take the filename stem; if it starts with `[...]`, the ID is looked
         up inside that first bracket pair.
      2. Match, in order: PRIMARY, COMPOUND, FALLBACK, then the no-hyphen
         fallback (MVSD312 -> MVSD-312).
      3. Upper-case the prefix (a bare `FC2` becomes `FC2-PPV`) and zero-pad
         the number to at least three digits.
      4. A lowercase letter directly after the digits is a variant designator
         and is folded into the base ID (IBW-902z).
      5. Append `#part<key>` when a part suffix is detected on the stem.

    A stem stage that is nothing but the matched ID text is skipped during
    part detection: otherwise a short ID such as `ABC-12` has its own digits
    matched by the `-NN` part pattern and becomes `ABC-012#part12`.
    """
    stem = Path(name).stem

    # Strip bracket wrapper: [REAL-779] -> REAL-779, [SCOP-297] [1080p] -> SCOP-297
    effective_stem = stem
    if stem.startswith("["):
        bm = _BRACKET_ID_RE.match(stem)
        if bm:
            effective_stem = bm.group(1).strip()

    # First matcher that hits wins; order is part of the extraction rules.
    m = (
        PRIMARY_ID_RE.match(effective_stem)
        or COMPOUND_ID_RE.match(effective_stem)
        or FALLBACK_ID_RE.match(effective_stem)
        or _NOHYPHEN_ID_RE.match(effective_stem)  # MVSD312 -> MVSD-312
    )
    if not m:
        return None

    num = int(m.group(2))
    width = max(3, len(m.group(2)))  # keep wider zero-padding if present
    prefix = m.group(1).upper()
    if prefix == "FC2":
        prefix = "FC2-PPV"

    # Check the character immediately after the matched digits.
    # Lowercase -> variant designator (e.g. IBW-902z): fold into the base ID.
    # Uppercase A-D -> part letter: handled below by detect_part.
    # Anything else (space, '[', end-of-string) -> no variant.
    after = effective_stem[m.end():m.end() + 1]
    variant = after if after.islower() else ""

    base = f"{prefix}-{num:0{width}d}{variant}"

    # Part detection runs on the original stem, before and after metadata
    # cleanup: raw suffixes such as "KV-118 - Actress_PART1" must survive,
    # while trailing [1080p] tags still need cleanup before end-anchored
    # detectors can match.
    # NOTE(review): the part patterns are end-anchored, so a stem that ends
    # with the ID's own closing bracket (e.g. "[REAL-779-1]") yields no part
    # here — confirm whether bracket-wrapped parts are expected to be detected.
    id_text = m.group(0)
    part = None
    for candidate in _part_detection_stems(stem):
        if candidate == id_text:
            # Bare ID: its own trailing digits are not a part suffix.
            continue
        part = detect_part(candidate)
        if part:
            break
    return f"{base}#part{part_key(part)}" if part else base
|
|
|
|
|
|
def normalize_id(raw: str) -> str | None:
    """Canonicalise a bare ID string by running it through extract_id()."""
    # extract_id() works on the filename stem, which drops whatever follows
    # the last dot; a throwaway extension keeps the ID itself intact.
    as_filename = raw + ".x"
    return extract_id(as_filename)
|
|
|
|
|
|
def describe_id_match(display_query: str, matched_query: str, matched_id: str,
                      expansion_count: int) -> dict[str, str]:
    """Build the JSON fragment describing how one ID hit was matched.

    Checks run in priority order and the first that applies decides the result:
    wildcard in the query, range expansion (more than one expanded query),
    explicit `#part` in the query, hit is a part of the queried base ID,
    query was rewritten by normalisation, otherwise a plain exact match.
    """
    def classify() -> tuple[str, str, str]:
        # Returns (match_kind, match_reason, match_confidence).
        if any(mark in matched_query for mark in "*?"):
            return "wildcard", "Wildcard ID", "broad"
        if expansion_count > 1:
            return "range", "Range member", "expanded"
        if "#part" in matched_query:
            return "exact_part", "Exact part ID", "high"
        if matched_id.startswith(matched_query + "#part"):
            return "part", "Base ID + part", "related"
        if display_query.upper() != matched_query.upper():
            return "normalized", "Normalized ID", "normalized"
        return "exact", "Exact ID", "high"

    match_kind, match_reason, match_confidence = classify()
    return {
        "match_kind": match_kind,
        "match_reason": match_reason,
        "match_confidence": match_confidence,
        "matched_query": matched_query,
        "matched_id": matched_id,
    }
|
|
|
|
|
|
def current_rules_signature() -> str:
    """Sha256 over the canonical text of every rule that influences a jav_id.

    Includes the built-in ID regexes, BUILTIN_PART_RES, and PART_RES (which
    captures user-added part patterns applied by `configure_part_patterns`).
    Output is prefixed with `sha256:` so callers can sniff the algorithm
    without re-deriving it.

    Each regex contributes its source text AND its behaviour-affecting flags.
    Several patterns are deliberately case-sensitive (the uppercase-only A-D
    part letters), so hashing the source alone would miss a flag flip that
    changes extracted IDs. Recording flags is what signature schema 2 added;
    the digest therefore differs from schema-1 digests, and signatures stored
    under schema 1 will no longer compare equal.

    Stable across invocations: the payload is dumped with sort_keys=True and
    flags are recorded by name in a fixed order. Changing a regex changes the
    digest; reordering BUILTIN_PART_RES also changes it (order is part of the
    contract because part detection short-circuits on the first match).
    """
    import hashlib
    import json as _json

    # Flags that alter what a pattern matches, recorded by name so the digest
    # does not depend on the numeric values of the re constants.
    tracked_flags = (
        ("ascii", re.ASCII),
        ("dotall", re.DOTALL),
        ("ignorecase", re.IGNORECASE),
        ("multiline", re.MULTILINE),
        ("verbose", re.VERBOSE),
    )

    def rule(compiled):
        """Canonical, JSON-serialisable description of one compiled regex."""
        return {
            "pattern": compiled.pattern,
            "flags": [name for name, bit in tracked_flags if compiled.flags & bit],
        }

    data = {
        "schema": 2,  # bump when this signature schema itself changes
        "primary": rule(PRIMARY_ID_RE),
        "compound": rule(COMPOUND_ID_RE),
        "fallback": rule(FALLBACK_ID_RE),
        "nohyphen": rule(_NOHYPHEN_ID_RE),
        "bracket": rule(_BRACKET_ID_RE),
        "variant": rule(_VARIANT_SUFFIX_RE),
        "xofy": rule(_XOFY_PRIORITY_RE),
        "resolution_tag": rule(_RESOLUTION_TAG_RE),
        "builtin_part_res": [rule(r) for r in BUILTIN_PART_RES],
        "part_res": [rule(r) for r in PART_RES],
        "fc2_handling": "fc2_to_ppv",
    }
    text = _json.dumps(data, sort_keys=True, ensure_ascii=False)
    return "sha256:" + hashlib.sha256(text.encode("utf-8")).hexdigest()
|
|
|
|
|
|
def expand_range(raw: str) -> list[str] | None:
    """Expand the first bracket range in *raw* into one string per number.

    'IPZZ-[820-860]' becomes ['IPZZ-820', ..., 'IPZZ-860']. Bounds may be
    given in either order; the result is always ascending and inclusive.
    Numbers are zero-padded to the width of the longer bound as written.
    Returns None when *raw* contains no range marker.
    """
    found = RANGE_RE.search(raw)
    if found is None:
        return None
    first_text, second_text = found.groups()
    low, high = sorted((int(first_text), int(second_text)))
    pad = max(len(first_text), len(second_text))  # preserve zero-padding
    head, tail = raw[:found.start()], raw[found.end():]
    return [f"{head}{value:0{pad}d}{tail}" for value in range(low, high + 1)]
|