Files
MeDBia/scanner/scan_disc.py
Malin 451afc0440 fix: add libonig-dev for mbstring; update scanner for data discs
Dockerfile: add libonig-dev (oniguruma) — required by mbstring extension

scanner/scan_disc.py:
- Treat all discs as data discs (mp4/mkv/etc), no VIDEO_TS/BDMV logic
- List video files by extension (.mp4 .mkv .avi .mov .m4v .ts .m2ts …)
- Infer media type from drutil type string, fall back to used_gb capacity
  (>8 GB → Blu-ray, >0.68 GB → DVD, smaller → CD)
- Store video file names in comment field, count in custom2

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-11 10:12:34 +02:00

322 lines
10 KiB
Python

#!/usr/bin/env python3
"""
MeDBia Disc Scanner — macOS client
===================================
Polls the optical drive, reads the disc content (video/data files),
posts to the remote videoDB API, then ejects the disc.
All discs are treated as data discs containing media files (mp4, mkv, etc.).
Media type is inferred from the drutil type string, falling back to the used capacity reported by drutil.
Setup:
pip3 install requests
export VIDEODB_URL=http://your-server:6761
export VIDEODB_TOKEN=change_this_secret_token
python3 scan_disc.py
"""
import os
import re
import sys
import time
import subprocess
from pathlib import Path
try:
import requests
except ImportError:
sys.exit("Install requests first: pip3 install requests")
# ── Config ─────────────────────────────────────────────────────────────────────
# Ingest endpoint: VIDEODB_URL with any trailing "/" removed, plus "/api_ingest.php".
API_URL = os.environ.get("VIDEODB_URL", "http://your-server:6761").rstrip("/") + "/api_ingest.php"
# Shared secret sent with every submission; the default value is only a placeholder.
API_TOKEN = os.environ.get("VIDEODB_TOKEN", "change_this_secret_token")
# Seconds to sleep between drive polls. POLL_INTERVAL must parse as an integer,
# otherwise int() raises ValueError while the module is being loaded.
POLL_SEC = int(os.environ.get("POLL_INTERVAL", "5"))
# videoDB mediatype IDs (from install.sql)
MT_DVD = 1
MT_BLURAY = 16
MT_CD = 18
# Video file extensions to list in the index (lower-case, including the leading dot)
VIDEO_EXT = {
    ".mp4", ".mkv", ".avi", ".mov", ".m4v", ".ts", ".m2ts",
    ".wmv", ".flv", ".webm", ".vob", ".mpg", ".mpeg", ".iso",
}
# System volumes to ignore when scanning /Volumes/
IGNORE_VOLUMES = {"Macintosh HD", "Preboot", "Recovery", "VM", "Data", "Update"}
# ── Shell helper ───────────────────────────────────────────────────────────────
def run(cmd: list) -> str:
    """Execute *cmd* and return whatever it wrote to standard output.

    This is a best-effort helper: any failure while launching or waiting for
    the process (missing executable, 20-second timeout, ...) results in an
    empty string rather than an exception. The exit status is not inspected.
    """
    try:
        completed = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=20,
        )
    except Exception:
        return ""
    return completed.stdout
# ── Disc presence ──────────────────────────────────────────────────────────────
def disc_status() -> dict | None:
    """Query ``drutil status`` and summarise the disc currently in the drive.

    Returns:
        ``None`` when drutil produced no output or its output contains
        "No Media". Otherwise a dict holding whichever of these keys could be
        parsed from the output:

        * ``drutil_type`` (str) - the value following ``Type:``
        * ``device`` (str) - the ``/dev/...`` node following ``Name:``
        * ``used_gb`` (float) - used space converted to gigabytes
    """
    out = run(["drutil", "status"])
    if not out or "No Media" in out:
        return None

    info: dict = {}

    # The type value ends at a run of 3+ spaces (start of the next column)
    # or at the end of the line.
    type_match = re.search(r"Type:\s+(.+?)(?:\s{3,}|$)", out, re.MULTILINE)
    if type_match:
        info["drutil_type"] = type_match.group(1).strip()

    device_match = re.search(r"Name:\s+(/dev/\S+)", out)
    if device_match:
        info["device"] = device_match.group(1)

    # drutil typically prints the size at the end of the line, e.g.
    #   "Space Used:  486:39:40   blocks:  2189980 /   4.49GB /   4.18GiB"
    # and uses MB for small discs, so scan the whole "Space Used" line for the
    # first decimal-unit figure instead of requiring "<n> GB" right after the
    # label. The plain "Space Used: 7.88 GB" form still matches.
    used_match = re.search(
        r"Space Used:[^\n]*?(\d+(?:\.\d+)?)\s*(KB|MB|GB|TB)\b", out
    )
    if used_match:
        to_gb = {"KB": 1e-6, "MB": 1e-3, "GB": 1.0, "TB": 1e3}
        info["used_gb"] = float(used_match.group(1)) * to_gb[used_match.group(2)]

    return info
