Class: LogStash::Outputs::Tcp

Inherits:
Base
  • Object
show all
Includes:
PluginMixins::NormalizeConfigSupport
Defined in:
lib/logstash/outputs/tcp.rb

Overview

Write events over a TCP socket.

Each event's JSON is separated by a newline.

Can either accept connections from clients or connect to a server, depending on `mode`.

Defined Under Namespace

Classes: Client

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Tcp

Returns a new instance of Tcp.



192
193
194
195
# File 'lib/logstash/outputs/tcp.rb', line 192

# Builds the output plugin instance.
#
# Every constructor argument is handed to the parent class unchanged, after
# which setup_ssl_params! is invoked on the new instance.
def initialize(*args)
  # Bare `super` forwards the arguments this method received.
  super
  setup_ssl_params!
end

Instance Method Details

#close ⇒ Object



311
312
313
314
315
316
317
318
319
320
# File 'lib/logstash/outputs/tcp.rb', line 311

# Marks the plugin as closed and closes the sockets it knows about.
#
# Sets the @closed flag, closes @server_socket when one exists, and then closes
# the client stored under :client on each entry of @client_threads. Errors
# raised by the individual close calls are deliberately ignored (best effort).
#
# Returns nil when @client_threads is not set, otherwise the value returned by
# iterating @client_threads.
def close
  @closed.make_true

  if @server_socket
    begin
      @server_socket.close
    rescue StandardError
      nil # best-effort: a failing close must not abort the remaining cleanup
    end
  end

  return unless @client_threads

  @client_threads.each do |worker|
    connection = worker[:client]
    next unless connection

    begin
      connection.close
    rescue StandardError
      nil # keep going so the remaining clients still get closed
    end
  end
end

#log_error(msg, e, backtrace: @logger.info?, **details) ⇒ Object



328
329
330
331
332
# File 'lib/logstash/outputs/tcp.rb', line 328

# Logs +msg+ at error level together with details about exception +e+.
#
# msg       - text passed as the first argument to @logger.error
# e         - the exception; its message and class are added under :message
#             and :exception
# backtrace - when truthy, e.backtrace is added under :backtrace; defaults to
#             the result of @logger.info?
# details   - extra key/value pairs included in the logged hash; :message and
#             :exception from the exception take precedence over same-named keys
#
# Returns whatever @logger.error returns.
def log_error(msg, e, backtrace: @logger.info?, **details)
  payload = { **details, message: e.message, exception: e.class }
  payload[:backtrace] = e.backtrace if backtrace
  @logger.error(msg, payload)
end

#log_warn(msg, e, backtrace: @logger.debug?, **details) ⇒ Object



322
323
324
325
326
# File 'lib/logstash/outputs/tcp.rb', line 322

# Logs +msg+ at warn level together with details about exception +e+.
#
# msg       - text passed as the first argument to @logger.warn
# e         - the exception; its message and class are added under :message
#             and :exception
# backtrace - when truthy, e.backtrace is added under :backtrace; defaults to
#             the result of @logger.debug?
# details   - extra key/value pairs included in the logged hash; :message and
#             :exception from the exception take precedence over same-named keys
#
# Returns whatever @logger.warn returns.
def log_warn(msg, e, backtrace: @logger.debug?, **details)
  payload = { **details, message: e.message, exception: e.class }
  payload[:backtrace] = e.backtrace if backtrace
  @logger.warn(msg, payload)
end

#receive(event) ⇒ Object



306
307
308
# File 'lib/logstash/outputs/tcp.rb', line 306

# Accepts a single event and hands it to the configured codec for encoding.
#
# The encoded output is consumed by the block registered through
# @codec.on_event in run_as_client / run_as_server.
#
# Returns whatever @codec.encode returns.
def receive(event)
  encoder = @codec
  encoder.encode(event)
end

#register ⇒ Object



198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/logstash/outputs/tcp.rb', line 198

# Prepares the plugin for use and starts it in the configured mode.
#
# Loads the socket and stud/try libraries, validates the SSL configuration,
# creates the @closed flag (initially false) and the @thread_no counter
# (initially 0), sets up SSL when @ssl_enabled is truthy, and finally starts
# either server or client mode depending on server?.
#
# Returns the result of run_as_server or run_as_client.
def register
  require "socket"
  require "stud/try"

  validate_ssl_config!

  @closed = Concurrent::AtomicBoolean.new(false)
  @thread_no = Concurrent::AtomicFixnum.new(0)

  if @ssl_enabled
    setup_ssl
  end

  return run_as_server if server?

  run_as_client
end

#run_as_client ⇒ Object



278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
# File 'lib/logstash/outputs/tcp.rb', line 278

