Module: HTTPX::Plugins::StreamBidi::RequestMethods

Defined in:
lib/httpx/plugins/stream_bidi.rb

Overview

Adds synchronization to request operations which may buffer payloads from different threads.

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#headers_sent ⇒ Object

Returns the value of attribute headers_sent.



251
252
253
# File 'lib/httpx/plugins/stream_bidi.rb', line 251

# Reader for the +@headers_sent+ instance variable.
#
# @return [Object] the current value of +@headers_sent+
def headers_sent
  @headers_sent
end

Instance Method Details

#<<(chunk) ⇒ Object



311
312
313
314
315
316
317
318
319
320
321
# File 'lib/httpx/plugins/stream_bidi.rb', line 311

# Appends +chunk+ to the request body and then requests the +:body+ state.
#
# Everything happens while holding +@mutex+, so concurrent callers are
# serialized against each other.
#
# @param chunk [Object] payload piece handed over to +@body#<<+
# @return [Object] whatever +transition(:body)+ returns
def <<(chunk)
  @mutex.synchronize do
    if @drainer
      # A drainer is set: empty the body (when it supports +clear+) and
      # forget the drainer before buffering the new chunk.
      # NOTE(review): presumably the drainer marks payload that was already
      # consumed -- confirm against the code that assigns @drainer.
      payload = @body
      payload.clear if payload.respond_to?(:clear)
      @drainer = nil
    end

    @body << chunk
    transition(:body)
  end
end

#can_buffer? ⇒ Boolean

Returns:

  • (Boolean)


271
272
273
274
275
# File 'lib/httpx/plugins/stream_bidi.rb', line 271

# Tells whether the request can buffer.
#
# Without the +stream+ option the parent answer is returned unchanged. With
# it, the parent answer is additionally gated on the request not being in
# the +:waiting_for_chunk+ state.
#
# @return [Boolean]
def can_buffer?
  if @options.stream
    super && @state != :waiting_for_chunk
  else
    super
  end
end

#close ⇒ Object



323
324
325
326
327
328
329
330
331
332
333
334
# File 'lib/httpx/plugins/stream_bidi.rb', line 323

# Closes the request.
#
# Without the +stream+ option this defers entirely to the parent
# implementation. With it, the +@closed+ flag is flipped under +@mutex+ and,
# only on the first call, an empty chunk is pushed through +#<<+.
#
# @return [Object, nil] the parent result (non-stream), the result of
#   +self << ""+ on the first call, or +nil+ when already closed
def close
  return super unless @options.stream

  first_close = @mutex.synchronize do
    next false if @closed

    @closed = true
  end
  return unless first_close

  # last chunk to send which ends the stream
  self << ""
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


265
266
267
268
269
# File 'lib/httpx/plugins/stream_bidi.rb', line 265

# Tells whether the request is closed.
#
# With the +stream+ option the answer is the +@closed+ flag; otherwise the
# parent implementation decides.
#
# @return [Boolean]
def closed?
  @options.stream ? @closed : super
end

#flush_buffer_on_body(&cb) ⇒ Object



261
262
263
# File 'lib/httpx/plugins/stream_bidi.rb', line 261

# Registers +cb+ as a +:body+ callback via +on+ and remembers what +on+
# returned in +@flush_buffer_on_body_cb+.
#
# @yield the callback handed to +on(:body)+
# @return [Object] the value returned by +on+
def flush_buffer_on_body(&cb)
  registered = on(:body, &cb)
  @flush_buffer_on_body_cb = registered
end

#initialize(*) ⇒ Object



253
254
255
256
257
258
259
# File 'lib/httpx/plugins/stream_bidi.rb', line 253

# Runs the parent initializer with the same arguments, then sets up the
# state this module relies on: both boolean flags start out +false+, no
# flush callback is registered, and a fresh mutex is created.
def initialize(*)
  super

  # flags
  @headers_sent = false
  @closed = false

  # callback handle and lock
  @flush_buffer_on_body_cb = nil
  @mutex = Thread::Mutex.new
end

#transition(nextstate) ⇒ Object

Overrides state management transitions to introduce an intermediate :waiting_for_chunk state, which the request transitions to once payload is buffered.



280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# File 'lib/httpx/plugins/stream_bidi.rb', line 280

# State transition hook that accounts for the +:waiting_for_chunk+ state.
#
# Without the +stream+ option the parent implementation is used untouched.
# Otherwise, depending on +nextstate+:
#
# * +:idle+ - the headers-sent flag is scheduled to be reset, and the
#   callback stored in +@flush_buffer_on_body_cb+ (if any) is removed from
#   the +:body+ callbacks.
# * +:waiting_for_chunk+ - ignored (returns +nil+) unless the current state
#   is +:body+.
# * +:body+ - coming from +:headers+ schedules the headers-sent flag to be
#   set; coming from +:waiting_for_chunk+ rewrites the current state to
#   +:headers+ before delegating.
#
# +@headers_sent+ is only written after the parent transition has returned.
#
# @param nextstate [Symbol] the state being requested
# @return [Object, nil] the parent's return value, or +nil+ when a
#   +:waiting_for_chunk+ request is ignored
def transition(nextstate)
  return super unless @options.stream

  sent = @headers_sent

  case nextstate
  when :idle
    sent = false

    if @flush_buffer_on_body_cb
      callbacks(:body).delete(@flush_buffer_on_body_cb)
      @flush_buffer_on_body_cb = nil
    end
  when :waiting_for_chunk
    return unless @state == :body
  when :body
    if @state == :headers
      sent = true
    elsif @state == :waiting_for_chunk
      # HACK: to allow super to pass through
      @state = :headers
    end
  end

  outcome = super
  # delay setting this up until after the first transition to :body
  @headers_sent = sent
  outcome
end