Class: Skynet::Job::LocalMessageQueue

Inherits:
Object
  • Object
show all
Includes:
SkynetDebugger
Defined in:
lib/skynet/skynet_job.rb

Overview

class Skynet::AsyncJob

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from SkynetDebugger

#args_pp, #debug, #debug_header, #error, #fatal, included, #info, #log, #printlog, #stderr, #stdout, #warn

Constructor Details

#initializeLocalMessageQueue

Returns a new instance of LocalMessageQueue.



766
767
768
769
# File 'lib/skynet/skynet_job.rb', line 766

def initialize
  @messages    = []
  @results     = []
end

Instance Attribute Details

#messagesObject (readonly)

Returns the value of attribute messages.



764
765
766
# File 'lib/skynet/skynet_job.rb', line 764

def messages
  @messages
end

#resultsObject (readonly)

Returns the value of attribute results.



764
765
766
# File 'lib/skynet/skynet_job.rb', line 764

def results
  @results
end

Instance Method Details

#empty?Boolean

Returns:

  • (Boolean)


784
785
786
# File 'lib/skynet/skynet_job.rb', line 784

def empty?
  @messages.empty?
end

#get_worker_versionObject



771
772
773
# File 'lib/skynet/skynet_job.rb', line 771

def get_worker_version
  1
end

#in_use?Boolean

Returns:

  • (Boolean)


788
789
790
# File 'lib/skynet/skynet_job.rb', line 788

def in_use?
  (not empty?)
end

#reset!Object



792
793
794
795
# File 'lib/skynet/skynet_job.rb', line 792

def reset!
  @messages = []
  @results  = []
end

#run_message(message) ⇒ Object



797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
# File 'lib/skynet/skynet_job.rb', line 797

def run_message(message)
  result = nil
  (message.retry + 1).times do
    task = message.payload
    debug "RUN TASKS LOCALLY SUBMITTING #{message.name} task #{task.task_id}", task
    begin
      result = task.run
      break
    rescue Skynet::Task::TimeoutError => e
      result = e
      error "Skynet::Job::LocalMessageQueue Task timed out while executing #{e.inspect} #{e.backtrace.join("\n")}"
      next
    rescue Exception => e
      error "Skynet::Job::LocalMessageQueue :#{__LINE__} #{e.inspect} #{e.backtrace.join("\n")}"
      result = e
      next
    end
  end
  message.result_message(result)
end

#take_result(job_id, timeout = nil) ⇒ Object



775
776
777
778
# File 'lib/skynet/skynet_job.rb', line 775

def take_result(job_id,timeout=nil)
  raise Skynet::RequestExpiredError.new if @messages.empty?
  run_message(@messages.shift)
end

#write_message(message, timeout = nil) ⇒ Object



780
781
782
# File 'lib/skynet/skynet_job.rb', line 780

def write_message(message,timeout=nil)
  @messages << message
end