Class: Fluent::GelfInput

Inherits:
Plugin::Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_gelf.rb

Constant Summary collapse

DEFAULT_PARSER =
'json'.freeze

Instance Method Summary collapse

Constructor Details

#initialize ⇒ GelfInput

Returns a new instance of GelfInput.



32
33
34
# File 'lib/fluent/plugin/in_gelf.rb', line 32

# Creates the input plugin instance.
#
# No state is set up here; construction is delegated to the parent class.
#
# @return [GelfInput] a new instance of GelfInput
def initialize
  super()
end

Instance Method Details

#configure(conf) ⇒ Object



51
52
53
54
55
56
# File 'lib/fluent/plugin/in_gelf.rb', line 51

# Configures the plugin and builds the payload parser.
#
# Steps, in order:
# 1. run +compat_parameters_convert+ on +conf+ for the +:parser+ section,
# 2. hand +conf+ to the parent class's +configure+,
# 3. store the result of +parser_create+ in +@parser+.
#
# @param conf [Object] the configuration passed in by the caller
# @return [Object] the parser stored in +@parser+
def configure(conf)
  compat_parameters_convert(conf, :parser)
  super(conf)

  @parser = parser_create
end

#emit(time, record) ⇒ Object



111
112
113
114
115
# File 'lib/fluent/plugin/in_gelf.rb', line 111

# Sends a single event to the router under the configured +@tag+.
#
# Any StandardError raised while routing is caught and reported through
# +log.error+ (with the error text, its class, the tag and the record
# serialized by +Yajl.dump+); it is not re-raised.
#
# @param time [Object] event time handed to +router.emit+
# @param record [Object] event payload handed to +router.emit+
# @return [Object] whatever +router.emit+ returns, or the result of
#   +log.error+ when routing failed
def emit(time, record)
  begin
    router.emit(@tag, time, record)
  rescue StandardError => e
    log.error(
      'gelf failed to emit',
      error: e.to_s,
      error_class: e.class.to_s,
      tag: @tag,
      record: Yajl.dump(record)
    )
  end
end

#listen ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/fluent/plugin/in_gelf.rb', line 98

# Opens the listening server for incoming GELF payloads.
#
# Logs the bind address, port and protocol, then creates either a TCP server
# (when +@protocol_type+ is +:tcp+) or a UDP server (any other value, with
# +max_bytes: 8192+). Every payload delivered by the server is passed to
# +receive_data+ together with the connection/socket object.
#
# @return [Object] the value returned by +server_create+
def listen
  log.info "listening gelf socket on #{@bind}:#{@port} with #{@protocol_type}"

  if @protocol_type == :tcp
    return server_create(:in_tcp_server, @port, bind: @bind) { |chunk, conn| receive_data(chunk, conn) }
  end

  udp_options = { proto: :udp, bind: @bind, max_bytes: 8192 }
  server_create(:in_udp_server, @port, **udp_options) { |chunk, sock| receive_data(chunk, sock) }
end

#receive_data(data, addr) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/fluent/plugin/in_gelf.rb', line 68

# Handles one payload delivered by the TCP or UDP server.
#
# The payload is first decoded by +Gelfd2::Parser+; a decoding failure is
# logged as a warning and the payload is dropped. The decoded message is then
# fed to the configured parser, and every record it yields is emitted.
#
# For each yielded record:
# * a record without a time or body is logged and skipped (remaining records
#   from the same message are still processed),
# * a +timestamp+ field, when present, is removed from the record and used as
#   the event time if it is a finite number,
# * +strip_leading_underscore_+ is applied when +@strip_leading_underscore+
#   is set.
#
# Any other StandardError is logged together with a dump of the raw payload
# and is not re-raised.
#
# @param data [String] raw payload as received from the socket
# @param addr [Object] connection/socket object supplied by the server
#   callback; not used by this method
# @return [Object] not meaningful to callers
def receive_data(data, addr)
  begin
    msg = Gelfd2::Parser.parse(data)
  rescue => e
    log.warn 'Gelfd failed to parse a message', error: e.to_s
    log.warn_backtrace
  end

  # Gelfd parser will return nil if it received and parsed a non-final chunk
  # (msg is also nil when parsing raised above).
  return if msg.nil?

  @parser.parse(msg) do |time, record|
    unless time && record
      log.warn "pattern not match: #{msg.inspect}"
      # `next` skips only this record; `return` would leave the whole method
      # and silently drop any later records yielded for the same message.
      next
    end

    # Use the recorded event time if available
    time = gelf_event_time(record.delete('timestamp'), time) if record.key?('timestamp')

    # Postprocess recorded event
    strip_leading_underscore_(record) if @strip_leading_underscore

    emit(time, record)
  end
rescue => e
  log.error data.dump, error: e.to_s
  log.error_backtrace
end

# Converts a GELF +timestamp+ value (seconds since the Unix epoch) into an
# event time, returning +fallback+ when the value is not a finite number.
#
# Without this check a nil or non-numeric timestamp would be coerced to 0.0
# and the event would be stamped 1970-01-01.
#
# @param raw [Object] value taken from the record's +timestamp+ field
# @param fallback [Object] event time to keep when +raw+ is unusable
# @return [Object] the converted event time, or +fallback+
def gelf_event_time(raw, fallback)
  seconds =
    begin
      Float(raw)
    rescue ArgumentError, TypeError
      nil
    end

  if seconds.nil? || !seconds.finite?
    log.warn 'gelf timestamp is not a finite number, keeping parser time', timestamp: raw.inspect
    return fallback
  end

  Fluent::EventTime.from_time(Time.at(seconds))
end

#shutdown ⇒ Object



64
65
66
# File 'lib/fluent/plugin/in_gelf.rb', line 64

# Shuts the plugin down.
#
# This plugin adds no teardown of its own; the call is delegated to the
# parent class.
#
# @return [Object] whatever the parent's +shutdown+ returns
def shutdown
  super()
end

#start ⇒ Object



58
59
60
61
62
# File 'lib/fluent/plugin/in_gelf.rb', line 58

# Starts the plugin: runs the parent class's +start+ first, then opens the
# listening socket via +listen+.
#
# @return [Object] whatever +listen+ returns
def start
  super()
  listen
end