Class: EmrClient

Inherits:
Object show all
Defined in:
lib/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(commands, logger, client_class) ⇒ EmrClient

Returns a new instance of EmrClient.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/client.rb', line 11

# Stores the collaborators, assembles the connection settings from the
# global options, and wraps the query client in a retry delegator.
#
# commands     - must respond to +global_options+; the result is indexed
#                with symbol keys below.
# logger       - kept in @logger for later use.
# client_class - must respond to +new_aws_query+, which receives the
#                settings hash built here.
def initialize(commands, logger, client_class)
  @commands = commands
  @logger   = logger
  @options  = commands.global_options

  # Use the default service endpoint unless one was supplied in the options.
  endpoint = @options[:endpoint] || "https://elasticmapreduce.amazonaws.com"
  # The CA bundle path is resolved relative to this source file's directory.
  ca_bundle = File.join(File.dirname(__FILE__), "cacert.pem")
  # Verbose mode is on whenever the option is present at all.
  verbose = !@options[:verbose].nil?

  @config = {
    :endpoint            => endpoint,
    :ca_file             => ca_bundle,
    :aws_access_key      => @options[:aws_access_id],
    :aws_secret_key      => @options[:aws_secret_key],
    :signature_algorithm => :V2,
    :content_type        => 'JSON',
    :verbose             => verbose,
    :connect_timeout     => 60.0,
    :timeout             => 160.0
  }

  # The delegator is handed a predicate that decides which responses to retry.
  @client = Amazon::RetryDelegator.new(
    client_class.new_aws_query(@config),
    :retry_if => Proc.new { |*response| is_retryable_error_response(*response) }
  )
end

Instance Attribute Details

#commands ⇒ Object

Returns the value of attribute commands.



9
10
11
# File 'lib/client.rb', line 9

# Reader for @commands, which the constructor sets from its +commands+
# argument.
def commands
  @commands
end

#logger ⇒ Object

Returns the value of attribute logger.



9
10
11
# File 'lib/client.rb', line 9

# Reader for @logger, which the constructor sets from its +logger+
# argument. The request methods of this class call +trace+ on it.
def logger
  @logger
end

#options ⇒ Object

Returns the value of attribute options.



9
10
11
# File 'lib/client.rb', line 9

# Reader for @options, which the constructor sets to
# +commands.global_options+.
def options
  @options
end

Instance Method Details

#add_instance_groups(options) ⇒ Object



111
112
113
114
115
116
# File 'lib/client.rb', line 111

# Issues an AddInstanceGroups call on the wrapped client, tracing both the
# request and the response.
#
# options - passed to the client unchanged.
#
# Returns whatever +raise_on_error+ returns for the response.
def add_instance_groups(options)
  logger.trace("AddInstanceGroups(#{options.inspect})")
  response = @client.AddInstanceGroups(options)
  logger.trace(response.inspect)
  raise_on_error(response)
end

#add_steps(jobflow_id, steps) ⇒ Object



69
70
71
72
73
74
# File 'lib/client.rb', line 69

# Issues an AddJobFlowSteps call on the wrapped client, tracing both the
# request and the response.
#
# jobflow_id - sent under the 'JobFlowId' key.
# steps      - sent under the 'Steps' key.
#
# Returns whatever +raise_on_error+ returns for the response.
def add_steps(jobflow_id, steps)
  logger.trace("AddJobFlowSteps('JobFlowId' => #{jobflow_id.inspect}, 'Steps' => #{steps.inspect})")
  response = @client.AddJobFlowSteps('JobFlowId' => jobflow_id, 'Steps' => steps)
  logger.trace(response.inspect)
  raise_on_error(response)
end

#describe_jobflow(options) ⇒ Object



83
84
85
86
87
88
# File 'lib/client.rb', line 83

# Issues a DescribeJobFlows call on the wrapped client, always adding
# 'DescriptionType' => 'EXTENDED' to the caller's options.
#
# options - must respond to +merge+; the trace shows it as given, before
#           the description type is added.
#
# Returns whatever +raise_on_error+ returns for the response.
def describe_jobflow(options)
  logger.trace("DescribeJobFlows(#{options.inspect})")
  request = options.merge('DescriptionType' => 'EXTENDED')
  response = @client.DescribeJobFlows(request)
  logger.trace(response.inspect)
  raise_on_error(response)
end

#describe_jobflow_with_id(jobflow_id) ⇒ Object



58
59
60
61
62
63
64
65
66
67
# File 'lib/client.rb', line 58

