Class: Commands::StreamStepCommand

Inherits:
StepCommand
Defined in:
lib/commands.rb

Constant Summary

# Option flags that sort_streaming_args moves, together with the value that
# follows each one, to the front of the streaming argument list.
# Frozen so the shared lookup set cannot be mutated by accident.
GENERIC_OPTIONS =
Set.new(%w(-conf -D -fs -jt -files -libjars -archives)).freeze

Instance Attribute Summary

Attributes inherited from StepCommand

#apps_path, #beta_path, #enable_debugging_path, #hive_cmd, #hive_path, #pig_cmd, #pig_path, #script_runner_path, #step_action, #step_name

Attributes inherited from Command

#arg, #commands, #description, #logger, #name

Instance Method Summary

Methods inherited from StepCommand

#default_enable_debugging_path, #default_hive_cmd, #default_hive_path, #default_pig_cmd, #default_pig_path, #default_resize_jobflow_cmd, #default_script_runner_path, #ensure_install_cmd, #extra_args, #reorder_steps, #script_args, #validate

Methods inherited from Command

#enact, #get_field, #has_value, #have, #option, #require, #require_single_jobflow, #resolve, #validate

Constructor Details

#initialize(*args) ⇒ StreamStepCommand

Returns a new instance of StreamStepCommand.



517
518
519
520
# File 'lib/commands.rb', line 517

# Creates the command, passing every argument through unchanged to the
# superclass constructor.
#
# Once the superclass constructor has run, @jobconf is set to an empty
# array, so the jobconf reader returns [] rather than nil on a new instance.
# NOTE(review): presumably jobconf values are appended to this array as
# options are parsed -- confirm against the option-parsing code.
def initialize(*args)
  super(*args)
  @jobconf = []
end

Instance Attribute Details

#args ⇒ Object

Returns the value of attribute args.



513
514
515
# File 'lib/commands.rb', line 513

# Reader for the +args+ attribute.
#
# @return [Object] whatever is currently stored in @args (nil if it has
#   never been assigned)
def args
  return @args
end

#cache ⇒ Object

Returns the value of attribute cache.



513
514
515
# File 'lib/commands.rb', line 513

# Reader for the +cache+ attribute.
#
# @return [Object] whatever is currently stored in @cache (nil if it has
#   never been assigned)
def cache
  return @cache
end

#cache_archive ⇒ Object

Returns the value of attribute cache_archive.



513
514
515
# File 'lib/commands.rb', line 513

# Reader for the +cache_archive+ attribute.
#
# @return [Object] whatever is currently stored in @cache_archive (nil if it
#   has never been assigned)
def cache_archive
  return @cache_archive
end

#input ⇒ Object

Returns the value of attribute input.



513
514
515
# File 'lib/commands.rb', line 513

# Reader for the +input+ attribute.
#
# @return [Object] whatever is currently stored in @input (nil if it has
#   never been assigned)
def input
  return @input
end

#jobconf ⇒ Object

Returns the value of attribute jobconf.



513
514
515
# File 'lib/commands.rb', line 513

# Reader for the +jobconf+ attribute.
#
# @return [Object] whatever is currently stored in @jobconf; the constructor
#   sets it to an empty array
def jobconf
  return @jobconf
end

#mapper ⇒ Object

Returns the value of attribute mapper.



513
514
515
# File 'lib/commands.rb', line 513

# Reader for the +mapper+ attribute.
#
# @return [Object] whatever is currently stored in @mapper (nil if it has
#   never been assigned)
def mapper
  return @mapper
end

#output ⇒ Object

Returns the value of attribute output.



513
514
515
# File 'lib/commands.rb', line 513

# Reader for the +output+ attribute.
#
# @return [Object] whatever is currently stored in @output (nil if it has
#   never been assigned)
def output
  return @output
end

#reducer ⇒ Object

Returns the value of attribute reducer.



513
514
515
# File 'lib/commands.rb', line 513

# Reader for the +reducer+ attribute.
#
# @return [Object] whatever is currently stored in @reducer (nil if it has
#   never been assigned)
def reducer
  return @reducer
end

Instance Method Details

#sort_streaming_args(streaming_args) ⇒ Object



555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
# File 'lib/commands.rb', line 555

# Moves every generic option (a flag listed in GENERIC_OPTIONS) and the value
# that follows it to the front of the argument list, leaving all other
# arguments after them.
#
# The partition is stable: generic option pairs keep the relative order in
# which they were given, and so do the remaining arguments. Keeping that
# order matters when the same option is supplied more than once, because
# reversing the pairs would change which occurrence comes last.
#
# @param streaming_args [Array<String>, nil] the extra streaming arguments;
#   nil is treated like an empty list
# @return [Array<String>] a new array with generic option pairs first
# @raise [RuntimeError] if a generic option is the last element and so has
#   no value following it
def sort_streaming_args(streaming_args)
  generic_args = []
  other_args = []
  i = 0
  while streaming_args && i < streaming_args.length
    arg = streaming_args[i]
    if GENERIC_OPTIONS.include?(arg)
      if i + 1 >= streaming_args.length
        raise RuntimeError, "Missing value for argument #{arg}"
      end
      # Consume the flag together with its value so the value is never
      # itself inspected as a possible flag.
      generic_args << arg << streaming_args[i + 1]
      i += 2
    else
      other_args << arg
      i += 1
    end
  end
  return generic_args + other_args
end

#steps ⇒ Object



522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
# File 'lib/commands.rb', line 522

# Builds the description of a single Hadoop streaming step.
#
# The argument list is assembled in this order: the caller-supplied extra
# arguments (after sort_streaming_args), then one -cacheFile / -cacheArchive /
# -jobconf pair per configured value, then -input, -output, -mapper and
# -reducer. Fields that are not set fall back to the word-count sample
# defaults given below; the default output path embeds the current time.
#
# @return [Array<Hash>] a one-element array holding the step hash with the
#   keys "Name", "ActionOnFailure" and "HadoopJarStep"
def steps
  timestamp = Time.now.strftime("%Y-%m-%dT%H%M%S")

  streaming_flags = []
  get_field(:cache, []).each do |cache_file|
    streaming_flags.push("-cacheFile", cache_file)
  end
  get_field(:cache_archive, []).each do |archive|
    streaming_flags.push("-cacheArchive", archive)
  end
  get_field(:jobconf, []).each do |setting|
    streaming_flags.push("-jobconf", setting)
  end

  step_name  = get_field(:step_name, "Example Streaming Step")
  on_failure = get_field(:step_action, "CANCEL_AND_WAIT")

  # Note that the streaming options should go before command options for
  # Hadoop 0.20
  leading_args = sort_streaming_args(get_field(:args)) + streaming_flags
  command_args = [
    "-input",   get_field(:input, "s3n://elasticmapreduce/samples/wordcount/input"),
    "-output",  get_field(:output, "hdfs:///examples/output/#{timestamp}"),
    "-mapper",  get_field(:mapper, "s3n://elasticmapreduce/samples/wordcount/wordSplitter.py"),
    "-reducer", get_field(:reducer, "aggregate")
  ]

  hadoop_jar_step = {
    "Jar"  => "/home/hadoop/contrib/streaming/hadoop-streaming.jar",
    "Args" => leading_args + command_args
  }

  return [
    {
      "Name"            => step_name,
      "ActionOnFailure" => on_failure,
      "HadoopJarStep"   => hadoop_jar_step
    }
  ]
end