Class: Commands::CreateJobFlowCommand

Inherits:
StepProcessingCommand show all
Defined in:
lib/commands.rb

Constant Summary collapse

# Option symbols grouped under the name OLD_OPTIONS.
# NOTE(review): the name suggests these are legacy options; where the list is
# consumed is not visible here -- confirm against the callers.
OLD_OPTIONS = [
  :instance_count,
  :slave_instance_type,
  :master_instance_type
]

Instance Attribute Summary collapse

Attributes inherited from StepProcessingCommand

#step_commands

Attributes inherited from Command

#arg, #commands, #description, #logger, #name

Instance Method Summary collapse

Methods inherited from StepProcessingCommand

#reorder_steps

Methods inherited from Command

#get_field, #has_value, #have, #option, #require, #require_single_jobflow, #resolve

Constructor Details

#initialize(*args) ⇒ CreateJobFlowCommand

Returns a new instance of CreateJobFlowCommand.



799
800
801
802
803
# File 'lib/commands.rb', line 799

# Builds the command and starts it with empty lists of bootstrap and
# instance-group sub-commands.
#
# @param args [Array] forwarded unchanged to the superclass constructor
def initialize(*args)
  # Bare super forwards exactly the arguments this method received.
  super
  @bootstrap_commands = []
  @instance_group_commands = []
end

Instance Attribute Details

#ainfo ⇒ Object

Returns the value of attribute ainfo.



782
783
784
# File 'lib/commands.rb', line 782

# Reader for the ainfo attribute.
#
# @return [Object] the current value of @ainfo
def ainfo
  @ainfo
end

#alive ⇒ Object

Returns the value of attribute alive.



782
783
784
# File 'lib/commands.rb', line 782

# Reader for the alive attribute.
#
# @return [Object] the current value of @alive
def alive
  @alive
end

#ami_version ⇒ Object

Returns the value of attribute ami_version.



782
783
784
# File 'lib/commands.rb', line 782

# Reader for the ami_version attribute.
#
# @return [Object] the current value of @ami_version
def ami_version
  @ami_version
end

#az ⇒ Object

Returns the value of attribute az.



782
783
784
# File 'lib/commands.rb', line 782

# Reader for the az attribute.
#
# @return [Object] the current value of @az
def az
  @az
end

#bootstrap_commands ⇒ Object

Returns the value of attribute bootstrap_commands.



782
783
784
# File 'lib/commands.rb', line 782

# Reader for the list of bootstrap commands attached to this job flow.
# The constructor initialises the list to an empty array.
#
# @return [Array] the current value of @bootstrap_commands
def bootstrap_commands
  @bootstrap_commands
end

#hadoop_version ⇒ Object

Returns the value of attribute hadoop_version.



782
783
784
# File 'lib/commands.rb', line 782

# Reader for the hadoop_version attribute.
#
# @return [Object] the current value of @hadoop_version
def hadoop_version
  @hadoop_version
end

#instance_count ⇒ Object

Returns the value of attribute instance_count.



782
783
784
# File 'lib/commands.rb', line 782

# Reader for the instance_count attribute.
#
# @return [Object] the current value of @instance_count
def instance_count
  @instance_count
end

#instance_group_commands ⇒ Object

Returns the value of attribute instance_group_commands.



782
783
784
# File 'lib/commands.rb', line 782

# Reader for the list of instance-group commands attached to this job flow.
# The constructor initialises the list to an empty array.
#
# @return [Array] the current value of @instance_group_commands
def instance_group_commands
  @instance_group_commands
end

#instance_type ⇒ Object

Returns the value of attribute instance_type.



782
783
784
# File 'lib/commands.rb', line 782

# Reader for the instance_type attribute.
#
# @return [Object] the current value of @instance_type
def instance_type
  @instance_type
end

#jobflow_name ⇒ Object

Returns the value of attribute jobflow_name.



782
783
784
# File 'lib/commands.rb', line 782

# Reader for the jobflow_name attribute.
#
# @return [Object] the current value of @jobflow_name
def jobflow_name
  @jobflow_name
end

#key_pair ⇒ Object

Returns the value of attribute key_pair.



782
783
784
# File 'lib/commands.rb', line 782