# Looks up a single job flow by id through a DescribeJobFlows call that
# requests the 'EXTENDED' description type.
#
# jobflow_id - the id placed in the 'JobFlowIds' list of the request.
#
# Returns the first entry of the response's 'JobFlows' list.
# Raises RuntimeError when the response is nil, has no 'JobFlows' entry, or
# that list is empty. +raise_on_error+ may also raise for error responses.
def describe_jobflow_with_id(jobflow_id)
  logger.trace "DescribeJobFlows('JobFlowIds' => [ #{jobflow_id} ])"
  result = @client.DescribeJobFlows('JobFlowIds' => [ jobflow_id ], 'DescriptionType' => 'EXTENDED')
  logger.trace result.inspect
  raise_on_error(result)

  # A response without a 'JobFlows' entry is treated like an empty list, so
  # the caller gets the "not found" error rather than a NoMethodError on nil.
  jobflows = result && result['JobFlows']
  if jobflows.nil? || jobflows.empty?
    raise RuntimeError, "Jobflow with id #{jobflow_id} not found"
  end
  jobflows.first
end

#is_error_response(response) ⇒ Object



47
48
49
# File 'lib/client.rb', line 47

# Tells whether a response carries an 'Error' key.
#
# response - nil, or an object responding to +key?+.
#
# Returns false for nil, otherwise the result of +key?('Error')+.
def is_error_response(response)
  return false if response.nil?

  response.key?('Error')
end

#is_retryable_error_response(response) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/client.rb', line 34

# Decides whether a response describes an error that is worth retrying.
#
# response - nil, or an object indexable with 'Error' whose value is in turn
#            indexable with 'Code'.
#
# Returns true only when the error code is one of the retryable codes
# listed below; false for nil or for a response without an 'Error' entry.
def is_retryable_error_response(response)
  return false if response.nil?

  error = response['Error']
  return false unless error

  # 'Timeout' is deliberately absent: the operation might have completed
  # even though the connection timed out.
  %w[InternalFailure Throttling ServiceUnavailable].include?(error['Code'])
end

#modify_instance_groups(options) ⇒ Object



104
105
106
107
108
109
# File 'lib/client.rb', line 104

# Issues a ModifyInstanceGroups call on the wrapped client, tracing both the
# request and the response.
#
# options - passed to the client unchanged.
#
# Returns whatever +raise_on_error+ returns for the response.
def modify_instance_groups(options)
  logger.trace("ModifyInstanceGroups(#{options.inspect})")
  response = @client.ModifyInstanceGroups(options)
  logger.trace(response.inspect)
  raise_on_error(response)
end

#raise_on_error(response) ⇒ Object



51
52
53
54
55
56
# File 'lib/client.rb', line 51

# Turns an error response into an exception and passes anything else through.
#
# response - the value to check with +is_error_response+.
#
# Returns the response untouched when it is not an error response.
# Raises RuntimeError with the response's ["Error"]["Message"] otherwise.
def raise_on_error(response)
  raise RuntimeError, response["Error"]["Message"] if is_error_response(response)

  response
end

#run_jobflow(jobflow) ⇒ Object



76
77
78
79
80
81
# File 'lib/client.rb', line 76

# Issues a RunJobFlow call on the wrapped client, tracing both the request
# and the response.
#
# jobflow - passed to the client unchanged.
#
# Returns whatever +raise_on_error+ returns for the response.
def run_jobflow(jobflow)
  logger.trace("RunJobFlow(#{jobflow.inspect})")
  response = @client.RunJobFlow(jobflow)
  logger.trace(response.inspect)
  raise_on_error(response)
end

#set_termination_protection(jobflow_ids, protected) ⇒ Object



90
91
92
93
94
95
# File 'lib/client.rb', line 90

# Issues a SetTerminationProtection call on the wrapped client, tracing both
# the request and the response.
#
# jobflow_ids - sent under the 'JobFlowIds' key.
# protected   - sent under the 'TerminationProtected' key.
#
# Returns whatever +raise_on_error+ returns for the response.
def set_termination_protection(jobflow_ids, protected)
  logger.trace("SetTerminationProtection('JobFlowIds' => #{jobflow_ids.inspect}, 'TerminationProtected' => #{protected})")
  response = @client.SetTerminationProtection('JobFlowIds' => jobflow_ids, 'TerminationProtected' => protected)
  logger.trace(response.inspect)
  raise_on_error(response)
end

#terminate_jobflows(jobflow_ids) ⇒ Object



97
98
99
100
101
102
# File 'lib/client.rb', line 97

# Issues a TerminateJobFlows call on the wrapped client, tracing both the
# request and the response.
#
# jobflow_ids - sent under the 'JobFlowIds' key.
#
# Returns whatever +raise_on_error+ returns for the response.
def terminate_jobflows(jobflow_ids)
  logger.trace("TerminateJobFlows('JobFlowIds' => #{jobflow_ids.inspect})")
  response = @client.TerminateJobFlows('JobFlowIds' => jobflow_ids)
  logger.trace(response.inspect)
  raise_on_error(response)
end