Class: Fluent::GelfInput
- Inherits: Plugin::Input
  - Object
    - Plugin::Input
      - Fluent::GelfInput
- Defined in:
- lib/fluent/plugin/in_gelf.rb
Constant Summary
- DEFAULT_PARSER = 'json'.freeze
Instance Method Summary
- #configure(conf) ⇒ Object
- #emit(time, record) ⇒ Object
- #initialize ⇒ GelfInput (constructor): a new instance of GelfInput.
- #listen ⇒ Object
- #receive_data(data, addr) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ GelfInput
Returns a new instance of GelfInput.
32 33 34 |
# File 'lib/fluent/plugin/in_gelf.rb', line 32
#
# Builds a new GelfInput; all construction work is delegated to the parent
# class.
def initialize
  super()
end
Instance Method Details
#configure(conf) ⇒ Object
51 52 53 54 55 56 |
# File 'lib/fluent/plugin/in_gelf.rb', line 51
#
# Applies the plugin configuration and prepares the parser instance.
#
# @param conf [Object] configuration handed in by the caller; it is passed to
#   compat_parameters_convert for the :parser section and then to the parent
#   implementation
# @return [Object] the parser produced by parser_create (also kept in @parser)
def configure(conf)
  compat_parameters_convert(conf, :parser)
  super(conf)

  # The parser is only created once the parent class has finished configuring.
  @parser = parser_create
end
#emit(time, record) ⇒ Object
111 112 113 114 115 |
# File 'lib/fluent/plugin/in_gelf.rb', line 111
#
# Hands one event to the router under @tag. Any StandardError raised while
# doing so is logged (message, class, tag and the record dumped with Yajl)
# rather than re-raised.
#
# @param time [Object] event time forwarded to router.emit
# @param record [Object] event payload forwarded to router.emit
def emit(time, record)
  begin
    router.emit(@tag, time, record)
  rescue => failure
    log.error(
      'gelf failed to emit',
      error: failure.to_s,
      error_class: failure.class.to_s,
      tag: @tag,
      record: Yajl.dump(record)
    )
  end
end
#listen ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/fluent/plugin/in_gelf.rb', line 98
#
# Opens the listening server for the configured protocol. When
# @protocol_type is :tcp an :in_tcp_server is created; any other value
# creates an :in_udp_server (proto :udp, max_bytes 8192). Every payload the
# server yields is forwarded to receive_data.
#
# @return [Object] whatever server_create returns for the chosen protocol
def listen
  log.info "listening gelf socket on #{@bind}:#{@port} with #{@protocol_type}"

  if @protocol_type == :tcp
    return server_create(:in_tcp_server, @port, bind: @bind) { |payload, connection|
      receive_data(payload, connection)
    }
  end

  # Everything that is not TCP is served over UDP.
  server_create(:in_udp_server, @port, proto: :udp, bind: @bind, max_bytes: 8192) { |payload, socket|
    receive_data(payload, socket)
  }
end
#receive_data(data, addr) ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/fluent/plugin/in_gelf.rb', line 68 def receive_data(data, addr) begin msg = Gelfd2::Parser.parse(data) rescue => e log.warn 'Gelfd failed to parse a message', error: e.to_s log.warn_backtrace end # Gelfd parser will return nil if it received and parsed a non-final chunk return if msg.nil? @parser.parse(msg) { |time, record| unless time && record log.warn "pattern not match: #{msg.inspect}" return end # Use the recorded event time if available time = Fluent::EventTime.from_time(Time.at(record.delete('timestamp').to_f) ) if record.key?('timestamp') # Postprocess recorded event strip_leading_underscore_(record) if @strip_leading_underscore emit(time, record) } rescue => e log.error data.dump, error: e.to_s log.error_backtrace end |
#shutdown ⇒ Object
64 65 66 |
# File 'lib/fluent/plugin/in_gelf.rb', line 64
#
# Stops the plugin; this class adds nothing of its own and simply defers to
# the parent implementation.
def shutdown
  super()
end
#start ⇒ Object
58 59 60 61 62 |
# File 'lib/fluent/plugin/in_gelf.rb', line 58
#
# Starts the plugin: runs the parent start logic first, then opens the
# listening server through listen.
#
# @return [Object] the value returned by listen
def start
  super()

  listen
end