Class: Commands::CreateJobFlowCommand

Inherits:
StepProcessingCommand show all
Defined in:
lib/commands.rb

Constant Summary collapse

OLD_OPTIONS =
[:instance_count, :slave_instance_type, :master_instance_type]
DEFAULT_HADOOP_VERSION =

FIXME: add code to setup collapse instance group commands

"0.20"

Instance Attribute Summary collapse

Attributes inherited from StepProcessingCommand

#step_commands

Attributes inherited from Command

#arg, #commands, #description, #logger, #name

Instance Method Summary collapse

Methods inherited from StepProcessingCommand

#reorder_steps

Methods inherited from Command

#get_field, #has_value, #have, #option, #require, #require_single_jobflow, #resolve

Constructor Details

#initialize(*args) ⇒ CreateJobFlowCommand

Returns a new instance of CreateJobFlowCommand.



770
771
772
773
774
# File 'lib/commands.rb', line 770

def initialize(*args)
  super(*args)
  @instance_group_commands = []
  @bootstrap_commands = []
end

Instance Attribute Details

#ainfoObject

Returns the value of attribute ainfo.



759
760
761
# File 'lib/commands.rb', line 759

def ainfo
  @ainfo
end

#aliveObject

Returns the value of attribute alive.



759
760
761
# File 'lib/commands.rb', line 759

def alive
  @alive
end

#azObject

Returns the value of attribute az.



759
760
761
# File 'lib/commands.rb', line 759

def az
  @az
end

#bootstrap_commandsObject

Returns the value of attribute bootstrap_commands.



759
760
761
# File 'lib/commands.rb', line 759

def bootstrap_commands
  @bootstrap_commands
end

#hadoop_versionObject

Returns the value of attribute hadoop_version.



759
760
761
# File 'lib/commands.rb', line 759

def hadoop_version
  @hadoop_version
end

#instance_countObject

Returns the value of attribute instance_count.



759
760
761
# File 'lib/commands.rb', line 759

def instance_count
  @instance_count
end

#instance_group_commandsObject

Returns the value of attribute instance_group_commands.



759
760
761
# File 'lib/commands.rb', line 759

def instance_group_commands
  @instance_group_commands
end

#instance_typeObject

Returns the value of attribute instance_type.



759
760
761
# File 'lib/commands.rb', line 759

def instance_type
  @instance_type
end

#jobflow_nameObject

Returns the value of attribute jobflow_name.



759
760
761
# File 'lib/commands.rb', line 759

def jobflow_name
  @jobflow_name
end

#key_pairObject

Returns the value of attribute key_pair.



759
760
761
# File 'lib/commands.rb', line 759

def key_pair
  @key_pair
end

#key_pair_fileObject

Returns the value of attribute key_pair_file.



759
760
761
# File 'lib/commands.rb', line 759

def key_pair_file
  @key_pair_file
end

#log_uriObject

Returns the value of attribute log_uri.



759
760
761
# File 'lib/commands.rb', line 759

def log_uri
  @log_uri
end

#master_instance_typeObject

Returns the value of attribute master_instance_type.



759
760
761
# File 'lib/commands.rb', line 759

def master_instance_type
  @master_instance_type
end

#plain_outputObject

Returns the value of attribute plain_output.



759
760
761
# File 'lib/commands.rb', line 759

def plain_output
  @plain_output
end

#slave_instance_typeObject

Returns the value of attribute slave_instance_type.



759
760
761
# File 'lib/commands.rb', line 759

def slave_instance_type
  @slave_instance_type
end

#with_termination_protectionObject

Returns the value of attribute with_termination_protection.



759
760
761
# File 'lib/commands.rb', line 759

def with_termination_protection
  @with_termination_protection
end

Instance Method Details

#add_bootstrap_command(bootstrap_command) ⇒ Object



780
781
782
# File 'lib/commands.rb', line 780

def add_bootstrap_command(bootstrap_command)
  @bootstrap_commands << bootstrap_command
end

#add_instance_group_command(instance_group_command) ⇒ Object



784
785
786
# File 'lib/commands.rb', line 784

def add_instance_group_command(instance_group_command)
  @instance_group_commands << instance_group_command
end

#add_step_command(step) ⇒ Object



776
777
778
# File 'lib/commands.rb', line 776

def add_step_command(step)
  @step_commands << step
end

#apply_jobflow_option(field_symbol, *keys) ⇒ Object



837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
# File 'lib/commands.rb', line 837

