Class: LogStash::Outputs::Dynatrace

Inherits:
Base
  • Object
show all
Includes:
PluginMixins::HttpClient
Defined in:
lib/logstash/outputs/dynatrace.rb

Defined Under Namespace

Classes: Batcher, PluginInternalQueueLeftoverError, RetryTimerTask

Constant Summary collapse

RETRYABLE_MANTICORE_EXCEPTIONS =
[
  ::Manticore::Timeout,
  ::Manticore::SocketException,
  ::Manticore::ClientProtocolException,
  ::Manticore::ResolutionFailure,
  ::Manticore::SocketTimeout
].freeze
RETRYABLE_UNKNOWN_EXCEPTION_STRINGS =
[
  /Connection reset by peer/i,
  /Read Timed out/i
].freeze

Instance Method Summary collapse

Instance Method Details

#closeObject



313
314
315
316
# File 'lib/logstash/outputs/dynatrace.rb', line 313

def close
  @timer.cancel
  client.close
end

#log_error_response(response, ingest_endpoint_url, event) ⇒ Object



167
168
169
170
171
172
173
174
# File 'lib/logstash/outputs/dynatrace.rb', line 167

def log_error_response(response, ingest_endpoint_url, event)
  log_failure(
    "Encountered non-2xx HTTP code #{response.code}",
    response_code: response.code,
    ingest_endpoint_url: ingest_endpoint_url,
    event: event
  )
end

#log_partial_success_response(response) ⇒ Object



163
164
165
# File 'lib/logstash/outputs/dynatrace.rb', line 163

def log_partial_success_response(response)
  @logger.warn("Encountered partial success response", code: response.code, body: response.body)
end

#log_retryable_response(response) ⇒ Object



153
154
155
156
157
158
159
160
161
# File 'lib/logstash/outputs/dynatrace.rb', line 153

def log_retryable_response(response)
  retry_msg = RETRY_FAILED ? 'will retry' : "won't retry"
  if response.code == 429
    @logger.debug? && @logger.debug("Encountered a 429 response, #{retry_msg}. This is not serious, just flow control via HTTP")
  else
    @logger.warn("Encountered a retryable HTTP request in HTTP output, #{retry_msg}", code: response.code,
                                                                                      body: response.body)
  end
end

#make_headersObject



145
146
147
148
149
150
151
# File 'lib/logstash/outputs/dynatrace.rb', line 145

def make_headers
  {
    'User-Agent' => "logstash-output-dynatrace/#{DynatraceConstants::VERSION} logstash/#{LOGSTASH_VERSION}",
    'Content-Type' => 'application/json; charset=utf-8',
    'Authorization' => "Api-Token #{@api_key.value}"
  }
end

#multi_receive(events) ⇒ Object



97
98
99
100
101
# File 'lib/logstash/outputs/dynatrace.rb', line 97

def multi_receive(events)
  return if events.empty?

  send_events(events)
end

#pipeline_shutdown_requested?Boolean

Returns:

  • (Boolean)


249
250
251
252
253
# File 'lib/logstash/outputs/dynatrace.rb', line 249

def pipeline_shutdown_requested?
  return super if defined?(super) # since LS 8.1.0

  nil
end

#registerObject



87
88
89
90
91
92
93
94
95
# File 'lib/logstash/outputs/dynatrace.rb', line 87

def register
  # ssl_verification_mode config is from mixin but ssl_verify_none is our documented config
  @ssl_verification_mode = 'none' if @ssl_verify_none

  @ingest_endpoint_url = @ingest_endpoint_url.to_s

  # Run named Timer as daemon thread
  @timer = java.util.Timer.new("HTTP Output #{params['id']}", true)
end

#send_event(event, attempt) ⇒ Object



261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
# File 'lib/logstash/outputs/dynatrace.rb', line 261

