Class: HTTPX::Connection

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Callbacks, Loggable
Defined in:
lib/httpx/connection.rb

Overview

The Connection can be watched for IO events.

It contains the io object to read/write from, and knows what to do when it can.

It defers connecting until absolutely necessary. Connection should be triggered from the IO selector (until then, any request will be queued).

A connection boots up its parser after connection is established. All pending requests will be redirected there after connection.

A connection can be prevented from closing by the parser, that is, if there are pending requests. This will signal that the connection was prematurely closed, due to a possible number of conditions:

  • Remote peer closed the connection (“Connection: close”);

  • Remote peer doesn’t support pipelining;

A connection may also route requests for a different host for which the io was connected to, provided that the IP is the same and the port and scheme as well. This will allow to share the same socket to send HTTP/2 requests to different hosts.

Defined Under Namespace

Classes: HTTP1, HTTP2

Constant Summary

Constants included from Loggable

Loggable::COLORS, Loggable::USE_DEBUG_LOG

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Callbacks

#callbacks_for?, #emit, #on, #once

Methods included from Loggable

#log, #log_exception, #log_redact, #log_redact_body, #log_redact_headers

Constructor Details

#initialize(uri, options) ⇒ Connection

Returns a new instance of Connection.



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/httpx/connection.rb', line 47

def initialize(uri, options)
  @current_session = @current_selector = @max_concurrent_requests =
                       @parser = @sibling = @coalesced_connection = @altsvc_connection =
                                              @family = @io = @ssl_session = @timeout =
                                                                @connected_at = @response_received_at = nil

  @exhausted = @cloned = @main_sibling = false

  @options = Options.new(options)
  @type = initialize_type(uri, @options)
  @origins = [uri.origin]
  @origin = Utils.to_uri(uri.origin)
  @window_size = @options.window_size
  @read_buffer = Buffer.new(@options.buffer_size)
  @write_buffer = Buffer.new(@options.buffer_size)
  @pending = []
  @inflight = 0
  @keep_alive_timeout = @options.timeout[:keep_alive_timeout]

  if @options.io
    # if there's an already open IO, get its
    # peer address, and force-initiate the parser
    transition(:already_open)
    @io = build_socket
    parser
  else
    transition(:idle)
  end
  self.addresses = @options.addresses if @options.addresses
end

Instance Attribute Details

#current_selector=(value) ⇒ Object (writeonly)

Sets the attribute current_selector

Parameters:

  • the value to set the attribute current_selector to.



41
42
43
# File 'lib/httpx/connection.rb', line 41

def current_selector=(value)
  @current_selector = value
end

#current_sessionObject

Returns the value of attribute current_session.



43
44
45
# File 'lib/httpx/connection.rb', line 43

def current_session
  @current_session
end

#familyObject

Returns the value of attribute family.



43
44
45
# File 'lib/httpx/connection.rb', line 43

def family
  @family
end

#ioObject (readonly)

Returns the value of attribute io.



39
40
41
# File 'lib/httpx/connection.rb', line 39

def io
  @io
end

#optionsObject (readonly)

Returns the value of attribute options.



39
40
41
# File 'lib/httpx/connection.rb', line 39

def options
  @options
end

#originObject (readonly)

Returns the value of attribute origin.



39
40
41
# File 'lib/httpx/connection.rb', line 39

def origin
  @origin
end

#originsObject (readonly)

Returns the value of attribute origins.



39
40
41
# File 'lib/httpx/connection.rb', line 39

def origins
  @origins
end

#pendingObject (readonly)

Returns the value of attribute pending.



39
40
41
# File 'lib/httpx/connection.rb', line 39

def pending
  @pending
end

#sibling=(connection) ⇒ Object



365
366
367
368
369
370
371
372
373
374
375
# File 'lib/httpx/connection.rb', line 365

def sibling=(connection)
  @sibling = connection

  return unless connection

  @main_sibling = connection.sibling.nil?

  return unless @main_sibling

  connection.sibling = self
end

#stateObject (readonly)

Returns the value of attribute state.



39
40
41
# File 'lib/httpx/connection.rb', line 39

def state
  @state
end

#typeObject (readonly)

Returns the value of attribute type.



39
40
41
# File 'lib/httpx/connection.rb', line 39

def type
  @type
end

Instance Method Details

#addressesObject



92
93
94
# File 'lib/httpx/connection.rb', line 92

def addresses
  @io && @io.addresses
end

#addresses=(addrs) ⇒ Object

this is a semi-private method, to be used by the resolver to initiate the io object.



84
85
86
87
88
89
90
# File 'lib/httpx/connection.rb', line 84

def addresses=(addrs)
  if @io
    @io.add_addresses(addrs)
  else
    @io = build_socket(addrs)
  end
end

#addresses?Boolean

Returns:



96
97
98
# File 'lib/httpx/connection.rb', line 96

def addresses?
  @io && @io.addresses?
end

#callObject



214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# File 'lib/httpx/connection.rb', line 214

def call
  case @state
  when :idle
    connect

    # when opening the tcp or ssl socket fails
    return if @state == :closed

    consume
  when :closed
    return
  when :closing
    consume
    transition(:closed)
  when :open
    consume
  end
  nil
