Module: LogStash::Outputs::WebHdfsHelper
- Included in:
- WebHdfs
- Defined in:
- lib/logstash/outputs/webhdfs_helper.rb
Instance Method Summary
-
#compress_gzip(data) ⇒ String
Compress data using the gzip methods.
-
#compress_snappy_file(data) ⇒ String
Compress snappy file.
- #compress_snappy_stream(data) ⇒ Object
- #get_snappy_header! ⇒ Object
-
#load_module(module_name) ⇒ Object
Load a module.
-
#prepare_client(host, port, username) ⇒ WebHDFS
Set up a WebHDFS client.
-
#test_client(client) ⇒ Object
Test client connection.
Instance Method Details
#compress_gzip(data) ⇒ String
Compress data using the gzip methods.
59 60 61 62 63 64 65 66 67 68 |
# File 'lib/logstash/outputs/webhdfs_helper.rb', line 59
#
# Compress data using the gzip methods.
#
# The compressed output is accumulated in a binary (ASCII-8BIT) buffer so the
# returned bytes are never tagged with a text encoding they do not satisfy.
#
# @param data [String] payload to compress
# @return [String] gzip-compressed bytes, encoded as ASCII-8BIT
def compress_gzip(data)
  # String.new yields an unfrozen, binary-encoded string, which also keeps
  # this working when string literals are frozen.
  buffer = StringIO.new(String.new, 'w')
  # The block form closes the writer (flushing the gzip trailer) even when
  # the write raises.
  Zlib::GzipWriter.wrap(buffer) { |gzip| gzip.write(data) }
  buffer.string
end
#compress_snappy_file(data) ⇒ String
Compress snappy file.
73 74 75 76 77 78 79 80 81 |
# File 'lib/logstash/outputs/webhdfs_helper.rb', line 73
#
# Compress snappy file.
#
# Deflates the whole payload with a single Snappy call and emits one frame:
# the compressed length as a 32-bit big-endian integer, followed by the
# compressed bytes.
#
# @param data [String] payload to compress
# @return [String] framed, compressed bytes (ASCII-8BIT)
def compress_snappy_file(data)
  # Reinterpret the payload as raw bytes before handing it to Snappy.
  raw = data.encode(Encoding::ASCII_8BIT, "binary", :undef => :replace)
  deflated = Snappy.deflate(raw)

  out = StringIO.new('', 'w')
  out.set_encoding(Encoding::ASCII_8BIT)
  out.write([deflated.size, deflated].pack("Na*"))
  out.string
end
#compress_snappy_stream(data) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/logstash/outputs/webhdfs_helper.rb', line 83
#
# Compress data as a sequence of Snappy-compressed chunks.
#
# The payload is cut into chunks of at most @snappy_bufsize bytes. Each chunk
# is emitted as: uncompressed length, compressed length (both 32-bit
# big-endian integers), then the compressed bytes.
#
# Chunks are cut by byte offset rather than with a `/.{1,N}/m` regex, because
# the regex engine rejects repeat counts above 100000 and so large buffer
# sizes raised RegexpError.
#
# @param data [String] payload to compress
# @return [String] concatenated frames (ASCII-8BIT); empty for empty input
# @raise [ArgumentError] if @snappy_bufsize is not a positive integer
def compress_snappy_stream(data)
  # Reinterpret the payload as raw bytes so offsets are byte offsets.
  raw = data.encode(Encoding::ASCII_8BIT, "binary", :undef => :replace)
  framed = String.new # unfrozen, ASCII-8BIT
  return framed if raw.empty?

  # An unset buffer size means "one chunk for the whole payload".
  chunk_limit = @snappy_bufsize.nil? ? raw.bytesize : Integer(@snappy_bufsize)
  unless chunk_limit > 0
    raise ArgumentError, "snappy_bufsize must be a positive integer, got #{@snappy_bufsize.inspect}"
  end

  0.step(raw.bytesize - 1, chunk_limit) do |offset|
    chunk = raw.byteslice(offset, chunk_limit)
    compressed = Snappy.deflate(chunk)
    framed << [chunk.bytesize, compressed.bytesize, compressed].pack("NNa*")
  end
  framed
end
#get_snappy_header! ⇒ Object
96 97 98 |
# File 'lib/logstash/outputs/webhdfs_helper.rb', line 96
#
# Build the snappy stream header.
#
# Packs MAGIC into an 8-byte field, followed by DEFAULT_VERSION and
# MINIMUM_COMPATIBLE_VERSION as 32-bit big-endian integers. The three
# constants are defined outside this method.
#
# @return [String] the packed header bytes
def get_snappy_header!
  header_fields = [MAGIC, DEFAULT_VERSION, MINIMUM_COMPATIBLE_VERSION]
  header_fields.pack("a8NN")
end
#load_module(module_name) ⇒ Object
Load a module.
10 11 12 13 14 15 16 17 |
# File 'lib/logstash/outputs/webhdfs_helper.rb', line 10
#
# Load a module.
#
# Requires the named library. When it cannot be found, the failure is
# reported through @logger and the LoadError is re-raised to the caller.
#
# @param module_name [String] name passed to `require`
# @return [Boolean] the result of `require`
# @raise [LoadError] if the library cannot be loaded
def load_module(module_name)
  require module_name
rescue LoadError
  @logger.error("Module #{module_name} could not be loaded.")
  raise
end
#prepare_client(host, port, username) ⇒ WebHDFS
Set up a WebHDFS client.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/logstash/outputs/webhdfs_helper.rb', line 24
#
# Set up a WebHDFS client.
#
# Creates the client and applies the plugin settings held in instance
# variables: optional Kerberos and SSL authentication, HttpFS mode, timeouts
# and retry behaviour.
#
# @param host [String] host passed to WebHDFS::Client.new
# @param port [Integer] port passed to WebHDFS::Client.new
# @param username [String] user name passed to WebHDFS::Client.new
# @return [WebHDFS::Client] the configured client
def prepare_client(host, port, username)
  client = WebHDFS::Client.new(host, port, username)

  if @use_kerberos_auth
    # Loaded lazily so the library is only needed when Kerberos is enabled.
    require 'gssapi'
    client.kerberos = true
    client.kerberos_keytab = @kerberos_keytab
  end

  if @use_ssl_auth
    require 'openssl'
    client.ssl = true
    # File.read closes the file once its contents are read. Kernel#open left
    # the handle open and would treat a path starting with "|" as a command.
    client.ssl_key = OpenSSL::PKey::RSA.new(File.read(@ssl_key))
    client.ssl_cert = OpenSSL::X509::Certificate.new(File.read(@ssl_cert))
  end

  client.httpfs_mode = @use_httpfs
  client.open_timeout = @open_timeout
  client.read_timeout = @read_timeout
  client.retry_known_errors = @retry_known_errors
  # Retry tuning is only assigned when it is set.
  client.retry_interval = @retry_interval if @retry_interval
  client.retry_times = @retry_times if @retry_times
  client
end
#test_client(client) ⇒ Object
Test client connection.
47 48 49 50 51 52 53 54 |
# File 'lib/logstash/outputs/webhdfs_helper.rb', line 47
#
# Test client connection.
#
# Lists the root directory through the client. Any StandardError is reported
# through @logger, together with the client's host and port, and then
# re-raised to the caller.
#
# @param client [WebHDFS] client to check
# @return [Object] whatever `client.list('/')` returns
# @raise [StandardError] the error raised by the list request
def test_client(client)
  client.list('/')
rescue => err
  failure = "Webhdfs check request failed. (namenode: #{client.host}:#{client.port}, Exception: #{err.message})"
  @logger.error(failure)
  raise
end