Class: ThreadStorm::Queue

Inherits:
Object
Defined in:
lib/thread_storm/queue.rb

Overview

This is tricky: we need to maintain both the real queue size and a fake queue size. If we used the real queue size alone, we would see the following (incorrect) behavior:

storm = ThreadStorm.new :size => 2, :execute_blocks => true
storm.execute{ sleep }
storm.execute{ sleep }
storm.execute{ sleep } # Doesn't block, but should.
storm.execute{ sleep } # Finally blocks.

The reason is that popping the queue (and thus decrementing its size) does not imply that the worker thread has actually finished the execution and is ready to accept another one.
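The difference between the two sizes can be sketched without any threads. The class below is a hypothetical stand-in, not ThreadStorm::Queue itself: `TwoCounterQueue`, `full_by_real_size?` and `full_by_fake_size?` are names invented for this illustration, and locking is left out on purpose so that only the counters are visible.

```ruby
# Hypothetical, single-threaded illustration of the two counters.
# It is not the real ThreadStorm::Queue and does no locking.
class TwoCounterQueue
  def initialize(max_size)
    @max_size = max_size
    @size     = 0   # fake size: jobs enqueued and not yet finished
    @array    = []  # real queue: jobs enqueued and not yet picked up
  end

  def enqueue(item)
    @size += 1
    @array << item
  end

  # A worker picks up a job; only the real size shrinks.
  def dequeue
    @array.shift
  end

  # A worker reports that its job is finished; the fake size shrinks.
  def decr_size
    @size -= 1 unless @size == 0
  end

  def full_by_real_size?
    @array.size >= @max_size
  end

  def full_by_fake_size?
    @size >= @max_size
  end
end

q = TwoCounterQueue.new(2)
q.enqueue(:job_a)
q.enqueue(:job_b)

# Both workers pick up a job but are still busy running it.
q.dequeue
q.dequeue

real_says_full = q.full_by_real_size? # the real queue is empty again
fake_says_full = q.full_by_fake_size? # both workers are still busy

q.decr_size                           # one worker finishes
after_finish = q.full_by_fake_size?
```

Judged by the real size, a third job would be accepted while both workers are still busy; judged by the fake size, it has to wait until a worker reports completion.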

Constructor Details

#initialize(max_size, enqueue_blocks) ⇒ Queue

:nodoc:



# File 'lib/thread_storm/queue.rb', line 17

def initialize(max_size, enqueue_blocks)
  @max_size       = max_size
  @enqueue_blocks = enqueue_blocks
  @size           = 0
  @array          = []
  @lock           = Monitor.new
  @cond1          = @lock.new_cond # Wish I could come up with better names.
  @cond2          = @lock.new_cond
end

Instance Method Details

#decr_size ⇒ Object

Decrement the fake size, thus signaling that we’re ready to call enqueue.



# File 'lib/thread_storm/queue.rb', line 53

def decr_size
  @lock.synchronize do
    @size -= 1 unless @size == 0
    @cond2.broadcast
  end
end

#dequeue ⇒ Object

dequeue needs to wait on the real size; otherwise, a single call to enqueue could result in multiple successful calls to dequeue before a call to decr_size is made.



# File 'lib/thread_storm/queue.rb', line 45

def dequeue
  @lock.synchronize do
    @cond1.wait_until{ @array.size > 0 }
    @array.shift
  end
end

#enqueue(item) ⇒ Object

enqueue needs to wait on the fake size; otherwise, @max_size+1 calls to enqueue could be made without blocking when @enqueue_blocks is true.



# File 'lib/thread_storm/queue.rb', line 33

def enqueue(item)
  @lock.synchronize do
    @cond2.wait_until{ @size < @max_size } if @enqueue_blocks
    @size += 1
    @array << item
    @cond1.broadcast
  end
end
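The interplay of enqueue, dequeue and decr_size can be sketched with one producer and one worker. `SketchQueue` below is a condensed copy of the three methods listed on this page, renamed so it does not clash with the real class. The worker loop and the use of a `nil` item as a stop signal are assumptions made for this example, not something this page documents.

```ruby
require "monitor"

# Condensed copy of the methods shown on this page, under a different name.
class SketchQueue
  def initialize(max_size, enqueue_blocks)
    @max_size       = max_size
    @enqueue_blocks = enqueue_blocks
    @size           = 0
    @array          = []
    @lock           = Monitor.new
    @cond1          = @lock.new_cond # signaled when an item is available
    @cond2          = @lock.new_cond # signaled when the fake size drops
  end

  def enqueue(item)
    @lock.synchronize do
      @cond2.wait_until { @size < @max_size } if @enqueue_blocks
      @size += 1
      @array << item
      @cond1.broadcast
    end
  end

  def dequeue
    @lock.synchronize do
      @cond1.wait_until { @array.size > 0 }
      @array.shift
    end
  end

  def decr_size
    @lock.synchronize do
      @size -= 1 unless @size == 0
      @cond2.broadcast
    end
  end
end

queue   = SketchQueue.new(1, true)
results = []

# Hypothetical worker loop: run each job, then free the slot.
# A nil item is treated as a stop signal (an assumption of this sketch).
worker = Thread.new do
  while (job = queue.dequeue)
    results << job.call
    queue.decr_size # only now may the producer enqueue again
  end
end

# With max_size 1, each enqueue after the first waits for decr_size.
3.times { |i| queue.enqueue(-> { i * 10 }) }
queue.enqueue(nil)
worker.join
```

Because the producer waits on the fake size, it can never get more than one job ahead of the worker here, even though the real array is empty for most of the run.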

#shutdown ⇒ Object



# File 'lib/thread_storm/queue.rb', line 60

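def shutdown
  @lock.synchronize do
    # Replace the queue contents with one nil per slot.
    @array = [nil] * @max_size
    # Mark the fake size as full.
    @size = @max_size
    # Wake every thread waiting in dequeue.
    @cond1.broadcast
  end
end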
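A minimal sketch of what shutdown does to threads blocked in dequeue, assuming that consumers treat a `nil` item as a request to stop and that there are at most `max_size` of them. `ShutdownSketch` is a hypothetical class that copies only the methods needed for the demonstration.

```ruby
require "monitor"

# Hypothetical reduction of the queue to dequeue and shutdown.
class ShutdownSketch
  def initialize(max_size)
    @max_size = max_size
    @size     = 0
    @array    = []
    @lock     = Monitor.new
    @cond1    = @lock.new_cond
  end

  def dequeue
    @lock.synchronize do
      @cond1.wait_until { @array.size > 0 }
      @array.shift
    end
  end

  def shutdown
    @lock.synchronize do
      @array = [nil] * @max_size
      @size  = @max_size
      @cond1.broadcast
    end
  end
end

queue = ShutdownSketch.new(2)

# Two consumers wait on an empty queue. In this sketch, a nil item
# means "stop"; that convention is an assumption of the example.
workers = Array.new(2) do
  Thread.new { queue.dequeue.nil? ? :stopped : :got_job }
end

queue.shutdown

# Thread#value joins the thread and returns the block's result.
outcomes = workers.map(&:value)
```

Each consumer receives exactly one of the `nil` items, so both threads return instead of waiting forever. Any items still in the array at the time of the call are discarded by the reassignment.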

#synchronize(&block) ⇒ Object



# File 'lib/thread_storm/queue.rb', line 27

def synchronize(&block)
  @lock.synchronize{ yield(self) }
end
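Since the lock is a Monitor, which is reentrant, a thread that already holds it through synchronize can call other locking methods on the queue without deadlocking. The sketch below illustrates this with a hypothetical `ReentrantSketch` class. Its `snapshot` helper is invented for the example and is not part of ThreadStorm::Queue, and its enqueue omits the size bookkeeping.

```ruby
require "monitor"

# Hypothetical reduction of the queue to its locking behavior.
class ReentrantSketch
  def initialize
    @array = []
    @lock  = Monitor.new
  end

  def synchronize
    @lock.synchronize { yield(self) }
  end

  # Simplified enqueue: takes the lock again, with no size bookkeeping.
  def enqueue(item)
    @lock.synchronize { @array << item }
  end

  # Invented helper for inspecting the contents in this example.
  def snapshot
    @lock.synchronize { @array.dup }
  end
end

queue = ReentrantSketch.new

# Both items are added while the lock is held, so no other thread
# can dequeue or enqueue between the two calls.
queue.synchronize do |q|
  q.enqueue(:first)
  q.enqueue(:second)
end

batch = queue.snapshot
```

With a plain Mutex in place of the Monitor, the nested `synchronize` inside enqueue would raise a ThreadError instead.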