mirror of
https://github.com/Yuvi9587/Kemono-Downloader.git
synced 2025-12-29 16:14:44 +00:00
Commit
This commit is contained in:
@@ -13,14 +13,14 @@ DOWNLOAD_CHUNK_SIZE_ITER = 1024 * 256 # 256KB for iter_content within a chunk d
|
||||
|
||||
|
||||
def _download_individual_chunk(chunk_url, temp_file_path, start_byte, end_byte, headers,
|
||||
part_num, total_parts, progress_data, cancellation_event, skip_event, logger,
|
||||
signals=None, api_original_filename=None): # Added signals and api_original_filename
|
||||
part_num, total_parts, progress_data, cancellation_event, skip_event,
|
||||
logger_func, emitter=None, api_original_filename=None): # Renamed logger, signals to emitter
|
||||
"""Downloads a single chunk of a file and writes it to the temp file."""
|
||||
if cancellation_event and cancellation_event.is_set():
|
||||
logger(f" [Chunk {part_num + 1}/{total_parts}] Download cancelled before start.")
|
||||
logger_func(f" [Chunk {part_num + 1}/{total_parts}] Download cancelled before start.")
|
||||
return 0, False # bytes_downloaded, success
|
||||
if skip_event and skip_event.is_set():
|
||||
logger(f" [Chunk {part_num + 1}/{total_parts}] Skip event triggered before start.")
|
||||
logger_func(f" [Chunk {part_num + 1}/{total_parts}] Skip event triggered before start.")
|
||||
return 0, False
|
||||
|
||||
chunk_headers = headers.copy()
|
||||
@@ -44,15 +44,15 @@ def _download_individual_chunk(chunk_url, temp_file_path, start_byte, end_byte,
|
||||
|
||||
for attempt in range(MAX_CHUNK_DOWNLOAD_RETRIES + 1):
|
||||
if cancellation_event and cancellation_event.is_set():
|
||||
logger(f" [Chunk {part_num + 1}/{total_parts}] Cancelled during retry loop.")
|
||||
logger_func(f" [Chunk {part_num + 1}/{total_parts}] Cancelled during retry loop.")
|
||||
return bytes_this_chunk, False
|
||||
if skip_event and skip_event.is_set():
|
||||
logger(f" [Chunk {part_num + 1}/{total_parts}] Skip event during retry loop.")
|
||||
logger_func(f" [Chunk {part_num + 1}/{total_parts}] Skip event during retry loop.")
|
||||
return bytes_this_chunk, False
|
||||
|
||||
try:
|
||||
if attempt > 0:
|
||||
logger(f" [Chunk {part_num + 1}/{total_parts}] Retrying download (Attempt {attempt}/{MAX_CHUNK_DOWNLOAD_RETRIES})...")
|
||||
logger_func(f" [Chunk {part_num + 1}/{total_parts}] Retrying download (Attempt {attempt}/{MAX_CHUNK_DOWNLOAD_RETRIES})...")
|
||||
time.sleep(CHUNK_DOWNLOAD_RETRY_DELAY * (2 ** (attempt - 1)))
|
||||
# Reset speed calculation on retry
|
||||
last_speed_calc_time = time.time()
|
||||
@@ -60,14 +60,14 @@ def _download_individual_chunk(chunk_url, temp_file_path, start_byte, end_byte,
|
||||
|
||||
# Enhanced log message for chunk start
|
||||
log_msg = f" 🚀 [Chunk {part_num + 1}/{total_parts}] Starting download: bytes {start_byte}-{end_byte if end_byte != -1 else 'EOF'}"
|
||||
logger(log_msg)
|
||||
logger_func(log_msg)
|
||||
print(f"DEBUG_MULTIPART: {log_msg}") # Direct console print for debugging
|
||||
response = requests.get(chunk_url, headers=chunk_headers, timeout=(10, 120), stream=True)
|
||||
response.raise_for_status()
|
||||
|
||||
# For 0-byte files, if end_byte was -1, we expect 0 content.
|
||||
if start_byte == 0 and end_byte == -1 and int(response.headers.get('Content-Length', 0)) == 0:
|
||||
logger(f" [Chunk {part_num + 1}/{total_parts}] Confirmed 0-byte file.")
|
||||
logger_func(f" [Chunk {part_num + 1}/{total_parts}] Confirmed 0-byte file.")
|
||||
with progress_data['lock']:
|
||||
progress_data['chunks_status'][part_num]['active'] = False
|
||||
progress_data['chunks_status'][part_num]['speed_bps'] = 0
|
||||
@@ -77,10 +77,10 @@ def _download_individual_chunk(chunk_url, temp_file_path, start_byte, end_byte,
|
||||
f.seek(start_byte)
|
||||
for data_segment in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE_ITER):
|
||||
if cancellation_event and cancellation_event.is_set():
|
||||
logger(f" [Chunk {part_num + 1}/{total_parts}] Cancelled during data iteration.")
|
||||
logger_func(f" [Chunk {part_num + 1}/{total_parts}] Cancelled during data iteration.")
|
||||
return bytes_this_chunk, False
|
||||
if skip_event and skip_event.is_set():
|
||||
logger(f" [Chunk {part_num + 1}/{total_parts}] Skip event during data iteration.")
|
||||
logger_func(f" [Chunk {part_num + 1}/{total_parts}] Skip event during data iteration.")
|
||||
return bytes_this_chunk, False
|
||||
if data_segment:
|
||||
f.write(data_segment)
|
||||
@@ -103,26 +103,29 @@ def _download_individual_chunk(chunk_url, temp_file_path, start_byte, end_byte,
|
||||
|
||||
# Emit progress more frequently from within the chunk download
|
||||
if current_time - last_progress_emit_time_for_chunk > 0.1: # Emit up to 10 times/sec per chunk
|
||||
if signals and hasattr(signals, 'file_progress_signal'):
|
||||
if emitter:
|
||||
# Ensure we read the latest total downloaded from progress_data
|
||||
# Send a copy of the chunks_status list
|
||||
status_list_copy = [dict(s) for s in progress_data['chunks_status']] # Make a deep enough copy
|
||||
signals.file_progress_signal.emit(api_original_filename, status_list_copy)
|
||||
if isinstance(emitter, queue.Queue):
|
||||
emitter.put({'type': 'file_progress', 'payload': (api_original_filename, status_list_copy)})
|
||||
elif hasattr(emitter, 'file_progress_signal'): # PostProcessorSignals-like
|
||||
emitter.file_progress_signal.emit(api_original_filename, status_list_copy)
|
||||
last_progress_emit_time_for_chunk = current_time
|
||||
return bytes_this_chunk, True
|
||||
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout, http.client.IncompleteRead) as e:
|
||||
logger(f" ❌ [Chunk {part_num + 1}/{total_parts}] Retryable error: {e}")
|
||||
logger_func(f" ❌ [Chunk {part_num + 1}/{total_parts}] Retryable error: {e}")
|
||||
if attempt == MAX_CHUNK_DOWNLOAD_RETRIES:
|
||||
logger(f" ❌ [Chunk {part_num + 1}/{total_parts}] Failed after {MAX_CHUNK_DOWNLOAD_RETRIES} retries.")
|
||||
logger_func(f" ❌ [Chunk {part_num + 1}/{total_parts}] Failed after {MAX_CHUNK_DOWNLOAD_RETRIES} retries.")
|
||||
return bytes_this_chunk, False
|
||||
except requests.exceptions.RequestException as e: # Includes 4xx/5xx errors after raise_for_status
|
||||
logger(f" ❌ [Chunk {part_num + 1}/{total_parts}] Non-retryable error: {e}")
|
||||
logger_func(f" ❌ [Chunk {part_num + 1}/{total_parts}] Non-retryable error: {e}")
|
||||
return bytes_this_chunk, False
|
||||
except Exception as e:
|
||||
logger(f" ❌ [Chunk {part_num + 1}/{total_parts}] Unexpected error: {e}\n{traceback.format_exc(limit=1)}")
|
||||
logger_func(f" ❌ [Chunk {part_num + 1}/{total_parts}] Unexpected error: {e}\n{traceback.format_exc(limit=1)}")
|
||||
return bytes_this_chunk, False
|
||||
|
||||
|
||||
# Ensure final status is marked as inactive if loop finishes due to retries
|
||||
with progress_data['lock']:
|
||||
progress_data['chunks_status'][part_num]['active'] = False
|
||||
@@ -130,15 +133,15 @@ def _download_individual_chunk(chunk_url, temp_file_path, start_byte, end_byte,
|
||||
return bytes_this_chunk, False # Should be unreachable
|
||||
|
||||
|
||||
def download_file_in_parts(file_url, save_path, total_size, num_parts, headers,
|
||||
api_original_filename, signals, cancellation_event, skip_event, logger):
|
||||
def download_file_in_parts(file_url, save_path, total_size, num_parts, headers, api_original_filename,
|
||||
emitter_for_multipart, cancellation_event, skip_event, logger_func): # Renamed signals, logger
|
||||
"""
|
||||
Downloads a file in multiple parts concurrently.
|
||||
Returns: (download_successful_flag, downloaded_bytes, calculated_file_hash, temp_file_handle_or_None)
|
||||
The temp_file_handle will be an open read-binary file handle to the .part file if successful, otherwise None.
|
||||
It is the responsibility of the caller to close this handle and rename/delete the .part file.
|
||||
"""
|
||||
logger(f"⬇️ Initializing Multi-part Download ({num_parts} parts) for: '{api_original_filename}' (Size: {total_size / (1024*1024):.2f} MB)")
|
||||
logger_func(f"⬇️ Initializing Multi-part Download ({num_parts} parts) for: '{api_original_filename}' (Size: {total_size / (1024*1024):.2f} MB)")
|
||||
temp_file_path = save_path + ".part"
|
||||
|
||||
try:
|
||||
@@ -146,7 +149,7 @@ def download_file_in_parts(file_url, save_path, total_size, num_parts, headers,
|
||||
if total_size > 0:
|
||||
f_temp.truncate(total_size) # Pre-allocate space
|
||||
except IOError as e:
|
||||
logger(f" ❌ Error creating/truncating temp file '{temp_file_path}': {e}")
|
||||
logger_func(f" ❌ Error creating/truncating temp file '{temp_file_path}': {e}")
|
||||
return False, 0, None, None
|
||||
|
||||
chunk_size_calc = total_size // num_parts
|
||||
@@ -167,7 +170,7 @@ def download_file_in_parts(file_url, save_path, total_size, num_parts, headers,
|
||||
chunk_actual_sizes.append(end - start + 1)
|
||||
|
||||
if not chunks_ranges and total_size > 0:
|
||||
logger(f" ⚠️ No valid chunk ranges for multipart download of '{api_original_filename}'. Aborting multipart.")
|
||||
logger_func(f" ⚠️ No valid chunk ranges for multipart download of '{api_original_filename}'. Aborting multipart.")
|
||||
if os.path.exists(temp_file_path): os.remove(temp_file_path)
|
||||
return False, 0, None, None
|
||||
|
||||
@@ -191,8 +194,9 @@ def download_file_in_parts(file_url, save_path, total_size, num_parts, headers,
|
||||
chunk_futures.append(chunk_pool.submit(
|
||||
_download_individual_chunk, chunk_url=file_url, temp_file_path=temp_file_path,
|
||||
start_byte=start, end_byte=end, headers=headers, part_num=i, total_parts=num_parts,
|
||||
progress_data=progress_data, cancellation_event=cancellation_event, skip_event=skip_event, logger=logger,
|
||||
signals=signals, api_original_filename=api_original_filename # Pass them here
|
||||
progress_data=progress_data, cancellation_event=cancellation_event, skip_event=skip_event,
|
||||
logger_func=logger_func, emitter=emitter_for_multipart, # Pass emitter
|
||||
api_original_filename=api_original_filename
|
||||
))
|
||||
|
||||
for future in as_completed(chunk_futures):
|
||||
@@ -201,21 +205,23 @@ def download_file_in_parts(file_url, save_path, total_size, num_parts, headers,
|
||||
total_bytes_from_chunks += bytes_downloaded_this_chunk
|
||||
if not success_this_chunk:
|
||||
all_chunks_successful = False
|
||||
# Progress is emitted from within _download_individual_chunk
|
||||
|
||||
if cancellation_event and cancellation_event.is_set():
|
||||
logger(f" Multi-part download for '{api_original_filename}' cancelled by main event.")
|
||||
logger_func(f" Multi-part download for '{api_original_filename}' cancelled by main event.")
|
||||
all_chunks_successful = False
|
||||
|
||||
|
||||
# Ensure a final progress update is sent with all chunks marked inactive (unless still active due to error)
|
||||
if signals and hasattr(signals, 'file_progress_signal'):
|
||||
if emitter_for_multipart:
|
||||
with progress_data['lock']:
|
||||
# Ensure all chunks are marked inactive for the final signal if download didn't fully succeed or was cancelled
|
||||
status_list_copy = [dict(s) for s in progress_data['chunks_status']]
|
||||
signals.file_progress_signal.emit(api_original_filename, status_list_copy)
|
||||
status_list_copy = [dict(s) for s in progress_data['chunks_status']]
|
||||
if isinstance(emitter_for_multipart, queue.Queue):
|
||||
emitter_for_multipart.put({'type': 'file_progress', 'payload': (api_original_filename, status_list_copy)})
|
||||
elif hasattr(emitter_for_multipart, 'file_progress_signal'): # PostProcessorSignals-like
|
||||
emitter_for_multipart.file_progress_signal.emit(api_original_filename, status_list_copy)
|
||||
|
||||
if all_chunks_successful and (total_bytes_from_chunks == total_size or total_size == 0):
|
||||
logger(f" ✅ Multi-part download successful for '{api_original_filename}'. Total bytes: {total_bytes_from_chunks}")
|
||||
logger_func(f" ✅ Multi-part download successful for '{api_original_filename}'. Total bytes: {total_bytes_from_chunks}")
|
||||
md5_hasher = hashlib.md5()
|
||||
with open(temp_file_path, 'rb') as f_hash:
|
||||
for buf in iter(lambda: f_hash.read(4096*10), b''): # Read in larger buffers for hashing
|
||||
@@ -225,8 +231,8 @@ def download_file_in_parts(file_url, save_path, total_size, num_parts, headers,
|
||||
# The caller is responsible for closing this handle and renaming/deleting the .part file.
|
||||
return True, total_bytes_from_chunks, calculated_hash, open(temp_file_path, 'rb')
|
||||
else:
|
||||
logger(f" ❌ Multi-part download failed for '{api_original_filename}'. Success: {all_chunks_successful}, Bytes: {total_bytes_from_chunks}/{total_size}. Cleaning up.")
|
||||
logger_func(f" ❌ Multi-part download failed for '{api_original_filename}'. Success: {all_chunks_successful}, Bytes: {total_bytes_from_chunks}/{total_size}. Cleaning up.")
|
||||
if os.path.exists(temp_file_path):
|
||||
try: os.remove(temp_file_path)
|
||||
except OSError as e: logger(f" Failed to remove temp part file '{temp_file_path}': {e}")
|
||||
except OSError as e: logger_func(f" Failed to remove temp part file '{temp_file_path}': {e}")
|
||||
return False, total_bytes_from_chunks, None, None
|
||||
Reference in New Issue
Block a user