Class: Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/rubysl/thread/thread.rb

Overview

This class provides a way to synchronize communication between threads.

Example:

# Producer/consumer demo: one thread pushes five integers onto a shared
# Queue while a second thread pops them off and prints them.
require 'thread'

queue = Queue.new

# Producer: enqueue 0..4, pausing for a random interval before each push.
producer = Thread.new do
  5.times do |i|
    sleep rand(i) # simulate expensive work
    queue << i
    puts "#{i} produced"
  end
end

# Consumer: pop exactly five values, pausing for a random interval after
# each pop before printing it.
consumer = Thread.new do
  5.times do |i|
    value = queue.pop
    sleep rand(i/2) # simulate expensive work
    puts "consumed #{value}"
  end
end

# Only the consumer is joined; it finishes once it has popped five values.
consumer.join

Direct Known Subclasses

SizedQueue

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Queue

Creates a new queue.



143
144
145
146
147
148
149
150
151
# File 'lib/rubysl/thread/thread.rb', line 143

# Creates a new, empty queue.
#
# Sets up the item buffer (@que), the list of threads registered as waiting
# (@waiting), and the mutex / condition variable pair (@mutex, @resource).
def initialize
  @que = []
  @waiting = []
  # Enable tainted communication where the runtime still supports tainting.
  # Object#taint was deprecated in Ruby 2.7 and removed in Ruby 3.2, so calling
  # it unconditionally raises NoMethodError on current Rubies.
  if respond_to?(:taint)
    @que.taint
    taint
    @waiting.taint
  end
  @mutex = Mutex.new
  @resource = ConditionVariable.new
end

Instance Method Details

#clear ⇒ Object

Removes all objects from the queue.



218
219
220
# File 'lib/rubysl/thread/thread.rb', line 218

# Removes all objects from the queue.
#
# The buffer is emptied while holding @mutex, so the mutation of @que happens
# under the lock rather than bare.
#
# Returns the (now empty) internal buffer, exactly as before.
def clear
  @mutex.synchronize do
    @que.clear
  end
end

#empty? ⇒ Boolean

Returns true if the queue is empty.

Returns:

  • (Boolean)


211
212
213
# File 'lib/rubysl/thread/thread.rb', line 211

# Reports whether the queue currently holds no objects.
#
# Returns true when the internal buffer has zero elements, false otherwise.
def empty?
  @que.size.zero?
end

#length ⇒ Object Also known as: size

Returns the length of the queue.



225
226
227
# File 'lib/rubysl/thread/thread.rb', line 225

# Returns the number of objects currently stored in the queue, i.e. the
# element count of the internal buffer.
def length
  @que.size
end

#num_waiting ⇒ Object

Returns the number of threads waiting on the queue.



237
238
239
# File 'lib/rubysl/thread/thread.rb', line 237

# Returns the number of threads waiting on the queue, i.e. how many entries
# the @waiting list currently holds.
def num_waiting
  @waiting.length
end

#pop(non_block = false) ⇒ Object Also known as: shift, deq

Retrieves data from the queue. If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn’t suspended; a ThreadError ("queue empty") is raised instead.



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/rubysl/thread/thread.rb', line 178

# Retrieves the object at the head of the queue.
#
# If the queue is empty and non_block is false (the default), the calling
# thread is suspended until an object is available. If non_block is true the
# thread is not suspended and ThreadError ("queue empty") is raised instead.
#
# Returns the object removed from the head of the queue.
def pop(non_block=false)
  @mutex.synchronize do
    # Loop rather than test once: after waking, another consumer may already
    # have taken the item, so emptiness is re-checked under the lock.
    while @que.empty?
      raise ThreadError, "queue empty" if non_block
      begin
        # Per the original FIXME, some outside code ("in net or somewhere")
        # violates encapsulation and expects Queue to keep a list of waiting
        # threads, so the current thread is registered while it is blocked.
        @waiting.push Thread.current
        @resource.wait(@mutex)
      ensure
        # Always deregister, even if the wait is interrupted by an exception;
        # otherwise the thread would linger in @waiting and num_waiting would
        # over-report until this thread happened to call pop again.
        @waiting.delete(Thread.current)
      end
    end
    retval = @que.shift
    # Kept from the original implementation; presumably lets another waiting
    # consumer re-check the queue when more items remain.
    @resource.signal
    retval
  end
end

#push(obj) ⇒ Object Also known as: <<, enq

Pushes obj to the queue.



156
157
158
159
160
161
# File 'lib/rubysl/thread/thread.rb', line 156

# Pushes obj onto the tail of the queue.
#
# The append and the subsequent signal on @resource are both performed while
# holding @mutex. The return value is whatever @resource.signal returns.
def push(obj)
  @mutex.synchronize {
    @que << obj
    @resource.signal
  }
end