Class: Skynet::Job::LocalMessageQueue
- Inherits:
-
Object
- Object
- Skynet::Job::LocalMessageQueue
show all
- Includes:
- SkynetDebugger
- Defined in:
- lib/skynet/skynet_job.rb
Overview
Instance Attribute Summary collapse
Instance Method Summary
collapse
#args_pp, #debug, #debug_header, #error, #fatal, included, #info, #log, #printlog, #stderr, #stdout, #warn
Constructor Details
Returns a new instance of LocalMessageQueue.
766
767
768
769
|
# File 'lib/skynet/skynet_job.rb', line 766
def initialize
@messages = []
@results = []
end
|
Instance Attribute Details
#messages ⇒ Object
Returns the value of attribute messages.
764
765
766
|
# File 'lib/skynet/skynet_job.rb', line 764
def messages
@messages
end
|
#results ⇒ Object
Returns the value of attribute results.
764
765
766
|
# File 'lib/skynet/skynet_job.rb', line 764
def results
@results
end
|
Instance Method Details
#empty? ⇒ Boolean
784
785
786
|
# File 'lib/skynet/skynet_job.rb', line 784
def empty?
@messages.empty?
end
|
#get_worker_version ⇒ Object
771
772
773
|
# File 'lib/skynet/skynet_job.rb', line 771
def get_worker_version
1
end
|
#in_use? ⇒ Boolean
788
789
790
|
# File 'lib/skynet/skynet_job.rb', line 788
def in_use?
(not empty?)
end
|
#reset! ⇒ Object
792
793
794
795
|
# File 'lib/skynet/skynet_job.rb', line 792
def reset!
@messages = []
@results = []
end
|
#run_message(message) ⇒ Object
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
|
# File 'lib/skynet/skynet_job.rb', line 797
def run_message(message)
result = nil
(message.retry + 1).times do
task = message.payload
debug "RUN TASKS LOCALLY SUBMITTING #{message.name} task #{task.task_id}", task
begin
result = task.run
break
rescue Skynet::Task::TimeoutError => e
result = e
error "Skynet::Job::LocalMessageQueue Task timed out while executing #{e.inspect} #{e.backtrace.join("\n")}"
next
rescue Exception => e
error "Skynet::Job::LocalMessageQueue :#{__LINE__} #{e.inspect} #{e.backtrace.join("\n")}"
result = e
next
end
end
message.result_message(result)
end
|
#take_result(job_id, timeout = nil) ⇒ Object
775
776
777
778
|
# File 'lib/skynet/skynet_job.rb', line 775
def take_result(job_id,timeout=nil)
raise Skynet::RequestExpiredError.new if @messages.empty?
run_message(@messages.shift)
end
|
#write_message(message, timeout = nil) ⇒ Object
780
781
782
|
# File 'lib/skynet/skynet_job.rb', line 780
def write_message(message,timeout=nil)
@messages << message
end
|