Class: LogStash::Outputs::Thinkingdata
- Inherits:
-
Base
- Object
- Base
- LogStash::Outputs::Thinkingdata
- Includes:
- PluginMixins::HttpClient, Stud::Buffer
- Defined in:
- lib/logstash/outputs/thinkingdata.rb
Overview
A ThinkingData output that parses each event's JSON message, buffers the results, and sends them in batches over HTTP.
Constant Summary collapse
- PLUGIN_VERSION =
"1.1.1"
Instance Attribute Summary collapse
-
#buffer_state ⇒ Object
Returns the value of attribute buffer_state.
Instance Method Summary collapse
Instance Attribute Details
#buffer_state ⇒ Object
Returns the value of attribute buffer_state.
16 17 18 |
# File 'lib/logstash/outputs/thinkingdata.rb', line 16
# Reader for the buffer_state attribute.
#
# @return [Object] the current value of @buffer_state
def buffer_state
  @buffer_state
end
Instance Method Details
#close ⇒ Object
132 133 134 135 136 137 138 |
# File 'lib/logstash/outputs/thinkingdata.rb', line 132
# Shuts the output down. The steps run strictly in this order:
# kill the buffer timer, run a final buffer flush, kill the report
# thread, close the HTTP client, then call report one last time.
def close
  # Kill the timer held in the buffer state before the final flush.
  buffer_state[:timer].kill
  buffer_flush(final: true)

  # Stop the periodic reporter before issuing the last report ourselves.
  @report_thread.kill
  @client.close
  report
end
#flush(events, final) ⇒ Object
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
# File 'lib/logstash/outputs/thinkingdata.rb', line 142
# Sends one batch of buffered events: the events are passed through
# data_valid, serialized to JSON, gzip-compressed unless @compress is 0,
# and handed to do_send together with the request headers. The send is
# retried every 5 seconds until do_send returns a truthy value.
#
# @param events [Array] the buffered events to send
# @param final [Boolean] not used by this method; kept for the
#   flush(events, final) signature
# @return [Integer] the updated @total_send_count
def flush(events, final)
  events = data_valid(events)
  data = events.to_json

  if @compress == 0
    compress_type = 'none'
  else
    # Write into an explicit, empty binary buffer. StringIO.new("w") would
    # treat "w" as the initial *contents* (not an open mode) and write the
    # gzip stream into a string literal, which raises once literals are frozen.
    gz = StringIO.new(String.new)
    z = Zlib::GzipWriter.new(gz)
    z.write(data)
    z.close
    data = gz.string
    compress_type = 'gzip'
  end

  # Only the first header differs between the two cases: the configured
  # appid, or the custom_appid marker when no appid is set.
  headers = if @appid.nil? || @appid.empty?
              { 'custom_appid' => 'true' }
            else
              { 'appid' => @appid }
            end
  headers.merge!(
    'version' => PLUGIN_VERSION,
    'user-agent' => "logstash_#{PLUGIN_VERSION}",
    'compress' => compress_type,
    'TA-Integration-Type' => 'logstash',
    'TA-Integration-Version' => PLUGIN_VERSION,
    'TA-Integration-Count' => events.length.to_s
  )

  # Retry without limit until do_send succeeds; no batch is dropped here.
  sleep 5 until do_send(data, headers)

  @total_send_count += events.length
end
#multi_receive(events) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/logstash/outputs/thinkingdata.rb', line 104
# Receives a batch of events. For each event the "message" field is parsed
# as JSON, a '#uuid' is added when @uuid is set, filebeat origin details are
# recorded when applicable, and the parsed content is passed to
# buffer_receive. Any error raised while handling an event is logged and
# counted in @parse_error_count; the remaining events are still processed.
#
# @param events [Array] events to process; an empty batch is ignored
def multi_receive(events)
  return if events.empty?

  @receive_count += events.length

  events.each do |event|
    begin
      payload = JSON.parse(event.get("message"))
      payload['#uuid'] = SecureRandom.uuid if @uuid

      if is_filebeat_input?(event)
        # Record the filebeat input. Each lookup has a second field that is
        # tried when the first one is nil.
        source_host = event.get("[host][name]")
        source_file = event.get("[log][file][path]")
        source_file = event.get("[source]") if source_file.nil?
        position = event.get("[log][offset]")
        position = event.get("[offset]") if position.nil?

        if @is_filebeat_status_record
          record_filebeat_status("host: #{source_host}, file: #{source_file}", position)
        end
      end

      buffer_receive(payload)
    rescue => e
      @logger.error("Could not process content", content: event.to_s, Exception: e)
      @parse_error_count += 1
    end
  end
end
#register ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/logstash/outputs/thinkingdata.rb', line 49
# Prepares the output for use: logs the active settings, builds the HTTP
# client, runs the appid check when enabled, resets the counters,
# initializes the event buffer, and starts a background thread that calls
# report every 60 seconds.
#
# @return [Thread] the report thread (the last expression evaluated)
def register
  @logger.info(
    "Registering thinkingdata Output",
    url: @url,
    appid: @appid,
    flush_interval_sec: @flush_interval_sec,
    flush_batch_size: @flush_batch_size,
    compress: @compress,
    uuid: @uuid,
    is_filebeat_status_record: @is_filebeat_status_record,
    appid_check: @appid_check
  )

  settings = client_config
  settings[:user_agent] = "thinkingdata_logstash_output_plugin_#{PLUGIN_VERSION}"
  @client = Manticore::Client.new(settings)

  # Only check the appid when checking is enabled and an appid is set.
  ta_appid_check if @appid_check && !(@appid.nil? || @appid.empty?)

  @receive_count = @parse_error_count = @last_report_count = @total_send_count = 0

  buffer_initialize({
    max_items: @flush_batch_size.to_i,
    max_interval: @flush_interval_sec.to_i,
    logger: @logger
  })

  @filebeat_status = {} if @is_filebeat_status_record

  # Periodic reporter; close kills this thread on shutdown.
  @report_thread = Thread.new do
    loop do
      sleep 60
      report
    end
  end
end