mirror of
https://github.com/Yuvi9587/Kemono-Downloader.git
synced 2025-12-29 16:14:44 +00:00
Commit
This commit is contained in:
375
src/core/booru_client.py
Normal file
375
src/core/booru_client.py
Normal file
@@ -0,0 +1,375 @@
|
||||
# src/core/booru_client.py
|
||||
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
import datetime
|
||||
import urllib.parse
|
||||
import requests
|
||||
import logging
|
||||
import cloudscraper
|
||||
# --- Start of Combined Code from 1.py ---
|
||||
|
||||
# Part 1: Essential Utilities & Exceptions
|
||||
|
||||
class BooruClientException(Exception):
    """Root of the exception hierarchy used by this Booru client.

    Catching this type catches every error the client raises on purpose.
    """
|
||||
|
||||
class HttpError(BooruClientException):
    """HTTP request during data extraction failed.

    Args:
        message: Human-readable description. When empty and a response is
            supplied, a message is built from the response's status code,
            reason and URL.
        response: The response object associated with the failure, if any.

    Attributes:
        response: The response passed in (or ``None``).
        status: ``response.status_code``, or ``0`` when no response is given.
    """

    def __init__(self, message="", response=None):
        # Test against None explicitly instead of relying on truthiness:
        # a requests.Response is falsy for 4xx/5xx statuses (its __bool__
        # mirrors `.ok`), which is exactly the kind of response this
        # exception is meant to describe.
        has_response = response is not None
        self.response = response
        self.status = response.status_code if has_response else 0
        if has_response and not message:
            message = f"'{response.status_code} {response.reason}' for '{response.url}'"
        super().__init__(message)
|
||||
|
||||
class NotFoundError(BooruClientException):
    """Signals that a requested resource could not be found."""
|
||||
|
||||
def unquote(s):
    """Return *s* with its percent-escapes decoded by ``urllib.parse.unquote``."""
    decoded = urllib.parse.unquote(s)
    return decoded
|
||||
|
||||
def parse_datetime(date_string, fmt):
    """Parse *date_string* with the ``strptime`` format *fmt*.

    Returns:
        The parsed ``datetime.datetime``, or ``None`` when the text does not
        match *fmt* (``ValueError``) or the input is not a string
        (``TypeError``).
    """
    try:
        parsed = datetime.datetime.strptime(date_string, fmt)
    except (TypeError, ValueError):
        # Unparseable or missing timestamps are reported as "no date".
        parsed = None
    return parsed
|
||||
|
||||
def nameext_from_url(url, data=None):
    """Derive a file name and extension from the last path segment of *url*.

    The segment is percent-decoded and split at its final dot. The results
    are stored in *data* (a fresh dict when omitted) under ``"filename"`` and
    ``"extension"``; the extension is lower-cased and is ``""`` when the
    segment contains no dot. If anything fails, both keys are set to ``""``.

    Returns:
        The dict that was written to (*data* itself when it was supplied).
    """
    target = {} if data is None else data
    try:
        last_segment = os.path.basename(urllib.parse.urlparse(url).path)
        decoded = urllib.parse.unquote(last_segment)
        stem, dot, suffix = decoded.rpartition(".")
        if dot:
            target["filename"] = stem
            target["extension"] = suffix.lower()
        else:
            target["filename"] = decoded
            target["extension"] = ""
    except Exception:
        # Best effort: never let a malformed URL abort the caller.
        target["filename"] = ""
        target["extension"] = ""
    return target
