Class: Skynet::Message

Inherits:
Object
  • Object
show all
Includes:
SkynetDebugger
Defined in:
lib/skynet/message.rb,
lib/skynet/tuplespace_server.rb

Direct Known Subclasses

WorkerStatusMessage, WorkerVersionMessage

Defined Under Namespace

Classes: BadMessage, Payload

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from SkynetDebugger

#args_pp, #debug, #debug_header, #error, #fatal, included, #info, #log, #printlog, #stderr, #stdout, #warn

Constructor Details

#initialize(opts) ⇒ Message

Returns a new instance of Message.



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/skynet/message.rb', line 54

# Builds a message from either a positional Array or a Hash of attributes.
#
# opts - an Array whose elements line up with self.class.fields by index
#        (every field setter is called; missing or false entries become
#        nil), or a Hash keyed by field name as Symbol or String (only
#        truthy values are assigned). Anything falsy assigns nothing.
#
# In the Hash form a :raw_payload / "raw_payload" entry is applied after
# the regular fields, and retry falls back to 0 when it is still unset.
# The payload reader is invoked last in every case.
def initialize(opts)
  if opts.is_a?(Array)
    # Pair each field with the value at the same position.
    self.class.fields.zip(opts).each do |field, positional|
      send("#{field}=", positional || nil)
    end
  elsif opts
    self.class.fields.each do |field|
      given = opts[field] || opts[field.to_s]
      next unless given
      send("#{field}=", given)
    end

    serialized = opts[:raw_payload] || opts["raw_payload"]
    self.raw_payload = serialized if serialized

    self.retry = 0 unless self.retry
  end
  self.payload
end

Class Attribute Details

.fields ⇒ Object

Returns the value of attribute fields.



11
12
13
# File 'lib/skynet/message.rb', line 11

# Class-level reader: returns whatever is stored in the class's @fields
# instance variable. Other methods in this class iterate over it as the
# ordered list of message field names. Where it is assigned is not shown
# here.
def fields
  @fields
end

Instance Attribute Details

#payload_type ⇒ Object

Returns the value of attribute payload_type.



35
36
37
# File 'lib/skynet/message.rb', line 35

# Reader for @payload_type. Elsewhere in this class the value is compared
# against :master (after to_sym) and set to :result / :error on replies.
def payload_type
  @payload_type
end

#tasktype ⇒ Object

Returns the value of attribute tasktype.



35
36
37
# File 'lib/skynet/message.rb', line 35

# Reader for @tasktype. The templates in this class use the values :task
# and :result for it.
def tasktype
  @tasktype
end

Class Method Details

.error_message(message, error) ⇒ Object



205
206
207
# File 'lib/skynet/message.rb', line 205

# Builds the reply carrying +error+ for +message+: a result_message whose
# tasktype is :result and whose payload_type is :error.
def self.error_message(message, error)
  reply_tasktype     = :result
  reply_payload_type = :error
  result_message(message, error, reply_tasktype, reply_payload_type)
end

.error_template(message) ⇒ Object



213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/skynet/message.rb', line 213

# Returns an Array with one slot per entry of +fields+, in field order.
# The tasktype, drburi, version, task_id and queue_id slots are copied
# from +message+; every other slot is nil.
def self.error_template(message)
  pinned = {
    :tasktype => message.tasktype,
    :drburi   => message.drburi,
    :version  => message.version,
    :task_id  => message.task_id,
    :queue_id => message.queue_id
  }
  pinned.values_at(*fields)
end

.fallback_task_message(message) ⇒ Object



230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/skynet/message.rb', line 230

