Class: LogStash::Inputs::Lumberjack

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/lumberjack.rb

Overview

Receive events using the Lumberjack protocol.

This input can be used to reliably and securely transport events between Logstash instances. To do so, use the <<plugins-outputs-lumberjack,lumberjack output plugin>> in the sending Logstash instance(s).

It can also be used to receive events from the deprecated github.com/elastic/logstash-forwarder[logstash-forwarder] tool that has been replaced by github.com/elastic/beats/tree/master/filebeat[Filebeat].

Constant Summary collapse

BUFFERED_QUEUE_SIZE =

TODO(sissel): Add CA to authenticate clients with.

1
RECONNECT_BACKOFF_SLEEP =
0.5

Instance Method Summary collapse

Instance Method Details

#create_event(fields, &block) ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/logstash/inputs/lumberjack.rb', line 110

def create_event(fields, &block)
  line = fields.delete("line")

  @codec.decode(line, identity(fields)) do |event|
    decorate(event)
    fields.each do |k,v|
      v.force_encoding(Encoding::UTF_8)
      event.set(k,v)
    end
    block.call(event)
  end
end

#registerObject



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/logstash/inputs/lumberjack.rb', line 47

def register
  require "lumberjack/server"
  require "concurrent"
  require "logstash/circuit_breaker"
  require "logstash/sized_queue_timeout"

  @logger.info("Starting lumberjack input listener", :address => "#{@host}:#{@port}")
  @lumberjack = Lumberjack::Server.new(:address => @host, :port => @port,
    :ssl_certificate => @ssl_certificate, :ssl_key => @ssl_key,
    :ssl_key_passphrase => @ssl_key_passphrase)

  # Create a reusable threadpool, we do not limit the number of connections
  # to the input, the circuit breaker with the timeout should take care 
  # of `blocked` threads and prevent logstash to go oom.
  @threadpool = Concurrent::CachedThreadPool.new(:idletime => 15)

  # in 1.5 the main SizeQueue doesnt have the concept of timeout
  # We are using a small plugin buffer to move events to the internal queue
  @buffered_queue = LogStash::SizedQueueTimeout.new(BUFFERED_QUEUE_SIZE)

  @circuit_breaker = LogStash::CircuitBreaker.new("Lumberjack input",
                          :exceptions => [LogStash::SizedQueueTimeout::TimeoutError])

  @codec = LogStash::Codecs::IdentityMapCodec.new(@codec)
end

#run(output_queue) ⇒ Object

def register



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/logstash/inputs/lumberjack.rb', line 73

def run(output_queue)
  @output_queue = output_queue
  start_buffer_broker

  @codec.eviction_block(method(:flush_event))

  # Accepting new events coming from LSF
  while !stop? do
    # Wrapping the accept call into a CircuitBreaker
    if @circuit_breaker.closed?
      connection = @lumberjack.accept # call that creates a new connection
      next if connection.nil? # if the connection is nil the connection was close.
      invoke(connection) do |event|
        if stop?
          connection.close
          break
        end

        @circuit_breaker.execute { @buffered_queue.push(event, @congestion_threshold) }
      end
    else
      @logger.warn("Lumberjack input: the pipeline is blocked, temporary refusing new connection.")
      sleep(RECONNECT_BACKOFF_SLEEP)
    end
  end
end

#stopObject



101
102
103
104
# File 'lib/logstash/inputs/lumberjack.rb', line 101

def stop
  @lumberjack.close
  @codec.flush { |event| flush_event(event) }
end