Class: Fluent::FlumeOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::FlumeOutput
- Defined in:
- lib/fluent/plugin/out_flume.rb
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ FlumeOutput
constructor
A new instance of FlumeOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ FlumeOutput
Returns a new instance of FlumeOutput.
29 30 31 32 33 34 35 36 37 |
# File 'lib/fluent/plugin/out_flume.rb', line 29
#
# Builds a new FlumeOutput.
#
# Loads the Thrift runtime and the Flume Thrift bindings lazily, at
# instantiation time. The bindings are required by bare name, so the
# `thrift` directory next to this file is put at the front of the load
# path first.
def initialize
  require 'thrift'

  # Register the bundled bindings directory only once; an unconditional
  # unshift would add a duplicate $LOAD_PATH entry per plugin instance.
  thrift_dir = File.join(File.dirname(__FILE__), 'thrift')
  $LOAD_PATH.unshift(thrift_dir) unless $LOAD_PATH.include?(thrift_dir)

  require 'flume_types'
  require 'flume_constants'
  require 'thrift_flume_event_server'
  super
end
Instance Method Details
#configure(conf) ⇒ Object
39 40 41 |
# File 'lib/fluent/plugin/out_flume.rb', line 39
#
# Applies the plugin configuration. This override adds nothing of its own;
# it hands +conf+ straight to the superclass implementation.
def configure(conf)
  super(conf)
end
#format(tag, time, record) ⇒ Object
56 57 58 59 60 61 62 63 64 |
# File 'lib/fluent/plugin/out_flume.rb', line 56
#
# Serializes one event as a msgpack-encoded [category, time, record] array.
#
# When @remove_prefix is set and the tag either starts with
# "<remove_prefix>." followed by at least one more character, or equals
# @remove_prefix exactly, the prefix is stripped from the tag. If nothing
# remains after stripping (the exact-match case), @default_category is used.
# In every other case the tag is emitted unchanged.
def format(tag, time, record)
  category = tag

  if @remove_prefix
    prefixed = tag.start_with?(@removed_prefix_string) && tag.length > @removed_length
    if prefixed || tag == @remove_prefix
      # Slicing past the end of the string yields nil, which selects the default.
      category = tag[@removed_length..-1] || @default_category
    end
  end

  [category, time, record].to_msgpack
end
#shutdown ⇒ Object
52 53 54 |
# File 'lib/fluent/plugin/out_flume.rb', line 52
#
# Stops the plugin. No plugin-specific teardown happens here; the call is
# passed on to the superclass.
def shutdown
  super()
end
#start ⇒ Object
43 44 45 46 47 48 49 50 |
# File 'lib/fluent/plugin/out_flume.rb', line 43
#
# Starts the plugin via the superclass, then precomputes the values that
# #format uses for prefix stripping: the prefix with a trailing dot and the
# length of that string. Nothing is precomputed when @remove_prefix is unset.
def start
  super
  return unless @remove_prefix

  @removed_prefix_string = @remove_prefix + '.'
  @removed_length = @removed_prefix_string.length
end
#write(chunk) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/fluent/plugin/out_flume.rb', line 66
#
# Sends every event in +chunk+ to the Flume Thrift endpoint at @host:@port.
#
# The chunk is decoded in full before a connection is opened. A new socket
# is opened for each call and is always closed afterwards, including when an
# append raises; the exception itself is not rescued here and propagates to
# the caller.
#
# chunk - object responding to #msgpack_each, yielding [tag, time, record].
def write(chunk)
  records = []
  chunk.msgpack_each { |arr| records << arr }

  transport = Thrift::Socket.new @host, @port, @timeout
  #transport = Thrift::FramedTransport.new socket
  #protocol = Thrift::BinaryProtocol.new transport, false, false
  protocol = Thrift::BinaryProtocol.new transport
  client = ThriftFlumeEventServer::Client.new protocol

  transport.open
  begin
    # Logged inside the protected region so that a failure while logging
    # still reaches the ensure clause and the socket is closed.
    $log.debug "thrift client opened: #{client}"

    records.each do |tag, time, record|
      # NOTE(review): the key is spelled `fieldss`; presumably this matches
      # the field name in the bundled flume_types definitions - confirm
      # before "correcting" it.
      entry = ThriftFlumeEvent.new(:body => record.to_json.to_s.force_encoding('UTF-8'),
                                   :priority => Priority::INFO,
                                   :timestamp => time,
                                   :fieldss => { 'category' => tag })
      client.append entry
    end
  ensure
    $log.debug "thrift client closing: #{client}"
    transport.close
  end
end