Class: Hasta::EmrJobDefinition

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/hasta/emr_job_definition.rb

Overview

Defines the EMR job that is being tested.

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(emr_node) ⇒ EmrJobDefinition

Returns a new instance of EmrJobDefinition.



32
33
34
# File 'lib/hasta/emr_job_definition.rb', line 32

# Wraps a single EMR activity node.
#
# @param emr_node [Object] the node describing the EmrActivity under test
#   (e.g. the result of EmrNode.from_json, as built by .load); it is stored
#   as-is and read lazily by the other accessors.
def initialize(emr_node)
  @emr_node = emr_node
end

Class Method Details

.load(file_path, id, scheduled_start_time = Time.now) ⇒ Object

Raises:

  • (ArgumentError)


21
22
23
24
25
26
27
28
# File 'lib/hasta/emr_job_definition.rb', line 21

# Loads the EmrActivity with the given id from a pipeline definition file.
#
# @param file_path [String] path to a JSON pipeline definition whose
#   top-level 'objects' array lists the pipeline nodes
# @param id [String] the 'id' of the EmrActivity node to load
# @param scheduled_start_time [Time] forwarded to EmrNode.from_json;
#   defaults to Time.now, evaluated on each call
# @return [EmrJobDefinition] a definition wrapping the matching node
# @raise [ArgumentError] if no node has type 'EmrActivity' and the given id,
#   including when 'objects' is missing, null, or not a list
#   (errors from File.read / JSON.parse still propagate unchanged)
def self.load(file_path, id, scheduled_start_time = Time.now)
  pipeline = JSON.parse(File.read(file_path))
  # Guard against malformed definitions so callers get the documented
  # ArgumentError instead of a NoMethodError/TypeError on nil or non-hash data.
  nodes = pipeline.is_a?(Hash) ? pipeline['objects'] : nil
  emr_node = Array(nodes).find do |node|
    node.is_a?(Hash) && node['type'] == 'EmrActivity' && node['id'] == id
  end

  raise ArgumentError, "No EmrActivity for id: #{id} in file: #{file_path}" unless emr_node

  new(EmrNode.from_json(emr_node, scheduled_start_time))
end

Instance Method Details

#data_sink ⇒ Object



76
77
78
# File 'lib/hasta/emr_job_definition.rb', line 76

# The S3 sink the job writes to, built from #output_path on first
# use and memoized afterwards.
def data_sink
  return @data_sink if @data_sink

  @data_sink = S3DataSink.new(output_path)
end

#data_sources ⇒ Object



72
73
74
# File 'lib/hasta/emr_job_definition.rb', line 72

# One S3DataSource per entry of #input_paths, in the same order;
# built lazily and memoized.
def data_sources
  @data_sources ||= begin
    paths = input_paths
    paths.map { |input| S3DataSource.new(input) }
  end
end

#env ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
# File 'lib/hasta/emr_job_definition.rb', line 44

# The job environment: the node's own env plus one "<TAG>_FILE_PATH" entry
# per non-Ruby cache file, where <TAG> is the part of the cache-file tag
# before its first '.', upcased, and the value is the parsed S3URI.
# Ruby (.rb) cache files are skipped here; #ruby_files handles those.
# Memoized.
def env
  @env ||= begin
    node_env = emr_node.env
    non_ruby_files = emr_node.cache_files.reject { |_tag, uri| uri.end_with?('.rb') }
    file_path_vars = non_ruby_files.each_with_object({}) do |(tag, uri), vars|
      vars["#{tag.split('.').first.upcase}_FILE_PATH"] = S3URI.parse(uri)
    end
    Env.new(node_env, file_path_vars)
  end
end

#input_paths ⇒ Object



36
37
38
# File 'lib/hasta/emr_job_definition.rb', line 36

# The node's input paths, each parsed with S3URI.parse, in their original
# order; memoized.
def input_paths
  @input_paths ||= begin
    raw_paths = emr_node.input_paths
    raw_paths.map { |raw_path| S3URI.parse(raw_path) }
  end
end

#mapper ⇒ Object



64
65
66
# File 'lib/hasta/emr_job_definition.rb', line 64

# The job's mapper, produced by parse_mapper from the node's mapper
# setting; memoized after the first call.
def mapper
  return @mapper if @mapper

  @mapper = parse_mapper(emr_node.mapper)
end

#output_path ⇒ Object



40
41
42
# File 'lib/hasta/emr_job_definition.rb', line 40

# The node's output path parsed with S3URI.parse; memoized.
def output_path
  @output_path ||= begin
    raw_output = emr_node.output_path
    S3URI.parse(raw_output)
  end
end

#reducer ⇒ Object



68
69
70
# File 'lib/hasta/emr_job_definition.rb', line 68

# The job's reducer, produced by parse_reducer from the node's reducer
# setting; memoized after the first call.
def reducer
  return @reducer if @reducer

  @reducer = parse_reducer(emr_node.reducer)
end

#ruby_files ⇒ Object



56
57
58
59
60
61
62
# File 'lib/hasta/emr_job_definition.rb', line 56

# Paths for every Ruby (.rb) cache file, in cache-file order: each URI is
# parsed with S3URI.parse and then mapped through local_path_to_step_file.
# Memoized.
def ruby_files
  return @ruby_files if @ruby_files

  sources = emr_node.cache_files.values
  ruby_sources = sources.select { |source| source.end_with?('.rb') }
  @ruby_files = ruby_sources.map do |source|
    local_path_to_step_file(S3URI.parse(source))
  end
end