Class: Skynet::Worker
Defined Under Namespace
Classes: ConnectionFailure, Error, NoManagerError, RespawnWorker
Constant Summary
collapse
- RETRY_TIME =
2
- MEMORY_CHECK_DELAY =
30
- MANAGER_PING_INTERVAL =
60
- @@ok_to_mem_check =
false
- @@lastmem =
nil
- @@memct =
0
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
#get_unique_id
#args_pp, #debug, #debug_header, #error, #fatal, included, #info, #log, #printlog, #stderr, #stdout, #warn
Constructor Details
#initialize(worker_type, options = {}) ⇒ Worker
Returns a new instance of Worker.
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
# File 'lib/skynet/skynet_worker.rb', line 27
# Builds a worker for the given task type.
#
# worker_type - name of the kind of work to accept; stored as a symbol.
# options     - optional Hash. :queue_id selects the queue (0 when absent).
#               Every entry is also merged into the worker's status info,
#               overriding the computed entries on key collision.
def initialize(worker_type, options = {})
  @worker_id   = get_unique_id(1).to_i
  @worker_type = worker_type.to_sym
  @queue_id    = options[:queue_id] || 0
  @processed   = 0
  @in_process  = false
  @mq          = Skynet::MessageQueue.new
  debug "THIS WORKER TAKES #{worker_type}"

  # Note: :tasktype keeps the caller's raw argument, while :worker_type
  # goes through payload_type (nil for :any).
  base_info = {
    :tasktype    => worker_type,
    :hostname    => hostname,
    :process_id  => process_id,
    :worker_type => payload_type,
    :worker_id   => worker_id,
    :version     => mq.get_worker_version
  }
  @worker_info = base_info.merge(options)
end
|
Instance Attribute Details
#message ⇒ Object
Returns the value of attribute message.
15
16
17
|
# File 'lib/skynet/skynet_worker.rb', line 15
# Reader for the @message instance variable.
# NOTE(review): none of the methods shown on this page assign @message
# (the run loop in #start uses a local of the same name) - confirm where
# it is set before relying on it.
def message
@message
end
|
#mq ⇒ Object
Returns the value of attribute mq.
15
16
17
|
# File 'lib/skynet/skynet_worker.rb', line 15
# Reader for @mq, the Skynet::MessageQueue created in #initialize.
# #start sets it to nil after a connection error and replaces it with a
# fresh queue on the next loop pass, so it can briefly be nil.
def mq
@mq
end
|
#processed ⇒ Object
Returns the value of attribute processed.
15
16
17
|
# File 'lib/skynet/skynet_worker.rb', line 15
# Reader for @processed: starts at 0 in #initialize and is incremented by
# #notify_task_complete.
def processed
@processed
end
|
#queue_id ⇒ Object
Returns the value of attribute queue_id.
16
17
18
|
# File 'lib/skynet/skynet_worker.rb', line 16
# Reader for @queue_id, taken from options[:queue_id] in #initialize
# (0 when that option is absent).
def queue_id
@queue_id
end
|
#task ⇒ Object
Returns the value of attribute task.
15
16
17
|
# File 'lib/skynet/skynet_worker.rb', line 15
# Reader for the @task instance variable.
# NOTE(review): none of the methods shown on this page assign @task
# (the run loop in #start uses a local of the same name) - confirm where
# it is set before relying on it.
def task
@task
end
|
#worker_id ⇒ Object
Returns the value of attribute worker_id.
16
17
18
|
# File 'lib/skynet/skynet_worker.rb', line 16
# Reader for @worker_id, the integer obtained from get_unique_id(1).to_i
# in #initialize.
def worker_id
@worker_id
end
|
#worker_info ⇒ Object
Returns the value of attribute worker_info.
16
17
18
|
# File 'lib/skynet/skynet_worker.rb', line 16
# Reader for @worker_info, the status Hash assembled in #initialize
# (task type, hostname, pid, worker id, version, plus the caller's options).
# The notify_* methods merge per-event fields into a copy of it.
def worker_info
@worker_info
end
|
#worker_type ⇒ Object
Returns the value of attribute worker_type.
16
17
18
|
# File 'lib/skynet/skynet_worker.rb', line 16
# Reader for @worker_type, the constructor argument converted with to_sym.
def worker_type
@worker_type
end
|
Class Method Details
.debug_class_desc ⇒ Object
23
24
25
|
# File 'lib/skynet/skynet_worker.rb', line 23
# Returns the label "WORKER-<pid>" for the current process.
def self.debug_class_desc
  "WORKER-" + Process.pid.to_s