rescue IOError => e
  @write_buffer.clear
  on_io_error(e)
rescue StandardError => e
  @write_buffer.clear
  on_error(e)
rescue Exception => e # rubocop:disable Lint/RescueException
  force_close(true)
  raise e
end

#closeObject



243
244
245
246
247
# File 'lib/httpx/connection.rb', line 243

def close
  transition(:active) if @state == :inactive

  @parser.close if @parser
end

#coalescable?(connection) ⇒ Boolean

coalescable connections need to be mergeable! but internally, #mergeable? is called before #coalescable?

Returns:



137
138
139
140
141
142
143
144
145
146
# File 'lib/httpx/connection.rb', line 137

def coalescable?(connection)
  if @io.protocol == "h2" &&
     @origin.scheme == "https" &&
     connection.origin.scheme == "https" &&
     @io.can_verify_peer?
    @io.verify_hostname(connection.origin.host)
  else
    @origin == connection.origin
  end
end

#coalesce!(connection) ⇒ Object

coalesces self into connection.



124
125
126
127
128
129
# File 'lib/httpx/connection.rb', line 124

def coalesce!(connection)
  @coalesced_connection = connection

  close_sibling
  connection.merge(self)
end

#coalesced?Boolean

Returns:



131
132
133
# File 'lib/httpx/connection.rb', line 131

def coalesced?
  @coalesced_connection
end

#connecting?Boolean

Returns:



181
182
183
# File 'lib/httpx/connection.rb', line 181

def connecting?
  @state == :idle
end

#deactivateObject



351
352
353
# File 'lib/httpx/connection.rb', line 351

def deactivate
  transition(:inactive)
end

#disconnectObject

disconnects from the current session it’s attached to



386
387
388
389
390
391
392
393
394
# File 'lib/httpx/connection.rb', line 386

def disconnect
  return if @exhausted # it'll reset

  return unless (current_session = @current_session) && (current_selector = @current_selector)

  @current_session = @current_selector = nil

  current_session.deselect_connection(self, current_selector, @cloned)
end

#force_close(delete_pending = false) ⇒ Object

bypasses state machine rules while setting the connection in the :closed state.



263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# File 'lib/httpx/connection.rb', line 263

def force_close(delete_pending = false)
  force_purge
  return unless @state == :closed

  if delete_pending
    @pending.clear
  elsif (parser = @parser)
    enqueue_pending_requests_from_parser(parser)
  end

  return unless @pending.empty?

  disconnect
  emit(:force_closed, delete_pending)
end

#force_reset(cloned = false) ⇒ Object

bypasses the state machine to force closing of connections still connecting. only used for Happy Eyeballs v2.



281
282
283
284
285
# File 'lib/httpx/connection.rb', line 281

def force_reset(cloned = false)
  @state = :closing
  @cloned = cloned
  transition(:closed)
end

#handle_connect_error(error) ⇒ Object



377
378
379
380
381
382
383
# File 'lib/httpx/connection.rb', line 377

def handle_connect_error(error)
  return on_error(error) unless @sibling && @sibling.connecting?

  @sibling.merge(self)

  force_reset(true)
end

#handle_socket_timeout(interval) ⇒ Object



359
360
361
362
363
# File 'lib/httpx/connection.rb', line 359

def handle_socket_timeout(interval)
  error = OperationTimeoutError.new(interval, "timed out while waiting on select")
  error.set_backtrace(caller)
  on_error(error)
end

#idlingObject



337
338
339
340
341
342
343
344
345
# File 'lib/httpx/connection.rb', line 337

def idling
  purge_after_closed
  @write_buffer.clear
  transition(:idle)
  return unless @parser

  enqueue_pending_requests_from_parser(parser)
  @parser = nil
end

#inflight?Boolean

Returns:



185
186
187
188
189
190
191
192
# File 'lib/httpx/connection.rb', line 185

def inflight?
  @parser && (
    # parser may be dealing with other requests (possibly started from a different fiber)
    !@parser.empty? ||
    # connection may be doing connection termination handshake
    !@write_buffer.empty?
  )
end

#inspectObject

:nocov:



428
429
430
431
432
433
434
# File 'lib/httpx/connection.rb', line 428

def inspect
  "#<#{self.class}:#{object_id} " \
    "@origin=#{@origin} " \
    "@state=#{@state} " \
    "@pending=#{@pending.size} " \
    "@io=#{@io}>"
end

#interestsObject



194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/httpx/connection.rb', line 194

def interests
  # connecting
  if connecting?
    connect

    return @io.interests if connecting?
  end

  return @parser.interests if @parser

  nil
rescue StandardError => e
  on_error(e)
  nil
end

#io_connected?Boolean

Returns:



175
176
177
178
179
# File 'lib/httpx/connection.rb', line 175

def io_connected?
  return @coalesced_connection.io_connected? if @coalesced_connection

  @io && @io.state == :connected
end

#match?(uri, options) ⇒ Boolean

Returns:



100
101
102
103
104
105
106
107
108
109
110
# File 'lib/httpx/connection.rb', line 100

