Proper connection pool lifecycle management

This commit is contained in:
Felipe 2024-12-31 16:48:29 +00:00
parent 01f0fc9587
commit 4d5f187f15

View File

@ -13,6 +13,100 @@ require_relative 'wayback_machine_downloader/tidy_bytes'
require_relative 'wayback_machine_downloader/to_regex'
require_relative 'wayback_machine_downloader/archive_api'
class ConnectionPool
MAX_AGE = 300
CLEANUP_INTERVAL = 60
DEFAULT_TIMEOUT = 30
MAX_RETRIES = 3
def initialize(size)
@size = size
@pool = Concurrent::Map.new
@creation_times = Concurrent::Map.new
@cleanup_thread = schedule_cleanup
end
def with_connection(&block)
conn = acquire_connection
begin
yield conn
ensure
release_connection(conn)
end
end
def shutdown
@cleanup_thread&.exit
@pool.each_value { |conn| conn.finish if conn&.started? }
@pool.clear
@creation_times.clear
end
private
def acquire_connection
thread_id = Thread.current.object_id
conn = @pool[thread_id]
if should_create_new?(conn)
conn&.finish if conn&.started?
conn = create_connection
@pool[thread_id] = conn
@creation_times[thread_id] = Time.now
end
conn
end
def release_connection(conn)
return unless conn
if conn.started? && Time.now - @creation_times[Thread.current.object_id] > MAX_AGE
conn.finish
@pool.delete(Thread.current.object_id)
@creation_times.delete(Thread.current.object_id)
end
end
def should_create_new?(conn)
return true if conn.nil?
return true unless conn.started?
return true if Time.now - @creation_times[Thread.current.object_id] > MAX_AGE
false
end
def create_connection
http = Net::HTTP.new("web.archive.org", 443)
http.use_ssl = true
http.read_timeout = DEFAULT_TIMEOUT
http.open_timeout = DEFAULT_TIMEOUT
http.keep_alive_timeout = 30
http.max_retries = MAX_RETRIES
http.start
http
end
def schedule_cleanup
Thread.new do
loop do
cleanup_old_connections
sleep CLEANUP_INTERVAL
end
end
end
def cleanup_old_connections
current_time = Time.now
@creation_times.each do |thread_id, creation_time|
if current_time - creation_time > MAX_AGE
conn = @pool[thread_id]
conn&.finish if conn&.started?
@pool.delete(thread_id)
@creation_times.delete(thread_id)
end
end
end
end
class WaybackMachineDownloader
include ArchiveAPI
@ -23,7 +117,6 @@ class WaybackMachineDownloader
RETRY_DELAY = 2
RATE_LIMIT = 0.25 # Delay between requests in seconds
CONNECTION_POOL_SIZE = 10
HTTP_CACHE_SIZE = 1000
MEMORY_BUFFER_SIZE = 16384 # 16KB chunks
attr_accessor :base_url, :exact_url, :directory, :all_timestamps,
@ -45,8 +138,8 @@ class WaybackMachineDownloader
@threads_count = [params[:threads_count].to_i, 1].max # Garante mínimo de 1 thread
@timeout = params[:timeout] || DEFAULT_TIMEOUT
@logger = setup_logger
@http_cache = Concurrent::Map.new
@failed_downloads = Concurrent::Array.new
@connection_pool = ConnectionPool.new(CONNECTION_POOL_SIZE)
end
def backup_name
@ -96,11 +189,9 @@ class WaybackMachineDownloader
end
def get_all_snapshots_to_consider
http = setup_http_client
snapshot_list_to_consider = []
begin
http.start do |connection|
@connection_pool.with_connection do |connection|
puts "Getting snapshot pages"
# Fetch the initial set of snapshots
@ -118,9 +209,6 @@ class WaybackMachineDownloader
end
end
end
ensure
http.finish if http.started?
end
puts " found #{snapshot_list_to_consider.length} snapshots to consider."
puts
@ -229,28 +317,19 @@ class WaybackMachineDownloader
thread_count = [@threads_count, CONNECTION_POOL_SIZE].min
pool = Concurrent::FixedThreadPool.new(thread_count)
semaphore = Concurrent::Semaphore.new(CONNECTION_POOL_SIZE)
file_list_by_timestamp.each do |file_remote_info|
pool.post do
semaphore.acquire
http = nil
begin
http = setup_http_client
http.start do |connection|
@connection_pool.with_connection do |connection|
result = download_file(file_remote_info, connection)
@download_mutex.synchronize do
@processed_file_count += 1
puts result if result
end
end
ensure
semaphore.release
http&.finish if http&.started?
sleep(RATE_LIMIT)
end
end
end
pool.shutdown
pool.wait_for_termination
@ -330,10 +409,6 @@ class WaybackMachineDownloader
@file_list_by_timestamp ||= get_file_list_by_timestamp
end
def semaphore
@semaphore ||= Mutex.new
end
private
def validate_params(params)
@ -350,21 +425,6 @@ class WaybackMachineDownloader
logger
end
def setup_http_client
cached_client = @http_cache[Thread.current.object_id]
return cached_client if cached_client&.active?
http = Net::HTTP.new("web.archive.org", 443)
http.use_ssl = true
http.read_timeout = @timeout
http.open_timeout = @timeout
http.keep_alive_timeout = 30
http.max_retries = MAX_RETRIES
@http_cache[Thread.current.object_id] = http
http
end
def download_with_retry(file_path, file_url, file_timestamp, connection)
retries = 0
begin
@ -404,10 +464,7 @@ class WaybackMachineDownloader
end
def cleanup
@http_cache.each_value do |client|
client.finish if client&.started?
end
@http_cache.clear
@connection_pool.shutdown
if @failed_downloads.any?
@logger.error("Failed downloads summary:")