Class: Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/rubysl/thread/thread.rb

Overview

This class provides a way to synchronize communication between threads.

Example:

require 'thread'

queue = Queue.new

producer = Thread.new do
  5.times do |i|
    sleep rand(i) # simulate expense
    queue << i
    puts "#{i} produced"
  end
end

consumer = Thread.new do
  5.times do |i|
    value = queue.pop
    sleep rand(i/2) # simulate expense
    puts "consumed #{value}"
  end
end

consumer.join

Direct Known Subclasses

SizedQueue

Instance Method Summary collapse

Constructor Details

#initializeQueue

Creates a new queue.



234
235
236
237
238
239
240
241
242
# File 'lib/rubysl/thread/thread.rb', line 234

def initialize
  @que = []
  @que.taint		# enable tainted comunication
  self.taint
  @waiting = []
  @waiting.taint
  @mutex = Mutex.new
  @resource = ConditionVariable.new
end

Instance Method Details

#clearObject

Removes all objects from the queue.



309
310
311
# File 'lib/rubysl/thread/thread.rb', line 309

def clear
  @que.clear
end

#empty?Boolean

Returns true if the queue is empty.

Returns:

  • (Boolean)


302
303
304
# File 'lib/rubysl/thread/thread.rb', line 302

def empty?
  @que.empty?
end

#lengthObject Also known as: size

Returns the length of the queue.



316
317
318
# File 'lib/rubysl/thread/thread.rb', line 316

def length
  @que.length
end

#num_waitingObject

Returns the number of threads waiting on the queue.



328
329
330
# File 'lib/rubysl/thread/thread.rb', line 328

def num_waiting
  @waiting.size
end

#pop(non_block = false) ⇒ Object Also known as: shift, deq

Retrieves data from the queue. If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn’t suspended, and an exception is raised.



269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/rubysl/thread/thread.rb', line 269

def pop(non_block=false)
  while true
    @mutex.synchronize do
      #FIXME: some code in net or somewhere violates encapsulation
      #and demands that a waiting queue exist for Queue, as a result
      #we have to do a linear search here to remove the current Thread.
      @waiting.delete(Thread.current)
      if @que.empty?
        raise ThreadError, "queue empty" if non_block
        @waiting.push Thread.current
        @resource.wait(@mutex)
      else
        retval = @que.shift
        @resource.signal
        return retval
      end
    end
  end
end

#push(obj) ⇒ Object Also known as: <<, enq

Pushes obj to the queue.



247
248
249
250
251
252
# File 'lib/rubysl/thread/thread.rb', line 247

def push(obj)
  @mutex.synchronize do
    @que.push obj
    @resource.signal
  end
end