def send_event(event, attempt)
  headers = make_headers

  # Compress the body and add appropriate header
  if @http_compression == true
    headers["Content-Encoding"] = "gzip"
    event = gzip(event)
  end

  # Create an async request
  response = client.post(ingest_endpoint_url, body: event, headers: headers)

  if response_success?(response)
    # some events were not accepted but we don't know which ones or why
    log_partial_success_response(response) if response_partial_success?(response)
    [:success, event, attempt]
  elsif retryable_response?(response)
    log_retryable_response(response)
    [:retry, event, attempt]
  else
    log_error_response(response, ingest_endpoint_url, event)
    [:failure, event, attempt]
  end
rescue StandardError => e
  will_retry = retryable_exception?(e)
  log_params = {
    ingest_endpoint_url: ingest_endpoint_url,
    message: e.message,
    class: e.class,
    will_retry: will_retry
  }
  if @logger.debug?
    # backtraces are big
    log_params[:backtrace] = e.backtrace
    if @debug_include_headers
      # headers can have sensitive data
      log_params[:headers] = headers
    end
    if @debug_include_body
      # body can be big and may have sensitive data
      log_params[:body] = event
    end
  end
  log_failure('Could not fetch URL', log_params)

  if will_retry
    [:retry, event, attempt]
  else
    [:failure, event, attempt]
  end
end

#send_events(events) ⇒ Object



176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
# File 'lib/logstash/outputs/dynatrace.rb', line 176

def send_events(events)
  successes = java.util.concurrent.atomic.AtomicInteger.new(0)
  failures  = java.util.concurrent.atomic.AtomicInteger.new(0)

  pending = Queue.new
  batcher = Batcher.new(@max_payload_size)

  events.each do |event|
    serialized_event = LogStash::Json.dump(event.to_hash)
    if serialized_event.bytesize > @max_payload_size
      log_params = { size: serialized_event.bytesize }
      log_params[:body] = serialized_event if @debug_include_body
      log_warning('Event larger than max_payload_size dropped', log_params)
      next
    end

    next if batcher.offer(serialized_event)

    pending << [batcher.drain_and_serialize, 0] unless batcher.empty?
    batcher.offer(serialized_event)
  end

  pending << [batcher.drain_and_serialize, 0] unless batcher.empty?

  while popped = pending.pop
    break if popped == :done

    event, attempt = popped

    if attempt > 2 && pipeline_shutdown_requested?
      raise PluginInternalQueueLeftoverError, 'Received pipeline shutdown request but http output has unfinished events. ' \
              'If persistent queue is enabled, events will be retried.'
    end

    action, event, attempt = send_event(event, attempt)
    begin
      action = :failure if action == :retry && !RETRY_FAILED

      case action
      when :success
        successes.incrementAndGet
      when :retry
        next_attempt = attempt + 1
        sleep_for = sleep_for_attempt(next_attempt)
        @logger.info("Retrying http request, will sleep for #{sleep_for} seconds")
        timer_task = RetryTimerTask.new(pending, event, next_attempt)
        @timer.schedule(timer_task, sleep_for * 1000)
      when :failure
        failures.incrementAndGet
      else
        # this should never happen. It means send_event returned a symbol we didn't recognize
        raise "Unknown action #{action}"
      end

      pending << :done if %i[success failure].include?(action) && (successes.get + failures.get == 1)
    rescue StandardError => e
      # This should never happen unless there's a flat out bug in the code
      @logger.error('Error sending HTTP Request',
                    class: e.class.name,
                    message: e.message,
                    backtrace: e.backtrace)
      failures.incrementAndGet
      raise e
    end
  end
rescue StandardError => e
  @logger.error('Error in http output loop',
                class: e.class.name,
                message: e.message,
                backtrace: e.backtrace)
  raise e
end

#sleep_for_attempt(attempt) ⇒ Object



255
256
257
258
259
# File 'lib/logstash/outputs/dynatrace.rb', line 255

def sleep_for_attempt(attempt)
  sleep_for = attempt**2
  sleep_for = sleep_for <= 60 ? sleep_for : 60
  (sleep_for / 2) + (rand(0..sleep_for) / 2)
end