Module: HTTPX::Plugins::StreamBidi::RequestMethods
- Defined in:
- lib/httpx/plugins/stream_bidi.rb
Overview
Adds synchronization to request operations which may buffer payloads from different threads.
Instance Attribute Summary collapse
-
#headers_sent ⇒ Object
Returns the value of attribute headers_sent.
Instance Method Summary collapse
- #<<(chunk) ⇒ Object
- #can_buffer? ⇒ Boolean
- #close ⇒ Object
- #closed? ⇒ Boolean
- #flush_buffer_on_body(&cb) ⇒ Object
- #initialize ⇒ Object
-
#transition(nextstate) ⇒ Object
overrides state management transitions to introduce an intermediate
:waiting_for_chunk state, which the request transitions to once payload is buffered.
Instance Attribute Details
#headers_sent ⇒ Object
Returns the value of attribute headers_sent.
257 258 259 |
# File 'lib/httpx/plugins/stream_bidi.rb', line 257
#
# Reader for the +@headers_sent+ instance variable.
#
# @return [Object] whatever is currently stored in +@headers_sent+
def headers_sent
  @headers_sent
end
Instance Method Details
#<<(chunk) ⇒ Object
317 318 319 320 321 322 323 324 325 326 327 |
# File 'lib/httpx/plugins/stream_bidi.rb', line 317
#
# Appends +chunk+ to the request body and asks for a transition to +:body+.
# The whole operation runs while holding +@mutex+, so concurrent callers
# cannot interleave their appends.
#
# @param chunk [Object] payload handed to <tt>@body << chunk</tt>
# @return [Object] the result of <tt>transition(:body)</tt>
def <<(chunk)
  @mutex.synchronize do
    if @drainer
      # A drainer is set: empty the body (when it supports it) before the
      # new chunk is appended, then drop the drainer.
      # NOTE(review): presumably the drainer is a leftover from a previous
      # read of the body -- confirm against where @drainer is assigned.
      @body.clear if @body.respond_to?(:clear)
      @drainer = nil
    end

    @body << chunk
    transition(:body)
  end
end
#can_buffer? ⇒ Boolean
277 278 279 280 281 |
# File 'lib/httpx/plugins/stream_bidi.rb', line 277
#
# Delegates to the parent implementation; when the +stream+ option is set,
# additionally answers false while the request is in the
# +:waiting_for_chunk+ state.
#
# @return [Boolean]
def can_buffer?
  if @options.stream
    super && @state != :waiting_for_chunk
  else
    super
  end
end
#close ⇒ Object
329 330 331 332 333 334 335 336 337 338 339 340 |
# File 'lib/httpx/plugins/stream_bidi.rb', line 329
#
# Closes the request. Without the +stream+ option this defers to the parent
# implementation. With it, the +@closed+ flag is flipped under +@mutex+ and,
# only on the first call, an empty chunk is pushed through #<<.
#
# @return [nil] when the request had already been closed
# @return [Object] otherwise, the result of <tt>self << ""</tt>
def close
  return super unless @options.stream

  # The check-and-set happens under the lock so only one caller wins.
  first_close = @mutex.synchronize do
    next false if @closed

    @closed = true
  end
  return unless first_close

  # last chunk to send which ends the stream
  self << ""
end
#closed? ⇒ Boolean
271 272 273 274 275 |
# File 'lib/httpx/plugins/stream_bidi.rb', line 271
#
# With the +stream+ option set, reports the +@closed+ flag kept by this
# module; otherwise defers to the parent implementation.
#
# @return [Boolean]
def closed?
  @options.stream ? @closed : super
end
#flush_buffer_on_body(&cb) ⇒ Object
267 268 269 |
# File 'lib/httpx/plugins/stream_bidi.rb', line 267
#
# Registers the given block as a +:body+ callback via +on+ and remembers
# whatever +on+ returns in +@flush_buffer_on_body_cb+.
#
# @yield the block forwarded to <tt>on(:body)</tt>
# @return [Object] the value returned by <tt>on(:body, &cb)</tt>
def flush_buffer_on_body(&cb)
  registered = on(:body, &cb)
  @flush_buffer_on_body_cb = registered
end
#initialize ⇒ Object
259 260 261 262 263 264 265 |
# File 'lib/httpx/plugins/stream_bidi.rb', line 259
#
# Runs the parent initializer with the original arguments, then sets up the
# state this module relies on: a mutex, the +:body+ callback handle, and the
# +closed+ / +headers_sent+ flags (both initially false).
def initialize(*)
  super
  @mutex = Thread::Mutex.new
  @flush_buffer_on_body_cb = nil
  @closed = false
  @headers_sent = false
end
#transition(nextstate) ⇒ Object
overrides state management transitions to introduce an intermediate :waiting_for_chunk state, which the request transitions to once payload is buffered.
286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 |
# File 'lib/httpx/plugins/stream_bidi.rb', line 286
#
# Overrides state transitions to support an intermediate +:waiting_for_chunk+
# state. Without the +stream+ option this is a plain pass-through to the
# parent implementation.
#
# @param nextstate [Symbol] the state being requested
# @return [nil] when +:waiting_for_chunk+ is requested from any state other
#   than +:body+ (the parent implementation is not invoked)
# @return [Object] otherwise, whatever the parent implementation returns
def transition(nextstate)
  return super unless @options.stream

  # Work on a local copy; the ivar is only updated after super has returned.
  sent = @headers_sent

  case nextstate
  when :idle
    sent = false

    # Unregister the :body callback stored by #flush_buffer_on_body.
    if @flush_buffer_on_body_cb
      callbacks(:body).delete(@flush_buffer_on_body_cb)
      @flush_buffer_on_body_cb = nil
    end
  when :waiting_for_chunk
    return unless @state == :body
  when :body
    case @state
    when :headers
      sent = true
    when :waiting_for_chunk
      # HACK: to allow super to pass through
      @state = :headers
    end
  end

  outcome = super
  # delay setting this up until after the first transition to :body
  @headers_sent = sent
  outcome
end