Files
MeDBia/scanner/scan_disc.py
Malin 6002fc6e58 feat: add macOS disc scanner + API ingest endpoint
- scanner/scan_disc.py: polls optical drive via drutil, detects disc type
  (DVD/Blu-ray/Audio CD/Data CD), reads volume label, file/track count,
  posts to remote API, auto-ejects. Pure Python + requests, no drivers.
- scanner/requirements.txt + README.md: setup and usage docs
- videodb/api_ingest.php: authenticated POST endpoint that writes disc
  records directly into the videoDB MySQL schema; token stored in config
- docker-compose.yml: adds INGEST_API_TOKEN env var
- docker-entrypoint.sh: writes ingest_api_token into config.inc.php

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-11 09:58:11 +02:00

305 lines
9.5 KiB
Python

#!/usr/bin/env python3
"""
MeDBia Disc Scanner — macOS client
===================================
Polls the optical drive, reads disc metadata, posts to the remote
videoDB API, then ejects the disc.
Setup:
pip3 install requests
export VIDEODB_URL=http://your-server:6761
export VIDEODB_TOKEN=change_this_secret_token
python3 scan_disc.py
"""
import os
import re
import sys
import time
import subprocess
from pathlib import Path
try:
import requests
except ImportError:
sys.exit("Install requests first: pip3 install requests")
# ── Config (override with environment variables) ───────────────────────────────
# Ingest endpoint: VIDEODB_URL with any trailing "/" removed, plus "/api_ingest.php".
API_URL = os.environ.get("VIDEODB_URL", "http://your-server:6761").rstrip("/") + "/api_ingest.php"
# Token sent in the X-API-Token request header; the default is only a placeholder.
API_TOKEN = os.environ.get("VIDEODB_TOKEN", "change_this_secret_token")
# Seconds to sleep between drive polls. int() raises ValueError at import time
# if POLL_INTERVAL is set to something that is not an integer.
POLL_SEC = int(os.environ.get("POLL_INTERVAL", "5"))
# videoDB mediatype IDs (must match install.sql)
# Note that "cd" and "data_cd" map to the same ID.
MEDIATYPE = {
    "dvd": 1,
    "bluray": 16,
    "cd": 18,
    "data_cd": 18,
}
# Known system volumes to ignore when scanning /Volumes/
IGNORE_VOLUMES = {"Macintosh HD", "Preboot", "Recovery", "VM", "Data", "Update"}
# ── Shell helpers ──────────────────────────────────────────────────────────────
def run(cmd: list) -> str:
    """
    Execute *cmd* (an argv list, no shell) and hand back its stdout as text.

    The call is capped at 15 seconds. Any failure while launching or waiting
    for the command (missing binary, timeout, ...) is swallowed and reported
    as "" so callers can treat "no output" and "command failed" alike.
    The exit status is not inspected.
    """
    try:
        completed = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
    except Exception:
        return ""
    else:
        return completed.stdout
# ── Disc detection ─────────────────────────────────────────────────────────────
def disc_status() -> dict | None:
    """
    Query `drutil status` and extract the fields the scanner cares about.

    Returns None when the command produced no output or reports "No Media".
    Otherwise returns a dict that may contain (each key only when found):
      drutil_type - text after "Type:" (e.g. DVD-ROM / Audio CD / CD-ROM / BD-ROM)
      device      - device node after "Name:" (e.g. /dev/disk2)
      tracks      - integer after "Tracks:"
    """
    report = run(["drutil", "status"])
    if not report or "No Media" in report:
        return None

    # (result key, pattern, regex flags, conversion applied to capture group 1)
    extractors = (
        ("drutil_type", r"Type:\s+(.+?)(?:\s{3,}|$)", re.MULTILINE, str.strip),
        ("device", r"Name:\s+(/dev/\S+)", 0, str),
        ("tracks", r"Tracks:\s+(\d+)", 0, int),
    )
    info: dict = {}
    for key, pattern, flags, convert in extractors:
        hit = re.search(pattern, report, flags)
        if hit:
            info[key] = convert(hit.group(1))
    return info
