Class: Welder::Pipeline

Inherits:
Object
Includes:
Support::CallableHandler
Defined in:
lib/welder/pipeline.rb

Overview

A wrapper around a sequence of callables, i.e. anything that responds to a single-argument ‘call’ method. This includes procs, blocks and other pipelines.

Pipelines are immutable. The composition operators (#| and #-) build more complex pipelines out of existing ones, but they always return a new pipeline and leave the originals untouched.

Instance Method Summary

Methods included from Support::CallableHandler

#callable!

Constructor Details

#initialize(*lambdas, valves: nil, &block) ⇒ Pipeline

Creates a new pipeline out of a sequence of callables. If a block is given, it is executed as the last step of the pipeline. It also accepts valves, which act as witnesses of the steps the pipeline goes through.

Examples:

Create an empty pipeline (with no steps)

empty = Welder::Pipeline.new

Create a pipeline from an anonymous function

square = Welder::Pipeline.new(->(x){ x ** 2 })

Create a pipeline from a block

square = Welder::Pipeline.new { |x| x ** 2 }

Parameters:

  • lambdas (Array<#call>)

    Array of callables accepting 1 argument

  • valves (Array<#call>) (defaults to: nil)

    Array of callables accepting 3 arguments: (input, lambda, output)

  • block (Block)

    A block to execute as the last step in the pipeline

Raises:

  • (CallableExpectedError)

    When trying to create a pipeline out of a non-callable



# File 'lib/welder/pipeline.rb', line 33

def initialize(*lambdas, valves: nil, &block)
  callable!(*lambdas, *valves)

  @pipes = [*lambdas]
  @pipes << block if block

  @valves = [*valves]
end
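
As a hedged illustration of how the three kinds of argument combine, the sketch below builds one pipeline from two lambdas, a trailing block and a valve. The Welder::Pipeline class in the snippet is a trimmed stand-in modelled on the source listings on this page, included only so that the example runs without the gem; it leaves out the callable! check and the nested-pipeline branch. The names tracer and log are illustrative.

```ruby
# Stand-in modelled on the source listings on this page (an assumption, not
# the gem itself): no callable! check, no nested-pipeline handling.
module Welder
  class Pipeline
    def initialize(*lambdas, valves: nil, &block)
      @pipes = [*lambdas]
      @pipes << block if block # the block becomes the last step
      @valves = [*valves]      # [*nil] is an empty array
    end

    def call(input, valves = [])
      valves = @valves + valves
      @pipes.reduce(input) do |acc, step|
        step.call(acc).tap do |output|
          valves.each { |valve| valve.call(acc, step, output) }
        end
      end
    end
  end
end

log = []
# Hypothetical valve: records the input and output of every step.
tracer = ->(input, _step, output) { log << [input, output] }

pipeline = Welder::Pipeline.new(
  ->(x) { x + 1 },
  ->(x) { x * 10 },
  valves: [tracer]
) { |x| x.to_s }

result = pipeline.call(1)
puts result.inspect # "20"
puts log.inspect    # [[1, 2], [2, 20], [20, "20"]]
```

The block runs after both lambdas, and the valve sees each of the three steps without affecting the result.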

Instance Method Details

#-(other) ⇒ Welder::Pipeline

Create a new pipeline that keeps all the steps in the current one, but adds a valve overseeing the process. The valve will get called at every stage, as a side effect, but it will never modify the final outcome of the pipeline

Parameters:

  • other (#call)

    The callable to invoke at every step of the pipeline. It must accept 3 arguments: (input, lambda, output)



# File 'lib/welder/pipeline.rb', line 88

def -(other)
  self.class.new(self, valves: [*other])
end
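
A minimal sketch of attaching a valve with #- follows. The Welder::Pipeline class here is a trimmed stand-in modelled on the source listings on this page so that the snippet runs without the gem; it merges valves with Array#+ (an assumption that keeps the wrapped pipeline unmodified) and skips the callable! check. The names recorder and seen are hypothetical.

```ruby
# Stand-in modelled on the source listings on this page (an assumption).
module Welder
  class Pipeline
    def initialize(*lambdas, valves: nil, &block)
      @pipes = [*lambdas]
      @pipes << block if block
      @valves = [*valves]
    end

    def call(input, valves = [])
      valves = @valves + valves # non-mutating merge
      @pipes.reduce(input) do |acc, step|
        if step.is_a?(Pipeline)
          step.call(acc, valves) # hand the valves down to the nested pipeline
        else
          step.call(acc).tap { |out| valves.each { |v| v.call(acc, step, out) } }
        end
      end
    end

    def -(other)
      self.class.new(self, valves: [*other])
    end
  end
end

square_and_double = Welder::Pipeline.new(->(x) { x ** 2 }, ->(x) { x * 2 })

seen = []
# A valve takes three arguments: the step's input, the step, and its output.
recorder = ->(input, _step, output) { seen << [input, output] }

observed = square_and_double - recorder

plain_result     = square_and_double.call(3) # original pipeline has no valve
seen_after_plain = seen.dup
observed_result  = observed.call(3)

puts plain_result            # 18
puts seen_after_plain.inspect # []
puts observed_result         # 18
puts seen.inspect            # [[3, 9], [9, 18]]
```

Both pipelines return the same value; only the one created with #- reports its intermediate steps to the valve.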

#call(input, valves = []) ⇒ *

Apply the sequence of functions to a particular input. Any valves present in the pipeline are called as a side effect

Parameters:

  • input (*)

    The input for the first element of the pipeline

  • valves (Array<#call>) (defaults to: [])

    An array of valves to be called at every step of the pipeline

Returns:

  • (*)

    The output resulting from passing the input through the whole pipeline



# File 'lib/welder/pipeline.rb', line 51

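def call(input, valves = [])
  # Merge per-call valves into a new array; concat would mutate @valves
  # and make every later call see the extra valves again
  valves = @valves + valves

  @pipes.reduce(input) do |a, e|
    if e.is_a?(Pipeline)
      # Nested pipelines receive the accumulated valves
      e.call(a, valves)
    else
      # Run the step, then notify every valve as a side effect
      e.call(a).tap do |output|
        valves.each { |valve| valve.call(a, e, output) }
      end
    end
  end
end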
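
The second argument of #call can be sketched as follows: valves passed at call time are applied to every step, including the steps of nested pipelines. The Welder::Pipeline class below is a trimmed stand-in modelled on the source listings on this page so that the snippet runs without the gem. It merges valves with Array#+, an assumption chosen so that per-call valves never leak into the pipeline's stored list. The names inner, outer, spy and trace are illustrative.

```ruby
# Stand-in modelled on the source listings on this page (an assumption).
module Welder
  class Pipeline
    def initialize(*lambdas, valves: nil, &block)
      @pipes = [*lambdas]
      @pipes << block if block
      @valves = [*valves]
    end

    def call(input, valves = [])
      valves = @valves + valves # new array; @valves is left untouched
      @pipes.reduce(input) do |acc, step|
        if step.is_a?(Pipeline)
          step.call(acc, valves)
        else
          step.call(acc).tap { |out| valves.each { |v| v.call(acc, step, out) } }
        end
      end
    end
  end
end

inner = Welder::Pipeline.new(->(x) { x + 1 })
outer = Welder::Pipeline.new(inner, ->(x) { x * 3 })

trace = []
spy = ->(input, _step, output) { trace << [input, output] }

traced   = outer.call(2, [spy]) # spy observes inner's step and outer's step
untraced = outer.call(2)        # no valve this time, so trace does not grow

puts traced        # 9
puts untraced      # 9
puts trace.inspect # [[2, 3], [3, 9]]
```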

#|(other) ⇒ Welder::Pipeline

Compose a pipeline with another one (or a callable). This method does not modify existing pipelines. Instead, it creates a new pipeline composed of the previous two

Parameters:

  • other (#call)

    The callable to add as the last step of the pipeline, which has to accept 1 argument

Returns:

  • (Welder::Pipeline)

    a new pipeline composed of the current one and ‘other’, in that order

Raises:

  • (CallableExpectedError)

    When trying to create a pipeline out of non-callables



# File 'lib/welder/pipeline.rb', line 77

def |(other)
  self.class.new(self, other)
end
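
A short sketch of composition with #|, showing that the operands are left as they were. The Welder::Pipeline class below is a trimmed stand-in modelled on the source listings on this page so that the snippet runs without the gem; valves and the callable! check are omitted as an assumption to keep it brief. The names square, double and plus_one are illustrative.

```ruby
# Stand-in modelled on the source listings on this page (an assumption):
# only the parts needed for composition are kept.
module Welder
  class Pipeline
    def initialize(*lambdas, &block)
      @pipes = [*lambdas]
      @pipes << block if block
    end

    def call(input)
      # Pipelines respond to call too, so nested ones need no special case here
      @pipes.reduce(input) { |acc, step| step.call(acc) }
    end

    def |(other)
      self.class.new(self, other)
    end
  end
end

square   = Welder::Pipeline.new(->(x) { x ** 2 })
double   = ->(x) { x * 2 }
plus_one = Welder::Pipeline.new { |x| x + 1 }

square_and_double = square | double              # pipeline | lambda
longer            = square_and_double | plus_one # pipeline | pipeline

puts square.call(3)            # 9, the original is unchanged
puts square_and_double.call(3) # 18
puts longer.call(3)            # 19
```

Because each #| returns a fresh pipeline, intermediate results such as square_and_double can be reused in several larger pipelines.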