Class: SqsGrep::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/sqs-grep/runner.rb

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Runner

Returns a new instance of Runner.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/sqs-grep/runner.rb', line 10

def initialize(config)
  @config = config

  @config.sqs_client ||= begin
                           effective_options = SqsGrep::Client.core_v2_options.merge(config.client_options)
                           Aws::SQS::Client.new(effective_options)
                         end

  if @config.invoke_lambda
    @config.lambda_client ||= begin
                                effective_options = SqsGrep::Client.core_v2_options.merge(config.client_options)
                                Aws::Lambda::Client.new(effective_options)
                              end
  end
end

Instance Method Details

#runObject

Sets $stdout.sync



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/sqs-grep/runner.rb', line 27

def run
  queue_name = @config.queue_name
  queue_url = resolve_queue queue_name
  function_name = @config.invoke_lambda

  send_to_url = if @config.send_to
                  resolve_queue @config.send_to
                end

  $stdout.sync = true

  seen_message_ids = Set.new
  num_matched = 0

  loop do
    r = @config.sqs_client.receive_message(
      queue_url: queue_url,
      attribute_names: %w[ All ],
      max_number_of_messages: 10,
      visibility_timeout: @config.visibility_timeout,
      wait_time_seconds: @config.wait_time_seconds,
    )

    break if r.messages.empty?

    r.messages.each do |m|
      # puts "%s\t%s\t%s" % [ queue_name, m.message_id, m.receipt_handle ]

      if seen_message_ids.include? m.message_id
        $stderr.puts "Already seen message #{m.message_id} - bailing out in case we're looping"
        return 0
      end
      seen_message_ids << m.message_id

      matches = (m.body.match(@config.pattern) != nil)

      if matches ^ @config.invert_match
        json_data = {
          queue_url: queue_url,
          queue_name: queue_name,
          message_id: m.message_id,
          attributes: m.attributes.to_h,
          body: m.body,
        }
        if !@config.json_format
          puts "%s\t%s" % [ queue_name, m.message_id ]
          puts m.attributes.inspect
          puts m.body
          puts ""
        end

        if send_to_url
          # FIXME? discards message attributes
          send_res = @config.sqs_client.send_message(
            queue_url: send_to_url,
            message_body: m.body,
          )
          if !@config.json_format
            p send_res
            puts ""
          end
        end

        if function_name
          send_res = @config.lambda_client.invoke(
            function_name: function_name,
            invocation_type: 'Event',
            payload: m.body
          )
          if !@config.json_format
            p send_res
            puts ""
          end
        end

        if @config.delete_matched
          delete_res = @config.sqs_client.delete_message(
            queue_url: queue_url,
            receipt_handle: m.receipt_handle,
          )
          if !@config.json_format
            p delete_res
            puts ""
          end
        end

        if @config.json_format
          puts JSON.pretty_generate(json_data)
        end

        num_matched = num_matched + 1
        return 0 if @config.max_count and num_matched >= @config.max_count
      end
    end
  end

  return 0
end