Class: Pipes::Store
- Inherits:
-
Object
- Object
- Pipes::Store
- Defined in:
- lib/pipes/store.rb
Overview
Stages are stored in Redis in the following manner: pipes:stages:stage_1 [‘ContentWriterStrategy’, args: [‘en-US’], …] pipes:stages:stage_2 [‘PublisherStrategy’, args: [‘en-US’]]
The jobs stored in Redis are Marshalled Ruby objects, so the structure is more-or-less arbitrary, though at a performance cost.
Jobs are queued up in the following steps
1. Strategies in stage n? No, look in stage n+1 until last stage.
Yes, shift off the next stage and queue up its jobs
2. Strategies run concurrently. Keep track of how many are currently running to
know when the next stage should be started.
Class Method Summary collapse
-
.add_pipe(stages, options = {}) ⇒ Object
Add a new set of stages to Redis.
-
.clear(stage) ⇒ Object
Clear a specific stage queue.
-
.clear_all ⇒ Object
Find all stage queues in Redis (even ones not configured), and clear them.
-
.done ⇒ Object
Register that a job has finished.
-
.next_stage ⇒ Object
Fire off the next available stage, if available.
-
.run_stage(jobs) ⇒ Object
Actually enqueue the jobs.
Class Method Details
.add_pipe(stages, options = {}) ⇒ Object
Add a new set of stages to Redis.
23 24 25 26 27 28 29 30 31 |
# File 'lib/pipes/store.rb', line 23 def self.add_pipe(stages, = {}) stages.each do |stage| stage[:jobs].each do |job| pending = pending_jobs(stage[:name]) pending << job if valid_for_queue?(stage[:name], pending, job, ) end end next_stage end |
.clear(stage) ⇒ Object
Clear a specific stage queue.
69 70 71 |
# File 'lib/pipes/store.rb', line 69 def self.clear(stage) pending_jobs(stage).clear end |
.clear_all ⇒ Object
Find all stage queues in Redis (even ones not configured), and clear them.
75 76 77 78 79 80 |
# File 'lib/pipes/store.rb', line 75 def self.clear_all stage_keys = Redis.current.keys "#{@redis_stages_key}:*" Redis.current.del *stage_keys unless stage_keys.empty? remaining_jobs.clear end |
.done ⇒ Object
Register that a job has finished.
61 62 63 64 65 |
# File 'lib/pipes/store.rb', line 61 def self.done if remaining_jobs.decrement == 0 next_stage end end |
.next_stage ⇒ Object
Fire off the next available stage, if available.
35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/pipes/store.rb', line 35 def self.next_stage return unless remaining_jobs == 0 # Always start at the first stage, in case new stragies have been added mid-pipe stages.each do |stage| if !(jobs = pending_jobs(stage)).empty? run_stage(jobs) clear(stage) return end end end |
.run_stage(jobs) ⇒ Object
Actually enqueue the jobs.
50 51 52 53 54 55 56 57 |
# File 'lib/pipes/store.rb', line 50 def self.run_stage(jobs) remaining_jobs.clear remaining_jobs.incr(jobs.count) jobs.each do |job| Resque.enqueue(job[:class], *job[:args]) end end |