Use a FixedThreadPool for concurrent API calls

Felipe 2025-09-24 21:05:22 +00:00 committed by GitHub
parent b1974a8dfa
commit d7a63361e3

@@ -289,53 +289,58 @@ class WaybackMachineDownloader
     page_index = 0
     batch_size = [@threads_count, 5].min
     continue_fetching = true
+    fetch_pool = Concurrent::FixedThreadPool.new([@threads_count, 1].max)
+    begin
     while continue_fetching && page_index < @maximum_pages
       # Determine the range of pages to fetch in this batch
       end_index = [page_index + batch_size, @maximum_pages].min
       current_batch = (page_index...end_index).to_a
 
       # Create futures for concurrent API calls
       futures = current_batch.map do |page|
-        Concurrent::Future.execute do
+        Concurrent::Future.execute(executor: fetch_pool) do
           result = nil
           @connection_pool.with_connection do |connection|
             result = get_raw_list_from_api("#{@base_url}/*", page, connection)
           end
           result ||= []
           [page, result]
         end
       end
 
       results = []
       futures.each do |future|
         begin
           results << future.value
         rescue => e
           puts "\nError fetching page #{future}: #{e.message}"
         end
       end
 
       # Sort results by page number to maintain order
       results.sort_by! { |page, _| page }
 
       # Process results and check for empty pages
       results.each do |page, result|
         if result.nil? || result.empty?
           continue_fetching = false
           break
         else
           mutex.synchronize do
             snapshot_list_to_consider.concat(result)
             print "."
           end
         end
       end
 
       page_index = end_index
       sleep(RATE_LIMIT) if continue_fetching
     end
+    ensure
+      fetch_pool.shutdown
+      fetch_pool.wait_for_termination
+    end
   end
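To see the change in isolation, here is a minimal, self-contained sketch of the pattern this commit applies: a bounded Concurrent::FixedThreadPool passed as the executor for each Concurrent::Future, with the pool drained in an ensure block. The fetch_page stub, the page count, and the termination timeout are illustrative stand-ins, not part of this project's code.

    require 'concurrent'

    # Stand-in for the real API call; sleeps to simulate network latency.
    def fetch_page(page)
      sleep 0.1
      "payload for page #{page}"
    end

    pool = Concurrent::FixedThreadPool.new(4) # at most 4 worker threads

    begin
      futures = (0...10).map do |page|
        # Schedule each call on the shared pool rather than the global
        # executor, so concurrency stays bounded by the pool size.
        Concurrent::Future.execute(executor: pool) do
          [page, fetch_page(page)]
        end
      end

      # value! blocks until each future resolves and re-raises task errors;
      # sorting restores page order regardless of completion order.
      results = futures.map(&:value!).sort_by { |page, _| page }
      results.each { |page, payload| puts "#{page}: #{payload}" }
    ensure
      pool.shutdown                 # stop accepting new work
      pool.wait_for_termination(30) # wait up to 30s for in-flight tasks
    end

Without an explicit executor, Concurrent::Future.execute schedules work on concurrent-ruby's global I/O pool; routing it through a dedicated FixedThreadPool caps the number of simultaneous API calls at the pool size ([@threads_count, 1].max here) and lets the caller shut the pool down deterministically on exit.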