Module: Politics::StaticQueueWorker

Defined in:
lib/politics/static_queue_worker.rb

Overview

The StaticQueueWorker mixin allows a processing daemon to “lease” or checkout a portion of a problem space to ensure no other process is processing that same space at the same time. The processing space is cut into N “buckets”, each of which is placed in a queue. Processes then fetch entries from the queue and process them. It is up to the application to map the bucket number onto its specific problem space.

Note that memcached is used for leader election. The leader owns the queue during the iteration period and other peers fetch buckets from the current leader during the iteration.

The leader hands out buckets in order. Once all the buckets have been processed, the leader returns nil to the processors which causes them to sleep until the end of the iteration. Then everyone wakes up, a new leader is elected, and the processing starts all over again.

DRb and mDNS are used for peer discovery and communication.

Example usage:

class Analyzer
  include Politics::StaticQueueWorker
  TOTAL_BUCKETS = 16

  def start
    register_worker(self.class.name, TOTAL_BUCKETS)
    process_bucket do |bucket|
      puts "Analyzing bucket #{bucket} of #{TOTAL_BUCKETS}"
      sleep 5
    end
  end
end

Note: process_bucket never returns i.e. this should be the main loop of your processing daemon.

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(model) ⇒ Object

:nodoc:



61
62
63
64
65
# File 'lib/politics/static_queue_worker.rb', line 61

def self.included(model) #:nodoc:
  model.class_eval do
    attr_accessor :group_name, :iteration_length
  end
end

Instance Method Details

#bucket_requestObject



111
112
113
114
115
116
117
# File 'lib/politics/static_queue_worker.rb', line 111

def bucket_request
  if leader?
    [@buckets.pop, until_next_iteration]
  else
    :not_leader
  end
end

#process_bucket(&block) ⇒ Object

Fetch a bucket out of the queue and pass it to the given block to be processed.

bucket

The bucket number to process, within the range 0…TOTAL_BUCKETS

Raises:

  • (ArgumentError)


88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/politics/static_queue_worker.rb', line 88

def process_bucket(&block)
  raise ArgumentError, "process_bucket requires a block!" unless block_given?
  raise ArgumentError, "You must call register_worker before processing!" unless @memcache_client

  begin
    nominate
    if leader?
      # Drb thread handles leader duties
      log.info { "#{@uri} has been elected leader" }
      relax until_next_iteration
      initialize_buckets          
    else
      # Get a bucket from the leader and process it
      begin
        bucket_process(*leader.bucket_request, &block)
      rescue DRb::DRbError => dre
        log.error { "Error talking to leader: #{dre.message}" }
        relax until_next_iteration
      end
    end
  end while loop?
end

#register_worker(name, bucket_count, config = {}) ⇒ Object

Register this process as able to work on buckets.



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/politics/static_queue_worker.rb', line 68

def register_worker(name, bucket_count, config={})
  options = { :iteration_length => 60, :servers => ['127.0.0.1:11211'] }
  options.merge!(config)

  self.group_name = name
  self.iteration_length = options[:iteration_length]
  @memcache_client = client_for(Array(options[:servers]))

  @buckets = []
  @bucket_count = bucket_count
  initialize_buckets

register_with_bonjour

log.info { "Registered #{self} in group #{group_name} at port #{@port}" }
end