# Reader for the key_pair attribute.
#
# @return [Object] the current value of @key_pair
def key_pair
  @key_pair
end

#key_pair_file ⇒ Object

Returns the value of attribute key_pair_file.



782
783
784
# File 'lib/commands.rb', line 782

# Reader for the key_pair_file attribute.
#
# @return [Object] the current value of @key_pair_file
def key_pair_file
  @key_pair_file
end

#log_uri ⇒ Object

Returns the value of attribute log_uri.



782
783
784
# File 'lib/commands.rb', line 782

# Reader for the log_uri attribute.
#
# @return [Object] the current value of @log_uri
def log_uri
  @log_uri
end

#master_instance_type ⇒ Object

Returns the value of attribute master_instance_type.



782
783
784
# File 'lib/commands.rb', line 782

# Reader for the master_instance_type attribute.
#
# @return [Object] the current value of @master_instance_type
def master_instance_type
  @master_instance_type
end

#plain_output ⇒ Object

Returns the value of attribute plain_output.



782
783
784
# File 'lib/commands.rb', line 782

# Reader for the plain_output attribute.
#
# @return [Object] the current value of @plain_output
def plain_output
  @plain_output
end

#slave_instance_type ⇒ Object

Returns the value of attribute slave_instance_type.



782
783
784
# File 'lib/commands.rb', line 782

# Reader for the slave_instance_type attribute.
#
# @return [Object] the current value of @slave_instance_type
def slave_instance_type
  @slave_instance_type
end

#subnet_id ⇒ Object

Returns the value of attribute subnet_id.



782
783
784
# File 'lib/commands.rb', line 782

# Reader for the subnet_id attribute.
#
# @return [Object] the current value of @subnet_id
def subnet_id
  @subnet_id
end

#with_supported_products ⇒ Object

Returns the value of attribute with_supported_products.



782
783
784
# File 'lib/commands.rb', line 782

# Reader for the with_supported_products attribute.
#
# @return [Object] the current value of @with_supported_products
def with_supported_products
  @with_supported_products
end

#with_termination_protection ⇒ Object

Returns the value of attribute with_termination_protection.



782
783
784
# File 'lib/commands.rb', line 782

# Reader for the with_termination_protection attribute.
#
# @return [Object] the current value of @with_termination_protection
def with_termination_protection
  @with_termination_protection
end

Instance Method Details

#add_bootstrap_command(bootstrap_command) ⇒ Object



809
810
811
# File 'lib/commands.rb', line 809

# Appends a bootstrap command to the end of @bootstrap_commands.
#
# @param bootstrap_command [Object] the command to attach
# @return [Array] the @bootstrap_commands list, including the new entry
def add_bootstrap_command(bootstrap_command)
  @bootstrap_commands.push(bootstrap_command)
end

#add_instance_group_command(instance_group_command) ⇒ Object



813
814
815
# File 'lib/commands.rb', line 813

# Appends an instance-group command to the end of @instance_group_commands.
#
# @param instance_group_command [Object] the command to attach
# @return [Array] the @instance_group_commands list, including the new entry
def add_instance_group_command(instance_group_command)
  @instance_group_commands.push(instance_group_command)
end

#add_step_command(step) ⇒ Object



805
806
807
# File 'lib/commands.rb', line 805

# Appends a step command to the end of @step_commands.
# NOTE(review): @step_commands is not initialised in this class; presumably
# the superclass sets it up as an Array -- confirm.
#
# @param step [Object] the step command to attach
# @return [Array] the @step_commands list, including the new entry
def add_step_command(step)
  @step_commands.push(step)
end

#apply_jobflow_option(field_symbol, *keys) ⇒ Object



867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
# File 'lib/commands.rb', line 867

# Copies an option value into the nested @jobflow hash when the option is set.
#
# The value comes from get_field(field_symbol). A nil value leaves @jobflow
# untouched. Otherwise every key except the last is followed as a path of
# nested hashes starting at @jobflow (a level whose entry is nil is created
# as an empty hash), and the value is stored under the last key.
#
# @param field_symbol [Symbol] option name handed to get_field
# @param keys [Array] hash-key path; the final key receives the value
# @return [Object, nil] the stored value, or nil when nothing was written
def apply_jobflow_option(field_symbol, *keys)
  value = get_field(field_symbol)
  return if value.nil?

  # Walk (and create where needed) the intermediate hashes.
  parent = keys[0..-2].inject(@jobflow) do |node, key|
    node[key] = {} if node[key].nil?
    node[key]
  end
  parent[keys.last] = value