end
|
.start(options = {}) ⇒ Object
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
|
# File 'lib/skynet/skynet_worker.rb', line 362
# Command-line entry point for a worker process.
#
# Parses worker switches from ARGV into +options+, requires any extra
# libraries and the config file, then builds a Skynet::Worker and runs it.
#
# options - Hash of presets; recognised keys are :worker_type (default :any),
#           :required_libs (default []), :config_file, :queue and :queue_id.
#           ARGV switches overwrite / append to these.
#
# Raises Skynet::Error for an invalid --worker_type, or when both a queue
# name and a queue id are supplied.
# Calls exit (SystemExit) when a required library or the config file cannot
# be loaded, when the worker reports NoManagerError, and after launching a
# replacement process in response to RespawnWorker.
def self.start(options={})
  options[:worker_type] ||= :any
  options[:required_libs] ||= []
  OptionParser.new do |opt|
    opt.banner = "Usage: worker [options]"
    opt.on('-r', '--required LIBRARY', 'Include the specified libraries') do |v|
      options[:required_libs] << v
    end
    opt.on('--worker_type=WORKERTYPE', "master, task or any") do |v|
      if ["any","master","task"].include?(v)
        options[:worker_type] = v
      else
        raise Skynet::Error.new("#{v} is not a valid worker_type")
      end
    end
    opt.on('--config=CONFIG_FILE', 'Where to find the skynet.rb config file') do |v|
      options[:config_file] = File.expand_path(v)
    end
    opt.on('--queue=QUEUE_NAME', 'Which queue should these workers use (default "default").') do |v|
      options[:queue] = v
    end
    opt.on('--queue_id=queue_id', 'Which queue should these workers use (default 0).') do |v|
      options[:queue_id] = v.to_i
    end
    opt.parse!(ARGV)
  end

  if options[:queue]
    if options[:queue_id]
      raise Skynet::Error.new("You may either provide a queue_id or a queue, but not both.")
    end
    # NOTE(review): `config` is not defined in the code visible here -
    # presumably a class-level helper returning a Skynet::Config; confirm.
    options[:queue_id] = config.queue_id_by_name(options[:queue])
  end

  options[:required_libs].each do |adlib|
    begin
      require adlib
    # LoadError is what `require` raises. The previous MissingSourceFile was
    # only an ActiveSupport alias for it and is undefined in plain Ruby.
    rescue LoadError => e
      error "The included lib #{adlib} was not found: #{e.inspect}"
      exit
    end
  end

  options[:config_file] ||= Skynet::CONFIG[:CONFIG_FILE]
  if options[:config_file]
    begin
      require options[:config_file]
    rescue LoadError => e
      error "The config file at #{options[:config_file]} was not found: #{e.inspect}"
      exit
    end
  else
    error "Config file missing. Please add a config/skynet_config.rb before starting."
    exit
  end

  debug "WORKER STARTING WORKER_TYPE?:#{options[:worker_type]}. QUEUE: #{Skynet::Config.new.queue_name_by_id(options[:queue_id])}"

  begin
    worker = Skynet::Worker.new(options[:worker_type], options)
    worker.start
  rescue Skynet::Worker::NoManagerError => e
    fatal e.message
    exit
  rescue Skynet::Worker::RespawnWorker => e
    # Re-launch an equivalent worker with the same switches, then let this
    # process end.
    warn "WORKER #{$$} SCRIPT CAUGHT RESPAWN. RESTARTING #{e.message}"
    cmd = "ruby #{Skynet::CONFIG[:LAUNCHER_PATH]} --worker_type=#{options[:worker_type]} --queue_id=#{options[:queue_id]} "
    cmd << "--config=#{options[:config_file]} "
    cmd << "-r #{options[:required_libs].join(' -r ')}" if options[:required_libs] and not options[:required_libs].empty?
    Skynet.fork_and_exec(cmd)
    exit
  rescue SystemExit
    info "WORKER #{$$} EXITING GRACEFULLY"
  rescue Exception => e
    # Last-resort handler: log and persist a report for anything else.
    fatal "WORKER #{$$} DYING #{e.class} #{e.message} #{e.backtrace}"
    report = ExceptionReport.new(e)
    report.save
  end
