Class: HTTPX::Pool
- Inherits:
-
Object
- Object
- HTTPX::Pool
- Defined in:
- lib/httpx/pool.rb
Constant Summary collapse
- POOL_TIMEOUT =
5
Instance Method Summary collapse
- #checkin_connection(connection) ⇒ Object
- #checkin_resolver(resolver) ⇒ Object
-
#checkout_connection(uri, options) ⇒ Object
opens a connection to the IP reachable through
uri
. - #checkout_mergeable_connection(connection) ⇒ Object
- #checkout_resolver(options) ⇒ Object
-
#initialize(options) ⇒ Pool
constructor
Sets up the connection pool with the given
options
, which can be the following:. -
#inspect ⇒ Object
:nocov:.
-
#pop_connection ⇒ Object
connections returned by this function are not expected to return to the connection pool.
- #reset_resolvers ⇒ Object
Constructor Details
#initialize(options) ⇒ Pool
Sets up the connection pool with the given options
, which can be the following:
- :max_connections
-
the maximum number of connections held in the pool.
- :max_connections_per_origin
-
the maximum number of connections held in the pool pointing to a given origin.
- :pool_timeout
-
the number of seconds to wait for a connection to a given origin (before raising HTTPX::PoolTimeoutError)
22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/httpx/pool.rb', line 22 def initialize() @max_connections = .fetch(:max_connections, Float::INFINITY) @max_connections_per_origin = .fetch(:max_connections_per_origin, Float::INFINITY) @pool_timeout = .fetch(:pool_timeout, POOL_TIMEOUT) @resolvers = Hash.new { |hs, resolver_type| hs[resolver_type] = [] } @resolver_mtx = Thread::Mutex.new @connections = [] @connection_mtx = Thread::Mutex.new @connections_counter = 0 @max_connections_cond = ConditionVariable.new @origin_counters = Hash.new(0) @origin_conds = Hash.new { |hs, orig| hs[orig] = ConditionVariable.new } end |
Instance Method Details
#checkin_connection(connection) ⇒ Object
90 91 92 93 94 95 96 97 98 99 |
# File 'lib/httpx/pool.rb', line 90 def checkin_connection(connection) return if connection..io @connection_mtx.synchronize do @connections << connection @max_connections_cond.signal @origin_conds[connection.origin.to_s].signal end end |
#checkin_resolver(resolver) ⇒ Object
130 131 132 133 134 135 136 137 138 |
# File 'lib/httpx/pool.rb', line 130 def checkin_resolver(resolver) @resolver_mtx.synchronize do resolvers = @resolvers[resolver.class] resolver = resolver.multi resolvers << resolver unless resolvers.include?(resolver) end end |
#checkout_connection(uri, options) ⇒ Object
opens a connection to the IP reachable through uri
. Many hostnames are reachable through the same IP, so we try to maximize pipelining by opening as few connections as possible.
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/httpx/pool.rb', line 47 def checkout_connection(uri, ) return checkout_new_connection(uri, ) if .io @connection_mtx.synchronize do acquire_connection(uri, ) || begin if @connections_counter == @max_connections # this takes precedence over per-origin @max_connections_cond.wait(@connection_mtx, @pool_timeout) if (conn = acquire_connection(uri, )) return conn end if @connections_counter == @max_connections # if no matching usable connection was found, the pool will make room and drop a closed connection. if none is found, # this means that all of them are persistent or being used, so raise a timeout error. conn = @connections.find { |c| c.state == :closed } raise PoolTimeoutError.new(@pool_timeout, "Timed out after #{@pool_timeout} seconds while waiting for a connection") unless conn drop_connection(conn) end end if @origin_counters[uri.origin] == @max_connections_per_origin @origin_conds[uri.origin].wait(@connection_mtx, @pool_timeout) return acquire_connection(uri, ) || raise(PoolTimeoutError.new(@pool_timeout, "Timed out after #{@pool_timeout} seconds while waiting for a connection to #{uri.origin}")) end @connections_counter += 1 @origin_counters[uri.origin] += 1 checkout_new_connection(uri, ) end end end |
#checkout_mergeable_connection(connection) ⇒ Object
101 102 103 104 105 106 107 108 109 110 |
# File 'lib/httpx/pool.rb', line 101 def checkout_mergeable_connection(connection) return if connection..io @connection_mtx.synchronize do idx = @connections.find_index do |ch| ch != connection && ch.mergeable?(connection) end @connections.delete_at(idx) if idx end end |
#checkout_resolver(options) ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/httpx/pool.rb', line 116 def checkout_resolver() resolver_type = .resolver_class resolver_type = Resolver.resolver_for(resolver_type, ) @resolver_mtx.synchronize do resolvers = @resolvers[resolver_type] idx = resolvers.find_index do |res| res. == end resolvers.delete_at(idx) if idx end || checkout_new_resolver(resolver_type, ) end |
#inspect ⇒ Object
:nocov:
141 142 143 144 145 146 |
# File 'lib/httpx/pool.rb', line 141 def inspect "#<#{self.class}:#{object_id} " \ "@max_connections_per_origin=#{@max_connections_per_origin} " \ "@pool_timeout=#{@pool_timeout} " \ "@connections=#{@connections.size}>" end |
#pop_connection ⇒ Object
connections returned by this function are not expected to return to the connection pool.
37 38 39 40 41 |
# File 'lib/httpx/pool.rb', line 37 def pop_connection @connection_mtx.synchronize do drop_connection end end |
#reset_resolvers ⇒ Object
112 113 114 |
# File 'lib/httpx/pool.rb', line 112 def reset_resolvers @resolver_mtx.synchronize { @resolvers.clear } end |