Class: JRubyParallelProcessing::TaskQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/jruby_parallel_processing/task_queue.rb

Instance Method Summary collapse

Constructor Details

#initialize(max_retries: 3, retry_delay: 0.1, max_queue_size: 10, logger: nil) ⇒ TaskQueue

Returns a new instance of TaskQueue.



5
6
7
8
9
10
11
12
# File 'lib/jruby_parallel_processing/task_queue.rb', line 5

def initialize(max_retries: 3, retry_delay: 0.1, max_queue_size: 10, logger: nil)
  @max_retries = max_retries
  @retry_delay = retry_delay
  @queue = LinkedBlockingQueue.new(max_queue_size)
  @executor = Executors.new_fixed_thread_pool(10)
  @logger = logger || Logger.new(STDOUT)
  @logger.level = Logger::INFO
end

Instance Method Details

#add_task(priority, &task) ⇒ Object



14
15
16
17
# File 'lib/jruby_parallel_processing/task_queue.rb', line 14

def add_task(priority, &task)
  raise 'Queue is full' if @queue.remaining_capacity == 0
  @queue.put([priority, task])
end

#process_tasksObject



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/jruby_parallel_processing/task_queue.rb', line 19

def process_tasks
  tasks = []
  while (task = @queue.poll)
    tasks << task
  end

  sorted_tasks = tasks.sort_by(&:first)
  sorted_tasks.each do |(_, task)|
    retry_count = 0
    begin
      future = @executor.submit(ApplicationCallable.new(&task))
      future.get  
    rescue => e
      retry_count += 1
      if retry_count <= @max_retries
        @logger.info("Task failed with error: #{e.message}. Retrying (attempt #{retry_count})")
        sleep @retry_delay
        retry
      else
        @logger.error("Task failed with error: #{e.message}. Max retries reached.")
        future.cancel(true) if future.respond_to?(:cancel)
      end
    end
  end
end

#shutdownObject



45
46
47
48
49
50
51
52
53
54
# File 'lib/jruby_parallel_processing/task_queue.rb', line 45

def shutdown
  @logger.info("Shutting down executor.")
  @executor.shutdown
  unless @executor.await_termination(10, java.util.concurrent.TimeUnit::SECONDS)
    @logger.warn("Executor did not terminate in the expected time. Initiating forced shutdown.")
    @executor.shutdown_now
    @executor.await_termination(10, java.util.concurrent.TimeUnit::SECONDS)
  end
  @logger.info("Executor shutdown completed.")
end