Class: SqsBuffer::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/sqs_buffer/client.rb

Instance Method Summary collapse

Constructor Details

#initialize(opts) ⇒ Client

Returns a new instance of Client.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/sqs_buffer/client.rb', line 7

# Builds a client that buffers messages polled from an SQS queue.
#
# @param opts [Hash] configuration, read with +fetch+ (symbol keys)
# @option opts [Object] :queue_url required; handed to Aws::SQS::QueuePoller
# @option opts [Object] :client required; passed to the poller as +client:+
# @option opts [Boolean] :skip_delete (true) forwarded to the poller on each poll
# @option opts [#to_i] :max_number_of_messages (10) forwarded to the poller
# @option opts [Object] :logger (Logger writing to STDOUT) receives info/error lines
# @option opts [#to_i] :max_wait_time (300) seconds used by the staleness check
# @option opts [#to_i] :max_queue_threshold (100) buffer length treated as "full"
#
# NOTE(review): a missing :queue_url or :client is delegated to +missing_key!+,
# which is defined elsewhere — presumably it raises; confirm.
def initialize(opts)
  @queue_url = opts.fetch(:queue_url) { |k| missing_key!(k) }
  client     = opts.fetch(:client) { |k| missing_key!(k) }

  @poller = Aws::SQS::QueuePoller.new(@queue_url, client: client)
  @skip_delete            = opts.fetch(:skip_delete, true)
  @max_number_of_messages = opts.fetch(:max_number_of_messages, 10).to_i
  # Block form: the default Logger is only constructed when the caller did
  # not supply one, instead of being allocated and thrown away every time.
  @logger                 = opts.fetch(:logger) { Logger.new(STDOUT) }
  @before_request_block   = Concurrent::MutexAtomicReference.new
  @process_block          = Concurrent::MutexAtomicReference.new
  @message_queue          = Concurrent::Array.new
  @last_process_time      = Concurrent::AtomicFixnum.new(Time.now.to_i)
  @running                = Concurrent::AtomicBoolean.new(false)

  @max_wait_time = Concurrent::AtomicFixnum.new(
    opts.fetch(:max_wait_time, 300).to_i
  )
  @max_queue_threshold = Concurrent::AtomicFixnum.new(
    opts.fetch(:max_queue_threshold, 100).to_i
  )
  configure_before_request_block
end

Instance Method Details

#before_request_block(&block) ⇒ Object



116
117
118
# File 'lib/sqs_buffer/client.rb', line 116

# Stores the given block as the value of @before_request_block.
#
# Calling this without a block stores nil, replacing any block stored before.
#
# NOTE(review): when and how the stored block is invoked is not visible in
# this method — presumably before each poll request; confirm.
#
# @yield the block to store
# @return [Proc, nil] the block that was stored
def before_request_block(&block)
  @before_request_block.value = block
end

#buffer ⇒ Object



75
76
77
78
# File 'lib/sqs_buffer/client.rb', line 75

# Returns a deep copy of the buffered messages.
#
# The copy is produced by a Marshal round trip, so callers can mutate the
# result (or the objects inside it) without touching the internal buffer.
# Every buffered element therefore has to be Marshal-serializable.
#
# @return [Array] an independent copy of @message_queue
def buffer
  serialized = Marshal.dump(@message_queue)
  Marshal.load(serialized)
end

#buffer_empty? ⇒ Boolean

Returns:

  • (Boolean)


67
68
69
# File 'lib/sqs_buffer/client.rb', line 67

# Reports whether the internal message buffer currently holds no messages.
#
# @return [Boolean] true when @message_queue has length zero
def buffer_empty?
  @message_queue.length.zero?
end

#buffer_full? ⇒ Boolean

Returns:

  • (Boolean)


63
64
65
# File 'lib/sqs_buffer/client.rb', line 63

# Reports whether the buffer has reached the configured queue threshold.
#
# @return [Boolean] true when the number of buffered messages is greater
#   than or equal to the value of @max_queue_threshold
def buffer_full?
  queued = @message_queue.size
  queued >= @max_queue_threshold.value
end

#buffer_length ⇒ Object



71
72
73
# File 'lib/sqs_buffer/client.rb', line 71

# Number of messages currently held in the internal buffer.
#
# @return [Integer] the element count of @message_queue
def buffer_length
  @message_queue.size
end

#last_process_time_stale? ⇒ Boolean

Returns:

  • (Boolean)


92
93
94
# File 'lib/sqs_buffer/client.rb', line 92

# Reports whether more than @max_wait_time seconds have passed since the
# recorded last process time.
#
# Both sides are whole epoch seconds (Time#to_i); the comparison is strict,
# so exactly @max_wait_time seconds of age is not yet stale.
#
# @return [Boolean]
def last_process_time_stale?
  last_processed_at = @last_process_time.value
  oldest_fresh_time = Time.now.to_i - @max_wait_time.value
  last_processed_at < oldest_fresh_time
end

#process_all_messages ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
# File 'lib/sqs_buffer/client.rb', line 100