|
||||
|
||||
# Desktop Firefox User-Agent string; Extractor.__init__ installs it as the
# "User-Agent" header of every session it creates.
USERAGENT_FIREFOX = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/118.0"
|
||||
|
||||
# Part 2: Core Extractor Logic
|
||||
|
||||
class Extractor:
    """Common base for site extractors: HTTP session, retries and post iteration.

    Subclasses supply ``root`` (the site's base URL) and override
    ``metadata()`` and ``posts()``; ``items()`` ties those together.
    """

    category = ""
    subcategory = ""
    # Base URL used to absolutise relative file URLs; set by subclasses.
    root = ""
    directory_fmt = ("{category}", "{id}")
    filename_fmt = "{filename}.{extension}"
    # Number of *additional* attempts after the first request.
    _retries = 3
    # Default per-request timeout handed to the session (seconds in requests).
    _timeout = 30

    def __init__(self, match, logger_func=print):
        """Store the regex *match* for the URL and create the HTTP session.

        Args:
            match: ``re.Match`` produced by the extractor's ``pattern``.
            logger_func: Callable taking one message string.
        """
        self.url = match.string
        self.match = match
        self.groups = match.groups()
        self.session = cloudscraper.create_scraper()
        self.session.headers["User-Agent"] = USERAGENT_FIREFOX
        self.log = logger_func
        self.api_key = None
        self.user_id = None

    def set_auth(self, api_key, user_id):
        """Remember the credentials and let the subclass apply them."""
        self.api_key = api_key
        self.user_id = user_id
        self._init_auth()

    def _init_auth(self):
        """Placeholder for extractor-specific auth setup."""
        pass

    def request(self, url, method="GET", fatal=True, **kwargs):
        """Perform an HTTP request, retrying with exponential backoff.

        Args:
            url: Target URL.
            method: HTTP method name.
            fatal: When true, failures raise; otherwise ``None`` is returned.
            **kwargs: Forwarded to ``session.request``. A caller-supplied
                ``timeout`` overrides the class default.

        Returns:
            The response when its status code is below 400, else ``None``
            (only when ``fatal`` is false).

        Raises:
            NotFoundError: On a 404 response when ``fatal`` is true.
            HttpError: When every attempt failed and ``fatal`` is true.
        """
        # setdefault avoids a "multiple values for keyword 'timeout'" TypeError
        # when the caller passes its own timeout.
        kwargs.setdefault("timeout", self._timeout)
        last_response = None

        for attempt in range(self._retries + 1):
            try:
                response = self.session.request(method, url, **kwargs)
            except requests.exceptions.RequestException as e:
                self.log(f"Request for {url} failed: {e}. Retrying...")
            else:
                if response.status_code < 400:
                    return response
                if response.status_code == 404 and fatal:
                    raise NotFoundError(f"Resource not found at {url}")
                last_response = response
                self.log(f"Request for {url} failed with status {response.status_code}. Retrying...")

            # Back off 1s, 2s, 4s, ... but do not sleep after the last attempt.
            if attempt < self._retries:
                time.sleep(2 ** attempt)

        if fatal:
            raise HttpError(
                f"Failed to retrieve {url} after {self._retries} retries.",
                response=last_response,
            )
        return None

    def request_json(self, url, **kwargs):
        """Request *url* and return its decoded JSON body.

        Returns:
            The decoded JSON value, or ``None`` when the request failed
            non-fatally (``fatal=False`` was passed through to ``request``).

        Raises:
            BooruClientException: When the body is not valid JSON.
        """
        response = self.request(url, **kwargs)
        if response is None:
            # Only reachable with fatal=False; mirror request()'s contract
            # instead of failing with AttributeError on None.json().
            return None
        try:
            return response.json()
        except (ValueError, TypeError) as exc:
            self.log(f"Failed to decode JSON from {url}: {exc}")
            raise BooruClientException("Invalid JSON response") from exc

    def metadata(self):
        """Return extra key/values merged into every post; none by default."""
        return {}

    def posts(self):
        """Return an iterable of post dicts (and page markers); empty by default."""
        return []

    def items(self):
        """Yield progress markers and normalised post dicts.

        ``('PAGE_UPDATE', ...)`` tuples coming from ``posts()`` are passed
        through untouched. Every other item is treated as a post dict: posts
        without ``file_url`` are skipped, the rest gain ``filename``,
        ``extension`` and ``date`` keys, get an absolute ``file_url`` and are
        updated with the result of ``metadata()``.
        """
        data = self.metadata()
        for item in self.posts():
            # Check for our special page update message
            if isinstance(item, tuple) and item[0] == 'PAGE_UPDATE':
                yield item
                continue

            # Otherwise, process it as a post
            post = item
            url = post.get("file_url")
            if not url:
                continue

            nameext_from_url(url, post)
            post["date"] = parse_datetime(post.get("created_at"), "%Y-%m-%dT%H:%M:%S.%f%z")

            if url.startswith("/"):
                url = self.root + url
            post['file_url'] = url  # Ensure full URL

            post.update(data)
            yield post
