Class: EmrClient
Instance Attribute Summary collapse
-
#commands ⇒ Object
Returns the value of attribute commands.
-
#logger ⇒ Object
Returns the value of attribute logger.
-
#options ⇒ Object
Returns the value of attribute options.
Instance Method Summary collapse
- #add_instance_groups(options) ⇒ Object
- #add_steps(jobflow_id, steps) ⇒ Object
- #describe_jobflow(options) ⇒ Object
- #describe_jobflow_with_id(jobflow_id) ⇒ Object
-
#initialize(commands, logger, client_class) ⇒ EmrClient
constructor
A new instance of EmrClient.
- #is_error_response(response) ⇒ Object
- #is_retryable_error_response(response) ⇒ Object
- #modify_instance_groups(options) ⇒ Object
- #raise_on_error(response) ⇒ Object
- #run_jobflow(jobflow) ⇒ Object
- #set_termination_protection(jobflow_ids, protected) ⇒ Object
- #terminate_jobflows(jobflow_ids) ⇒ Object
Constructor Details
#initialize(commands, logger, client_class) ⇒ EmrClient
Returns a new instance of EmrClient.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/client.rb', line 11 def initialize(commands, logger, client_class) @commands = commands @logger = logger = commands. @config = { :endpoint => [:endpoint] || "https://elasticmapreduce.amazonaws.com", :ca_file => File.join(File.dirname(__FILE__), "cacert.pem"), :aws_access_key => [:aws_access_id], :aws_secret_key => [:aws_secret_key], :signature_algorithm => :V2, :content_type => 'JSON', :verbose => ([:verbose] != nil), :connect_timeout => 60.0, :timeout => 160.0 } @client = Amazon::RetryDelegator.new( client_class.new_aws_query(@config), :retry_if => Proc.new { |*opts| self.is_retryable_error_response(*opts) } ) end |
Instance Attribute Details
#commands ⇒ Object
Returns the value of attribute commands.
9 10 11 |
# File 'lib/client.rb', line 9 def commands @commands end |
#logger ⇒ Object
Returns the value of attribute logger.
9 10 11 |
# File 'lib/client.rb', line 9 def logger @logger end |
#options ⇒ Object
Returns the value of attribute options.
9 10 11 |
# File 'lib/client.rb', line 9 def end |
Instance Method Details
#add_instance_groups(options) ⇒ Object
111 112 113 114 115 116 |
# File 'lib/client.rb', line 111 def add_instance_groups() logger.trace "AddInstanceGroups(#{options.inspect})" result = @client.AddInstanceGroups() logger.trace result.inspect return raise_on_error(result) end |
#add_steps(jobflow_id, steps) ⇒ Object
69 70 71 72 73 74 |
# File 'lib/client.rb', line 69 def add_steps(jobflow_id, steps) logger.trace "AddJobFlowSteps('JobFlowId' => #{jobflow_id.inspect}, 'Steps' => #{steps.inspect})" result = @client.AddJobFlowSteps('JobFlowId' => jobflow_id, 'Steps' => steps) logger.trace result.inspect return raise_on_error(result) end |
#describe_jobflow(options) ⇒ Object
83 84 85 86 87 88 |
# File 'lib/client.rb', line 83 def describe_jobflow() logger.trace "DescribeJobFlows(#{options.inspect})" result = @client.DescribeJobFlows(.merge('DescriptionType' => 'EXTENDED')) logger.trace result.inspect return raise_on_error(result) end |
#describe_jobflow_with_id(jobflow_id) ⇒ Object
58 59 60 61 62 63 64 65 66 67 |
# File 'lib/client.rb', line 58 def describe_jobflow_with_id(jobflow_id) logger.trace "DescribeJobFlows('JobFlowIds' => [ #{jobflow_id} ])" result = @client.DescribeJobFlows('JobFlowIds' => [ jobflow_id ], 'DescriptionType' => 'EXTENDED') logger.trace result.inspect raise_on_error(result) if result == nil || result['JobFlows'].size() == 0 then raise RuntimeError, "Jobflow with id #{jobflow_id} not found" end return result['JobFlows'].first end |
#is_error_response(response) ⇒ Object
47 48 49 |
# File 'lib/client.rb', line 47 def is_error_response(response) response != nil && response.key?('Error') end |
#is_retryable_error_response(response) ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/client.rb', line 34 def is_retryable_error_response(response) if response == nil then false else ret = false if response['Error'] then # note: 'Timeout' is not retryable because the operation might have completed just the connection timed out ret ||= ['InternalFailure', 'Throttling', 'ServiceUnavailable'].include?(response['Error']['Code']) end ret end end |
#modify_instance_groups(options) ⇒ Object
104 105 106 107 108 109 |
# File 'lib/client.rb', line 104 def modify_instance_groups() logger.trace "ModifyInstanceGroups(#{options.inspect})" result = @client.ModifyInstanceGroups() logger.trace result.inspect return raise_on_error(result) end |
#raise_on_error(response) ⇒ Object
51 52 53 54 55 56 |
# File 'lib/client.rb', line 51 def raise_on_error(response) if is_error_response(response) then raise RuntimeError, response["Error"]["Message"] end return response end |
#run_jobflow(jobflow) ⇒ Object
76 77 78 79 80 81 |
# File 'lib/client.rb', line 76 def run_jobflow(jobflow) logger.trace "RunJobFlow(#{jobflow.inspect})" result = @client.RunJobFlow(jobflow) logger.trace result.inspect return raise_on_error(result) end |
#set_termination_protection(jobflow_ids, protected) ⇒ Object
90 91 92 93 94 95 |
# File 'lib/client.rb', line 90 def set_termination_protection(jobflow_ids, protected) logger.trace "SetTerminationProtection('JobFlowIds' => #{jobflow_ids.inspect}, 'TerminationProtected' => #{protected})" result = @client.SetTerminationProtection('JobFlowIds' => jobflow_ids, 'TerminationProtected' => protected) logger.trace result.inspect return raise_on_error(result) end |
#terminate_jobflows(jobflow_ids) ⇒ Object
97 98 99 100 101 102 |
# File 'lib/client.rb', line 97 def terminate_jobflows(jobflow_ids) logger.trace "TerminateJobFlows('JobFlowIds' => #{jobflow_ids.inspect})" result = @client.TerminateJobFlows('JobFlowIds' => jobflow_ids) logger.trace result.inspect return raise_on_error(result) end |