end

#create_jobflow ⇒ Object



921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
# File 'lib/commands.rb', line 921

# Builds the base job-flow request hash, stores it in @jobflow and returns it.
#
# The hash holds the job flow name (get_field(:jobflow_name), falling back to
# default_job_flow_name), an "Instances" section whose two flags are the
# strings "true"/"false" derived from the :alive and
# :with_termination_protection fields, and empty "InstanceGroups", "Steps" and
# "BootstrapActions" lists. When :with_supported_products is set, it is split
# on commas, each entry is stripped, and the list is stored under
# "SupportedProducts".
#
# @return [Hash] the newly built hash (the same object as @jobflow)
def create_jobflow
  flow_name = get_field(:jobflow_name, default_job_flow_name)
  keep_alive = get_field(:alive) ? "true" : "false"
  protection = get_field(:with_termination_protection) ? "true" : "false"

  @jobflow = {
    "Name" => flow_name,
    "Instances" => {
      "KeepJobFlowAliveWhenNoSteps" => keep_alive,
      "TerminationProtected" => protection,
      "InstanceGroups" => []
    },
    "Steps" => [],
    "BootstrapActions" => []
  }

  supported = get_field(:with_supported_products)
  @jobflow["SupportedProducts"] = supported.split(/,/).map(&:strip) if supported
  @jobflow
end

#default_hadoop_version ⇒ Object

FIXME: add code to setup collapse instance group commands



791
792
793
794
795
796
797
# File 'lib/commands.rb', line 791

# Chooses a Hadoop version string from the :ami_version field.
#
# @return [String] "0.20" when get_field(:ami_version) equals "1.0",
#   otherwise "0.20.205"
def default_hadoop_version
  get_field(:ami_version) == "1.0" ? "0.20" : "0.20.205"
end

#default_job_flow_name ⇒ Object



940
941
942
943
944
945
946
# File 'lib/commands.rb', line 940

# Produces the name used when no job flow name is supplied.
#
# @return [String] "Development Job Flow", with
#   " (requires manual termination)" appended when the :alive field is truthy
def default_job_flow_name
  suffix = get_field(:alive) ? " (requires manual termination)" : ""
  "Development Job Flow" + suffix
end

#enact(client) ⇒ Object



830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
# File 'lib/commands.rb', line 830

# Assembles the job-flow request and submits it through the client.
#
# Steps, in order:
# 1. build the base hash with create_jobflow;
# 2. copy each set option into its place in the hash (see the table below);
# 3. default "AmiVersion" to "latest" when it is still unset;
# 4. reorder the step commands with reorder_steps and flatten their steps
#    into "Steps";
# 5. run setup_instance_groups and collect each command's instance_group;
# 6. append one bootstrap action per bootstrap command, numbered from 1;
# 7. call client.run_jobflow, append the returned 'JobFlowId' to
#    commands.global_options[:jobflow], and print it via logger.
#
# @param client [Object] must respond to run_jobflow(hash) and return a
#   structure indexable by 'JobFlowId'
# @return [Object] whatever logger.puts returns
def enact(client)
  @jobflow = create_jobflow

  # Option field => key path inside @jobflow. Applied in this order.
  [
    [:ainfo, "AdditionalInfo"],
    [:key_pair, "Instances", "Ec2KeyName"],
    [:hadoop_version, "Instances", "HadoopVersion"],
    [:az, "Instances", "Placement", "AvailabilityZone"],
    [:log_uri, "LogUri"],
    [:ami_version, "AmiVersion"],
    [:subnet_id, "Instances", "Ec2SubnetId"]
  ].each { |field, *path| apply_jobflow_option(field, *path) }

  @jobflow["AmiVersion"] ||= "latest"

  self.step_commands = reorder_steps(@jobflow, self.step_commands)
  @jobflow["Steps"] = step_commands.map { |cmd| cmd.steps }.flatten

  setup_instance_groups
  @jobflow["Instances"]["InstanceGroups"] = instance_group_commands.map { |cmd| cmd.instance_group }

  bootstrap_commands.each_with_index do |cmd, position|
    # each_with_index counts from 0; bootstrap actions are numbered from 1.
    @jobflow["BootstrapActions"] << cmd.bootstrap_action(position + 1)
  end

  jobflow_id = client.run_jobflow(@jobflow)['JobFlowId']
  commands.global_options[:jobflow] << jobflow_id

  if have(:plain_output)
    logger.puts jobflow_id
  else
    logger.puts "Created job flow " + jobflow_id
  end
