Class: LogStash::Inputs::Lumberjack
- Inherits:
-
Base
- Object
- Base
- LogStash::Inputs::Lumberjack
- Defined in:
- lib/logstash/inputs/lumberjack.rb
Overview
Receive events using the Lumberjack protocol.
This input can be used to reliably and securely transport events between Logstash instances. To do so, use the <<plugins-outputs-lumberjack,lumberjack output plugin>> in the sending Logstash instance(s).
It can also be used to receive events from the deprecated github.com/elastic/logstash-forwarder[logstash-forwarder] tool that has been replaced by github.com/elastic/beats/tree/master/filebeat[Filebeat].
Constant Summary collapse
- BUFFERED_QUEUE_SIZE =
TODO(sissel): Add CA to authenticate clients with.
1
- RECONNECT_BACKOFF_SLEEP =
0.5
Instance Method Summary collapse
- #create_event(fields, &block) ⇒ Object
- #register ⇒ Object
-
#run(output_queue) ⇒ Object
def register.
- #stop ⇒ Object
Instance Method Details
#create_event(fields, &block) ⇒ Object
110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/logstash/inputs/lumberjack.rb', line 110 def create_event(fields, &block) line = fields.delete("line") @codec.decode(line, identity(fields)) do |event| decorate(event) fields.each do |k,v| v.force_encoding(Encoding::UTF_8) event.set(k,v) end block.call(event) end end |
#register ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/logstash/inputs/lumberjack.rb', line 47 def register require "lumberjack/server" require "concurrent" require "logstash/circuit_breaker" require "logstash/sized_queue_timeout" @logger.info("Starting lumberjack input listener", :address => "#{@host}:#{@port}") @lumberjack = Lumberjack::Server.new(:address => @host, :port => @port, :ssl_certificate => @ssl_certificate, :ssl_key => @ssl_key, :ssl_key_passphrase => @ssl_key_passphrase) # Create a reusable threadpool, we do not limit the number of connections # to the input, the circuit breaker with the timeout should take care # of `blocked` threads and prevent logstash to go oom. @threadpool = Concurrent::CachedThreadPool.new(:idletime => 15) # in 1.5 the main SizeQueue doesnt have the concept of timeout # We are using a small plugin buffer to move events to the internal queue @buffered_queue = LogStash::SizedQueueTimeout.new(BUFFERED_QUEUE_SIZE) @circuit_breaker = LogStash::CircuitBreaker.new("Lumberjack input", :exceptions => [LogStash::SizedQueueTimeout::TimeoutError]) @codec = LogStash::Codecs::IdentityMapCodec.new(@codec) end |
#run(output_queue) ⇒ Object
def register
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/logstash/inputs/lumberjack.rb', line 73 def run(output_queue) @output_queue = output_queue start_buffer_broker @codec.eviction_block(method(:flush_event)) # Accepting new events coming from LSF while !stop? do # Wrapping the accept call into a CircuitBreaker if @circuit_breaker.closed? connection = @lumberjack.accept # call that creates a new connection next if connection.nil? # if the connection is nil the connection was close. invoke(connection) do |event| if stop? connection.close break end @circuit_breaker.execute { @buffered_queue.push(event, @congestion_threshold) } end else @logger.warn("Lumberjack input: the pipeline is blocked, temporary refusing new connection.") sleep(RECONNECT_BACKOFF_SLEEP) end end end |
#stop ⇒ Object
101 102 103 104 |
# File 'lib/logstash/inputs/lumberjack.rb', line 101 def stop @lumberjack.close @codec.flush { |event| flush_event(event) } end |