Class: JRubyParallelProcessing::DataProcessor
- Inherits: Object
- Inheritance chain: Object → JRubyParallelProcessing::DataProcessor
- Includes:
- ChunkProcessor
- Defined in:
- lib/jruby_parallel_processing/data_processor.rb
Instance Method Summary
- #add_middleware(position, &middleware_block) ⇒ Object
- #initialize(data_array: nil, stream: nil, in_threads: 4, chunk_size: nil, logger: nil, queue_size: 100, timeout: 10) ⇒ DataProcessor (constructor)
  Returns a new instance of DataProcessor.
- #process(&block) ⇒ Object
Methods included from ChunkProcessor
Constructor Details
#initialize(data_array: nil, stream: nil, in_threads: 4, chunk_size: nil, logger: nil, queue_size: 100, timeout: 10) ⇒ DataProcessor
Returns a new instance of DataProcessor.
7 8 9 10 11 12 13 14 15 16 |
# File 'lib/jruby_parallel_processing/data_processor.rb', line 7
#
# Builds a processor over either an in-memory array or a stream.
#
# @param data_array [Array, nil] items to be split into chunks; must respond
#   to +size+ when +chunk_size+ is not given
# @param stream [Object, nil] alternative input, stored as-is
# @param in_threads [Integer] thread count; also the divisor used to derive
#   the default chunk size
# @param chunk_size [Integer, nil] items per chunk; when nil and a data array
#   is given, defaults to ceil(size / in_threads), with a minimum of 1
# @param logger [#info, nil] defaults to a Logger writing to STDOUT
# @param queue_size [Integer] stored in @queue_size (consumer not visible
#   here — presumably the stream path; confirm in ChunkProcessor)
# @param timeout [Numeric] stored in @timeout (consumer and unit not visible
#   here — confirm in ChunkProcessor)
# @return [DataProcessor] a new instance of DataProcessor
def initialize(data_array: nil, stream: nil, in_threads: 4, chunk_size: nil, logger: nil, queue_size: 100, timeout: 10)
  @data_array = data_array
  @stream = stream
  @in_threads = in_threads

  # An explicit chunk_size is always honoured, even for stream-only input.
  # The computed default is floored at 1 because an empty array would
  # otherwise produce 0, which Enumerable#each_slice rejects.
  @chunk_size =
    if chunk_size
      chunk_size
    elsif @data_array
      [(@data_array.size.to_f / in_threads).ceil, 1].max
    end

  @logger = logger || Logger.new(STDOUT)
  @queue_size = queue_size
  @timeout = timeout
  @middlewares = { before_process: [], after_process: [] }
end
Instance Method Details
#add_middleware(position, &middleware_block) ⇒ Object
39 40 41 42 43 |
# File 'lib/jruby_parallel_processing/data_processor.rb', line 39
#
# Registers a middleware block under one of the known positions.
#
# @param position [Symbol] a key already present in @middlewares
#   (the constructor sets up :before_process and :after_process)
# @yield the middleware to register; stored as a Proc
# @return [Array<Proc>] the updated middleware list for +position+
# @raise [ArgumentError] if +position+ is not a known key, or no block is given
def add_middleware(position, &middleware_block)
  raise ArgumentError, "Invalid middleware position" unless @middlewares.key?(position)
  # Without this guard a block-less call would append nil to the list.
  raise ArgumentError, "Middleware block must be provided" unless middleware_block

  @middlewares[position] << middleware_block
end
#process(&block) ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/jruby_parallel_processing/data_processor.rb', line 18
#
# Runs the configured workload on a fixed-size thread pool.
#
# Sequence: run the :before_process middleware, log the start, create a pool
# of @in_threads threads, submit either the array chunks or the stream, wait
# via await_completion, then run the :after_process middleware. When both a
# data array and a stream are set, the data array wins.
#
# @yield the per-item/per-chunk work, forwarded untouched to submit_chunks or
#   submit_stream (their exact calling convention is defined elsewhere)
# @return [Object] whatever execute_middleware(:after_process) returns
# @raise [ArgumentError] if neither a data array nor a stream was provided
def process(&block)
  raise ArgumentError, "Data array or stream must be provided" unless @data_array || @stream

  execute_middleware(:before_process)
  @logger.info("Processing started with #{@in_threads} threads")

  executor = Executors.new_fixed_thread_pool(@in_threads)
  begin
    futures =
      if @data_array
        # An empty array has nothing to slice; skipping each_slice also avoids
        # the ArgumentError it raises when the chunk size is 0.
        # Assumes @data_array is Array-like (responds to empty?).
        chunks = @data_array.empty? ? [] : @data_array.each_slice(@chunk_size).to_a
        submit_chunks(executor, chunks, &block)
      else
        submit_stream(executor, @stream, &block)
      end

    await_completion(futures, executor)
  ensure
    # Release the pool even when submission or waiting raises. A repeated
    # shutdown is a no-op on a java.util.concurrent.ExecutorService, so this
    # is safe if await_completion already shut it down.
    executor.shutdown
  end

  execute_middleware(:after_process)
end