import threading
import time
import os
import json
import traceback
from concurrent.futures import ThreadPoolExecutor, CancelledError

from .api_client import download_from_api
from .workers import PostProcessorWorker
from ..config.constants import (
    STYLE_DATE_BASED,
    STYLE_POST_TITLE_GLOBAL_NUMBERING,
    MAX_THREADS
)
from ..utils.file_utils import clean_folder_name


class DownloadManager:
    """
    Manages the entire download lifecycle, acting as a bridge between the UI
    and the backend workers. It handles thread pools, task submission, and
    state management for a download session.
    """

    def __init__(self, progress_queue):
        """
        Initializes the DownloadManager.

        Args:
            progress_queue (queue.Queue): A thread-safe queue for sending
                status updates to the UI.
        """
        self.progress_queue = progress_queue
        self.thread_pool = None
        self.active_futures = []
        self.cancellation_event = threading.Event()
        self.pause_event = threading.Event()
        self.is_running = False

        self.total_posts = 0
        self.processed_posts = 0
        self.total_downloads = 0
        self.total_skips = 0
        self.all_kept_original_filenames = []

        self.creator_profiles_dir = None
        self.current_creator_name_for_profile = None
        self.current_creator_profile_path = None
        self.session_file_path = None

    def _log(self, message):
        """Puts a progress message into the queue for the UI."""
        self.progress_queue.put({'type': 'progress', 'payload': (message,)})

    def start_session(self, config, restore_data=None):
        """
        Starts a new download session based on the provided configuration.
        This is the main entry point called by the UI.

        Args:
            config (dict): A dictionary containing all settings from the UI.
            restore_data (dict, optional): Data from a previous, interrupted session.
        """
        if self.is_running:
            self._log("❌ Cannot start a new session: A session is already in progress.")
            return

        self.session_file_path = config.get('session_file_path')
        creator_profile_data = self._setup_creator_profile(config)

        # Save the settings to the creator profile at the start of the session.
        if self.current_creator_profile_path:
            creator_profile_data['settings'] = config
            creator_profile_data.setdefault('processed_post_ids', [])
            self._save_creator_profile(creator_profile_data)
            self._log(f"✅ Loaded/created profile for '{self.current_creator_name_for_profile}'. Settings saved.")

        self.is_running = True
        self.cancellation_event.clear()
        self.pause_event.clear()
        self.active_futures.clear()
        self.total_posts = 0
        self.processed_posts = 0
        self.total_downloads = 0
        self.total_skips = 0
        self.all_kept_original_filenames = []

        is_single_post = bool(config.get('target_post_id_from_initial_url'))
        use_multithreading = config.get('use_multithreading', True)

        # Only the "Date Based" and "Title + Global Numbering" manga styles require
        # fetching the full post list up front. All other styles ("Custom",
        # "Date + Title", "Original Name", "Post ID") stream posts into the pool.
        sequential_styles = [STYLE_DATE_BASED, STYLE_POST_TITLE_GLOBAL_NUMBERING]
        is_manga_sequential = (
            config.get('manga_mode_active')
            and config.get('manga_filename_style') in sequential_styles
        )

        # If this is not a strictly sequential manga mode, use the pool (fetch-as-we-go).
        should_use_multithreading_for_posts = (
            use_multithreading and not is_single_post and not is_manga_sequential
        )

        if should_use_multithreading_for_posts:
            fetcher_thread = threading.Thread(
                target=self._fetch_and_queue_posts_for_pool,
                args=(config, restore_data, creator_profile_data),
                daemon=True
            )
            fetcher_thread.start()
        else:
            # Single-threaded (sequential) mode does not use the manager's pool logic.
            self._log("ℹ️ Manager is handing off to a single-threaded worker (Sequential Mode)...")
            # The single-threaded worker manages its own lifecycle and signals;
            # the manager's role for this session is effectively over.
            self.is_running = False  # Allow another session to start if needed.
            self.progress_queue.put({'type': 'handoff_to_single_thread', 'payload': (config,)})

    def _fetch_and_queue_posts_for_pool(self, config, restore_data, creator_profile_data):
        """
        Fetches posts from the API in batches and submits them as tasks to a thread pool.
        This method runs in its own dedicated thread to avoid blocking the UI, and
        provides feedback as soon as the first batch of posts is found.
        """
        try:
            num_workers = min(config.get('num_threads', 4), MAX_THREADS)
            self.thread_pool = ThreadPoolExecutor(max_workers=num_workers, thread_name_prefix='PostWorker_')

            session_processed_ids = set(restore_data.get('processed_post_ids', [])) if restore_data else set()
            profile_processed_ids = set(creator_profile_data.get('processed_post_ids', []))
            processed_ids = session_processed_ids.union(profile_processed_ids)

            if restore_data and 'all_posts_data' in restore_data:
                # Session restore relies on the pre-fetched post list saved with the session.
                all_posts = restore_data['all_posts_data']
                posts_to_process = [p for p in all_posts if p.get('id') not in processed_ids]
                self.total_posts = len(all_posts)
                self.processed_posts = len(processed_ids)
                self._log(f"🔄 Restoring session. {len(posts_to_process)} posts remaining.")
{len(posts_to_process)} posts remaining.") self.progress_queue.put({'type': 'overall_progress', 'payload': (self.total_posts, self.processed_posts)}) if not posts_to_process: self._log("✅ No new posts to process from restored session.") return for post_data in posts_to_process: if self.cancellation_event.is_set(): break worker = PostProcessorWorker(post_data, config, self.progress_queue) future = self.thread_pool.submit(worker.process) future.add_done_callback(self._handle_future_result) self.active_futures.append(future) else: # --- Streaming Logic --- post_generator = download_from_api( api_url_input=config['api_url'], logger=self._log, start_page=config.get('start_page'), end_page=config.get('end_page'), cancellation_event=self.cancellation_event, pause_event=self.pause_event, cookies_dict=None # Cookie handling handled inside client if needed, or update if passed ) for post_batch in post_generator: if self.cancellation_event.is_set(): break if not post_batch: continue new_posts_batch = [p for p in post_batch if p.get('id') not in processed_ids] if not new_posts_batch: # Log skipped count for UI feedback if needed, already handled in api_client usually continue # Update total posts dynamically as we find them self.total_posts += len(new_posts_batch) # Note: total_posts in streaming is a "running total of found posts", not absolute total for post_data in new_posts_batch: if self.cancellation_event.is_set(): break # Pass explicit args or config to worker # Ideally PostProcessorWorker should accept the whole config dict or mapped args # For now assuming PostProcessorWorker takes (post_data, config_dict, queue) # OR we map the config to the args expected by PostProcessorWorker.__init__ # MAPPING CONFIG TO WORKER ARGS (Safe wrapper) worker_args = self._map_config_to_worker_args(post_data, config) worker = PostProcessorWorker(**worker_args) future = self.thread_pool.submit(worker.process) future.add_done_callback(self._handle_future_result) self.active_futures.append(future) # Small sleep to prevent UI freeze if batches are huge and instant time.sleep(0.01) except Exception as e: self._log(f"❌ Critical Error in Fetcher Thread: {e}") traceback.print_exc() finally: self.is_running = False # Mark as not running so we can finish # The main window checks active futures, so we just exit this thread. 
    def _map_config_to_worker_args(self, post_data, config):
        """Helper to map the flat config dict to PostProcessorWorker arguments."""
        # Mirrors the arguments accepted by PostProcessorWorker.__init__ in workers.py.
        return {
            'post_data': post_data,
            'download_root': config.get('output_dir'),
            'known_names': [],  # Pass the shared KNOWN_NAMES list here if it is needed.
            'filter_character_list': [],  # Parsed character filters, if present in config.
            'emitter': self.progress_queue,
            'unwanted_keywords': set(),  # Parsed unwanted keywords, if present in config.
            'filter_mode': config.get('filter_mode'),
            'skip_zip': config.get('skip_zip'),
            'use_subfolders': config.get('use_subfolders'),
            'use_post_subfolders': config.get('use_post_subfolders'),
            'target_post_id_from_initial_url': config.get('target_post_id_from_initial_url'),
            'custom_folder_name': config.get('custom_folder_name'),
            'compress_images': config.get('compress_images'),
            'download_thumbnails': config.get('download_thumbnails'),
            'service': config.get('service') or 'unknown',  # Extracted from the URL elsewhere.
            'user_id': config.get('user_id') or 'unknown',
            'pause_event': self.pause_event,
            'api_url_input': config.get('api_url'),
            'cancellation_event': self.cancellation_event,
            'downloaded_files': None,  # Shared de-duplication state, if the caller provides it.
            'downloaded_file_hashes': None,
            'downloaded_files_lock': None,
            'downloaded_file_hashes_lock': None,
            'manga_mode_active': config.get('manga_mode_active'),
            'manga_filename_style': config.get('manga_filename_style'),
            'manga_custom_filename_format': config.get('custom_manga_filename_format', "{published} {title}"),
            'manga_custom_date_format': config.get('manga_custom_date_format', "YYYY-MM-DD"),
            'use_multithreading': config.get('use_multithreading', True),
            # Extend with any other fields from config that PostProcessorWorker requires.
        }

    def _setup_creator_profile(self, config):
        """Prepares the path and loads data for the current creator's profile."""
        # Placeholder: derive the creator name from the API URL or config when available,
        # resolve self.current_creator_profile_path, and load any existing profile data.
        self.current_creator_name_for_profile = "Unknown"
        return {}

    def _save_creator_profile(self, data):
        """Saves the provided data to the current creator's profile file."""
        if not self.current_creator_profile_path:
            return
        try:
            # Write to a temporary file first, then replace atomically.
            temp_path = self.current_creator_profile_path + ".tmp"
            with open(temp_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2)
            os.replace(temp_path, self.current_creator_profile_path)
        except OSError as e:
            self._log(f"❌ Error saving creator profile to '{self.current_creator_profile_path}': {e}")

    def cancel_session(self):
        """Cancels the current running session."""
        if not self.is_running:
            return
        if self.cancellation_event.is_set():
            self._log("ℹ️ Cancellation already in progress.")
            return
        self._log("⚠️ Cancellation requested by user...")
        self.cancellation_event.set()
        if self.thread_pool:
            self.thread_pool.shutdown(wait=False, cancel_futures=True)

    def _handle_future_result(self, future):
        """Callback for when a worker task finishes."""
        if self.active_futures:
            try:
                self.active_futures.remove(future)
            except ValueError:
                pass

        try:
            result = future.result()
            # Result tuple: (download_count, skip_count, kept_original_filenames, ...)
            if result:
                self.total_downloads += result[0]
                self.total_skips += result[1]
                # Per the result-tuple layout above, index 2 is assumed to carry any
                # filenames that kept their original names; accumulate them for the summary.
                if len(result) > 2 and isinstance(result[2], list):
                    self.all_kept_original_filenames.extend(result[2])
        except CancelledError:
            pass
        except Exception as e:
            self._log(f"❌ Worker Error: {e}")

        self.processed_posts += 1
        self.progress_queue.put({'type': 'overall_progress', 'payload': (self.total_posts, self.processed_posts)})

        if not self.active_futures and not self.is_running:
            self._log("✅ All tasks completed.")
            # The third payload slot is assumed to carry the kept-original filenames.
            self.progress_queue.put({
                'type': 'worker_finished',
                'payload': (self.total_downloads, self.total_skips, self.all_kept_original_filenames, [])
            })
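

# A minimal usage sketch, assuming the module is run within its package
# (e.g. `python -m <package>.download_manager`) so the relative imports resolve.
# The api_url and output_dir values are hypothetical, and a real UI would poll
# the progress queue from its event loop rather than a blocking loop like this.
if __name__ == "__main__":
    import queue

    progress_queue = queue.Queue()
    manager = DownloadManager(progress_queue)

    manager.start_session({
        'api_url': 'https://example.net/api/v1/someservice/user/12345',  # hypothetical URL
        'output_dir': '/tmp/downloads',
        'num_threads': 4,
        'use_multithreading': True,
    })

    # Drain progress messages until the session reports completion (or goes quiet).
    while True:
        try:
            message = progress_queue.get(timeout=60)
        except queue.Empty:
            break
        if message['type'] == 'progress':
            print(message['payload'][0])
        elif message['type'] in ('worker_finished', 'handoff_to_single_thread'):
            break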