def match?(uri, options)
  return false if !used? && (@state == :closing || @state == :closed)

  @origins.include?(uri.origin) &&
    # if there is more than one origin to match, it means that this connection
    # was the result of coalescing. To prevent blind trust in the case where the
    # origin came from an ORIGIN frame, we're going to verify the hostname with the
    # SSL certificate
    (@origins.size == 1 || @origin == uri.origin || (@io.is_a?(SSL) && @io.verify_hostname(uri.host))) &&
    @options == options
end

#merge(connection) ⇒ Object



148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/httpx/connection.rb', line 148

def merge(connection)
  @origins |= connection.instance_variable_get(:@origins)
  if @ssl_session.nil? && connection.ssl_session
    @ssl_session = connection.ssl_session
    @io.session_new_cb do |sess|
      @ssl_session = sess
    end if @io
  end
  connection.purge_pending do |req|
    req.transition(:idle)
    send(req)
  end
end

#mergeable?(connection) ⇒ Boolean

Returns:



112
113
114
115
116
117
118
119
120
121
# File 'lib/httpx/connection.rb', line 112

def mergeable?(connection)
  return false if @state == :closing || @state == :closed || !@io

  return false unless connection.addresses

  (
    (open? && @origin == connection.origin) ||
    !(@io.addresses & (connection.addresses || [])).empty?
  ) && @options == connection.options
end

#on_connect_error(e) ⇒ Object



396
397
398
399
400
401
402
# File 'lib/httpx/connection.rb', line 396

def on_connect_error(e)
  # connect errors, exit gracefully
  error = ConnectionError.new(e.message)
  error.set_backtrace(e.backtrace)
  handle_connect_error(error) if connecting?
  force_close
end

#on_error(error, request = nil) ⇒ Object



409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
# File 'lib/httpx/connection.rb', line 409

def on_error(error, request = nil)
  if error.is_a?(OperationTimeoutError)

    # inactive connections do not contribute to the select loop, therefore
    # they should not fail due to such errors.
    return if @state == :inactive

    if @timeout
      @timeout -= error.timeout
      return unless @timeout <= 0
    end

    error = error.to_connection_error if connecting?
  end
  handle_error(error, request)
  reset
end

#on_io_error(e) ⇒ Object



404
405
406
407
# File 'lib/httpx/connection.rb', line 404

def on_io_error(e)
  on_error(e)
  force_close(true)
end

#open?Boolean

Returns:



355
356
357
# File 'lib/httpx/connection.rb', line 355

def open?
  @state == :open || @state == :inactive
end

#peerObject



78
79
80
# File 'lib/httpx/connection.rb', line 78

def peer
  @origin
end

#purge_pending(&block) ⇒ Object



162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/httpx/connection.rb', line 162

def purge_pending(&block)
  pendings = []
  if @parser
    pending = @parser.pending
    @inflight -= pending.size
    pendings << pending
  end
  pendings << @pending
  pendings.each do |pending|
    pending.reject!(&block)
  end
end

#resetObject



287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
# File 'lib/httpx/connection.rb', line 287

def reset
  return if @state == :closing || @state == :closed

  parser = @parser

  if parser && parser.respond_to?(:max_concurrent_requests)
    # if connection being reset has at some downgraded the number of concurrent
    # requests, such as in the case where an attempt to use HTTP/1 pipelining failed,
    # keep that information around.
    @max_concurrent_requests = parser.max_concurrent_requests
  end

  transition(:closing)

  transition(:closed)
end

#send(request) ⇒ Object



304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
# File 'lib/httpx/connection.rb', line 304

def send(request)
  return @coalesced_connection.send(request) if @coalesced_connection

  if @parser && !@write_buffer.full?
    if @response_received_at && @keep_alive_timeout &&
       Utils.elapsed_time(@response_received_at) > @keep_alive_timeout
      # when pushing a request into an existing connection, we have to check whether there
      # is the possibility that the connection might have extended the keep alive timeout.
      # for such cases, we want to ping for availability before deciding to shovel requests.
      log(level: 3) { "keep alive timeout expired, pinging connection..." }
      @pending << request
      transition(:active) if @state == :inactive
      parser.ping
      request.ping!
      return
    end

    send_request_to_parser(request)
  else
    @pending << request
  end
end

#terminateObject



249
250
251
252
253
254
255
256
257
258
259
# File 'lib/httpx/connection.rb', line 249

def terminate
  case @state
  when :idle
    purge_after_closed
    disconnect
  when :closed
    @connected_at = nil
  end

  close
end

#timeoutObject



327
328
329
330
331
332
333
334
335
# File 'lib/httpx/connection.rb', line 327

def timeout
  return if @state == :closed || @state == :inactive

  return @timeout if @timeout

  return @options.timeout[:connect_timeout] if @state == :idle

  @options.timeout[:operation_timeout]
end

#to_ioObject



210
211
212
# File 'lib/httpx/connection.rb', line 210

def to_io
  @io.to_io
end

#used?Boolean

Returns:



347
348
349
# File 'lib/httpx/connection.rb', line 347

def used?
  @connected_at
end