Class: JRubyParallelProcessing::DataProcessor

Inherits:
Object
  • Object
show all
Includes:
ChunkProcessor
Defined in:
lib/jruby_parallel_processing/data_processor.rb

Instance Method Summary collapse

Methods included from ChunkProcessor

#process_chunk

Constructor Details

#initialize(data_array: nil, stream: nil, in_threads: 4, chunk_size: nil, logger: nil, queue_size: 100, timeout: 10) ⇒ DataProcessor

Returns a new instance of DataProcessor.



7
8
9
10
11
12
13
14
15
16
# File 'lib/jruby_parallel_processing/data_processor.rb', line 7

# Builds a processor over either an in-memory array or a stream.
#
# @param data_array [Array, nil] items to be split into chunks; may be nil when a stream is used
# @param stream [Object, nil] alternative input source, stored as-is
# @param in_threads [Integer] worker count; also the divisor for the default chunk size
# @param chunk_size [Integer, nil] items per chunk; when omitted and +data_array+ is given,
#   defaults to ceil(data_array.size / in_threads), but never less than 1
# @param logger [Logger, nil] log destination; defaults to a Logger writing to STDOUT
# @param queue_size [Integer] stored as-is (consumer not visible in this method)
# @param timeout [Integer] stored as-is (units not visible in this method)
def initialize(data_array: nil, stream: nil, in_threads: 4, chunk_size: nil, logger: nil, queue_size: 100, timeout: 10)
  @data_array = data_array
  @stream = stream
  @in_threads = in_threads

  # Store an explicit chunk_size unconditionally. A trailing `if @data_array`
  # on the assignment would discard it whenever only a stream is supplied.
  @chunk_size = chunk_size
  if @data_array && !@chunk_size
    # Clamp to 1: an empty array would otherwise derive 0, and each_slice(0) raises.
    @chunk_size = [(@data_array.size.to_f / in_threads).ceil, 1].max
  end

  @logger = logger || Logger.new(STDOUT)
  @queue_size = queue_size
  @timeout = timeout
  @middlewares = { before_process: [], after_process: [] }
end

Instance Method Details

#add_middleware(position, &middleware_block) ⇒ Object

Raises:

  • (ArgumentError) — if position is not a registered middleware position (:before_process or :after_process)


39
40
41
42
43
# File 'lib/jruby_parallel_processing/data_processor.rb', line 39

# Registers a block under one of the middleware positions.
#
# @param position [Symbol] a key of the middleware table (:before_process or :after_process)
# @yield the middleware to store; how it is invoked is not visible in this method
# @return [Array<Proc>] the updated list of middlewares for +position+
# @raise [ArgumentError] if +position+ is unknown or no block is given
def add_middleware(position, &middleware_block)
  raise ArgumentError, "Invalid middleware position" unless @middlewares.key?(position)
  # Reject a missing block up front; otherwise nil would be appended to the list.
  raise ArgumentError, "Middleware block must be provided" unless middleware_block

  @middlewares[position] << middleware_block
end

#process(&block) ⇒ Object

Raises:

  • (ArgumentError) — if neither a data array nor a stream was provided


18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/jruby_parallel_processing/data_processor.rb', line 18

# Runs the :before_process middleware, submits the input to a fixed-size
# thread pool, waits for the submitted work, then runs the :after_process
# middleware.
#
# @param block forwarded unchanged to submit_chunks / submit_stream
# @return [Object] whatever execute_middleware(:after_process) returns
# @raise [ArgumentError] if neither a data array nor a stream was provided
def process(&block)
  raise ArgumentError, "Data array or stream must be provided" unless @data_array || @stream

  execute_middleware(:before_process)

  @logger.info("Processing started with #{@in_threads} threads")
  executor = Executors.new_fixed_thread_pool(@in_threads)

  begin
    futures =
      if @data_array
        # Nothing to slice for an empty array; this also avoids each_slice(0),
        # which raises when the derived chunk size is 0.
        chunks = @data_array.size.zero? ? [] : @data_array.each_slice(@chunk_size).to_a
        submit_chunks(executor, chunks, &block)
      elsif @stream
        submit_stream(executor, @stream, &block)
      else
        []
      end

    await_completion(futures, executor)
  ensure
    # Release the pool even when submission or awaiting raises. Per the
    # ExecutorService contract, shutdown has no additional effect if the
    # executor was already shut down.
    executor.shutdown
  end

  execute_middleware(:after_process)
end