# ── Mount ──────────────────────────────────────────────────────────────────────
def find_mount(device: str) -> str | None:
    """Return the mount point of the optical disc, or ``None`` if none is found.

    Args:
        device: Device node reported by drutil (e.g. ``/dev/disk2``). May be
            empty when drutil did not report one.

    The output of ``mount`` is searched for a line that starts with *device*
    (or one of its slices, e.g. ``/dev/disk2s1``) and is mounted below
    ``/Volumes/``. If that fails, the alphabetically first entry of
    ``/Volumes/`` that is not listed in ``IGNORE_VOLUMES`` is used as a guess.
    """
    if device:
        # Anchor on the exact device so "/dev/disk2" does not match
        # "/dev/disk20", and capture the full path up to the trailing
        # "(fs, options...)" group so volume names containing spaces survive.
        line_re = re.compile(
            re.escape(device) + r"(?:s\d+)* on (/Volumes/.+?)(?: \([^()]*\))?$"
        )
        for line in run(["mount"]).splitlines():
            m = line_re.match(line)
            if m:
                return m.group(1)

    # Fallback: first non-system entry in /Volumes/
    try:
        candidates = sorted(set(os.listdir("/Volumes/")) - IGNORE_VOLUMES)
    except OSError:
        return None
    return f"/Volumes/{candidates[0]}" if candidates else None
# ── Media type from disc capacity ──────────────────────────────────────────────
def mediatype_from_size(used_gb: float) -> tuple[int, str]:
    """
    Guess the videoDB mediatype from the amount of data on the disc.

    Returns an ``(id, name)`` pair: more than 8.0 GB used is reported as
    Blu-ray, more than 0.68 GB as DVD, anything else as CD.
    NOTE(review): the result depends on *used* space, not on the capacity of
    the medium, so a nearly empty Blu-ray is reported as DVD or CD.
    """
    # Ordered from the largest threshold down; the first one exceeded wins.
    tiers = (
        (8.0, MT_BLURAY, "Blu-ray"),
        (0.68, MT_DVD, "DVD"),
    )
    for floor_gb, type_id, type_name in tiers:
        if used_gb > floor_gb:
            return type_id, type_name
    return MT_CD, "CD"
def mediatype_from_drutil(drutil_type: str) -> tuple[int, str] | None:
    """Map a drutil type string to an ``(id, name)`` mediatype pair.

    The comparison is case-insensitive and substring based; "BD" is checked
    first, then "DVD", then "CD". Returns ``None`` when none of them occurs.
    """
    label = drutil_type.upper()
    checks = (
        ("BD", MT_BLURAY, "Blu-ray"),
        ("DVD", MT_DVD, "DVD"),
        ("CD", MT_CD, "CD"),
    )
    return next(
        ((type_id, name) for marker, type_id, name in checks if marker in label),
        None,
    )
# ── File listing ───────────────────────────────────────────────────────────────
def list_video_files(mount: str) -> list[str]:
    """Collect every video file below *mount*.

    A file counts as video when its lower-cased suffix is in ``VIDEO_EXT``.
    Paths are returned relative to *mount*, sorted. A ``PermissionError``
    ends the scan early; whatever was gathered up to that point is returned.
    """
    matches: list[str] = []
    try:
        for folder, _subdirs, filenames in os.walk(mount):
            matches.extend(
                os.path.relpath(os.path.join(folder, name), mount)
                for name in filenames
                if Path(name).suffix.lower() in VIDEO_EXT
            )
    except PermissionError:
        pass
    matches.sort()
    return matches
def all_files_count(mount: str) -> int:
    """Return the number of files found anywhere below *mount*.

    Used for the summary of discs without video files. Directories are not
    counted; a ``PermissionError`` during the walk yields 0.
    """
    try:
        return sum(len(filenames) for _folder, _subdirs, filenames in os.walk(mount))
    except PermissionError:
        return 0
def disc_size_bytes(mount: str) -> int:
    """Return the used space of *mount* in bytes, as reported by ``df -k``.

    The header row is skipped; the third column of the first data row that
    holds an integer is taken as 1024-byte blocks. Returns 0 when no such
    row exists.
    """
    rows = run(["df", "-k", mount]).splitlines()
    for row in rows[1:]:
        fields = row.split()
        if len(fields) < 3:
            continue
        try:
            used_blocks = int(fields[2])
        except ValueError:
            continue
        return used_blocks * 1024
    return 0
# ── Volume label ───────────────────────────────────────────────────────────────
def volume_label(mount: str | None, device: str) -> str:
if mount:
label = os.path.basename(mount)
if label:
return label
if device:
for line in run(["diskutil", "info", device]).splitlines():
if "Volume Name" in line:
return line.split(":", 1)[-1].strip()
return "Unknown Disc"
# ── Eject ──────────────────────────────────────────────────────────────────────
def eject(mount: str | None) -> None:
    """Eject the disc, preferring ``diskutil eject`` on the mounted volume.

    Args:
        mount: Mount point of the disc, or ``None`` when it is not mounted.

    When *mount* is missing, or ``diskutil eject`` fails, times out or cannot
    be started, ``drutil eject`` is used as a fallback. Never raises for a
    failed eject; the caller cannot tell whether the disc actually left the
    drive.
    """
    if mount:
        try:
            result = subprocess.run(
                ["diskutil", "eject", mount],
                capture_output=True,
                timeout=60,  # a stuck unmount must not hang the polling loop
            )
        except (OSError, subprocess.SubprocessError):
            result = None
        if result is not None and result.returncode == 0:
            return
    run(["drutil", "eject"])
