Class: ThreadedPipeline
- Inherits: Object
- Defined in: lib/threaded_pipeline.rb, lib/threaded_pipeline/version.rb
Overview
Create a pipeline where each stage runs in its own thread. Each stage must accept a single argument and passes its result to the next stage. The results of the last stage are returned, unless the pipeline was created with discard_results: true.
Example
threaded_pipeline = ThreadedPipeline.new
threaded_pipeline.stages << -> (url) { fetch_large_csv(url) }
threaded_pipeline.stages << -> (local_file) { process_local_file(local_file) }
results = threaded_pipeline.process([list, of, large, csv, urls])
Example
another_pipeline = ThreadedPipeline.new(discard_results: true)
another_pipeline.stages << -> (url) { api_query(url) }
another_pipeline.stages << -> (returned_data) { process_returned_data(returned_data) }
another_pipeline.stages << -> (processed_results) { record_results_in_database(processed_results) }
while url = web_crawl_urls
  another_pipeline.feed(url)
end
another_pipeline.finish
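To make the "one thread per stage" idea from the Overview concrete, here is a minimal sketch built only on Ruby's standard Thread and Queue classes. It is not the gem's implementation: the method name run_stages, the sentinel object, and the queue layout are all assumptions chosen for illustration.

```ruby
# Hypothetical sketch of a thread-per-stage pipeline (not the gem's code).
# Each stage reads from its own queue and writes to the next one.
def run_stages(stages, inputs)
  done = Object.new # sentinel marking the end of the input
  # One queue feeds each stage; one extra queue collects the final results.
  queues = Array.new(stages.size + 1) { Queue.new }

  threads = stages.each_with_index.map do |stage, i|
    Thread.new do
      loop do
        item = queues[i].pop
        if item.equal?(done)
          queues[i + 1] << done # pass the sentinel downstream, then stop
          break
        end
        queues[i + 1] << stage.call(item)
      end
    end
  end

  inputs.each { |input| queues.first << input }
  queues.first << done
  threads.each(&:join)

  # Drain the last queue until the sentinel shows up.
  results = []
  until (item = queues.last.pop).equal?(done)
    results << item
  end
  results
end

labelled = run_stages(
  [->(n) { n * 2 }, ->(n) { "value: #{n}" }],
  [1, 2, 3]
)
puts labelled.inspect # prints ["value: 2", "value: 4", "value: 6"]
```

Because each stage has exactly one worker thread and the queues are first-in, first-out, results come back in input order in this sketch.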
Constant Summary
- VERSION = '0.0.1'
Instance Attribute Summary
- #stages ⇒ Object
  Each stage will process the results of the previous one.
- #started ⇒ Object (readonly)
  Returns the value of attribute started.
Instance Method Summary
- #feed(element) ⇒ Object
  Add another element to the list of work to be processed.
- #finish ⇒ Object
  Wait for all the threads to finish and return the results.
- #initialize(discard_results: false) ⇒ ThreadedPipeline (constructor)
  A new instance of ThreadedPipeline.
- #process(enumerable) ⇒ Object
  The elements of enumerable will begin processing immediately.
- #process_unthreaded(enumerable) ⇒ Object
  Process the enumerable without using threads.
Constructor Details
#initialize(discard_results: false) ⇒ ThreadedPipeline
Returns a new instance of ThreadedPipeline.
# File 'lib/threaded_pipeline.rb', line 34
def initialize(discard_results: false)
  @stages = []
  @started = false
  @discard_results = discard_results
end
Instance Attribute Details
#stages ⇒ Object
Each stage will process the results of the previous one.
my_threaded_pipeline.stages << ->(arg) { process(arg) }
# File 'lib/threaded_pipeline.rb', line 31
def stages
  @stages
end
#started ⇒ Object (readonly)
Returns the value of attribute started.
# File 'lib/threaded_pipeline.rb', line 32
def started
  @started
end
Instance Method Details
#feed(element) ⇒ Object
Add another element to the list of work to be processed. Work starts on the first element immediately, so add all of your stages before the first call to feed. If you already have the full list, use #process instead. This method is not thread safe; wrap calls in a mutex if you feed from multiple threads.
# File 'lib/threaded_pipeline.rb', line 66
def feed(element)
  initialize_run unless @started
  queue_hash[stages.first] << element
end
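Since feed is not thread safe, one way to feed from several producer threads is to serialize every call behind a single Mutex. The sketch below is an illustration under assumptions: feed_from_threads is a hypothetical helper, and the Struct-based stand_in only mimics an object that responds to feed so the example runs without the gem. With the real class you would pass a ThreadedPipeline whose stages are already added.

```ruby
# Hypothetical helper: feed one pipeline from several producer threads.
# Works with any object that responds to #feed.
def feed_from_threads(pipeline, producer_count:, items_per_producer:)
  feed_lock = Mutex.new
  producers = Array.new(producer_count) do |producer_id|
    Thread.new do
      items_per_producer.times do |n|
        # Only one thread at a time may call #feed.
        feed_lock.synchronize { pipeline.feed("producer-#{producer_id}-item-#{n}") }
      end
    end
  end
  producers.each(&:join)
end

# Stand-in for a pipeline (assumption, not part of the gem): it just
# records whatever it is fed.
stand_in = Struct.new(:fed) do
  def feed(element)
    fed << element
  end
end.new([])

feed_from_threads(stand_in, producer_count: 4, items_per_producer: 5)
puts stand_in.fed.size # prints 20
```

The lock only protects the calls to feed. Call finish once, from a single thread, after all producers have joined.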
#finish ⇒ Object
Wait for all the threads to finish and return the results.
# File 'lib/threaded_pipeline.rb', line 73
def finish
  raise "You never started pipeline #{inspect}" unless @started
  queue_hash[stages.first] << finish_object
  @threads.each(&:join)
  @started = false
  @queue_hash = nil
  @finish_object = nil
  @results unless @discard_results
end
#process(enumerable) ⇒ Object
The elements of enumerable will begin processing immediately.
# File 'lib/threaded_pipeline.rb', line 41
def process(enumerable)
  initialize_run
  initialize_first_queue(enumerable)
  finish
end
#process_unthreaded(enumerable) ⇒ Object
Process the enumerable without using threads. This is useful when you want to debug a stage without threading involved, or when you want to run a benchmark.
# File 'lib/threaded_pipeline.rb', line 50
def process_unthreaded(enumerable)
  initialize_run
  @results = enumerable.map do |element|
    stages.each do |stage|
      element = stage[element]
    end
    element
  end
  finish
end
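The per-element chaining in the listing above amounts to folding each element through the stages in order. The following sketch shows that idea on its own with Enumerable#reduce. The run_unthreaded helper and the sample stages are hypothetical and only illustrate the data flow; they are not part of the gem.

```ruby
# Hypothetical helper mirroring the unthreaded data flow: every element is
# passed through each stage in order, and the final values are collected.
def run_unthreaded(stages, enumerable)
  enumerable.map do |element|
    stages.reduce(element) { |value, stage| stage.call(value) }
  end
end

sample_stages = [
  ->(text) { text.strip },
  ->(text) { text.upcase },
  ->(text) { "#{text}!" }
]

shouted = run_unthreaded(sample_stages, ["  hello ", "world"])
puts shouted.inspect # prints ["HELLO!", "WORLD!"]
```

Running the same stages this way gives a single-threaded baseline to compare against, and any exception raised by a stage surfaces directly in the calling thread.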