Class: SqsConsumer::Consumer
- Inherits:
-
Object
- Object
- SqsConsumer::Consumer
- Defined in:
- lib/sqs_consumer/consumer.rb
Constant Summary collapse
- RECEIVE_MESSAGES_DEFAULT =
{max_number_of_messages: 1, wait_time_seconds: 1}
Instance Method Summary collapse
- #create_queue ⇒ Object
-
#initialize(endpoint:, region:, work_queue_name:, throttle_sleep_duration: 0.0, receive_messages_options: {}, message_filter: nil, processor_action: nil, success_result: nil, error_callback: nil) ⇒ Consumer
constructor
A new instance of Consumer.
- #run ⇒ Object
- #running? ⇒ Boolean
- #should_process?(sqs_message) ⇒ Boolean
- #stop ⇒ Object
Constructor Details
#initialize(endpoint:, region:, work_queue_name:, throttle_sleep_duration: 0.0, receive_messages_options: {}, message_filter: nil, processor_action: nil, success_result: nil, error_callback: nil) ⇒ Consumer
Returns a new instance of Consumer.
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/sqs_consumer/consumer.rb', line 8 def initialize(endpoint:, region:, work_queue_name:, throttle_sleep_duration: 0.0, receive_messages_options: {}, message_filter: nil, processor_action: nil, success_result: nil, error_callback: nil) @client_config = { endpoint: endpoint, region: region } @work_queue_name = work_queue_name @throttle_sleep_duration = throttle_sleep_duration @logger = PaulBunyan.logger = .merge(RECEIVE_MESSAGES_DEFAULT) = @processor_action = processor_action @success_result = success_result @error_callback = error_callback @running = false end |
Instance Method Details
#create_queue ⇒ Object
37 38 39 40 41 |
# File 'lib/sqs_consumer/consumer.rb', line 37 def create_queue sqs_client = Aws::SQS::Client.new(@client_config) sqs_queue_url = sqs_client.get_queue_url(queue_name: @work_queue_name)[:queue_url] Aws::SQS::Queue.new(sqs_queue_url, sqs_client.config) end |
#run ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/sqs_consumer/consumer.rb', line 43 def run @running = true queue = create_queue loop do = queue.() @logger.debug { "Got #{sqs_messages.size} messages from SQS." } unless .size == 0 .each do || begin if should_process? result = @processor_action&.call() if @success_result.nil? || @success_result == result .delete end end rescue => e @error_callback&.call(e, ) end end break unless running? sleep @throttle_sleep_duration unless @throttle_sleep_duration.zero? end end |
#running? ⇒ Boolean
33 34 35 |
# File 'lib/sqs_consumer/consumer.rb', line 33 def running? @running end |
#should_process?(sqs_message) ⇒ Boolean
25 26 27 |
# File 'lib/sqs_consumer/consumer.rb', line 25 def should_process?() .nil? || .call(.) end |
#stop ⇒ Object
29 30 31 |
# File 'lib/sqs_consumer/consumer.rb', line 29 def stop @running = false end |