def apply_jobflow_option(field_symbol, *keys)
  value = get_field(field_symbol)
  if value != nil then
    map = @jobflow
    for key in keys[0..-2] do
      nmap = map[key]
      if nmap == nil then
        map[key] = {}
        nmap = map[key]
      end
      map = nmap
    end
    map[keys.last] = value
  end
end

#create_jobflowObject



891
892
893
894
895
896
897
898
899
900
901
902
# File 'lib/commands.rb', line 891

def create_jobflow
  @jobflow = {
    "Name"   => get_field(:jobflow_name, default_job_flow_name),
    "Instances" => {
      "KeepJobFlowAliveWhenNoSteps" => (get_field(:alive) ? "true" : "false"),
      "TerminationProtected"        => (get_field(:with_termination_protection) ? "true" : "false"),
      "InstanceGroups" => []
    },
    "Steps" => [],
    "BootstrapActions" => []
  }
end

#default_job_flow_nameObject



904
905
906
907
908
909
910
# File 'lib/commands.rb', line 904

def default_job_flow_name
  name = "Development Job Flow"
  if get_field(:alive) then
    name += " (requires manual termination)"
  end
  return name
end

#enact(client) ⇒ Object



804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
# File 'lib/commands.rb', line 804

def enact(client)
  @jobflow = create_jobflow

  apply_jobflow_option(:ainfo, "AdditionalInfo")
  apply_jobflow_option(:key_pair, "Instances", "Ec2KeyName")
  apply_jobflow_option(:hadoop_version, "Instances", "HadoopVersion")
  apply_jobflow_option(:az, "Instances", "Placement", "AvailabilityZone")
  apply_jobflow_option(:log_uri, "LogUri")

  self.step_commands = reorder_steps(@jobflow, self.step_commands)
  @jobflow["Steps"] = step_commands.map { |x| x.steps }.flatten

  setup_instance_groups
  @jobflow["Instances"]["InstanceGroups"] = instance_group_commands.map { |x| x.instance_group }

  bootstrap_action_index = 1
  for bootstrap_action_command in bootstrap_commands do
    @jobflow["BootstrapActions"] << bootstrap_action_command.bootstrap_action(
      bootstrap_action_index)
    bootstrap_action_index += 1
  end

  run_result = client.run_jobflow(@jobflow)
  jobflow_id = run_result['JobFlowId']
  commands.global_options[:jobflow] << jobflow_id 

  if have(:plain_output) then
    logger.puts jobflow_id
  else
    logger.puts "Created job flow " + jobflow_id
  end
end

#have_role(instance_group_commands, role) ⇒ Object



862
863
864
865
866
# File 'lib/commands.rb', line 862

def have_role(instance_group_commands, role)
  instance_group_commands.select { |x| 
    x.instance_role.upcase == role 
  }.size > 0
end

#new_instance_group_command(role, instance_count, instance_type) ⇒ Object



853
854
855
856
857
858
859
860
# File 'lib/commands.rb', line 853

def new_instance_group_command(role, instance_count, instance_type)
  igc = CreateInstanceGroupCommand.new(
    "--instance-group ROLE", "Specify an instance group", role, commands
  )
  igc.instance_count = instance_count
  igc.instance_type = instance_type
  return igc
end

#setup_instance_groupsObject



868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
# File 'lib/commands.rb', line 868

def setup_instance_groups
  instance_groups = []
  if ! have_role(instance_group_commands, "MASTER") then
    mit = get_field(:master_instance_type, get_field(:instance_type, "m1.small"))
    master_instance_group = new_instance_group_command("MASTER", 1, mit)
    instance_group_commands << master_instance_group
  end
  if ! have_role(instance_group_commands, "CORE") then
    ni = get_field(:instance_count, 1).to_i
    if ni > 1 then
      sit = get_field(:slave_instance_type, get_field(:instance_type, "m1.small"))
      slave_instance_group = new_instance_group_command("CORE", ni-1, sit)
      slave_instance_group.instance_role = "CORE"
      instance_group_commands << slave_instance_group
    end
  else
    # Verify that user has not specified both --instance-group core and --num-instances
    if get_field(:instance_count) != nil then
      raise RuntimeError, "option --num-instances cannot be used when a core instance group is specified."
    end
  end
end

#validateObject



788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
# File 'lib/commands.rb', line 788

def validate
  for step in step_commands do
    if step.is_a?(EnableDebuggingCommand) then
      require(:log_uri, "You must supply a logUri if you enable debugging when creating a job flow")
    end
  end

  for cmd in step_commands + instance_group_commands + bootstrap_commands do
    cmd.validate
  end

  if ! have(:hadoop_version) then
    @hadoop_version = DEFAULT_HADOOP_VERSION
  end
end