end
|
Instance Method Details
#find_pid_size(file, format = :notpretty) ⇒ Object
331
332
333
334
335
336
337
338
339
340
341
342
343
344
|
# File 'lib/skynet/skynet_worker.rb', line 331
# Reads the VmSize entry from a /proc-style status file.
#
# file   - path of the status file to scan.
# format - :pretty returns a String truncated to 5 decimal places;
#          anything else (default :notpretty) returns a Float.
#
# Returns the VmSize figure divided by 1000 (presumably kB -> ~MB, given the
# " kB" suffix the slice strips - confirm units with callers).
# Returns the String '0' when the file cannot be read or has no VmSize line,
# so callers can always apply to_i / to_f to the result.
def find_pid_size(file, format=:notpretty)
  # File.foreach closes the handle on both normal completion and early
  # return, unlike a bare open(file).each.
  File.foreach(file) do |line|
    next unless line.index('VmSize')
    # Drop the "VmSize:" label (7 chars) and the trailing " kB\n" (4 chars).
    size = line[7..-5].strip.to_f / 1000
    return BigDecimal(size.to_s).truncate(5).to_s('F') if format == :pretty
    return size
  end
  '0'
rescue StandardError => e
  warn "ERROR #{e.inspect}"
  '0'
end
|
#get_memory_size ⇒ Object
346
347
348
|
# File 'lib/skynet/skynet_worker.rb', line 346
# Memory size of the current process, as reported by find_pid_size for
# this process's own /proc status file.
def get_memory_size
  own_status_file = "/proc/self/status"
  find_pid_size(own_status_file)
end
|
#hostname ⇒ Object
52
53
54
|
# File 'lib/skynet/skynet_worker.rb', line 52
# Name of the machine this worker runs on. Looked up once through
# Socket.gethostname and cached in @machine_name afterwards.
def hostname
  return @machine_name if @machine_name
  @machine_name = Socket.gethostname
end
|
#interrupt ⇒ Object
163
164
165
166
167
168
169
170
171
172
173
|
# File 'lib/skynet/skynet_worker.rb', line 163
# Handler installed by #start for TERM and INT.
#
# The first signal marks the worker as dying (@die). If no task is in
# progress it reports the stop and exits at once; otherwise it returns so
# the run loop can finish the task and exit on its next pass.
# A second signal exits immediately.
def interrupt
  exit if @die

  @die = true
  return if @in_process

  notify_worker_stop
  exit
end
|
#manager ⇒ Object
154
155
156
|
# File 'lib/skynet/skynet_worker.rb', line 154
# Returns whatever Skynet::Manager.get yields; #manager_send forwards calls
# to that object. It is fetched anew on every call (no caching here).
def manager
Skynet::Manager.get
end
|
#manager_send(method, *args) ⇒ Object
140
141
142
143
144
145
146
147
148
|
# File 'lib/skynet/skynet_worker.rb', line 140
# Best-effort call of +method+ (with +args+) on the manager.
#
# Returns the manager's result on success. Every failure - connection
# problems as well as anything else - is logged through #error and
# swallowed; in that case the return value is whatever #error returned.
def manager_send(method,*args)
  target = manager
  target.send(method, *args)
rescue DRb::DRbConnError, Errno::ECONNREFUSED => e
  error "Worker could not connect to manager to call #{method} on manager #{e.inspect}"
rescue Exception => e
  error "Worker could not connect call #{method} on manager #{e.inspect} args:", args
