Files
MeDBia/scanner/scan_disc.py
Malin 7d401b1963 feat: photo gallery indexing — one record per folder, full filename search
scanner/scan_disc.py:
- Inventory disc into video files + photo folders (grouped by directory)
- Photo extensions: jpg/jpeg/png/gif/tiff/bmp/heic/heif/webp + raw formats
  (cr2 cr3 nef arw dng orf rw2 raf)
- Video disc   → 1 record per disc with video file list
- Photo disc   → 1 record per gallery folder with photo list
- Mixed disc   → both: 1 video record + 1 record per photo folder
- Unknown disc → 1 fallback record with total file count
- Folder title format: "DISC_LABEL — Folder / Subfolder"

videodb/api_ingest.php:
- Add subtitle field (gallery folder path)
- Add plot field (TEXT — full filename list, no 255-char limit)
- Add custom3 field (content type: video | photo | mixed | data)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-11 10:15:56 +02:00

413 lines
14 KiB
Python

#!/usr/bin/env python3
"""
MeDBia Disc Scanner — macOS client
===================================
Polls the optical drive, inspects disc contents, and submits records
to the remote videoDB API:
• Video discs → one record per disc, listing video filenames
• Photo discs → one record per gallery folder, listing photo filenames
• Mixed discs → both: a video record + one record per photo folder
Setup:
pip3 install requests
export VIDEODB_URL=http://your-server:6761
export VIDEODB_TOKEN=change_this_secret_token
python3 scan_disc.py
"""
import os
import re
import sys
import time
import subprocess
from collections import defaultdict
from pathlib import Path
try:
import requests
except ImportError:
sys.exit("Install requests first: pip3 install requests")
# ── Config ─────────────────────────────────────────────────────────────────────
# Endpoint, token and poll interval are read from the environment once at
# import time; the defaults are placeholders to be overridden.
API_URL = "/".join((
    os.getenv("VIDEODB_URL", "http://your-server:6761").rstrip("/"),
    "api_ingest.php",
))
API_TOKEN = os.getenv("VIDEODB_TOKEN", "change_this_secret_token")
POLL_SEC = int(os.getenv("POLL_INTERVAL", "5"))

# videoDB mediatype IDs (from install.sql)
MT_DVD, MT_BLURAY, MT_CD = 1, 16, 18

# File suffixes (lower-case, with leading dot) that count as video.
VIDEO_EXT = {
    ".mp4", ".mkv", ".avi", ".mov",
    ".m4v", ".ts", ".m2ts", ".mts",
    ".wmv", ".flv", ".webm", ".vob",
    ".mpg", ".mpeg", ".iso", ".divx",
}

# File suffixes (lower-case, with leading dot) that count as photos.
PHOTO_EXT = {
    # common raster formats
    ".jpg", ".jpeg", ".png", ".gif", ".tiff", ".tif", ".bmp",
    ".heic", ".heif", ".webp",
    # camera raw formats
    ".raw", ".cr2", ".cr3", ".nef", ".arw", ".dng", ".orf", ".rw2", ".raf",
}

# Entries under /Volumes that are never treated as the inserted disc.
IGNORE_VOLUMES = {
    "Macintosh HD",
    "Preboot",
    "Recovery",
    "VM",
    "Data",
    "Update",
}
# ── Shell helper ───────────────────────────────────────────────────────────────
def run(cmd: list) -> str:
    """Execute *cmd* and hand back whatever it wrote to stdout, as text.

    This is a best-effort helper: if the command cannot be started, exceeds
    the 20-second timeout, or anything else goes wrong, an empty string is
    returned instead of an exception. The exit status is not inspected.
    """
    try:
        completed = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=20,
        )
    except Exception:
        return ""
    return completed.stdout
