Class: Chicago::Flow::PipelineStage
- Inherits:
-
Object
- Object
- Chicago::Flow::PipelineStage
- Defined in:
- lib/chicago/flow/pipeline_stage.rb
Instance Attribute Summary collapse
-
#transformation_chain ⇒ Object
readonly
Returns the value of attribute transformation_chain.
Instance Method Summary collapse
- #execute ⇒ Object
-
#initialize(source, options = {}) ⇒ PipelineStage
constructor
A new instance of PipelineStage.
- #register_sink(name, sink) ⇒ Object
- #required_sinks ⇒ Object
- #unregistered_sinks ⇒ Object
- #validate_pipeline ⇒ Object
Constructor Details
#initialize(source, options = {}) ⇒ PipelineStage
Returns a new instance of PipelineStage.
15 16 17 18 19 20 21 |
# File 'lib/chicago/flow/pipeline_stage.rb', line 15 def initialize(source, ={}) @source = source @sinks = [:sinks] || {} @transformations = [:transformations] || [] @error_handler = [:error_handler] || RaisingErrorHandler.new @transformation_chain = TransformationChain.new(*@transformations) end |
Instance Attribute Details
#transformation_chain ⇒ Object (readonly)
Returns the value of attribute transformation_chain.
13 14 15 |
# File 'lib/chicago/flow/pipeline_stage.rb', line 13 def transformation_chain @transformation_chain end |
Instance Method Details
#execute ⇒ Object
34 35 36 37 38 39 40 41 42 |
# File 'lib/chicago/flow/pipeline_stage.rb', line 34 def execute validate_pipeline @sinks.values.each(&:open) @source.each do |row| transformation_chain.process(row).each {|row| process_row(row) } end transformation_chain.flush.each {|row| process_row(row) } @sinks.values.each(&:close) end |
#register_sink(name, sink) ⇒ Object
23 24 25 26 |
# File 'lib/chicago/flow/pipeline_stage.rb', line 23 def register_sink(name, sink) @sinks[name.to_sym] = sink self end |
#required_sinks ⇒ Object
44 45 46 |
# File 'lib/chicago/flow/pipeline_stage.rb', line 44 def required_sinks transformation_chain.output_streams | [:default] end |
#unregistered_sinks ⇒ Object
48 49 50 |
# File 'lib/chicago/flow/pipeline_stage.rb', line 48 def unregistered_sinks required_sinks - @sinks.keys end |
#validate_pipeline ⇒ Object
28 29 30 31 32 |
# File 'lib/chicago/flow/pipeline_stage.rb', line 28 def validate_pipeline unless unregistered_sinks.empty? @error_handler.unregistered_sinks(unregistered_sinks) end end |