Class: EmrClient

Inherits:
Object show all
Defined in:
lib/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(commands, logger, client_class) ⇒ EmrClient

Returns a new instance of EmrClient.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/client.rb', line 11

def initialize(commands, logger, client_class)
  @commands = commands
  @logger = logger
  @options = commands.global_options

  @config = {
    :endpoint            => @options[:endpoint] || "https://elasticmapreduce.amazonaws.com",
    :ca_file             => File.join(File.dirname(__FILE__), "cacert.pem"),
    :aws_access_key      => @options[:aws_access_id],
    :aws_secret_key      => @options[:aws_secret_key],
    :signature_algorithm => :V2,
    :content_type        => 'JSON',
    :verbose             => (@options[:verbose] != nil)
  }

  @client = Amazon::RetryDelegator.new(
    client_class.new_aws_query(@config),
    :retry_if => Proc.new { |*opts| self.is_retryable_error_response(*opts) }
  )
end

Instance Attribute Details

#commandsObject

Returns the value of attribute commands.



9
10
11
# File 'lib/client.rb', line 9

def commands
  @commands
end

#loggerObject

Returns the value of attribute logger.



9
10
11
# File 'lib/client.rb', line 9

def logger
  @logger
end

#optionsObject

Returns the value of attribute options.



9
10
11
# File 'lib/client.rb', line 9

def options
  @options
end

Instance Method Details

#add_instance_groups(options) ⇒ Object



109
110
111
112
113
114
# File 'lib/client.rb', line 109

def add_instance_groups(options)
  logger.trace "AddInstanceGroups(#{options.inspect})"
  result = @client.AddInstanceGroups(options)
  logger.trace result.inspect
  return raise_on_error(result)
end

#add_steps(jobflow_id, steps) ⇒ Object



67
68
69
70
71
72
# File 'lib/client.rb', line 67

def add_steps(jobflow_id, steps)
  logger.trace "AddJobFlowSteps('JobFlowId' => #{jobflow_id.inspect}, 'Steps' => #{steps.inspect})"
  result = @client.AddJobFlowSteps('JobFlowId' => jobflow_id, 'Steps' => steps)
  logger.trace result.inspect
  return raise_on_error(result)
end

#describe_jobflow(options) ⇒ Object



81
82
83
84
85
86
# File 'lib/client.rb', line 81

def describe_jobflow(options)
  logger.trace "DescribeJobFlows(#{options.inspect})"
  result = @client.DescribeJobFlows(options.merge('DescriptionType' => 'EXTENDED'))
  logger.trace result.inspect
  return raise_on_error(result)
end

#describe_jobflow_with_id(jobflow_id) ⇒ Object



56
57
58
59
60
61
62
63
64
65
# File 'lib/client.rb', line 56

def describe_jobflow_with_id(jobflow_id)
  logger.trace "DescribeJobFlows('JobFlowIds' => [ #{jobflow_id} ])"
  result = @client.DescribeJobFlows('JobFlowIds' => [ jobflow_id ], 'DescriptionType' => 'EXTENDED')
  logger.trace result.inspect
  raise_on_error(result)
  if result == nil || result['JobFlows'].size() == 0 then
    raise RuntimeError, "Jobflow with id #{jobflow_id} not found"
  end
  return result['JobFlows'].first
end

#is_error_response(response) ⇒ Object



45
46
47
# File 'lib/client.rb', line 45

def is_error_response(response)
  response != nil && response.key?('Error')
end

#is_retryable_error_response(response) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/client.rb', line 32

def is_retryable_error_response(response)
  if response == nil then
    false
  else
    ret = false
    if response['Error'] then 
      # note: 'Timeout' is not retryable because the operation might have completed just the connection timed out
      ret ||= ['InternalFailure', 'Throttling', 'ServiceUnavailable'].include?(response['Error']['Code'])
    end
    ret 
  end
end

#modify_instance_groups(options) ⇒ Object



102
103
104
105
106
107
# File 'lib/client.rb', line 102

def modify_instance_groups(options)
  logger.trace "ModifyInstanceGroups(#{options.inspect})"
  result = @client.ModifyInstanceGroups(options)
  logger.trace result.inspect
  return raise_on_error(result)
end

#raise_on_error(response) ⇒ Object



49
50
51
52
53
54
# File 'lib/client.rb', line 49

def raise_on_error(response)
  if is_error_response(response) then
    raise RuntimeError, response["Error"]["Message"]
  end
  return response
end

#run_jobflow(jobflow) ⇒ Object



74
75
76
77
78
79
# File 'lib/client.rb', line 74

def run_jobflow(jobflow)
  logger.trace "RunJobFlow(#{jobflow.inspect})"
  result = @client.RunJobFlow(jobflow)
  logger.trace result.inspect
  return raise_on_error(result)
end

#set_termination_protection(jobflow_ids, protected) ⇒ Object



88
89
90
91
92
93
# File 'lib/client.rb', line 88

def set_termination_protection(jobflow_ids, protected)
  logger.trace "SetTerminationProtection('JobFlowIds' => #{jobflow_ids.inspect}, 'TerminationProtected' => #{protected})"
  result = @client.SetTerminationProtection('JobFlowIds' => jobflow_ids, 'TerminationProtected' => protected)
  logger.trace result.inspect
  return raise_on_error(result)
end

#terminate_jobflows(jobflow_ids) ⇒ Object



95
96
97
98
99
100
# File 'lib/client.rb', line 95

def terminate_jobflows(jobflow_ids)
  logger.trace "TerminateJobFlows('JobFlowIds' => #{jobflow_ids.inspect})"
  result = @client.TerminateJobFlows('JobFlowIds' => jobflow_ids)
  logger.trace result.inspect
  return raise_on_error(result)
end