Class: Hasta::EmrJobDefinition
- Inherits: Object
- Object
- Hasta::EmrJobDefinition
- Extended by:
- Forwardable
- Defined in:
- lib/hasta/emr_job_definition.rb
Overview
Defines the EMR job that is being tested
Class Method Summary collapse
- .load(file_path, id, scheduled_start_time = Time.now) ⇒ Object
Instance Method Summary collapse
- #data_sink ⇒ Object
- #data_sources ⇒ Object
- #env ⇒ Object
- #initialize(emr_node) ⇒ EmrJobDefinition (constructor): A new instance of EmrJobDefinition.
- #input_paths ⇒ Object
- #mapper ⇒ Object
- #output_path ⇒ Object
- #reducer ⇒ Object
- #ruby_files ⇒ Object
Constructor Details
#initialize(emr_node) ⇒ EmrJobDefinition
Returns a new instance of EmrJobDefinition.
32 33 34 |
# File 'lib/hasta/emr_job_definition.rb', line 32

# Creates a job definition around the given EMR node.
#
# @param emr_node the node object the other methods read their
#   configuration from; it is kept in @emr_node
def initialize(emr_node)
  @emr_node = emr_node
end
Class Method Details
.load(file_path, id, scheduled_start_time = Time.now) ⇒ Object
21 22 23 24 25 26 27 28 |
# File 'lib/hasta/emr_job_definition.rb', line 21

# Builds a job definition from the EmrActivity with the given id found in a
# JSON definition file.
#
# @param file_path path of the JSON file to read
# @param id id of the EmrActivity object to look up
# @param scheduled_start_time handed to EmrNode.from_json; defaults to the
#   time of the call
# @return [EmrJobDefinition] a new instance wrapping the parsed node
# @raise [ArgumentError] if no object in the file has type 'EmrActivity'
#   and the requested id
def self.load(file_path, id, scheduled_start_time = Time.now)
  # A file with no 'objects' list (missing or null) is treated as containing
  # no activities, so the caller gets the descriptive ArgumentError below
  # rather than a NoMethodError on nil.
  objects = JSON.parse(File.read(file_path))['objects'] || []
  emr_node = objects.find { |node| node['type'] == 'EmrActivity' && node['id'] == id }
  raise ArgumentError, "No EmrActivity for id: #{id} in file: #{file_path}" unless emr_node

  new(EmrNode.from_json(emr_node, scheduled_start_time))
end
Instance Method Details
#data_sink ⇒ Object
76 77 78 |
# File 'lib/hasta/emr_job_definition.rb', line 76

# Returns the S3DataSink built from #output_path.
# The sink is created on first use and reused afterwards.
def data_sink
  return @data_sink if @data_sink

  @data_sink = S3DataSink.new(output_path)
end
#data_sources ⇒ Object
72 73 74 |
# File 'lib/hasta/emr_job_definition.rb', line 72

# Returns one S3DataSource per entry of #input_paths, in the same order.
# The list is built on first use and reused afterwards.
def data_sources
  @data_sources ||= input_paths.map do |input_path|
    S3DataSource.new(input_path)
  end
end
#env ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/hasta/emr_job_definition.rb', line 44

# Returns the Env for this job, built on first use and reused afterwards.
#
# The Env receives the node's own env plus a hash derived from the node's
# cache files: every cache file whose URI does not end in '.rb' contributes
# an entry keyed "<TAG>_FILE_PATH", where <TAG> is the upcased part of the
# tag before its first '.', and whose value is the parsed S3URI.
def env
  @env ||= begin
    node_env = emr_node.env
    file_paths = emr_node.cache_files.each_with_object({}) do |(tag, uri), paths|
      # Entries ending in '.rb' are left out of the environment.
      next if uri.end_with?('.rb')

      paths["#{tag.split('.').first.upcase}_FILE_PATH"] = S3URI.parse(uri)
    end
    Env.new(node_env, file_paths)
  end
end
#input_paths ⇒ Object
36 37 38 |
# File 'lib/hasta/emr_job_definition.rb', line 36

# Returns the node's input paths, each parsed with S3URI.parse.
# The list is built on first use and reused afterwards.
def input_paths
  return @input_paths if @input_paths

  raw_paths = emr_node.input_paths
  @input_paths = raw_paths.map { |raw_path| S3URI.parse(raw_path) }
end
#mapper ⇒ Object
64 65 66 |
# File 'lib/hasta/emr_job_definition.rb', line 64

# Returns the result of passing the node's mapper to parse_mapper
# (a helper defined outside this excerpt).
# The result is computed on first use and reused afterwards.
def mapper
  @mapper ||= begin
    mapper_spec = emr_node.mapper
    parse_mapper(mapper_spec)
  end
end
#output_path ⇒ Object
40 41 42 |
# File 'lib/hasta/emr_job_definition.rb', line 40

# Returns the node's output path parsed with S3URI.parse.
# The value is parsed on first use and reused afterwards.
def output_path
  return @output_path if @output_path

  @output_path = S3URI.parse(emr_node.output_path)
end
#reducer ⇒ Object
68 69 70 |
# File 'lib/hasta/emr_job_definition.rb', line 68

# Returns the result of passing the node's reducer to parse_reducer
# (a helper defined outside this excerpt).
# The result is computed on first use and reused afterwards.
def reducer
  @reducer ||= begin
    reducer_spec = emr_node.reducer
    parse_reducer(reducer_spec)
  end
end