import binascii
import datetime
import html
import itertools
import json
import logging
import os
import random
import re
import time
import urllib.parse

import requests


class MockMessage:
    """Integer identifiers for the message tuples yielded by extractors."""

    Directory = 1
    Url = 2
    Version = 3


class AlbumException(Exception):
    """Base class of every exception defined in this module."""


class ExtractionError(AlbumException):
    """Base class for errors raised during extraction."""


class HttpError(ExtractionError):
    """A request failed or was answered with an HTTP error status.

    Attributes:
        response: the response object passed to the constructor, or None.
        status: ``response.status_code``, or 0 when there is no response.
    """

    def __init__(self, message="", response=None):
        self.response = response
        self.status = response.status_code if response is not None else 0
        super().__init__(message)


class ControlException(AlbumException):
    """Base class for control-flow exceptions."""


class AbortExtraction(ExtractionError, ControlException):
    """Both an extraction error and a control exception."""


# The private compiler module of 're' is exposed as 're._compiler' on newer
# Python versions and as 're.sre_compile' on older ones.
try:
    re_compile = re._compiler.compile
except AttributeError:
    re_compile = re.sre_compile.compile

# Matches a single HTML tag.
HTML_RE = re_compile(r"<[^>]+>")


def extr(txt, begin, end, default=""):
    """Return the text between the first 'begin' and the next 'end' in 'txt'.

    Returns 'default' when either marker is missing or when any other
    error occurs (for example 'txt' not being a string).
    """
    try:
        first = txt.index(begin) + len(begin)
        return txt[first:txt.index(end, first)]
    except Exception:
        # deliberate best-effort: any failure yields the default
        return default


def extract_iter(txt, begin, end, pos=None):
    """Yield every substring of 'txt' enclosed by 'begin' and 'end'.

    The search starts at index 'pos' and continues after each match's
    'end' marker. Iteration stops silently at the first missing marker
    or on any other error.
    """
    try:
        index = txt.index
        lbeg = len(begin)
        lend = len(end)
        while True:
            first = index(begin, pos) + lbeg
            last = index(end, first)
            pos = last + lend
            yield txt[first:last]
    except Exception:
        return


def split_html(txt):
    """Split 'txt' at HTML tags.

    Returns the unescaped, stripped text fragments between tags, skipping
    empty and whitespace-only ones. Returns an empty list if 'txt' is not
    a string.
    """
    try:
        return [
            html.unescape(x).strip()
            for x in HTML_RE.split(txt)
            if x and not x.isspace()
        ]
    except TypeError:
        return []


def parse_datetime(date_string, format="%Y-%m-%dT%H:%M:%S%z", utcoffset=0):
    """Parse 'date_string' into a naive datetime without microseconds.

    If the parsed value carries its own UTC offset, that offset is
    subtracted and the tzinfo dropped. Otherwise 'utcoffset' (in hours)
    is subtracted when it is non-zero.

    Returns None if the string cannot be parsed.
    """
    try:
        d = datetime.datetime.strptime(date_string, format)
        o = d.utcoffset()
        if o is not None:
            # aware datetime: drop tzinfo and shift by its own offset
            d = d.replace(tzinfo=None, microsecond=0) - o
        else:
            if d.microsecond:
                d = d.replace(microsecond=0)
            if utcoffset:
                # naive datetime: apply the manually supplied offset
                d += datetime.timedelta(0, utcoffset * -3600)
        return d
    except (TypeError, IndexError, KeyError, ValueError, OverflowError):
        return None


unquote = urllib.parse.unquote
unescape = html.unescape


def decrypt_xor(encrypted, key, base64=True, fromhex=False):
    """Decrypt 'encrypted' by XOR-ing it with the repeating 'key'.

    Args:
        encrypted: the payload. It is base64-decoded first if 'base64' is
            true, then hex-decoded if 'fromhex' is true.
        key: bytes-like key; it is repeated to the payload length.
        base64: whether the payload is base64 encoded.
        fromhex: whether the (base64-decoded) payload is a hex string.

    Returns:
        The decrypted bytes decoded as text.
    """
    if base64:
        encrypted = binascii.a2b_base64(encrypted)
    if fromhex:
        # bytes.fromhex() needs text; accept hex given as str or bytes
        if isinstance(encrypted, (bytes, bytearray)):
            encrypted = encrypted.decode()
        encrypted = bytes.fromhex(encrypted)

    div = len(key)
    return bytes([
        byte ^ key[i % div]
        for i, byte in enumerate(encrypted)
    ]).decode()


def advance(iterable, num):
    """Return an iterator over 'iterable' with the first 'num' items consumed."""
    iterator = iter(iterable)
    # an empty islice starting at 'num' consumes exactly 'num' items
    next(itertools.islice(iterator, num, num), None)
    return iterator


