Class: Fluent::Plugin::GelfOutput

Inherits:
Output
  • Object
show all
Includes:
GelfPluginUtil
Defined in:
lib/fluent/plugin/out_gelf.rb

Constant Summary collapse

DEFAULT_BUFFER_TYPE =
'memory'.freeze
DEFAULT_TIMEKEY =
5
DEFAULT_TIMEKEY_WAIT =
0
MAX_PAYLOAD_SIZE =

bytes

1000000

Constants included from GelfPluginUtil

GelfPluginUtil::LEVEL_MAP

Instance Method Summary collapse

Methods included from GelfPluginUtil

#make_gelfentry

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/fluent/plugin/out_gelf.rb', line 42

# Validates the plugin configuration and resolves the transport protocol.
#
# Stores the gelf-rb protocol constant matching @protocol in @proto.
#
# @param conf plugin configuration; must respond to +has_key?+
# @raise [Fluent::ConfigError] if +conf+ has no 'host' key, or if @protocol
#   is neither 'udp' nor 'tcp'
def configure(conf)
  super(conf)

  # A destination hostname or IP address is mandatory.
  unless conf.has_key?('host')
    raise Fluent::ConfigError, "'host' parameter (hostname or address of Graylog2 server) is required"
  end

  # Pick the protocol constant that is handed to the gelf-rb Notifier
  # constructor. @protocol is read instead of conf['protocol'] so that the
  # config_param default is honoured.
  @proto =
    case @protocol
    when 'udp' then GELF::Protocol::UDP
    when 'tcp' then GELF::Protocol::TCP
    else raise Fluent::ConfigError, "'protocol' parameter should be either 'udp' (default) or 'tcp'"
    end
end

#format(tag, time, record) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/fluent/plugin/out_gelf.rb', line 79

# Serializes one event into a msgpack-encoded GELF entry.
#
# The entry is built by make_gelfentry from the tag, a numeric timestamp and
# the record, together with the @use_record_host, @add_msec_time and
# @max_bytes settings, and is then converted with +to_msgpack+.
#
# Failures while building or serializing the entry are logged through
# +log.error+ instead of being raised, so one bad record does not abort the
# caller. Process-level exceptions (signals, exit, out-of-memory) are NOT
# intercepted.
#
# @param tag event tag, passed through to make_gelfentry
# @param time event time; a Fluent::EventTime keeps millisecond precision,
#   anything else is reduced with +to_i+
# @param record event record, passed through to make_gelfentry
# @return the msgpack form of the GELF entry; on a logged failure, whatever
#   +log.error+ returned
#   NOTE(review): the failure return value is not a serialized entry —
#   confirm how the caller treats it.
def format(tag, time, record)
  timestamp =
    if defined?(Fluent::EventTime) && time.is_a?(Fluent::EventTime)
      # Whole seconds plus the nanosecond part rounded to milliseconds.
      time.to_i + (time.nsec.to_f / 1_000_000_000).round(3)
    else
      time.to_i
    end

  begin
    make_gelfentry(
      tag, timestamp, record,
      {
        use_record_host: @use_record_host,
        add_msec_time: @add_msec_time,
        max_bytes: @max_bytes
      }
    ).to_msgpack
  rescue StandardError, SystemStackError => e
    # SystemStackError is kept so that a pathologically nested record is
    # still handled per record; other non-StandardError exceptions propagate.
    #
    # The strings are dup'ed before force_encoding because either may be
    # frozen (e.g. an exception message that is a frozen literal), and
    # force_encoding on a frozen string raises FrozenError, which would hide
    # the original failure.
    #
    # sprintf is used on purpose: a bare `format` call here would resolve to
    # this very method.
    log.error sprintf(
      'Error trying to serialize %s: %s',
      record.to_s.dup.force_encoding('UTF-8'),
      e.message.to_s.dup.force_encoding('UTF-8')
    )
  end
end

#formatted_to_msgpack_binary ⇒ Object



38
39
40
# File 'lib/fluent/plugin/out_gelf.rb', line 38

# Always answers true.
#
# NOTE(review): presumably signals to Fluentd that #format already produces
# msgpack binary (#format ends in +to_msgpack+ and #write iterates the chunk
# with +msgpack_each+) — confirm against the Fluentd output plugin API.
#
# @return [Boolean] true
def formatted_to_msgpack_binary
  true
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


34
35
36
# File 'lib/fluent/plugin/out_gelf.rb', line 34

# Always answers true.
#
# NOTE(review): presumably declares that this plugin may run under multiple
# Fluentd workers — confirm against the Fluentd plugin API.
#
# @return [Boolean] true
def multi_workers_ready?
  true
end

#shutdown ⇒ Object



75
76
77
# File 'lib/fluent/plugin/out_gelf.rb', line 75

# Shuts the plugin down.
#
# Adds no behaviour of its own: it only delegates to the parent class and
# returns whatever that returns.
def shutdown
  super
end

#start ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/fluent/plugin/out_gelf.rb', line 56

# Starts the plugin and creates the GELF::Notifier stored in @conn.
#
# The notifier is built from @host, @port and the protocol held in @proto,
# with the facility fixed to 'fluentd'. When @tls is truthy, @tls_options
# is passed along under the :tls key.
def start
  super

  notifier_opts = { facility: 'fluentd', protocol: @proto }

  # The :tls key (carrying tls_options) is added only when tls is enabled, see
  # https://github.com/graylog-labs/gelf-rb/blob/72916932b789f7a6768c3cdd6ab69a3c942dbcef/lib/gelf/notifier.rb#L133-L140
  notifier_opts[:tls] = @tls_options if @tls

  @conn = GELF::Notifier.new(@host, @port, 'WAN', notifier_opts)

  # The errors do not originate from Ruby, hence the direct level mapping.
  @conn.level_mapping = 'direct'
  # Ruby file/line would point into this class, which is not relevant.
  @conn.collect_file_and_line = false
end

#write(chunk) ⇒ Object



104
105
106
107
108
109
110
111
112
113
# File 'lib/fluent/plugin/out_gelf.rb', line 104

# Sends every entry of a buffer chunk through the GELF connection in @conn.
#
# A failure while sending is logged as a warning (with the error class, the
# error text and the plugin id) and then re-raised unchanged, so iteration
# over the chunk stops at the first failing entry.
#
# @param chunk buffer chunk; must respond to +msgpack_each+
# @raise [Exception] whatever @conn.notify! raised
def write(chunk)
  chunk.msgpack_each do |gelf_entry|
    begin
      @conn.notify!(gelf_entry)
    rescue Exception => err
      # Record the context, then hand the failure back to the caller.
      log.warn(
        "failed to flush the buffer.",
        error_class: err.class.to_s,
        error: err.to_s,
        plugin_id: plugin_id
      )
      raise err
    end
  end
end