Class: Fairy::PoolQueue
- Inherits: Object
- Inheritance chain:
  - Object
  - Fairy::PoolQueue
- Defined in:
- lib/fairy/share/port.rb
Direct Known Subclasses
Instance Method Summary
-
#initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond) ⇒ PoolQueue
constructor
A new instance of PoolQueue.
- #pop ⇒ Object
- #pop_all ⇒ Object
- #push(e) ⇒ Object
- #push_all(buf) ⇒ Object
Constructor Details
#initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond) ⇒ PoolQueue
Returns a new instance of PoolQueue.
906 907 908 909 910 911 912 913 914 |
# File 'lib/fairy/share/port.rb', line 906
#
# Sets up an empty pool queue.
#
# @param policy stored as-is in @policy; this method does nothing else with it
# @param queue_mon monitor that guards the queue; a new XThread::Monitor
#   is created when the caller does not supply one
# @param queue_cv condition variable used with +queue_mon+; by default it
#   is obtained from +queue_mon+ via +new_cond+
def initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond)
  # Queue state: starts empty; the threshold is read from configuration.
  @queue = []
  @queue_threshold = CONF.POOLQUEUE_POOL_THRESHOLD
  @policy = policy

  # Synchronisation primitives (possibly shared with the caller).
  @queue_mon = queue_mon
  @queue_cv = queue_cv
end
Instance Method Details
#pop ⇒ Object
936 937 938 939 940 941 |
# File 'lib/fairy/share/port.rb', line 936
#
# Removes and returns the element at the head of the queue. While the
# queue is empty the caller waits on the queue's condition variable.
#
# @return [Object] the element that was at the front of the queue
def pop
  @queue_mon.synchronize do
    # Re-checked after every wakeup, so a wakeup with nothing queued
    # simply goes back to waiting.
    @queue_cv.wait_while { @queue.empty? }

    head = @queue.shift
    head
  end
end
#pop_all ⇒ Object
943 944 945 946 947 948 949 950 951 |
# File 'lib/fairy/share/port.rb', line 943
#
# Takes everything currently queued in one go. The caller waits until
# either at least @queue_threshold elements are queued or the last queued
# element is :END_OF_STREAM.
#
# NOTE(review): #push also broadcasts for Import::SET_NO_IMPORT, but that
# marker does not end this wait on its own -- confirm this is intended.
#
# @return [Array] the array that was backing the queue (not a copy)
def pop_all
  @queue_mon.synchronize do
    @queue_cv.wait_while do
      @queue.size < @queue_threshold && @queue.last != :END_OF_STREAM
    end

    # Hand the backing array to the caller and start a fresh one, instead
    # of the dup-then-clear variant that was tried previously.
    drained = @queue
    @queue = []
    drained
  end
end
#push(e) ⇒ Object
916 917 918 919 920 921 922 923 924 925 |
# File 'lib/fairy/share/port.rb', line 916
#
# Appends one element to the queue. Threads waiting on the condition
# variable are notified only when the backlog has reached
# @queue_threshold, or when +e+ is :END_OF_STREAM or
# Import::SET_NO_IMPORT; other pushes just accumulate.
#
# @param e [Object] element to enqueue
# @return the result of the broadcast when one was issued, otherwise nil
def push(e)
  @queue_mon.synchronize do
    @queue.push(e)

    wake_waiters = @queue.size >= @queue_threshold ||
                   e == :END_OF_STREAM ||
                   e == Import::SET_NO_IMPORT
    @queue_cv.broadcast if wake_waiters
  end
end
#push_all(buf) ⇒ Object
927 928 929 930 931 932 933 934 |
# File 'lib/fairy/share/port.rb', line 927
#
# Appends every element of +buf+ to the queue in order. Threads waiting
# on the condition variable are notified when, after appending, the
# backlog has reached @queue_threshold or the last queued element is
# :END_OF_STREAM.
#
# @param buf [Array] elements to enqueue
# @return the result of the broadcast when one was issued, otherwise nil
def push_all(buf)
  @queue_mon.synchronize do
    @queue.concat(buf)

    batch_ready = @queue.size >= @queue_threshold
    stream_done = @queue.last == :END_OF_STREAM
    @queue_cv.broadcast if batch_ready || stream_done
  end
end