Class: Fairy::ChunkedPoolQueue
- Inherits:
-
Object
- Object
- Fairy::ChunkedPoolQueue
- Defined in:
- lib/fairy/share/port.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#fib_cv ⇒ Object
Returns the value of attribute fib_cv.
Instance Method Summary collapse
-
#initialize(policy, queues_mon = XThread::Monitor.new, queues_cv = queues_mon.new_cond) ⇒ ChunkedPoolQueue
constructor
Designed for multiple pushing threads and a single popping thread.
- #pop ⇒ Object
- #pop_all ⇒ Object
- #push(e) ⇒ Object
- #push_all(buf) ⇒ Object
Constructor Details
#initialize(policy, queues_mon = XThread::Monitor.new, queues_cv = queues_mon.new_cond) ⇒ ChunkedPoolQueue
Designed for multiple pushing threads and a single popping thread.
1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 |
# File 'lib/fairy/share/port.rb', line 1020
#
# Sets up an empty chunked queue intended for several pushing threads and a
# single popping thread.
#
# policy     - stored unchanged in @policy; it is not interpreted here.
# queues_mon - monitor that guards the list of completed chunks
#              (a new XThread::Monitor when omitted).
# queues_cv  - condition variable signalled whenever a chunk is published
#              (created from queues_mon when omitted).
def initialize(policy, queues_mon = XThread::Monitor.new, queues_cv = queues_mon.new_cond)
  @policy = policy

  # Size settings taken from the global configuration object.
  @queue_threshold = CONF.POOLQUEUE_POOL_THRESHOLD
  @queue_max = CONF.POSTQUEUE_MAX_TRANSFER_SIZE

  # Chunk currently being filled by pushers, plus the mutex serialising them.
  @push_queue = []
  @push_queue_mutex = Mutex.new

  # Published chunks waiting for the popper, with their monitor/condition.
  @queues = []
  @queues_mon, @queues_cv = queues_mon, queues_cv

  # Chunk the popper is currently draining; nil while it holds none.
  @pop_queue = nil
  # @pop_queue_mutex = Mutex.new
end
Instance Attribute Details
#fib_cv ⇒ Object
Returns the value of attribute fib_cv.
1037 1038 1039 |
# File 'lib/fairy/share/port.rb', line 1037
#
# Reader for the fib_cv attribute.
#
# Returns the object currently stored in @fib_cv, or nil when nothing has
# been assigned to it.
def fib_cv
  @fib_cv
end
Instance Method Details
#pop ⇒ Object
1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 |
# File 'lib/fairy/share/port.rb', line 1068
#
# Removes and returns the next element for the (single) popping thread.
#
# When no chunk is held, or the held chunk is empty, the next published
# chunk is taken from @queues; the call waits on @queues_cv until one is
# available. Empty chunks are skipped. Once the held chunk has been fully
# drained, @pop_queue is reset to nil.
def pop
  # @pop_queue.synchronize do
  until @pop_queue && !@pop_queue.empty?
    @queues_mon.synchronize do
      # The assignment doubles as the wait condition: it is truthy as soon
      # as @queues yields a chunk.
      @queues_cv.wait_until { @pop_queue = @queues.shift }
    end
  end

  chunk = @pop_queue
  head = chunk.shift
  @pop_queue = nil if chunk.empty?
  head
end
#pop_all ⇒ Object
1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 |
# File 'lib/fairy/share/port.rb', line 1080
#
# Returns a whole chunk at once for the (single) popping thread.
#
# If a chunk is already held in @pop_queue, what is left of it is returned;
# otherwise the next published chunk is taken from @queues, waiting on
# @queues_cv until one is available. @pop_queue is nil afterwards.
def pop_all
  # @pop_queue.synchronize do
  until @pop_queue
    @queues_mon.synchronize do
      # The assignment doubles as the wait condition.
      @queues_cv.wait_until { @pop_queue = @queues.shift }
    end
  end

  chunk = @pop_queue
  @pop_queue = nil
  chunk
  # end
end
#push(e) ⇒ Object
1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 |
# File 'lib/fairy/share/port.rb', line 1039
#
# Appends one element to the chunk being filled; may be called from several
# threads (serialised by @push_queue_mutex).
#
# The chunk is published to @queues, replaced by a fresh empty one, and all
# waiters on @queues_cv are woken when either
#   * the chunk has reached @queue_threshold elements, or
#   * e is one of the sentinels :END_OF_STREAM / Import::SET_NO_IMPORT.
#
# e - the element to enqueue.
def push(e)
  @push_queue_mutex.synchronize do
    @push_queue << e

    chunk_ready = @push_queue.size >= @queue_threshold ||
                  e == :END_OF_STREAM ||
                  e == Import::SET_NO_IMPORT

    if chunk_ready
      @queues_mon.synchronize do
        finished = @push_queue
        @queues.push finished
        @push_queue = []
        @queues_cv.broadcast
      end
    end
  end
end
#push_all(buf) ⇒ Object
1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 |
# File 'lib/fairy/share/port.rb', line 1054
#
# Appends every element of buf to the chunk being filled; may be called from
# several threads (serialised by @push_queue_mutex).
#
# The chunk is published to @queues, replaced by a fresh empty one, and all
# waiters on @queues_cv are woken when either
#   * the chunk has grown beyond @queue_threshold elements, or
#   * its last element is one of the sentinels :END_OF_STREAM /
#     Import::SET_NO_IMPORT.
#
# The Import::SET_NO_IMPORT check mirrors #push: without it a buffer ending
# in that sentinel would stay unpublished and the popper would keep waiting.
#
# NOTE(review): #push publishes at size >= threshold while this method uses
# a strict >; left unchanged here -- confirm whether that is intentional.
#
# buf - an Array of elements to enqueue, in order.
def push_all(buf)
  @push_queue_mutex.synchronize do
    @push_queue.concat buf

    tail = @push_queue.last
    if @push_queue.size > @queue_threshold ||
       tail == :END_OF_STREAM ||
       tail == Import::SET_NO_IMPORT
      @queues_mon.synchronize do
        @queues.push @push_queue
        @push_queue = []
        @queues_cv.broadcast
      end
    end
  end
end