From 41f7c80f1bd8f676df3ab10003034b8d9799ff72 Mon Sep 17 00:00:00 2001 From: admin Date: Fri, 22 May 2026 21:51:09 +0200 Subject: [PATCH] Step 10e: extract WinCatalog ingest into rcjav/catalog.py --- rc-jav.py | 169 ++++--------------------------------------- rcjav/__init__.py | 10 +++ rcjav/catalog.py | 178 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 201 insertions(+), 156 deletions(-) create mode 100644 rcjav/catalog.py diff --git a/rc-jav.py b/rc-jav.py index 0c6d108..8701e29 100644 --- a/rc-jav.py +++ b/rc-jav.py @@ -155,12 +155,17 @@ DEFAULT_TARGET = ["cq:personal-files/JAV/TMP"] # Default WinCatalog export folder (or specific files). Folder entries auto-discover *.csv / *.xml. DEFAULT_CATALOG: list[str] = [str(Path(__file__).resolve().parent / "wincatalog")] -# CSV column synonyms (lowercased) — first matching one wins. -CATALOG_COL_NAME = ("name", "file name", "filename", "title") -CATALOG_COL_PATH = ("path", "full path", "location", "folder") -CATALOG_COL_SIZE = ("size", "file size", "bytes", "size (bytes)") -CATALOG_COL_DISC = ("disc", "disc name", "disc label", "volume", "source", "catalog", "media") - +from rcjav.catalog import ( + CATALOG_COL_NAME, + CATALOG_COL_PATH, + CATALOG_COL_SIZE, + CATALOG_COL_DISC, + normalize_catalog_path, + load_catalog_csv, + load_catalog_xml, + load_catalogs, + _expand_catalog_paths, +) from rcjav.cache import ( CACHE_PATH, CACHE_VERSION, @@ -208,154 +213,6 @@ def save_config(cfg: dict) -> None: os.replace(tmp, CONFIG_PATH) -# ---------- WinCatalog ingest ---------- - -def _pick_col(headers_lower: list[str], synonyms: tuple[str, ...]) -> str | None: - for s in synonyms: - if s in headers_lower: - return s - return None - - -def normalize_catalog_path(path: str) -> str: - """Keep catalog paths display-compatible with rclone-style path consumers.""" - p = (path or "").replace("\\", "/") - if p.startswith("//"): - return "//" + re.sub(r"/+", "/", p[2:]) - return re.sub(r"/+", "/", p) - - -def load_catalog_csv(path: Path, skipped: list[tuple[str, str]]) -> list[FileEntry]: - """Load a WinCatalog CSV export. Lenient about column names.""" - entries: list[FileEntry] = [] - with path.open("r", encoding="utf-8-sig", newline="") as f: - # Sniff delimiter - sample = f.read(4096) - f.seek(0) - try: - dialect = csv.Sniffer().sniff(sample, delimiters=",;\t|") - except csv.Error: - dialect = csv.excel - reader = csv.DictReader(f, dialect=dialect) - if not reader.fieldnames: - return entries - headers: dict[str, str] = {} - for h in reader.fieldnames: - hl = h.lower() - if hl not in headers: - headers[hl] = h - col_name = _pick_col(list(headers), CATALOG_COL_NAME) - col_path = _pick_col(list(headers), CATALOG_COL_PATH) - col_size = _pick_col(list(headers), CATALOG_COL_SIZE) - col_disc = _pick_col(list(headers), CATALOG_COL_DISC) - if not col_name and not col_path: - console.print(f"[yellow]WARN: catalog CSV {path} has no Name/Path columns; skipping.[/]") - return entries - for row in reader: - name = (row.get(headers[col_name]) if col_name else "") or "" - full_path = (row.get(headers[col_path]) if col_path else "") or "" - if not name and full_path: - name = Path(full_path).name - full_path = normalize_catalog_path(full_path) - if not name: - continue - jav_id = extract_id(name) - if not jav_id: - skipped.append((f"catalog:{path.name}", full_path or name)) - continue - try: - size = int(row.get(headers[col_size], 0)) if col_size else 0 - except (ValueError, TypeError): - size = 0 - disc = (row.get(headers[col_disc]) if col_disc else "") or "" - # Encode disc label into "remote" so it surfaces in output. - remote_label = f"catalog:{disc}" if disc else f"catalog:{path.name}" - entries.append(FileEntry( - source="Catalog", remote=remote_label, - path=full_path or name, size=size, mod_time="", - jav_id=jav_id, - )) - return entries - - -def _strip_xml_ns(tag: str) -> str: - """Remove Clark-notation namespace {uri}local → local.""" - return tag.split("}")[-1] if "}" in tag else tag - - -def load_catalog_xml(path: Path, skipped: list[tuple[str, str]]) -> list[FileEntry]: - """Load a WinCatalog XML export. Walks for any element with file-like attrs.""" - entries: list[FileEntry] = [] - tree = ET.parse(str(path)) - root = tree.getroot() - - def walk(node, disc_label: str, parent_path: str, _depth: int = 0): - if _depth > 500: - return - tag = _strip_xml_ns(node.tag).lower() - # Heuristics: disc/catalog/source containers reset disc_label - if tag in ("disc", "catalog", "source", "volume", "media"): - disc_label = node.get("name") or node.get("Name") or disc_label - # File-like nodes - if tag in ("file", "f"): - name = node.get("name") or node.get("Name") or node.findtext("Name") or "" - size_raw = node.get("size") or node.get("Size") or node.findtext("Size") or "0" - try: - size = int(size_raw) - except ValueError: - size = 0 - full_path = normalize_catalog_path(f"{parent_path}/{name}" if parent_path else name) - jav_id = extract_id(name) - if jav_id: - entries.append(FileEntry( - source="Catalog", - remote=f"catalog:{disc_label}" if disc_label else f"catalog:{path.name}", - path=full_path, size=size, mod_time="", jav_id=jav_id, - )) - else: - skipped.append((f"catalog:{disc_label or path.name}", full_path)) - return - # Folder-like: extend parent_path - if tag in ("folder", "dir", "directory"): - folder_name = node.get("name") or node.get("Name") or "" - parent_path = normalize_catalog_path(f"{parent_path}/{folder_name}" if parent_path else folder_name) - for child in node: - walk(child, disc_label, parent_path, _depth + 1) - - walk(root, "", "") - return entries - - -def _expand_catalog_paths(paths: list[str]) -> list[Path]: - """Expand any directories to their *.csv / *.xml children. Files passed through.""" - out: list[Path] = [] - for p in paths: - cp = Path(p) - if cp.is_dir(): - for child in sorted(cp.iterdir()): - if child.suffix.lower() in (".csv", ".xml") and child.is_file(): - out.append(child) - elif cp.exists(): - out.append(cp) - # silently skip missing default dir; warn for everything else - elif Path(p).resolve() not in {Path(d).resolve() for d in DEFAULT_CATALOG}: - console.print(f"[yellow]WARN: catalog path not found: {p}[/]") - return out - - -def load_catalogs(paths: list[str], skipped: list[tuple[str, str]]) -> list[FileEntry]: - out: list[FileEntry] = [] - for cp in _expand_catalog_paths(paths): - ext = cp.suffix.lower() - if ext == ".csv": - out.extend(load_catalog_csv(cp, skipped)) - elif ext == ".xml": - out.extend(load_catalog_xml(cp, skipped)) - else: - console.print(f"[yellow]WARN: unknown catalog format '{ext}' for {cp}; skipping.[/]") - return out - - # ---------- quick search (no cache) ---------- def quick_search_remote(remote: str, source_label: str, @@ -1552,7 +1409,7 @@ def main(): catalog_entries: list[FileEntry] = [] phase_t0 = time.perf_counter() for cp_str in args.catalog: - for cp in _expand_catalog_paths([cp_str]): + for cp in _expand_catalog_paths([cp_str], default_paths=DEFAULT_CATALOG): ext = cp.suffix.lower() if ext == ".csv": one = load_catalog_csv(cp, skipped) @@ -1584,7 +1441,7 @@ def main(): remotes_by_label = ([("Source", r) for r in args.source] + [("Target", r) for r in args.target]) entries = collect_with_progress(remotes_by_label, skipped) - entries.extend(load_catalogs(args.catalog, skipped)) + entries.extend(load_catalogs(args.catalog, skipped, default_paths=DEFAULT_CATALOG)) elapsed = time.perf_counter() - t0 if BASIC: diff --git a/rcjav/__init__.py b/rcjav/__init__.py index 240adbd..72c7fa6 100644 --- a/rcjav/__init__.py +++ b/rcjav/__init__.py @@ -6,6 +6,16 @@ find at the top level. Adding a new submodule does not change the public surface — only this file does. """ from rcjav.model import FileEntry # noqa: F401 +from rcjav.catalog import ( # noqa: F401 + CATALOG_COL_NAME, + CATALOG_COL_PATH, + CATALOG_COL_SIZE, + CATALOG_COL_DISC, + normalize_catalog_path, + load_catalog_csv, + load_catalog_xml, + load_catalogs, +) from rcjav.dupes import ( # noqa: F401 DEFAULT_KEEP_RANKING, set_keep_ranking, diff --git a/rcjav/catalog.py b/rcjav/catalog.py new file mode 100644 index 0000000..2f0f519 --- /dev/null +++ b/rcjav/catalog.py @@ -0,0 +1,178 @@ +"""WinCatalog ingest — CSV and XML. + +Catalog entries are offline references (e.g. an exported disc index +from WinCatalog). They show up in dupe output but never participate +in keep ranking — rcjav.dupes filters Catalog-sourced entries out +before choosing a winner. + +Warnings are written to stderr without rich markup; the calling +module owns terminal styling. +""" +from __future__ import annotations + +import csv +import re +import sys +import xml.etree.ElementTree as ET +from pathlib import Path + +from rcjav.ids import extract_id +from rcjav.model import FileEntry + + +CATALOG_COL_NAME = ("name", "file name", "filename", "title") +CATALOG_COL_PATH = ("path", "full path", "location", "folder") +CATALOG_COL_SIZE = ("size", "file size", "bytes", "size (bytes)") +CATALOG_COL_DISC = ("disc", "disc name", "disc label", "volume", "source", "catalog", "media") + + +def _warn(msg: str) -> None: + sys.stderr.write(f"WARN: {msg}\n") + + +def _pick_col(headers_lower: list[str], synonyms: tuple[str, ...]) -> str | None: + for s in synonyms: + if s in headers_lower: + return s + return None + + +def normalize_catalog_path(path: str) -> str: + """Keep catalog paths display-compatible with rclone-style path consumers.""" + p = (path or "").replace("\\", "/") + if p.startswith("//"): + return "//" + re.sub(r"/+", "/", p[2:]) + return re.sub(r"/+", "/", p) + + +def load_catalog_csv(path: Path, skipped: list[tuple[str, str]]) -> list[FileEntry]: + """Load a WinCatalog CSV export. Lenient about column names.""" + entries: list[FileEntry] = [] + with path.open("r", encoding="utf-8-sig", newline="") as f: + sample = f.read(4096) + f.seek(0) + try: + dialect = csv.Sniffer().sniff(sample, delimiters=",;\t|") + except csv.Error: + dialect = csv.excel + reader = csv.DictReader(f, dialect=dialect) + if not reader.fieldnames: + return entries + headers: dict[str, str] = {} + for h in reader.fieldnames: + hl = h.lower() + if hl not in headers: + headers[hl] = h + col_name = _pick_col(list(headers), CATALOG_COL_NAME) + col_path = _pick_col(list(headers), CATALOG_COL_PATH) + col_size = _pick_col(list(headers), CATALOG_COL_SIZE) + col_disc = _pick_col(list(headers), CATALOG_COL_DISC) + if not col_name and not col_path: + _warn(f"catalog CSV {path} has no Name/Path columns; skipping.") + return entries + for row in reader: + name = (row.get(headers[col_name]) if col_name else "") or "" + full_path = (row.get(headers[col_path]) if col_path else "") or "" + if not name and full_path: + name = Path(full_path).name + full_path = normalize_catalog_path(full_path) + if not name: + continue + jav_id = extract_id(name) + if not jav_id: + skipped.append((f"catalog:{path.name}", full_path or name)) + continue + try: + size = int(row.get(headers[col_size], 0)) if col_size else 0 + except (ValueError, TypeError): + size = 0 + disc = (row.get(headers[col_disc]) if col_disc else "") or "" + # Encode disc label into "remote" so it surfaces in output. + remote_label = f"catalog:{disc}" if disc else f"catalog:{path.name}" + entries.append(FileEntry( + source="Catalog", remote=remote_label, + path=full_path or name, size=size, mod_time="", + jav_id=jav_id, + )) + return entries + + +def _strip_xml_ns(tag: str) -> str: + """Remove Clark-notation namespace {uri}local -> local.""" + return tag.split("}")[-1] if "}" in tag else tag + + +def load_catalog_xml(path: Path, skipped: list[tuple[str, str]]) -> list[FileEntry]: + """Load a WinCatalog XML export. Walks for any element with file-like attrs.""" + entries: list[FileEntry] = [] + tree = ET.parse(str(path)) + root = tree.getroot() + + def walk(node, disc_label: str, parent_path: str, _depth: int = 0): + if _depth > 500: + return + tag = _strip_xml_ns(node.tag).lower() + if tag in ("disc", "catalog", "source", "volume", "media"): + disc_label = node.get("name") or node.get("Name") or disc_label + if tag in ("file", "f"): + name = node.get("name") or node.get("Name") or node.findtext("Name") or "" + size_raw = node.get("size") or node.get("Size") or node.findtext("Size") or "0" + try: + size = int(size_raw) + except ValueError: + size = 0 + full_path = normalize_catalog_path(f"{parent_path}/{name}" if parent_path else name) + jav_id = extract_id(name) + if jav_id: + entries.append(FileEntry( + source="Catalog", + remote=f"catalog:{disc_label}" if disc_label else f"catalog:{path.name}", + path=full_path, size=size, mod_time="", jav_id=jav_id, + )) + else: + skipped.append((f"catalog:{disc_label or path.name}", full_path)) + return + if tag in ("folder", "dir", "directory"): + folder_name = node.get("name") or node.get("Name") or "" + parent_path = normalize_catalog_path(f"{parent_path}/{folder_name}" if parent_path else folder_name) + for child in node: + walk(child, disc_label, parent_path, _depth + 1) + + walk(root, "", "") + return entries + + +def _expand_catalog_paths(paths: list[str], default_paths: list[str] | None = None) -> list[Path]: + """Expand any directories to their *.csv / *.xml children. Files passed through. + + `default_paths` is the configured DEFAULT_CATALOG list; missing paths inside + that set are silently skipped (it's normal to not have a catalog dir). + Missing paths outside the default set produce a warning. + """ + defaults = {Path(d).resolve() for d in (default_paths or [])} + out: list[Path] = [] + for p in paths: + cp = Path(p) + if cp.is_dir(): + for child in sorted(cp.iterdir()): + if child.suffix.lower() in (".csv", ".xml") and child.is_file(): + out.append(child) + elif cp.exists(): + out.append(cp) + elif Path(p).resolve() not in defaults: + _warn(f"catalog path not found: {p}") + return out + + +def load_catalogs(paths: list[str], skipped: list[tuple[str, str]], + default_paths: list[str] | None = None) -> list[FileEntry]: + out: list[FileEntry] = [] + for cp in _expand_catalog_paths(paths, default_paths=default_paths): + ext = cp.suffix.lower() + if ext == ".csv": + out.extend(load_catalog_csv(cp, skipped)) + elif ext == ".xml": + out.extend(load_catalog_xml(cp, skipped)) + else: + _warn(f"unknown catalog format '{ext}' for {cp}; skipping.") + return out