Step 10e: extract WinCatalog ingest into rcjav/catalog.py

This commit is contained in:
admin
2026-05-22 21:51:09 +02:00
parent 8d636ec633
commit 41f7c80f1b
3 changed files with 201 additions and 156 deletions
+13 -156
View File
@@ -155,12 +155,17 @@ DEFAULT_TARGET = ["cq:personal-files/JAV/TMP"]
# Default WinCatalog export folder (or specific files). Folder entries auto-discover *.csv / *.xml. # Default WinCatalog export folder (or specific files). Folder entries auto-discover *.csv / *.xml.
DEFAULT_CATALOG: list[str] = [str(Path(__file__).resolve().parent / "wincatalog")] DEFAULT_CATALOG: list[str] = [str(Path(__file__).resolve().parent / "wincatalog")]
# CSV column synonyms (lowercased) — first matching one wins. from rcjav.catalog import (
CATALOG_COL_NAME = ("name", "file name", "filename", "title") CATALOG_COL_NAME,
CATALOG_COL_PATH = ("path", "full path", "location", "folder") CATALOG_COL_PATH,
CATALOG_COL_SIZE = ("size", "file size", "bytes", "size (bytes)") CATALOG_COL_SIZE,
CATALOG_COL_DISC = ("disc", "disc name", "disc label", "volume", "source", "catalog", "media") CATALOG_COL_DISC,
normalize_catalog_path,
load_catalog_csv,
load_catalog_xml,
load_catalogs,
_expand_catalog_paths,
)
from rcjav.cache import ( from rcjav.cache import (
CACHE_PATH, CACHE_PATH,
CACHE_VERSION, CACHE_VERSION,
@@ -208,154 +213,6 @@ def save_config(cfg: dict) -> None:
os.replace(tmp, CONFIG_PATH) os.replace(tmp, CONFIG_PATH)
# ---------- WinCatalog ingest ----------
def _pick_col(headers_lower: list[str], synonyms: tuple[str, ...]) -> str | None:
for s in synonyms:
if s in headers_lower:
return s
return None
def normalize_catalog_path(path: str) -> str:
"""Keep catalog paths display-compatible with rclone-style path consumers."""
p = (path or "").replace("\\", "/")
if p.startswith("//"):
return "//" + re.sub(r"/+", "/", p[2:])
return re.sub(r"/+", "/", p)
def load_catalog_csv(path: Path, skipped: list[tuple[str, str]]) -> list[FileEntry]:
"""Load a WinCatalog CSV export. Lenient about column names."""
entries: list[FileEntry] = []
with path.open("r", encoding="utf-8-sig", newline="") as f:
# Sniff delimiter
sample = f.read(4096)
f.seek(0)
try:
dialect = csv.Sniffer().sniff(sample, delimiters=",;\t|")
except csv.Error:
dialect = csv.excel
reader = csv.DictReader(f, dialect=dialect)
if not reader.fieldnames:
return entries
headers: dict[str, str] = {}
for h in reader.fieldnames:
hl = h.lower()
if hl not in headers:
headers[hl] = h
col_name = _pick_col(list(headers), CATALOG_COL_NAME)
col_path = _pick_col(list(headers), CATALOG_COL_PATH)
col_size = _pick_col(list(headers), CATALOG_COL_SIZE)
col_disc = _pick_col(list(headers), CATALOG_COL_DISC)
if not col_name and not col_path:
console.print(f"[yellow]WARN: catalog CSV {path} has no Name/Path columns; skipping.[/]")
return entries
for row in reader:
name = (row.get(headers[col_name]) if col_name else "") or ""
full_path = (row.get(headers[col_path]) if col_path else "") or ""
if not name and full_path:
name = Path(full_path).name
full_path = normalize_catalog_path(full_path)
if not name:
continue
jav_id = extract_id(name)
if not jav_id:
skipped.append((f"catalog:{path.name}", full_path or name))
continue
try:
size = int(row.get(headers[col_size], 0)) if col_size else 0
except (ValueError, TypeError):
size = 0
disc = (row.get(headers[col_disc]) if col_disc else "") or ""
# Encode disc label into "remote" so it surfaces in output.
remote_label = f"catalog:{disc}" if disc else f"catalog:{path.name}"
entries.append(FileEntry(
source="Catalog", remote=remote_label,
path=full_path or name, size=size, mod_time="",
jav_id=jav_id,
))
return entries
def _strip_xml_ns(tag: str) -> str:
"""Remove Clark-notation namespace {uri}local → local."""
return tag.split("}")[-1] if "}" in tag else tag
def load_catalog_xml(path: Path, skipped: list[tuple[str, str]]) -> list[FileEntry]:
"""Load a WinCatalog XML export. Walks for any element with file-like attrs."""
entries: list[FileEntry] = []
tree = ET.parse(str(path))
root = tree.getroot()
def walk(node, disc_label: str, parent_path: str, _depth: int = 0):
if _depth > 500:
return
tag = _strip_xml_ns(node.tag).lower()
# Heuristics: disc/catalog/source containers reset disc_label
if tag in ("disc", "catalog", "source", "volume", "media"):
disc_label = node.get("name") or node.get("Name") or disc_label
# File-like nodes
if tag in ("file", "f"):
name = node.get("name") or node.get("Name") or node.findtext("Name") or ""
size_raw = node.get("size") or node.get("Size") or node.findtext("Size") or "0"
try:
size = int(size_raw)
except ValueError:
size = 0
full_path = normalize_catalog_path(f"{parent_path}/{name}" if parent_path else name)
jav_id = extract_id(name)
if jav_id:
entries.append(FileEntry(
source="Catalog",
remote=f"catalog:{disc_label}" if disc_label else f"catalog:{path.name}",
path=full_path, size=size, mod_time="", jav_id=jav_id,
))
else:
skipped.append((f"catalog:{disc_label or path.name}", full_path))
return
# Folder-like: extend parent_path
if tag in ("folder", "dir", "directory"):
folder_name = node.get("name") or node.get("Name") or ""
parent_path = normalize_catalog_path(f"{parent_path}/{folder_name}" if parent_path else folder_name)
for child in node:
walk(child, disc_label, parent_path, _depth + 1)
walk(root, "", "")
return entries
def _expand_catalog_paths(paths: list[str]) -> list[Path]:
"""Expand any directories to their *.csv / *.xml children. Files passed through."""
out: list[Path] = []
for p in paths:
cp = Path(p)
if cp.is_dir():
for child in sorted(cp.iterdir()):
if child.suffix.lower() in (".csv", ".xml") and child.is_file():
out.append(child)
elif cp.exists():
out.append(cp)
# silently skip missing default dir; warn for everything else
elif Path(p).resolve() not in {Path(d).resolve() for d in DEFAULT_CATALOG}:
console.print(f"[yellow]WARN: catalog path not found: {p}[/]")
return out
def load_catalogs(paths: list[str], skipped: list[tuple[str, str]]) -> list[FileEntry]:
out: list[FileEntry] = []
for cp in _expand_catalog_paths(paths):
ext = cp.suffix.lower()
if ext == ".csv":
out.extend(load_catalog_csv(cp, skipped))
elif ext == ".xml":
out.extend(load_catalog_xml(cp, skipped))
else:
console.print(f"[yellow]WARN: unknown catalog format '{ext}' for {cp}; skipping.[/]")
return out
# ---------- quick search (no cache) ---------- # ---------- quick search (no cache) ----------
def quick_search_remote(remote: str, source_label: str, def quick_search_remote(remote: str, source_label: str,
@@ -1552,7 +1409,7 @@ def main():
catalog_entries: list[FileEntry] = [] catalog_entries: list[FileEntry] = []
phase_t0 = time.perf_counter() phase_t0 = time.perf_counter()
for cp_str in args.catalog: for cp_str in args.catalog:
for cp in _expand_catalog_paths([cp_str]): for cp in _expand_catalog_paths([cp_str], default_paths=DEFAULT_CATALOG):
ext = cp.suffix.lower() ext = cp.suffix.lower()
if ext == ".csv": if ext == ".csv":
one = load_catalog_csv(cp, skipped) one = load_catalog_csv(cp, skipped)
@@ -1584,7 +1441,7 @@ def main():
remotes_by_label = ([("Source", r) for r in args.source] remotes_by_label = ([("Source", r) for r in args.source]
+ [("Target", r) for r in args.target]) + [("Target", r) for r in args.target])
entries = collect_with_progress(remotes_by_label, skipped) entries = collect_with_progress(remotes_by_label, skipped)
entries.extend(load_catalogs(args.catalog, skipped)) entries.extend(load_catalogs(args.catalog, skipped, default_paths=DEFAULT_CATALOG))
elapsed = time.perf_counter() - t0 elapsed = time.perf_counter() - t0
if BASIC: if BASIC:
+10
View File
@@ -6,6 +6,16 @@ find at the top level. Adding a new submodule does not change the
public surface — only this file does. public surface — only this file does.
""" """
from rcjav.model import FileEntry # noqa: F401 from rcjav.model import FileEntry # noqa: F401
from rcjav.catalog import ( # noqa: F401
CATALOG_COL_NAME,
CATALOG_COL_PATH,
CATALOG_COL_SIZE,
CATALOG_COL_DISC,
normalize_catalog_path,
load_catalog_csv,
load_catalog_xml,
load_catalogs,
)
from rcjav.dupes import ( # noqa: F401 from rcjav.dupes import ( # noqa: F401
DEFAULT_KEEP_RANKING, DEFAULT_KEEP_RANKING,
set_keep_ranking, set_keep_ranking,
+178
View File
@@ -0,0 +1,178 @@
"""WinCatalog ingest — CSV and XML.
Catalog entries are offline references (e.g. an exported disc index
from WinCatalog). They show up in dupe output but never participate
in keep ranking — rcjav.dupes filters Catalog-sourced entries out
before choosing a winner.
Warnings are written to stderr without rich markup; the calling
module owns terminal styling.
"""
from __future__ import annotations
import csv
import re
import sys
import xml.etree.ElementTree as ET
from pathlib import Path
from rcjav.ids import extract_id
from rcjav.model import FileEntry
CATALOG_COL_NAME = ("name", "file name", "filename", "title")
CATALOG_COL_PATH = ("path", "full path", "location", "folder")
CATALOG_COL_SIZE = ("size", "file size", "bytes", "size (bytes)")
CATALOG_COL_DISC = ("disc", "disc name", "disc label", "volume", "source", "catalog", "media")
def _warn(msg: str) -> None:
sys.stderr.write(f"WARN: {msg}\n")
def _pick_col(headers_lower: list[str], synonyms: tuple[str, ...]) -> str | None:
for s in synonyms:
if s in headers_lower:
return s
return None
def normalize_catalog_path(path: str) -> str:
"""Keep catalog paths display-compatible with rclone-style path consumers."""
p = (path or "").replace("\\", "/")
if p.startswith("//"):
return "//" + re.sub(r"/+", "/", p[2:])
return re.sub(r"/+", "/", p)
def load_catalog_csv(path: Path, skipped: list[tuple[str, str]]) -> list[FileEntry]:
"""Load a WinCatalog CSV export. Lenient about column names."""
entries: list[FileEntry] = []
with path.open("r", encoding="utf-8-sig", newline="") as f:
sample = f.read(4096)
f.seek(0)
try:
dialect = csv.Sniffer().sniff(sample, delimiters=",;\t|")
except csv.Error:
dialect = csv.excel
reader = csv.DictReader(f, dialect=dialect)
if not reader.fieldnames:
return entries
headers: dict[str, str] = {}
for h in reader.fieldnames:
hl = h.lower()
if hl not in headers:
headers[hl] = h
col_name = _pick_col(list(headers), CATALOG_COL_NAME)
col_path = _pick_col(list(headers), CATALOG_COL_PATH)
col_size = _pick_col(list(headers), CATALOG_COL_SIZE)
col_disc = _pick_col(list(headers), CATALOG_COL_DISC)
if not col_name and not col_path:
_warn(f"catalog CSV {path} has no Name/Path columns; skipping.")
return entries
for row in reader:
name = (row.get(headers[col_name]) if col_name else "") or ""
full_path = (row.get(headers[col_path]) if col_path else "") or ""
if not name and full_path:
name = Path(full_path).name
full_path = normalize_catalog_path(full_path)
if not name:
continue
jav_id = extract_id(name)
if not jav_id:
skipped.append((f"catalog:{path.name}", full_path or name))
continue
try:
size = int(row.get(headers[col_size], 0)) if col_size else 0
except (ValueError, TypeError):
size = 0
disc = (row.get(headers[col_disc]) if col_disc else "") or ""
# Encode disc label into "remote" so it surfaces in output.
remote_label = f"catalog:{disc}" if disc else f"catalog:{path.name}"
entries.append(FileEntry(
source="Catalog", remote=remote_label,
path=full_path or name, size=size, mod_time="",
jav_id=jav_id,
))
return entries
def _strip_xml_ns(tag: str) -> str:
"""Remove Clark-notation namespace {uri}local -> local."""
return tag.split("}")[-1] if "}" in tag else tag
def load_catalog_xml(path: Path, skipped: list[tuple[str, str]]) -> list[FileEntry]:
"""Load a WinCatalog XML export. Walks for any element with file-like attrs."""
entries: list[FileEntry] = []
tree = ET.parse(str(path))
root = tree.getroot()
def walk(node, disc_label: str, parent_path: str, _depth: int = 0):
if _depth > 500:
return
tag = _strip_xml_ns(node.tag).lower()
if tag in ("disc", "catalog", "source", "volume", "media"):
disc_label = node.get("name") or node.get("Name") or disc_label
if tag in ("file", "f"):
name = node.get("name") or node.get("Name") or node.findtext("Name") or ""
size_raw = node.get("size") or node.get("Size") or node.findtext("Size") or "0"
try:
size = int(size_raw)
except ValueError:
size = 0
full_path = normalize_catalog_path(f"{parent_path}/{name}" if parent_path else name)
jav_id = extract_id(name)
if jav_id:
entries.append(FileEntry(
source="Catalog",
remote=f"catalog:{disc_label}" if disc_label else f"catalog:{path.name}",
path=full_path, size=size, mod_time="", jav_id=jav_id,
))
else:
skipped.append((f"catalog:{disc_label or path.name}", full_path))
return
if tag in ("folder", "dir", "directory"):
folder_name = node.get("name") or node.get("Name") or ""
parent_path = normalize_catalog_path(f"{parent_path}/{folder_name}" if parent_path else folder_name)
for child in node:
walk(child, disc_label, parent_path, _depth + 1)
walk(root, "", "")
return entries
def _expand_catalog_paths(paths: list[str], default_paths: list[str] | None = None) -> list[Path]:
"""Expand any directories to their *.csv / *.xml children. Files passed through.
`default_paths` is the configured DEFAULT_CATALOG list; missing paths inside
that set are silently skipped (it's normal to not have a catalog dir).
Missing paths outside the default set produce a warning.
"""
defaults = {Path(d).resolve() for d in (default_paths or [])}
out: list[Path] = []
for p in paths:
cp = Path(p)
if cp.is_dir():
for child in sorted(cp.iterdir()):
if child.suffix.lower() in (".csv", ".xml") and child.is_file():
out.append(child)
elif cp.exists():
out.append(cp)
elif Path(p).resolve() not in defaults:
_warn(f"catalog path not found: {p}")
return out
def load_catalogs(paths: list[str], skipped: list[tuple[str, str]],
default_paths: list[str] | None = None) -> list[FileEntry]:
out: list[FileEntry] = []
for cp in _expand_catalog_paths(paths, default_paths=default_paths):
ext = cp.suffix.lower()
if ext == ".csv":
out.extend(load_catalog_csv(cp, skipped))
elif ext == ".xml":
out.extend(load_catalog_xml(cp, skipped))
else:
_warn(f"unknown catalog format '{ext}' for {cp}; skipping.")
return out