Class: Skynet::Message
- Inherits:
-
Object
show all
- Includes:
- SkynetDebugger
- Defined in:
- lib/skynet/message.rb,
lib/skynet/tuplespace_server.rb
Defined Under Namespace
Classes: BadMessage, Payload
Class Attribute Summary collapse
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
#args_pp, #debug, #debug_header, #error, #fatal, included, #info, #log, #printlog, #stderr, #stdout, #warn
Constructor Details
#initialize(opts) ⇒ Message
Returns a new instance of Message.
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
# File 'lib/skynet/message.rb', line 54
def initialize(opts)
if opts.is_a?(Array)
self.class.fields.each_with_index do |field, ii|
self.send("#{field}=",opts[ii] || nil)
end
elsif opts
self.class.fields.each do |field|
value = opts[field] || opts[field.to_s] || nil
self.send("#{field}=",value) if value
end
opts_raw_payload = opts[:raw_payload] || opts["raw_payload"]
if opts_raw_payload
self.raw_payload = opts_raw_payload
end
self.retry ||= 0
end
self.payload
end
|
Class Attribute Details
.fields ⇒ Object
Returns the value of attribute fields.
11
12
13
|
# File 'lib/skynet/message.rb', line 11
def fields
@fields
end
|
Instance Attribute Details
#payload_type ⇒ Object
Returns the value of attribute payload_type.
35
36
37
|
# File 'lib/skynet/message.rb', line 35
def payload_type
@payload_type
end
|
#tasktype ⇒ Object
Returns the value of attribute tasktype.
35
36
37
|
# File 'lib/skynet/message.rb', line 35
def tasktype
@tasktype
end
|
Class Method Details
.error_message(message, error) ⇒ Object
205
206
207
|
# File 'lib/skynet/message.rb', line 205
def self.error_message(message,error)
result_message(message,error,:result,:error)
end
|
.error_template(message) ⇒ Object
213
214
215
216
217
218
219
220
221
222
223
224
|
# File 'lib/skynet/message.rb', line 213
def self.error_template(message)
template = {
:tasktype => message.tasktype,
:drburi => message.drburi,
:version => message.version,
:task_id => message.task_id,
:queue_id => message.queue_id
}
fields.collect do |field|
template[field]
end
end
|
.fallback_task_message(message) ⇒ Object
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
|
# File 'lib/skynet/message.rb', line 230
def self.fallback_task_message(message)
template = {}
if message.retry
if (message.retry and message.iteration >= message.retry)
template[:iteration] = -1
else
template[:iteration] = message.iteration + 1
end
elsif message.payload_type.to_sym == :master and Skynet::CONFIG[:DEFAULT_MASTER_RETRY] and message.iteration >= Skynet::CONFIG[:DEFAULT_MASTER_RETRY]
template[:iteration] = -1
elsif Skynet::CONFIG[:MAX_RETRIES] and message.iteration >= Skynet::CONFIG[:MAX_RETRIES]
template[:iteration] = -1
else
template[:iteration] = message.iteration + 1
end
template[:expire_time] = Time.now.to_i + message.expiry
fields.each do |field|
template[field] = message.send(field) unless template.has_key?(field)
end
Skynet::Message.new(template)
end
|
.fallback_template(message) ⇒ Object
260
261
262
263
264
265
266
267
268
269
270
271
272
|
# File 'lib/skynet/message.rb', line 260
def self.fallback_template(message)
template = {
:tasktype => message.tasktype,
:drburi => message.drburi,
:version => message.version,
:task_id => message.task_id,
:queue_id => message.queue_id,
:iteration => (1..Skynet::CONFIG[:MAX_RETRIES]),
}
fields.collect do |field|
template[field]
end
end
|
.new_task_message(task, job) ⇒ Object
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
# File 'lib/skynet/message.rb', line 37
def self.new_task_message(task,job)
self.new(
:job_id => job.job_id,
:expire_time => job.start_after,
:version => job.version,
:queue_id => job.queue_id || 0,
:iteration => 0,
:tasktype => :task,
:task_id => task.task_id,
:payload => task,
:payload_type => task.task_or_master,
:expiry => task.result_timeout,
:name => task.name,
:retry => task.retry
)
end
|
.next_task_template(version = nil, payload_type = nil, queue_id = 0) ⇒ Object
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
# File 'lib/skynet/message.rb', line 142
def self.next_task_template(version=nil, payload_type=nil, queue_id=0)
template = {
:expire_time => (0 .. Time.now.to_i),
:tasktype => :task,
:queue_id => queue_id,
:version => version,
:payload_type => payload_type,
:iteration => (0..Skynet::CONFIG[:MAX_RETRIES]),
}
fields.collect do |field|
template[field]
end
end
|
.outstanding_results_template(queue_id = 0) ⇒ Object
195
196
197
198
199
200
201
202
203
|
# File 'lib/skynet/message.rb', line 195
def self.outstanding_results_template(queue_id=0)
template = {
:tasktype => :result,
:queue_id => queue_id
}
fields.collect do |field|
template[field]
end
end
|
.outstanding_tasks_template(iteration = nil, queue_id = 0) ⇒ Object
184
185
186
187
188
189
190
191
192
193
|
# File 'lib/skynet/message.rb', line 184
def self.outstanding_tasks_template(iteration=nil,queue_id=0)
template = {
:tasktype => :task,
:queue_id => queue_id,
:iteration => iteration
}
fields.collect do |field|
template[field]
end
end
|
.result_message(message, result, tasktype = :result, resulttype = :result) ⇒ Object
167
168
169
170
171
172
173
174
175
176
177
178
|
# File 'lib/skynet/message.rb', line 167
def self.result_message(message,result,tasktype=:result, resulttype=:result)
template = {
:tasktype => tasktype,
:payload => result,
:payload_type => resulttype
}
fields.each do |field|
template[field] = message.send(field) unless template.has_key?(field)
end
new(template)
end
|
.result_template(job_id, tasktype = :result) ⇒ Object
157
158
159
160
161
162
163
164
165
|
# File 'lib/skynet/message.rb', line 157
def self.result_template(job_id,tasktype=:result)
template = {
:tasktype => tasktype,
:job_id => job_id
}
fields.collect do |field|
template[field]
end
end
|
Instance Method Details
#[](ii) ⇒ Object
112
113
114
|
# File 'lib/skynet/message.rb', line 112
def [](ii)
send(self.class.fields[ii])
end
|
#error_message(error) ⇒ Object
209
210
211
|
# File 'lib/skynet/message.rb', line 209
def error_message(error)
self.class.error_message(self,error)
end
|
#error_template ⇒ Object
226
227
228
|
# File 'lib/skynet/message.rb', line 226
def error_template
self.class.error_template(self)
end
|
#fallback_task_message ⇒ Object
256
257
258
|
# File 'lib/skynet/message.rb', line 256
def fallback_task_message
self.class.fallback_task_message(self)
end
|
#fallback_template ⇒ Object
274
275
276
|
# File 'lib/skynet/message.rb', line 274
def fallback_template
self.class.fallback_template(self)
end
|
#fields ⇒ Object
73
74
75
|
# File 'lib/skynet/message.rb', line 73
def fields
self.class.fields
end
|
#payload ⇒ Object
95
96
97
98
99
100
101
|
# File 'lib/skynet/message.rb', line 95
def payload
@payload ||= begin
YAML::load(self.raw_payload) if self.raw_payload
rescue Exception => e
raise BadMessage.new("Couldnt marshal payload #{e.inspect} #{e.backtrace.join("\n")}")
end
end
|
#payload=(data) ⇒ Object
90
91
92
93
|
# File 'lib/skynet/message.rb', line 90
def payload=(data)
@payload = data
self.raw_payload = data.to_yaml if data.respond_to?(:to_yaml) and not payload.kind_of?(Proc)
end
|
#raw_payload ⇒ Object
108
109
110
|
# File 'lib/skynet/message.rb', line 108
def raw_payload
@raw_payload
end
|
#raw_payload=(data) ⇒ Object
103
104
105
106
|
# File 'lib/skynet/message.rb', line 103
def raw_payload=(data)
@raw_payload = data
@payload=nil
end
|
#result_message(result, tasktype = :result, resulttype = :result) ⇒ Object
180
181
182
|
# File 'lib/skynet/message.rb', line 180
def result_message(result,tasktype=:result, resulttype=:result)
self.class.result_message(self,result,tasktype,resulttype)
end
|
#task ⇒ Object
86
87
88
|
# File 'lib/skynet/message.rb', line 86
def task
payload
end
|
#timeout ⇒ Object
138
139
140
|
# File 'lib/skynet/message.rb', line 138
def timeout
expire_time * 2
end
|
#to_a ⇒ Object
116
117
118
119
120
|
# File 'lib/skynet/message.rb', line 116
def to_a
self.class.fields.collect do |field|
self.send(field)
end
end
|
#to_h ⇒ Object
130
131
132
|
# File 'lib/skynet/message.rb', line 130
def to_h
to_hash
end
|
#to_hash ⇒ Object
122
123
124
125
126
127
128
|
# File 'lib/skynet/message.rb', line 122
def to_hash
hash = {}
self.class.fields.each do |field|
hash[field] = self.send(field)
end
hash
end
|
#to_s ⇒ Object
134
135
136
|
# File 'lib/skynet/message.rb', line 134
def to_s
to_a
end
|