Class: ThreadedPipeline

Inherits:
Object
Defined in:
lib/threaded_pipeline.rb,
lib/threaded_pipeline/version.rb

Overview

Create a pipeline in which each stage runs in its own thread. Each stage must accept a single argument and passes its result to the next stage. The results of the last stage are returned, unless the pipeline was created with discard_results: true.

Example

threaded_pipeline = ThreadedPipeline.new
threaded_pipeline.stages << -> (url) { fetch_large_csv(url) }
threaded_pipeline.stages << -> (local_file) { process_local_file(local_file) }
results = threaded_pipeline.process([list, of, large, csv, urls])

Example

another_pipeline = ThreadedPipeline.new(discard_results: true)
another_pipeline.stages << -> (url) { api_query(url) }
another_pipeline.stages << -> (returned_data) { process_returned_data(returned_data) }
another_pipeline.stages << -> (processed_results) { record_results_in_database(processed_results) }
while url = web_crawl_urls
  another_pipeline.feed(url)
end
another_pipeline.finish
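
To make the overview concrete, here is a minimal sketch of one way a thread-per-stage pipeline can be built with Ruby's core Queue class. MiniPipeline is a hypothetical name, and its internals are an assumption, not the gem's source. One Queue sits between each pair of stages, each stage thread pops from its input queue and pushes to its output queue, and a unique sentinel object (similar in spirit to the finish_object pushed by #finish) tells each thread to stop.

```ruby
# MiniPipeline is a hypothetical, simplified sketch, not ThreadedPipeline's source.
class MiniPipeline
  attr_reader :stages

  def initialize
    @stages = []
  end

  # Runs every element through every stage, one thread per stage,
  # and returns the values produced by the last stage.
  def process(enumerable)
    finish_object = Object.new # unique end-of-input marker
    # queues[i] feeds stage i; queues.last collects the final results.
    queues = Array.new(stages.size + 1) { Queue.new }

    threads = stages.each_with_index.map do |stage, i|
      Thread.new do
        loop do
          item = queues[i].pop
          if item.equal?(finish_object)
            queues[i + 1] << finish_object # forward the marker, then stop
            break
          end
          queues[i + 1] << stage.call(item)
        end
      end
    end

    enumerable.each { |element| queues.first << element }
    queues.first << finish_object
    threads.each(&:join)

    # Every stage has exited, so the last queue ends with the marker.
    results = []
    until (item = queues.last.pop).equal?(finish_object)
      results << item
    end
    results
  end
end

pipeline = MiniPipeline.new
pipeline.stages << ->(n) { n * 2 }
pipeline.stages << ->(n) { n + 1 }
p pipeline.process([1, 2, 3]) # prints [3, 5, 7]
```

Because each stage has exactly one thread and Queue is FIFO, this sketch returns results in input order. Comparing against the sentinel with equal? lets nil and false flow through the stages as ordinary values.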

Constant Summary

VERSION =
'0.0.1'

Constructor Details

#initialize(discard_results: false) ⇒ ThreadedPipeline

Returns a new instance of ThreadedPipeline.


# File 'lib/threaded_pipeline.rb', line 34

def initialize(discard_results: false)
  @stages = []
  @started = false
  @discard_results = discard_results
end

Instance Attribute Details

#stages ⇒ Object

The list of stages. Each stage receives the result of the previous one; the first stage receives the input elements.

my_threaded_pipeline.stages << ->(arg) { process(arg) }

# File 'lib/threaded_pipeline.rb', line 31

def stages
  @stages
end

#started ⇒ Object (readonly)

Returns the value of attribute started.


# File 'lib/threaded_pipeline.rb', line 32

def started
  @started
end

Instance Method Details

#feed(element) ⇒ Object

Add another element to the work to be processed. The first call to feed starts the pipeline and work on that element begins immediately, so add all stages before feeding. If you already have the full list, use #process instead. Call #finish once every element has been fed. This method is not thread-safe; wrap calls in a Mutex if you feed from multiple threads.


# File 'lib/threaded_pipeline.rb', line 66

def feed(element)
  initialize_run unless @started
  queue_hash[stages.first] << element
end
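
The thread-safety note above can be illustrated with a sketch. Assuming the race of concern is the check-then-act on @started in #feed (two threads could both see it as false and both call initialize_run), one shared Mutex serializes the calls. ToyFeeder is a hypothetical stand-in that only records calls, so the example runs without the gem; with a real pipeline you would call pipeline.feed inside the synchronize block.

```ruby
# ToyFeeder is a hypothetical stand-in for ThreadedPipeline. Like #feed, it
# lazily "starts" on the first call by checking @started and then setting it.
class ToyFeeder
  attr_reader :start_count, :received

  def initialize
    @started = false
    @start_count = 0
    @received = []
  end

  def feed(element)
    unless @started
      @start_count += 1 # stands in for initialize_run
      @started = true
    end
    @received << element
  end
end

feeder = ToyFeeder.new
feed_lock = Mutex.new

producers = 4.times.map do |producer_id|
  Thread.new do
    10.times do |n|
      # Only one producer at a time may call #feed.
      feed_lock.synchronize { feeder.feed([producer_id, n]) }
    end
  end
end
producers.each(&:join)
# With a real pipeline, call #finish here, after every producer has joined.

puts feeder.start_count   # prints 1
puts feeder.received.size # prints 40
```

Joining the producers before calling #finish ensures every element has been fed before #finish signals the end of input.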

#finish ⇒ Object

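Signal the end of input (by pushing a finish object onto the first stage's queue), wait for all stage threads to finish, and return the results. Raises a RuntimeError if the pipeline was never started.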

Returns:

  • the results of the last stage, or nil if discard_results was set to true


# File 'lib/threaded_pipeline.rb', line 73

def finish
  raise "You never started pipeline #{inspect}" unless @started

  queue_hash[stages.first] << finish_object
  @threads.each(&:join)
  @started = false
  @queue_hash = nil
  @finish_object = nil
  @results unless @discard_results
end
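
#finish waits with Thread#join. In Ruby, join re-raises any exception that terminated the joined thread, so if a stage thread dies with an error (this page does not show whether the gem rescues errors inside its stage threads), #finish would raise it, and the lines after @threads.each(&:join), including the reset of @started, would not run. The sketch below shows that join behavior on its own; the failing thread is hypothetical.

```ruby
# A hypothetical stage thread that fails on bad input.
failing_stage = Thread.new do
  # Keep Ruby from also printing the error to stderr when the thread dies.
  Thread.current.report_on_exception = false
  raise ArgumentError, "bad row"
end

caught = nil
begin
  failing_stage.join # re-raises the ArgumentError in the calling thread
rescue ArgumentError => e
  caught = e.message
end
puts "stage failed: #{caught}" # prints "stage failed: bad row"
```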

#process(enumerable) ⇒ Object

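Feed every element of enumerable into the pipeline (processing begins immediately), then wait for all stages to finish and return the results of the last stage, or nil if discard_results was set to true.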


# File 'lib/threaded_pipeline.rb', line 41

def process(enumerable)
  initialize_run
  initialize_first_queue(enumerable)
  finish
end

#process_unthreaded(enumerable) ⇒ Object

Process the enumerable without using threads: each element is passed through every stage in turn, in the calling thread. This is useful when debugging a stage without threading getting in the way, or as a baseline when benchmarking.


# File 'lib/threaded_pipeline.rb', line 50

def process_unthreaded(enumerable)
  initialize_run
  @results = enumerable.map do |element|
    stages.each do |stage|
      element = stage[element]
    end
    element
  end
  finish
end
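
The loop in process_unthreaded is a left fold over the stages: each element is threaded through every stage in order, and stage[element] (Proc#[]) is the same as calling the lambda. A standalone sketch of the same computation, with hypothetical stages and a simple monotonic-clock timer for the benchmarking use case, might look like this:

```ruby
# Hypothetical stages; each accepts one argument and returns one value.
stages = [
  ->(n) { n * 2 },
  ->(n) { n + 1 }
]

# Same shape as process_unthreaded: fold each element through every stage
# in order and keep what the last stage returns.
def run_unthreaded(stages, enumerable)
  enumerable.map do |element|
    stages.reduce(element) { |value, stage| stage.call(value) }
  end
end

started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
results = run_unthreaded(stages, 1..5)
seconds = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at

p results # prints [3, 5, 7, 9, 11]
puts format("unthreaded run took %.6f s", seconds)
```

Timing the same stages through the threaded path and comparing against this baseline shows whether threading pays off; for CPU-bound stages on MRI, the global VM lock may limit the gain.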