mirror of
https://github.com/Yuvi9587/Kemono-Downloader.git
synced 2025-12-29 16:14:44 +00:00
Commit
This commit is contained in:
1
src/core/__init__.py
Normal file
1
src/core/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# ...existing code...
|
||||
321
src/core/api_client.py
Normal file
321
src/core/api_client.py
Normal file
@@ -0,0 +1,321 @@
|
||||
# --- Standard Library Imports ---
|
||||
import time
|
||||
import traceback
|
||||
from urllib.parse import urlparse
|
||||
|
||||
# --- Third-Party Library Imports ---
|
||||
import requests
|
||||
|
||||
# --- Local Application Imports ---
|
||||
from ..utils.network_utils import extract_post_info, prepare_cookies_for_request
|
||||
from ..config.constants import (
|
||||
STYLE_DATE_POST_TITLE
|
||||
)
|
||||
|
||||
|
||||
def fetch_posts_paginated(api_url_base, headers, offset, logger, cancellation_event=None, pause_event=None, cookies_dict=None):
    """
    Fetch a single page of posts from the API, retrying transient network errors.

    Args:
        api_url_base (str): The base URL for the user's posts.
        headers (dict): The request headers.
        offset (int): The offset for pagination (multiples of the 50-post page size).
        logger (callable): Function to log messages.
        cancellation_event (threading.Event): Event to signal cancellation.
        pause_event (threading.Event): Event to signal pause.
        cookies_dict (dict): A dictionary of cookies to include in the request.

    Returns:
        list: A list of post data dictionaries from the API, or an empty list
        when the server answers with a non-JSON content type.

    Raises:
        RuntimeError: If the fetch fails after all retries, is cancelled, or
        encounters a non-retryable error.
    """
    if cancellation_event and cancellation_event.is_set():
        logger(" Fetch cancelled before request.")
        raise RuntimeError("Fetch operation cancelled by user.")
    if pause_event and pause_event.is_set():
        logger(" Post fetching paused...")
        # Busy-wait (with sleep) until unpaused, still honoring cancellation.
        while pause_event.is_set():
            if cancellation_event and cancellation_event.is_set():
                logger(" Post fetching cancelled while paused.")
                raise RuntimeError("Fetch operation cancelled by user.")
            time.sleep(0.5)
        logger(" Post fetching resumed.")

    paginated_url = f'{api_url_base}?o={offset}'
    max_retries = 3
    retry_delay = 5

    for attempt in range(max_retries):
        if cancellation_event and cancellation_event.is_set():
            raise RuntimeError("Fetch operation cancelled by user during retry loop.")

        log_message = f" Fetching: {paginated_url} (Page approx. {offset // 50 + 1})"
        if attempt > 0:
            log_message += f" (Attempt {attempt + 1}/{max_retries})"
        logger(log_message)

        try:
            response = requests.get(paginated_url, headers=headers, timeout=(15, 90), cookies=cookies_dict)
            response.raise_for_status()

            # Some error pages come back with HTTP 200 and an HTML body;
            # treat any non-JSON response as "no posts" rather than crashing.
            if 'application/json' not in response.headers.get('Content-Type', '').lower():
                logger(f"⚠️ Unexpected content type from API: {response.headers.get('Content-Type')}. Body: {response.text[:200]}")
                return []

            return response.json()

        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
            logger(f"   ⚠️ Retryable network error on page fetch (Attempt {attempt + 1}): {e}")
            if attempt < max_retries - 1:
                delay = retry_delay * (2 ** attempt)  # exponential backoff: 5s, 10s, ...
                logger(f"   Retrying in {delay} seconds...")
                time.sleep(delay)
                continue
            else:
                logger(f"   ❌ Failed to fetch page after {max_retries} attempts.")
                raise RuntimeError(f"Timeout or connection error fetching offset {offset}")
        except ValueError as e:  # JSON decode error
            # BUG FIX: this handler must come before the generic
            # RequestException handler. In requests >= 2.27,
            # requests.exceptions.JSONDecodeError subclasses BOTH ValueError
            # and RequestException, so with the original ordering JSON decode
            # failures were misreported as generic request errors.
            raise RuntimeError(f"Error decoding JSON from offset {offset}: {e}. Response: {response.text[:200]}")
        except requests.exceptions.RequestException as e:
            err_msg = f"Error fetching offset {offset}: {e}"
            if e.response is not None:
                err_msg += f" (Status: {e.response.status_code}, Body: {e.response.text[:200]})"
            raise RuntimeError(err_msg)

    # Defensive: the loop either returns or raises, but keep an explicit
    # terminal error in case the retry logic is ever changed.
    raise RuntimeError(f"Failed to fetch page {paginated_url} after all attempts.")
