Module: LogStash::Outputs::WebHdfsHelper

Included in:
WebHdfs
Defined in:
lib/logstash/outputs/webhdfs_helper.rb

Instance Method Summary collapse

Instance Method Details

#compress_gzip(data) ⇒ String

Compress data with gzip.

Parameters:

  • data (String)

    stream of data to be compressed

Returns:

  • (String)

    the compressed stream of data



59
60
61
62
63
64
65
66
67
68
# File 'lib/logstash/outputs/webhdfs_helper.rb', line 59

# Gzip-compress a string in memory.
#
# @param data [String] the bytes to compress
# @return [String] the complete gzip stream (header, deflate body, trailer)
def compress_gzip(data)
  sink = StringIO.new('', 'w')
  # GzipWriter.wrap closes the writer when the block exits (even on error),
  # which flushes the gzip trailer before we read the accumulated bytes.
  Zlib::GzipWriter.wrap(sink) { |gz| gz.write(data) }
  sink.string
end

#compress_snappy_file(data) ⇒ String

Compress data with Snappy as a single length-prefixed block (the "file" layout).

Parameters:

  • data (binary)

    stream of data to be compressed

Returns:

  • (String)

    the compressed stream of data



73
74
75
76
77
78
79
80
81
# File 'lib/logstash/outputs/webhdfs_helper.rb', line 73

# Snappy-compress the whole payload as one block.
#
# The output is a 4-byte big-endian length (`N`) of the compressed payload,
# followed by the compressed bytes themselves (`a*`).
#
# @param data [String] the data to compress; its bytes are reinterpreted as binary
# @return [String] the framed, compressed block (ASCII-8BIT)
def compress_snappy_file(data)
  # Treat the input purely as bytes (ASCII-8BIT) before compressing.
  raw = data.encode(Encoding::ASCII_8BIT, "binary", :undef => :replace)
  payload = Snappy.deflate(raw)
  # NOTE(review): assumes Snappy.deflate returns a binary string, so #size is a byte count.
  [payload.size, payload].pack("Na*")
end

#compress_snappy_stream(data) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/logstash/outputs/webhdfs_helper.rb', line 83

# Snappy-compress data as a sequence of independently compressed chunks.
#
# The input is split into chunks of at most @snappy_bufsize bytes. Each chunk is
# emitted as: 4-byte big-endian uncompressed length, 4-byte big-endian
# compressed length, then the compressed bytes (`NNa*`).
#
# @param data [String] the data to compress; its bytes are reinterpreted as binary
# @return [String] the concatenated chunk records (ASCII-8BIT); empty for empty input
# @raise [ArgumentError] if @snappy_bufsize is set but not a positive integer
def compress_snappy_stream(data)
  # Treat the input purely as bytes so chunk sizes are byte counts.
  data = data.encode(Encoding::ASCII_8BIT, "binary", :undef => :replace)
  buffer = String.new(encoding: Encoding::ASCII_8BIT)
  return buffer if data.empty?

  # Slice by byte offset instead of with /.{1,N}/m: Onigmo limits regexp
  # repeat counts (100_000), so a large bufsize made that pattern fail to
  # compile. A nil bufsize keeps the old behaviour of a single chunk.
  chunk_size = @snappy_bufsize ? Integer(@snappy_bufsize) : data.bytesize
  raise ArgumentError, "snappy_bufsize must be positive" unless chunk_size.positive?

  0.step(data.bytesize - 1, chunk_size) do |offset|
    chunk = data.byteslice(offset, chunk_size)
    compressed = Snappy.deflate(chunk)
    buffer << [chunk.bytesize, compressed.bytesize, compressed].pack("NNa*")
  end
  buffer
end

#get_snappy_header! ⇒ Object



96
97
98
# File 'lib/logstash/outputs/webhdfs_helper.rb', line 96

# Build the Snappy stream header from the MAGIC and version constants.
#
# `a8` writes MAGIC as an 8-byte field (null-padded or truncated); each `N`
# is a 32-bit unsigned big-endian integer.
#
# @return [String] the packed header (ASCII-8BIT)
def get_snappy_header!
  header_fields = [MAGIC, DEFAULT_VERSION, MINIMUM_COMPATIBLE_VERSION]
  header_fields.pack("a8NN")
end

#load_module(module_name) ⇒ Object

Load a module

Parameters:

  • module_name (String)

    A module name

Raises:

  • (LoadError)

    If the module could not be loaded



10
11
12
13
14
15
16
17
# File 'lib/logstash/outputs/webhdfs_helper.rb', line 10

# Require a library by name, logging before propagating a failure.
#
# @param module_name [String] the name passed to Kernel#require
# @return [Boolean] the result of Kernel#require
# @raise [LoadError] re-raised unchanged when the library cannot be loaded
def load_module(module_name)
  require module_name
rescue LoadError
  @logger.error("Module #{module_name} could not be loaded.")
  raise
end

#prepare_client(host, port, username) ⇒ WebHDFS

Setup a WebHDFS client

Parameters:

  • host (String)

    The WebHDFS location

  • port (Number)

    The port used to do the communication

  • username (String)

    A valid HDFS user

Returns:

  • (WebHDFS)

    A configured client instance



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/logstash/outputs/webhdfs_helper.rb', line 24

# Build a WebHDFS client configured from this plugin's settings.
#
# Kerberos and SSL support load their libraries lazily, only when enabled.
#
# @param host [String] the WebHDFS location
# @param port [Number] the port used for the communication
# @param username [String] a valid HDFS user
# @return [WebHDFS::Client] the configured client instance
# @raise [SystemCallError] if an enabled SSL key/cert file cannot be read
def prepare_client(host, port, username)
  client = WebHDFS::Client.new(host, port, username)
  if @use_kerberos_auth
    require 'gssapi'
    client.kerberos = true
    client.kerberos_keytab = @kerberos_keytab
  end
  if @use_ssl_auth
    require 'openssl'
    client.ssl = true
    # Read the key/cert with File.binread: Kernel#open leaked the file handle
    # and would run a shell command for a path beginning with "|".
    client.ssl_key = OpenSSL::PKey::RSA.new(File.binread(@ssl_key))
    client.ssl_cert = OpenSSL::X509::Certificate.new(File.binread(@ssl_cert))
  end
  client.httpfs_mode = @use_httpfs
  client.open_timeout = @open_timeout
  client.read_timeout = @read_timeout
  client.retry_known_errors = @retry_known_errors
  # Leave the client's retry settings untouched unless explicitly configured.
  client.retry_interval = @retry_interval if @retry_interval
  client.retry_times = @retry_times if @retry_times
  client
end

#test_client(client) ⇒ Object

Test client connection.

Parameters:

  • client (WebHDFS)

    The WebHDFS client object.



47
48
49
50
51
52
53
54
# File 'lib/logstash/outputs/webhdfs_helper.rb', line 47

# Check connectivity by listing the HDFS root directory.
#
# @param client [WebHDFS::Client] the client to probe
# @return [Object] whatever client.list('/') returns
# @raise [StandardError] re-raised unchanged after logging the namenode address
def test_client(client)
  client.list('/')
rescue StandardError => error
  @logger.error("Webhdfs check request failed. (namenode: #{client.host}:#{client.port}, Exception: #{error.message})")
  raise
end