Class: SmartQueue
- Inherits: Object
  - Object
    - SmartQueue
- Defined in:
- lib/ruby-threading-toolkit/smart_queue.rb
Direct Known Subclasses
Instance Method Summary
- #concat(messages) ⇒ Object
- #empty? ⇒ Boolean
- #flush ⇒ Object
- #initialize ⇒ SmartQueue (constructor): a new instance of SmartQueue.
- #priority_concat(messages) ⇒ Object
- #priority_enqueue(action, data) ⇒ Object
- #priority_push(message) ⇒ Object
- #push(message) ⇒ Object
- #regular_enqueue(action, data) ⇒ Object
- #regular_limit=(limit) ⇒ Object
- #release_blocked(thread) ⇒ Object
- #select ⇒ Object
- #shift ⇒ Object
Constructor Details
#initialize ⇒ SmartQueue
Returns a new instance of SmartQueue.
18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/ruby-threading-toolkit/smart_queue.rb', line 18
#
# Sets up an empty queue: one mutex guarding all state, two condition
# variables (one notified when messages arrive, one when regular messages
# are removed), separate regular and priority message arrays, no limit on
# the regular array, and an empty table of threads asked to stop waiting.
def initialize
  # Synchronization primitives shared by every public method.
  @mutex = Mutex.new
  @arrived, @shifted = ConditionVariable.new, ConditionVariable.new

  # Message storage; priority messages are handed out before regular ones.
  @regular, @priority = [], []

  # nil means the regular array is unbounded.
  @regular_limit = nil

  # Threads registered by #release_blocked, keyed by Thread object.
  @stop_list = {}
end
Instance Method Details
#concat(messages) ⇒ Object
68 69 70 |
# File 'lib/ruby-threading-toolkit/smart_queue.rb', line 68
#
# Appends a batch of messages to the regular queue by delegating to
# #regular_enqueue with the :concat action.
#
# @param messages [Array] messages to append, in order
# @return [Object] whatever #regular_enqueue returns
def concat messages
  regular_enqueue :concat, messages
end
#empty? ⇒ Boolean
142 143 144 145 146 |
# File 'lib/ruby-threading-toolkit/smart_queue.rb', line 142
#
# Reports whether the queue currently holds no messages at all.
# The check is made while holding the queue mutex.
#
# @return [Boolean] true when both the regular and the priority arrays are empty
def empty?
  @mutex.synchronize { [@regular, @priority].all?(&:empty?) }
end
#flush ⇒ Object
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/ruby-threading-toolkit/smart_queue.rb', line 123
#
# Atomically removes every queued message and returns them, priority
# messages first, then regular messages, each group in queue order.
#
# @return [Array] the drained messages (empty when nothing was queued)
def flush
  @mutex.synchronize do
    drained = @priority + @regular
    @priority.clear

    unless @regular.empty?
      @regular.clear
      # The whole regular array was emptied, so every producer waiting for
      # room may proceed; a single #signal would leave all but one asleep.
      @shifted.broadcast
    end

    drained
  end
end
#priority_concat(messages) ⇒ Object
84 85 86 |
# File 'lib/ruby-threading-toolkit/smart_queue.rb', line 84
#
# Appends a batch of messages to the priority queue by delegating to
# #priority_enqueue with the :concat action.
#
# @param messages [Array] messages to append, in order
# @return [Object] whatever #priority_enqueue returns
def priority_concat messages
  priority_enqueue :concat, messages
end
#priority_enqueue(action, data) ⇒ Object
72 73 74 75 76 77 78 |
# File 'lib/ruby-threading-toolkit/smart_queue.rb', line 72
#
# Adds data to the priority array under the queue mutex and wakes
# consumers waiting for messages. The priority array is never limited.
#
# @param action [Symbol] Array method used to add the data (:push or :concat here)
# @param data [Object] a single message for :push, an Array of messages for :concat
# @return [true]
def priority_enqueue action, data
  @mutex.synchronize do
    size_before = @priority.length
    @priority.__send__(action, data)

    # One new message needs one consumer. A batch can satisfy several
    # waiting consumers, and #shift never re-signals @arrived, so wake
    # them all and let each re-check the queue.
    if @priority.length - size_before > 1
      @arrived.broadcast
    else
      @arrived.signal
    end
  end
  true
end
#priority_push(message) ⇒ Object
80 81 82 |
# File 'lib/ruby-threading-toolkit/smart_queue.rb', line 80
#
# Appends one message to the priority queue by delegating to
# #priority_enqueue with the :push action.
#
# @param message [Object] the message to append
# @return [Object] whatever #priority_enqueue returns
def priority_push message
  priority_enqueue :push, message