|
||||
|
||||
class BaseExtractor(Extractor):
    """Extractor base for a family of sites that share one implementation.

    ``instances`` holds one ``(category, root, info)`` tuple per registered
    site, in the same order as the capture groups produced by ``update()``.
    """

    instances = ()

    def __init__(self, match, logger_func=print):
        super().__init__(match, logger_func)
        self._init_category()

    def _init_category(self):
        """Derive ``self.root`` from the URL and ``self.category`` from the match.

        The category is taken from the instance whose capture group (by
        position) is the first one that matched.
        """
        parsed_url = urllib.parse.urlparse(self.url)
        if not parsed_url.netloc:
            # The URL pattern makes the scheme optional, but urlparse only
            # recognises a host after "//". Without this, a scheme-less URL
            # would produce the unusable root "://".
            parsed_url = urllib.parse.urlparse("https://" + self.url)
        scheme = parsed_url.scheme or "https"  # covers "//host/..." URLs
        self.root = f"{scheme}://{parsed_url.netloc}"

        for i, group in enumerate(self.groups):
            if group is None:
                continue
            try:
                self.category = self.instances[i][0]
            except IndexError:
                # Group belongs to the subclass pattern, not to an instance.
                continue
            return

    @classmethod
    def update(cls, instances):
        """Register site *instances* and build the regex matching their hosts.

        Args:
            instances: Mapping of category name to an info dict that may
                contain ``"root"`` (base URL) and ``"pattern"`` (host regex).
                Without a pattern, the escaped host part of ``root`` is used.

        Returns:
            A regex string with an optional ``http(s)://`` prefix followed by
            one capture group per instance, joined as alternatives.
        """
        pattern_list = []
        instance_list = cls.instances = []
        for category, info in instances.items():
            # .get() + `or` tolerate a missing key as well as an explicit None.
            root = (info.get("root") or "").rstrip("/")
            instance_list.append((category, root, info))
            pattern = info.get("pattern") or re.escape(root.partition("://")[2])
            pattern_list.append(f"({pattern})")
        return r"(?:https?://)?(?:" + "|".join(pattern_list) + r")"
|
||||
|
||||
# Part 3: Danbooru Extractor
|
||||
|
||||
class DanbooruExtractor(BaseExtractor):
    """Shared behaviour for Danbooru extractors: auth and paginated listing.

    ``items()`` is inherited from ``Extractor``; the override that used to
    live here repeated that method statement for statement.
    """

    filename_fmt = "{category}_{id}_{filename}.{extension}"
    # Value sent as the "limit" query parameter of listing requests.
    per_page = 200

    def __init__(self, match, logger_func=print):
        super().__init__(match, logger_func)
        self._auth_logged = False

    def _init_auth(self):
        """Enable HTTP auth on the session when both credentials are set."""
        if self.user_id and self.api_key:
            if not self._auth_logged:
                # Log only once per extractor instance.
                self.log("Danbooru auth set.")
                self._auth_logged = True
            self.session.auth = (self.user_id, self.api_key)

    def metadata(self):
        """Return extra key/values merged into every post; none by default."""
        return {}

    def posts(self):
        """Return the posts to process; overridden by concrete extractors."""
        return []

    def _pagination(self, endpoint, params, prefix="b"):
        """Yield page markers and posts from a paginated JSON endpoint.

        Args:
            endpoint: Path appended to ``self.root``.
            params: Query parameters; copied, so the caller's dict is left
                unmodified.
            prefix: When truthy, the next page is requested as
                ``f"{prefix}{last_post_id}"`` (a cursor relative to the last
                post received); when falsy, a numeric page counter is used.

        Yields:
            ``('PAGE_UPDATE', n)`` before each batch of ``n`` posts, then the
            posts themselves.
        """
        url = self.root + endpoint
        params = dict(params)  # never mutate the caller's dict
        params["limit"] = self.per_page
        params["page"] = 1
        # A page noticeably shorter than the limit is treated as the last one.
        # NOTE(review): the margin of 20 presumably allows for posts the
        # server leaves out of a full page - confirm.
        threshold = self.per_page - 20

        while True:
            posts = self.request_json(url, params=params)
            if not posts:
                break
            yield ('PAGE_UPDATE', len(posts))
            yield from posts
            if len(posts) < threshold:
                return
            if prefix:
                params["page"] = f"{prefix}{posts[-1]['id']}"
            else:
                params["page"] += 1
