Class: SizedQueue

Inherits:
Queue
  • Object
show all
Defined in:
lib/rubysl/thread/thread.rb

Overview

This class represents queues of specified size capacity. The push operation may be blocked if the capacity is full.

See Queue for an example of how a SizedQueue works.

Instance Method Summary collapse

Methods inherited from Queue

#clear, #empty?, #length

Constructor Details

#initialize(max) ⇒ SizedQueue

Creates a fixed-length queue with a maximum size of max.

Raises:

  • (ArgumentError)


252
253
254
255
256
257
258
259
260
# File 'lib/rubysl/thread/thread.rb', line 252

def initialize(max)
  raise ArgumentError, "queue size must be positive" unless max > 0
  @max = max
  @queue_wait = []
  @queue_wait.taint		# enable tainted comunication
  @size_mutex = Mutex.new
  @sem = ConditionVariable.new
  super()
end

Instance Method Details

#maxObject

Returns the maximum size of the queue.



265
266
267
# File 'lib/rubysl/thread/thread.rb', line 265

def max
  @max
end

#max=(max) ⇒ Object

Sets the maximum size of the queue.



272
273
274
275
276
277
278
# File 'lib/rubysl/thread/thread.rb', line 272

def max=(max)
  @size_mutex.synchronize do
    @max = max
    @sem.broadcast
  end
  max
end

#num_waitingObject

Returns the number of threads waiting on the queue.



336
337
338
# File 'lib/rubysl/thread/thread.rb', line 336

def num_waiting
  @waiting.size + @queue_wait.size
end

#pop(*args) ⇒ Object Also known as: shift, deq

Retrieves data from the queue and runs a waiting thread, if any.



311
312
313
314
315
316
317
318
319
320
321
# File 'lib/rubysl/thread/thread.rb', line 311

def pop(*args)
  retval = super

  @size_mutex.synchronize do
    if @que.size < @max
      @sem.broadcast
    end
  end

  return retval
end

#push(obj) ⇒ Object Also known as: <<, enq

Pushes obj to the queue. If there is no space left in the queue, waits until space becomes available.



284
285
286
287
288
289
290
291
292
293
294
295
296
# File 'lib/rubysl/thread/thread.rb', line 284

def push(obj)
  while true
    @size_mutex.synchronize do
      @queue_wait.delete(Thread.current)
      if @que.size >= @max
        @queue_wait.push Thread.current
        @sem.wait(@size_mutex)
      else
        return super(obj)
      end
    end
  end
end