179 lines
6.9 KiB
Python
179 lines
6.9 KiB
Python
"""WinCatalog ingest — CSV and XML.
|
|
|
|
Catalog entries are offline references (e.g. an exported disc index
|
|
from WinCatalog). They show up in dupe output but never participate
|
|
in keep ranking — rcjav.dupes filters Catalog-sourced entries out
|
|
before choosing a winner.
|
|
|
|
Warnings are written to stderr without rich markup; the calling
|
|
module owns terminal styling.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
import re
|
|
import sys
|
|
import xml.etree.ElementTree as ET
|
|
from pathlib import Path
|
|
|
|
from rcjav.ids import extract_id
|
|
from rcjav.model import FileEntry
|
|
|
|
|
|
CATALOG_COL_NAME = ("name", "file name", "filename", "title")
|
|
CATALOG_COL_PATH = ("path", "full path", "location", "folder")
|
|
CATALOG_COL_SIZE = ("size", "file size", "bytes", "size (bytes)")
|
|
CATALOG_COL_DISC = ("disc", "disc name", "disc label", "volume", "source", "catalog", "media")
|
|
|
|
|
|
def _warn(msg: str) -> None:
|
|
sys.stderr.write(f"WARN: {msg}\n")
|
|
|
|
|
|
def _pick_col(headers_lower: list[str], synonyms: tuple[str, ...]) -> str | None:
|
|
for s in synonyms:
|
|
if s in headers_lower:
|
|
return s
|
|
return None
|
|
|
|
|
|
def normalize_catalog_path(path: str) -> str:
|
|
"""Keep catalog paths display-compatible with rclone-style path consumers."""
|
|
p = (path or "").replace("\\", "/")
|
|
if p.startswith("//"):
|
|
return "//" + re.sub(r"/+", "/", p[2:])
|
|
return re.sub(r"/+", "/", p)
|
|
|
|
|
|
def load_catalog_csv(path: Path, skipped: list[tuple[str, str]]) -> list[FileEntry]:
|
|
"""Load a WinCatalog CSV export. Lenient about column names."""
|
|
entries: list[FileEntry] = []
|
|
with path.open("r", encoding="utf-8-sig", newline="") as f:
|
|
sample = f.read(4096)
|
|
f.seek(0)
|
|
try:
|
|
dialect = csv.Sniffer().sniff(sample, delimiters=",;\t|")
|
|
except csv.Error:
|
|
dialect = csv.excel
|
|
reader = csv.DictReader(f, dialect=dialect)
|
|
if not reader.fieldnames:
|
|
return entries
|
|
headers: dict[str, str] = {}
|
|
for h in reader.fieldnames:
|
|
hl = h.lower()
|
|
if hl not in headers:
|
|
headers[hl] = h
|
|
col_name = _pick_col(list(headers), CATALOG_COL_NAME)
|
|
col_path = _pick_col(list(headers), CATALOG_COL_PATH)
|
|
col_size = _pick_col(list(headers), CATALOG_COL_SIZE)
|
|
col_disc = _pick_col(list(headers), CATALOG_COL_DISC)
|
|
if not col_name and not col_path:
|
|
_warn(f"catalog CSV {path} has no Name/Path columns; skipping.")
|
|
return entries
|
|
for row in reader:
|
|
name = (row.get(headers[col_name]) if col_name else "") or ""
|
|
full_path = (row.get(headers[col_path]) if col_path else "") or ""
|
|
if not name and full_path:
|
|
name = Path(full_path).name
|
|
full_path = normalize_catalog_path(full_path)
|
|
if not name:
|
|
continue
|
|
jav_id = extract_id(name)
|
|
if not jav_id:
|
|
skipped.append((f"catalog:{path.name}", full_path or name))
|
|
continue
|
|
try:
|
|
size = int(row.get(headers[col_size], 0)) if col_size else 0
|
|
except (ValueError, TypeError):
|
|
size = 0
|
|
disc = (row.get(headers[col_disc]) if col_disc else "") or ""
|
|
# Encode disc label into "remote" so it surfaces in output.
|
|
remote_label = f"catalog:{disc}" if disc else f"catalog:{path.name}"
|
|
entries.append(FileEntry(
|
|
source="Catalog", remote=remote_label,
|
|
path=full_path or name, size=size, mod_time="",
|
|
jav_id=jav_id,
|
|
))
|
|
return entries
|
|
|
|
|
|
def _strip_xml_ns(tag: str) -> str:
|
|
"""Remove Clark-notation namespace {uri}local -> local."""
|
|
return tag.split("}")[-1] if "}" in tag else tag
|
|
|
|
|
|
def load_catalog_xml(path: Path, skipped: list[tuple[str, str]]) -> list[FileEntry]:
|
|
"""Load a WinCatalog XML export. Walks for any element with file-like attrs."""
|
|
entries: list[FileEntry] = []
|
|
tree = ET.parse(str(path))
|
|
root = tree.getroot()
|
|
|
|
def walk(node, disc_label: str, parent_path: str, _depth: int = 0):
|
|
if _depth > 500:
|
|
return
|
|
tag = _strip_xml_ns(node.tag).lower()
|
|
if tag in ("disc", "catalog", "source", "volume", "media"):
|
|
disc_label = node.get("name") or node.get("Name") or disc_label
|
|
if tag in ("file", "f"):
|
|
name = node.get("name") or node.get("Name") or node.findtext("Name") or ""
|
|
size_raw = node.get("size") or node.get("Size") or node.findtext("Size") or "0"
|
|
try:
|
|
size = int(size_raw)
|
|
except ValueError:
|
|
size = 0
|
|
full_path = normalize_catalog_path(f"{parent_path}/{name}" if parent_path else name)
|
|
jav_id = extract_id(name)
|
|
if jav_id:
|
|
entries.append(FileEntry(
|
|
source="Catalog",
|
|
remote=f"catalog:{disc_label}" if disc_label else f"catalog:{path.name}",
|
|
path=full_path, size=size, mod_time="", jav_id=jav_id,
|
|
))
|
|
else:
|
|
skipped.append((f"catalog:{disc_label or path.name}", full_path))
|
|
return
|
|
if tag in ("folder", "dir", "directory"):
|
|
folder_name = node.get("name") or node.get("Name") or ""
|
|
parent_path = normalize_catalog_path(f"{parent_path}/{folder_name}" if parent_path else folder_name)
|
|
for child in node:
|
|
walk(child, disc_label, parent_path, _depth + 1)
|
|
|
|
walk(root, "", "")
|
|
return entries
|
|
|
|
|
|
def _expand_catalog_paths(paths: list[str], default_paths: list[str] | None = None) -> list[Path]:
|
|
"""Expand any directories to their *.csv / *.xml children. Files passed through.
|
|
|
|
`default_paths` is the configured DEFAULT_CATALOG list; missing paths inside
|
|
that set are silently skipped (it's normal to not have a catalog dir).
|
|
Missing paths outside the default set produce a warning.
|
|
"""
|
|
defaults = {Path(d).resolve() for d in (default_paths or [])}
|
|
out: list[Path] = []
|
|
for p in paths:
|
|
cp = Path(p)
|
|
if cp.is_dir():
|
|
for child in sorted(cp.iterdir()):
|
|
if child.suffix.lower() in (".csv", ".xml") and child.is_file():
|
|
out.append(child)
|
|
elif cp.exists():
|
|
out.append(cp)
|
|
elif Path(p).resolve() not in defaults:
|
|
_warn(f"catalog path not found: {p}")
|
|
return out
|
|
|
|
|
|
def load_catalogs(paths: list[str], skipped: list[tuple[str, str]],
|
|
default_paths: list[str] | None = None) -> list[FileEntry]:
|
|
out: list[FileEntry] = []
|
|
for cp in _expand_catalog_paths(paths, default_paths=default_paths):
|
|
ext = cp.suffix.lower()
|
|
if ext == ".csv":
|
|
out.extend(load_catalog_csv(cp, skipped))
|
|
elif ext == ".xml":
|
|
out.extend(load_catalog_xml(cp, skipped))
|
|
else:
|
|
_warn(f"unknown catalog format '{ext}' for {cp}; skipping.")
|
|
return out
|