end
|
#max_memory_reached? ⇒ Boolean
317
318
319
320
321
322
323
324
325
326
327
328
329
|
# File 'lib/skynet/skynet_worker.rb', line 317
# Throttled check of this process's memory against
# Skynet::CONFIG[:WORKER_MAX_MEMORY].
#
# Returns false when memory checks are unavailable, on the very first call
# (which only starts the timer), and while less than MEMORY_CHECK_DELAY
# seconds have passed since the last measurement.
# When a measurement is taken, returns the measured size (truthy Integer)
# if it exceeds the limit, otherwise nil.
def max_memory_reached?
  return false unless ok_to_mem_check?

  unless @memchecktime
    @memchecktime = Time.now
    return false
  end

  return false unless Time.now > (@memchecktime + MEMORY_CHECK_DELAY)

  @memchecktime = Time.now
  current_size = get_memory_size.to_i
  current_size > Skynet::CONFIG[:WORKER_MAX_MEMORY] ? current_size : nil
end
|
#new_version_respawn? ⇒ Boolean
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
# File 'lib/skynet/skynet_worker.rb', line 60
# Decides whether this worker should restart because the queue advertises
# a different worker version.
#
# First call: records the current queue version in @curver (falling back to
# 1 when the queue request expires) and returns false.
# Later calls: at most once every
# Skynet::CONFIG[:WORKER_VERSION_CHECK_DELAY] seconds, compares the queue's
# version with @curver. Returns true only when they differ and the queue
# says @curver is no longer active for this queue_id. If that lookup
# expires, the queue version is set to 1 and false is returned.
def new_version_respawn?
  unless @verchecktime
    @verchecktime = Time.now
    begin
      @curver = mq.get_worker_version
      debug "FINDING INITIAL VER #{@curver}"
    rescue Skynet::RequestExpiredError
      warn "NO INITIAL VER IN MQ using 1"
      @curver = 1
    end
    return false
  end

  # Too soon since the previous check.
  return false if Time.now < (@verchecktime + Skynet::CONFIG[:WORKER_VERSION_CHECK_DELAY])

  @verchecktime = Time.now
  begin
    latest = mq.get_worker_version
    outdated = latest != @curver && !mq.version_active?(@curver, queue_id)
    return false unless outdated

    info "RESTARTING WORKER ON PID #{$$}"
    true
  rescue Skynet::RequestExpiredError
    warn "NO CURRENT WORKER REV IN MQ still using 1"
    mq.set_worker_version(1)
    false
  end
end
|
#notify_task_begun(task) ⇒ Object
102
103
104
105
106
107
|
# File 'lib/skynet/skynet_worker.rb', line 102
# Marks the worker as busy and reports the task it is about to run.
#
# task - Hash describing the task. It is modified in place: :processed and
#        :started_at (epoch seconds) are added before it is merged over
#        @worker_info and passed to write_worker_status.
def notify_task_begun(task)
  task[:processed]  = @processed
  task[:started_at] = Time.now.to_i
  @in_process = true

  status = @worker_info.merge(task)
  write_worker_status(status)
end
|
#notify_task_complete ⇒ Object
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
# File 'lib/skynet/skynet_worker.rb', line 109
# Counts the finished task, marks the worker idle and reports a
# "waiting" status (task/job ids reset to 0) via write_worker_status.
def notify_task_complete
  @processed += 1
  @in_process = false

  idle_status = {
    :task_id       => 0,
    :job_id        => 0,
    :name          => "waiting for #{@worker_type}",
    :processed     => @processed,
    :map_or_reduce => nil,
    :started_at    => Time.now.to_i
  }
  write_worker_status(@worker_info.merge(idle_status))
end
|
#notify_worker_started ⇒ Object
92
93
94
95
96
97
98
99
100
|
# File 'lib/skynet/skynet_worker.rb', line 92
# Reports the initial "waiting" status (zero tasks processed, started now)
# for this worker via write_worker_status.
def notify_worker_started
  startup_status = {
    :name       => "waiting for #{@worker_type}",
    :processed  => 0,
    :started_at => Time.now.to_i
  }
  write_worker_status(@worker_info.merge(startup_status))