# ── API submission ─────────────────────────────────────────────────────────────
def submit(payload: dict) -> bool:
    """POST *payload* as JSON to ``API_URL`` and report the outcome on stdout.

    The request carries ``API_TOKEN`` in the ``X-API-Token`` header and times
    out after 15 seconds. Returns ``True`` only for an HTTP 200 response whose
    body parses as JSON; every other outcome (other status, connection
    failure, any exception) prints an ``[ERR]`` line and returns ``False``.
    """
    try:
        response = requests.post(
            API_URL,
            json=payload,
            headers={"X-API-Token": API_TOKEN},
            timeout=15,
        )
        if response.status_code != 200:
            print(f" [ERR] API {response.status_code}: {response.text[:300]}")
            return False
        body = response.json()
        print(f" [OK] Entry #{body.get('id', '?')}: {payload['title']}")
        return True
    except requests.ConnectionError:
        print(f" [ERR] Cannot reach {API_URL}")
        print(" Check VIDEODB_URL and that the server is up.")
        return False
    except Exception as exc:
        print(f" [ERR] {exc}")
        return False
# ── Main scan routine ──────────────────────────────────────────────────────────
def scan_and_submit() -> bool:
    """Index the disc currently in the drive, submit it, then eject it.

    Steps: read the drutil status, wait for the volume to mount, derive the
    media type (drutil type string first, used size as fallback), list the
    video files, POST the resulting record via ``submit`` and eject.

    Returns:
        ``False`` when no disc is present or the submission failed,
        ``True`` when the API accepted the entry. The disc is ejected in
        both of the latter cases.
    """
    status = disc_status()
    if not status:
        return False

    drutil_type = status.get("drutil_type", "")
    device = status.get("device", "")
    used_gb = status.get("used_gb", 0.0)
    print(f" drutil type : {drutil_type}")
    print(f" device : {device}")
    print(f" used : {used_gb:.2f} GB")

    # Give macOS a moment to finish mounting the filesystem
    time.sleep(4)
    mount = find_mount(device)
    print(f" mount : {mount or '(not mounted)'}")

    # Determine media type — prefer drutil string, fall back to size
    mediatype_id, mediatype_name = (
        mediatype_from_drutil(drutil_type) or mediatype_from_size(used_gb)
    )

    title = volume_label(mount, device)
    # Prefer df's figure; if the disc is not mounted or df reports nothing,
    # estimate from the drutil "used" value instead of submitting 0.
    size_b = (disc_size_bytes(mount) if mount else 0) or int(used_gb * 1024**3)

    # Build file listing
    video_files = list_video_files(mount) if mount else []
    if video_files:
        # List video file names (not full paths) for the comment field
        names = [Path(f).name for f in video_files]
        summary = f"{len(video_files)} video files: " + ", ".join(names[:10])
        if len(names) > 10:
            summary += f" … +{len(names) - 10} more"
        file_count = len(video_files)
    else:
        # Only walk the disc a second time when there is nothing else to report.
        file_count = all_files_count(mount) if mount else 0
        summary = f"{file_count} files (no video files detected)"

    payload = {
        "title": title,
        "mediatype": mediatype_id,
        "comment": summary[:255],
        "filesize": size_b,
        "disklabel": title[:32],
        "custom1": drutil_type[:255],
        # Number of video files, or of all files when no video was found.
        "custom2": str(file_count),
    }
    print(f"\n [{mediatype_name}] \"{title}\"")
    print(f" {summary[:120]}")

    ok = submit(payload)
    print(" Ejecting disc...")
    eject(mount)
    return ok
# ── Entry point ────────────────────────────────────────────────────────────────
def main():
    """Print the banner, then poll the drive forever and index each new disc.

    A disc is scanned once when it first appears. If it is still in the drive
    after the scan (for example because ejecting failed), it is not scanned
    again until the drive has reported "no disc" at least once; this avoids
    submitting the same disc on every poll. Runs until interrupted.
    """
    print("=" * 55)
    print(" MeDBia Disc Scanner")
    print(f" API : {API_URL}")
    print(f" Poll : every {POLL_SEC}s")
    print("=" * 55)
    print("Insert a disc to index it. Ctrl-C to stop.\n")

    was_present = False
    while True:
        present = bool(disc_status())
        if present and not was_present:
            print("Disc detected!")
            scan_and_submit()
            # Re-check instead of assuming the eject worked: a disc that is
            # still present must not be treated as newly inserted next poll.
            was_present = bool(disc_status())
            if was_present:
                print("\nDisc is still in the drive — remove it to scan the next one.\n")
            else:
                print("\nReady — insert next disc.\n")
        elif not present:
            was_present = False
        time.sleep(POLL_SEC)
# Script entry point: run the polling loop until the user interrupts it.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C is the documented way to stop; exit quietly without a traceback.
        print("\nStopped.")