From d7a63361e35604dd405b56db321a4375d2dfdf49 Mon Sep 17 00:00:00 2001 From: Felipe <41008398+StrawberryMaster@users.noreply.github.com> Date: Wed, 24 Sep 2025 21:05:22 +0000 Subject: [PATCH] Use a FixedThreadPool for concurrent API calls --- lib/wayback_machine_downloader.rb | 91 ++++++++++++++++--------------- 1 file changed, 48 insertions(+), 43 deletions(-) diff --git a/lib/wayback_machine_downloader.rb b/lib/wayback_machine_downloader.rb index 19552f8..ca2b63a 100644 --- a/lib/wayback_machine_downloader.rb +++ b/lib/wayback_machine_downloader.rb @@ -289,53 +289,58 @@ class WaybackMachineDownloader page_index = 0 batch_size = [@threads_count, 5].min continue_fetching = true + fetch_pool = Concurrent::FixedThreadPool.new([@threads_count, 1].max) + begin + while continue_fetching && page_index < @maximum_pages + # Determine the range of pages to fetch in this batch + end_index = [page_index + batch_size, @maximum_pages].min + current_batch = (page_index...end_index).to_a - while continue_fetching && page_index < @maximum_pages - # Determine the range of pages to fetch in this batch - end_index = [page_index + batch_size, @maximum_pages].min - current_batch = (page_index...end_index).to_a - - # Create futures for concurrent API calls - futures = current_batch.map do |page| - Concurrent::Future.execute do - result = nil - @connection_pool.with_connection do |connection| - result = get_raw_list_from_api("#{@base_url}/*", page, connection) - end - result ||= [] - [page, result] - end - end - - results = [] - - futures.each do |future| - begin - results << future.value - rescue => e - puts "\nError fetching page #{future}: #{e.message}" - end - end - - # Sort results by page number to maintain order - results.sort_by! { |page, _| page } - - # Process results and check for empty pages - results.each do |page, result| - if result.nil? || result.empty? - continue_fetching = false - break - else - mutex.synchronize do - snapshot_list_to_consider.concat(result) - print "." + # Create futures for concurrent API calls + futures = current_batch.map do |page| + Concurrent::Future.execute(executor: fetch_pool) do + result = nil + @connection_pool.with_connection do |connection| + result = get_raw_list_from_api("#{@base_url}/*", page, connection) + end + result ||= [] + [page, result] end end + + results = [] + + futures.each do |future| + begin + results << future.value + rescue => e + puts "\nError fetching page #{future}: #{e.message}" + end + end + + # Sort results by page number to maintain order + results.sort_by! { |page, _| page } + + # Process results and check for empty pages + results.each do |page, result| + if result.nil? || result.empty? + continue_fetching = false + break + else + mutex.synchronize do + snapshot_list_to_consider.concat(result) + print "." + end + end + end + + page_index = end_index + + sleep(RATE_LIMIT) if continue_fetching end - - page_index = end_index - - sleep(RATE_LIMIT) if continue_fetching + ensure + fetch_pool.shutdown + fetch_pool.wait_for_termination end end