Class: Skynet::Task

Inherits:
Object
  • Object
show all
Includes:
SkynetDebugger
Defined in:
lib/skynet/task.rb,
lib/skynet/tuplespace_server.rb

Defined Under Namespace

Classes: ConstructorError, TimeoutError

Constant Summary collapse

@@log =
nil

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from SkynetDebugger

#args_pp, #debug, #debug_header, #error, #fatal, included, #info, #log, #printlog, #stderr, #stdout, #warn

Constructor Details

#initialize(opts = {}) ⇒ Task

Returns a new instance of Task.



42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/skynet/task.rb', line 42

# Builds a task from an options hash.
#
# Reads +:task_id+, +:process+, +:map_or_reduce+, +:data+, +:name+,
# +:result_timeout+ and +:retry+ from +opts+. The first three are mandatory.
#
# @param opts [Hash] task settings
# @raise [ConstructorError] when +:task_id+, +:process+ or +:map_or_reduce+
#   is missing (nil or false)
def initialize(opts = {})
  if !opts[:task_id] || !opts[:process] || !opts[:map_or_reduce]
    raise ConstructorError.new("Must provide task_id, process and map_or_reduce")
  end

  # NOTE(review): assigned before the process= call below, presumably so the
  # setter (not visible here) can override it - confirm before reordering.
  @marshalable = true
  @task_id = opts[:task_id].to_i
  @data = opts[:data]
  self.process = opts[:process]

  @name, @map_or_reduce, @result_timeout, @retry =
    opts.values_at(:name, :map_or_reduce, :result_timeout, :retry)
end

Instance Attribute Details

#data ⇒ Object (readonly)

Returns the value of attribute data.



8
9
10
# File 'lib/skynet/task.rb', line 8

# Reader for the task's input data.
#
# @return [Object, nil] the current value of +@data+ (taken from +opts[:data]+
#   by +initialize+)
def data; @data; end

#map_or_reduce ⇒ Object (readonly)

Returns the value of attribute map_or_reduce.



8
9
10
# File 'lib/skynet/task.rb', line 8

# Reader for the task's role marker.
#
# @return [Object] the current value of +@map_or_reduce+ (taken from
#   +opts[:map_or_reduce]+ by +initialize+; +run+ treats +:master+ specially)
def map_or_reduce; @map_or_reduce; end

#marshalable ⇒ Object (readonly)

Returns the value of attribute marshalable.



8
9
10
# File 'lib/skynet/task.rb', line 8

# Reader for the marshalable flag.
#
# @return [Object] the current value of +@marshalable+ (+initialize+ sets it
#   to true before assigning the process)
def marshalable; @marshalable; end

#name ⇒ Object

Returns the value of attribute name.



9
10
11
# File 'lib/skynet/task.rb', line 9

# Reader for the task's name.
#
# @return [Object, nil] the current value of +@name+ (taken from +opts[:name]+
#   by +initialize+)
def name; @name; end

#process ⇒ Object

Returns the value of attribute process.



8
9
10
# File 'lib/skynet/task.rb', line 8

# Reader for the work this task executes.
#
# @return [Object] the current value of +@process+; +run+ handles a Proc, a
#   String, or (for master tasks) the argument passed to Skynet::Job.new
def process; @process; end

#result ⇒ Object (readonly)

Returns the value of attribute result.



8
9
10
# File 'lib/skynet/task.rb', line 8

# Reader for the task's result slot.
#
# @return [Object, nil] the current value of +@result+
def result; @result; end

#result_timeout ⇒ Object

Returns the value of attribute result_timeout.



9
10
11
# File 'lib/skynet/task.rb', line 9

# Reader for the time limit applied by +run+.
#
# @return [Object, nil] the current value of +@result_timeout+ (taken from
#   +opts[:result_timeout]+ by +initialize+ and passed to Timeout::timeout)
def result_timeout; @result_timeout; end

#retry ⇒ Object

Returns the value of attribute retry.



9
10
11
# File 'lib/skynet/task.rb', line 9

# Reader for the task's retry setting.
#
# @return [Object, nil] the current value of +@retry+ (taken from
#   +opts[:retry]+ by +initialize+)
def retry; @retry; end

Class Method Details

.debug_class_desc ⇒ Object



13
14
15
# File 'lib/skynet/task.rb', line 13

# Fixed label identifying this class.
#
# @return [String] always "TASK"
def self.debug_class_desc; "TASK"; end

.master_task(job) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/skynet/task.rb', line 17