def json_loads(s):
    """Parse the JSON document 's'."""
    return json.loads(s)


def json_dumps(obj):
    """Serialize 'obj' to JSON without whitespace after separators."""
    return json.dumps(obj, separators=(",", ":"))


class Extractor:
    """Base class for extractors.

    Subclasses are expected to define a 'pattern' class attribute (regex
    string or compiled pattern), which from_url() matches URLs against.
    """

    def __init__(self, match, logger):
        """Store the URL match and logger, and set up an HTTP session."""
        self.log = logger
        self.url = match.string
        self.match = match
        self.groups = match.groups()
        self.session = requests.Session()
        self.session.headers["User-Agent"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:102.0) Gecko/20100101 Firefox/102.0"

    @classmethod
    def from_url(cls, url, logger):
        """Return an instance for 'url', or None if 'url' does not match."""
        if isinstance(cls.pattern, str):
            # compile once and keep the compiled pattern on the class
            cls.pattern = re.compile(cls.pattern)
        match = cls.pattern.match(url)
        return cls(match, logger) if match else None

    def __iter__(self):
        return self.items()

    def items(self):
        """Yield this extractor's messages; the base class yields only a version."""
        yield MockMessage.Version, 1

    def request(self, url, method="GET", fatal=True, **kwargs):
        """Send an HTTP request, retrying on failure.

        A request counts as failed when it raises a requests exception or
        is answered with a status code of 400 or above. Up to five
        attempts are made, sleeping 1, 2, 3 and 4 seconds in between.

        Args:
            url: target URL.
            method: HTTP method.
            fatal: raise HttpError after the last failed attempt instead
                of returning None.
            **kwargs: passed through to ``Session.request``.

        Returns:
            The response, or None if all attempts failed and 'fatal' is
            false.

        Raises:
            HttpError: all attempts failed and 'fatal' is true. The last
                response, if any, is attached to the exception.
        """
        tries = 1
        while True:
            response = None
            try:
                response = self.session.request(method, url, **kwargs)
            except requests.exceptions.RequestException as exc:
                msg = str(exc)
            else:
                if response.status_code < 400:
                    return response
                msg = f"'{response.status_code} {response.reason}' for '{response.url}'"

            if tries > 4:
                # out of attempts: report the failure without promising a retry
                self.log.info("%s", msg)
                break
            self.log.info("%s (retrying...)", msg)
            time.sleep(tries)
            tries += 1

        if not fatal:
            return None
        raise HttpError(msg, response)

    def request_json(self, url, **kwargs):
        """Request 'url' and return its body parsed as JSON.

        Accepts the same arguments as request(). With ``fatal=False`` an
        empty dict is returned when the request fails or the body cannot
        be parsed; otherwise the error is raised.
        """
        response = self.request(url, **kwargs)
        if response is None:
            # request() only returns None for failed non-fatal requests
            return {}

        try:
            return json_loads(response.text)
        except Exception as exc:
            self.log.warning("%s: %s", exc.__class__.__name__, exc)
            if not kwargs.get("fatal", True):
                return {}
            raise


# Group 1 captures the matched bunkr domain.
BASE_PATTERN_BUNKR = r"(?:https?://)?(?:[a-zA-Z0-9-]+\.)?(bunkr\.(?:si|la|ws|red|black|media|site|is|to|ac|cr|ci|fi|pk|ps|sk|ph|su)|bunkrr\.ru)"
DOMAINS = [
    "bunkr.si",
    "bunkr.ws",
    "bunkr.la",
    "bunkr.red",
    "bunkr.black",
    "bunkr.media",
    "bunkr.site",
]
CF_DOMAINS = set()


class BunkrAlbumExtractor(Extractor):
    # NOTE(review): the methods of this class (__init__, items) are on the
    # following source line, which is still unformatted; the re.compile(...)
    # expression at its start repeats the 'pattern' value assigned below.
    category = "bunkr"
    root = "https://bunkr.si"
    root_dl = "https://get.bunkrr.su"
    root_api = "https://apidl.bunkr.ru"
    pattern = re.compile(BASE_PATTERN_BUNKR + r"/a/([^/?#]+)")
re.compile(BASE_PATTERN_BUNKR + r"/a/([^/?#]+)") def __init__(self, match, logger): super().__init__(match, logger) domain_match = re.search(BASE_PATTERN_BUNKR, match.string) if domain_match: self.root = "https://" + domain_match.group(1) self.endpoint = self.root_api + "/api/_001_v2" self.album_id = self.groups[-1] def items(self): page = self.request(self.url).text title = unescape(unescape(extr(page, 'property="og:title" content="', '"'))) items_html = list(extract_iter(page, '