end
|
#notify_worker_stop ⇒ Object
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
# File 'lib/skynet/skynet_worker.rb', line 125
# Logs that the worker is stopping and reports a final status whose
# :process_id is nil (task/job ids reset to 0) via write_worker_status.
def notify_worker_stop
  info "Worker #{process_id} stopping..."

  final_status = {
    :task_id       => 0,
    :job_id        => 0,
    :name          => "waiting for #{@worker_type}",
    :processed     => @processed,
    :process_id    => nil,
    :map_or_reduce => nil,
    :started_at    => Time.now.to_i
  }
  write_worker_status(@worker_info.merge(final_status))
end
|
#ok_to_mem_check? ⇒ Boolean
350
351
352
353
354
355
356
357
358
359
360
|
# File 'lib/skynet/skynet_worker.rb', line 350
# Whether memory checks can be performed on this machine, i.e. whether
# /proc/self/status exists.
#
# The answer is cached in the class variable @@ok_to_mem_check (true or
# :notok) so the filesystem is probed only once. On the first successful
# probe the current memory size is also stored in @@lastmem if it is unset.
def ok_to_mem_check?
  return true if @@ok_to_mem_check == true
  return false if @@ok_to_mem_check == :notok

  # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
  if File.exist?('/proc/self/status')
    @@lastmem ||= get_memory_size.to_i
    @@ok_to_mem_check = true
  else
    @@ok_to_mem_check = :notok
    false
  end
end
|
#payload_type ⇒ Object
158
159
160
161
|
# File 'lib/skynet/skynet_worker.rb', line 158
# The task type this worker asks the queue for: nil when the worker
# accepts :any type, otherwise the worker_type itself.
def payload_type
  worker_type == :any ? nil : worker_type
end
|
#process_id ⇒ Object
48
49
50
|
# File 'lib/skynet/skynet_worker.rb', line 48
# Process id of the running worker.
def process_id
  Process.pid
