Class: Skynet::Task
- Inherits:
-
Object
- Object
- Skynet::Task
- Includes:
- SkynetDebugger
- Defined in:
- lib/skynet/task.rb,
lib/skynet/tuplespace_server.rb
Defined Under Namespace
Classes: ConstructorError, TimeoutError
Constant Summary collapse
- @@log =
nil
Instance Attribute Summary collapse
-
#data ⇒ Object
readonly
Returns the value of attribute data.
-
#map_or_reduce ⇒ Object
readonly
Returns the value of attribute map_or_reduce.
-
#marshalable ⇒ Object
readonly
Returns the value of attribute marshalable.
-
#name ⇒ Object
Returns the value of attribute name.
-
#process ⇒ Object
Returns the value of attribute process.
-
#result ⇒ Object
readonly
Returns the value of attribute result.
-
#result_timeout ⇒ Object
Returns the value of attribute result_timeout.
-
#retry ⇒ Object
Returns the value of attribute retry.
Class Method Summary collapse
Instance Method Summary collapse
- #can_marshal? ⇒ Boolean
-
#initialize(opts = {}) ⇒ Task
constructor
A new instance of Task.
- #run(iteration = nil) ⇒ Object
- #task_id ⇒ Object
- #task_or_master ⇒ Object
Methods included from SkynetDebugger
#args_pp, #debug, #debug_header, #error, #fatal, included, #info, #log, #printlog, #stderr, #stdout, #warn
Constructor Details
#initialize(opts = {}) ⇒ Task
Returns a new instance of Task.
42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/skynet/task.rb', line 42 def initialize(opts = {}) unless opts[:task_id] and opts[:process] and opts[:map_or_reduce] raise ConstructorError.new("Must provide task_id, process and map_or_reduce") end @marshalable = true @task_id = opts[:task_id].to_i @data = opts[:data] self.process = opts[:process] @name = opts[:name] @map_or_reduce = opts[:map_or_reduce] @result_timeout = opts[:result_timeout] @retry = opts[:retry] end |
Instance Attribute Details
#data ⇒ Object (readonly)
Returns the value of attribute data.
8 9 10 |
# File 'lib/skynet/task.rb', line 8 def data @data end |
#map_or_reduce ⇒ Object (readonly)
Returns the value of attribute map_or_reduce.
8 9 10 |
# File 'lib/skynet/task.rb', line 8 def map_or_reduce @map_or_reduce end |
#marshalable ⇒ Object (readonly)
Returns the value of attribute marshalable.
8 9 10 |
# File 'lib/skynet/task.rb', line 8 def marshalable @marshalable end |
#name ⇒ Object
Returns the value of attribute name.
9 10 11 |
# File 'lib/skynet/task.rb', line 9 def name @name end |
#process ⇒ Object
Returns the value of attribute process.
8 9 10 |
# File 'lib/skynet/task.rb', line 8 def process @process end |
#result ⇒ Object (readonly)
Returns the value of attribute result.
8 9 10 |
# File 'lib/skynet/task.rb', line 8 def result @result end |
#result_timeout ⇒ Object
Returns the value of attribute result_timeout.
9 10 11 |
# File 'lib/skynet/task.rb', line 9 def result_timeout @result_timeout end |
#retry ⇒ Object
Returns the value of attribute retry.
9 10 11 |
# File 'lib/skynet/task.rb', line 9 def retry @retry end |
Class Method Details
.debug_class_desc ⇒ Object
13 14 15 |
# File 'lib/skynet/task.rb', line 13 def self.debug_class_desc "TASK" end |
.master_task(job) ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/skynet/task.rb', line 17 def self.master_task(job) = { :async => false, :local_master => true, :map_name => job.map_name || job.name, :reduce_name => job.reduce_name || job.name, } Skynet::Job::FIELDS.each do |field| next if .has_key?(field) [field] = job.send(field) if job.send(field) end master_job = Skynet::Job.new() self.new( :task_id => master_job.task_id, :data => nil, :process => master_job.to_h, :map_or_reduce => :master, :name => master_job.name, :result_timeout => master_job.master_timeout, :retry => master_job.master_retry || Skynet::CONFIG[:DEFAULT_MASTER_RETRY] ) end |
Instance Method Details
#can_marshal? ⇒ Boolean
63 64 65 |
# File 'lib/skynet/task.rb', line 63 def can_marshal? @marshalable end |
#run(iteration = nil) ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/skynet/task.rb', line 79 def run(iteration=nil) info "running task #{name} TIMEOUT: #{result_timeout} task_id:#{task_id} MorR:#{map_or_reduce} PROCESS CLASS: #{@process.class}" begin Timeout::timeout(@result_timeout) do if @process.class == Proc debug " - #{@map_or_reduce} using Proc" @process.call @data elsif @map_or_reduce == :master debug " - as master" job = Skynet::Job.new(@process) job.run elsif @process.class == String debug " - #{@map_or_reduce} using class #{@process}" @process.constantize.send(@map_or_reduce,@data) end end rescue Timeout::Error => e # ========== # = XXX NEWSFEED HACK # = I'm printing the data hash, but that hash has all this shit added to it after runing through newsfeed. # = It's actually nice to be able to see what was added, but sometimes its too much data. # = Though the handy part will be adding instrumentation to the event_hash and seeing it onyl during a timeout. # ========== if @data.is_a?(Array) and @data.first.is_a?(Hash) @data.each {|h|h.delete(:event_object)} end raise TimeoutError.new("TASK TIMED OUT! #{name} IT:[#{iteration}] timeout:#{@result_timeout} #{e.inspect} DATA: #{@data.inspect} #{e.backtrace.join("\n")}") # ========== # = XXX This rescue block is probably not necessary. Just for debugging for now. = # ========== rescue Exception => e error "Error running task #{e.inspect} TASK:", self, e.backtrace.join("\n") raise e end end |
#task_id ⇒ Object
75 76 77 |
# File 'lib/skynet/task.rb', line 75 def task_id @task_id.to_i end |
#task_or_master ⇒ Object
67 68 69 70 71 72 73 |
# File 'lib/skynet/task.rb', line 67 def task_or_master if @map_or_reduce == :master @map_or_reduce else :task end end |