# Builds the follow-up Skynet::Message for +message+ with an updated
# iteration and a fresh expire_time.
#
# The new iteration is -1 once retries are used up, otherwise the current
# iteration plus one. Retries count as used up when:
#   * the message has its own retry value and iteration >= retry, or
#   * it has no retry value and either
#       - payload_type is :master, Skynet::CONFIG[:DEFAULT_MASTER_RETRY]
#         is set and iteration has reached it, or
#       - Skynet::CONFIG[:MAX_RETRIES] is set and iteration has reached it.
#
# expire_time becomes now + message.expiry; every remaining field is
# copied from +message+ unchanged.
def self.fallback_task_message(message)
  retries_exhausted =
    if message.retry
      message.iteration >= message.retry
    else
      # Only master-vs-other is known here, not map vs reduce.
      is_master   = message.payload_type.to_sym == :master
      master_cap  = Skynet::CONFIG[:DEFAULT_MASTER_RETRY]
      overall_cap = Skynet::CONFIG[:MAX_RETRIES]
      (is_master && master_cap && message.iteration >= master_cap) ||
        (overall_cap && message.iteration >= overall_cap)
    end

  template = {}
  template[:iteration]   = retries_exhausted ? -1 : message.iteration + 1
  template[:expire_time] = Time.now.to_i + message.expiry

  fields.each do |field|
    next if template.key?(field)
    template[field] = message.send(field)
  end

  Skynet::Message.new(template)
end

.fallback_template(message) ⇒ Object



260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/skynet/message.rb', line 260

# Returns an Array with one slot per entry of +fields+, in field order.
# tasktype, drburi, version, task_id and queue_id are copied from
# +message+; the iteration slot holds the Range
# 1..Skynet::CONFIG[:MAX_RETRIES]; all other slots are nil.
def self.fallback_template(message)
  constraints = {
    :tasktype  => message.tasktype,
    :drburi    => message.drburi,
    :version   => message.version,
    :task_id   => message.task_id,
    :queue_id  => message.queue_id,
    :iteration => (1..Skynet::CONFIG[:MAX_RETRIES])
  }
  fields.map { |name| constraints[name] }
end

.new_task_message(task, job) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/skynet/message.rb', line 37

# Creates the message that carries +task+ on behalf of +job+.
#
# Job-level values (job_id, version, queue_id, start_after as expire_time)
# come from +job+; task-level values (task_id, name, retry, result_timeout
# as expiry, task_or_master as payload_type) come from +task+. The task
# itself is the payload, iteration starts at 0, tasktype is :task, and a
# missing queue_id falls back to 0.
def self.new_task_message(task, job)
  attributes = {
    :job_id       => job.job_id,
    :expire_time  => job.start_after,
    :version      => job.version,
    :queue_id     => job.queue_id || 0,
    :iteration    => 0,
    :tasktype     => :task,
    :task_id      => task.task_id,
    :payload      => task,
    :payload_type => task.task_or_master,
    :expiry       => task.result_timeout,
    :name         => task.name,
    :retry        => task.retry
  }
  new(attributes)
end

.next_task_template(version = nil, payload_type = nil, queue_id = 0) ⇒ Object



142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/skynet/message.rb', line 142

# Returns an Array with one slot per entry of +fields+, in field order,
# describing a runnable task:
#   expire_time  - Range from 0 to the current epoch second
#   tasktype     - :task
#   queue_id     - the given queue_id (default 0)
#   version      - the given version (default nil)
#   payload_type - the given payload_type (default nil)
#   iteration    - Range 0..Skynet::CONFIG[:MAX_RETRIES]
# Every other slot is nil.
def self.next_task_template(version = nil, payload_type = nil, queue_id = 0)
  due_window   = (0..Time.now.to_i)
  retry_window = (0..Skynet::CONFIG[:MAX_RETRIES])

  fields.map do |field|
    case field
    when :expire_time  then due_window
    when :tasktype     then :task
    when :queue_id     then queue_id
    when :version      then version
    when :payload_type then payload_type
    when :iteration    then retry_window
    end
  end
end

.outstanding_results_template(queue_id = 0) ⇒ Object



195
196
197
198
199
200
201
202
203
# File 'lib/skynet/message.rb', line 195

# Returns an Array with one slot per entry of +fields+, in field order:
# :result in the tasktype slot, +queue_id+ (default 0) in the queue_id
# slot, nil everywhere else.
def self.outstanding_results_template(queue_id = 0)
  { :tasktype => :result, :queue_id => queue_id }.values_at(*fields)
