Class: Pigeon::Queue
- Inherits:
-
Object
- Object
- Pigeon::Queue
- Defined in:
- lib/pigeon/queue.rb
Defined Under Namespace
Classes: BlockRequired, TaskNotQueued
Instance Attribute Summary collapse
-
#processors ⇒ Object
readonly
Properties ===========================================================.
Class Method Summary collapse
-
.filter(name, &block) ⇒ Object
Defines a new filter with the given name and uses the supplied block to evaluate if a task qualifies or not.
-
.filters ⇒ Object
Returns the current filter configuration.
Instance Method Summary collapse
-
#<<(task) ⇒ Object
Adds a task to the queue.
-
#add_processor(processor, &claim) ⇒ Object
Adds a processor to the queue and adds an observer claim method.
-
#claim(task) ⇒ Object
Claims a task.
-
#each ⇒ Object
Iterates over each of the tasks in the queue.
-
#empty?(filter_name = nil, &block) ⇒ Boolean
Returns true if the queue is empty, false otherwise.
- #execute_add_task!(task) ⇒ Object
-
#filter(filter_name, &block) ⇒ Object
Creates a named filter for the queue using the provided block to select the tasks which should match.
-
#include?(task) ⇒ Boolean
Returns true if the task is queued, false otherwise.
-
#initialize(&block) ⇒ Queue
constructor
Creates a new queue.
-
#length(filter_name = nil, &block) ⇒ Object
(also: #size, #count)
Returns the number of entries in the queue.
-
#observe(filter_name = nil, &block) ⇒ Object
Sets up a callback for the queue that will execute the block if new tasks are added to the queue.
-
#peek(filter_name = nil, &block) ⇒ Object
Peeks at the next task in the queue, or if filter_name is provided, then the next task meeting those filter conditions.
-
#pop(filter_name = nil, &block) ⇒ Object
Returns the next task from the queue.
-
#pull(filter_name = nil, &block) ⇒ Object
Removes all tasks from the queue.
-
#remove_observer(filter_name = nil, &block) ⇒ Object
Removes references to the callback function specified.
-
#remove_processor(processor, &claim) ⇒ Object
Removes a processor from the queue and removes an observer claim method.
-
#sort_by(&block) ⇒ Object
Returns the contents sorted by the given block.
-
#to_a ⇒ Object
Copies the list of queued tasks to a new Array.
Constructor Details
#initialize(&block) ⇒ Queue
Creates a new queue. If a block is given, it is used to compare two tasks and order them, so it should take two arguments and return the relative difference (-1, 0, 1) like Array#sort would work.
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/pigeon/queue.rb', line 48 def initialize(&block) @filters = self.class.filters.dup @filters.extend(MonitorMixin) @observers = { } @observers.extend(MonitorMixin) @claimable_task = { } @processors = [ ] @next_task = { } if (block_given?) @sort_by = block else @sort_by = lambda { |a,b| a.priority <=> b.priority } end @tasks = Pigeon::SortedArray.new(&@sort_by) end |
Instance Attribute Details
#processors ⇒ Object (readonly)
Properties ===========================================================
24 25 26 |
# File 'lib/pigeon/queue.rb', line 24 def processors @processors end |
Class Method Details
.filter(name, &block) ⇒ Object
Defines a new filter with the given name and uses the supplied block to evaluate if a task qualifies or not.
39 40 41 |
# File 'lib/pigeon/queue.rb', line 39 def self.filter(name, &block) filters[name] = block end |
.filters ⇒ Object
Returns the current filter configuration. This is stored as a Hash with the key being the filter name, the value being the matching block. The nil key is the default filter which accepts all tasks.
31 32 33 34 35 |
# File 'lib/pigeon/queue.rb', line 31 def self.filters @filters ||= { nil => lambda { |task| true } } end |
Instance Method Details
#<<(task) ⇒ Object
Adds a task to the queue.
136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/pigeon/queue.rb', line 136 def <<(task) unless (task.is_a?(Pigeon::Task)) raise "Cannot add task of class #{task.class} to #{self.class}" end # If there is an insert operation already in progress, put this task in # the backlog for subsequent processing. Pigeon::Engine.execute_in_main_thread do self.execute_add_task!(task) end task end |
#add_processor(processor, &claim) ⇒ Object
Adds a processor to the queue and adds an observer claim method.
106 107 108 109 110 111 112 |
# File 'lib/pigeon/queue.rb', line 106 def add_processor(processor, &claim) @observers.synchronize do @processors << processor end observe(&claim) if (claim) end |
#claim(task) ⇒ Object
Claims a task. This is used to indicate that the task will be processed without having to be inserted into the queue.
294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 |
# File 'lib/pigeon/queue.rb', line 294 def claim(task) @filters.synchronize do if (@claimable_task[task]) @claimable_task[task] = false elsif (@tasks.delete(task)) @next_task.each do |filter_name, next_task| if (task == next_task) @next_task[filter_name] = nil end end else raise TaskNotQueued, task end end task end |
#each ⇒ Object
Iterates over each of the tasks in the queue.
200 201 202 203 204 205 206 207 208 209 210 |
# File 'lib/pigeon/queue.rb', line 200 def each tasks = nil @filters.synchronize do tasks = @tasks.dup end tasks.each do yield(task) end end |
#empty?(filter_name = nil, &block) ⇒ Boolean
Returns true if the queue is empty, false otherwise. If filter_name is given, then will return true if there are no matching tasks, false otherwise. An optional block can further restrict qualifying tasks.
322 323 324 325 326 327 328 329 330 |
# File 'lib/pigeon/queue.rb', line 322 def empty?(filter_name = nil, &block) if (block_given?) @filters.synchronize do !@tasks.find(&block) end else !peek(filter_name) end end |
#execute_add_task!(task) ⇒ Object
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 |
# File 'lib/pigeon/queue.rb', line 151 def execute_add_task!(task) # Set the claimable task flag for this task since it is not yet in the # actual task queue. @claimable_task[task] = true unless (@observers.empty?) @observers.synchronize do @observers.each do |filter_name, list| # Check if this task matches the filter restrictions, and if it # does then call the observer chain in order. if (@filters[filter_name].call(task)) @observers[filter_name].each do |proc| case (proc.arity) when 2 proc.call(self, task) else proc.call(task) end # An observer callback has the opportunity to claim a task, # and if it does, the claimable task flag will be false. Loop # only while the task is claimable. break unless (@claimable_task[task]) end end end end end # If this task wasn't claimed by an observer then insert it in the # main task queue. if (@claimable_task.delete(task)) @filters.synchronize do @tasks << task # Update the next task slots for all of the unassigned filters and # trigger observer callbacks as required. @next_task.each do |filter_name, next_task| next if (next_task) if (@filters[filter_name].call(task)) @next_task[filter_name] = task end end end end end |
#filter(filter_name, &block) ⇒ Object
Creates a named filter for the queue using the provided block to select the tasks which should match.
125 126 127 128 129 130 131 132 133 |
# File 'lib/pigeon/queue.rb', line 125 def filter(filter_name, &block) raise BlockRequired unless (block_given?) @filters.synchronize do @filters[filter_name] = block end assign_next_task(filter_name) end |
#include?(task) ⇒ Boolean
Returns true if the task is queued, false otherwise.
313 314 315 316 317 |
# File 'lib/pigeon/queue.rb', line 313 def include?(task) @filters.synchronize do @tasks.include?(task) end end |
#length(filter_name = nil, &block) ⇒ Object Also known as: size, count
Returns the number of entries in the queue. If filter_name is given, then will return the number of matching tasks. An optional block can further restrict qualifying tasks.
335 336 337 338 339 340 341 |
# File 'lib/pigeon/queue.rb', line 335 def length(filter_name = nil, &block) filter_proc = @filters[filter_name] @filters.synchronize do filter_proc ? @tasks.count(&filter_proc) : nil end end |
#observe(filter_name = nil, &block) ⇒ Object
Sets up a callback for the queue that will execute the block if new tasks are added to the queue. If filter_name is specified, this block will be run for tasks matching that filtered subset.
82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/pigeon/queue.rb', line 82 def observe(filter_name = nil, &block) raise BlockRequired unless (block_given?) @observers.synchronize do set = @observers[filter_name] ||= [ ] set << block end assign_next_task(filter_name) end |
#peek(filter_name = nil, &block) ⇒ Object
Peeks at the next task in the queue, or if filter_name is provided, then the next task meeting those filter conditions. An optional block can also be used to further restrict the qualifying tasks.
215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/pigeon/queue.rb', line 215 def peek(filter_name = nil, &block) if (block_given?) @filters.synchronize do @tasks.find(&block) end elsif (filter_name) @next_task[filter_name] ||= begin @filters.synchronize do filter_proc = @filters[filter_name] filter_proc and @tasks.find(&filter_proc) end end else @filters.synchronize do @tasks.first end end end |
#pop(filter_name = nil, &block) ⇒ Object
Returns the next task from the queue. If a filter_name is given, then will only select tasks matching that filter’s conditions. An optional block can also be used to further restrict the qualifying tasks. The task will be removed from the queue and must be re-inserted if it is to be scheduled again.
263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 |
# File 'lib/pigeon/queue.rb', line 263 def pop(filter_name = nil, &block) @filters.synchronize do task = if (block_given?) @tasks.find(&block) elsif (filter_name) @next_task[filter_name] || begin filter_proc = @filters[filter_name] filter_proc and @tasks.find(&filter_proc) end else @tasks.first end if (task) @tasks.delete(task) @next_task.each do |_filter_name, next_task| if (task == next_task) @next_task[_filter_name] = nil end end end task end end |
#pull(filter_name = nil, &block) ⇒ Object
Removes all tasks from the queue. If a filter_name is given, then will only remove tasks matching that filter’s conditions. An optional block can also be used to further restrict the qualifying tasks.
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 |
# File 'lib/pigeon/queue.rb', line 238 def pull(filter_name = nil, &block) if (!block_given? and filter_name) block = @filters[filter_name] end @filters.synchronize do tasks = block ? @tasks.select(&block) : @tasks @tasks -= tasks @next_task.keys.each do |_filter_name| if (tasks.include?(@next_task[_filter_name])) @next_task[_filter_name] = nil end end tasks end end |
#remove_observer(filter_name = nil, &block) ⇒ Object
Removes references to the callback function specified. Note that the same Proc must be passed in, as a block with an identical function will not be considered equivalent.
97 98 99 100 101 102 103 |
# File 'lib/pigeon/queue.rb', line 97 def remove_observer(filter_name = nil, &block) @observers.synchronize do set = @observers[filter_name] set and set.delete(block) end end |
#remove_processor(processor, &claim) ⇒ Object
Removes a processor from the queue and removes an observer claim method.
115 116 117 118 119 120 121 |
# File 'lib/pigeon/queue.rb', line 115 def remove_processor(processor, &claim) @observers.synchronize do @processors.delete(processor) end remove_observer(&claim) if (claim) end |
#sort_by(&block) ⇒ Object
Returns the contents sorted by the given block. The block will be passed a single Task and the results are sorted by the return value.
68 69 70 71 72 73 74 75 76 77 |
# File 'lib/pigeon/queue.rb', line 68 def sort_by(&block) raise BlockRequired unless (block_given?) @sort_by = block @filters.synchronize do @tasks = Pigeon::SortedArray.new(&@sort_by) + @tasks @next_task = { } end end |
#to_a ⇒ Object
Copies the list of queued tasks to a new Array.
346 347 348 349 350 |
# File 'lib/pigeon/queue.rb', line 346 def to_a @filters.synchronize do @tasks.dup end end |