def find_mount(device: str) -> str | None:
    """
    Find where the optical disc is mounted.

    Strategy:
      1. Look through `mount` output for a line mentioning *device* and take
         the /Volumes/... path from it.
      2. Otherwise fall back to the alphabetically first entry in /Volumes/
         that is not a known system volume (a heuristic - it may pick an
         unrelated volume if several are mounted).

    Returns the mount path, or None when nothing suitable is found.
    """
    # An empty device string is a substring of every line, so it would match
    # whatever volume happens to be listed first; skip straight to the fallback.
    if device:
        for line in run(["mount"]).splitlines():
            if device not in line:
                continue
            # "... on /Volumes/FOO (…)"
            # Capture everything up to the trailing "(options)" group so that
            # volume names containing spaces (or parentheses) are kept whole.
            m = re.search(r" on (/Volumes/.+?)(?: \([^()]*\))?\s*$", line)
            if m:
                return m.group(1)

    # Fallback: an entry in /Volumes/ that isn't a system volume
    try:
        volumes = set(os.listdir("/Volumes/")) - IGNORE_VOLUMES
    except OSError:
        # /Volumes/ missing or unreadable: nothing to report.
        return None
    if volumes:
        # Return the first one alphabetically
        return f"/Volumes/{sorted(volumes)[0]}"
    return None
# ── Disc classification ───────────────────────────────────────────────────────
def classify(mount: str | None, drutil_type: str) -> tuple[int, str]:
    """
    Decide which videoDB media type a disc is.

    When a mount point is known, the directory layout wins: a BDMV entry
    means Blu-ray, a VIDEO_TS entry (or VIDEO_TS.IFO) means DVD. Otherwise
    the drutil type string is searched, case-insensitively, for AUDIO, BD
    and DVD in that order. Anything else is treated as a CD/data disc.

    Returns (mediatype_id, label).
    """
    if mount:
        root = Path(mount)
        if root.joinpath("BDMV").exists():
            return MEDIATYPE["bluray"], "Blu-ray"
        if any(root.joinpath(entry).exists() for entry in ("VIDEO_TS", "VIDEO_TS.IFO")):
            return MEDIATYPE["dvd"], "DVD"

    kind = drutil_type.upper()
    # Order matters: first matching keyword decides.
    by_keyword = (
        ("AUDIO", "cd", "Audio CD"),
        ("BD", "bluray", "Blu-ray"),
        ("DVD", "dvd", "DVD"),
    )
    for keyword, media_key, label in by_keyword:
        if keyword in kind:
            return MEDIATYPE[media_key], label
    return MEDIATYPE["cd"], "CD/Data"
def volume_label(mount: str | None, device: str) -> str:
"""Get the disc's volume label."""
if mount:
label = os.path.basename(mount)
if label:
return label
if device:
for line in run(["diskutil", "info", device]).splitlines():
if "Volume Name" in line:
return line.split(":", 1)[-1].strip()
return "Unknown Disc"
def disc_size_bytes(mount: str | None) -> int:
"""Total used space on the disc in bytes."""
if not mount:
return 0
out = run(["df", "-k", mount])
for line in out.splitlines()[1:]:
parts = line.split()
if len(parts) >= 3:
try:
return int(parts[2]) * 1024 # 'Used' column, KB→B
except ValueError:
pass
return 0
def sample_files(mount: str | None, limit: int = 30) -> list[str]:
"""Return a sample of file paths on the disc."""
if not mount or not os.path.exists(mount):
return []
found = []
try:
for root, _dirs, files in os.walk(mount):
for f in files:
rel = os.path.relpath(os.path.join(root, f), mount)
found.append(rel)
if len(found) >= limit:
return found
except PermissionError:
pass
return found
# ── Eject ─────────────────────────────────────────────────────────────────────
def eject(mount: str | None):
    """
    Eject the disc. Try diskutil first, fall back to drutil.

    `diskutil eject <mount>` is attempted only when a mount point is known.
    If it exits non-zero, cannot be started, or does not finish within
    30 seconds, `drutil eject` is issued instead. Nothing is returned and
    no exception is raised for a failed eject.
    """
    if mount:
        try:
            result = subprocess.run(
                ["diskutil", "eject", mount],
                capture_output=True, text=True, timeout=30,
            )
        except (OSError, subprocess.SubprocessError):
            # Could not launch diskutil, or it hung past the timeout:
            # treat it like a failed eject and use the fallback below.
            result = None
        if result is not None and result.returncode == 0:
            return
    run(["drutil", "eject"])