# ── Disc presence ──────────────────────────────────────────────────────────────
def disc_status() -> dict | None:
    """Report what ``drutil status`` says about the disc in the drive.

    Returns:
        ``None`` if drutil produced no output or its output contains
        "No Media". Otherwise a dict holding whichever of these keys could
        be parsed: ``drutil_type`` (media type text), ``device``
        (a ``/dev/...`` node) and ``used_gb`` (float, gigabytes used).
    """
    out = run(["drutil", "status"])
    if not out or "No Media" in out:
        return None

    info: dict = {}

    # The type value ends at a run of 3+ spaces (the gap before the next
    # column on the same line) or at end of line.
    m = re.search(r"Type:\s+(.+?)(?:\s{3,}|$)", out, re.MULTILINE)
    if m:
        info["drutil_type"] = m.group(1).strip()

    m = re.search(r"Name:\s+(/dev/\S+)", out)
    if m:
        info["device"] = m.group(1)

    # Look for the size anywhere on the "Space Used" line rather than only
    # directly after the colon. drutil typically prints something like
    #   Space Used:  364:24:63   blocks:  1639863 /   3.36GB /   3.13GiB
    # (NOTE(review): layout recalled from real drutil output — confirm on the
    # target macOS version), which a number-right-after-the-colon pattern can
    # never match. The simpler "Space Used: 4.3 GB" form is still accepted,
    # and sizes reported in MB are converted so small discs are not lost.
    m = re.search(r"Space Used:.*?(\d*\.?\d+)\s*([MG])B\b", out)
    if m:
        amount = float(m.group(1))
        info["used_gb"] = amount / 1000 if m.group(2) == "M" else amount

    return info
# ── Mount ──────────────────────────────────────────────────────────────────────
def find_mount(device: str) -> str | None:
    """Locate the ``/Volumes/...`` mount point of the disc.

    The mount table (``mount`` output) is searched for *device* first. If
    that yields nothing, the alphabetically first entry in ``/Volumes`` that
    is not listed in ``IGNORE_VOLUMES`` is used as a guess.

    Args:
        device: Device node such as ``/dev/disk2``; may be empty, in which
            case the mount table is skipped (an empty string would
            otherwise match every line).

    Returns:
        The mount path, or ``None`` when no candidate was found.
    """
    if device:
        # Negative lookahead keeps "/dev/disk2" from matching "/dev/disk21"
        # while still accepting slice suffixes such as "/dev/disk2s0".
        device_re = re.compile(re.escape(device) + r"(?!\d)")
        for line in run(["mount"]).splitlines():
            if not device_re.search(line):
                continue
            # Lines look like "<dev> on <path> (<fstype>, <options>)". Capture
            # greedily up to the trailing option list so that volume names
            # containing spaces or parentheses are kept whole.
            m = re.search(r" on (/Volumes/.+) \([^()]*\)\s*$", line)
            if m:
                return m.group(1)

    try:
        volumes = set(os.listdir("/Volumes/")) - IGNORE_VOLUMES
    except OSError:
        # /Volumes missing or unreadable: nothing to fall back to.
        return None
    if volumes:
        return f"/Volumes/{sorted(volumes)[0]}"
    return None
# ── Media type inference ───────────────────────────────────────────────────────
def mediatype_from_drutil(drutil_type: str) -> tuple[int, str] | None:
t = drutil_type.upper()
if "BD" in t:
return MT_BLURAY, "Blu-ray"
if "DVD" in t:
return MT_DVD, "DVD"
if "CD" in t:
return MT_CD, "CD"
return None
def mediatype_from_size(used_gb: float) -> tuple[int, str]:
    """Guess the media type from the amount of data on the disc.

    More than 8.0 GB is taken to be Blu-ray, more than 0.68 GB a DVD, and
    anything else a CD.

    Returns:
        ``(mediatype_id, display_name)``.
    """
    if used_gb > 8.0:
        guess = (MT_BLURAY, "Blu-ray")
    elif used_gb > 0.68:
        guess = (MT_DVD, "DVD")
    else:
        guess = (MT_CD, "CD")
    return guess