# Starts client mode: registers a codec callback that writes every encoded
# payload to a socket obtained from connect.
#
# The socket is opened lazily on the first event and reused afterwards. Writes
# use write_nonblock in a loop so partially written payloads are completed.
# On any StandardError the failure is logged through log_warn, the socket is
# closed and discarded, the method sleeps @reconnect_interval, and the whole
# payload is sent again over a fresh connection.
#
# Returns whatever @codec.on_event returns.
def run_as_client
  client_socket = nil
  @codec.on_event do |_event, payload|
    begin
      client_socket = connect unless client_socket

      # Track progress in a separate cursor instead of overwriting `payload`.
      # The outer `retry` re-enters this begin block, so resetting the cursor
      # here makes a reconnect resend the complete payload rather than only
      # the tail left over from the failed connection (a truncated record).
      remaining = payload
      while remaining && remaining.bytesize > 0
        begin
          written_bytes_size = client_socket.write_nonblock(remaining)
          remaining = remaining.byteslice(written_bytes_size..-1)
        rescue IO::WaitReadable
          # Wait until the socket is readable, then retry the same write.
          IO.select([client_socket])
          retry
        rescue IO::WaitWritable
          # Wait until the socket is writable again, then retry the same write.
          IO.select(nil, [client_socket])
          retry
        end
      end
    rescue => e
      log_warn "client socket failed:", e, host: @host, port: @port, socket: (client_socket ? client_socket.to_s : nil)
      # client_socket is still nil when connect itself raised; only close a real socket.
      client_socket.close rescue nil if client_socket
      client_socket = nil
      sleep @reconnect_interval
      retry
    end
  end
end

#run_as_server ⇒ Object



215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
# File 'lib/logstash/outputs/tcp.rb', line 215

# Starts server mode.
#
# Opens a TCPServer on @host/@port (wrapped in an SSLServer when @ssl_enabled),
# spawns an accept thread that starts one thread per accepted connection, and
# registers a codec callback that writes every encoded payload to each client
# whose thread is still alive.
#
# Raises Errno::EADDRINUSE (re-raised after being logged) when the server
# socket cannot be created because the address is in use.
def run_as_server
  @logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}")
  begin
    @server_socket = TCPServer.new(@host, @port)
  rescue Errno::EADDRINUSE
    # Log with context, then let the original exception propagate to the caller.
    @logger.error("Could not start tcp server: Address in use", host: @host, port: @port)
    raise
  end
  if @ssl_enabled
    # Replace the plain server socket with an SSL wrapper built from @ssl_context.
    @server_socket = OpenSSL::SSL::SSLServer.new(@server_socket, @ssl_context)
  end # @ssl_enabled
  # Threads serving connected clients; read by the codec callback below and by close.
  @client_threads = Concurrent::Array.new

  @accept_thread = Thread.new(@server_socket) do |server_socket|
    LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|server_accept")
    loop do
      # Stop accepting once the plugin has been closed.
      break if @closed.value
      # OpenSSL::SSL::SSLServer does not support the #accept_nonblock method.
      # When SSL is enabled, it needs to use the blocking counterpart and ignore
      # SSLError errors, as they may be client's issues such as missing client's
      # certificates, ciphers, etc. If it's not rescued here, it would close the
      # TCP server and exit the plugin.
      # On the other hand, IOError should normally happen when the pipeline configuration
      # is reloaded, as the stream gets closed in the thread
      if @ssl_enabled
        begin
          client_socket = server_socket.accept
        rescue OpenSSL::SSL::SSLError => e
          log_warn("SSL Error", e)
          retry unless @closed.value
        rescue IOError => e
          log_warn("IO Error", e)
          retry unless @closed.value
        end
        # NOTE(review): when @closed is set, neither rescue retries, so
        # client_socket appears to remain nil here and the Thread.start below
        # would receive nil — confirm whether a guard is needed.
      else
        client_socket = server_socket.accept_nonblock exception: false
        if client_socket == :wait_readable
          # No connection pending: wait until the server socket is readable,
          # then start the next loop iteration (which re-checks @closed).
          IO.select [ server_socket ]
          next
        end
      end

      # One thread per accepted connection. The block parameter shadows the
      # outer client_socket local with the same value.
      Thread.start(client_socket) do |client_socket|
        # monkeypatch a 'peer' method onto the socket.
        client_socket.extend(::LogStash::Util::SocketPeer)
        @logger.debug("accepted connection", client: client_socket.peer, server: "#{@host}:#{@port}")
        client = Client.new(client_socket, self)
        # Store the client on the thread before publishing the thread, so
        # readers of @client_threads can look it up via thread[:client].
        Thread.current[:client] = client
        LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|client_socket-#{@thread_no.increment}")
        @client_threads << Thread.current
        # Skip running the client if the plugin was closed in the meantime.
        client.run unless @closed.value
      end
    end
  end

  @codec.on_event do |event, payload|
    # Drop threads that have finished, then send the payload to every remaining client.
    @client_threads.select!(&:alive?)
    @client_threads.each do |client_thread|
      client_thread[:client].write(payload)
    end
  end
end