Class: Swineherd::HadoopFileSystem

Inherits:
Object
Includes:
BaseFileSystem
Defined in:
lib/swineherd/filesystem/hadoopfilesystem.rb

Overview

Methods for working with the Hadoop Distributed File System (HDFS). This class must be run under JRuby because it calls the native Java Hadoop libraries.

Defined Under Namespace

Classes: HadoopFile

Methods included from BaseFileSystem

#check_paths

Constructor Details

#initialize(*args) ⇒ HadoopFileSystem

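Initializes a new Hadoop file system. The Hadoop configuration directory is taken from HADOOP_CONF_DIR if it is set, otherwise from the conf directory under the Hadoop home.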



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 17

def initialize *args
  check_and_set_environment
  @conf = Java::org.apache.hadoop.conf.Configuration.new
  @hdfs = Java::org.apache.hadoop.fs.FileSystem.get(@conf)
end

Instance Attribute Details

#conf ⇒ Object

Returns the value of attribute conf.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 12

def conf
  @conf
end

#hdfs ⇒ Object

Returns the value of attribute hdfs.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 12

def hdfs
  @hdfs
end

Instance Method Details

#bzip(input, output) ⇒ Object

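Compresses input into output with BZip2 by running a Hadoop streaming job that uses /bin/cat as both mapper and reducer and a single reduce task.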



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 121

def bzip input, output
  system("#{@hadoop_home}/bin/hadoop \\
   jar         #{@hadoop_home}/contrib/streaming/hadoop-*streaming*.jar     \\
   -D          mapred.output.compress=true                                  \\
   -D          mapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec  \\
   -D          mapred.reduce.tasks=1                                        \\
   -mapper     \"/bin/cat\"                                                 \\
   -reducer    \"/bin/cat\"                                                 \\
   -input      \"#{input}\"                                                 \\
   -output     \"#{output}\"")
end
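
The command above interpolates input and output inside escaped double quotes, so a path containing a double quote or a dollar sign would be reinterpreted by the shell. As a hedged illustration only, the same command line could be assembled with the standard library's Shellwords.escape. The helper name bzip_command is hypothetical and not part of Swineherd; the jar glob is deliberately left unescaped so the shell can still expand it.

```ruby
require 'shellwords'

# Hypothetical helper (not part of Swineherd): builds the same streaming
# command as #bzip, but escapes the user-supplied paths for the shell.
def bzip_command(hadoop_home, input, output)
  [
    "#{hadoop_home}/bin/hadoop",
    "jar #{hadoop_home}/contrib/streaming/hadoop-*streaming*.jar",
    "-D mapred.output.compress=true",
    "-D mapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec",
    "-D mapred.reduce.tasks=1",
    "-mapper /bin/cat",
    "-reducer /bin/cat",
    "-input #{Shellwords.escape(input)}",
    "-output #{Shellwords.escape(output)}"
  ].join(' ')
end

# Only prints the command; nothing is executed here.
puts bzip_command('/usr/lib/hadoop', '/data/my logs', '/data/my logs.bz2')
```

The resulting string could be handed to system in the same way the original method does.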

#check_and_set_environment ⇒ Object

Verifies that the environment is sane (check_env), then sets it up for use (set_env).



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 26

def check_and_set_environment
  check_env
  set_env
end

#check_env ⇒ Object

Checks that we are running under JRuby, then locates the Hadoop home. hadoop_home is taken from the HADOOP_HOME environment variable if it is set, and defaults to ‘/usr/local/share/hadoop’ otherwise. If the chosen directory does not exist, ‘/usr/lib/hadoop’ is tried instead. If that does not exist either, an error is raised telling the user to set HADOOP_HOME.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 271

def check_env
  begin
    require 'java'
  rescue LoadError => e
    raise "\nJava not found, are you sure you're running with JRuby?\n" + e.message
  end
  @hadoop_home = (ENV['HADOOP_HOME'] || '/usr/local/share/hadoop')
  @hadoop_home = '/usr/lib/hadoop' unless File.exist? @hadoop_home
  raise "\nHadoop installation not found, try setting HADOOP_HOME\n" unless File.exist? @hadoop_home
end
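
The lookup order described above can be sketched as a pure function. This is a minimal illustration rather than the library's code: resolve_hadoop_home and its exists parameter are hypothetical names, introduced so the fallback logic can be exercised without a real Hadoop installation.

```ruby
# Hypothetical helper mirroring the fallback order in check_env.
# env    - a Hash standing in for ENV
# exists - a callable standing in for File.exist?
def resolve_hadoop_home(env, exists)
  home = env['HADOOP_HOME'] || '/usr/local/share/hadoop'
  home = '/usr/lib/hadoop' unless exists.call(home)
  raise "Hadoop installation not found, try setting HADOOP_HOME" unless exists.call(home)
  home
end

# Pretend that only /usr/lib/hadoop is present on this machine.
only_usr_lib = ->(path) { path == '/usr/lib/hadoop' }

puts resolve_hadoop_home({}, only_usr_lib)
puts resolve_hadoop_home({ 'HADOOP_HOME' => '/opt/missing' }, only_usr_lib)
```

Both calls resolve to /usr/lib/hadoop. The second shows a consequence of this order: a HADOOP_HOME that points at a missing directory is silently replaced by the fallback instead of being reported.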

#close(*args) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 174

def close *args
  @hdfs.close
end

#copy_from_local(srcfile, dstfile) ⇒ Object

Copy local file to hdfs filesystem



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 170

def copy_from_local srcfile, dstfile
  @hdfs.copy_from_local_file(Path.new(srcfile), Path.new(dstfile))
end

#copy_to_local(srcfile, dstfile) ⇒ Object

Copy hdfs file to local filesystem



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 163

def copy_to_local srcfile, dstfile
  @hdfs.copy_to_local_file(Path.new(srcfile), Path.new(dstfile))
end

#cp(srcpath, dstpath) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 64

def cp srcpath, dstpath
  FileUtil.copy(@hdfs, Path.new(srcpath), @hdfs, Path.new(dstpath), false, @conf)
end

#dist_merge(inputs, output, options = {}) ⇒ Object

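Merges many input files into :reduce_tasks output files (25 by default) with a Hadoop streaming job. The reducer is /usr/bin/uniq, so adjacent duplicate lines are collapsed in the output.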



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 136

def dist_merge inputs, output, options = {}
  options[:reduce_tasks]     ||= 25
  options[:partition_fields] ||= 2
  options[:sort_fields]      ||= 2
  options[:field_separator]  ||= "\t" # tab character
  names = inputs.map{|inp| File.basename(inp)}.join(',')
  cmd   = "#{@hadoop_home}/bin/hadoop \\
   jar         #{@hadoop_home}/contrib/streaming/hadoop-*streaming*.jar                   \\
   -D          mapred.job.name=\"Swineherd Merge (#{names} -> #{output})\"               \\
   -D          num.key.fields.for.partition=\"#{options[:partition_fields]}\"            \\
   -D          stream.num.map.output.key.fields=\"#{options[:sort_fields]}\"             \\
   -D          mapred.text.key.partitioner.options=\"-k1,#{options[:partition_fields]}\" \\
   -D          stream.map.output.field.separator=\"'#{options[:field_separator]}'\"      \\
   -D          mapred.min.split.size=1000000000                                          \\
   -D          mapred.reduce.tasks=#{options[:reduce_tasks]}                             \\
   -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner                    \\
   -mapper     \"/bin/cat\"                                                              \\
   -reducer    \"/usr/bin/uniq\"                                                         \\
   -input      \"#{inputs.join(',')}\"                                                   \\
   -output     \"#{output}\""
  puts cmd
  system cmd
end
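
The defaults for :reduce_tasks, :partition_fields and :sort_fields feed directly into the -D settings of the streaming job. The following sketch shows that mapping in isolation. The helper name merge_job_settings is hypothetical, and it uses Hash#merge rather than ||=, which is an assumption made here so that the caller's hash is not modified.

```ruby
# Hypothetical helper (not part of Swineherd): applies the same numeric
# defaults as dist_merge and returns the -D settings derived from them.
def merge_job_settings(options = {})
  opts = { reduce_tasks: 25, partition_fields: 2, sort_fields: 2 }.merge(options)
  {
    'num.key.fields.for.partition'        => opts[:partition_fields].to_s,
    'stream.num.map.output.key.fields'    => opts[:sort_fields].to_s,
    'mapred.text.key.partitioner.options' => "-k1,#{opts[:partition_fields]}",
    'mapred.reduce.tasks'                 => opts[:reduce_tasks].to_s
  }
end

# Override one option and print the resulting settings.
merge_job_settings(reduce_tasks: 4).each do |key, value|
  puts "-D #{key}=#{value}"
end
```

One difference to keep in mind: with ||= an explicit nil is replaced by the default, while Hash#merge keeps the nil.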

#entries(dirpath) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 88

def entries dirpath
  return unless type(dirpath) == "directory"
  list = @hdfs.list_status(Path.new(dirpath))
  list.map{|path| path.get_path.to_s} rescue []
end

#exists?(path) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 56

def exists? path
  @hdfs.exists(Path.new(path))
end

#lr(path) ⇒ Object

Recursively list paths



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 42

def lr path
  paths = entries(path)
  if (paths && !paths.empty?)
    paths.map{|e| lr(e)}.flatten
  else
    path
  end
end
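
To see the recursion without an HDFS cluster, here is a sketch that runs the same logic over an in-memory tree. The names toy_entries and toy_lr, and the hash layout, are hypothetical stand-ins for #entries and #lr.

```ruby
# Stand-in for #entries: the children of a directory, or nil for anything
# that is not a directory. Keys of `tree` are directory paths.
def toy_entries(tree, path)
  tree[path]
end

# Same shape as #lr: recurse into non-empty directories and return
# everything else (files, empty directories) unchanged.
def toy_lr(tree, path)
  paths = toy_entries(tree, path)
  if paths && !paths.empty?
    paths.map { |e| toy_lr(tree, e) }.flatten
  else
    path
  end
end

tree = {
  '/data'      => ['/data/a', '/data/logs'],
  '/data/logs' => ['/data/logs/part-00000', '/data/logs/part-00001']
}

p toy_lr(tree, '/data')    # an Array of leaf paths
p toy_lr(tree, '/data/a')  # a single String, not an Array
```

Note the asymmetry in the return value: a directory with contents yields an array of leaf paths, while a plain file or an empty directory yields the path itself as a string.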

#merge(srcdir, dstfile) ⇒ Object

Merge all part files in a directory into one file.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 97

def merge srcdir, dstfile
  FileUtil.copy_merge(@hdfs, Path.new(srcdir), @hdfs, Path.new(dstfile), false, @conf, "")
end

#mkpath(path) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 68

def mkpath path
  @hdfs.mkdirs(Path.new(path))
  path
end

#mv(srcpath, dstpath) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 60

def mv srcpath, dstpath
  @hdfs.rename(Path.new(srcpath), Path.new(dstpath))
end

#open(path, mode = "r", &blk) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 31

def open path, mode="r", &blk
  HadoopFile.new(path,mode,self,&blk)
end

#rm(path) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 51

def rm path
  @hdfs.delete(Path.new(path), true)
  [path]
end

#set_env ⇒ Object

Places the Hadoop configuration directory on the classpath, requires the Hadoop jars, and imports the Java classes used by this class.



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 285

def set_env
  require 'java'
  @hadoop_conf = (ENV['HADOOP_CONF_DIR'] || File.join(@hadoop_home, 'conf'))
  @hadoop_conf += "/" unless @hadoop_conf.end_with? "/"
  $CLASSPATH << @hadoop_conf
  Dir["#{@hadoop_home}/hadoop*.jar", "#{@hadoop_home}/lib/*.jar"].each{|jar| require jar}

  java_import 'org.apache.hadoop.conf.Configuration'
  java_import 'org.apache.hadoop.fs.Path'
  java_import 'org.apache.hadoop.fs.FileSystem'
  java_import 'org.apache.hadoop.fs.FileUtil'
  java_import 'org.apache.hadoop.mapreduce.lib.input.FileInputFormat'
  java_import 'org.apache.hadoop.mapreduce.lib.output.FileOutputFormat'
  java_import 'org.apache.hadoop.fs.FSDataOutputStream'
  java_import 'org.apache.hadoop.fs.FSDataInputStream'

end
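
The first lines of set_env pick the configuration directory and make sure it ends in a slash before it is appended to the classpath. A minimal sketch of just that step, using a hypothetical helper name (resolve_hadoop_conf) and a plain Hash in place of ENV:

```ruby
# Hypothetical helper mirroring how set_env chooses the configuration
# directory. env is a Hash standing in for ENV.
def resolve_hadoop_conf(env, hadoop_home)
  conf = env['HADOOP_CONF_DIR'] || File.join(hadoop_home, 'conf')
  conf += '/' unless conf.end_with?('/')
  conf
end

puts resolve_hadoop_conf({}, '/usr/lib/hadoop')
puts resolve_hadoop_conf({ 'HADOOP_CONF_DIR' => '/etc/hadoop/conf/' }, '/usr/lib/hadoop')
```

The trailing slash is presumably there because Java's URL-based class loaders treat a classpath entry as a directory only when it ends in a slash.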

#size(path) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 35

def size path
  # lr returns a bare path (not an array) for a single file, so wrap it before summing
  [lr(path)].flatten.inject(0){|sz, f| sz + @hdfs.get_file_status(Path.new(f)).get_len}
end
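
The size calculation is a fold over the recursive listing. The sketch below reproduces it over a toy size table. The names toy_size and sizes are hypothetical. It wraps the listing in Array(...) so that the fold also works when the listing is a single path string, which is what #lr returns for a plain file.

```ruby
# Hypothetical helper: sums the byte sizes of every path in a listing.
# sizes   - a Hash of path => size in bytes, standing in for HDFS file status
# listing - an Array of paths, or a single path String
def toy_size(sizes, listing)
  Array(listing).inject(0) { |total, path| total + sizes.fetch(path) }
end

sizes = { '/data/part-00000' => 100, '/data/part-00001' => 250 }

puts toy_size(sizes, ['/data/part-00000', '/data/part-00001'])
puts toy_size(sizes, '/data/part-00000')
```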

#stream(input, output) ⇒ Object

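Copies input to output with a map-only Hadoop streaming job (/bin/cat as the mapper, zero reduce tasks). Because the job is launched through the hadoop command line, input and output may live on different file systems. This is hackety. Use with caution.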



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 104

def stream input, output
  require 'uri'
  input_fs_scheme  = URI.parse(input).scheme
  output_fs_scheme = URI.parse(output).scheme
  system("#{@hadoop_home}/bin/hadoop \\
   jar         #{@hadoop_home}/contrib/streaming/hadoop-*streaming*.jar                     \\
   -D          mapred.job.name=\"Stream { #{input_fs_scheme}(#{File.basename(input)}) -> #{output_fs_scheme}(#{File.basename(output)}) }\" \\
   -D          mapred.min.split.size=1000000000                                            \\
   -D          mapred.reduce.tasks=0                                                       \\
   -mapper     \"/bin/cat\"                                                                \\
   -input      \"#{input}\"                                                                \\
   -output     \"#{output}\"")
end
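
The job name is built from the URI scheme and the base name of each path. A small sketch of that string construction on its own, using a hypothetical helper name (stream_job_name) and made-up example paths:

```ruby
require 'uri'

# Hypothetical helper reproducing the job name that #stream passes
# as mapred.job.name.
def stream_job_name(input, output)
  input_scheme  = URI.parse(input).scheme
  output_scheme = URI.parse(output).scheme
  "Stream { #{input_scheme}(#{File.basename(input)}) -> #{output_scheme}(#{File.basename(output)}) }"
end

puts stream_job_name('s3n://bucket/logs/day1.tsv', 'hdfs:///tmp/day1.tsv')
puts stream_job_name('/tmp/plain_path', '/tmp/out')
```

For a path without a scheme, URI.parse(...).scheme is nil, which interpolates as an empty string, so the second name shows empty scheme slots.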

#type(path) ⇒ Object



# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 73

def type path
  return "unknown" unless exists? path
  status = @hdfs.get_file_status(Path.new(path))
  return "directory" if status.is_dir?
  "file"
  # case
  # when status.isFile then
  #   return "file"
  # when status.is_directory? then
  #   return "directory"
  # when status.is_symlink? then
  #   return "symlink"
  # end
end