Class: JRubyParallelProcessing::DistributedWorker

Inherits:
Object
  • Object
show all
Defined in:
lib/jruby_parallel_processing/distributed_worker.rb

Defined Under Namespace

Classes: PrioritizedTask

Constant Summary collapse

HEARTBEAT_INTERVAL =
10

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host = "localhost", port = 8787) ⇒ DistributedWorker

Returns a new instance of DistributedWorker.



7
8
9
10
11
12
13
14
# File 'lib/jruby_parallel_processing/distributed_worker.rb', line 7

# Sets up the worker's internal state and then exposes the worker over DRb.
#
# The heartbeat timestamp, task queue and executor are assigned BEFORE
# `DRb.start_service` is called. The service is started with `self` as its
# front object, so remote callers may reach methods such as `execute_task`
# as soon as it is running; assigning state first guarantees those calls
# never observe a nil `@task_queue` or `@executor`.
#
# @param host [String] host name interpolated into the druby:// URI
# @param port [Integer] port interpolated into the druby:// URI
def initialize(host = "localhost", port = 8787)
  @last_heartbeat = Time.now
  @task_queue = java.util.concurrent.PriorityBlockingQueue.new
  @executor = java.util.concurrent.Executors.newFixedThreadPool(4)

  # Publish the fully-initialized object; kept ahead of the background
  # threads, exactly as before, in case they rely on @server.
  @server = DRb.start_service("druby://#{host}:#{port}", self)

  # start_heartbeat / start_task_processing are defined elsewhere in the
  # class; their return values are retained so shutdown can interrupt them.
  @heartbeat_thread = start_heartbeat
  @task_processing_thread = start_task_processing
end

Class Method Details

.connect_to_worker(worker_url) ⇒ Object



25
26
27
# File 'lib/jruby_parallel_processing/distributed_worker.rb', line 25

# Builds a client-side DRb proxy for the worker published at +worker_url+.
#
# Only the proxy object is constructed here; this method itself performs no
# call on the remote worker.
#
# @param worker_url [String] druby:// URI of the remote worker
# @return [DRb::DRbObject] proxy that forwards method calls to that URI
def self.connect_to_worker(worker_url)
  DRb::DRbObject.new_with_uri(worker_url)
end

Instance Method Details

#execute_task(task, priority: 0) ⇒ Object



16
17
18
19
20
21
22
23
# File 'lib/jruby_parallel_processing/distributed_worker.rb', line 16

# Wraps +task+ in a PrioritizedTask and places it on the worker's task queue.
#
# Any StandardError raised while building or enqueuing the entry is caught
# and reported in the returned hash instead of propagating to the caller.
#
# @param task [Object] the unit of work to enqueue
# @param priority [Integer] priority handed to PrioritizedTask (default 0)
# @return [Hash] +{ status: :queued }+ on success, or
#   +{ status: :failed, error: <message> }+ when enqueuing raised
def execute_task(task, priority: 0)
  entry = PrioritizedTask.new(task, priority)
  @task_queue.put(entry)
  { status: :queued }
rescue => error
  { status: :failed, error: error.message }
end

#last_heartbeat ⇒ Object



33
34
35
# File 'lib/jruby_parallel_processing/distributed_worker.rb', line 33

# Reader for the most recently recorded heartbeat.
#
# @return [Time] the value currently stored in @last_heartbeat, which the
#   constructor and #send_heartbeat both set from Time.now
def last_heartbeat
  @last_heartbeat
end

#send_heartbeat ⇒ Object



29
30
31
# File 'lib/jruby_parallel_processing/distributed_worker.rb', line 29

# Records a heartbeat by stamping the worker with the current wall-clock time.
#
# @return [Time] the timestamp that was just stored in @last_heartbeat
def send_heartbeat
  now = Time.now
  @last_heartbeat = now
end

#shutdown ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/jruby_parallel_processing/distributed_worker.rb', line 37

# Stops the worker: drains the executor, interrupts the background threads
# and stops the DRb service.
#
# The executor is first asked to shut down and given 10 seconds to finish.
# If it has not terminated by then, a warning is emitted, a forced shutdown
# is requested via +shutdown_now+, and another 10 seconds are allowed.
#
# Thread interruption and DRb shutdown run in an +ensure+ clause so they
# still happen when draining the executor raises; the exception then
# propagates to the caller after cleanup.
#
# @return [Object] whatever the executor-draining step last evaluated to
def shutdown
  @executor.shutdown
  unless @executor.await_termination(10, java.util.concurrent.TimeUnit::SECONDS)
    message = "Executor did not terminate in the expected time. Initiating forced shutdown."
    # The constructor does not assign @logger, so it may be nil here. Fall
    # back to Kernel#warn rather than raising NoMethodError, which would
    # abort the forced shutdown below.
    if @logger
      @logger.warn(message)
    else
      warn(message)
    end
    @executor.shutdown_now
    @executor.await_termination(10, java.util.concurrent.TimeUnit::SECONDS)
  end
ensure
  # NOTE(review): assumes the objects returned by start_heartbeat and
  # start_task_processing respond to #interrupt — confirm.
  @heartbeat_thread.interrupt if @heartbeat_thread
  @task_processing_thread.interrupt if @task_processing_thread
  @server.stop_service
end