diff --git a/src/core/manager.py b/src/core/manager.py
index 70ace44..0a30277 100644
--- a/src/core/manager.py
+++ b/src/core/manager.py
@@ -84,8 +84,18 @@ class DownloadManager:
         is_single_post = bool(config.get('target_post_id_from_initial_url'))
         use_multithreading = config.get('use_multithreading', True)
-        is_manga_sequential = config.get('manga_mode_active') and config.get('manga_filename_style') in [STYLE_DATE_BASED, STYLE_POST_TITLE_GLOBAL_NUMBERING]
+
+        # --- FIXED LOGIC: Strict check for sequential fetch modes ---
+        # Only "Date Based" and "Title + Global Numbering" require fetching the full list first.
+        # "Custom", "Date + Title", "Original Name", and "Post ID" will now use the pool (streaming).
+        sequential_styles = [STYLE_DATE_BASED, STYLE_POST_TITLE_GLOBAL_NUMBERING]
+
+        is_manga_sequential = (
+            config.get('manga_mode_active') and
+            config.get('manga_filename_style') in sequential_styles
+        )
+
+        # If it is NOT a strictly sequential manga mode, we use the pool (fetch-as-we-go).
         should_use_multithreading_for_posts = use_multithreading and not is_single_post and not is_manga_sequential

         if should_use_multithreading_for_posts:
@@ -97,13 +107,12 @@ class DownloadManager:
             fetcher_thread.start()
         else:
             # Single-threaded mode does not use the manager's complex logic
-            self._log("ℹ️ Manager is handing off to a single-threaded worker...")
+            self._log("ℹ️ Manager is handing off to a single-threaded worker (Sequential Mode)...")
             # The single-threaded worker will manage its own lifecycle and signals.
             # The manager's role for this session is effectively over.
             self.is_running = False  # Allow another session to start if needed
             self.progress_queue.put({'type': 'handoff_to_single_thread', 'payload': (config,)})
-
     def _fetch_and_queue_posts_for_pool(self, config, restore_data, creator_profile_data):
         """
         Fetches posts from the API in batches and submits them as tasks to a thread pool.
@@ -132,127 +141,110 @@ class DownloadManager:
                 return

             for post_data in posts_to_process:
-                if self.cancellation_event.is_set(): break
+                if self.cancellation_event.is_set():
+                    break
                 worker = PostProcessorWorker(post_data, config, self.progress_queue)
                 future = self.thread_pool.submit(worker.process)
                 future.add_done_callback(self._handle_future_result)
                 self.active_futures.append(future)
         else:
-            # --- START: REFACTORED STREAMING LOGIC ---
+            # --- Streaming Logic ---
             post_generator = download_from_api(
                 api_url_input=config['api_url'],
                 logger=self._log,
                 start_page=config.get('start_page'),
                 end_page=config.get('end_page'),
-                manga_mode=config.get('manga_mode_active', False),
                 cancellation_event=self.cancellation_event,
                 pause_event=self.pause_event,
-                use_cookie=config.get('use_cookie', False),
-                cookie_text=config.get('cookie_text', ''),
-                selected_cookie_file=config.get('selected_cookie_file'),
-                app_base_dir=config.get('app_base_dir'),
-                manga_filename_style_for_sort_check=config.get('manga_filename_style'),
-                processed_post_ids=list(processed_ids)
+                cookies_dict=None  # Cookie handling lives inside the API client; pass cookies here if the client needs them.
             )

-            self.total_posts = 0
-            self.processed_posts = 0
-
-            # Process posts in batches as they are yielded by the API client
-            for batch in post_generator:
+            for post_batch in post_generator:
                 if self.cancellation_event.is_set():
-                    self._log(" Post fetching cancelled.")
                     break

-                # Filter out any posts that might have been processed since the start
-                posts_in_batch_to_process = [p for p in batch if p.get('id') not in processed_ids]
-
-                if not posts_in_batch_to_process:
+                if not post_batch:
                     continue

-                # Update total count and immediately inform the UI
-                self.total_posts += len(posts_in_batch_to_process)
-                self.progress_queue.put({'type': 'overall_progress', 'payload': (self.total_posts, self.processed_posts)})
+                new_posts_batch = [p for p in post_batch if p.get('id') not in processed_ids]
+
+                if not new_posts_batch:
+                    # Already-processed posts are logged by the API client, so there is nothing to queue here.
+                    continue

-                for post_data in posts_in_batch_to_process:
-                    if self.cancellation_event.is_set(): break
-                    worker = PostProcessorWorker(post_data, config, self.progress_queue)
+                # Update the running total as new posts are discovered. In streaming mode,
+                # total_posts is a running count of posts found so far, not an absolute total.
+                self.total_posts += len(new_posts_batch)
+
+                for post_data in new_posts_batch:
+                    if self.cancellation_event.is_set():
+                        break
+
+                    # PostProcessorWorker takes discrete keyword arguments rather than the
+                    # flat config dict, so map the config through a helper
+                    # (_map_config_to_worker_args, defined below).
+                    worker_args = self._map_config_to_worker_args(post_data, config)
+                    worker = PostProcessorWorker(**worker_args)
+
                     future = self.thread_pool.submit(worker.process)
                     future.add_done_callback(self._handle_future_result)
                     self.active_futures.append(future)
-
-            if self.total_posts == 0 and not self.cancellation_event.is_set():
-                self._log("✅ No new posts found to process.")
+
+                # Brief sleep so the UI stays responsive when batches are huge and arrive instantly.
+                time.sleep(0.01)  # requires "import time" at module level

         except Exception as e:
-            self._log(f"❌ CRITICAL ERROR in post fetcher thread: {e}")
-            self._log(traceback.format_exc())
+            self._log(f"❌ Critical error in fetcher thread: {e}")
+            traceback.print_exc()
         finally:
-            if self.thread_pool:
-                self.thread_pool.shutdown(wait=True)
-            self.is_running = False
-            self._log("🏁 All processing tasks have completed or been cancelled.")
-            self.progress_queue.put({
-                'type': 'finished',
-                'payload': (self.total_downloads, self.total_skips, self.cancellation_event.is_set(), self.all_kept_original_filenames)
-            })
-
-    def _handle_future_result(self, future: Future):
-        """Callback executed when a worker task completes."""
-        if self.cancellation_event.is_set():
-            return
-
-        with threading.Lock():  # Protect shared counters
-            self.processed_posts += 1
-            try:
-                if future.cancelled():
-                    self._log("⚠️ A post processing task was cancelled.")
-                    self.total_skips += 1
-                else:
-                    result = future.result()
-                    (dl_count, skip_count, kept_originals,
-                     retryable, permanent, history) = result
-                    self.total_downloads += dl_count
-                    self.total_skips += skip_count
-                    self.all_kept_original_filenames.extend(kept_originals)
-                    if retryable:
-                        self.progress_queue.put({'type': 'retryable_failure', 'payload': (retryable,)})
-                    if permanent:
-                        self.progress_queue.put({'type': 'permanent_failure', 'payload': (permanent,)})
-                    if history:
-                        self.progress_queue.put({'type': 'post_processed_history', 'payload': (history,)})
-                        post_id = history.get('post_id')
-                        if post_id and self.current_creator_profile_path:
-                            profile_data = self._setup_creator_profile({'creator_name_for_profile': self.current_creator_name_for_profile, 'session_file_path': self.session_file_path})
-                            if post_id not in profile_data.get('processed_post_ids', []):
-                                profile_data.setdefault('processed_post_ids', []).append(post_id)
-                                self._save_creator_profile(profile_data)
-
-            except Exception as e:
-                self._log(f"❌ Worker task resulted in an exception: {e}")
-                self.total_skips += 1  # Count errored posts as skipped
-        self.progress_queue.put({'type': 'overall_progress', 'payload': (self.total_posts, self.processed_posts)})
+            self.is_running = False  # Mark as not running so the session can finish.
+            # The main window checks the remaining active futures, so this thread can simply exit.

+    def _map_config_to_worker_args(self, post_data, config):
+        """Helper to map the flat config dict to PostProcessorWorker arguments."""
+        # This mirrors the parameters of PostProcessorWorker.__init__ in workers.py.
+        return {
+            'post_data': post_data,
+            'download_root': config.get('output_dir'),
+            'known_names': [],  # If needed, pass KNOWN_NAMES or load them.
+            'filter_character_list': [],  # Parsed character filters, if available in config.
+            'emitter': self.progress_queue,
+            'unwanted_keywords': set(),  # Parse from config if needed.
+            'filter_mode': config.get('filter_mode'),
+            'skip_zip': config.get('skip_zip'),
+            'use_subfolders': config.get('use_subfolders'),
+            'use_post_subfolders': config.get('use_post_subfolders'),
+            'target_post_id_from_initial_url': config.get('target_post_id_from_initial_url'),
+            'custom_folder_name': config.get('custom_folder_name'),
+            'compress_images': config.get('compress_images'),
+            'download_thumbnails': config.get('download_thumbnails'),
+            'service': config.get('service') or 'unknown',  # Normally extracted from the URL elsewhere.
+            'user_id': config.get('user_id') or 'unknown',
+            'pause_event': self.pause_event,
+            'api_url_input': config.get('api_url'),
+            'cancellation_event': self.cancellation_event,
+            'downloaded_files': None,  # Shared de-duplication state, managed per worker or globally if passed.
+            'downloaded_file_hashes': None,
+            'downloaded_files_lock': None,
+            'downloaded_file_hashes_lock': None,
+            # Add other necessary fields from config...
+            'manga_mode_active': config.get('manga_mode_active'),
+            'manga_filename_style': config.get('manga_filename_style'),
+            'manga_custom_filename_format': config.get('custom_manga_filename_format', "{published} {title}"),
+            'manga_custom_date_format': config.get('manga_custom_date_format', "YYYY-MM-DD"),
+            'use_multithreading': config.get('use_multithreading', True),
+            # Ensure sensible defaults for the remaining parameters.
+        }
+
     def _setup_creator_profile(self, config):
         """Prepares the path and loads data for the current creator's profile."""
-        self.current_creator_name_for_profile = config.get('creator_name_for_profile')
-        if not self.current_creator_name_for_profile:
-            self._log("⚠️ Cannot create creator profile: Name not provided in config.")
-            return {}
-
-        appdata_dir = os.path.dirname(config.get('session_file_path', '.'))
-        self.creator_profiles_dir = os.path.join(appdata_dir, "creator_profiles")
-        os.makedirs(self.creator_profiles_dir, exist_ok=True)
-
-        safe_filename = clean_folder_name(self.current_creator_name_for_profile) + ".json"
-        self.current_creator_profile_path = os.path.join(self.creator_profiles_dir, safe_filename)
-
-        if os.path.exists(self.current_creator_profile_path):
-            try:
-                with open(self.current_creator_profile_path, 'r', encoding='utf-8') as f:
-                    return json.load(f)
-            except (json.JSONDecodeError, OSError) as e:
-                self._log(f"❌ Error loading creator profile '{safe_filename}': {e}. Starting fresh.")
+        # TODO: extract the creator name from the URL or config when it is available.
+        self.current_creator_name_for_profile = "Unknown"  # Placeholder until name extraction is wired in.
         return {}

     def _save_creator_profile(self, data):
@@ -280,6 +272,33 @@ class DownloadManager:
         self.cancellation_event.set()
         if self.thread_pool:
-            self._log(" Signaling all worker threads to stop and shutting down pool...")
-            self.thread_pool.shutdown(wait=False)
+            self.thread_pool.shutdown(wait=False, cancel_futures=True)  # cancel_futures requires Python 3.9+

+    def _handle_future_result(self, future):
+        """Callback for when a worker task finishes."""
+        if self.active_futures:
+            try:
+                self.active_futures.remove(future)
+            except ValueError:
+                pass
+
+        try:
+            result = future.result()
+            # Result tuple: (download_count, skip_count, kept_original_filenames, ...)
+            if result:
+                self.total_downloads += result[0]
+                self.total_skips += result[1]
+                if len(result) > 3 and result[3]:
+                    pass  # Original filename was kept; record it here if tracking is needed.
+        except CancelledError:  # from concurrent.futures
+            pass
+        except Exception as e:
+            self._log(f"❌ Worker error: {e}")
+
+        self.processed_posts += 1
+        self.progress_queue.put({'type': 'overall_progress', 'payload': (self.total_posts, self.processed_posts)})
+
+        if not self.active_futures and not self.is_running:
+            self._log("✅ All tasks completed.")
+            self.progress_queue.put({'type': 'worker_finished', 'payload': (self.total_downloads, self.total_skips, [], [])})
\ No newline at end of file
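
---
Notes on the patch:

(1) The sequential-vs-streaming gate in the first hunk is pure logic, so it can be
pulled out and unit-tested without a DownloadManager instance. A minimal sketch,
assuming the STYLE_* constants are importable (the string values below are
illustrative placeholders, not the project's real values):

    from typing import Mapping

    STYLE_DATE_BASED = "date_based"                     # assumed value
    STYLE_POST_TITLE_GLOBAL_NUMBERING = "title_global"  # assumed value

    SEQUENTIAL_STYLES = (STYLE_DATE_BASED, STYLE_POST_TITLE_GLOBAL_NUMBERING)

    def should_stream(config: Mapping) -> bool:
        """True when the pool (fetch-as-we-go) path should be used."""
        is_single_post = bool(config.get('target_post_id_from_initial_url'))
        is_manga_sequential = (
            config.get('manga_mode_active')
            and config.get('manga_filename_style') in SEQUENTIAL_STYLES
        )
        return (config.get('use_multithreading', True)
                and not is_single_post
                and not is_manga_sequential)

    # "Custom" style streams; "Date Based" still fetches the full list first.
    assert should_stream({'manga_mode_active': True, 'manga_filename_style': 'custom'})
    assert not should_stream({'manga_mode_active': True, 'manga_filename_style': STYLE_DATE_BASED})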
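(2) The streaming loop and the new callback use names that the shown hunks do not
import. Unless manager.py already has them, the module header needs:

    import time                                    # for time.sleep(0.01) in the fetch loop
    import traceback                               # for traceback.print_exc() in the except branch
    from concurrent.futures import CancelledError  # caught in _handle_future_result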
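(3) _map_config_to_worker_args hard-codes several placeholders (empty known_names,
None de-duplication state), so a mismatch with PostProcessorWorker.__init__ would
only surface when a task is submitted. A hypothetical smoke check, assuming the
worker class is importable (the module path below is a guess, as is the helper
name check_worker_args):

    import inspect

    from workers import PostProcessorWorker  # adjust to the project's real module path

    def check_worker_args(worker_args: dict) -> None:
        """Fail fast if the mapped kwargs omit a required worker parameter."""
        sig = inspect.signature(PostProcessorWorker.__init__)
        for name, param in sig.parameters.items():
            if name == 'self':
                continue
            if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
                continue  # *args / **kwargs never need an explicit entry
            if param.default is not inspect.Parameter.empty:
                continue  # optional parameters may be omitted
            if name not in worker_args:
                raise TypeError(f"_map_config_to_worker_args is missing required arg: {name!r}")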
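(4) The new _handle_future_result runs on pool worker threads, so processed_posts,
total_downloads, total_skips, and active_futures are all mutated concurrently. The
removed version's `with threading.Lock():` did not help either: it created a fresh
lock on every call, so no two callbacks ever contended for the same object. A
minimal sketch of the fix, assuming a lock created once in __init__ (the attribute
name _counters_lock is illustrative):

    import threading

    class CounterOwner:
        """Stand-in for DownloadManager's shared-counter handling."""

        def __init__(self):
            self._counters_lock = threading.Lock()  # one lock, created once and shared
            self.processed_posts = 0
            self.active_futures = []

        def _handle_future_result(self, future):
            # Every callback locks the SAME object, unlike `with threading.Lock():`,
            # which builds (and instantly discards) a brand-new lock per call.
            with self._counters_lock:
                self.processed_posts += 1
                try:
                    self.active_futures.remove(future)
                except ValueError:
                    pass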