This commit is contained in:
Yuvi9587
2025-07-27 06:32:15 -07:00
parent 9db89cfad0
commit e3dd0e70b6
9 changed files with 508 additions and 254 deletions

View File

@@ -120,7 +120,7 @@ def download_from_api(
selected_cookie_file=None,
app_base_dir=None,
manga_filename_style_for_sort_check=None,
processed_post_ids=None # --- ADD THIS ARGUMENT ---
processed_post_ids=None
):
headers = {
'User-Agent': 'Mozilla/5.0',
@@ -139,9 +139,19 @@ def download_from_api(
parsed_input_url_for_domain = urlparse(api_url_input)
api_domain = parsed_input_url_for_domain.netloc
if not any(d in api_domain.lower() for d in ['kemono.su', 'kemono.party', 'coomer.su', 'coomer.party']):
logger(f"⚠️ Unrecognized domain '{api_domain}' from input URL. Defaulting to kemono.su for API calls.")
api_domain = "kemono.su"
fallback_api_domain = None
# --- START: MODIFIED DOMAIN LOGIC WITH FALLBACK ---
if 'kemono.cr' in api_domain.lower():
fallback_api_domain = 'kemono.su'
elif 'coomer.st' in api_domain.lower():
fallback_api_domain = 'coomer.su'
elif not any(d in api_domain.lower() for d in ['kemono.su', 'kemono.party', 'kemono.cr', 'coomer.su', 'coomer.party', 'coomer.st']):
logger(f"⚠️ Unrecognized domain '{api_domain}'. Defaulting to kemono.cr with fallback to kemono.su.")
api_domain = "kemono.cr"
fallback_api_domain = "kemono.su"
# --- END: MODIFIED DOMAIN LOGIC WITH FALLBACK ---
cookies_for_api = None
if use_cookie and app_base_dir:
cookies_for_api = prepare_cookies_for_request(use_cookie, cookie_text, selected_cookie_file, app_base_dir, logger, target_domain=api_domain)
@@ -178,7 +188,6 @@ def download_from_api(
logger("⚠️ Page range (start/end page) is ignored when a specific post URL is provided (searching all pages for the post).")
is_manga_mode_fetch_all_and_sort_oldest_first = manga_mode and (manga_filename_style_for_sort_check != STYLE_DATE_POST_TITLE) and not target_post_id
api_base_url = f"https://{api_domain}/api/v1/{service}/user/{user_id}"
page_size = 50
if is_manga_mode_fetch_all_and_sort_oldest_first:
logger(f" Manga Mode (Style: {manga_filename_style_for_sort_check if manga_filename_style_for_sort_check else 'Default'} - Oldest First Sort Active): Fetching all posts to sort by date...")
@@ -191,6 +200,12 @@ def download_from_api(
logger(f" Manga Mode: Starting fetch from page 1 (offset 0).")
if end_page:
logger(f" Manga Mode: Will fetch up to page {end_page}.")
# --- START: MANGA MODE FALLBACK LOGIC ---
is_first_page_attempt_manga = True
api_base_url_manga = f"https://{api_domain}/api/v1/{service}/user/{user_id}"
# --- END: MANGA MODE FALLBACK LOGIC ---
while True:
if pause_event and pause_event.is_set():
logger(" Manga mode post fetching paused...")
@@ -208,7 +223,10 @@ def download_from_api(
logger(f" Manga Mode: Reached specified end page ({end_page}). Stopping post fetch.")
break
try:
posts_batch_manga = fetch_posts_paginated(api_base_url, headers, current_offset_manga, logger, cancellation_event, pause_event, cookies_dict=cookies_for_api)
# --- START: MANGA MODE FALLBACK EXECUTION ---
posts_batch_manga = fetch_posts_paginated(api_base_url_manga, headers, current_offset_manga, logger, cancellation_event, pause_event, cookies_dict=cookies_for_api)
is_first_page_attempt_manga = False # Success, no need to fallback
# --- END: MANGA MODE FALLBACK EXECUTION ---
if not isinstance(posts_batch_manga, list):
logger(f"❌ API Error (Manga Mode): Expected list of posts, got {type(posts_batch_manga)}.")
break
@@ -220,9 +238,21 @@ def download_from_api(
logger(f" Manga Mode: No posts found within the specified page range ({start_page or 1}-{end_page}).")
break
all_posts_for_manga_mode.extend(posts_batch_manga)
logger(f"MANGA_FETCH_PROGRESS:{len(all_posts_for_manga_mode)}:{current_page_num_manga}")
current_offset_manga += page_size
time.sleep(0.6)
except RuntimeError as e:
# --- START: MANGA MODE FALLBACK HANDLING ---
if is_first_page_attempt_manga and fallback_api_domain:
logger(f" ⚠️ Initial API fetch (Manga Mode) from '{api_domain}' failed: {e}")
logger(f" ↪️ Falling back to old domain: '{fallback_api_domain}'")
api_domain = fallback_api_domain
api_base_url_manga = f"https://{api_domain}/api/v1/{service}/user/{user_id}"
is_first_page_attempt_manga = False
continue # Retry the same offset with the new domain
# --- END: MANGA MODE FALLBACK HANDLING ---
if "cancelled by user" in str(e).lower():
logger(f" Manga mode pagination stopped due to cancellation: {e}")
else:
@@ -232,7 +262,12 @@ def download_from_api(
logger(f"❌ Unexpected error during manga mode fetch: {e}")
traceback.print_exc()
break
if cancellation_event and cancellation_event.is_set(): return
if all_posts_for_manga_mode:
logger(f"MANGA_FETCH_COMPLETE:{len(all_posts_for_manga_mode)}")
if all_posts_for_manga_mode:
if processed_post_ids:
original_count = len(all_posts_for_manga_mode)
@@ -278,6 +313,12 @@ def download_from_api(
current_offset = (start_page - 1) * page_size
current_page_num = start_page
logger(f" Starting from page {current_page_num} (calculated offset {current_offset}).")
# --- START: STANDARD PAGINATION FALLBACK LOGIC ---
is_first_page_attempt = True
api_base_url = f"https://{api_domain}/api/v1/{service}/user/{user_id}"
# --- END: STANDARD PAGINATION FALLBACK LOGIC ---
while True:
if pause_event and pause_event.is_set():
logger(" Post fetching loop paused...")
@@ -296,11 +337,23 @@ def download_from_api(
logger(f"✅ Reached specified end page ({end_page}) for creator feed. Stopping.")
break
try:
# --- START: STANDARD PAGINATION FALLBACK EXECUTION ---
posts_batch = fetch_posts_paginated(api_base_url, headers, current_offset, logger, cancellation_event, pause_event, cookies_dict=cookies_for_api)
is_first_page_attempt = False # Success, no more fallbacks needed
# --- END: STANDARD PAGINATION FALLBACK EXECUTION ---
if not isinstance(posts_batch, list):
logger(f"❌ API Error: Expected list of posts, got {type(posts_batch)} at page {current_page_num} (offset {current_offset}).")
break
except RuntimeError as e:
# --- START: STANDARD PAGINATION FALLBACK HANDLING ---
if is_first_page_attempt and fallback_api_domain:
logger(f" ⚠️ Initial API fetch from '{api_domain}' failed: {e}")
logger(f" ↪️ Falling back to old domain: '{fallback_api_domain}'")
api_domain = fallback_api_domain
api_base_url = f"https://{api_domain}/api/v1/{service}/user/{user_id}"
is_first_page_attempt = False
continue # Retry the same offset with the new domain
# --- END: STANDARD PAGINATION FALLBACK HANDLING ---
if "cancelled by user" in str(e).lower():
logger(f" Pagination stopped due to cancellation: {e}")
else:
@@ -340,4 +393,4 @@ def download_from_api(
current_page_num += 1
time.sleep(0.6)
if target_post_id and not processed_target_post_flag and not (cancellation_event and cancellation_event.is_set()):
logger(f"❌ Target post {target_post_id} could not be found after checking all relevant pages (final check after loop).")
logger(f"❌ Target post {target_post_id} could not be found after checking all relevant pages (final check after loop).")

View File

@@ -5,11 +5,10 @@ import json
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed, Future
from .api_client import download_from_api
from .workers import PostProcessorWorker, DownloadThread
from .workers import PostProcessorWorker
from ..config.constants import (
STYLE_DATE_BASED, STYLE_POST_TITLE_GLOBAL_NUMBERING,
MAX_THREADS, POST_WORKER_BATCH_THRESHOLD, POST_WORKER_NUM_BATCHES,
POST_WORKER_BATCH_DELAY_SECONDS
MAX_THREADS
)
from ..utils.file_utils import clean_folder_name
@@ -44,6 +43,7 @@ class DownloadManager:
self.creator_profiles_dir = None
self.current_creator_name_for_profile = None
self.current_creator_profile_path = None
self.session_file_path = None
def _log(self, message):
"""Puts a progress message into the queue for the UI."""
@@ -61,12 +61,16 @@ class DownloadManager:
if self.is_running:
self._log("❌ Cannot start a new session: A session is already in progress.")
return
self.session_file_path = config.get('session_file_path')
creator_profile_data = self._setup_creator_profile(config)
creator_profile_data['settings'] = config
creator_profile_data.setdefault('processed_post_ids', [])
self._save_creator_profile(creator_profile_data)
self._log(f"✅ Loaded/created profile for '{self.current_creator_name_for_profile}'. Settings saved.")
# Save settings to profile at the start of the session
if self.current_creator_profile_path:
creator_profile_data['settings'] = config
creator_profile_data.setdefault('processed_post_ids', [])
self._save_creator_profile(creator_profile_data)
self._log(f"✅ Loaded/created profile for '{self.current_creator_name_for_profile}'. Settings saved.")
self.is_running = True
self.cancellation_event.clear()
@@ -77,6 +81,7 @@ class DownloadManager:
self.total_downloads = 0
self.total_skips = 0
self.all_kept_original_filenames = []
is_single_post = bool(config.get('target_post_id_from_initial_url'))
use_multithreading = config.get('use_multithreading', True)
is_manga_sequential = config.get('manga_mode_active') and config.get('manga_filename_style') in [STYLE_DATE_BASED, STYLE_POST_TITLE_GLOBAL_NUMBERING]
@@ -86,72 +91,99 @@ class DownloadManager:
if should_use_multithreading_for_posts:
fetcher_thread = threading.Thread(
target=self._fetch_and_queue_posts_for_pool,
args=(config, restore_data, creator_profile_data), # Add argument here
args=(config, restore_data, creator_profile_data),
daemon=True
)
fetcher_thread.start()
else:
self._start_single_threaded_session(config)
# Single-threaded mode does not use the manager's complex logic
self._log(" Manager is handing off to a single-threaded worker...")
# The single-threaded worker will manage its own lifecycle and signals.
# The manager's role for this session is effectively over.
self.is_running = False # Allow another session to start if needed
self.progress_queue.put({'type': 'handoff_to_single_thread', 'payload': (config,)})
def _start_single_threaded_session(self, config):
"""Handles downloads that are best processed by a single worker thread."""
self._log(" Initializing single-threaded download process...")
self.worker_thread = threading.Thread(
target=self._run_single_worker,
args=(config,),
daemon=True
)
self.worker_thread.start()
def _run_single_worker(self, config):
"""Target function for the single-worker thread."""
try:
worker = DownloadThread(config, self.progress_queue)
worker.run() # This is the main blocking call for this thread
except Exception as e:
self._log(f"❌ CRITICAL ERROR in single-worker thread: {e}")
self._log(traceback.format_exc())
finally:
self.is_running = False
def _fetch_and_queue_posts_for_pool(self, config, restore_data):
def _fetch_and_queue_posts_for_pool(self, config, restore_data, creator_profile_data):
"""
Fetches all posts from the API and submits them as tasks to a thread pool.
This method runs in its own dedicated thread to avoid blocking.
Fetches posts from the API in batches and submits them as tasks to a thread pool.
This method runs in its own dedicated thread to avoid blocking the UI.
It provides immediate feedback as soon as the first batch of posts is found.
"""
try:
num_workers = min(config.get('num_threads', 4), MAX_THREADS)
self.thread_pool = ThreadPoolExecutor(max_workers=num_workers, thread_name_prefix='PostWorker_')
session_processed_ids = set(restore_data['processed_post_ids']) if restore_data else set()
session_processed_ids = set(restore_data.get('processed_post_ids', [])) if restore_data else set()
profile_processed_ids = set(creator_profile_data.get('processed_post_ids', []))
processed_ids = session_processed_ids.union(profile_processed_ids)
if restore_data:
if restore_data and 'all_posts_data' in restore_data:
# This logic for session restore remains as it relies on a pre-fetched list
all_posts = restore_data['all_posts_data']
processed_ids = set(restore_data['processed_post_ids'])
posts_to_process = [p for p in all_posts if p.get('id') not in processed_ids]
self.total_posts = len(all_posts)
self.processed_posts = len(processed_ids)
self._log(f"🔄 Restoring session. {len(posts_to_process)} posts remaining.")
self.progress_queue.put({'type': 'overall_progress', 'payload': (self.total_posts, self.processed_posts)})
if not posts_to_process:
self._log("✅ No new posts to process from restored session.")
return
for post_data in posts_to_process:
if self.cancellation_event.is_set(): break
worker = PostProcessorWorker(post_data, config, self.progress_queue)
future = self.thread_pool.submit(worker.process)
future.add_done_callback(self._handle_future_result)
self.active_futures.append(future)
else:
posts_to_process = self._get_all_posts(config)
self.total_posts = len(posts_to_process)
# --- START: REFACTORED STREAMING LOGIC ---
post_generator = download_from_api(
api_url_input=config['api_url'],
logger=self._log,
start_page=config.get('start_page'),
end_page=config.get('end_page'),
manga_mode=config.get('manga_mode_active', False),
cancellation_event=self.cancellation_event,
pause_event=self.pause_event,
use_cookie=config.get('use_cookie', False),
cookie_text=config.get('cookie_text', ''),
selected_cookie_file=config.get('selected_cookie_file'),
app_base_dir=config.get('app_base_dir'),
manga_filename_style_for_sort_check=config.get('manga_filename_style'),
processed_post_ids=list(processed_ids)
)
self.total_posts = 0
self.processed_posts = 0
self.progress_queue.put({'type': 'overall_progress', 'payload': (self.total_posts, self.processed_posts)})
if not posts_to_process:
self._log("✅ No new posts to process.")
return
for post_data in posts_to_process:
if self.cancellation_event.is_set():
break
worker = PostProcessorWorker(post_data, config, self.progress_queue)
future = self.thread_pool.submit(worker.process)
future.add_done_callback(self._handle_future_result)
self.active_futures.append(future)
# Process posts in batches as they are yielded by the API client
for batch in post_generator:
if self.cancellation_event.is_set():
self._log(" Post fetching cancelled.")
break
# Filter out any posts that might have been processed since the start
posts_in_batch_to_process = [p for p in batch if p.get('id') not in processed_ids]
if not posts_in_batch_to_process:
continue
# Update total count and immediately inform the UI
self.total_posts += len(posts_in_batch_to_process)
self.progress_queue.put({'type': 'overall_progress', 'payload': (self.total_posts, self.processed_posts)})
for post_data in posts_in_batch_to_process:
if self.cancellation_event.is_set(): break
worker = PostProcessorWorker(post_data, config, self.progress_queue)
future = self.thread_pool.submit(worker.process)
future.add_done_callback(self._handle_future_result)
self.active_futures.append(future)
if self.total_posts == 0 and not self.cancellation_event.is_set():
self._log("✅ No new posts found to process.")
except Exception as e:
self._log(f"❌ CRITICAL ERROR in post fetcher thread: {e}")
self._log(traceback.format_exc())
@@ -164,28 +196,6 @@ class DownloadManager:
'type': 'finished',
'payload': (self.total_downloads, self.total_skips, self.cancellation_event.is_set(), self.all_kept_original_filenames)
})
def _get_all_posts(self, config):
"""Helper to fetch all posts using the API client."""
all_posts = []
post_generator = download_from_api(
api_url_input=config['api_url'],
logger=self._log,
start_page=config.get('start_page'),
end_page=config.get('end_page'),
manga_mode=config.get('manga_mode_active', False),
cancellation_event=self.cancellation_event,
pause_event=self.pause_event,
use_cookie=config.get('use_cookie', False),
cookie_text=config.get('cookie_text', ''),
selected_cookie_file=config.get('selected_cookie_file'),
app_base_dir=config.get('app_base_dir'),
manga_filename_style_for_sort_check=config.get('manga_filename_style'),
processed_post_ids=config.get('processed_post_ids', [])
)
for batch in post_generator:
all_posts.extend(batch)
return all_posts
def _handle_future_result(self, future: Future):
"""Callback executed when a worker task completes."""
@@ -261,9 +271,15 @@ class DownloadManager:
"""Cancels the current running session."""
if not self.is_running:
return
if self.cancellation_event.is_set():
self._log(" Cancellation already in progress.")
return
self._log("⚠️ Cancellation requested by user...")
self.cancellation_event.set()
if self.thread_pool:
self.thread_pool.shutdown(wait=False, cancel_futures=True)
self.is_running = False
self._log(" Signaling all worker threads to stop and shutting down pool...")
self.thread_pool.shutdown(wait=False)

View File

@@ -1,4 +1,5 @@
import os
import sys
import queue
import re
import threading
@@ -1175,11 +1176,18 @@ class PostProcessorWorker:
if FPDF:
self.logger(f" Creating formatted PDF for {'comments' if self.text_only_scope == 'comments' else 'content'}...")
pdf = PDF()
if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'):
# If the application is run as a bundled exe, _MEIPASS is the temp folder
base_path = sys._MEIPASS
else:
# If running as a normal .py script, use the project_root_dir
base_path = self.project_root_dir
font_path = ""
bold_font_path = ""
if self.project_root_dir:
font_path = os.path.join(self.project_root_dir, 'data', 'dejavu-sans', 'DejaVuSans.ttf')
bold_font_path = os.path.join(self.project_root_dir, 'data', 'dejavu-sans', 'DejaVuSans-Bold.ttf')
if base_path:
font_path = os.path.join(base_path, 'data', 'dejavu-sans', 'DejaVuSans.ttf')
bold_font_path = os.path.join(base_path, 'data', 'dejavu-sans', 'DejaVuSans-Bold.ttf')
try:
if not os.path.exists(font_path): raise RuntimeError(f"Font file not found: {font_path}")
@@ -1666,10 +1674,12 @@ class PostProcessorWorker:
if not self.extract_links_only and self.use_post_subfolders and total_downloaded_this_post == 0:
path_to_check_for_emptiness = determined_post_save_path_for_history
try:
# Check if the path is a directory and if it's empty
if os.path.isdir(path_to_check_for_emptiness) and not os.listdir(path_to_check_for_emptiness):
self.logger(f" 🗑️ Removing empty post-specific subfolder: '{path_to_check_for_emptiness}'")
os.rmdir(path_to_check_for_emptiness)
except OSError as e_rmdir:
# Log if removal fails for any reason (e.g., permissions)
self.logger(f" ⚠️ Could not remove empty post-specific subfolder '{path_to_check_for_emptiness}': {e_rmdir}")
result_tuple = (total_downloaded_this_post, total_skipped_this_post,
@@ -1678,6 +1688,15 @@ class PostProcessorWorker:
None)
finally:
if not self.extract_links_only and self.use_post_subfolders and total_downloaded_this_post == 0:
path_to_check_for_emptiness = determined_post_save_path_for_history
try:
if os.path.isdir(path_to_check_for_emptiness) and not os.listdir(path_to_check_for_emptiness):
self.logger(f" 🗑️ Removing empty post-specific subfolder: '{path_to_check_for_emptiness}'")
os.rmdir(path_to_check_for_emptiness)
except OSError as e_rmdir:
self.logger(f" ⚠️ Could not remove potentially empty subfolder '{path_to_check_for_emptiness}': {e_rmdir}")
self._emit_signal('worker_finished', result_tuple)
return result_tuple