import logging
import os
import re
import requests
import html
import time
import datetime
import urllib.parse
import json
import random
import binascii
import itertools


class MockMessage:
    Directory = 1
    Url = 2
    Version = 3


class AlbumException(Exception):
    pass


class ExtractionError(AlbumException):
    pass


class HttpError(ExtractionError):
    def __init__(self, message="", response=None):
        self.response = response
        self.status = response.status_code if response is not None else 0
        super().__init__(message)


class ControlException(AlbumException):
    pass


class AbortExtraction(ExtractionError, ControlException):
    pass


try:
    re_compile = re._compiler.compile  # Python 3.11+
except AttributeError:
    re_compile = re.sre_compile.compile  # older Python versions

HTML_RE = re_compile(r"<[^>]+>")


def extr(txt, begin, end, default=""):
    """Return the substring of 'txt' between 'begin' and 'end'."""
    try:
        first = txt.index(begin) + len(begin)
        return txt[first:txt.index(end, first)]
    except Exception:
        return default


def extract_iter(txt, begin, end, pos=None):
    """Yield every substring of 'txt' between 'begin' and 'end'."""
    try:
        index = txt.index
        lbeg = len(begin)
        lend = len(end)
        while True:
            first = index(begin, pos) + lbeg
            last = index(end, first)
            pos = last + lend
            yield txt[first:last]
    except Exception:
        return


def split_html(txt):
    """Split HTML markup into a list of unescaped, stripped text nodes."""
    try:
        return [
            html.unescape(x).strip()
            for x in HTML_RE.split(txt)
            if x and not x.isspace()
        ]
    except TypeError:
        return []


def parse_datetime(date_string, format="%Y-%m-%dT%H:%M:%S%z", utcoffset=0):
    """Parse 'date_string' into a naive UTC datetime, or None on failure."""
    try:
        d = datetime.datetime.strptime(date_string, format)
        o = d.utcoffset()
        if o is not None:
            # convert an aware datetime to naive UTC
            d = d.replace(tzinfo=None, microsecond=0) - o
        else:
            if d.microsecond:
                d = d.replace(microsecond=0)
            if utcoffset:
                d += datetime.timedelta(0, utcoffset * -3600)
        return d
    except (TypeError, IndexError, KeyError, ValueError, OverflowError):
        return None


unquote = urllib.parse.unquote
unescape = html.unescape


def decrypt_xor(encrypted, key, base64=True, fromhex=False):
    """Decrypt an XOR-obfuscated payload with a repeating 'key'."""
    if base64:
        encrypted = binascii.a2b_base64(encrypted)
    if fromhex:
        encrypted = bytes.fromhex(encrypted.decode())
    div = len(key)
    return bytes(
        [encrypted[i] ^ key[i % div] for i in range(len(encrypted))]
    ).decode()


def advance(iterable, num):
    """Advance an iterable by 'num' items and return the iterator."""
    iterator = iter(iterable)
    next(itertools.islice(iterator, num, num), None)
    return iterator


def json_loads(s):
    return json.loads(s)


def json_dumps(obj):
    return json.dumps(obj, separators=(",", ":"))


class Extractor:
    def __init__(self, match, logger):
        self.log = logger
        self.url = match.string
        self.match = match
        self.groups = match.groups()
        self.session = requests.Session()
        self.session.headers["User-Agent"] = (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:102.0) "
            "Gecko/20100101 Firefox/102.0"
        )

    @classmethod
    def from_url(cls, url, logger):
        if isinstance(cls.pattern, str):
            cls.pattern = re.compile(cls.pattern)
        match = cls.pattern.match(url)
        return cls(match, logger) if match else None

    def __iter__(self):
        return self.items()

    def items(self):
        yield MockMessage.Version, 1

    def request(self, url, method="GET", fatal=True, **kwargs):
        tries = 1
        while True:
            try:
                response = self.session.request(method, url, **kwargs)
                if response.status_code < 400:
                    return response
                msg = f"'{response.status_code} {response.reason}' for '{response.url}'"
            except requests.exceptions.RequestException as exc:
                msg = str(exc)
            self.log.info("%s (retrying...)", msg)
            if tries > 4:
                break
            time.sleep(tries)
            tries += 1
        if not fatal:
            return None
        raise HttpError(msg)

    def request_json(self, url, **kwargs):
        response = self.request(url, **kwargs)
        try:
            return json_loads(response.text)
        except Exception as exc:
            self.log.warning("%s: %s", exc.__class__.__name__, exc)
            if not kwargs.get("fatal", True):
                return {}
            raise
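

# Hedged illustration, not used by the extractors below: a tiny self-test that
# shows how decrypt_xor() reverses the base64 + repeating-key XOR obfuscation
# applied to encrypted download URLs. The key and plaintext here are invented
# for the example only.
def _demo_decrypt_xor():
    key = b"SECRET_KEY_12345"
    plaintext = "https://example.com/video.mp4"
    # Obfuscate: XOR each byte against the repeating key, then base64-encode
    obfuscated = binascii.b2a_base64(
        bytes(b ^ key[i % len(key)] for i, b in enumerate(plaintext.encode()))
    ).decode()
    # decrypt_xor() base64-decodes and XORs with the same key to recover the URL
    return decrypt_xor(obfuscated, key) == plaintext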
r"(?:https?://)?(?:[a-zA-Z0-9-]+\.)?(bunkr\.(?:si|la|ws|red|black|media|site|is|to|ac|cr|ci|fi|pk|ps|sk|ph|su)|bunkrr\.ru)" DOMAINS = ["bunkr.si", "bunkr.ws", "bunkr.la", "bunkr.red", "bunkr.black", "bunkr.media", "bunkr.site"] CF_DOMAINS = set() class BunkrAlbumExtractor(Extractor): category = "bunkr" root = "https://bunkr.si" root_dl = "https://get.bunkrr.su" root_api = "https://apidl.bunkr.ru" pattern = re.compile(BASE_PATTERN_BUNKR + r"/a/([^/?#]+)") def __init__(self, match, logger): super().__init__(match, logger) domain_match = re.search(BASE_PATTERN_BUNKR, match.string) if domain_match: self.root = "https://" + domain_match.group(1) self.endpoint = self.root_api + "/api/_001_v2" self.album_id = self.groups[-1] def items(self): page = self.request(self.url).text title = unescape(unescape(extr(page, 'property="og:title" content="', '"'))) items_html = list(extract_iter(page, '
")) album_data = { "album_id": self.album_id, "album_name": title, "count": len(items_html), } yield MockMessage.Directory, album_data, {} for item_html in items_html: try: webpage_url = unescape(extr(item_html, ' href="', '"')) if webpage_url.startswith("/"): webpage_url = self.root + webpage_url file_data = self._extract_file(webpage_url) info = split_html(item_html) if not file_data.get("name"): file_data["name"] = info[-3] yield MockMessage.Url, file_data, {} except Exception as exc: self.log.error("%s: %s", exc.__class__.__name__, exc) def _extract_file(self, webpage_url): page = self.request(webpage_url).text data_id = extr(page, 'data-file-id="', '"') # This referer is for the API call only api_referer = self.root_dl + "/file/" + data_id headers = {"Referer": api_referer, "Origin": self.root_dl} data = self.request_json(self.endpoint, method="POST", headers=headers, json={"id": data_id}) # Get the raw file URL (no domain replacement) file_url = decrypt_xor(data["url"], f"SECRET_KEY_{data['timestamp'] // 3600}".encode()) if data.get("encrypted") else data["url"] file_name = extr(page, "")[2] # --- NEW FIX --- # The download thread uses a new `requests` call, so we must # explicitly pass BOTH the User-Agent and the correct Referer. # 1. Get the User-Agent from this extractor's session user_agent = self.session.headers.get("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:102.0) Gecko/20100101 Firefox/102.0") # 2. Use the original album URL as the Referer download_referer = self.url return { "url": file_url, "name": unescape(file_name), "_http_headers": { "Referer": download_referer, "User-Agent": user_agent } } class BunkrMediaExtractor(BunkrAlbumExtractor): pattern = re.compile(BASE_PATTERN_BUNKR + r"(/[fvid]/[^/?#]+)") def items(self): try: media_path = self.groups[-1] file_data = self._extract_file(self.root + media_path) album_data = {"album_name": file_data.get("name", "bunkr_media"), "count": 1} yield MockMessage.Directory, album_data, {} yield MockMessage.Url, file_data, {} except Exception as exc: self.log.error("%s: %s", exc.__class__.__name__, exc) yield MockMessage.Directory, {"album_name": "error", "count": 0}, {} def get_bunkr_extractor(url, logger): """Selects the correct Bunkr extractor based on the URL pattern.""" if BunkrAlbumExtractor.pattern.match(url): logger.info("Bunkr Album URL detected.") return BunkrAlbumExtractor.from_url(url, logger) elif BunkrMediaExtractor.pattern.match(url): logger.info("Bunkr Media URL detected.") return BunkrMediaExtractor.from_url(url, logger) else: logger.error(f"No suitable Bunkr extractor found for URL: {url}") return None def fetch_bunkr_data(url, logger): """ Main function to be called from the GUI. It extracts all file information from a Bunkr URL, now handling both albums and direct file links. Returns: A tuple of (album_name, list_of_files) - album_name (str): The name of the album. - list_of_files (list): A list of dicts, each containing 'url', 'name', and '_http_headers'. Returns (None, None) on failure. 
""" # --- START: New logic to handle direct CDN file URLs --- try: parsed_url = urllib.parse.urlparse(url) # Check if the hostname contains 'cdn' and the path has a common file extension is_direct_cdn_file = (parsed_url.hostname and 'cdn' in parsed_url.hostname and 'bunkr' in parsed_url.hostname and any(parsed_url.path.lower().endswith(ext) for ext in ['.mp4', '.mkv', '.webm', '.jpg', '.jpeg', '.png', '.gif', '.zip', '.rar'])) if is_direct_cdn_file: logger.info("Bunkr direct file URL detected.") filename = os.path.basename(parsed_url.path) # Use the filename (without extension) as a sensible album name album_name = os.path.splitext(filename)[0] files_to_download = [{ 'url': url, 'name': filename, '_http_headers': {'Referer': 'https://bunkr.ru/'} # Use a generic Referer }] return album_name, files_to_download except Exception as e: logger.warning(f"Could not parse Bunkr URL for direct file check: {e}") # --- END: New logic --- # This is the original logic for album and media pages extractor = get_bunkr_extractor(url, logger) if not extractor: return None, None try: album_name = "default_bunkr_album" files_to_download = [] for msg_type, data, metadata in extractor: if msg_type == MockMessage.Directory: raw_album_name = data.get('album_name', 'untitled') album_name = re.sub(r'[<>:"/\\|?*]', '_', raw_album_name).strip() or "untitled" logger.info(f"Processing Bunkr album: {album_name}") elif msg_type == MockMessage.Url: files_to_download.append(data) if not files_to_download: logger.warning("No files found to download from the Bunkr URL.") return None, None return album_name, files_to_download except Exception as e: logger.error(f"An error occurred while extracting Bunkr info: {e}", exc_info=True) return None, None