Method: Fluent::Plugin::Output#update_retry_state

Defined in:
lib/fluent/plugin/output.rb

#update_retry_state(chunk_id, using_secondary, error = nil) ⇒ Object



1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
# File 'lib/fluent/plugin/output.rb', line 1309

# Records a failed flush attempt and updates the shared retry state.
#
# The whole body runs while holding @retry_mutex. Each call:
#   1. increments @num_errors_metrics;
#   2. makes sure @retry is populated -- creating it through
#      retry_state(@buffer_config.retry_randomize) when it is not set, or
#      otherwise stepping it / recalculating its next retry time;
#   3. calls handle_limit_reached(error) when @retry.limit? is true, or
#      log_retry_error when it is not and an error was given.
#
# @param chunk_id the chunk identifier; only used (via dump_unique_id_hex)
#   as an argument to log_retry_error
# @param using_secondary forwarded unchanged to log_retry_error
# @param error the failure being reported, or nil; forwarded to the handlers
# @return nil when @retry was created by this call; otherwise the value of
#   whichever handler ran (nil if neither ran)
def update_retry_state(chunk_id, using_secondary, error = nil)
  @retry_mutex.synchronize do
    @num_errors_metrics.inc
    hex_id = dump_unique_id_hex(chunk_id)

    # Remember whether this call is the one that starts the retry sequence;
    # the handlers below may change @retry, so capture it up front.
    created_now = !@retry

    if created_now
      @retry = retry_state(@buffer_config.retry_randomize)
    elsif Time.now >= @retry.next_time
      # Only step once the scheduled retry time has actually been reached,
      # to avoid the situation when @retry.step is called almost as many
      # times as the number of flush threads in a short time.
      @retry.step
    else
      # to prevent all flush threads from retrying at the same time
      @retry.recalc_next_time
    end

    outcome =
      if @retry.limit?
        handle_limit_reached(error)
      elsif error
        log_retry_error(error, hex_id, using_secondary)
      end

    created_now ? nil : outcome
  end
end