Module: HTTPX::Plugins::StreamBidi::RequestMethods
- Defined in:
- lib/httpx/plugins/stream_bidi.rb
Overview
Adds synchronization to request operations which may buffer payloads from different threads.
Instance Attribute Summary collapse
-
#headers_sent ⇒ Object
Returns the value of attribute headers_sent.
Instance Method Summary collapse
- #<<(chunk) ⇒ Object
- #can_buffer? ⇒ Boolean
- #close ⇒ Object
- #closed? ⇒ Boolean
- #flush_buffer_on_body(&cb) ⇒ Object
- #initialize ⇒ Object
-
#transition(nextstate) ⇒ Object
overrides state management transitions to introduce an intermediate
:waiting_for_chunkstate, which the request transitions to once payload is buffered.
Instance Attribute Details
#headers_sent ⇒ Object
Returns the value of attribute headers_sent.
251 252 253 |
# File 'lib/httpx/plugins/stream_bidi.rb', line 251 def headers_sent @headers_sent end |
Instance Method Details
#<<(chunk) ⇒ Object
311 312 313 314 315 316 317 318 319 320 321 |
# File 'lib/httpx/plugins/stream_bidi.rb', line 311 def <<(chunk) @mutex.synchronize do if @drainer @body.clear if @body.respond_to?(:clear) @drainer = nil end @body << chunk transition(:body) end end |
#can_buffer? ⇒ Boolean
271 272 273 274 275 |
# File 'lib/httpx/plugins/stream_bidi.rb', line 271 def can_buffer? return super unless @options.stream super && @state != :waiting_for_chunk end |
#close ⇒ Object
323 324 325 326 327 328 329 330 331 332 333 334 |
# File 'lib/httpx/plugins/stream_bidi.rb', line 323 def close return super unless @options.stream @mutex.synchronize do return if @closed @closed = true end # last chunk to send which ends the stream self << "" end |
#closed? ⇒ Boolean
265 266 267 268 269 |
# File 'lib/httpx/plugins/stream_bidi.rb', line 265 def closed? return super unless @options.stream @closed end |
#flush_buffer_on_body(&cb) ⇒ Object
261 262 263 |
# File 'lib/httpx/plugins/stream_bidi.rb', line 261 def flush_buffer_on_body(&cb) @flush_buffer_on_body_cb = on(:body, &cb) end |
#initialize ⇒ Object
253 254 255 256 257 258 259 |
# File 'lib/httpx/plugins/stream_bidi.rb', line 253 def initialize(*) super @headers_sent = false @closed = false @flush_buffer_on_body_cb = nil @mutex = Thread::Mutex.new end |
#transition(nextstate) ⇒ Object
overrides state management transitions to introduce an intermediate :waiting_for_chunk state, which the request transitions to once payload is buffered.
280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 |
# File 'lib/httpx/plugins/stream_bidi.rb', line 280 def transition(nextstate) return super unless @options.stream headers_sent = @headers_sent case nextstate when :idle headers_sent = false if @flush_buffer_on_body_cb callbacks(:body).delete(@flush_buffer_on_body_cb) @flush_buffer_on_body_cb = nil end when :waiting_for_chunk return unless @state == :body when :body case @state when :headers headers_sent = true when :waiting_for_chunk # HACK: to allow super to pass through @state = :headers end end super.tap do # delay setting this up until after the first transition to :body @headers_sent = headers_sent end end |