Class: SQS::Job::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/sqs/job/worker.rb

Overview

A Worker maintains a SQS::Job::ThreadPool and AWS::SQS::Queue and creates SQS::Job::Handler instances to process each message received. It is also responsible for boot/configuration stuff. There should only be one worker per process.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue) ⇒ Worker

Returns a new instance of Worker.



9
10
11
# File 'lib/sqs/job/worker.rb', line 9

def initialize(queue)
  @queue = queue
end

Instance Attribute Details

#queueObject (readonly)

Returns the value of attribute queue.



7
8
9
# File 'lib/sqs/job/worker.rb', line 7

def queue
  @queue
end

Instance Method Details

#log_exceptions(&block) ⇒ Object



30
31
32
33
34
35
36
37
# File 'lib/sqs/job/worker.rb', line 30

def log_exceptions &block
  begin
    block.call
  rescue => ex
    SQS::Job.logger.error "Error processing message: #{ex.class.name} #{ex}\n\t#{ex.backtrace.join("\t\n")}"
    raise ex unless ex.is_a?(UnrecoverableException)
  end
end

#run(options = {}) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/sqs/job/worker.rb', line 13

def run options = {}
  require 'sqs/job/thread_pool'

  min_threads = options[:min_threads] || SQS::Job.min_threads 
  max_threads = options[:max_threads] || SQS::Job.max_threads
  @pool = SQS::Job::ThreadPool.new min_threads, max_threads do |msg|
    log_exceptions{ Handler.new(msg).run! }
  end
  
  while true
    # KEG: it's not clear if queue.poll accepts message_attribute_names
    queue.receive_messages(wait_time_seconds: 10, batch_size: 10, message_attribute_names: [ 'signature', 'key_fingerprint' ]) do |msg|
      @pool << msg
    end
  end
end