end

.outstanding_tasks_template(iteration = nil, queue_id = 0) ⇒ Object



184
185
186
187
188
189
190
191
192
193
# File 'lib/skynet/message.rb', line 184

# Returns an Array with one slot per entry of +fields+, in field order:
# :task in the tasktype slot, +queue_id+ (default 0) and +iteration+
# (default nil) in their own slots, nil everywhere else.
def self.outstanding_tasks_template(iteration = nil, queue_id = 0)
  fields.map do |field|
    case field
    when :tasktype  then :task
    when :queue_id  then queue_id
    when :iteration then iteration
    end
  end
end

.result_message(message, result, tasktype = :result, resulttype = :result) ⇒ Object



167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/skynet/message.rb', line 167

# Builds a new message that answers +message+.
#
# message    - the message being answered; every field not overridden
#              below is read from it via send.
# result     - stored as the new payload.
# tasktype   - stored as tasktype (default :result).
# resulttype - stored as payload_type (default :result).
#
# Returns whatever +new+ returns for the assembled attribute Hash.
def self.result_message(message, result, tasktype = :result, resulttype = :result)
  overrides = {
    :tasktype     => tasktype,
    :payload      => result,
    :payload_type => resulttype
  }

  # Overridden fields are never read from the source message.
  attributes = fields.inject(overrides) do |collected, field|
    collected[field] = message.send(field) unless collected.key?(field)
    collected
  end

  new(attributes)
end

.result_template(job_id, tasktype = :result) ⇒ Object



157
158
159
160
161
162
163
164
165
# File 'lib/skynet/message.rb', line 157

# Returns an Array with one slot per entry of +fields+, in field order:
# +tasktype+ (default :result) and +job_id+ in their own slots, nil
# everywhere else.
def self.result_template(job_id, tasktype = :result)
  wanted = { :tasktype => tasktype, :job_id => job_id }
  fields.map { |name| wanted[name] }
end

Instance Method Details

#[](ii) ⇒ Object



112
113
114
# File 'lib/skynet/message.rb', line 112

# Positional access: looks up the field name at index +ii+ in
# self.class.fields and returns the result of calling that reader.
def [](ii)
  field_name = self.class.fields[ii]
  send(field_name)
end

#error_message(error) ⇒ Object



209
210
211
# File 'lib/skynet/message.rb', line 209

# Instance shortcut for the class-level error_message, passing this
# message as the one being answered.
def error_message(error)
  message_class = self.class
  message_class.error_message(self, error)
end

#error_template ⇒ Object



226
227
228
# File 'lib/skynet/message.rb', line 226

# Instance shortcut for the class-level error_template built from this
# message.
def error_template
  message_class = self.class
  message_class.error_template(self)
end

#fallback_task_message ⇒ Object



256
257
258
# File 'lib/skynet/message.rb', line 256

# Instance shortcut for the class-level fallback_task_message built from
# this message.
def fallback_task_message
  message_class = self.class
  message_class.fallback_task_message(self)
end

#fallback_template ⇒ Object



274
275
276
# File 'lib/skynet/message.rb', line 274

# Instance shortcut for the class-level fallback_template built from this
# message.
def fallback_template
  message_class = self.class
  message_class.fallback_template(self)
end

#fields ⇒ Object



73
74
75
# File 'lib/skynet/message.rb', line 73

# Returns the field list declared on this message's class.
def fields
  message_class = self.class
  message_class.fields
end

#payload ⇒ Object



95
96
97
98
99
100
101
# File 'lib/skynet/message.rb', line 95

