Class: Fluent::FlumeOutput
- Inherits: BufferedOutput
- Inheritance hierarchy:
  - Object
  - BufferedOutput
  - Fluent::FlumeOutput
- Defined in:
- lib/fluent/plugin/out_flume.rb
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #emit(tag, es, chain) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #initialize ⇒ FlumeOutput (constructor): A new instance of FlumeOutput.
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ FlumeOutput
Returns a new instance of FlumeOutput.
38 39 40 41 42 43 44 45 |
# File 'lib/fluent/plugin/out_flume.rb', line 38
#
# Loads the Thrift library and the Flume Thrift bindings, then hands off to
# the parent class initializer.
#
# The requires are performed here rather than at file load, so the Thrift
# dependencies are only pulled in when an instance is created.
def initialize
  require 'thrift'
  require 'fluent/plugin/thrift/flume_types'

  # 'flume_constants' and 'thrift_source_protocol' are required by bare name,
  # so the 'thrift' directory next to this file has to be on the load path.
  # Add it only when it is missing: this runs for every instance, and an
  # unconditional unshift would keep stacking duplicate entries onto $:.
  thrift_dir = File.join(File.dirname(__FILE__), 'thrift')
  $:.unshift(thrift_dir) unless $:.include?(thrift_dir)

  require 'flume_constants'
  require 'thrift_source_protocol'
  super
end
Instance Method Details
#configure(conf) ⇒ Object
47 48 49 50 51 52 53 54 55 |
# File 'lib/fluent/plugin/out_flume.rb', line 47
#
# Applies the plugin configuration and builds the record formatter.
#
# conf - configuration object; read and written with string keys here, then
#        passed on to the parent class and to the formatter.
#
# Returns whatever the formatter's #configure returns.
def configure(conf)
  # override default buffer_chunk_limit
  # (set before calling super so the parent sees the '1m' value)
  conf['buffer_chunk_limit'] = '1m' unless conf['buffer_chunk_limit']

  super

  # @format is not assigned in this method; presumably it is populated by the
  # parent's configure from conf -- TODO confirm.
  formatter = Plugin.new_formatter(@format)
  @formatter = formatter
  formatter.configure(conf)
end
#emit(tag, es, chain) ⇒ Object
66 67 68 69 70 71 72 |
# File 'lib/fluent/plugin/out_flume.rb', line 66
#
# Rewrites the tag according to the remove_prefix setting and forwards the
# event stream to the parent class, using the rewritten tag as the buffer key.
#
# tag   - String tag of the event stream.
# es    - event stream, passed through untouched.
# chain - passed through untouched.
#
# Tag rewriting (only when @remove_prefix is set):
#   "<prefix>.<rest>"  becomes "<rest>"
#   "<prefix>" exactly becomes @default_category
#   anything else, including "<prefix>." with nothing after it, is unchanged
#
# Relies on @removed_prefix_string and @removed_length, which #start sets.
def emit(tag, es, chain)
  if @remove_prefix
    nested = tag.start_with?(@removed_prefix_string) && tag.length > @removed_length
    # For an exact match the tag is shorter than the "<prefix>." length, so
    # the slice yields nil and the default category is used instead.
    # NOTE(review): @default_category is assigned outside this view; if it is
    # nil the tag forwarded below will be nil as well -- confirm it is always set.
    tag = tag[@removed_length..-1] || @default_category if nested || tag == @remove_prefix
  end
  super(tag, es, chain, tag)
end
#format(tag, time, record) ⇒ Object
74 75 76 77 78 |
# File 'lib/fluent/plugin/out_flume.rb', line 74
#
# Serializes one event for buffering.
#
# tag    - event tag, handed to the formatter.
# time   - event time, handed to the formatter and stored with the result.
# record - event record, handed to the formatter.
#
# Returns the msgpack encoding of the pair [time, formatted_record].
def format(tag, time, record)
  line = @formatter.format(tag, time, record)
  # Use the non-destructive chomp: the string belongs to the formatter, so
  # editing it in place would raise on a frozen string and would alter any
  # string the formatter hands out more than once.
  line = line.chomp if @trim_nl
  [time, line].to_msgpack
end
#start ⇒ Object
57 58 59 60 61 62 63 64 |
# File 'lib/fluent/plugin/out_flume.rb', line 57
#
# Runs the parent's start logic, then precomputes the values #emit uses to
# strip the configured prefix from tags.
#
# When @remove_prefix is set, stores the prefix followed by a dot in
# @removed_prefix_string and that string's length in @removed_length.
# When it is not set, nothing further is done.
def start
  super
  return unless @remove_prefix

  dotted_prefix = @remove_prefix + '.'
  @removed_prefix_string = dotted_prefix
  @removed_length = dotted_prefix.length
end
#write(chunk) ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/fluent/plugin/out_flume.rb', line 80
#
# Sends every entry of a buffer chunk to Flume over a Thrift connection.
#
# chunk - buffer chunk; its key is used as the tag header and its entries are
#         read with #msgpack_each as [time, record] pairs.
#
# A new connection to @host:@port (with @timeout) is opened for each call and
# closed again in the ensure clause, whether or not sending succeeded.
def write(chunk)
  transport = Thrift::FramedTransport.new(Thrift::Socket.new(@host, @port, @timeout))
  # An alternative protocol is left here, disabled:
  #protocol = Thrift::BinaryProtocol.new transport, false, false
  client = ThriftSourceProtocol::Client.new(Thrift::CompactProtocol.new(transport))

  tag = chunk.key
  sent = 0
  # One hash is reused for every event and overwritten on each iteration.
  headers = {}

  transport.open
  log.debug "thrift client opened: #{client}"
  begin
    chunk.msgpack_each do |time, record|
      headers['timestamp'.freeze] = time.to_s
      headers['tag'.freeze] = tag
      # force_encoding relabels the record string in place as UTF-8.
      event = ThriftFlumeEvent.new(:body => record.force_encoding('UTF-8'),
                                   :headers => headers)
      client.append event
      sent += 1
    end
    log.debug "Writing #{sent} entries to flume"
  ensure
    log.debug "thrift client closing: #{client}"
    transport.close
  end
end