Module: Datacraft::Runner
- Included in:
- Datacraft
- Defined in:
- lib/datacraft/runner.rb
Overview
The runner for the whole process: pre-build hooks, row processing, consumer builds, and post-build hooks.
Instance Method Summary
-
#build(consumers) ⇒ Object
build and close consumers.
-
#pprocess_rows ⇒ Object
process rows concurrently, with worker threads sharing a queue of sources.
-
#process(row) ⇒ Object
apply the tweakers to one row and pass the result to every consumer.
-
#process_rows ⇒ Object
process rows sequentially.
-
#report(measurements) ⇒ Object
output benchmark results.
-
#run(instruction) ⇒ Object
run the instruction.
Instance Method Details
#build(consumers) ⇒ Object
build and close consumers
86 87 88 89 90 91 |
# File 'lib/datacraft/runner.rb', line 86
# Finalize every consumer: call #build, then #close, on each consumer that
# responds to them (both hooks are optional).
#
# #close is run from an ensure clause, so a consumer is still closed when its
# #build raises. The build error then propagates and the remaining consumers
# are not visited.
#
# @param consumers [Enumerable] objects that may respond to #build / #close
# @return [Object] whatever `consumers.each` returns
def build(consumers)
  consumers.each do |consumer|
    begin
      consumer.build if consumer.respond_to? :build
    ensure
      consumer.close if consumer.respond_to? :close
    end
  end
end
#pprocess_rows ⇒ Object
process rows in parallel
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/datacraft/runner.rb', line 67
# Process every source concurrently. Sources are placed on a shared queue and
# a pool of worker threads pops them one at a time, feeding each row to
# #process. Blocks until all workers finish. An exception raised while
# processing a row is re-raised here by Thread#join.
#
# The pool size is the smaller of the number of sources and the :n_threads
# option. The option defaults to one thread per source when unset, and the
# size is never below one, so rows are not silently skipped when :n_threads
# is 0.
#
# NOTE(review): `options` reconstructs the accessor lost in the scraped
# source (`@inst.[:n_threads]`); confirm against the Instruction class.
#
# @return [Array<Thread>] the joined worker threads
def pprocess_rows
  sources = @inst.sources
  requested = @inst.options[:n_threads] || sources.size
  thread_number = [[sources.size, requested].min, 1].max
  queue = Queue.new
  sources.each { |source| queue << source }

  threads = Array.new(thread_number) do
    Thread.new do
      loop do
        # Only the empty-queue ThreadError from a non-blocking pop means
        # "no work left"; errors raised while processing rows must propagate
        # to Thread#join instead of being swallowed.
        source = begin
          queue.pop(true)
        rescue ThreadError
          break
        end
        source.each { |row| process row }
      end
    end
  end
  threads.each(&:join)
end
#process(row) ⇒ Object
tweak & consume one row
56 57 58 59 60 61 62 63 64 |
# File 'lib/datacraft/runner.rb', line 56
# Push one row through the tweaker chain, then hand the final row to every
# consumer via `<<`. Each tweaker receives the previous tweaker's output. A
# falsy result (nil or false) drops the row: the method returns nil without
# touching any consumer.
#
# @param row [Object] the incoming row
# @return [Object, nil] nil when a tweaker dropped the row, otherwise
#   whatever `@inst.consumers.each` returns
def process(row)
  current = row
  @inst.tweakers.each do |tweaker|
    current = tweaker.tweak(current)
    return unless current
  end

  consumers = @inst.consumers
  consumers.each { |consumer| consumer << current }
end
#process_rows ⇒ Object
process rows sequentially
47 48 49 50 51 52 53 |
# File 'lib/datacraft/runner.rb', line 47
# Process every row of every source in order, on the calling thread,
# passing each row to #process.
#
# @return [Object] whatever `@inst.sources.each` returns
def process_rows
  sources = @inst.sources
  sources.each do |source|
    source.each { |row| process(row) }
  end
end
#report(measurements) ⇒ Object
output benchmark results
38 39 40 41 42 43 44 |
# File 'lib/datacraft/runner.rb', line 38
# Print a benchmark table: Benchmark::CAPTION followed by one row per
# measurement, with labels left-justified in a column one wider than the
# longest label. Prints nothing for an empty list.
#
# @param measurements [Array<Benchmark::Tms>] labelled timings to print
# @return [Array<Benchmark::Tms>, nil] the measurements, or nil when empty
def report(measurements)
  return if measurements.empty?

  # One extra column keeps the longest label from touching the timings.
  width = measurements.map { |m| m.label.to_s.size }.max + 1
  # CAPTION is laid out to sit directly above Tms#to_s output, so the caption
  # and every row must be indented by exactly the same width (no extra space
  # between label and timings, or columns drift one place right).
  puts "#{' ' * width}#{Benchmark::CAPTION}"
  measurements.each do |m|
    puts "#{m.label.to_s.ljust(width)}#{m}"
  end
end
#run(instruction) ⇒ Object
run the instruction
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/datacraft/runner.rb', line 7
# Run an instruction end to end, in this order:
#   1. pre-build hooks (if the instruction responds to #pre_hooks);
#   2. row processing, parallel or sequential per the :parallel option;
#   3. building the consumers;
#   4. post-build hooks (if the instruction responds to #post_hooks).
# Each phase is timed with Benchmark.measure. The timings are printed via
# #report when the :benchmark option is set.
#
# NOTE(review): the option accessor was lost in the scraped source
# (`@inst.[:parallel]` is not valid Ruby); `options` is the reconstruction —
# confirm against the Instruction class.
#
# @param instruction [Object] responds to #options and #consumers, and
#   optionally to #pre_hooks / #post_hooks (arrays of callables)
# @return [Object, nil] result of #report when benchmarking, otherwise nil
def run(instruction)
  @inst = instruction
  measurements = []

  if @inst.respond_to? :pre_hooks
    measurements << Benchmark.measure('pre build:') do
      @inst.pre_hooks.each(&:call)
    end
  end

  measurements << Benchmark.measure('process rows:') do
    @inst.options[:parallel] ? pprocess_rows : process_rows
  end

  measurements << Benchmark.measure('build:') do
    build @inst.consumers
  end

  if @inst.respond_to? :post_hooks
    measurements << Benchmark.measure('post build:') do
      @inst.post_hooks.each(&:call)
    end
  end

  report measurements if @inst.options[:benchmark]
end