# ── Disc inventory ─────────────────────────────────────────────────────────────
def inventory_disc(mount: str) -> dict:
    """Classify every file below *mount* as video, photo, or neither.

    Classification is by lower-cased file suffix against ``VIDEO_EXT`` and
    ``PHOTO_EXT``; a suffix in both sets counts as video.

    Returns:
        A dict with two keys:
        ``video_files``   – sorted list of video paths relative to *mount*;
        ``photo_folders`` – ``{folder relative to mount: [photo filenames]}``,
        where files directly under *mount* are filed under ``"(root)"``.
    """
    videos: list[str] = []
    galleries: dict[str, list[str]] = defaultdict(list)

    try:
        for dirpath, _subdirs, filenames in os.walk(mount):
            # The folder key is the same for every file in this directory.
            folder_key = os.path.relpath(dirpath, mount)
            if folder_key == ".":
                folder_key = "(root)"

            for name in filenames:
                suffix = Path(name).suffix.lower()
                if suffix in VIDEO_EXT:
                    full_path = os.path.join(dirpath, name)
                    videos.append(os.path.relpath(full_path, mount))
                elif suffix in PHOTO_EXT:
                    galleries[folder_key].append(name)
    except PermissionError:
        # Keep whatever was collected before the walk was interrupted.
        pass

    videos.sort()
    return {"video_files": videos, "photo_folders": dict(galleries)}
def disc_size_bytes(mount: str) -> int:
    """Return the used space of the filesystem at *mount*, in bytes.

    Reads the third column of ``df -k`` (1024-byte blocks) from the first
    data row where it parses as an integer. Returns 0 when no row qualifies.
    """
    rows = run(["df", "-k", mount]).splitlines()
    for row in rows[1:]:  # rows[0] is the column header
        columns = row.split()
        if len(columns) < 3:
            continue
        try:
            used_kib = int(columns[2])
        except ValueError:
            continue
        return used_kib * 1024
    return 0
def volume_label(mount: str | None, device: str) -> str:
if mount:
label = os.path.basename(mount)
if label:
return label
if device:
for line in run(["diskutil", "info", device]).splitlines():
if "Volume Name" in line:
return line.split(":", 1)[-1].strip()
return "Unknown Disc"
# ── Eject ──────────────────────────────────────────────────────────────────────
def eject(mount: str | None):
    """Eject the disc, preferring ``diskutil`` and falling back to ``drutil``.

    ``diskutil eject <mount>`` is tried only when *mount* is given; if it is
    not given, or diskutil exits non-zero, ``drutil eject`` is run instead.
    """
    ejected = False
    if mount:
        result = subprocess.run(["diskutil", "eject", mount], capture_output=True)
        ejected = result.returncode == 0
    if not ejected:
        run(["drutil", "eject"])
# ── API submission ─────────────────────────────────────────────────────────────
def submit(payload: dict) -> bool:
    """POST one record to the ingest API as JSON.

    The token is sent in the ``X-API-Token`` header and the request times
    out after 15 seconds. The outcome is printed either way.

    Returns:
        ``True`` only for an HTTP 200 response whose body parses as JSON;
        ``False`` for any other status or any exception.
    """
    try:
        response = requests.post(
            API_URL,
            json=payload,
            headers={"X-API-Token": API_TOKEN},
            timeout=15,
        )
        if response.status_code != 200:
            print(f" [ERR] API {response.status_code}: {response.text[:300]}")
            return False
        body = response.json()
        print(f" [OK] Entry #{body.get('id', '?')}: {payload['title']}")
        return True
    except requests.ConnectionError:
        print(f" [ERR] Cannot reach {API_URL}")
    except Exception as err:
        print(f" [ERR] {err}")
    return False