|
||||
|
||||
# Registers the "danbooru" instance on DanbooruExtractor (filling its
# `instances` list) and keeps the returned URL-prefix regex. The host pattern
# accepts danbooru.donmai.us and safebooru.donmai.us; no fixed root is given,
# so the root is derived from the matched URL at construction time.
BASE_PATTERN = DanbooruExtractor.update({
    "danbooru": {"root": None, "pattern": r"(?:danbooru|safebooru)\.donmai\.us"},
})
|
||||
|
||||
class DanbooruTagExtractor(DanbooruExtractor):
    """Extractor for Danbooru tag-search URLs (``/posts?...tags=...``)."""

    subcategory = "tag"
    directory_fmt = ("{category}", "{search_tags}")
    pattern = BASE_PATTERN + r"(/posts\?(?:[^&#]*&)*tags=([^&#]*))"

    def metadata(self):
        """Decode the searched tags and return them in a path-safe form.

        Stores the decoded tag string on ``self.tags`` (read by ``posts()``)
        and returns ``{"search_tags": ...}`` with the characters
        ``\\ / * ? : " < > |`` replaced by underscores.
        """
        raw_tags = self.groups[-1]
        # "+" stands for a space in the query string.
        self.tags = unquote(raw_tags.replace("+", " ")).strip()
        path_safe = re.sub(r'[\\/*?:"<>|]', "_", self.tags)
        return {"search_tags": path_safe}

    def posts(self):
        """Return the paginated post stream for ``self.tags``."""
        query = {"tags": self.tags}
        return self._pagination("/posts.json", query)
|
||||
|
||||
class DanbooruPostExtractor(DanbooruExtractor):
    """Extractor for single Danbooru post URLs (``/posts/<id>``, ``/post/show/<id>``)."""

    subcategory = "post"
    pattern = BASE_PATTERN + r"(/post(?:s|/show)/(\d+))"

    def posts(self):
        """Fetch the post's JSON and return it as a one-element tuple.

        Returns an empty tuple when the decoded response is falsy.
        """
        post = self.request_json(f"{self.root}/posts/{self.groups[-1]}.json")
        if not post:
            return ()
        return (post,)
|
||||
|
||||
class GelbooruBase(Extractor):
    """Shared behaviour for Gelbooru extractors: API access and post iteration."""

    category = "gelbooru"
    root = "https://gelbooru.com"

    def __init__(self, match, logger_func=print):
        super().__init__(match, logger_func)
        self._auth_logged = False

    def _api_request(self, params, key="post"):
        """Query the site's ``dapi`` JSON endpoint.

        Args:
            params: Query parameters for the call; copied, so the caller's
                dict is not modified (credentials are added to the copy).
            key: Name of the payload entry holding the posts. When falsy, the
                decoded payload is returned as-is.

        Returns:
            A list of post dicts (possibly empty), or the raw payload when
            ``key`` is falsy.
        """
        query = dict(params)
        # Auth is now added dynamically
        if self.api_key and self.user_id:
            if not self._auth_logged:
                # Log only once per extractor instance.
                self.log("Gelbooru auth set.")
                self._auth_logged = True
            query.update({"api_key": self.api_key, "user_id": self.user_id})

        url = self.root + "/index.php?page=dapi&q=index&json=1"
        data = self.request_json(url, params=query)

        if not key:
            return data
        if isinstance(data, list):
            # Payload is already a list of posts; there is no wrapper to unpack.
            return data
        if not isinstance(data, dict):
            # None / scalar payloads carry no posts; avoid AttributeError on .get().
            return []

        posts = data.get(key, [])
        if isinstance(posts, list):
            return posts
        # A single post may arrive as a bare object instead of a list.
        return [posts] if posts else []

    def items(self):
        """Yield progress markers and post dicts enriched with metadata.

        ``('PAGE_UPDATE', ...)`` tuples from ``posts()`` are passed through.
        For each post with a ``file_url``, a new dict is built from
        ``metadata()`` plus ``category``, overlaid with the post's own fields
        and completed with ``filename`` / ``extension``.
        """
        base_data = self.metadata()
        base_data['category'] = self.category

        for item in self.posts():
            # Check for our special page update message
            if isinstance(item, tuple) and item[0] == 'PAGE_UPDATE':
                yield item
                continue

            # Otherwise, process it as a post
            post = item
            url = post.get("file_url")
            if not url:
                continue

            data = base_data.copy()
            data.update(post)
            nameext_from_url(url, data)
            yield data

    def metadata(self):
        """Return extra key/values merged into every post; none by default."""
        return {}

    def posts(self):
        """Return the posts to process; overridden by concrete extractors."""
        return []