end

#have_role(instance_group_commands, role) ⇒ Object



892
893
894
895
896
# File 'lib/commands.rb', line 892

# Reports whether any of the given instance-group commands has the given role.
#
# Each command's instance_role is upper-cased before comparison, while role is
# compared as given, so callers pass role in upper case (e.g. "MASTER").
#
# @param instance_group_commands [Array] objects responding to instance_role
# @param role [String] role name to look for
# @return [Boolean] true when at least one command matches
def have_role(instance_group_commands, role)
  roles = instance_group_commands.map { |cmd| cmd.instance_role.upcase }
  roles.include?(role)
end

#new_instance_group_command(role, instance_count, instance_type) ⇒ Object



883
884
885
886
887
888
889
890
# File 'lib/commands.rb', line 883

# Creates a CreateInstanceGroupCommand for the given role and fills in its
# instance count and instance type.
#
# @param role [String] passed to the command constructor as its argument
# @param instance_count [Integer] assigned to the command's instance_count
# @param instance_type [String] assigned to the command's instance_type
# @return [CreateInstanceGroupCommand] the configured command
def new_instance_group_command(role, instance_count, instance_type)
  CreateInstanceGroupCommand.new(
    "--instance-group ROLE", "Specify an instance group", role, commands
  ).tap do |group|
    group.instance_count = instance_count
    group.instance_type = instance_type
  end
end

#setup_instance_groups ⇒ Object



898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
# File 'lib/commands.rb', line 898

# Ensures the job flow has the instance groups it needs, synthesising any the
# user did not specify explicitly.
#
# * If no MASTER group is present, one is added with a single instance whose
#   type is :master_instance_type, falling back to :instance_type, then
#   "m1.small".
# * If no CORE group is present and :instance_count (default 1) is greater
#   than 1, a CORE group is added with instance_count - 1 instances whose type
#   is :slave_instance_type, falling back to :instance_type, then "m1.small".
# * If a CORE group is already present, :instance_count must not be set.
#
# @return [Array, nil] the instance group command list when a CORE group was
#   appended, nil otherwise
# @raise [RuntimeError] when a CORE group and :instance_count are both given
def setup_instance_groups
  unless have_role(instance_group_commands, "MASTER")
    master_type = get_field(:master_instance_type, get_field(:instance_type, "m1.small"))
    instance_group_commands << new_instance_group_command("MASTER", 1, master_type)
  end

  if have_role(instance_group_commands, "CORE")
    # Verify that user has not specified both --instance-group core and --num-instances
    unless get_field(:instance_count).nil?
      raise RuntimeError, "option --num-instances cannot be used when a core instance group is specified."
    end
  else
    total_instances = get_field(:instance_count, 1).to_i
    if total_instances > 1
      core_type = get_field(:slave_instance_type, get_field(:instance_type, "m1.small"))
      # One of the requested instances is the master; the rest form the core group.
      core_group = new_instance_group_command("CORE", total_instances - 1, core_type)
      core_group.instance_role = "CORE"
      instance_group_commands << core_group
    end
  end
end

#validate ⇒ Object



817
818
819
820
821
822
823
824
825
826
827
828
# File 'lib/commands.rb', line 817

# Validates this command and every sub-command attached to it.
#
# For each step command that is an EnableDebuggingCommand, the :log_uri option
# is required (via require). Afterwards validate is called on every step,
# instance-group and bootstrap command, in that order.
#
# @return [Array] the combined list of sub-commands that were validated
def validate
  step_commands.each do |step|
    if step.is_a?(EnableDebuggingCommand)
      require(:log_uri, "You must supply a logUri if you enable debugging when creating a job flow")
    end
  end

  (step_commands + instance_group_commands + bootstrap_commands).each(&:validate)
end