Class: SqsConsumer::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/sqs_consumer/consumer.rb

Constant Summary collapse

RECEIVE_MESSAGES_DEFAULT =
{max_number_of_messages: 1, wait_time_seconds: 1}

Instance Method Summary collapse

Constructor Details

#initialize(endpoint:, region:, work_queue_name:, throttle_sleep_duration: 0.0, receive_messages_options: {}, message_filter: nil, processor_action: nil, success_result: nil, error_callback: nil) ⇒ Consumer

Returns a new instance of Consumer.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/sqs_consumer/consumer.rb', line 8

def initialize(endpoint:, region:, work_queue_name:, throttle_sleep_duration: 0.0,
  receive_messages_options: {}, message_filter: nil, processor_action: nil, success_result: nil, error_callback: nil)
  @client_config = {
    endpoint: endpoint,
    region: region
  }
  @work_queue_name = work_queue_name
  @throttle_sleep_duration = throttle_sleep_duration
  @logger = PaulBunyan.logger
  @receive_messages_options = receive_messages_options.merge(RECEIVE_MESSAGES_DEFAULT)
  @message_filter = message_filter
  @processor_action = processor_action
  @success_result = success_result
  @error_callback = error_callback
  @running = false
end

Instance Method Details

#create_queueObject



37
38
39
40
41
# File 'lib/sqs_consumer/consumer.rb', line 37

def create_queue
  sqs_client = Aws::SQS::Client.new(@client_config)
  sqs_queue_url = sqs_client.get_queue_url(queue_name: @work_queue_name)[:queue_url]
  Aws::SQS::Queue.new(sqs_queue_url, sqs_client.config)
end

#runObject



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/sqs_consumer/consumer.rb', line 43

def run
  @running = true
  queue = create_queue

  loop do
    sqs_messages = queue.receive_messages(@receive_messages_options)

    @logger.debug { "Got #{sqs_messages.size} messages from SQS." } unless sqs_messages.size == 0

    sqs_messages.each do |sqs_message|
      begin
        if should_process? sqs_message
          result = @processor_action&.call(sqs_message)
          if @success_result.nil? || @success_result == result
            sqs_message.delete
          end
        end
      rescue => e
        @error_callback&.call(e, sqs_message)
      end
    end
    break unless running?
    sleep @throttle_sleep_duration unless @throttle_sleep_duration.zero?
  end
end

#running?Boolean

Returns:

  • (Boolean)


33
34
35
# File 'lib/sqs_consumer/consumer.rb', line 33

def running?
  @running
end

#should_process?(sqs_message) ⇒ Boolean

Returns:

  • (Boolean)


25
26
27
# File 'lib/sqs_consumer/consumer.rb', line 25

def should_process?(sqs_message)
  @message_filter.nil? || @message_filter.call(sqs_message.message_attributes)
end

#stopObject



29
30
31
# File 'lib/sqs_consumer/consumer.rb', line 29

def stop
  @running = false
end