# Wraps +job+ in a master task.
#
# A new Skynet::Job is created from the truthy fields of +job+ with +:async+
# forced to false, +:local_master+ forced to true, and +:map_name+ /
# +:reduce_name+ defaulting to the job's name. The returned task carries that
# job's hash form as its process and +:master+ as its map_or_reduce value.
#
# @param job [Skynet::Job] the job to run under a master task
# @return [Object] the task built by +new+
def self.master_task(job)
  job_options = { :async => false, :local_master => true }
  job_options[:map_name]    = job.map_name || job.name
  job_options[:reduce_name] = job.reduce_name || job.name

  # Copy every remaining field that has a truthy value; the four keys set
  # above are never overwritten.
  Skynet::Job::FIELDS.each do |field|
    next if job_options.key?(field)
    next unless job.send(field)

    job_options[field] = job.send(field)
  end

  master_job = Skynet::Job.new(job_options)

  task_options = {
    :task_id        => master_job.task_id,
    :data           => nil,
    :process        => master_job.to_h,
    :map_or_reduce  => :master,
    :name           => master_job.name,
    :result_timeout => master_job.master_timeout,
    :retry          => master_job.master_retry || Skynet::CONFIG[:DEFAULT_MASTER_RETRY]
  }
  new(task_options)
end

Instance Method Details

#can_marshal? ⇒ Boolean

Returns:

  • (Boolean)


63
64
65
# File 'lib/skynet/task.rb', line 63

# Predicate form of the marshalable flag.
#
# @return [Object] the current value of +@marshalable+ (set to true by
#   +initialize+)
def can_marshal?; @marshalable; end

#run(iteration = nil) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/skynet/task.rb', line 79

# Runs this task's work inside Timeout::timeout(@result_timeout) and returns
# whatever the work returns.
#
# The work is selected by the first matching rule:
# * +@process+ is exactly a Proc: it is called with +@data+.
# * +@map_or_reduce+ is +:master+: a Skynet::Job is built from +@process+
#   and run.
# * +@process+ is exactly a String: +constantize+ is called on it and the
#   result is sent the +@map_or_reduce+ message with +@data+.
# If none of the rules match, nothing is executed and nil is returned.
#
# @param iteration [Object, nil] only interpolated into the timeout message
# @return [Object, nil] the result of the selected work
# @raise [TimeoutError] when Timeout::Error is raised while the work runs
# @raise [Exception] any other exception is logged via +error+ and re-raised
def run(iteration=nil)
  info "running task #{name} TIMEOUT: #{result_timeout} task_id:#{task_id} MorR:#{map_or_reduce} PROCESS CLASS: #{@process.class}"
  begin
    Timeout::timeout(@result_timeout) do
      if @process.class == Proc
        debug " - #{@map_or_reduce} using Proc"
        @process.call @data
      elsif @map_or_reduce == :master
        debug " - as master"
        job = Skynet::Job.new(@process)
        job.run
      elsif @process.class == String
        debug " - #{@map_or_reduce} using class #{@process}"
        @process.constantize.send(@map_or_reduce,@data)
      end
    end
  rescue Timeout::Error => e
    # ==========
    # = XXX NEWSFEED HACK
    # = The data is dumped into the error message below. After running through
    # = newsfeed the hashes carry a lot of extra entries; seeing them is handy,
    # = but sometimes it is too much data, so :event_object is stripped first.
    # = This deletes the key from the hashes in @data in place.
    # ==========
    if @data.is_a?(Array) && @data.first.is_a?(Hash)
      # Only the first element is known to be a Hash; skip anything else so a
      # mixed array cannot raise NoMethodError here and mask the timeout.
      @data.each { |item| item.delete(:event_object) if item.is_a?(Hash) }
    end
    raise TimeoutError.new("TASK TIMED OUT! #{name} IT:[#{iteration}] timeout:#{@result_timeout} #{e.inspect} DATA: #{@data.inspect} #{e.backtrace.join("\n")}")

  # ==========
  # = XXX This rescue block is probably not necessary. It only logs for
  # = debugging; the exception is always re-raised unchanged.
  # ==========
  rescue Exception => e
    error "Error running task #{e.inspect} TASK:", self, e.backtrace.join("\n")
    raise
  end
end

#task_id ⇒ Object



75
76
77
# File 'lib/skynet/task.rb', line 75

# The task's identifier, coerced with +to_i+ on every read.
#
# @return [Integer] +@task_id.to_i+
def task_id; @task_id.to_i; end

#task_or_master ⇒ Object



67
68
69
70
71
72
73
# File 'lib/skynet/task.rb', line 67

# Classifies this task as a master or an ordinary task.
#
# @return [Object] +@map_or_reduce+ itself when it equals +:master+,
#   otherwise +:task+
def task_or_master
  @map_or_reduce == :master ? @map_or_reduce : :task
end