"""WinCatalog ingest — CSV and XML. Catalog entries are offline references (e.g. an exported disc index from WinCatalog). They show up in dupe output but never participate in keep ranking — rcjav.dupes filters Catalog-sourced entries out before choosing a winner. Warnings are written to stderr without rich markup; the calling module owns terminal styling. """ from __future__ import annotations import csv import re import sys import xml.etree.ElementTree as ET from pathlib import Path from rcjav.ids import extract_id from rcjav.model import FileEntry CATALOG_COL_NAME = ("name", "file name", "filename", "title") CATALOG_COL_PATH = ("path", "full path", "location", "folder") CATALOG_COL_SIZE = ("size", "file size", "bytes", "size (bytes)") CATALOG_COL_DISC = ("disc", "disc name", "disc label", "volume", "source", "catalog", "media") def _warn(msg: str) -> None: sys.stderr.write(f"WARN: {msg}\n") def _pick_col(headers_lower: list[str], synonyms: tuple[str, ...]) -> str | None: for s in synonyms: if s in headers_lower: return s return None def normalize_catalog_path(path: str) -> str: """Keep catalog paths display-compatible with rclone-style path consumers.""" p = (path or "").replace("\\", "/") if p.startswith("//"): return "//" + re.sub(r"/+", "/", p[2:]) return re.sub(r"/+", "/", p) def load_catalog_csv(path: Path, skipped: list[tuple[str, str]]) -> list[FileEntry]: """Load a WinCatalog CSV export. Lenient about column names.""" entries: list[FileEntry] = [] with path.open("r", encoding="utf-8-sig", newline="") as f: sample = f.read(4096) f.seek(0) try: dialect = csv.Sniffer().sniff(sample, delimiters=",;\t|") except csv.Error: dialect = csv.excel reader = csv.DictReader(f, dialect=dialect) if not reader.fieldnames: return entries headers: dict[str, str] = {} for h in reader.fieldnames: hl = h.lower() if hl not in headers: headers[hl] = h col_name = _pick_col(list(headers), CATALOG_COL_NAME) col_path = _pick_col(list(headers), CATALOG_COL_PATH) col_size = _pick_col(list(headers), CATALOG_COL_SIZE) col_disc = _pick_col(list(headers), CATALOG_COL_DISC) if not col_name and not col_path: _warn(f"catalog CSV {path} has no Name/Path columns; skipping.") return entries for row in reader: name = (row.get(headers[col_name]) if col_name else "") or "" full_path = (row.get(headers[col_path]) if col_path else "") or "" if not name and full_path: name = Path(full_path).name full_path = normalize_catalog_path(full_path) if not name: continue jav_id = extract_id(name) if not jav_id: skipped.append((f"catalog:{path.name}", full_path or name)) continue try: size = int(row.get(headers[col_size], 0)) if col_size else 0 except (ValueError, TypeError): size = 0 disc = (row.get(headers[col_disc]) if col_disc else "") or "" # Encode disc label into "remote" so it surfaces in output. remote_label = f"catalog:{disc}" if disc else f"catalog:{path.name}" entries.append(FileEntry( source="Catalog", remote=remote_label, path=full_path or name, size=size, mod_time="", jav_id=jav_id, )) return entries def _strip_xml_ns(tag: str) -> str: """Remove Clark-notation namespace {uri}local -> local.""" return tag.split("}")[-1] if "}" in tag else tag def load_catalog_xml(path: Path, skipped: list[tuple[str, str]]) -> list[FileEntry]: """Load a WinCatalog XML export. Walks for any element with file-like attrs.""" entries: list[FileEntry] = [] tree = ET.parse(str(path)) root = tree.getroot() def walk(node, disc_label: str, parent_path: str, _depth: int = 0): if _depth > 500: return tag = _strip_xml_ns(node.tag).lower() if tag in ("disc", "catalog", "source", "volume", "media"): disc_label = node.get("name") or node.get("Name") or disc_label if tag in ("file", "f"): name = node.get("name") or node.get("Name") or node.findtext("Name") or "" size_raw = node.get("size") or node.get("Size") or node.findtext("Size") or "0" try: size = int(size_raw) except ValueError: size = 0 full_path = normalize_catalog_path(f"{parent_path}/{name}" if parent_path else name) jav_id = extract_id(name) if jav_id: entries.append(FileEntry( source="Catalog", remote=f"catalog:{disc_label}" if disc_label else f"catalog:{path.name}", path=full_path, size=size, mod_time="", jav_id=jav_id, )) else: skipped.append((f"catalog:{disc_label or path.name}", full_path)) return if tag in ("folder", "dir", "directory"): folder_name = node.get("name") or node.get("Name") or "" parent_path = normalize_catalog_path(f"{parent_path}/{folder_name}" if parent_path else folder_name) for child in node: walk(child, disc_label, parent_path, _depth + 1) walk(root, "", "") return entries def _expand_catalog_paths(paths: list[str], default_paths: list[str] | None = None) -> list[Path]: """Expand any directories to their *.csv / *.xml children. Files passed through. `default_paths` is the configured DEFAULT_CATALOG list; missing paths inside that set are silently skipped (it's normal to not have a catalog dir). Missing paths outside the default set produce a warning. """ defaults = {Path(d).resolve() for d in (default_paths or [])} out: list[Path] = [] for p in paths: cp = Path(p) if cp.is_dir(): for child in sorted(cp.iterdir()): if child.suffix.lower() in (".csv", ".xml") and child.is_file(): out.append(child) elif cp.exists(): out.append(cp) elif Path(p).resolve() not in defaults: _warn(f"catalog path not found: {p}") return out def load_catalogs(paths: list[str], skipped: list[tuple[str, str]], default_paths: list[str] | None = None) -> list[FileEntry]: out: list[FileEntry] = [] for cp in _expand_catalog_paths(paths, default_paths=default_paths): ext = cp.suffix.lower() if ext == ".csv": out.extend(load_catalog_csv(cp, skipped)) elif ext == ".xml": out.extend(load_catalog_xml(cp, skipped)) else: _warn(f"unknown catalog format '{ext}' for {cp}; skipping.") return out