# ── API submission ────────────────────────────────────────────────────────────
def submit(payload: dict) -> bool:
    """
    Send one disc record to the videoDB ingest endpoint.

    The payload is posted as JSON to API_URL with the token in the
    X-API-Token header and a 15 second timeout. Only HTTP 200 counts as
    success. Every outcome is reported on stdout; errors never propagate.

    Returns True when the API answered 200 and the reply was processed,
    False otherwise.
    """
    auth_headers = {"X-API-Token": API_TOKEN}
    accepted = False
    try:
        response = requests.post(API_URL, json=payload, headers=auth_headers, timeout=15)
        if response.status_code != 200:
            print(f" [ERR] API {response.status_code}: {response.text[:200]}")
        else:
            body = response.json()
            print(f" [OK] Indexed as entry #{body.get('id', '?')}: {payload['title']}")
            accepted = True
    except requests.ConnectionError:
        print(f" [ERR] Cannot reach {API_URL} — check VIDEODB_URL and network")
    except Exception as exc:
        print(f" [ERR] {exc}")
    return accepted
# ── Main scan ─────────────────────────────────────────────────────────────────
def scan_and_submit():
    """
    Read metadata from the disc in the drive, post it to the API, then eject.

    Steps: query drutil, wait 3 seconds for the volume to mount, locate the
    mount point, classify the disc, gather label / size / a file sample,
    submit the record and finally eject.

    Returns False when no disc is present or the submission failed, True
    when the API accepted the record. The disc is ejected in either case
    once a disc was detected.
    """
    status = disc_status()
    if not status:
        return False
    drutil_type = status.get("drutil_type", "")
    device = status.get("device", "")
    tracks = status.get("tracks", 0)
    print(f" drutil type : {drutil_type}")
    print(f" device : {device}")
    print(f" tracks : {tracks}")
    # Give macOS a moment to finish mounting
    time.sleep(3)
    mount = find_mount(device)
    print(f" mount point : {mount or '(not mounted)'}")
    mediatype_id, mediatype_name = classify(mount, drutil_type)
    title = volume_label(mount, device)
    size = disc_size_bytes(mount)
    # NOTE: this is a capped sample, so len(files) is a lower bound on the
    # real number of files on the disc.
    files = sample_files(mount)
    # Build a short content summary for the 'comment' field.
    # Audio and data CDs share the same mediatype ID, so the ID cannot tell
    # them apart; use the label from classify() so that data CDs are not
    # described as "N audio tracks".
    if tracks and mediatype_name == "Audio CD":
        summary = f"{tracks} audio tracks"
    elif files:
        summary = f"{len(files)} files"
        if len(files) <= 10:
            summary += ": " + ", ".join(Path(f).name for f in files[:10])
    else:
        summary = ""
    payload = {
        "title": title,
        "mediatype": mediatype_id,
        "comment": summary[:255],
        "filesize": size,
        "disklabel": title[:32],
        "custom1": drutil_type[:255],  # raw disc type string
        "custom2": str(tracks) if tracks else str(len(files)),  # track/file count
    }
    print(f"\n Submitting [{mediatype_name}] \"{title}\" ({size // (1024*1024)} MB)")
    ok = submit(payload)
    # Eject even when the submission failed so the loop does not spin on
    # the same disc.
    print(" Ejecting...")
    eject(mount)
    return ok
# ── Entry point ───────────────────────────────────────────────────────────────
def main():
    """
    Print a banner, then poll the drive forever and index each inserted disc.

    `was_present` remembers whether the disc currently in the drive has
    already been handled. It is cleared only once the drive is actually
    empty, so a disc that could not be ejected is not scanned and submitted
    again on every poll. The loop never returns; it ends when the caller
    interrupts it (e.g. KeyboardInterrupt).
    """
    print("=" * 55)
    print(" MeDBia Disc Scanner")
    print(f" API : {API_URL}")
    print(f" Poll: every {POLL_SEC}s")
    print("=" * 55)
    print("Insert a disc to index it. Ctrl-C to stop.\n")
    was_present = False
    while True:
        status = disc_status()
        if status and not was_present:
            was_present = True
            print("Disc detected!")
            scan_and_submit()
            if disc_status():
                # Eject did not empty the drive. Keep was_present set so the
                # same disc is not re-submitted; the branch below resets it
                # once the disc is gone.
                print(" [WARN] Disc is still in the drive — remove it manually.")
            else:
                was_present = False  # reset — disc was ejected
                print("\nReady — insert next disc.\n")
        elif not status and was_present:
            # Disc that was left in the drive has now been removed
            was_present = False
            print("\nReady — insert next disc.\n")
        time.sleep(POLL_SEC)
# Script entry point: run the poll loop only when executed directly.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C is the documented way to stop the scanner; exit quietly
        # with a short message instead of a traceback.
        print("\nStopped.")