# Returns the deserialized payload, parsing raw_payload as YAML on first
# use and caching the result in @payload. Returns nil when there is no
# raw payload (nothing is cached in that case, so the next call checks
# again).
#
# Raises BadMessage when deserialization fails. Only StandardError and
# ScriptError are wrapped: process-level exceptions such as Interrupt,
# SystemExit and NoMemoryError now propagate unchanged instead of being
# disguised as a bad message.
#
# NOTE(review): YAML.load may instantiate arbitrary objects depending on
# the YAML library version; this assumes raw_payload only ever comes from
# trusted peers - confirm before exposing to untrusted input.
def payload
  @payload ||= begin
    YAML.load(raw_payload) if raw_payload
  rescue StandardError, ScriptError => e
    raise BadMessage.new("Couldnt marshal payload #{e.inspect} #{e.backtrace.join("\n")}")
  end
end

#payload=(data) ⇒ Object



90
91
92
93
# File 'lib/skynet/message.rb', line 90

# Stores +data+ as the payload and, unless it is a Proc, also stores its
# YAML form through raw_payload=.
#
# The Proc check is made on +data+ itself. Going through the payload
# reader instead would, for a nil/false +data+, deserialize whatever raw
# payload was previously stored (and could raise on stale, unparsable
# data) just to answer a question about the new value.
#
# Note that raw_payload= as defined in this class resets @payload, so a
# later call to the payload reader re-parses the YAML rather than
# returning +data+ itself.
def payload=(data)
  @payload = data
  self.raw_payload = data.to_yaml if data.respond_to?(:to_yaml) && !data.kind_of?(Proc)
end

#raw_payload ⇒ Object



108
109
110
# File 'lib/skynet/message.rb', line 108

# Reader for @raw_payload, the serialized form of the payload (the
# payload reader parses it with YAML, and payload= stores data.to_yaml
# here).
def raw_payload
  @raw_payload
end

#raw_payload=(data) ⇒ Object



103
104
105
106
# File 'lib/skynet/message.rb', line 103

# Replaces the serialized payload and drops any cached deserialized
# payload so the two cannot disagree.
def raw_payload=(data)
  @payload = nil
  @raw_payload = data
  nil
end

#result_message(result, tasktype = :result, resulttype = :result) ⇒ Object



180
181
182
# File 'lib/skynet/message.rb', line 180

# Instance shortcut for the class-level result_message, answering this
# message with +result+. tasktype and resulttype both default to :result.
def result_message(result, tasktype = :result, resulttype = :result)
  message_class = self.class
  message_class.result_message(self, result, tasktype, resulttype)
end

#task ⇒ Object

Alias for #payload.



86
87
88
# File 'lib/skynet/message.rb', line 86

# Convenience name for the payload: returns exactly what the payload
# reader returns.
def task
  self.payload
end

#timeout ⇒ Object



138
139
140
# File 'lib/skynet/message.rb', line 138

# Returns twice the value of expire_time.
#
# NOTE(review): fallback_task_message assigns expire_time as
# Time.now.to_i + expiry, i.e. an absolute timestamp; doubling that looks
# unintended (expiry * 2 may have been meant) - confirm with callers.
def timeout
  deadline = expire_time
  deadline * 2
end

#to_a ⇒ Object



116
117
118
119
120
# File 'lib/skynet/message.rb', line 116

# Returns the message as an Array: the value of every field, in the order
# given by self.class.fields.
def to_a
  self.class.fields.map { |field| send(field) }
end

#to_h ⇒ Object



130
131
132
# File 'lib/skynet/message.rb', line 130

# Short name for to_hash; returns the same field => value Hash.
def to_h
  self.to_hash
end

#to_hash ⇒ Object



122
123
124
125
126
127
128
# File 'lib/skynet/message.rb', line 122

# Returns the message as a Hash mapping each name in self.class.fields to
# the value of the matching reader.
def to_hash
  self.class.fields.inject({}) do |collected, field|
    collected[field] = send(field)
    collected
  end
end

#to_s ⇒ Object



134
135
136
# File 'lib/skynet/message.rb', line 134

# Returns a String rendering of the message: the inspect form of to_a
# (field values in field order).
#
# to_s must return a String. Returning the Array itself makes string
# interpolation and puts ignore the result and fall back to the default
# "#<Class:0x...>" text.
def to_s
  to_a.inspect
end