end
#push(message) ⇒ Object
64 65 66 |
# File 'lib/ruby-threading-toolkit/smart_queue.rb', line 64
#
# Appends one message to the regular queue by delegating to
# #regular_enqueue with the :push action.
#
# @param message [Object] the message to append
# @return [Object] whatever #regular_enqueue returns
def push message
  regular_enqueue :push, message
end
#regular_enqueue(action, data) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/ruby-threading-toolkit/smart_queue.rb', line 50
#
# Adds data to the regular array under the queue mutex. While a limit is
# set and the array is at or above it, the call waits on @shifted unless
# the current thread has been registered through #release_blocked. The
# data is added in either case, and consumers waiting for messages are
# then woken.
#
# @param action [Symbol] Array method used to add the data (:push or :concat here)
# @param data [Object] a single message for :push, an Array of messages for :concat
# @return [true]
def regular_enqueue action, data
  @mutex.synchronize do
    @shifted.if_wait_while(@mutex) do
      @regular_limit && @regular.length >= @regular_limit && !@stop_list.include?(Thread.current)
    end

    # Consume this thread's release request, if any; deleting a missing
    # key is a no-op, so no membership check is needed first.
    @stop_list.delete(Thread.current)

    size_before = @regular.length
    @regular.__send__(action, data)

    # One new message needs one consumer. A batch can satisfy several
    # waiting consumers, and #shift never re-signals @arrived, so wake
    # them all and let each re-check the queue.
    if @regular.length - size_before > 1
      @arrived.broadcast
    else
      @arrived.signal
    end
  end
  true
end
#regular_limit=(limit) ⇒ Object
44 45 46 47 48 |
# File 'lib/ruby-threading-toolkit/smart_queue.rb', line 44
#
# Sets the maximum length of the regular array under the queue mutex.
# nil removes the limit.
#
# @param limit [Integer, nil] new limit for the regular array
# @return [Integer, nil] the limit that was set
def regular_limit= limit
  @mutex.synchronize do
    @regular_limit = limit
    # Producers blocked under the old limit may now fit (limit raised or
    # removed); wake them so they re-evaluate their wait condition instead
    # of sleeping until the next removal.
    @shifted.broadcast
    limit
  end
end
#release_blocked(thread) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/ruby-threading-toolkit/smart_queue.rb', line 32
#
# Registers +thread+ in the stop list and wakes every thread waiting on
# the queue, so that a wait in #shift or #regular_enqueue made by that
# thread ends even though its normal condition is not met.
#
# @param thread [Thread] the thread whose wait should be released
# @return [Object] the result of the final ConditionVariable#broadcast
def release_blocked thread
  @mutex.synchronize do
    # Cleaning up @stop_list right here, so, at maximum, it holds only one dead thread.
    # Thread.list is snapshotted once rather than rebuilt for every entry.
    living = Thread.list
    @stop_list.delete_if { |some_thread, _flag| !living.include?(some_thread) }

    @stop_list[thread] = true

    # Waiters on either condition re-check the stop list when woken.
    @arrived.broadcast
    @shifted.broadcast
  end
end
#select ⇒ Object
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/ruby-threading-toolkit/smart_queue.rb', line 107
#
# Removes and returns every queued message for which the block returns a
# truthy value. Priority messages are examined first, then regular ones.
# The block is called while the queue mutex is held.
#
# @yieldparam item [Object] a queued message
# @yieldreturn [Boolean] truthy to remove the message from the queue
# @return [Array] the removed messages, in the order they were examined
def select
  @mutex.synchronize do
    picked = []

    # `picked << item` is truthy, so a selected item is also rejected
    # (removed) from the array it came from.
    @priority.reject! { |item| picked << item if yield(item) }

    # reject! returns nil when nothing was removed. Several regular slots
    # may have been freed at once, so wake every producer waiting for room
    # rather than a single one.
    @shifted.broadcast if @regular.reject! { |item| picked << item if yield(item) }

    picked
  end
end
#shift ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/ruby-threading-toolkit/smart_queue.rb', line 88
#
# Takes the next message off the queue, waiting on @arrived while both
# arrays are empty and the current thread has not been registered through
# #release_blocked. Priority messages are returned before regular ones.
#
# @return [Object, nil] the next message, or nil when the current thread
#   was found in the stop list (its entry is removed in that case)
def shift
  @mutex.synchronize do
    @arrived.if_wait_while(@mutex) do
      @priority.empty? && @regular.empty? && !@stop_list.include?(Thread.current)
    end

    # A pending release request wins over any queued message.
    if @stop_list.include?(Thread.current)
      @stop_list.delete(Thread.current)
      return nil
    end

    return @priority.shift unless @priority.empty?

    # Taking from the regular array may free room for a waiting producer.
    @shifted.signal
    return @regular.shift
  end
end