|
||||
|
||||
|
||||
def fetch_post_comments(api_domain, service, user_id, post_id, headers, logger, cancellation_event=None, pause_event=None, cookies_dict=None):
    """
    Fetch all comments for a specific post.

    Args:
        api_domain (str): API host (e.g. "kemono.su").
        service (str): Service name (e.g. "patreon").
        user_id (str): Creator/user identifier.
        post_id (str): Post identifier whose comments are requested.
        headers (dict): The request headers.
        logger (callable): Function to log messages.
        cancellation_event (threading.Event): Event to signal cancellation.
        pause_event (threading.Event): Present for signature parity with
            fetch_posts_paginated; not consulted here.
        cookies_dict (dict): Cookies to include in the request.

    Returns:
        list: Decoded JSON comment data.

    Raises:
        RuntimeError: On cancellation, request failure, or JSON decode failure.
    """
    if cancellation_event and cancellation_event.is_set():
        raise RuntimeError("Comment fetch operation cancelled by user.")

    comments_api_url = f"https://{api_domain}/api/v1/{service}/user/{user_id}/post/{post_id}/comments"
    logger(f"   Fetching comments: {comments_api_url}")

    try:
        response = requests.get(comments_api_url, headers=headers, timeout=(10, 30), cookies=cookies_dict)
        response.raise_for_status()
        return response.json()
    except ValueError as e:
        # BUG FIX: caught before RequestException. In requests >= 2.27,
        # JSONDecodeError subclasses both ValueError and RequestException, so
        # the original ordering reported decode failures with the wrong message.
        raise RuntimeError(f"Error decoding JSON from comments API for post {post_id}: {e}")
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"Error fetching comments for post {post_id}: {e}")
|
||||
|
||||
def download_from_api(
    api_url_input,
    logger=print,
    start_page=None,
    end_page=None,
    manga_mode=False,
    cancellation_event=None,
    pause_event=None,
    use_cookie=False,
    cookie_text="",
    selected_cookie_file=None,
    app_base_dir=None,
    manga_filename_style_for_sort_check=None
):
    """
    Generator yielding batches (lists) of post dicts for a creator feed or a
    single post URL.

    Behavior overview:
      * Single-post URL: try a direct post fetch first, falling back to
        scanning pages for the post id.
      * Manga mode (styles other than STYLE_DATE_POST_TITLE): fetch ALL pages,
        sort oldest-first by 'published' (fallback 'added') date, then yield
        in page-sized batches.
      * Otherwise: stream batches in the API's default (newest-first) order,
        honoring the optional start_page/end_page window.

    Args:
        api_url_input (str): Creator or post URL pasted by the user.
        logger (callable): Message sink; defaults to print.
        start_page (int|None): First page to fetch (creator feeds only).
        end_page (int|None): Last page to fetch (creator feeds only).
        manga_mode (bool): Enables oldest-first fetching/sorting behavior.
        cancellation_event (threading.Event|None): Cooperative cancel signal.
        pause_event (threading.Event|None): Cooperative pause signal.
        use_cookie (bool): Whether to attach cookies to API requests.
        cookie_text (str): Raw cookie text, if supplied by the user.
        selected_cookie_file (str|None): Path to a cookies file, if any.
        app_base_dir (str|None): Base dir used to resolve cookie files.
        manga_filename_style_for_sort_check: Style constant controlling
            whether manga mode sorts oldest-first.

    Yields:
        list[dict]: Batches of post data dictionaries.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0',
        'Accept': 'application/json'
    }

    service, user_id, target_post_id = extract_post_info(api_url_input)

    if cancellation_event and cancellation_event.is_set():
        logger(" Download_from_api cancelled at start.")
        return

    # BUG FIX: validate the parsed URL *before* any network work. Previously
    # this check sat after the direct-fetch attempt, so an unparsable URL
    # could reach URL construction with service/user_id of None.
    if not service or not user_id:
        logger(f"❌ Invalid URL or could not extract service/user: {api_url_input}")
        return

    parsed_input_url_for_domain = urlparse(api_url_input)
    api_domain = parsed_input_url_for_domain.netloc
    if not any(d in api_domain.lower() for d in ['kemono.su', 'kemono.party', 'coomer.su', 'coomer.party']):
        logger(f"⚠️ Unrecognized domain '{api_domain}' from input URL. Defaulting to kemono.su for API calls.")
        api_domain = "kemono.su"

    cookies_for_api = None
    if use_cookie and app_base_dir:
        cookies_for_api = prepare_cookies_for_request(use_cookie, cookie_text, selected_cookie_file, app_base_dir, logger, target_domain=api_domain)

    if target_post_id:
        # Fast path: ask the API for the one post directly before paginating.
        direct_post_api_url = f"https://{api_domain}/api/v1/{service}/user/{user_id}/post/{target_post_id}"
        logger(f" Attempting direct fetch for target post: {direct_post_api_url}")
        try:
            direct_response = requests.get(direct_post_api_url, headers=headers, timeout=(10, 30), cookies=cookies_for_api)
            direct_response.raise_for_status()
            direct_post_data = direct_response.json()
            # The API wraps the post in various shapes; unwrap list and
            # {'post': {...}} envelopes before inspecting it.
            if isinstance(direct_post_data, list) and direct_post_data:
                direct_post_data = direct_post_data[0]
            if isinstance(direct_post_data, dict) and 'post' in direct_post_data and isinstance(direct_post_data['post'], dict):
                direct_post_data = direct_post_data['post']
            # BUG FIX: compare ids as strings, matching the pagination path
            # below — the API may return the id as an int, which previously
            # failed the equality test and forced a needless full scan.
            if isinstance(direct_post_data, dict) and str(direct_post_data.get('id')) == str(target_post_id):
                logger(f" ✅ Direct fetch successful for post {target_post_id}.")
                yield [direct_post_data]
                return
            else:
                response_type = type(direct_post_data).__name__
                response_snippet = str(direct_post_data)[:200]
                logger(f" ⚠️ Direct fetch for post {target_post_id} returned unexpected data (Type: {response_type}, Snippet: '{response_snippet}'). Falling back to pagination.")
        except requests.exceptions.RequestException as e:
            logger(f" ⚠️ Direct fetch failed for post {target_post_id}: {e}. Falling back to pagination.")
        except Exception as e:
            logger(f" ⚠️ Unexpected error during direct fetch for post {target_post_id}: {e}. Falling back to pagination.")

    if target_post_id and (start_page or end_page):
        logger("⚠️ Page range (start/end page) is ignored when a specific post URL is provided (searching all pages for the post).")

    is_manga_mode_fetch_all_and_sort_oldest_first = manga_mode and (manga_filename_style_for_sort_check != STYLE_DATE_POST_TITLE) and not target_post_id

    api_base_url = f"https://{api_domain}/api/v1/{service}/user/{user_id}"
    page_size = 50

    if is_manga_mode_fetch_all_and_sort_oldest_first:
        logger(f" Manga Mode (Style: {manga_filename_style_for_sort_check if manga_filename_style_for_sort_check else 'Default'} - Oldest First Sort Active): Fetching all posts to sort by date...")
        all_posts_for_manga_mode = []
        current_offset_manga = 0
        if start_page and start_page > 1:
            current_offset_manga = (start_page - 1) * page_size
            logger(f" Manga Mode: Starting fetch from page {start_page} (offset {current_offset_manga}).")
        elif start_page:
            logger(f" Manga Mode: Starting fetch from page 1 (offset 0).")
        if end_page:
            logger(f" Manga Mode: Will fetch up to page {end_page}.")
        while True:
            if pause_event and pause_event.is_set():
                logger(" Manga mode post fetching paused...")
                while pause_event.is_set():
                    if cancellation_event and cancellation_event.is_set():
                        logger(" Manga mode post fetching cancelled while paused.")
                        break
                    time.sleep(0.5)
                if not (cancellation_event and cancellation_event.is_set()):
                    logger(" Manga mode post fetching resumed.")
            if cancellation_event and cancellation_event.is_set():
                logger(" Manga mode post fetching cancelled.")
                break
            current_page_num_manga = (current_offset_manga // page_size) + 1
            if end_page and current_page_num_manga > end_page:
                logger(f" Manga Mode: Reached specified end page ({end_page}). Stopping post fetch.")
                break
            try:
                posts_batch_manga = fetch_posts_paginated(api_base_url, headers, current_offset_manga, logger, cancellation_event, pause_event, cookies_dict=cookies_for_api)
                if not isinstance(posts_batch_manga, list):
                    logger(f"❌ API Error (Manga Mode): Expected list of posts, got {type(posts_batch_manga)}.")
                    break
                if not posts_batch_manga:
                    logger("✅ Reached end of posts (Manga Mode fetch all).")
                    if start_page and not end_page and current_page_num_manga < start_page:
                        logger(f" Manga Mode: No posts found on or after specified start page {start_page}.")
                    elif end_page and current_page_num_manga <= end_page and not all_posts_for_manga_mode:
                        logger(f" Manga Mode: No posts found within the specified page range ({start_page or 1}-{end_page}).")
                    break
                all_posts_for_manga_mode.extend(posts_batch_manga)
                current_offset_manga += page_size
                time.sleep(0.6)  # be polite to the API between pages
            except RuntimeError as e:
                if "cancelled by user" in str(e).lower():
                    logger(f"ℹ️ Manga mode pagination stopped due to cancellation: {e}")
                else:
                    logger(f"❌ {e}\n Aborting manga mode pagination.")
                break
            except Exception as e:
                logger(f"❌ Unexpected error during manga mode fetch: {e}")
                traceback.print_exc()
                break
        if cancellation_event and cancellation_event.is_set():
            return
        if all_posts_for_manga_mode:
            logger(f" Manga Mode: Fetched {len(all_posts_for_manga_mode)} total posts. Sorting by publication date (oldest first)...")

            def sort_key_tuple(post):
                # Primary key: published (fallback: added) date string; posts
                # missing both sort to the start via the default earliest date.
                # Secondary key: numeric post id to stabilize same-date posts.
                published_date_str = post.get('published')
                added_date_str = post.get('added')
                post_id_str = post.get('id', "0")
                primary_sort_val = "0000-00-00T00:00:00"
                if published_date_str:
                    primary_sort_val = published_date_str
                elif added_date_str:
                    logger(f" ⚠️ Post ID {post_id_str} missing 'published' date, using 'added' date '{added_date_str}' for primary sorting.")
                    primary_sort_val = added_date_str
                else:
                    logger(f" ⚠️ Post ID {post_id_str} missing both 'published' and 'added' dates. Placing at start of sort (using default earliest date).")
                secondary_sort_val = 0
                try:
                    secondary_sort_val = int(post_id_str)
                except ValueError:
                    logger(f" ⚠️ Post ID '{post_id_str}' is not a valid integer for secondary sorting, using 0.")
                return (primary_sort_val, secondary_sort_val)

            all_posts_for_manga_mode.sort(key=sort_key_tuple)
            for i in range(0, len(all_posts_for_manga_mode), page_size):
                if cancellation_event and cancellation_event.is_set():
                    logger(" Manga mode post yielding cancelled.")
                    break
                yield all_posts_for_manga_mode[i:i + page_size]
        return

    if manga_mode and not target_post_id and (manga_filename_style_for_sort_check == STYLE_DATE_POST_TITLE):
        logger(f" Manga Mode (Style: {STYLE_DATE_POST_TITLE}): Processing posts in default API order (newest first).")

    # Standard streaming path (also used to locate a target post when the
    # direct fetch above did not succeed).
    current_page_num = 1
    current_offset = 0
    processed_target_post_flag = False
    if start_page and start_page > 1 and not target_post_id:
        current_offset = (start_page - 1) * page_size
        current_page_num = start_page
        logger(f" Starting from page {current_page_num} (calculated offset {current_offset}).")
    while True:
        if pause_event and pause_event.is_set():
            logger(" Post fetching loop paused...")
            while pause_event.is_set():
                if cancellation_event and cancellation_event.is_set():
                    logger(" Post fetching loop cancelled while paused.")
                    break
                time.sleep(0.5)
            if not (cancellation_event and cancellation_event.is_set()):
                logger(" Post fetching loop resumed.")
        if cancellation_event and cancellation_event.is_set():
            logger(" Post fetching loop cancelled.")
            break
        if target_post_id and processed_target_post_flag:
            break
        if not target_post_id and end_page and current_page_num > end_page:
            logger(f"✅ Reached specified end page ({end_page}) for creator feed. Stopping.")
            break
        try:
            posts_batch = fetch_posts_paginated(api_base_url, headers, current_offset, logger, cancellation_event, pause_event, cookies_dict=cookies_for_api)
            if not isinstance(posts_batch, list):
                logger(f"❌ API Error: Expected list of posts, got {type(posts_batch)} at page {current_page_num} (offset {current_offset}).")
                break
        except RuntimeError as e:
            if "cancelled by user" in str(e).lower():
                logger(f"ℹ️ Pagination stopped due to cancellation: {e}")
            else:
                logger(f"❌ {e}\n Aborting pagination at page {current_page_num} (offset {current_offset}).")
            break
        except Exception as e:
            logger(f"❌ Unexpected error fetching page {current_page_num} (offset {current_offset}): {e}")
            traceback.print_exc()
            break
        if not posts_batch:
            if target_post_id and not processed_target_post_flag:
                logger(f"❌ Target post {target_post_id} not found after checking all available pages (API returned no more posts at offset {current_offset}).")
            elif not target_post_id:
                if current_page_num == (start_page or 1):
                    logger(f"😕 No posts found on the first page checked (page {current_page_num}, offset {current_offset}).")
                else:
                    logger(f"✅ Reached end of posts (no more content from API at offset {current_offset}).")
            break
        if target_post_id and not processed_target_post_flag:
            matching_post = next((p for p in posts_batch if str(p.get('id')) == str(target_post_id)), None)
            if matching_post:
                logger(f"🎯 Found target post {target_post_id} on page {current_page_num} (offset {current_offset}).")
                yield [matching_post]
                processed_target_post_flag = True
        elif not target_post_id:
            yield posts_batch
        if processed_target_post_flag:
            break
        current_offset += page_size
        current_page_num += 1
        time.sleep(0.6)  # be polite to the API between pages
    if target_post_id and not processed_target_post_flag and not (cancellation_event and cancellation_event.is_set()):
        logger(f"❌ Target post {target_post_id} could not be found after checking all relevant pages (final check after loop).")
|
||||
241
src/core/manager.py
Normal file
241
src/core/manager.py
Normal file
@@ -0,0 +1,241 @@
|
||||
# --- Standard Library Imports ---
|
||||
import threading
|
||||
import time
|
||||
import os
|
||||
import json
|
||||
import traceback
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed, Future
|
||||
|
||||
# --- Local Application Imports ---
|
||||
# These imports reflect the new, organized project structure.
|
||||
from .api_client import download_from_api
|
||||
from .workers import PostProcessorWorker, DownloadThread
|
||||
from ..config.constants import (
|
||||
STYLE_DATE_BASED, STYLE_POST_TITLE_GLOBAL_NUMBERING,
|
||||
MAX_THREADS, POST_WORKER_BATCH_THRESHOLD, POST_WORKER_NUM_BATCHES,
|
||||
POST_WORKER_BATCH_DELAY_SECONDS
|
||||
)
|
||||
from ..utils.file_utils import clean_folder_name
|
||||
|
||||
|
||||
class DownloadManager:
    """
    Manages the entire download lifecycle, acting as a bridge between the UI
    and the backend workers. It handles thread pools, task submission,
    and state management for a download session.
    """

    def __init__(self, progress_queue):
        """
        Initializes the DownloadManager.

        Args:
            progress_queue (queue.Queue): A thread-safe queue for sending
                                          status updates to the UI.
        """
        self.progress_queue = progress_queue
        self.thread_pool = None
        self.active_futures = []

        # --- Session State ---
        self.cancellation_event = threading.Event()
        self.pause_event = threading.Event()
        self.is_running = False

        # Guards the shared counters below, which are mutated from worker
        # callback threads. (Bug fix: _handle_future_result previously did
        # `with threading.Lock():`, creating a brand-new lock per call, which
        # synchronized nothing.)
        self._counters_lock = threading.Lock()

        self.total_posts = 0
        self.processed_posts = 0
        self.total_downloads = 0
        self.total_skips = 0
        self.all_kept_original_filenames = []

    def _log(self, message):
        """Puts a progress message into the queue for the UI."""
        self.progress_queue.put({'type': 'progress', 'payload': (message,)})

    def start_session(self, config, restore_data=None):
        """
        Starts a new download session based on the provided configuration.
        This is the main entry point called by the UI.

        Args:
            config (dict): A dictionary containing all settings from the UI.
            restore_data (dict, optional): Data from a previous, interrupted session.
        """
        if self.is_running:
            self._log("❌ Cannot start a new session: A session is already in progress.")
            return

        # --- Reset state for the new session ---
        self.is_running = True
        self.cancellation_event.clear()
        self.pause_event.clear()
        self.active_futures.clear()
        self.total_posts = 0
        self.processed_posts = 0
        self.total_downloads = 0
        self.total_skips = 0
        self.all_kept_original_filenames = []

        # --- Decide execution strategy (multi-threaded vs. single-threaded) ---
        is_single_post = bool(config.get('target_post_id_from_initial_url'))
        use_multithreading = config.get('use_multithreading', True)
        # Sequential manga styles require strict post order, which a pool
        # cannot guarantee.
        is_manga_sequential = config.get('manga_mode_active') and config.get('manga_filename_style') in [STYLE_DATE_BASED, STYLE_POST_TITLE_GLOBAL_NUMBERING]

        should_use_multithreading_for_posts = use_multithreading and not is_single_post and not is_manga_sequential

        if should_use_multithreading_for_posts:
            # Start a separate thread to manage fetching and queuing to the thread pool
            fetcher_thread = threading.Thread(
                target=self._fetch_and_queue_posts_for_pool,
                args=(config, restore_data),
                daemon=True
            )
            fetcher_thread.start()
        else:
            # For single posts or sequential manga mode, use a single worker thread
            # which is simpler and ensures order.
            self._start_single_threaded_session(config)

    def _start_single_threaded_session(self, config):
        """Handles downloads that are best processed by a single worker thread."""
        self._log("ℹ️ Initializing single-threaded download process...")

        # The original DownloadThread is now a pure Python thread, not a QThread.
        # We run its `run` method in a standard Python thread.
        self.worker_thread = threading.Thread(
            target=self._run_single_worker,
            args=(config,),
            daemon=True
        )
        self.worker_thread.start()

    def _run_single_worker(self, config):
        """Target function for the single-worker thread."""
        try:
            # Pass the queue directly to the worker for it to send updates
            worker = DownloadThread(config, self.progress_queue)
            worker.run()  # This is the main blocking call for this thread
        except Exception as e:
            self._log(f"❌ CRITICAL ERROR in single-worker thread: {e}")
            self._log(traceback.format_exc())
        finally:
            self.is_running = False

    def _fetch_and_queue_posts_for_pool(self, config, restore_data):
        """
        Fetches all posts from the API and submits them as tasks to a thread pool.
        This method runs in its own dedicated thread to avoid blocking.

        Args:
            config (dict): Session settings from the UI.
            restore_data (dict or None): When present, contains
                'all_posts_data' and 'processed_post_ids' from an interrupted
                session; already-processed posts are skipped.
        """
        try:
            num_workers = min(config.get('num_threads', 4), MAX_THREADS)
            self.thread_pool = ThreadPoolExecutor(max_workers=num_workers, thread_name_prefix='PostWorker_')

            # Determine the work list: either resume a prior session or fetch
            # everything fresh from the API.
            if restore_data:
                all_posts = restore_data['all_posts_data']
                processed_ids = set(restore_data['processed_post_ids'])
                posts_to_process = [p for p in all_posts if p.get('id') not in processed_ids]
                self.total_posts = len(all_posts)
                self.processed_posts = len(processed_ids)
                self._log(f"🔄 Restoring session. {len(posts_to_process)} posts remaining.")
            else:
                posts_to_process = self._get_all_posts(config)
                self.total_posts = len(posts_to_process)
                self.processed_posts = 0

            self.progress_queue.put({'type': 'overall_progress', 'payload': (self.total_posts, self.processed_posts)})

            if not posts_to_process:
                self._log("✅ No new posts to process.")
                return

            # Submit tasks to the pool
            for post_data in posts_to_process:
                if self.cancellation_event.is_set():
                    break
                # Each PostProcessorWorker gets the queue to send its own updates
                worker = PostProcessorWorker(post_data, config, self.progress_queue)
                future = self.thread_pool.submit(worker.process)
                future.add_done_callback(self._handle_future_result)
                self.active_futures.append(future)

        except Exception as e:
            self._log(f"❌ CRITICAL ERROR in post fetcher thread: {e}")
            self._log(traceback.format_exc())
        finally:
            # Wait for all submitted tasks to complete before shutting down
            if self.thread_pool:
                self.thread_pool.shutdown(wait=True)
            self.is_running = False
            self._log("🏁 All processing tasks have completed.")
            # Emit final signal
            self.progress_queue.put({
                'type': 'finished',
                'payload': (self.total_downloads, self.total_skips, self.cancellation_event.is_set(), self.all_kept_original_filenames)
            })

    def _get_all_posts(self, config):
        """Helper to fetch all posts using the API client.

        Drains the `download_from_api` generator into a single flat list.
        """
        all_posts = []
        # This generator yields batches of posts
        post_generator = download_from_api(
            api_url_input=config['api_url'],
            logger=self._log,
            # ... pass other relevant config keys ...
            cancellation_event=self.cancellation_event,
            pause_event=self.pause_event
        )
        for batch in post_generator:
            all_posts.extend(batch)
        return all_posts

    def _handle_future_result(self, future: Future):
        """Callback executed (on a pool thread) when a worker task completes.

        Updates shared counters under the instance lock and forwards any
        retryable/permanent failures and history records to the UI queue.
        """
        if self.cancellation_event.is_set():
            return

        # Bug fix: use the single lock created in __init__ instead of a fresh
        # (and therefore useless) Lock per invocation.
        with self._counters_lock:
            self.processed_posts += 1
            try:
                if future.cancelled():
                    self._log("⚠️ A post processing task was cancelled.")
                    self.total_skips += 1
                else:
                    result = future.result()
                    # Unpack result tuple from the worker
                    (dl_count, skip_count, kept_originals,
                     retryable, permanent, history) = result
                    self.total_downloads += dl_count
                    self.total_skips += skip_count
                    self.all_kept_original_filenames.extend(kept_originals)

                    # Queue up results for UI to handle
                    if retryable:
                        self.progress_queue.put({'type': 'retryable_failure', 'payload': (retryable,)})
                    if permanent:
                        self.progress_queue.put({'type': 'permanent_failure', 'payload': (permanent,)})
                    if history:
                        self.progress_queue.put({'type': 'post_processed_history', 'payload': (history,)})

            except Exception as e:
                self._log(f"❌ Worker task resulted in an exception: {e}")
                self.total_skips += 1  # Count errored posts as skipped

            # Update overall progress
            self.progress_queue.put({'type': 'overall_progress', 'payload': (self.total_posts, self.processed_posts)})

    def cancel_session(self):
        """Cancels the current running session. No-op when nothing is running."""
        if not self.is_running:
            return
        self._log("⚠️ Cancellation requested by user...")
        self.cancellation_event.set()

        # For single thread mode, the worker checks the event
        # For multi-thread mode, shut down the pool
        if self.thread_pool:
            # Don't wait, just cancel pending futures and let the fetcher thread exit
            self.thread_pool.shutdown(wait=False, cancel_futures=True)

        self.is_running = False
|
||||
1676
src/core/workers.py
Normal file
1676
src/core/workers.py
Normal file
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user