266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
|
# File 'lib/fluent/plugin/output.rb', line 266
def configure(conf)
unless implement?(:synchronous) || implement?(:buffered) || implement?(:delayed_commit)
raise "BUG: output plugin must implement some methods. see developer documents."
end
has_buffer_section = (conf.elements(name: 'buffer').size > 0)
has_flush_interval = conf.has_key?('flush_interval')
super
@num_errors_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "num_errors", help_text: "Number of count num errors")
@emit_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_count", help_text: "Number of count emits")
@emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_records", help_text: "Number of emit records")
@emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_size", help_text: "Total size of emit events")
@write_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "write_count", help_text: "Number of writing events")
@rollback_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "rollback_count", help_text: "Number of rollbacking operations")
@flush_time_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "flush_time_count", help_text: "Count of flush time")
@slow_flush_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "slow_flush_count", help_text: "Count of slow flush occurred time(s)")
if has_buffer_section
unless implement?(:buffered) || implement?(:delayed_commit)
raise Fluent::ConfigError, "<buffer> section is configured, but plugin '#{self.class}' doesn't support buffering"
end
@buffering = true
else if implement?(:synchronous)
if !implement?(:buffered) && !implement?(:delayed_commit)
if @as_secondary
raise Fluent::ConfigError, "secondary plugin '#{self.class}' must support buffering, but doesn't."
end
@buffering = false
else
if @as_secondary
@buffering = true
else
@buffering = nil
end
end
else @buffering = true
end
end
@enable_size_metrics = !!system_config.enable_size_metrics
if @as_secondary
if !@buffering && !@buffering.nil?
raise Fluent::ConfigError, "secondary plugin '#{self.class}' must support buffering, but doesn't"
end
end
if (@buffering || @buffering.nil?) && !@as_secondary
@chunk_keys = @buffer_config.chunk_keys.dup
@chunk_key_time = !!@chunk_keys.delete('time')
@chunk_key_tag = !!@chunk_keys.delete('tag')
if @chunk_keys.any? { |key|
begin
k = Fluent::PluginHelper::RecordAccessor::Accessor.parse_parameter(key)
if k.is_a?(String)
k !~ CHUNK_KEY_PATTERN
else
if key.start_with?('$[')
raise Fluent::ConfigError, "in chunk_keys: bracket notation is not allowed"
else
false
end
end
rescue => e
raise Fluent::ConfigError, "in chunk_keys: #{e.message}"
end
}
raise Fluent::ConfigError, "chunk_keys specification includes invalid char"
else
@chunk_key_accessors = Hash[@chunk_keys.map { |key| [key.to_sym, Fluent::PluginHelper::RecordAccessor::Accessor.new(key)] }]
end
if @chunk_key_time
raise Fluent::ConfigError, "<buffer ...> argument includes 'time', but timekey is not configured" unless @buffer_config.timekey
Fluent::Timezone.validate!(@buffer_config.timekey_zone)
@timekey_zone = @buffer_config.timekey_use_utc ? '+0000' : @buffer_config.timekey_zone
@timekey = @buffer_config.timekey
if @timekey <= 0
raise Fluent::ConfigError, "timekey should be greater than 0. current timekey: #{@timekey}"
end
@timekey_use_utc = @buffer_config.timekey_use_utc
@offset = Fluent::Timezone.utc_offset(@timekey_zone)
@calculate_offset = @offset.respond_to?(:call) ? @offset : nil
@output_time_formatter_cache = {}
end
if (@chunk_key_tag ? 1 : 0) + @chunk_keys.size >= CHUNKING_FIELD_WARN_NUM
log.warn "many chunk keys specified, and it may cause too many chunks on your system."
end
@simple_chunking = !@chunk_key_time && @chunk_keys.empty?
@flush_mode = @buffer_config.flush_mode
if @flush_mode == :default
if has_flush_interval
log.info "'flush_interval' is configured at out side of <buffer>. 'flush_mode' is set to 'interval' to keep existing behaviour"
@flush_mode = :interval
else
@flush_mode = (@chunk_key_time ? :lazy : :interval)
end
end
buffer_type = @buffer_config[:@type]
buffer_conf = conf.elements(name: 'buffer').first || Fluent::Config::Element.new('buffer', '', {}, [])
@buffer = Plugin.new_buffer(buffer_type, parent: self)
@buffer.configure(buffer_conf)
keep_buffer_config_compat
@buffer.enable_update_timekeys if @chunk_key_time
@flush_at_shutdown = @buffer_config.flush_at_shutdown
if @flush_at_shutdown.nil?
@flush_at_shutdown = if @buffer.persistent?
false
else
true end
elsif !@flush_at_shutdown && !@buffer.persistent?
buf_type = Plugin.lookup_type_from_class(@buffer.class)
log.warn "'flush_at_shutdown' is false, and buffer plugin '#{buf_type}' is not persistent buffer."
log.warn "your configuration will lose buffered data at shutdown. please confirm your configuration again."
end
if (@flush_mode != :interval) && buffer_conf.has_key?('flush_interval')
if buffer_conf.has_key?('flush_mode')
raise Fluent::ConfigError, "'flush_interval' can't be specified when 'flush_mode' is not 'interval' explicitly: '#{@flush_mode}'"
else
log.warn "'flush_interval' is ignored because default 'flush_mode' is not 'interval': '#{@flush_mode}'"
end
end
if @buffer.queued_chunks_limit_size.nil?
@buffer.queued_chunks_limit_size = @buffer_config.flush_thread_count
end
end
if @secondary_config
raise Fluent::ConfigError, "Invalid <secondary> section for non-buffered plugin" unless @buffering
raise Fluent::ConfigError, "<secondary> section cannot have <buffer> section" if @secondary_config.buffer
raise Fluent::ConfigError, "<secondary> section cannot have <secondary> section" if @secondary_config.secondary
if @buffer_config.retry_forever
log.warn "<secondary> with 'retry_forever', only unrecoverable errors are moved to secondary"
end
secondary_type = @secondary_config[:@type]
unless secondary_type
secondary_type = conf['@type'] end
secondary_conf = conf.elements(name: 'secondary').first
@secondary = Plugin.new_output(secondary_type)
unless @secondary.respond_to?(:acts_as_secondary)
raise Fluent::ConfigError, "Failed to setup secondary plugin in '#{conf['@type']}'. '#{secondary_type}' plugin in not allowed due to non buffered output"
end
@secondary.acts_as_secondary(self)
@secondary.configure(secondary_conf)
if (@secondary.class.to_s != "Fluent::Plugin::SecondaryFileOutput") &&
(self.class != @secondary.class) &&
(@custom_format || @secondary.implement?(:custom_format))
log.warn "Use different plugin for secondary. Check the plugin works with primary like secondary_file", primary: self.class.to_s, secondary: @secondary.class.to_s
end
else
@secondary = nil
end
self
end
|