Class: JRubyParallelProcessing::DistributedWorker
- Inherits:
- Object
- Object
- JRubyParallelProcessing::DistributedWorker
- Defined in:
- lib/jruby_parallel_processing/distributed_worker.rb
Defined Under Namespace
Classes: PrioritizedTask
Constant Summary collapse
- HEARTBEAT_INTERVAL =
10
Class Method Summary collapse
- .connect_to_worker(worker_url) ⇒ Object
Instance Method Summary collapse
- #execute_task(task, priority: 0) ⇒ Object
- #initialize(host = "localhost", port = 8787) ⇒ DistributedWorker (constructor): A new instance of DistributedWorker.
- #last_heartbeat ⇒ Object
- #send_heartbeat ⇒ Object
- #shutdown ⇒ Object
Constructor Details
#initialize(host = "localhost", port = 8787) ⇒ DistributedWorker
Returns a new instance of DistributedWorker.
7 8 9 10 11 12 13 14 |
# File 'lib/jruby_parallel_processing/distributed_worker.rb', line 7

# Builds a worker: publishes this object through DRb, records the creation
# time as the first heartbeat, and sets up the task queue, the executor and
# the two background activities.
#
# @param host [String] host part of the druby:// URI (defaults to "localhost")
# @param port [Integer] port part of the druby:// URI (defaults to 8787)
def initialize(host = "localhost", port = 8787)
  service_uri = "druby://#{host}:#{port}"
  @server = DRb.start_service(service_uri, self)
  @last_heartbeat = Time.now

  # JVM-side plumbing: a priority-ordered blocking queue and a fixed pool of
  # four threads.
  @task_queue = java.util.concurrent.PriorityBlockingQueue.new
  @executor = java.util.concurrent.Executors.newFixedThreadPool(4)

  # start_heartbeat / start_task_processing are defined elsewhere in the
  # class; presumably they spawn threads that rely on the state assigned
  # above, so they are kept last -- TODO confirm.
  @heartbeat_thread = start_heartbeat
  @task_processing_thread = start_task_processing
end
Class Method Details
.connect_to_worker(worker_url) ⇒ Object
25 26 27 |
# File 'lib/jruby_parallel_processing/distributed_worker.rb', line 25

# Creates a DRb proxy object that refers to the worker published at
# +worker_url+.
#
# @param worker_url [String] DRb URI of the worker, e.g. "druby://localhost:8787"
# @return [DRbObject] proxy bound to +worker_url+
def self.connect_to_worker(worker_url)
  DRbObject.new_with_uri(worker_url)
end
Instance Method Details
#execute_task(task, priority: 0) ⇒ Object
16 17 18 19 20 21 22 23 |
# File 'lib/jruby_parallel_processing/distributed_worker.rb', line 16

# Wraps +task+ in a PrioritizedTask and places it on the worker's task queue.
#
# @param task [Object] the unit of work to enqueue
# @param priority [Integer] value handed to PrioritizedTask.new (default 0);
#   how it orders tasks is decided by PrioritizedTask, which is defined
#   elsewhere
# @return [Hash] +{ status: :queued }+ when the task was enqueued, or
#   +{ status: :failed, error: <message> }+ when enqueueing raised a
#   StandardError
def execute_task(task, priority: 0)
  @task_queue.put(PrioritizedTask.new(task, priority))
  { status: :queued }
rescue StandardError => e
  # Hand the failure back to the caller as data instead of raising.
  { status: :failed, error: e.message }
end
#last_heartbeat ⇒ Object
33 34 35 |
# File 'lib/jruby_parallel_processing/distributed_worker.rb', line 33

# Reader for the stored heartbeat timestamp.
#
# @return [Time, nil] the current value of @last_heartbeat
def last_heartbeat
  @last_heartbeat
end
#send_heartbeat ⇒ Object
29 30 31 |
# File 'lib/jruby_parallel_processing/distributed_worker.rb', line 29

# Records a heartbeat by overwriting @last_heartbeat with the current
# wall-clock time.
#
# @return [Time] the timestamp that was just stored
def send_heartbeat
  @last_heartbeat = Time.now
end
#shutdown ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/jruby_parallel_processing/distributed_worker.rb', line 37

# Stops the worker. The executor is asked to shut down and given 10 seconds
# to finish; if it has not terminated by then it is shut down forcibly and
# given another 10 seconds. Afterwards both background threads (when set)
# are interrupted and the DRb service is stopped.
#
# @return [Object] whatever @server.stop_service returns
def shutdown
  @executor.shutdown
  seconds = java.util.concurrent.TimeUnit::SECONDS

  unless @executor.await_termination(10, seconds)
    message = "Executor did not terminate in the expected time. Initiating forced shutdown."
    # The visible constructor never assigns @logger. Fall back to Kernel#warn
    # (stderr) so that a missing logger cannot raise here and leave the
    # threads and the DRb service running.
    if @logger
      @logger.warn(message)
    else
      warn(message)
    end
    @executor.shutdown_now
    @executor.await_termination(10, seconds)
  end

  @heartbeat_thread.interrupt if @heartbeat_thread
  @task_processing_thread.interrupt if @task_processing_thread
  @server.stop_service
end