|
||||
|
||||
# URL prefix shared by the Gelbooru extractor patterns: optional http(s)
# scheme and optional "www." in front of gelbooru.com.
GELBOORU_PATTERN = r"(?:https?://)?(?:www\.)?gelbooru\.com"
|
||||
|
||||
class GelbooruTagExtractor(GelbooruBase):
    """Extractor for Gelbooru tag-search URLs (``page=post&s=list&tags=...``)."""

    subcategory = "tag"
    directory_fmt = ("{category}", "{search_tags}")
    filename_fmt = "{category}_{id}_{md5}.{extension}"
    pattern = GELBOORU_PATTERN + r"(/index\.php\?page=post&s=list&tags=([^&#]*))"

    def metadata(self):
        """Decode the searched tags and return them in a path-safe form.

        Stores the decoded tag string on ``self.tags`` (read by ``posts()``)
        and returns ``{"search_tags": ...}`` with the characters
        ``\\ / * ? : " < > |`` replaced by underscores.
        """
        raw_tags = self.groups[-1]
        # "+" stands for a space in the query string.
        self.tags = unquote(raw_tags.replace("+", " ")).strip()
        path_safe = re.sub(r'[\\/*?:"<>|]', "_", self.tags)
        return {"search_tags": path_safe}

    def posts(self):
        """Walk the HTML search result pages and yield each post's API data.

        The HTML listing is scraped (rather than the API's own search, which
        can be restrictive for tags) to collect post IDs; each ID is then
        looked up individually through the API. A ``('PAGE_UPDATE', n)``
        marker precedes the posts of every listing page.
        """
        page_size = 42  # the offset advances by this much per listing page
        listing_url = self.root + "/index.php"
        query = {"page": "post", "s": "list", "tags": self.tags}
        offset = 0

        while True:
            query['pid'] = offset
            self.log(f"Scraping search results page (offset: {offset})...")
            page_html = self.request(listing_url, params=query).text
            found_ids = re.findall(r'id="p(\d+)"', page_html)

            if not found_ids:
                self.log("No more posts found on page. Ending scrape.")
                return

            yield ('PAGE_UPDATE', len(found_ids))
            for found_id in found_ids:
                yield from self._api_request({"s": "post", "id": found_id})

            offset += page_size
|
||||
|
||||
class GelbooruPostExtractor(GelbooruBase):
    """Extractor for single Gelbooru post URLs (``page=post&s=view&id=<id>``)."""

    subcategory = "post"
    filename_fmt = "{category}_{id}_{md5}.{extension}"
    pattern = GELBOORU_PATTERN + r"(/index\.php\?page=post&s=view&id=(\d+))"

    def posts(self):
        """Look up the post whose ID was captured from the URL via the API."""
        lookup = {"s": "post", "id": self.groups[-1]}
        return self._api_request(lookup)
|
||||
|
||||
# --- Main Entry Point ---
|
||||
|
||||
# Extractor classes tried by find_extractor(), in this order; the first class
# whose `pattern` is found in the URL is used.
EXTRACTORS = [
    DanbooruTagExtractor,
    DanbooruPostExtractor,
    GelbooruTagExtractor,
    GelbooruPostExtractor,
]
|
||||
|
||||
def find_extractor(url, logger_func):
    """Build the first extractor in ``EXTRACTORS`` whose ``pattern`` occurs in *url*.

    Args:
        url: URL text searched with each extractor's regex.
        logger_func: Logging callable handed to the extractor's constructor.

    Returns:
        An extractor instance created from the regex match, or ``None`` when
        no pattern matches.
    """
    # Lazily pair every class with its search result so that matching still
    # stops at the first hit.
    candidates = ((cls, re.search(cls.pattern, url)) for cls in EXTRACTORS)
    for cls, hit in candidates:
        if hit:
            return cls(hit, logger_func)
    return None
|
||||
|
||||
def fetch_booru_data(url, api_key, user_id, logger_func):
    """Locate the extractor for *url* and stream everything it produces.

    This is a generator. If no extractor matches, a message is logged and
    nothing is yielded. Otherwise the extractor's name is logged, the
    credentials are applied through ``set_auth`` and every value from the
    extractor's ``items()`` is yielded unchanged.

    Args:
        url: Booru page URL to process.
        api_key: API key forwarded to ``set_auth``.
        user_id: User ID forwarded to ``set_auth``.
        logger_func: Callable taking one message string.
    """
    extractor = find_extractor(url, logger_func)
    if extractor:
        logger_func(f"Using extractor: {extractor.__class__.__name__}")
        extractor.set_auth(api_key, user_id)
        # items() already produces the final data dictionaries.
        yield from extractor.items()
    else:
        logger_func(f"No suitable Booru extractor found for URL: {url}")
|
||||
Reference in New Issue
Block a user