scanner/scan_disc.py: - Inventory disc into video files + photo folders (grouped by directory) - Photo extensions: jpg/jpeg/png/gif/tiff/bmp/heic/heif/webp + raw formats (cr2 cr3 nef arw dng orf rw2 raf) - Video disc → 1 record per disc with video file list - Photo disc → 1 record per gallery folder with photo list - Mixed disc → both: 1 video record + 1 record per photo folder - Unknown disc → 1 fallback record with total file count - Folder title format: "DISC_LABEL — Folder / Subfolder" videodb/api_ingest.php: - Add subtitle field (gallery folder path) - Add plot field (TEXT — full filename list, no 255-char limit) - Add custom3 field (content type: video | photo | mixed | data) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
413 lines
14 KiB
Python
413 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
MeDBia Disc Scanner — macOS client
|
|
===================================
|
|
Polls the optical drive, inspects disc contents, and submits records
|
|
to the remote videoDB API:
|
|
|
|
• Video discs → one record per disc, listing video filenames
|
|
• Photo discs → one record per gallery folder, listing photo filenames
|
|
• Mixed discs → both: a video record + one record per photo folder
|
|
|
|
Setup:
|
|
pip3 install requests
|
|
export VIDEODB_URL=http://your-server:6761
|
|
export VIDEODB_TOKEN=change_this_secret_token
|
|
python3 scan_disc.py
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
import subprocess
|
|
from collections import defaultdict
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import requests
|
|
except ImportError:
|
|
sys.exit("Install requests first: pip3 install requests")
|
|
|
|
# ── Config ─────────────────────────────────────────────────────────────────────
|
|
API_URL = os.environ.get("VIDEODB_URL", "http://your-server:6761").rstrip("/") + "/api_ingest.php"
|
|
API_TOKEN = os.environ.get("VIDEODB_TOKEN", "change_this_secret_token")
|
|
POLL_SEC = int(os.environ.get("POLL_INTERVAL", "5"))
|
|
|
|
# videoDB mediatype IDs (from install.sql)
|
|
MT_DVD = 1
|
|
MT_BLURAY = 16
|
|
MT_CD = 18
|
|
|
|
VIDEO_EXT = {
|
|
".mp4", ".mkv", ".avi", ".mov", ".m4v", ".ts", ".m2ts", ".mts",
|
|
".wmv", ".flv", ".webm", ".vob", ".mpg", ".mpeg", ".iso", ".divx",
|
|
}
|
|
|
|
PHOTO_EXT = {
|
|
".jpg", ".jpeg", ".png", ".gif", ".tiff", ".tif", ".bmp",
|
|
".heic", ".heif", ".webp",
|
|
".raw", ".cr2", ".cr3", ".nef", ".arw", ".dng", ".orf", ".rw2", ".raf",
|
|
}
|
|
|
|
IGNORE_VOLUMES = {"Macintosh HD", "Preboot", "Recovery", "VM", "Data", "Update"}
|
|
|
|
|
|
# ── Shell helper ───────────────────────────────────────────────────────────────
|
|
|
|
def run(cmd: list) -> str:
|
|
try:
|
|
r = subprocess.run(cmd, capture_output=True, text=True, timeout=20)
|
|
return r.stdout
|
|
except Exception:
|
|
return ""
|
|
|
|
|
|
# ── Disc presence ──────────────────────────────────────────────────────────────
|
|
|
|
def disc_status() -> dict | None:
|
|
out = run(["drutil", "status"])
|
|
if not out or "No Media" in out:
|
|
return None
|
|
|
|
info: dict = {}
|
|
m = re.search(r"Type:\s+(.+?)(?:\s{3,}|$)", out, re.MULTILINE)
|
|
if m:
|
|
info["drutil_type"] = m.group(1).strip()
|
|
m = re.search(r"Name:\s+(/dev/\S+)", out)
|
|
if m:
|
|
info["device"] = m.group(1)
|
|
m = re.search(r"Space Used:\s+([\d.]+)\s*GB", out)
|
|
if m:
|
|
info["used_gb"] = float(m.group(1))
|
|
return info
|
|
|
|
|
|
# ── Mount ──────────────────────────────────────────────────────────────────────
|
|
|
|
def find_mount(device: str) -> str | None:
|
|
for line in run(["mount"]).splitlines():
|
|
if device in line:
|
|
m = re.search(r" on (/Volumes/[^\s(]+)", line)
|
|
if m:
|
|
return m.group(1)
|
|
try:
|
|
volumes = set(os.listdir("/Volumes/")) - IGNORE_VOLUMES
|
|
if volumes:
|
|
return f"/Volumes/{sorted(volumes)[0]}"
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
|
|
# ── Media type inference ───────────────────────────────────────────────────────
|
|
|
|
def mediatype_from_drutil(drutil_type: str) -> tuple[int, str] | None:
|
|
t = drutil_type.upper()
|
|
if "BD" in t:
|
|
return MT_BLURAY, "Blu-ray"
|
|
if "DVD" in t:
|
|
return MT_DVD, "DVD"
|
|
if "CD" in t:
|
|
return MT_CD, "CD"
|
|
return None
|
|
|
|
def mediatype_from_size(used_gb: float) -> tuple[int, str]:
|
|
if used_gb > 8.0:
|
|
return MT_BLURAY, "Blu-ray"
|
|
if used_gb > 0.68:
|
|
return MT_DVD, "DVD"
|
|
return MT_CD, "CD"
|
|
|
|
|
|
# ── Disc inventory ─────────────────────────────────────────────────────────────
|
|
|
|
def inventory_disc(mount: str) -> dict:
|
|
"""
|
|
Walk the disc and return:
|
|
video_files: list of relative paths to video files
|
|
photo_folders: dict of { relative_folder_path -> [photo filenames] }
|
|
"""
|
|
video_files: list[str] = []
|
|
# folder path (relative to mount) -> list of photo filenames in that folder
|
|
photo_folders: dict[str, list[str]] = defaultdict(list)
|
|
|
|
try:
|
|
for root, _dirs, files in os.walk(mount):
|
|
for fname in files:
|
|
ext = Path(fname).suffix.lower()
|
|
rel_path = os.path.relpath(os.path.join(root, fname), mount)
|
|
rel_folder = os.path.relpath(root, mount)
|
|
if rel_folder == ".":
|
|
rel_folder = "(root)"
|
|
|
|
if ext in VIDEO_EXT:
|
|
video_files.append(rel_path)
|
|
elif ext in PHOTO_EXT:
|
|
photo_folders[rel_folder].append(fname)
|
|
except PermissionError:
|
|
pass
|
|
|
|
return {
|
|
"video_files": sorted(video_files),
|
|
"photo_folders": dict(photo_folders),
|
|
}
|
|
|
|
|
|
def disc_size_bytes(mount: str) -> int:
|
|
out = run(["df", "-k", mount])
|
|
for line in out.splitlines()[1:]:
|
|
parts = line.split()
|
|
if len(parts) >= 3:
|
|
try:
|
|
return int(parts[2]) * 1024
|
|
except ValueError:
|
|
pass
|
|
return 0
|
|
|
|
|
|
def volume_label(mount: str | None, device: str) -> str:
|
|
if mount:
|
|
label = os.path.basename(mount)
|
|
if label:
|
|
return label
|
|
if device:
|
|
for line in run(["diskutil", "info", device]).splitlines():
|
|
if "Volume Name" in line:
|
|
return line.split(":", 1)[-1].strip()
|
|
return "Unknown Disc"
|
|
|
|
|
|
# ── Eject ──────────────────────────────────────────────────────────────────────
|
|
|
|
def eject(mount: str | None):
|
|
if mount:
|
|
r = subprocess.run(["diskutil", "eject", mount], capture_output=True)
|
|
if r.returncode == 0:
|
|
return
|
|
run(["drutil", "eject"])
|
|
|
|
|
|
# ── API submission ─────────────────────────────────────────────────────────────
|
|
|
|
def submit(payload: dict) -> bool:
|
|
try:
|
|
resp = requests.post(
|
|
API_URL,
|
|
json=payload,
|
|
headers={"X-API-Token": API_TOKEN},
|
|
timeout=15,
|
|
)
|
|
if resp.status_code == 200:
|
|
data = resp.json()
|
|
print(f" [OK] Entry #{data.get('id', '?')}: {payload['title']}")
|
|
return True
|
|
else:
|
|
print(f" [ERR] API {resp.status_code}: {resp.text[:300]}")
|
|
return False
|
|
except requests.ConnectionError:
|
|
print(f" [ERR] Cannot reach {API_URL}")
|
|
return False
|
|
except Exception as e:
|
|
print(f" [ERR] {e}")
|
|
return False
|
|
|
|
|
|
# ── Record builders ────────────────────────────────────────────────────────────
|
|
|
|
def build_video_record(disc_label: str, disklabel: str, video_files: list[str],
|
|
mediatype_id: int, drutil_type: str, size_b: int) -> dict:
|
|
names = [Path(f).name for f in video_files]
|
|
comment = f"{len(names)} video file{'s' if len(names) != 1 else ''}"
|
|
plot = "\n".join(names) # full list in the TEXT field — no length limit
|
|
# Short preview for comment (VARCHAR 255)
|
|
preview = ", ".join(names[:8])
|
|
if len(names) > 8:
|
|
preview += f" … +{len(names) - 8} more"
|
|
comment = f"{len(names)} video files: {preview}"
|
|
|
|
return {
|
|
"title": disc_label,
|
|
"subtitle": "",
|
|
"mediatype": mediatype_id,
|
|
"comment": comment[:255],
|
|
"plot": plot,
|
|
"filesize": size_b,
|
|
"disklabel": disklabel,
|
|
"custom1": drutil_type,
|
|
"custom2": str(len(names)),
|
|
"custom3": "video",
|
|
}
|
|
|
|
|
|
def build_photo_records(disc_label: str, disklabel: str, photo_folders: dict[str, list[str]],
|
|
mediatype_id: int, drutil_type: str, size_b: int) -> list[dict]:
|
|
"""One record per gallery folder."""
|
|
records = []
|
|
for folder, photos in photo_folders.items():
|
|
photos_sorted = sorted(photos)
|
|
count = len(photos_sorted)
|
|
|
|
# Title: disc label + folder name (skip "(root)" clutter if only one folder)
|
|
if folder == "(root)" and len(photo_folders) == 1:
|
|
title = disc_label
|
|
else:
|
|
folder_display = folder.replace("/", " / ")
|
|
title = f"{disc_label} — {folder_display}"
|
|
|
|
# Comment: short summary
|
|
preview = ", ".join(photos_sorted[:6])
|
|
if count > 6:
|
|
preview += f" … +{count - 6} more"
|
|
comment = f"{count} photo{'s' if count != 1 else ''}: {preview}"
|
|
|
|
# Plot: full filename list — searchable via videoDB full-text search
|
|
plot = "\n".join(photos_sorted)
|
|
|
|
records.append({
|
|
"title": title[:255],
|
|
"subtitle": folder if folder != "(root)" else "",
|
|
"mediatype": mediatype_id,
|
|
"comment": comment[:255],
|
|
"plot": plot,
|
|
"filesize": 0, # folder-level size not easily available
|
|
"disklabel": disklabel,
|
|
"custom1": drutil_type,
|
|
"custom2": str(count),
|
|
"custom3": "photo",
|
|
})
|
|
return records
|
|
|
|
|
|
# ── Main scan routine ──────────────────────────────────────────────────────────
|
|
|
|
def scan_and_submit():
|
|
status = disc_status()
|
|
if not status:
|
|
return False
|
|
|
|
drutil_type = status.get("drutil_type", "Unknown")
|
|
device = status.get("device", "")
|
|
used_gb = status.get("used_gb", 0.0)
|
|
|
|
print(f" drutil type : {drutil_type}")
|
|
print(f" device : {device}")
|
|
print(f" used : {used_gb:.2f} GB")
|
|
|
|
time.sleep(4) # let macOS finish mounting
|
|
|
|
mount = find_mount(device)
|
|
print(f" mount : {mount or '(not mounted)'}")
|
|
|
|
mt = mediatype_from_drutil(drutil_type) or mediatype_from_size(used_gb)
|
|
mediatype_id, mediatype_name = mt
|
|
disc_label = volume_label(mount, device)
|
|
disklabel = disc_label[:32]
|
|
|
|
if not mount:
|
|
print(" [WARN] Disc not mounted — submitting with disc label only")
|
|
submit({
|
|
"title": disc_label,
|
|
"subtitle": "",
|
|
"mediatype": mediatype_id,
|
|
"comment": f"Disc not mounted ({drutil_type})",
|
|
"plot": "",
|
|
"filesize": int(used_gb * 1024**3),
|
|
"disklabel": disklabel,
|
|
"custom1": drutil_type,
|
|
"custom2": "0",
|
|
"custom3": "unknown",
|
|
})
|
|
eject(None)
|
|
return True
|
|
|
|
size_b = disc_size_bytes(mount)
|
|
print(f" Scanning contents...")
|
|
inv = inventory_disc(mount)
|
|
|
|
video_files = inv["video_files"]
|
|
photo_folders = inv["photo_folders"]
|
|
|
|
n_videos = len(video_files)
|
|
n_photos = sum(len(v) for v in photo_folders.values())
|
|
n_galleries = len(photo_folders)
|
|
|
|
print(f" Found: {n_videos} video files, {n_photos} photos in {n_galleries} folder(s)")
|
|
|
|
records = []
|
|
|
|
if video_files:
|
|
records.append(build_video_record(
|
|
disc_label, disklabel, video_files, mediatype_id, drutil_type, size_b
|
|
))
|
|
|
|
if photo_folders:
|
|
records += build_photo_records(
|
|
disc_label, disklabel, photo_folders, mediatype_id, drutil_type, size_b
|
|
)
|
|
|
|
if not records:
|
|
# Disc has neither video nor photo files — index with raw file count
|
|
total = sum(
|
|
len(files)
|
|
for _, _, files in os.walk(mount)
|
|
)
|
|
records.append({
|
|
"title": disc_label,
|
|
"subtitle": "",
|
|
"mediatype": mediatype_id,
|
|
"comment": f"{total} files (no video or photo files detected)",
|
|
"plot": "",
|
|
"filesize": size_b,
|
|
"disklabel": disklabel,
|
|
"custom1": drutil_type,
|
|
"custom2": str(total),
|
|
"custom3": "data",
|
|
})
|
|
|
|
print(f"\n Submitting {len(records)} record(s)...")
|
|
ok_count = 0
|
|
for rec in records:
|
|
if submit(rec):
|
|
ok_count += 1
|
|
|
|
print(f" {ok_count}/{len(records)} records submitted.")
|
|
print(" Ejecting disc...")
|
|
eject(mount)
|
|
return ok_count > 0
|
|
|
|
|
|
# ── Entry point ────────────────────────────────────────────────────────────────
|
|
|
|
def main():
|
|
print("=" * 55)
|
|
print(" MeDBia Disc Scanner")
|
|
print(f" API : {API_URL}")
|
|
print(f" Poll : every {POLL_SEC}s")
|
|
print("=" * 55)
|
|
print("Insert a disc to index it. Ctrl-C to stop.\n")
|
|
|
|
was_present = False
|
|
|
|
while True:
|
|
status = disc_status()
|
|
|
|
if status and not was_present:
|
|
was_present = True
|
|
print("Disc detected!")
|
|
scan_and_submit()
|
|
was_present = False
|
|
print("\nReady — insert next disc.\n")
|
|
|
|
elif not status and was_present:
|
|
was_present = False
|
|
|
|
time.sleep(POLL_SEC)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
main()
|
|
except KeyboardInterrupt:
|
|
print("\nStopped.")
|