Class: SQS::Job::Worker
- Inherits:
-
Object
- Object
- SQS::Job::Worker
- Defined in:
- lib/sqs/job/worker.rb
Overview
A Worker maintains a SQS::Job::ThreadPool and AWS::SQS::Queue and creates SQS::Job::Handler instances to process each message received. It is also responsible for boot/configuration stuff. There should only be one worker per process.
Instance Attribute Summary collapse
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Instance Method Summary collapse
-
#initialize(queue) ⇒ Worker
constructor
A new instance of Worker.
- #log_exceptions(&block) ⇒ Object
- #run(options = {}) ⇒ Object
Constructor Details
#initialize(queue) ⇒ Worker
Returns a new instance of Worker.
9 10 11 |
# File 'lib/sqs/job/worker.rb', line 9 def initialize(queue) @queue = queue end |
Instance Attribute Details
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
7 8 9 |
# File 'lib/sqs/job/worker.rb', line 7 def queue @queue end |
Instance Method Details
#log_exceptions(&block) ⇒ Object
30 31 32 33 34 35 36 37 |
# File 'lib/sqs/job/worker.rb', line 30 def log_exceptions &block begin block.call rescue => ex SQS::Job.logger.error "Error processing message: #{ex.class.name} #{ex}\n\t#{ex.backtrace.join("\t\n")}" raise ex unless ex.is_a?(UnrecoverableException) end end |
#run(options = {}) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/sqs/job/worker.rb', line 13 def run = {} require 'sqs/job/thread_pool' min_threads = [:min_threads] || SQS::Job.min_threads max_threads = [:max_threads] || SQS::Job.max_threads @pool = SQS::Job::ThreadPool.new min_threads, max_threads do |msg| log_exceptions{ Handler.new(msg).run! } end while true # KEG: it's not clear if queue.poll accepts message_attribute_names queue.(wait_time_seconds: 10, batch_size: 10, message_attribute_names: [ 'signature', 'key_fingerprint' ]) do |msg| @pool << msg end end end |