end
|
#start ⇒ Object
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
|
# File 'lib/skynet/skynet_worker.rb', line 175
# Main run loop of the worker.
#
# Installs signal handlers (HUP requests a respawn; TERM and INT go to
# #interrupt), announces the worker, then repeatedly takes a task from the
# queue, runs it and writes the result back.
#
# Raises Skynet::Worker::RespawnWorker when the worker should be replaced:
# new version at startup, HUP, the processed-task limit, or the memory limit.
# Re-raises the connection error after more than 20 consecutive failed
# reconnects. Returns normally after NoManagerError, Interrupt / SystemExit,
# or more than 1000 unexpected exceptions.
def start
  exceptions = 0
  conerror = 0
  @curver = nil

  Signal.trap("HUP") do
    @respawn = true
    # When idle, respawn right away; otherwise the loop notices @respawn
    # after the current task.
    raise Skynet::Worker::RespawnWorker.new if not @in_process
  end
  Signal.trap("TERM") { interrupt }
  Signal.trap("INT") { interrupt }

  raise Skynet::Worker::RespawnWorker.new if new_version_respawn?

  printlog "STARTING WORKER @ VER:#{@curver} type:#{@worker_type} QUEUE_ID:#{queue_id}"
  notify_worker_started

  message = nil
  task = nil

  loop do
    message = nil
    begin
      if Skynet::CONFIG[:WORKER_MAX_PROCESSED] and Skynet::CONFIG[:WORKER_MAX_PROCESSED] > 0 and @processed >= Skynet::CONFIG[:WORKER_MAX_PROCESSED]
        # This branch is about the processed-task limit; it previously reused
        # the memory-limit message, which misreported why the worker respawned.
        raise Skynet::Worker::RespawnWorker.new("WORKER OVER MAX PROCESSED AT: #{@processed} MAX: #{Skynet::CONFIG[:WORKER_MAX_PROCESSED]}")
      end

      if @die
        exit
      elsif @respawn
        raise Skynet::Worker::RespawnWorker.new()
      end

      if local_mem = max_memory_reached?
        raise Skynet::Worker::RespawnWorker.new("WORKER OVER MAX MEM AT: #{local_mem} MAX: #{Skynet::CONFIG[:WORKER_MAX_MEMORY]}")
      end

      # After a connection failure @mq was dropped; build a fresh queue.
      if conerror > 0
        @mq = Skynet::MessageQueue.new
        warn "WORKER RECONNECTED AFTER #{conerror} tries"
        conerror = 0
      end

      message = mq.take_next_task(@curver, 0.00001, payload_type, queue_id)
      next unless message.respond_to?(:payload)

      task = message.payload
      error "BAD MESSAGE", task unless task.respond_to?(:map_or_reduce)

      info "STEP 2 GOT MESSAGE #{message.name} type:#{task.map_or_reduce}, jobid: #{message.job_id}, taskid:#{message.task_id} it: #{message.iteration}"
      debug "STEP 2.1 message=", message.to_a
      next unless task

      info "STEP 4 RUNNING TASK #{message.name} jobid: #{message.job_id} taskid: #{task.task_id}"
      notify_task_begun({
        :job_id => message.job_id,
        :task_id => message.task_id,
        :iteration => message.iteration,
        :name => message.name,
        :map_or_reduce => task.map_or_reduce
      })
      result = task.run(message.iteration)

      info "STEP 5 GOT RESULT FROM RUN TASK #{message.name} jobid: #{message.job_id} taskid: #{task.task_id}"
      debug "STEP 5.1 RESULT DATA:", result

      mq.write_result(message,result,task.result_timeout)
      info "STEP 6 WROTE RESULT MESSAGE #{message.name} jobid: #{message.job_id} taskid: #{task.task_id}"

      notify_task_complete
    rescue Skynet::Task::TimeoutError => e
      error "Task timed out while executing #{e.inspect} #{e.backtrace.join("\n")}"
      @in_process = false
      next
    rescue Skynet::Worker::RespawnWorker => e
      info "Respawning and taking worker status #{e.message}"
      notify_worker_stop
      raise e
    rescue Skynet::RequestExpiredError => e
      # Raised when taking a task from the queue expires; use the pause to
      # check for a new worker version.
      if new_version_respawn?
        notify_worker_stop
        manager_send(:restart_worker,$$)
      end
      sleep 1
      next
    rescue Skynet::ConnectionError, DRb::DRbConnError => e
      conerror += 1
      # Back off to a longer wait after several consecutive failures.
      retry_time = conerror > 6 ? RETRY_TIME * 3 : RETRY_TIME
      error "#{e.message}, RETRY #{conerror} in #{retry_time} seconds !!"
      @mq = nil
      sleep retry_time
      if conerror > 20
        fatal "TOO MANY RECONNECTION EXCEPTIONS #{e.message}"
        notify_worker_stop
        raise e
      end
      next
    rescue NoManagerError => e
      fatal e.message
      break
    rescue Interrupt, SystemExit => e
      info "Exiting..."
      notify_worker_stop
      break
    rescue Exception => e
      error "skynet_worker.rb:#{__LINE__} #{e.inspect} #{e.backtrace.join("\n")}"
      exceptions += 1
      break if exceptions > 1000
      # Report the failure against the message so it is not silently lost.
      if message
        mq.write_error(message,"#{e.inspect} #{e.backtrace.join("\n")}",(task.respond_to?(:result_timeout) ? task.result_timeout : 200))
      end
      @in_process = false
      next
    end
  end
end
|
#version ⇒ Object
56
57
58
|
# File 'lib/skynet/skynet_worker.rb', line 56
# Returns @curver, the worker version recorded by #new_version_respawn?
# (reset to nil at the top of #start).
def version
@curver
end
|
#write_worker_status(status) ⇒ Object
150
151
152
|
# File 'lib/skynet/skynet_worker.rb', line 150
# Sends +status+ to the manager's worker_notify method through
# #manager_send, which logs and swallows any failure.
def write_worker_status(status)
manager_send(:worker_notify,status)
end
|