Module: Politics::StaticQueueWorker
Defined in: lib/politics/static_queue_worker.rb
Overview
The StaticQueueWorker mixin allows a processing daemon to “lease” or check out a portion of a problem space to ensure no other process is processing that same space at the same time. The problem space is cut into N “buckets”, each of which is placed in a queue. Processes then fetch entries from the queue and process them. It is up to the application to map the bucket number onto its specific problem space.
Note that memcached is used for leader election. The leader owns the queue during the iteration period and other peers fetch buckets from the current leader during the iteration.
The leader hands out buckets in order. Once all the buckets have been processed, the leader returns nil to the processors which causes them to sleep until the end of the iteration. Then everyone wakes up, a new leader is elected, and the processing starts all over again.
DRb and mDNS are used for peer discovery and communication.
Example usage:
class Analyzer
  include Politics::StaticQueueWorker
  TOTAL_BUCKETS = 16

  def start
    register_worker(self.class.name, TOTAL_BUCKETS)
    process_bucket do |bucket|
      puts "Analyzing bucket #{bucket} of #{TOTAL_BUCKETS}"
      sleep 5
    end
  end
end
Note: process_bucket never returns, so it should be the main loop of your processing daemon.
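The overview leaves the mapping from bucket number to problem space to the application. As a minimal sketch of one possible mapping, assuming the problem space is a set of integer record IDs, each record can be assigned to a bucket by taking its ID modulo the bucket count. The helpers `bucket_for` and `records_in_bucket` are hypothetical names for illustration and are not part of Politics.

```ruby
# Hypothetical helpers (not part of Politics): map integer record IDs
# onto bucket numbers in the range 0...total_buckets.

# Returns the bucket number that owns the given record ID.
def bucket_for(record_id, total_buckets)
  record_id % total_buckets
end

# Returns the subset of record IDs that belong to the given bucket.
def records_in_bucket(record_ids, bucket, total_buckets)
  record_ids.select { |id| bucket_for(id, total_buckets) == bucket }
end

ids = (1..40).to_a
p records_in_bucket(ids, 3, 16) # prints [3, 19, 35]
```

Inside a `process_bucket` block, a daemon could call something like `records_in_bucket` with the yielded bucket number so that each record is handled by exactly one process per iteration.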
Class Method Summary
- .included(model) ⇒ Object
  :nodoc:
Instance Method Summary
- #bucket_request ⇒ Object
- #process_bucket(&block) ⇒ Object
  Fetch a bucket out of the queue and pass it to the given block to be processed.
- #register_worker(name, bucket_count, config = {}) ⇒ Object
  Register this process as able to work on buckets.
Class Method Details
.included(model) ⇒ Object
:nodoc:
# File 'lib/politics/static_queue_worker.rb', line 61

def self.included(model) #:nodoc:
  model.class_eval do
    attr_accessor :group_name, :iteration_length
  end
end
Instance Method Details
#bucket_request ⇒ Object
# File 'lib/politics/static_queue_worker.rb', line 111

def bucket_request
  if leader?
    [@buckets.pop, until_next_iteration]
  else
    :not_leader
  end
end
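To illustrate the leader branch of bucket_request, here is a small in-process sketch. `ToyLeader` is a hypothetical stand-in, not the real DRb-backed leader, and the fixed `seconds_left` value replaces until_next_iteration. How the real initialize_buckets fills the queue is not shown on this page, so the toy fills it in reverse purely so that `pop` yields ascending bucket numbers. The point it demonstrates is that `Array#pop` on an empty array returns nil, which is the signal for workers to sleep until the next iteration.

```ruby
# ToyLeader is a hypothetical model of the leader's queue; it is not part of Politics.
class ToyLeader
  def initialize(bucket_count, seconds_left)
    # Assumption: stored in reverse so that pop hands out 0, 1, 2, ...
    @buckets = (0...bucket_count).to_a.reverse
    @seconds_left = seconds_left
  end

  # Same response shape as the leader branch of bucket_request:
  # [bucket number or nil, seconds until the next iteration].
  def bucket_request
    [@buckets.pop, @seconds_left]
  end
end

leader = ToyLeader.new(3, 42)
4.times do
  bucket, wait = leader.bucket_request
  if bucket.nil?
    puts "queue empty, would sleep #{wait}s"
  else
    puts "processing bucket #{bucket}"
  end
end
```

The fourth request finds the queue empty and receives nil as the bucket, together with the remaining wait time.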
#process_bucket(&block) ⇒ Object
Fetch a bucket out of the queue and pass it to the given block to be processed.
bucket - The bucket number to process, within the range 0...TOTAL_BUCKETS
# File 'lib/politics/static_queue_worker.rb', line 88

def process_bucket(&block)
  raise ArgumentError, "process_bucket requires a block!" unless block_given?
  raise ArgumentError, "You must call register_worker before processing!" unless @memcache_client

  begin
    nominate

    if leader?
      # Drb thread handles leader duties
      log.info { "#{@uri} has been elected leader" }
      relax until_next_iteration
      initialize_buckets
    else
      # Get a bucket from the leader and process it
      begin
        bucket_process(*leader.bucket_request, &block)
      rescue DRb::DRbError => dre
        log.error { "Error talking to leader: #{dre.message}" }
        relax until_next_iteration
      end
    end
  end while loop?
end
#register_worker(name, bucket_count, config = {}) ⇒ Object
Register this process as able to work on buckets.
# File 'lib/politics/static_queue_worker.rb', line 68

def register_worker(name, bucket_count, config={})
  options = { :iteration_length => 60, :servers => ['127.0.0.1:11211'] }
  options.merge!(config)

  self.group_name = name
  self.iteration_length = options[:iteration_length]
  @memcache_client = client_for(Array(options[:servers]))

  @buckets = []
  @bucket_count = bucket_count
  initialize_buckets

  register_with_bonjour

  log.info { "Registered #{self} in group #{group_name} at port #{@port}" }
end
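The config argument is merged over the defaults shown in register_worker (an iteration length of 60 and a single memcached server at 127.0.0.1:11211). The following standalone sketch reproduces only that merge step so the effect of overriding individual keys can be seen without running memcached or Bonjour. The helper name `effective_options`, its local variable name, and the host `cache1.example.com` are illustrative assumptions.

```ruby
# Hypothetical helper: reproduces only the default-merging step of register_worker.
def effective_options(config = {})
  options = { :iteration_length => 60, :servers => ['127.0.0.1:11211'] }
  options.merge!(config) # caller-supplied keys override the defaults
  options
end

# No overrides: both defaults apply.
p effective_options

# Override one key: the other default is kept.
p effective_options(:iteration_length => 120)

# register_worker wraps the servers value with Array(), so a single
# string is turned into a one-element list.
p Array(effective_options(:servers => 'cache1.example.com:11211')[:servers])
```

In an application the same hash would be passed as the third argument, for example `register_worker(self.class.name, TOTAL_BUCKETS, :iteration_length => 120)`.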