Module: HTTPX::Plugins::StreamBidi::RequestMethods

Defined in:
lib/httpx/plugins/stream_bidi.rb

Overview

Adds synchronization to request operations which may buffer payloads from different threads.

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#headers_sent ⇒ Object

Returns the value of attribute headers_sent.



257
258
259
# File 'lib/httpx/plugins/stream_bidi.rb', line 257

# Reader for the +@headers_sent+ flag. Among the methods listed here,
# +initialize+ starts it at +false+ and +transition+ is the only other one
# that assigns it.
def headers_sent
  @headers_sent
end

Instance Method Details

#<<(chunk) ⇒ Object



317
318
319
320
321
322
323
324
325
326
327
# File 'lib/httpx/plugins/stream_bidi.rb', line 317

# Buffers +chunk+ into the request body and then requests the +:body+
# transition. The whole operation runs while holding +@mutex+, so writers
# coming from different threads are serialized.
#
# Returns whatever +transition(:body)+ returns.
def <<(chunk)
  @mutex.synchronize do
    # A truthy @drainer triggers a buffer reset before the new chunk is
    # appended; bodies which do not respond to #clear are left untouched.
    # NOTE(review): presumably the previously buffered payload has already
    # been drained at this point — confirm against the base request class.
    if @drainer
      clearable = @body.respond_to?(:clear)
      @body.clear if clearable
      @drainer = nil
    end

    @body << chunk
    transition(:body)
  end
end

#can_buffer? ⇒ Boolean

Returns:

  • (Boolean)


277
278
279
280
281
# File 'lib/httpx/plugins/stream_bidi.rb', line 277

# Whether payload can currently be buffered for this request.
#
# Non-stream requests defer entirely to the parent implementation. Stream
# requests additionally refuse buffering while the request sits in the
# +:waiting_for_chunk+ state.
def can_buffer?
  if @options.stream
    super && @state != :waiting_for_chunk
  else
    super
  end
end

#close ⇒ Object



329
330
331
332
333
334
335
336
337
338
339
340
# File 'lib/httpx/plugins/stream_bidi.rb', line 329

# Closes the request. Non-stream requests defer to the parent implementation.
#
# For stream requests, the +@closed+ flag is flipped under +@mutex+ so that
# only the first caller proceeds; repeated calls return +nil+ without doing
# anything else.
def close
  return super unless @options.stream

  first_close = @mutex.synchronize do
    next false if @closed

    @closed = true
  end
  return unless first_close

  # an empty chunk is pushed as the last payload, which ends the stream.
  # This happens outside of the lock above, as #<< takes the same mutex.
  self << ""
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


271
272
273
274
275
# File 'lib/httpx/plugins/stream_bidi.rb', line 271

# Reports the +@closed+ flag for stream requests; any other request defers
# to the parent implementation.
def closed?
  @options.stream ? @closed : super
end

#flush_buffer_on_body(&cb) ⇒ Object



267
268
269
# File 'lib/httpx/plugins/stream_bidi.rb', line 267

# Registers +cb+ as a +:body+ callback and remembers what the registration
# returned in +@flush_buffer_on_body_cb+, which +transition+ later uses to
# remove the callback again on +:idle+.
#
# Returns the value handed back by +on+.
def flush_buffer_on_body(&cb)
  registration = on(:body, &cb)
  @flush_buffer_on_body_cb = registration
end

#initialize ⇒ Object



259
260
261
262
263
264
265
# File 'lib/httpx/plugins/stream_bidi.rb', line 259

# Runs the parent initializer with the original arguments, then sets up the
# extra state used by the methods of this module: no registered flush
# callback, both flags lowered, and a fresh mutex for synchronized access.
def initialize(*)
  super
  @flush_buffer_on_body_cb = nil
  @headers_sent = @closed = false
  @mutex = Thread::Mutex.new
end

#transition(nextstate) ⇒ Object

Overrides state management transitions to introduce an intermediate :waiting_for_chunk state, which the request transitions to once payload is buffered.



286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
# File 'lib/httpx/plugins/stream_bidi.rb', line 286

# Wraps the parent state transition for stream requests, adding handling for
# the extra +:waiting_for_chunk+ state and keeping +@headers_sent+ up to date.
#
# Non-stream requests go straight to the parent implementation.
#
# Returns the parent's result, or +nil+ when a move to +:waiting_for_chunk+
# is requested from any state other than +:body+ (the parent is not called
# in that case).
def transition(nextstate)
  return super unless @options.stream

  # work on a local copy; the instance variable is only written once the
  # parent transition has returned (see the bottom of this method).
  headers_sent = @headers_sent

  case nextstate
  when :idle
    # going back to :idle lowers the flag again...
    headers_sent = false

    # ...and unregisters the :body callback installed via
    # #flush_buffer_on_body, if there is one.
    if @flush_buffer_on_body_cb
      callbacks(:body).delete(@flush_buffer_on_body_cb)
      @flush_buffer_on_body_cb = nil
    end
  when :waiting_for_chunk
    # this state can only be entered from :body; anything else is ignored.
    return unless @state == :body
  when :body
    case @state
    when :headers
      # moving from :headers to :body marks the headers as sent.
      headers_sent = true
    when :waiting_for_chunk
      # HACK: to allow super to pass through
      # NOTE(review): presumably the parent only accepts :body when coming
      # from :headers, hence the state rewrite — confirm against the parent.
      @state = :headers
    end
  end

  # #tap hands back the parent's return value untouched.
  super.tap do
    # delay setting this up until after the first transition to :body
    @headers_sent = headers_sent
  end
end