Class: Commands::StreamStepCommand
- Inherits: StepCommand
  - Ancestry: Object > Command > StepCommand > Commands::StreamStepCommand
- Defined in: lib/commands.rb
Constant Summary
# Hadoop generic options, each followed by exactly one value, that
# #sort_streaming_args moves to the front of the streaming argument list.
# Frozen so the shared lookup table cannot be mutated at runtime.
GENERIC_OPTIONS = Set.new(%w(-conf -D -fs -jt -files -libjars -archives)).freeze
Instance Attribute Summary
-
#args ⇒ Object
Returns the value of attribute args.
-
#cache ⇒ Object
Returns the value of attribute cache.
-
#cache_archive ⇒ Object
Returns the value of attribute cache_archive.
-
#input ⇒ Object
Returns the value of attribute input.
-
#jobconf ⇒ Object
Returns the value of attribute jobconf.
-
#mapper ⇒ Object
Returns the value of attribute mapper.
-
#output ⇒ Object
Returns the value of attribute output.
-
#reducer ⇒ Object
Returns the value of attribute reducer.
Attributes inherited from StepCommand
#apps_path, #beta_path, #enable_debugging_path, #hive_cmd, #hive_path, #pig_cmd, #pig_path, #script_runner_path, #step_action, #step_name
Attributes inherited from Command
#arg, #commands, #description, #logger, #name
Instance Method Summary
- #initialize(*args) ⇒ StreamStepCommand (constructor)
  A new instance of StreamStepCommand.
- #sort_streaming_args(streaming_args) ⇒ Object
- #steps ⇒ Object
Methods inherited from StepCommand
#default_enable_debugging_path, #default_hive_cmd, #default_hive_path, #default_pig_cmd, #default_pig_path, #default_resize_jobflow_cmd, #default_script_runner_path, #ensure_install_cmd, #extra_args, #reorder_steps, #script_args, #validate
Methods inherited from Command
#enact, #get_field, #has_value, #have, #option, #require, #require_single_jobflow, #resolve, #validate
Constructor Details
#initialize(*args) ⇒ StreamStepCommand
Returns a new instance of StreamStepCommand.
517 518 519 520 |
# File 'lib/commands.rb', line 517
# Builds the command through the superclass constructor, forwarding every
# argument unchanged (bare `super`), then starts with an empty jobconf list
# that #steps later expands into "-jobconf" options.
def initialize(*args)
  super
  @jobconf = []
end
Instance Attribute Details
#args ⇒ Object
Returns the value of attribute args.
513 514 515 |
# File 'lib/commands.rb', line 513
# Reader for the +args+ attribute (@args).
def args
  @args
end
#cache ⇒ Object
Returns the value of attribute cache.
513 514 515 |
# File 'lib/commands.rb', line 513
# Reader for the +cache+ attribute (@cache).
def cache
  @cache
end
#cache_archive ⇒ Object
Returns the value of attribute cache_archive.
513 514 515 |
# File 'lib/commands.rb', line 513
# Reader for the +cache_archive+ attribute (@cache_archive).
def cache_archive
  @cache_archive
end
#input ⇒ Object
Returns the value of attribute input.
513 514 515 |
# File 'lib/commands.rb', line 513
# Reader for the +input+ attribute (@input).
def input
  @input
end
#jobconf ⇒ Object
Returns the value of attribute jobconf.
513 514 515 |
# File 'lib/commands.rb', line 513
# Reader for the +jobconf+ attribute (@jobconf); #initialize sets it to [].
def jobconf
  @jobconf
end
#mapper ⇒ Object
Returns the value of attribute mapper.
513 514 515 |
# File 'lib/commands.rb', line 513
# Reader for the +mapper+ attribute (@mapper).
def mapper
  @mapper
end
#output ⇒ Object
Returns the value of attribute output.
513 514 515 |
# File 'lib/commands.rb', line 513
# Reader for the +output+ attribute (@output).
def output
  @output
end
#reducer ⇒ Object
Returns the value of attribute reducer.
513 514 515 |
# File 'lib/commands.rb', line 513
# Reader for the +reducer+ attribute (@reducer).
def reducer
  @reducer
end
Instance Method Details
#sort_streaming_args(streaming_args) ⇒ Object
555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 |
# File 'lib/commands.rb', line 555
# Reorders a streaming argument list so that every Hadoop generic option
# listed in GENERIC_OPTIONS, together with the value that follows it, comes
# before all other arguments.
#
# Both groups keep the relative order in which they were given. The previous
# implementation unshifted each option/value pair onto the front, which
# reversed the generic options (e.g. "-D k=1 -D k=2" became "-D k=2 -D k=1"),
# so repeated settings reached Hadoop in the opposite order from what the
# user wrote.
#
# @param streaming_args [Array<String>, nil] raw arguments; nil yields []
# @return [Array<String>] a new array: generic options first, then the rest
# @raise [RuntimeError] if a generic option is the last element (no value)
def sort_streaming_args(streaming_args)
  args = streaming_args || []
  generic_args = []
  other_args = []
  i = 0
  # Index-based walk because a generic option consumes two elements.
  while i < args.length
    arg = args[i]
    if GENERIC_OPTIONS.include?(arg)
      if i + 1 >= args.length
        raise RuntimeError, "Missing value for argument #{arg}"
      end
      generic_args << arg << args[i + 1]
      i += 2
    else
      other_args << arg
      i += 1
    end
  end
  generic_args + other_args
end
#steps ⇒ Object
522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 |
# File 'lib/commands.rb', line 522
# Builds the EMR step definition for this streaming job.
#
# The "Args" list is assembled in this order:
#   1. the :args field passed through #sort_streaming_args (which moves
#      Hadoop generic options to the front),
#   2. one "-cacheFile" / "-cacheArchive" / "-jobconf" pair per entry of the
#      :cache, :cache_archive and :jobconf fields,
#   3. -input, -output, -mapper and -reducer, each falling back to the
#      sample word-count values when the field is unset (the default output
#      path embeds the current local time).
#
# @return [Array<Hash>] a single-element array holding the step hash
def steps
  timestr = Time.now.strftime("%Y-%m-%dT%H%M%S")

  stream_options = []
  get_field(:cache, []).each         { |ca| stream_options << "-cacheFile" << ca }
  get_field(:cache_archive, []).each { |ca| stream_options << "-cacheArchive" << ca }
  get_field(:jobconf, []).each       { |jc| stream_options << "-jobconf" << jc }

  # Note that the streaming options should go before command options for
  # Hadoop 0.20
  step = {
    "Name"            => get_field(:step_name, "Example Streaming Step"),
    "ActionOnFailure" => get_field(:step_action, "CANCEL_AND_WAIT"),
    "HadoopJarStep"   => {
      "Jar"  => "/home/hadoop/contrib/streaming/hadoop-streaming.jar",
      "Args" => sort_streaming_args(get_field(:args)) + stream_options + [
        "-input",   get_field(:input, "s3n://elasticmapreduce/samples/wordcount/input"),
        "-output",  get_field(:output, "hdfs:///examples/output/#{timestr}"),
        "-mapper",  get_field(:mapper, "s3n://elasticmapreduce/samples/wordcount/wordSplitter.py"),
        "-reducer", get_field(:reducer, "aggregate")
      ]
    }
  }
  [step]
end