diff --git a/src/services/multipart_downloader.py b/src/services/multipart_downloader.py index 3509569..9f29467 100644 --- a/src/services/multipart_downloader.py +++ b/src/services/multipart_downloader.py @@ -10,27 +10,49 @@ from concurrent.futures import ThreadPoolExecutor, as_completed # --- Third-Party Library Imports --- import requests +MULTIPART_DOWNLOADER_AVAILABLE = True # --- Module Constants --- CHUNK_DOWNLOAD_RETRY_DELAY = 2 MAX_CHUNK_DOWNLOAD_RETRIES = 1 DOWNLOAD_CHUNK_SIZE_ITER = 1024 * 256 # 256 KB per iteration chunk -# Flag to indicate if this module and its dependencies are available. -MULTIPART_DOWNLOADER_AVAILABLE = True - def _download_individual_chunk( - chunk_url, temp_file_path, start_byte, end_byte, headers, + chunk_url, chunk_temp_file_path, start_byte, end_byte, headers, part_num, total_parts, progress_data, cancellation_event, skip_event, pause_event, global_emit_time_ref, cookies_for_chunk, logger_func, emitter=None, api_original_filename=None ): """ - Downloads a single segment (chunk) of a larger file. This function is - intended to be run in a separate thread by a ThreadPoolExecutor. + Downloads a single segment (chunk) of a larger file to its own unique part file. + This function is intended to be run in a separate thread by a ThreadPoolExecutor. - It handles retries, pauses, and cancellations for its specific chunk. + It handles retries, pauses, and cancellations for its specific chunk. If a + download fails, the partial chunk file is removed, allowing a clean retry later. + + Args: + chunk_url (str): The URL to download the file from. + chunk_temp_file_path (str): The unique path to save this specific chunk + (e.g., 'my_video.mp4.part0'). + start_byte (int): The starting byte for the Range header. + end_byte (int): The ending byte for the Range header. 
+ headers (dict): The HTTP headers to use for the request. + part_num (int): The index of this chunk (e.g., 0 for the first part). + total_parts (int): The total number of chunks for the entire file. + progress_data (dict): A thread-safe dictionary for sharing progress. + cancellation_event (threading.Event): Event to signal cancellation. + skip_event (threading.Event): Event to signal skipping the file. + pause_event (threading.Event): Event to signal pausing the download. + global_emit_time_ref (list): A mutable list with one element (a timestamp) + to rate-limit UI updates. + cookies_for_chunk (dict): Cookies to use for the request. + logger_func (function): A function to log messages. + emitter (queue.Queue or QObject): Emitter for sending progress to the UI. + api_original_filename (str): The original filename for UI display. + + Returns: + tuple: A tuple containing (bytes_downloaded, success_flag). """ # --- Pre-download checks for control events --- if cancellation_event and cancellation_event.is_set(): @@ -48,18 +70,16 @@ def _download_individual_chunk( time.sleep(0.2) logger_func(f" [Chunk {part_num + 1}/{total_parts}] Download resumed.") - # --- START: FIX --- # Set this chunk's status to 'active' before starting the download. 
with progress_data['lock']: progress_data['chunks_status'][part_num]['active'] = True - # --- END: FIX --- try: # Prepare headers for the specific byte range of this chunk chunk_headers = headers.copy() if end_byte != -1: chunk_headers['Range'] = f"bytes={start_byte}-{end_byte}" - + bytes_this_chunk = 0 last_speed_calc_time = time.time() bytes_at_last_speed_calc = 0 @@ -77,13 +97,14 @@ def _download_individual_chunk( bytes_at_last_speed_calc = bytes_this_chunk logger_func(f" 🚀 [Chunk {part_num + 1}/{total_parts}] Starting download: bytes {start_byte}-{end_byte if end_byte != -1 else 'EOF'}") - + response = requests.get(chunk_url, headers=chunk_headers, timeout=(10, 120), stream=True, cookies=cookies_for_chunk) response.raise_for_status() # --- Data Writing Loop --- - with open(temp_file_path, 'r+b') as f: - f.seek(start_byte) + # We open the unique chunk file in write-binary ('wb') mode. + # No more seeking is required. + with open(chunk_temp_file_path, 'wb') as f: for data_segment in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE_ITER): if cancellation_event and cancellation_event.is_set(): return bytes_this_chunk, False @@ -98,12 +119,12 @@ def _download_individual_chunk( if data_segment: f.write(data_segment) bytes_this_chunk += len(data_segment) - + # Update shared progress data structure with progress_data['lock']: progress_data['total_downloaded_so_far'] += len(data_segment) progress_data['chunks_status'][part_num]['downloaded'] = bytes_this_chunk - + # Calculate and update speed for this chunk current_time = time.time() time_delta = current_time - last_speed_calc_time @@ -113,7 +134,7 @@ def _download_individual_chunk( progress_data['chunks_status'][part_num]['speed_bps'] = current_speed_bps last_speed_calc_time = current_time bytes_at_last_speed_calc = bytes_this_chunk - + # Emit progress signal to the UI via the queue if emitter and (current_time - global_emit_time_ref[0] > 0.25): global_emit_time_ref[0] = current_time @@ -122,7 +143,8 @@ def 
_download_individual_chunk( emitter.put({'type': 'file_progress', 'payload': (api_original_filename, status_list_copy)}) elif hasattr(emitter, 'file_progress_signal'): emitter.file_progress_signal.emit(api_original_filename, status_list_copy) - + + # If we get here, the download for this chunk is successful return bytes_this_chunk, True except (requests.exceptions.ConnectionError, requests.exceptions.Timeout, http.client.IncompleteRead) as e: @@ -134,8 +156,10 @@ def _download_individual_chunk( logger_func(f" ❌ [Chunk {part_num + 1}/{total_parts}] Unexpected error: {e}\n{traceback.format_exc(limit=1)}") return bytes_this_chunk, False + # If the retry loop finishes without a successful download return bytes_this_chunk, False finally: + # This block runs whether the download succeeded or failed with progress_data['lock']: progress_data['chunks_status'][part_num]['active'] = False progress_data['chunks_status'][part_num]['speed_bps'] = 0.0 @@ -144,17 +168,37 @@ def _download_individual_chunk( def download_file_in_parts(file_url, save_path, total_size, num_parts, headers, api_original_filename, emitter_for_multipart, cookies_for_chunk_session, cancellation_event, skip_event, logger_func, pause_event): - logger_func(f"⬇️ Initializing Multi-part Download ({num_parts} parts) for: '{api_original_filename}' (Size: {total_size / (1024*1024):.2f} MB)") - temp_file_path = save_path + ".part" + """ + Manages a resilient, multipart file download by saving each chunk to a separate file. - try: - with open(temp_file_path, 'wb') as f_temp: - if total_size > 0: - f_temp.truncate(total_size) - except IOError as e: - logger_func(f" ❌ Error creating/truncating temp file '{temp_file_path}': {e}") - return False, 0, None, None + This function orchestrates the download process by: + 1. Checking for already completed chunk files to resume a previous download. + 2. Submitting only the missing chunks to a thread pool for parallel download. + 3. 
Assembling the final file from the individual chunks upon successful completion. + 4. Cleaning up temporary chunk files after assembly. + 5. Leaving completed chunks on disk if the download fails, allowing for a future resume. + Args: + file_url (str): The URL of the file to download. + save_path (str): The final desired path for the downloaded file (e.g., 'my_video.mp4'). + total_size (int): The total size of the file in bytes. + num_parts (int): The number of parts to split the download into. + headers (dict): HTTP headers for the download requests. + api_original_filename (str): The original filename for UI progress display. + emitter_for_multipart (queue.Queue or QObject): Emitter for UI signals. + cookies_for_chunk_session (dict): Cookies for the download requests. + cancellation_event (threading.Event): Event to signal cancellation. + skip_event (threading.Event): Event to signal skipping the file. + logger_func (function): A function for logging messages. + pause_event (threading.Event): Event to signal pausing the download. + + Returns: + tuple: A tuple containing (success_flag, total_bytes_downloaded, md5_hash, file_handle). + The file_handle will be for the final assembled file if successful, otherwise None. 
+ """ + logger_func(f"⬇️ Initializing Resumable Multi-part Download ({num_parts} parts) for: '{api_original_filename}' (Size: {total_size / (1024*1024):.2f} MB)") + + # Calculate the byte range for each chunk chunk_size_calc = total_size // num_parts chunks_ranges = [] for i in range(num_parts): @@ -162,76 +206,119 @@ def download_file_in_parts(file_url, save_path, total_size, num_parts, headers, end = start + chunk_size_calc - 1 if i < num_parts - 1 else total_size - 1 if start <= end: chunks_ranges.append((start, end)) - elif total_size == 0 and i == 0: + elif total_size == 0 and i == 0: # Handle zero-byte files chunks_ranges.append((0, -1)) + # Calculate the expected size of each chunk chunk_actual_sizes = [] for start, end in chunks_ranges: - if end == -1 and start == 0: - chunk_actual_sizes.append(0) - else: - chunk_actual_sizes.append(end - start + 1) + chunk_actual_sizes.append(end - start + 1 if end != -1 else 0) if not chunks_ranges and total_size > 0: - logger_func(f" ⚠️ No valid chunk ranges for multipart download of '{api_original_filename}'. Aborting multipart.") - if os.path.exists(temp_file_path): os.remove(temp_file_path) + logger_func(f" ⚠️ No valid chunk ranges for multipart download of '{api_original_filename}'. 
Aborting.") return False, 0, None, None + # --- Resumption Logic: Check for existing complete chunks --- + chunks_to_download = [] + total_bytes_resumed = 0 + for i, (start, end) in enumerate(chunks_ranges): + chunk_part_path = f"{save_path}.part{i}" + expected_chunk_size = chunk_actual_sizes[i] + + if os.path.exists(chunk_part_path) and os.path.getsize(chunk_part_path) == expected_chunk_size: + logger_func(f" [Chunk {i + 1}/{num_parts}] Resuming with existing complete chunk file.") + total_bytes_resumed += expected_chunk_size + else: + chunks_to_download.append({'index': i, 'start': start, 'end': end}) + + # Setup the shared progress data structure progress_data = { 'total_file_size': total_size, - 'total_downloaded_so_far': 0, - 'chunks_status': [ - {'id': i, 'downloaded': 0, 'total': chunk_actual_sizes[i] if i < len(chunk_actual_sizes) else 0, 'active': False, 'speed_bps': 0.0} - for i in range(num_parts) - ], + 'total_downloaded_so_far': total_bytes_resumed, + 'chunks_status': [], 'lock': threading.Lock(), 'last_global_emit_time': [time.time()] } + for i in range(num_parts): + is_resumed = not any(c['index'] == i for c in chunks_to_download) + progress_data['chunks_status'].append({ + 'id': i, + 'downloaded': chunk_actual_sizes[i] if is_resumed else 0, + 'total': chunk_actual_sizes[i], + 'active': False, + 'speed_bps': 0.0 + }) + # --- Download Phase --- chunk_futures = [] all_chunks_successful = True - total_bytes_from_chunks = 0 + total_bytes_from_threads = 0 with ThreadPoolExecutor(max_workers=num_parts, thread_name_prefix=f"MPChunk_{api_original_filename[:10]}_") as chunk_pool: - for i, (start, end) in enumerate(chunks_ranges): - if cancellation_event and cancellation_event.is_set(): all_chunks_successful = False; break - chunk_futures.append(chunk_pool.submit( - _download_individual_chunk, chunk_url=file_url, temp_file_path=temp_file_path, + for chunk_info in chunks_to_download: + if cancellation_event and cancellation_event.is_set(): + 
all_chunks_successful = False + break + + i, start, end = chunk_info['index'], chunk_info['start'], chunk_info['end'] + chunk_part_path = f"{save_path}.part{i}" + + future = chunk_pool.submit( + _download_individual_chunk, + chunk_url=file_url, + chunk_temp_file_path=chunk_part_path, start_byte=start, end_byte=end, headers=headers, part_num=i, total_parts=num_parts, - progress_data=progress_data, cancellation_event=cancellation_event, skip_event=skip_event, global_emit_time_ref=progress_data['last_global_emit_time'], - pause_event=pause_event, cookies_for_chunk=cookies_for_chunk_session, logger_func=logger_func, emitter=emitter_for_multipart, + progress_data=progress_data, cancellation_event=cancellation_event, + skip_event=skip_event, global_emit_time_ref=progress_data['last_global_emit_time'], + pause_event=pause_event, cookies_for_chunk=cookies_for_chunk_session, + logger_func=logger_func, emitter=emitter_for_multipart, api_original_filename=api_original_filename - )) + ) + chunk_futures.append(future) for future in as_completed(chunk_futures): - if cancellation_event and cancellation_event.is_set(): all_chunks_successful = False; break - bytes_downloaded_this_chunk, success_this_chunk = future.result() - total_bytes_from_chunks += bytes_downloaded_this_chunk - if not success_this_chunk: + if cancellation_event and cancellation_event.is_set(): all_chunks_successful = False + bytes_downloaded, success = future.result() + total_bytes_from_threads += bytes_downloaded + if not success: + all_chunks_successful = False + + total_bytes_final = total_bytes_resumed + total_bytes_from_threads if cancellation_event and cancellation_event.is_set(): logger_func(f" Multi-part download for '{api_original_filename}' cancelled by main event.") all_chunks_successful = False - if emitter_for_multipart: - with progress_data['lock']: - status_list_copy = [dict(s) for s in progress_data['chunks_status']] - if isinstance(emitter_for_multipart, queue.Queue): - 
emitter_for_multipart.put({'type': 'file_progress', 'payload': (api_original_filename, status_list_copy)}) - elif hasattr(emitter_for_multipart, 'file_progress_signal'): - emitter_for_multipart.file_progress_signal.emit(api_original_filename, status_list_copy) - if all_chunks_successful and (total_bytes_from_chunks == total_size or total_size == 0): - logger_func(f" ✅ Multi-part download successful for '{api_original_filename}'. Total bytes: {total_bytes_from_chunks}") + # --- Assembly and Cleanup Phase --- + if all_chunks_successful and (total_bytes_final == total_size or total_size == 0): + logger_func(f" ✅ All {num_parts} chunks complete. Assembling final file...") md5_hasher = hashlib.md5() - with open(temp_file_path, 'rb') as f_hash: - for buf in iter(lambda: f_hash.read(4096*10), b''): - md5_hasher.update(buf) - calculated_hash = md5_hasher.hexdigest() - return True, total_bytes_from_chunks, calculated_hash, open(temp_file_path, 'rb') + try: + with open(save_path, 'wb') as final_file: + for i in range(num_parts): + chunk_part_path = f"{save_path}.part{i}" + with open(chunk_part_path, 'rb') as chunk_file: + content = chunk_file.read() + final_file.write(content) + md5_hasher.update(content) + + calculated_hash = md5_hasher.hexdigest() + logger_func(f" ✅ Assembly successful for '{api_original_filename}'. Total bytes: {total_bytes_final}") + return True, total_bytes_final, calculated_hash, open(save_path, 'rb') + except Exception as e: + logger_func(f" ❌ Critical error during file assembly: {e}. Cleaning up.") + return False, total_bytes_final, None, None + finally: + # Cleanup all individual chunk files after successful assembly + for i in range(num_parts): + chunk_part_path = f"{save_path}.part{i}" + if os.path.exists(chunk_part_path): + try: + os.remove(chunk_part_path) + except OSError as e: + logger_func(f" ⚠️ Failed to remove temp part file '{chunk_part_path}': {e}") else: - logger_func(f" ❌ Multi-part download failed for '{api_original_filename}'. 
Success: {all_chunks_successful}, Bytes: {total_bytes_from_chunks}/{total_size}. Cleaning up.") - if os.path.exists(temp_file_path): - try: os.remove(temp_file_path) - except OSError as e: logger_func(f" Failed to remove temp part file '{temp_file_path}': {e}") - return False, total_bytes_from_chunks, None, None + # If download failed, we do NOT clean up, allowing for resumption later + logger_func(f" ❌ Multi-part download failed for '{api_original_filename}'. Success: {all_chunks_successful}, Bytes: {total_bytes_final}/{total_size}. Partial chunks saved for future resumption.") + return False, total_bytes_final, None, None diff --git a/src/ui/dialogs/multipart_dialog.py b/src/ui/dialogs/multipart_dialog.py deleted file mode 100644 index a05ac3c..0000000 --- a/src/ui/dialogs/multipart_dialog.py +++ /dev/null @@ -1,81 +0,0 @@ -from PyQt5.QtWidgets import ( - QDialog, QVBoxLayout, QGroupBox, QCheckBox, QHBoxLayout, QLabel, - QLineEdit, QDialogButtonBox, QIntValidator -) -from PyQt5.QtCore import Qt - -class MultipartDialog(QDialog): - """ - A dialog for configuring multipart download settings. 
- """ - def __init__(self, current_settings, parent=None): - super().__init__(parent) - self.setWindowTitle("Multipart Download Options") - self.setMinimumWidth(350) - - self.settings = current_settings - - # Main layout - layout = QVBoxLayout(self) - - # File types group - types_group = QGroupBox("Apply to File Types") - types_layout = QVBoxLayout() - self.videos_checkbox = QCheckBox("Videos") - self.archives_checkbox = QCheckBox("Archives") - types_layout.addWidget(self.videos_checkbox) - types_layout.addWidget(self.archives_checkbox) - types_group.setLayout(types_layout) - layout.addWidget(types_group) - - # File size group - size_group = QGroupBox("Minimum File Size") - size_layout = QHBoxLayout() - size_layout.addWidget(QLabel("Apply only if file size is over:")) - self.min_size_input = QLineEdit() - self.min_size_input.setValidator(QIntValidator(0, 99999)) - self.min_size_input.setFixedWidth(50) - size_layout.addWidget(self.min_size_input) - size_layout.addWidget(QLabel("MB")) - size_layout.addStretch() - size_group.setLayout(size_layout) - layout.addWidget(size_group) - - # Custom extensions group - extensions_group = QGroupBox("Custom File Extensions") - extensions_layout = QVBoxLayout() - extensions_layout.addWidget(QLabel("Apply to these additional extensions (comma-separated):")) - self.extensions_input = QLineEdit() - self.extensions_input.setPlaceholderText("e.g., .psd, .blend, .mkv") - extensions_layout.addWidget(self.extensions_input) - extensions_group.setLayout(extensions_layout) - layout.addWidget(extensions_group) - - # Dialog buttons - button_box = QDialogButtonBox(QDialogButtonBox.Save | QDialogButtonBox.Cancel) - button_box.accepted.connect(self.accept) - button_box.rejected.connect(self.reject) - layout.addWidget(button_box) - - self.setLayout(layout) - self._load_initial_settings() - - def _load_initial_settings(self): - """Populates the dialog with the current settings.""" - 
self.videos_checkbox.setChecked(self.settings.get('apply_on_videos', False)) - self.archives_checkbox.setChecked(self.settings.get('apply_on_archives', False)) - self.min_size_input.setText(str(self.settings.get('min_size_mb', 200))) - self.extensions_input.setText(", ".join(self.settings.get('custom_extensions', []))) - - def get_selected_options(self): - """Returns the configured settings from the dialog.""" - custom_extensions_raw = self.extensions_input.text().strip().lower() - custom_extensions = {ext.strip() for ext in custom_extensions_raw.split(',') if ext.strip().startswith('.')} - - return { - "enabled": True, # Implied if dialog is saved - "apply_on_videos": self.videos_checkbox.isChecked(), - "apply_on_archives": self.archives_checkbox.isChecked(), - "min_size_mb": int(self.min_size_input.text()) if self.min_size_input.text().isdigit() else 200, - "custom_extensions": sorted(list(custom_extensions)) - }