# Runs one processing pass over the buffered messages.
#
# If a process block is registered it is invoked through
# +call_process_block_safely+; otherwise an info line is logged. In both
# cases +delete_all_messages+ and +touch_process_time+ run afterwards, in
# that order. Any StandardError raised along the way is logged together
# with the buffer contents and the backtrace, and is not re-raised; the
# remaining steps are skipped in that case.
#
# @return [Object] the result of +touch_process_time+, or of the error
#   logging call when an exception was rescued
def process_all_messages
  # Read the handler once so both branches below see the same value.
  handler_registered = @process_block.value
  call_process_block_safely if handler_registered
  @logger.info "No process block was given. Discarding all messages." unless handler_registered

  delete_all_messages
  touch_process_time
rescue StandardError => e
  @logger.error "An exception(#{e.message}) occurred while processing the message queue: #{@message_queue.join("\n")} | Backtrace: #{e.backtrace}"
end

#process_block(&block) ⇒ Object



112
113
114
# File 'lib/sqs_buffer/client.rb', line 112

# Stores the given block as the value of @process_block.
#
# Calling this without a block stores nil, replacing any block stored before.
#
# NOTE(review): the arguments the block later receives are not visible in
# this method — presumably the buffered messages; confirm against the caller.
#
# @yield the block to store
# @return [Proc, nil] the block that was stored
def process_block(&block)
  @process_block.value = block
end

#queue_url ⇒ Object



55
56
57
# File 'lib/sqs_buffer/client.rb', line 55

# Reader for the queue URL held in @queue_url.
#
# @return [Object] the current value of @queue_url
def queue_url
  @queue_url
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


84
85
86
# File 'lib/sqs_buffer/client.rb', line 84

# Reports whether the client is flagged as running and its worker thread
# is still alive.
#
# @return [Boolean] false when the @running flag is not set; otherwise the
#   result of +worker_thread_alive?+
def running?
  return false unless @running.true?

  worker_thread_alive?
end

#shutting_down? ⇒ Boolean

Returns:

  • (Boolean)


80
81
82
# File 'lib/sqs_buffer/client.rb', line 80

# Reports whether the running flag has been cleared while the worker
# thread is still alive.
#
# @return [Boolean] false when the @running flag is still set; otherwise
#   the result of +worker_thread_alive?+
def shutting_down?
  return false unless @running.false?

  worker_thread_alive?
end

#start_polling ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/sqs_buffer/client.rb', line 30

# Flags the client as running and spawns the worker thread that polls the
# queue.
#
# Every batch yielded by the poller is passed to +store_messages+. When
# polling raises, the error is logged and the thread sleeps before trying
# again. The delay doubles after each consecutive failure (2, 4, 8, 16
# seconds), is capped at 30 seconds, and resets to its starting value once
# a batch is received. Retries are not bounded.
#
# NOTE(review): nothing in this method checks @running, so ending the poll
# loop presumably happens elsewhere (e.g. the before-request hook) — confirm.
#
# @return [Object] the value of @running after it was set
def start_polling
  @running.make_true

  @worker_thread = Thread.new do
    max_sleep_seconds = 30
    # Lives outside the begin block so its value survives `retry`.
    sleep_seconds = 1
    begin
      opts = {
        skip_delete: @skip_delete,
        max_number_of_messages: @max_number_of_messages
      }
      @poller.poll(opts) do |messages|
        store_messages(messages)
        sleep_seconds = 1
      end
    rescue => e
      # Cap the delay itself, not just the sleep argument: the logged value
      # then matches the time actually slept, and the counter cannot keep
      # doubling without bound during a long outage.
      sleep_seconds = [sleep_seconds * 2, max_sleep_seconds].min
      @logger.error "An unhandled exception(#{e.message}) occurred in worker thread. Sleeping #{sleep_seconds} seconds before retry. | Backtrace: #{e.backtrace}"
      sleep(sleep_seconds)
      retry
    end
  end # End worker thread

  @running.value
end

#stop_polling ⇒ Object



59
60
61
# File 'lib/sqs_buffer/client.rb', line 59

# Clears the @running flag by calling make_false on it.
#
# This method only flips the flag; it does not touch @worker_thread itself.
# NOTE(review): the worker is presumably expected to notice the cleared
# flag and wind down on its own — confirm where that check happens.
#
# @return [Object] whatever @running.make_false returns
def stop_polling
  @running.make_false
end

#time_since_last_process ⇒ Object



96
97
98
# File 'lib/sqs_buffer/client.rb', line 96

# Whole seconds elapsed between the recorded last process time and now.
#
# @return [Integer] current epoch seconds minus the value of
#   @last_process_time
def time_since_last_process
  now = Time.now.to_i
  now - @last_process_time.value
end

#worker_thread_alive? ⇒ Boolean

Returns:

  • (Boolean)


88
89
90
# File 'lib/sqs_buffer/client.rb', line 88

# Reports whether a worker thread has been created and is still alive.
#
# @return [Boolean] false when @worker_thread is nil; otherwise the result
#   of calling alive? on it
def worker_thread_alive?
  worker = @worker_thread
  return false if worker.nil?

  worker.alive?
end