Class: ThreadStorm::Queue
Overview
This is tricky… we need to maintain both real queue size and fake queue size. If we use just the real queue size alone, then we will see the following (incorrect) behavior:
storm = ThreadStorm.new :size => 2, :execute_blocks => true
storm.execute{ sleep }
storm.execute{ sleep }
storm.execute{ sleep } # Doesn't block, but should.
storm.execute{ sleep } # Finally blocks.
The reason is that popping the queue (and thus decrementing its size) does not imply that the worker thread has actually finished the execution and is ready to accept another one.
Instance Method Summary collapse
-
#decr_size ⇒ Object
Decrement the fake size, thus signaling that we’re ready to call
enqueue
. -
#dequeue ⇒ Object
dequeue
needs to wait until the real size, otherwise a single call toenqueue
could result to multiple successful calls todequeue
before a call todecr_size
is made. -
#enqueue(item) ⇒ Object
enqueue
needs to wait on the fake size, otherwise @max_size+1 calls toenqueue
could be made when @enqueue_blocks is true. -
#initialize(max_size, enqueue_blocks) ⇒ Queue
constructor
:nodoc:.
- #shutdown ⇒ Object
- #synchronize(&block) ⇒ Object
Constructor Details
#initialize(max_size, enqueue_blocks) ⇒ Queue
:nodoc:
17 18 19 20 21 22 23 24 25 |
# File 'lib/thread_storm/queue.rb', line 17 def initialize(max_size, enqueue_blocks) @max_size = max_size @enqueue_blocks = enqueue_blocks @size = 0 @array = [] @lock = Monitor.new @cond1 = @lock.new_cond # Wish I could come up with better names. @cond2 = @lock.new_cond end |
Instance Method Details
#decr_size ⇒ Object
Decrement the fake size, thus signaling that we’re ready to call enqueue
.
53 54 55 56 57 58 |
# File 'lib/thread_storm/queue.rb', line 53 def decr_size @lock.synchronize do @size -= 1 unless @size == 0 @cond2.broadcast end end |
#dequeue ⇒ Object
dequeue
needs to wait until the real size, otherwise a single call to enqueue
could result to multiple successful calls to dequeue
before a call to decr_size
is made.
45 46 47 48 49 50 |
# File 'lib/thread_storm/queue.rb', line 45 def dequeue @lock.synchronize do @cond1.wait_until{ @array.size > 0 } @array.shift end end |
#enqueue(item) ⇒ Object
enqueue
needs to wait on the fake size, otherwise @max_size+1 calls to enqueue
could be made when @enqueue_blocks is true.
33 34 35 36 37 38 39 40 |
# File 'lib/thread_storm/queue.rb', line 33 def enqueue(item) @lock.synchronize do @cond2.wait_until{ @size < @max_size } if @enqueue_blocks @size += 1 @array << item @cond1.broadcast end end |
#shutdown ⇒ Object
60 61 62 63 64 65 66 |
# File 'lib/thread_storm/queue.rb', line 60 def shutdown @lock.synchronize do @array = [nil] * @max_size @size = @max_size @cond1.broadcast end end |
#synchronize(&block) ⇒ Object
27 28 29 |
# File 'lib/thread_storm/queue.rb', line 27 def synchronize(&block) @lock.synchronize{ yield(self) } end |