Class: JRubyParallelProcessing::TaskQueue
- Inherits:
-
Object
- Object
- JRubyParallelProcessing::TaskQueue
- Defined in:
- lib/jruby_parallel_processing/task_queue.rb
Instance Method Summary collapse
- #add_task(priority, &task) ⇒ Object
-
#initialize(max_retries: 3, retry_delay: 0.1, max_queue_size: 10, logger: nil) ⇒ TaskQueue
constructor
A new instance of TaskQueue.
- #process_tasks ⇒ Object
- #shutdown ⇒ Object
Constructor Details
#initialize(max_retries: 3, retry_delay: 0.1, max_queue_size: 10, logger: nil) ⇒ TaskQueue
Returns a new instance of TaskQueue.
5 6 7 8 9 10 11 12 |
# File 'lib/jruby_parallel_processing/task_queue.rb', line 5
# Sets up the retry settings, the bounded task queue, the worker pool and
# the logger.
#
# @param max_retries [Integer] how many times a failing task is re-run
# @param retry_delay [Numeric] seconds slept between attempts
# @param max_queue_size [Integer] capacity passed to LinkedBlockingQueue
# @param logger [Logger, nil] logger to use; when nil a STDOUT logger at
#   INFO level is created
def initialize(max_retries: 3, retry_delay: 0.1, max_queue_size: 10, logger: nil)
  @max_retries = max_retries
  @retry_delay = retry_delay
  @queue = LinkedBlockingQueue.new(max_queue_size)
  @executor = Executors.new_fixed_thread_pool(10)
  # Only the logger created here is forced to INFO; a logger handed in by
  # the caller keeps whatever level the caller configured.
  @logger = logger || Logger.new(STDOUT).tap { |default| default.level = Logger::INFO }
end
Instance Method Details
#add_task(priority, &task) ⇒ Object
14 15 16 17 |
# File 'lib/jruby_parallel_processing/task_queue.rb', line 14
# Enqueues a block together with its priority as a [priority, task] pair.
#
# The insert uses the queue's non-blocking +offer+, so the capacity check
# and the insert are a single operation: a full queue always raises
# instead of blocking the caller.
#
# @param priority [Object] sort key stored alongside the task
# @yield the work to run later
# @raise [ArgumentError] if no block is given
# @raise [RuntimeError] 'Queue is full' when the queue rejects the entry
# @return [nil]
def add_task(priority, &task)
  raise ArgumentError, 'add_task requires a block' unless task
  raise 'Queue is full' unless @queue.offer([priority, task])

  nil
end
#process_tasks ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/jruby_parallel_processing/task_queue.rb', line 19
# Drains the queue, then runs the drained tasks one at a time in ascending
# priority order, waiting for each to finish before submitting the next.
#
# A task that raises is re-submitted after sleeping @retry_delay seconds,
# up to @max_retries extra attempts; once those are used up the failure is
# logged at error level and processing moves on to the next task.
#
# @return [Array] the drained [priority, task] pairs in execution order
def process_tasks
  pending = []
  while (entry = @queue.poll)
    pending << entry
  end

  pending.sort_by(&:first).each do |(_, task)|
    attempts = 0
    begin
      future = @executor.submit(ApplicationCallable.new(&task))
      future.get # blocks until the task has finished or raised
    rescue => e
      attempts += 1
      if attempts <= @max_retries
        @logger.info("Task failed with error: #{e.message}. Retrying (attempt #{attempts})")
        sleep @retry_delay
        retry
      else
        @logger.error("Task failed with error: #{e.message}. Max retries reached.")
        # future is nil when submit itself raised on the first attempt
        future.cancel(true) if future.respond_to?(:cancel)
      end
    end
  end
end
#shutdown ⇒ Object
45 46 47 48 49 50 51 52 53 54 |
# File 'lib/jruby_parallel_processing/task_queue.rb', line 45
# Stops the executor: requests an orderly shutdown, waits up to 10 seconds,
# and if the executor has not terminated by then, forces a shutdown and
# waits up to 10 more seconds.
#
# The completion message is only logged when the executor reported
# termination; if the forced shutdown also times out, an error is logged
# instead.
#
# @return [Object, nil] the logger's return value on success, nil when the
#   executor never terminated
def shutdown
  @logger.info("Shutting down executor.")
  @executor.shutdown
  seconds = java.util.concurrent.TimeUnit::SECONDS

  unless @executor.await_termination(10, seconds)
    @logger.warn("Executor did not terminate in the expected time. Initiating forced shutdown.")
    @executor.shutdown_now
    unless @executor.await_termination(10, seconds)
      @logger.error("Executor did not terminate after forced shutdown.")
      return
    end
  end

  @logger.info("Executor shutdown completed.")
end