# ── Record builders ────────────────────────────────────────────────────────────
def build_video_record(disc_label: str, disklabel: str, video_files: list[str],
                       mediatype_id: int, drutil_type: str, size_b: int) -> dict:
    """Build the single API record that describes a disc's video files.

    Args:
        disc_label: Used as the record title.
        disklabel: Stored in the ``disklabel`` field.
        video_files: Video paths; only the final path component of each is
            recorded.
        mediatype_id: videoDB media type ID.
        drutil_type: Stored in ``custom1``.
        size_b: Stored in ``filesize``.

    Returns:
        The record dict. ``plot`` holds every filename (one per line);
        ``comment`` holds the count plus a preview of at most eight names,
        cut to 255 characters.
    """
    names = [Path(f).name for f in video_files]
    count = len(names)

    # Short preview for the comment (capped at 255 characters below).
    preview = ", ".join(names[:8])
    if count > 8:
        preview += f" … +{count - 8} more"
    # Pluralise the same way the photo records do, so one file reads
    # "1 video file: ..." rather than "1 video files: ...".
    comment = f"{count} video file{'s' if count != 1 else ''}: {preview}"

    return {
        "title": disc_label,
        "subtitle": "",
        "mediatype": mediatype_id,
        "comment": comment[:255],
        "plot": "\n".join(names),  # full list, not truncated
        "filesize": size_b,
        "disklabel": disklabel,
        "custom1": drutil_type,
        "custom2": str(count),
        "custom3": "video",
    }
def build_photo_records(disc_label: str, disklabel: str, photo_folders: dict[str, list[str]],
                        mediatype_id: int, drutil_type: str, size_b: int) -> list[dict]:
    """Build one API record per photo folder.

    Args:
        disc_label: Disc title; also the prefix of every folder title.
        disklabel: Stored in the ``disklabel`` field.
        photo_folders: ``{folder path: [photo filenames]}``; the key
            ``"(root)"`` stands for the top level of the disc.
        mediatype_id: videoDB media type ID.
        drutil_type: Stored in ``custom1``.
        size_b: Accepted for signature symmetry but not used; every record
            is given ``filesize`` 0.

    Returns:
        Records in the iteration order of *photo_folders*. Titles follow the
        format "DISC_LABEL — Folder / Subfolder", except that a disc whose
        only folder is the root is titled with the bare disc label.
    """
    records = []
    only_root = len(photo_folders) == 1

    for folder, photos in photo_folders.items():
        photos_sorted = sorted(photos)
        count = len(photos_sorted)

        # Skip the "(root)" clutter when the root is the only gallery.
        if folder == "(root)" and only_root:
            title = disc_label
        else:
            folder_display = folder.replace("/", " / ")
            # The separator keeps the label and folder apart, as in
            # "DISC — 2019 / Trip" rather than "DISC2019 / Trip".
            title = f"{disc_label} — {folder_display}"

        # Comment: count plus a preview of at most six names.
        preview = ", ".join(photos_sorted[:6])
        if count > 6:
            preview += f" … +{count - 6} more"
        comment = f"{count} photo{'s' if count != 1 else ''}: {preview}"

        records.append({
            "title": title[:255],
            "subtitle": folder if folder != "(root)" else "",
            "mediatype": mediatype_id,
            "comment": comment[:255],
            "plot": "\n".join(photos_sorted),  # full filename list
            "filesize": 0,  # folder-level size not easily available
            "disklabel": disklabel,
            "custom1": drutil_type,
            "custom2": str(count),
            "custom3": "photo",
        })
    return records
