Class: SizedQueue

Inherits:
Queue
  • Object
show all
Defined in:
lib/rubysl/thread/thread.rb

Overview

This class represents queues of specified size capacity. The push operation may be blocked if the capacity is full.

See Queue for an example of how a SizedQueue works.

Instance Method Summary collapse

Methods inherited from Queue

#clear, #empty?, #length

Constructor Details

#initialize(max) ⇒ SizedQueue

Creates a fixed-length queue with a maximum size of max.

Raises:

  • (ArgumentError)


343
344
345
346
347
348
349
350
351
# File 'lib/rubysl/thread/thread.rb', line 343

def initialize(max)
  raise ArgumentError, "queue size must be positive" unless max > 0
  @max = max
  @queue_wait = []
  @queue_wait.taint		# enable tainted comunication
  @size_mutex = Mutex.new
  @sem = ConditionVariable.new
  super()
end

Instance Method Details

#maxObject

Returns the maximum size of the queue.



356
357
358
# File 'lib/rubysl/thread/thread.rb', line 356

def max
  @max
end

#max=(max) ⇒ Object

Sets the maximum size of the queue.



363
364
365
366
367
368
369
# File 'lib/rubysl/thread/thread.rb', line 363

def max=(max)
  @size_mutex.synchronize do
    @max = max
    @sem.broadcast(@size_mutex)
  end
  max
end

#num_waitingObject

Returns the number of threads waiting on the queue.



427
428
429
# File 'lib/rubysl/thread/thread.rb', line 427

def num_waiting
  @waiting.size + @queue_wait.size
end

#pop(*args) ⇒ Object Also known as: shift, deq

Retrieves data from the queue and runs a waiting thread, if any.



402
403
404
405
406
407
408
409
410
411
412
# File 'lib/rubysl/thread/thread.rb', line 402

def pop(*args)
  retval = super
  
  @size_mutex.synchronize do
    if @que.size < @max
      @sem.broadcast
    end      
  end
  
  return retval
end

#push(obj) ⇒ Object Also known as: <<, enq

Pushes obj to the queue. If there is no space left in the queue, waits until space becomes available.



375
376
377
378
379
380
381
382
383
384
385
386
387
# File 'lib/rubysl/thread/thread.rb', line 375

def push(obj)
  while true
    @size_mutex.synchronize do
      @queue_wait.delete(Thread.current)
      if @que.size >= @max
        @queue_wait.push Thread.current
        @sem.wait(@size_mutex)
      else
        return super(obj)
      end
    end
  end
end