Class: Fluent::FlumeOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_flume.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ FlumeOutput

Returns a new instance of FlumeOutput.



29
30
31
32
33
34
35
36
37
# File 'lib/fluent/plugin/out_flume.rb', line 29

# Loads the Thrift runtime and the Flume Thrift definitions, then defers to
# the parent class's initializer.
def initialize
  require 'thrift'

  # Put the 'thrift' directory that sits next to this file at the front of the
  # load path so the bare requires below can be resolved (presumably that is
  # where the flume_* files are shipped -- confirm against the gem layout).
  bindings_dir = File.join(File.dirname(__FILE__), 'thrift')
  $LOAD_PATH.unshift(bindings_dir)
  %w[flume_types flume_constants thrift_flume_event_server].each { |lib| require lib }

  super
end

Instance Method Details

#configure(conf) ⇒ Object



39
40
41
# File 'lib/fluent/plugin/out_flume.rb', line 39

# Passes +conf+ straight through to the parent class's #configure; no extra
# configuration handling is done in this method.
#
# @param conf the configuration object handed in by the caller
# @return whatever the parent implementation returns
def configure(conf)
  super(conf)
end

#format(tag, time, record) ⇒ Object



56
57
58
59
60
61
62
63
64
# File 'lib/fluent/plugin/out_flume.rb', line 56

# Serializes one event as a msgpack-encoded [category, time, record] triple.
#
# When @remove_prefix is set, the category is the tag with the leading
# "<prefix>." removed. A tag that is exactly the prefix has nothing left after
# stripping and falls back to @default_category. Any other tag is used as is.
#
# @param tag    the event tag
# @param time   the event time, passed through unchanged
# @param record the event record, passed through unchanged
# @return the result of Array#to_msgpack on the triple
def format(tag, time, record)
  strip_prefix = @remove_prefix &&
                 ((tag[0, @removed_length] == @removed_prefix_string && tag.length > @removed_length) ||
                  tag == @remove_prefix)

  # Slicing past the end of the tag yields nil, which selects the default.
  category = strip_prefix ? (tag[@removed_length..-1] || @default_category) : tag

  [category, time, record].to_msgpack
end

#shutdown ⇒ Object



52
53
54
# File 'lib/fluent/plugin/out_flume.rb', line 52

# Delegates to the parent class's #shutdown; this method adds no teardown of
# its own.
#
# @return whatever the parent implementation returns
def shutdown
  super()
end

#start ⇒ Object



43
44
45
46
47
48
49
50
# File 'lib/fluent/plugin/out_flume.rb', line 43

# Runs the parent class's #start, then precomputes the values #format uses
# for prefix stripping.
#
# When @remove_prefix is set, stores the prefix with a trailing dot in
# @removed_prefix_string and that string's length in @removed_length.
#
# @return [Integer, nil] the stored length, or nil when no prefix is set
def start
  super

  return unless @remove_prefix

  @removed_prefix_string = @remove_prefix + '.'
  @removed_length = @removed_prefix_string.size
end

#write(chunk) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/fluent/plugin/out_flume.rb', line 66

# Sends every event in +chunk+ to the Flume Thrift endpoint at @host:@port.
#
# The chunk is decoded completely before the connection is opened. Each
# [tag, time, record] entry is then sent with its own client.append call,
# carrying the record as a JSON body. The transport is closed afterwards,
# whether or not sending raised.
#
# @param chunk an object responding to #msgpack_each that yields
#   [tag, time, record] arrays
def write(chunk)
  events = []
  chunk.msgpack_each { |entry| events << entry }

  # Plain socket transport handed directly to the binary protocol.
  socket = Thrift::Socket.new(@host, @port, @timeout)
  flume = ThriftFlumeEventServer::Client.new(Thrift::BinaryProtocol.new(socket))

  socket.open
  $log.debug "thrift client opend: #{flume}"
  begin
    events.each do |tag, time, record|
      payload = record.to_json.to_s.force_encoding('UTF-8')
      # NOTE(review): the key is spelled :fieldss here; verify it matches the
      # field name declared on ThriftFlumeEvent in flume_types.
      flume.append(
        ThriftFlumeEvent.new(body: payload,
                             priority: Priority::INFO,
                             timestamp: time,
                             fieldss: { 'category' => tag })
      )
    end
  ensure
    $log.debug "thrift client closing: #{flume}"
    socket.close
  end
end