# ── Main scan routine ──────────────────────────────────────────────────────────
def scan_and_submit():
    """Index the disc currently in the drive and eject it afterwards.

    Steps: read drutil status, wait 4 seconds for the volume to mount,
    inventory the contents, build records (one for videos, one per photo
    folder, or a single fallback record when neither kind is present),
    submit each record, then eject.

    Returns:
        ``False`` when no disc is reported. ``True`` when the disc was not
        mounted (a label-only record is submitted, its result is not
        checked). Otherwise whether at least one record was accepted.
    """
    status = disc_status()
    if not status:
        return False

    drutil_type = status.get("drutil_type", "Unknown")
    device = status.get("device", "")
    used_gb = status.get("used_gb", 0.0)
    print(f" drutil type : {drutil_type}")
    print(f" device : {device}")
    print(f" used : {used_gb:.2f} GB")

    time.sleep(4)  # let macOS finish mounting
    mount = find_mount(device)
    print(f" mount : {mount or '(not mounted)'}")

    # Prefer drutil's own type string; fall back to a size-based guess.
    mediatype_id, _ = mediatype_from_drutil(drutil_type) or mediatype_from_size(used_gb)
    disc_label = volume_label(mount, device)
    disklabel = disc_label[:32]

    if not mount:
        print(" [WARN] Disc not mounted — submitting with disc label only")
        unmounted_record = {
            "title": disc_label,
            "subtitle": "",
            "mediatype": mediatype_id,
            "comment": f"Disc not mounted ({drutil_type})",
            "plot": "",
            "filesize": int(used_gb * 1024**3),
            "disklabel": disklabel,
            "custom1": drutil_type,
            "custom2": "0",
            "custom3": "unknown",
        }
        submit(unmounted_record)
        eject(None)
        return True

    size_b = disc_size_bytes(mount)
    print(" Scanning contents...")
    inventory = inventory_disc(mount)
    video_files = inventory["video_files"]
    photo_folders = inventory["photo_folders"]

    n_videos = len(video_files)
    n_photos = sum(len(names) for names in photo_folders.values())
    n_galleries = len(photo_folders)
    print(f" Found: {n_videos} video files, {n_photos} photos in {n_galleries} folder(s)")

    records = []
    if video_files:
        records.append(build_video_record(
            disc_label, disklabel, video_files, mediatype_id, drutil_type, size_b
        ))
    if photo_folders:
        records.extend(build_photo_records(
            disc_label, disklabel, photo_folders, mediatype_id, drutil_type, size_b
        ))

    if not records:
        # Disc has neither video nor photo files — index with raw file count
        total = sum(len(filenames) for _, _, filenames in os.walk(mount))
        records.append({
            "title": disc_label,
            "subtitle": "",
            "mediatype": mediatype_id,
            "comment": f"{total} files (no video or photo files detected)",
            "plot": "",
            "filesize": size_b,
            "disklabel": disklabel,
            "custom1": drutil_type,
            "custom2": str(total),
            "custom3": "data",
        })

    print(f"\n Submitting {len(records)} record(s)...")
    # Every record is attempted, in order, even after a failure.
    ok_count = sum(1 for record in records if submit(record))
    print(f" {ok_count}/{len(records)} records submitted.")

    print(" Ejecting disc...")
    eject(mount)
    return ok_count > 0
# ── Entry point ────────────────────────────────────────────────────────────────
def main():
    """Print the banner, then poll the drive forever, indexing each new disc.

    Never returns normally; the loop ends only through an exception such as
    ``KeyboardInterrupt``.
    """
    rule = "=" * 55
    banner = (
        rule,
        " MeDBia Disc Scanner",
        f" API : {API_URL}",
        f" Poll : every {POLL_SEC}s",
        rule,
        "Insert a disc to index it. Ctrl-C to stop.\n",
    )
    for banner_line in banner:
        print(banner_line)

    was_present = False
    while True:
        disc_now = bool(disc_status())
        if disc_now and not was_present:
            was_present = True
            print("Disc detected!")
            scan_and_submit()
            # NOTE(review): the flag is cleared straight after the scan, so a
            # disc that is still in the drive on the next poll is scanned
            # again and the branch below never fires.
            was_present = False
            print("\nReady — insert next disc.\n")
        elif was_present and not disc_now:
            was_present = False
        time.sleep(POLL_SEC)


# Script entry point: run the polling loop and exit quietly on Ctrl-C.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\nStopped.")