Method: Fluent::Plugin::Output#configure

Defined in:
lib/fluent/plugin/output.rb

#configure(conf) ⇒ Object



266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
# File 'lib/fluent/plugin/output.rb', line 266

def configure(conf)
  unless implement?(:synchronous) || implement?(:buffered) || implement?(:delayed_commit)
    raise "BUG: output plugin must implement some methods. see developer documents."
  end

  has_buffer_section = (conf.elements(name: 'buffer').size > 0)
  has_flush_interval = conf.has_key?('flush_interval')

  super

  @num_errors_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "num_errors", help_text: "Number of count num errors")
  @emit_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_count", help_text: "Number of count emits")
  @emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_records", help_text: "Number of emit records")
  @emit_size_metrics =  metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_size", help_text: "Total size of emit events")
  @write_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "write_count", help_text: "Number of writing events")
  @rollback_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "rollback_count", help_text: "Number of rollbacking operations")
  @flush_time_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "flush_time_count", help_text: "Count of flush time")
  @slow_flush_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "slow_flush_count", help_text: "Count of slow flush occurred time(s)")

  if has_buffer_section
    unless implement?(:buffered) || implement?(:delayed_commit)
      raise Fluent::ConfigError, "<buffer> section is configured, but plugin '#{self.class}' doesn't support buffering"
    end
    @buffering = true
  else # no buffer sections
    if implement?(:synchronous)
      if !implement?(:buffered) && !implement?(:delayed_commit)
        if @as_secondary
          raise Fluent::ConfigError, "secondary plugin '#{self.class}' must support buffering, but doesn't."
        end
        @buffering = false
      else
        if @as_secondary
          # secondary plugin always works as buffered plugin without buffer instance
          @buffering = true
        else
          # @buffering.nil? shows that enabling buffering or not will be decided in lazy way in #start
          @buffering = nil
        end
      end
    else # buffered or delayed_commit is supported by `unless` of first line in this method
      @buffering = true
    end
  end
  # Enable to update record size metrics or not
  @enable_size_metrics = !!system_config.enable_size_metrics

  if @as_secondary
    if !@buffering && !@buffering.nil?
      raise Fluent::ConfigError, "secondary plugin '#{self.class}' must support buffering, but doesn't"
    end
  end

  if (@buffering || @buffering.nil?) && !@as_secondary
    # When @buffering.nil?, @buffer_config was initialized with default value for all parameters.
    # If so, this configuration MUST success.
    @chunk_keys = @buffer_config.chunk_keys.dup
    @chunk_key_time = !!@chunk_keys.delete('time')
    @chunk_key_tag = !!@chunk_keys.delete('tag')
    if @chunk_keys.any? { |key|
        begin
          k = Fluent::PluginHelper::RecordAccessor::Accessor.parse_parameter(key)
          if k.is_a?(String)
            k !~ CHUNK_KEY_PATTERN
          else
            if key.start_with?('$[')
              raise Fluent::ConfigError, "in chunk_keys: bracket notation is not allowed"
            else
              false
            end
          end
        rescue => e
          raise Fluent::ConfigError, "in chunk_keys: #{e.message}"
        end
      }
      raise Fluent::ConfigError, "chunk_keys specification includes invalid char"
    else
      @chunk_key_accessors = Hash[@chunk_keys.map { |key| [key.to_sym, Fluent::PluginHelper::RecordAccessor::Accessor.new(key)] }]
    end

    if @chunk_key_time
      raise Fluent::ConfigError, "<buffer ...> argument includes 'time', but timekey is not configured" unless @buffer_config.timekey
      Fluent::Timezone.validate!(@buffer_config.timekey_zone)
      @timekey_zone = @buffer_config.timekey_use_utc ? '+0000' : @buffer_config.timekey_zone
      @timekey = @buffer_config.timekey
      if @timekey <= 0
        raise Fluent::ConfigError, "timekey should be greater than 0. current timekey: #{@timekey}"
      end
      @timekey_use_utc = @buffer_config.timekey_use_utc
      @offset = Fluent::Timezone.utc_offset(@timekey_zone)
      @calculate_offset = @offset.respond_to?(:call) ? @offset : nil
      @output_time_formatter_cache = {}
    end

    if (@chunk_key_tag ? 1 : 0) + @chunk_keys.size >= CHUNKING_FIELD_WARN_NUM
      log.warn "many chunk keys specified, and it may cause too many chunks on your system."
    end

    # no chunk keys or only tags (chunking can be done without iterating event stream)
    @simple_chunking = !@chunk_key_time && @chunk_keys.empty?

    @flush_mode = @buffer_config.flush_mode
    if @flush_mode == :default
      if has_flush_interval
        log.info "'flush_interval' is configured at out side of <buffer>. 'flush_mode' is set to 'interval' to keep existing behaviour"
        @flush_mode = :interval
      else
        @flush_mode = (@chunk_key_time ? :lazy : :interval)
      end
    end

    buffer_type = @buffer_config[:@type]
    buffer_conf = conf.elements(name: 'buffer').first || Fluent::Config::Element.new('buffer', '', {}, [])
    @buffer = Plugin.new_buffer(buffer_type, parent: self)
    @buffer.configure(buffer_conf)
    keep_buffer_config_compat
    @buffer.enable_update_timekeys if @chunk_key_time

    @flush_at_shutdown = @buffer_config.flush_at_shutdown
    if @flush_at_shutdown.nil?
      @flush_at_shutdown = if @buffer.persistent?
                             false
                           else
                             true # flush_at_shutdown is true in default for on-memory buffer
                           end
    elsif !@flush_at_shutdown && !@buffer.persistent?
      buf_type = Plugin.lookup_type_from_class(@buffer.class)
      log.warn "'flush_at_shutdown' is false, and buffer plugin '#{buf_type}' is not persistent buffer."
      log.warn "your configuration will lose buffered data at shutdown. please confirm your configuration again."
    end

    if (@flush_mode != :interval) && buffer_conf.has_key?('flush_interval')
      if buffer_conf.has_key?('flush_mode')
        raise Fluent::ConfigError, "'flush_interval' can't be specified when 'flush_mode' is not 'interval' explicitly: '#{@flush_mode}'"
      else
        log.warn "'flush_interval' is ignored because default 'flush_mode' is not 'interval': '#{@flush_mode}'"
      end
    end

    if @buffer.queued_chunks_limit_size.nil?
      @buffer.queued_chunks_limit_size = @buffer_config.flush_thread_count
    end
  end

  if @secondary_config
    raise Fluent::ConfigError, "Invalid <secondary> section for non-buffered plugin" unless @buffering
    raise Fluent::ConfigError, "<secondary> section cannot have <buffer> section" if @secondary_config.buffer
    raise Fluent::ConfigError, "<secondary> section cannot have <secondary> section" if @secondary_config.secondary
    if @buffer_config.retry_forever
      log.warn "<secondary> with 'retry_forever', only unrecoverable errors are moved to secondary"
    end

    secondary_type = @secondary_config[:@type]
    unless secondary_type
      secondary_type = conf['@type'] # primary plugin type
    end
    secondary_conf = conf.elements(name: 'secondary').first
    @secondary = Plugin.new_output(secondary_type)
    unless @secondary.respond_to?(:acts_as_secondary)
      raise Fluent::ConfigError, "Failed to setup secondary plugin in '#{conf['@type']}'. '#{secondary_type}' plugin in not allowed due to non buffered output"
    end
    @secondary.acts_as_secondary(self)
    @secondary.configure(secondary_conf)
    if (@secondary.class.to_s != "Fluent::Plugin::SecondaryFileOutput") &&
       (self.class != @secondary.class) &&
       (@custom_format || @secondary.implement?(:custom_format))
      log.warn "Use different plugin for secondary. Check the plugin works with primary like secondary_file", primary: self.class.to_s, secondary: @secondary.class.to_s
    end
  else
    @secondary = nil
  end

  self
end