Module: HTTPX::Plugins::StreamBidi::RequestMethods

Defined in:
lib/httpx/plugins/stream_bidi.rb

Overview

Adds synchronization to request operations which may buffer payloads from different threads.

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#headers_sentObject

Returns the value of attribute headers_sent.



251
252
253
# File 'lib/httpx/plugins/stream_bidi.rb', line 251

def headers_sent
  @headers_sent
end

Instance Method Details

#<<(chunk) ⇒ Object



299
300
301
302
303
304
305
306
307
308
309
# File 'lib/httpx/plugins/stream_bidi.rb', line 299

def <<(chunk)
  @mutex.synchronize do
    if @drainer
      @body.clear if @body.respond_to?(:clear)
      @drainer = nil
    end
    @body << chunk

    transition(:body)
  end
end

#can_buffer?Boolean

Returns:

  • (Boolean)


266
267
268
269
270
# File 'lib/httpx/plugins/stream_bidi.rb', line 266

def can_buffer?
  return super unless @options.stream

  super && @state != :waiting_for_chunk
end

#closeObject



311
312
313
314
315
316
317
318
319
320
321
322
# File 'lib/httpx/plugins/stream_bidi.rb', line 311

def close
  return super unless @options.stream

  @mutex.synchronize do
    return if @closed

    @closed = true
  end

  # last chunk to send which ends the stream
  self << ""
end

#closed?Boolean

Returns:

  • (Boolean)


260
261
262
263
264
# File 'lib/httpx/plugins/stream_bidi.rb', line 260

def closed?
  return super unless @options.stream

  @closed
end

#initializeObject



253
254
255
256
257
258
# File 'lib/httpx/plugins/stream_bidi.rb', line 253

def initialize(*)
  super
  @headers_sent = false
  @closed = false
  @mutex = Thread::Mutex.new
end

#transition(nextstate) ⇒ Object

overrides state management transitions to introduce an intermediate :waiting_for_chunk state, which the request transitions to once payload is buffered.



275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/httpx/plugins/stream_bidi.rb', line 275

def transition(nextstate)
  return super unless @options.stream

  headers_sent = @headers_sent

  case nextstate
  when :waiting_for_chunk
    return unless @state == :body
  when :body
    case @state
    when :headers
      headers_sent = true
    when :waiting_for_chunk
      # HACK: to allow super to pass through
      @state = :headers
    end
  end

  super.tap do
    # delay setting this up until after the first transition to :body
    @headers_sent = headers_sent
  end
end