Class: Swineherd::HadoopFileSystem
- Inherits: Object
- Includes: BaseFileSystem
- Defined in: lib/swineherd/filesystem/hadoopfilesystem.rb
Overview
Methods for dealing with the Hadoop Distributed File System (HDFS). This class requires that you run with JRuby, as it makes use of the native Java Hadoop libraries.
Defined Under Namespace
Classes: HadoopFile
Instance Attribute Summary
- #conf ⇒ Object
  Returns the value of attribute conf.
- #hdfs ⇒ Object
  Returns the value of attribute hdfs.
Instance Method Summary
- #bzip(input, output) ⇒ Object
  BZIP.
- #check_and_set_environment ⇒ Object
  Make sure environment is sane then set up environment for use.
- #check_env ⇒ Object
  Check that we are running with jruby, check for hadoop home.
- #close(*args) ⇒ Object
- #copy_from_local(srcfile, dstfile) ⇒ Object
  Copy local file to hdfs filesystem.
- #copy_to_local(srcfile, dstfile) ⇒ Object
  Copy hdfs file to local filesystem.
- #cp(srcpath, dstpath) ⇒ Object
- #dist_merge(inputs, output, options = {}) ⇒ Object
  Merges many input files into :reduce_tasks number of output files.
- #entries(dirpath) ⇒ Object
- #exists?(path) ⇒ Boolean
- #initialize(*args) ⇒ HadoopFileSystem (constructor)
  Initialize a new hadoop file system, needs path to hadoop configuration.
- #lr(path) ⇒ Object
  Recursively list paths.
- #merge(srcdir, dstfile) ⇒ Object
  Merge all part files in a directory into one file.
- #mkpath(path) ⇒ Object
- #mv(srcpath, dstpath) ⇒ Object
- #open(path, mode = "r", &blk) ⇒ Object
- #rm(path) ⇒ Object
- #set_env ⇒ Object
  Place hadoop jars in class path, require appropriate jars, set hadoop conf.
- #size(path) ⇒ Object
- #stream(input, output) ⇒ Object
  This is hackety.
- #type(path) ⇒ Object
Methods included from BaseFileSystem
Constructor Details
#initialize(*args) ⇒ HadoopFileSystem
Initialize a new hadoop file system, needs path to hadoop configuration
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 17

def initialize *args
  check_and_set_environment
  @conf = Java::org.apache.hadoop.conf.Configuration.new
  @hdfs = Java::org.apache.hadoop.fs.FileSystem.get(@conf)
end
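A minimal usage sketch, assuming JRuby and a valid HADOOP_HOME; the require path and the '/tmp' path below are illustrative assumptions, not part of this class's documentation:

# Minimal sketch: construct the filesystem and query a path.
# Assumes this runs under JRuby with HADOOP_HOME set appropriately.
require 'swineherd'

hdfs = Swineherd::HadoopFileSystem.new
puts hdfs.exists?('/tmp')   # => true or false
puts hdfs.type('/tmp')      # => "directory", "file", or "unknown"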
Instance Attribute Details
#conf ⇒ Object
Returns the value of attribute conf.
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 12

def conf
  @conf
end
#hdfs ⇒ Object
Returns the value of attribute hdfs.
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 12

def hdfs
  @hdfs
end
Instance Method Details
#bzip(input, output) ⇒ Object
BZIP
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 121

def bzip input, output
  system("#{@hadoop_home}/bin/hadoop \\
    jar #{@hadoop_home}/contrib/streaming/hadoop-*streaming*.jar \\
    -D mapred.output.compress=true \\
    -D mapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec \\
    -D mapred.reduce.tasks=1 \\
    -mapper \"/bin/cat\" \\
    -reducer \"/bin/cat\" \\
    -input \"#{input}\" \\
    -output \"#{output}\"")
end
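As a hypothetical usage sketch (the paths are made up), compressing an HDFS directory of uncompressed logs into a single bzip2-compressed output directory looks like:

# Hypothetical paths; launches a one-reducer streaming job whose output is bzip2-compressed.
hdfs = Swineherd::HadoopFileSystem.new
hdfs.bzip('/data/logs/2011-01-01', '/data/logs/2011-01-01_bz2')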
#check_and_set_environment ⇒ Object
Make sure environment is sane then set up environment for use
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 26

def check_and_set_environment
  check_env
  set_env
end
#check_env ⇒ Object
Check that we are running with jruby, check for hadoop home. hadoop_home is preferentially set to the HADOOP_HOME environment variable if it’s set, ‘/usr/local/share/hadoop’ if HADOOP_HOME isn’t defined, and ‘/usr/lib/hadoop’ if ‘/usr/local/share/hadoop’ doesn’t exist. If all else fails inform the user that HADOOP_HOME really should be set.
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 271

def check_env
  begin
    require 'java'
  rescue LoadError => e
    raise "\nJava not found, are you sure you're running with JRuby?\n" + e.message
  end
  @hadoop_home = (ENV['HADOOP_HOME'] || '/usr/local/share/hadoop')
  @hadoop_home = '/usr/lib/hadoop' unless File.exist? @hadoop_home
  raise "\nHadoop installation not found, try setting HADOOP_HOME\n" unless File.exist? @hadoop_home
end
#close(*args) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 174

def close *args
  @hdfs.close
end
#copy_from_local(srcfile, dstfile) ⇒ Object
Copy local file to hdfs filesystem
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 170

def copy_from_local srcfile, dstfile
  @hdfs.copy_from_local_file(Path.new(srcfile), Path.new(dstfile))
end
#copy_to_local(srcfile, dstfile) ⇒ Object
Copy hdfs file to local filesystem
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 163

def copy_to_local srcfile, dstfile
  @hdfs.copy_to_local_file(Path.new(srcfile), Path.new(dstfile))
end
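A short round-trip sketch for the two copy methods; both paths are hypothetical:

# Hypothetical paths: push a local file into HDFS, then pull it back out.
hdfs = Swineherd::HadoopFileSystem.new
hdfs.copy_from_local('/tmp/local_data.tsv', '/data/remote_data.tsv')
hdfs.copy_to_local('/data/remote_data.tsv', '/tmp/roundtrip.tsv')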
#cp(srcpath, dstpath) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 64

def cp srcpath, dstpath
  FileUtil.copy(@hdfs, Path.new(srcpath), @hdfs, Path.new(dstpath), false, @conf)
end
#dist_merge(inputs, output, options = {}) ⇒ Object
Merges many input files into :reduce_tasks number of output files.
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 136

def dist_merge inputs, output, options = {}
  options[:reduce_tasks]     ||= 25
  options[:partition_fields] ||= 2
  options[:sort_fields]      ||= 2
  options[:field_separator]  ||= '/t'
  names = inputs.map{|inp| File.basename(inp)}.join(',')
  cmd   = "#{@hadoop_home}/bin/hadoop \\
    jar #{@hadoop_home}/contrib/streaming/hadoop-*streaming*.jar \\
    -D mapred.job.name=\"Swineherd Merge (#{names} -> #{output})\" \\
    -D num.key.fields.for.partition=\"#{options[:partition_fields]}\" \\
    -D stream.num.map.output.key.fields=\"#{options[:sort_fields]}\" \\
    -D mapred.text.key.partitioner.options=\"-k1,#{options[:partition_fields]}\" \\
    -D stream.map.output.field.separator=\"'#{options[:field_separator]}'\" \\
    -D mapred.min.split.size=1000000000 \\
    -D mapred.reduce.tasks=#{options[:reduce_tasks]} \\
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \\
    -mapper \"/bin/cat\" \\
    -reducer \"/usr/bin/uniq\" \\
    -input \"#{inputs.join(',')}\" \\
    -output \"#{output}\""
  puts cmd
  system cmd
end
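A hedged usage sketch; the input paths, output path, and option values below are illustrative only:

# Hypothetical example: merge two daily part directories down to 10 output
# files, partitioning and sorting on the first key field.
hdfs = Swineherd::HadoopFileSystem.new
hdfs.dist_merge(['/data/2011-01-01', '/data/2011-01-02'], '/data/merged',
  :reduce_tasks     => 10,
  :partition_fields => 1,
  :sort_fields      => 1)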
#entries(dirpath) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 88

def entries dirpath
  return unless type(dirpath) == "directory"
  list = @hdfs.list_status(Path.new(dirpath))
  list.map{|path| path.get_path.to_s} rescue []
end
#exists?(path) ⇒ Boolean
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 56

def exists? path
  @hdfs.exists(Path.new(path))
end
#lr(path) ⇒ Object
Recursively list paths
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 42

def lr path
  paths = entries(path)
  if (paths && !paths.empty?)
    paths.map{|e| lr(e)}.flatten
  else
    path
  end
end
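For example (with a hypothetical path), lr pairs naturally with size, which sums the lengths of every file lr finds:

# Hypothetical path; lr returns a flat array of every file beneath it.
hdfs = Swineherd::HadoopFileSystem.new
hdfs.lr('/data/output').each { |f| puts f }
puts hdfs.size('/data/output')   # total bytes across all listed files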
#merge(srcdir, dstfile) ⇒ Object
Merge all part files in a directory into one file.
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 97

def merge srcdir, dstfile
  FileUtil.copy_merge(@hdfs, Path.new(srcdir), @hdfs, Path.new(dstfile), false, @conf, "")
end
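A usage sketch with made-up paths:

# Collapse part-00000, part-00001, ... under a job output directory into one file.
hdfs = Swineherd::HadoopFileSystem.new
hdfs.merge('/data/job_output', '/data/job_output_merged.tsv')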
#mkpath(path) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 68

def mkpath path
  @hdfs.mkdirs(Path.new(path))
  path
end
#mv(srcpath, dstpath) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 60

def mv srcpath, dstpath
  @hdfs.rename(Path.new(srcpath), Path.new(dstpath))
end
#open(path, mode = "r", &blk) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 31

def open path, mode="r", &blk
  HadoopFile.new(path,mode,self,&blk)
end
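A minimal sketch, assuming the returned HadoopFile exposes the usual IO-style write, read, and close methods; the path is hypothetical:

# Hypothetical path; explicit close is used rather than relying on block semantics.
hdfs = Swineherd::HadoopFileSystem.new
f = hdfs.open('/tmp/greeting.txt', 'w')
f.write("hello from jruby\n")
f.close
puts hdfs.open('/tmp/greeting.txt').read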
#rm(path) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 51

def rm path
  @hdfs.delete(Path.new(path), true)
  [path]
end
#set_env ⇒ Object
Place hadoop jars in class path, require appropriate jars, set hadoop conf
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 285

def set_env
  require 'java'
  @hadoop_conf = (ENV['HADOOP_CONF_DIR'] || File.join(@hadoop_home, 'conf'))
  @hadoop_conf += "/" unless @hadoop_conf.end_with? "/"
  $CLASSPATH << @hadoop_conf
  Dir["#{@hadoop_home}/hadoop*.jar", "#{@hadoop_home}/lib/*.jar"].each{|jar| require jar}
  java_import 'org.apache.hadoop.conf.Configuration'
  java_import 'org.apache.hadoop.fs.Path'
  java_import 'org.apache.hadoop.fs.FileSystem'
  java_import 'org.apache.hadoop.fs.FileUtil'
  java_import 'org.apache.hadoop.mapreduce.lib.input.FileInputFormat'
  java_import 'org.apache.hadoop.mapreduce.lib.output.FileOutputFormat'
  java_import 'org.apache.hadoop.fs.FSDataOutputStream'
  java_import 'org.apache.hadoop.fs.FSDataInputStream'
end
#size(path) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 35

def size path
  lr(path).inject(0){|sz, f| sz += @hdfs.get_file_status(Path.new(f)).get_len}
end
#stream(input, output) ⇒ Object
This is hackety. Use with caution.
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 104

def stream input, output
  require 'uri'
  input_fs_scheme  = URI.parse(input).scheme
  output_fs_scheme = URI.parse(output).scheme
  system("#{@hadoop_home}/bin/hadoop \\
    jar #{@hadoop_home}/contrib/streaming/hadoop-*streaming*.jar \\
    -D mapred.job.name=\"Stream { #{input_fs_scheme}(#{File.basename(input)}) -> #{output_fs_scheme}(#{File.basename(output)}) }\" \\
    -D mapred.min.split.size=1000000000 \\
    -D mapred.reduce.tasks=0 \\
    -mapper \"/bin/cat\" \\
    -input \"#{input}\" \\
    -output \"#{output}\"")
end
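A hedged example (the bucket and paths are invented) of shipping data from S3 into HDFS with a map-only streaming job:

# Hypothetical locations; stream copies between filesystems identified by URI scheme.
hdfs = Swineherd::HadoopFileSystem.new
hdfs.stream('s3n://example-bucket/logs/2011-01-01', 'hdfs:///data/logs/2011-01-01')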
#type(path) ⇒ Object
# File 'lib/swineherd/filesystem/hadoopfilesystem.rb', line 73

def type path
  return "unknown" unless exists? path
  status = @hdfs.get_file_status(Path.new(path))
  return "directory" if status.is_dir?
  "file"
  # case
  # when status.isFile then
  #   return "file"
  # when status.is_directory? then
  #   return "directory"
  # when status.is_symlink? then
  #   return "symlink"
  # end
end