Class: Fairy::PoolQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/fairy/share/port.rb

Direct Known Subclasses

SizedPoolQueue

Instance Method Summary collapse

Constructor Details

#initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond) ⇒ PoolQueue

Returns a new instance of PoolQueue.



906
907
908
909
910
911
912
913
914
# File 'lib/fairy/share/port.rb', line 906

def initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond)
  @policy = policy

  @queue_threshold = CONF.POOLQUEUE_POOL_THRESHOLD

  @queue = []
  @queue_mon = queue_mon
  @queue_cv = queue_cv
end

Instance Method Details

#popObject



936
937
938
939
940
941
# File 'lib/fairy/share/port.rb', line 936

def pop
  @queue_mon.synchronize do
  @queue_cv.wait_while{@queue.empty?}
  @queue.shift
  end
end

#pop_allObject



943
944
945
946
947
948
949
950
951
# File 'lib/fairy/share/port.rb', line 943

def pop_all
  @queue_mon.synchronize do
  @queue_cv.wait_while{@queue.size < @queue_threshold && @queue.last != :END_OF_STREAM}
# buf = @queue.dup
# @queue.clear
  buf, @queue = @queue, []
  buf
  end
end

#push(e) ⇒ Object



916
917
918
919
920
921
922
923
924
925
# File 'lib/fairy/share/port.rb', line 916

def push(e)
  @queue_mon.synchronize do
  @queue.push e
  if @queue.size >= @queue_threshold || 
 e == :END_OF_STREAM || 
 e == Import::SET_NO_IMPORT
    @queue_cv.broadcast
  end
  end
end

#push_all(buf) ⇒ Object



927
928
929
930
931
932
933
934
# File 'lib/fairy/share/port.rb', line 927

def push_all(buf)
  @queue_mon.synchronize do
  @queue.concat buf
  if @queue.size >= @queue_threshold || @queue.last == :END_OF_STREAM
    @queue_cv.broadcast
  end
  end
end