Class: HTTPX::Pool
- Inherits:
-
Object
- Object
- HTTPX::Pool
- Defined in:
- lib/httpx/pool.rb
Constant Summary collapse
- POOL_TIMEOUT =
5
Instance Method Summary collapse
- #checkin_connection(connection) ⇒ Object
- #checkin_resolver(resolver) ⇒ Object
-
#checkout_connection(uri, options) ⇒ Object
opens a connection to the IP reachable through
uri
. - #checkout_mergeable_connection(connection) ⇒ Object
- #checkout_resolver(options) ⇒ Object
-
#initialize(options) ⇒ Pool
constructor
Sets up the connection pool with the given
options
, which can be the following:. -
#inspect ⇒ Object
:nocov:.
-
#pop_connection ⇒ Object
connections returned by this function are not expected to return to the connection pool.
- #reset_resolvers ⇒ Object
Constructor Details
#initialize(options) ⇒ Pool
Sets up the connection pool with the given options
, which can be the following:
- :max_connections
-
the maximum number of connections held in the pool.
- :max_connections_per_origin
-
the maximum number of connections held in the pool pointing to a given origin.
- :pool_timeout
-
the number of seconds to wait for a connection to a given origin (before raising HTTPX::PoolTimeoutError)
20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/httpx/pool.rb', line 20 def initialize() @max_connections = .fetch(:max_connections, Float::INFINITY) @max_connections_per_origin = .fetch(:max_connections_per_origin, Float::INFINITY) @pool_timeout = .fetch(:pool_timeout, POOL_TIMEOUT) @resolvers = Hash.new { |hs, resolver_type| hs[resolver_type] = [] } @resolver_mtx = Thread::Mutex.new @connections = [] @connection_mtx = Thread::Mutex.new @connections_counter = 0 @max_connections_cond = ConditionVariable.new @origin_counters = Hash.new(0) @origin_conds = Hash.new { |hs, orig| hs[orig] = ConditionVariable.new } end |
Instance Method Details
#checkin_connection(connection) ⇒ Object
85 86 87 88 89 90 91 92 93 94 |
# File 'lib/httpx/pool.rb', line 85 def checkin_connection(connection) return if connection..io @connection_mtx.synchronize do @connections << connection @max_connections_cond.signal @origin_conds[connection.origin.to_s].signal end end |
#checkin_resolver(resolver) ⇒ Object
125 126 127 128 129 130 131 132 133 |
# File 'lib/httpx/pool.rb', line 125 def checkin_resolver(resolver) @resolver_mtx.synchronize do resolvers = @resolvers[resolver.class] resolver = resolver.multi resolvers << resolver unless resolvers.include?(resolver) end end |
#checkout_connection(uri, options) ⇒ Object
opens a connection to the IP reachable through uri
. Many hostnames are reachable through the same IP, so we try to maximize pipelining by opening as few connections as possible.
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/httpx/pool.rb', line 45 def checkout_connection(uri, ) return checkout_new_connection(uri, ) if .io @connection_mtx.synchronize do acquire_connection(uri, ) || begin if @connections_counter == @max_connections # this takes precedence over per-origin @max_connections_cond.wait(@connection_mtx, @pool_timeout) acquire_connection(uri, ) || begin if @connections_counter == @max_connections # if no matching usable connection was found, the pool will make room and drop a closed connection. if none is found, # this means that all of them are persistent or being used, so raise a timeout error. conn = @connections.find { |c| c.state == :closed } raise PoolTimeoutError.new(@pool_timeout, "Timed out after #{@pool_timeout} seconds while waiting for a connection") unless conn drop_connection(conn) end end end if @origin_counters[uri.origin] == @max_connections_per_origin @origin_conds[uri.origin].wait(@connection_mtx, @pool_timeout) return acquire_connection(uri, ) || raise(PoolTimeoutError.new(@pool_timeout, "Timed out after #{@pool_timeout} seconds while waiting for a connection to #{uri.origin}")) end @connections_counter += 1 @origin_counters[uri.origin] += 1 checkout_new_connection(uri, ) end end end |
#checkout_mergeable_connection(connection) ⇒ Object
96 97 98 99 100 101 102 103 104 105 |
# File 'lib/httpx/pool.rb', line 96 def checkout_mergeable_connection(connection) return if connection..io @connection_mtx.synchronize do idx = @connections.find_index do |ch| ch != connection && ch.mergeable?(connection) end @connections.delete_at(idx) if idx end end |
#checkout_resolver(options) ⇒ Object
111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/httpx/pool.rb', line 111 def checkout_resolver() resolver_type = .resolver_class resolver_type = Resolver.resolver_for(resolver_type) @resolver_mtx.synchronize do resolvers = @resolvers[resolver_type] idx = resolvers.find_index do |res| res. == end resolvers.delete_at(idx) if idx end || checkout_new_resolver(resolver_type, ) end |
#inspect ⇒ Object
:nocov:
136 137 138 139 140 141 |
# File 'lib/httpx/pool.rb', line 136 def inspect "#<#{self.class}:#{object_id} " \ "@max_connections_per_origin=#{@max_connections_per_origin} " \ "@pool_timeout=#{@pool_timeout} " \ "@connections=#{@connections.size}>" end |
#pop_connection ⇒ Object
connections returned by this function are not expected to return to the connection pool.
35 36 37 38 39 |
# File 'lib/httpx/pool.rb', line 35 def pop_connection @connection_mtx.synchronize do drop_connection end end |
#reset_resolvers ⇒ Object
107 108 109 |
# File 'lib/httpx/pool.rb', line 107 def reset_resolvers @resolver_mtx.synchronize { @resolvers.clear } end |