Class: Skynet::Manager
Defined Under Namespace
Classes: Error
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
#args_pp, #debug, #debug_header, #error, #fatal, included, #info, #log, #printlog, #stderr, #stdout, #warn
Constructor Details
#initialize(options) ⇒ Manager
Returns a new instance of Manager.
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
# File 'lib/skynet/skynet_manager.rb', line 21
# Builds a manager for the worker processes launched from options[:script_path].
#
# @param options [Hash] :script_path (required), :workers (default 4),
#   :required_libs (default []), :queue_id (default 0).
# @raise [Error] when no :script_path is given.
def initialize(options)
  unless options[:script_path]
    raise Error, "You must provide a script path to Skynet::Manager.new."
  end
  @script_path       = options[:script_path] || Skynet::CONFIG[:LAUNCHER_PATH]
  @workers_requested = options[:workers] || 4
  @required_libs     = options[:required_libs] || []
  @queue_id          = options[:queue_id] || 0

  # Runtime bookkeeping; counters start empty until workers are forked.
  @number_of_workers   = 0
  @workers_by_type     = {:master => [], :task => [], :any => []}
  @signaled_workers    = []
  @worker_queue        = {}
  @workers_restarting  = 0
  @all_workers_started = false

  @config = Skynet::Config.new
  @mutex  = Mutex.new
  # Incoming worker status messages, drained by start_worker_queue_thread.
  @wqts   = Queue.new
end
|
Instance Attribute Details
#config ⇒ Object
Returns the value of attribute config.
19
20
21
|
# File 'lib/skynet/skynet_manager.rb', line 19
# The Skynet::Config instance created when the manager was built.
def config; @config; end
|
#queue_id ⇒ Object
Returns the value of attribute queue_id.
18
19
20
|
# File 'lib/skynet/skynet_manager.rb', line 18
# Numeric id of the queue this manager's workers consume from.
def queue_id; @queue_id; end
|
#required_libs ⇒ Object
Returns the value of attribute required_libs.
18
19
20
|
# File 'lib/skynet/skynet_manager.rb', line 18
# Library paths passed to every forked worker via "-r".
def required_libs; @required_libs; end
|
#worker_queue ⇒ Object
Returns the value of attribute worker_queue.
19
20
21
|
# File 'lib/skynet/skynet_manager.rb', line 19
# Hash of worker_id => latest worker status message.
def worker_queue; @worker_queue; end
|
#wqts ⇒ Object
Returns the value of attribute wqts.
19
20
21
|
# File 'lib/skynet/skynet_manager.rb', line 19
# Queue of raw worker status notifications awaiting processing.
def wqts; @wqts; end
|
Class Method Details
.debug_class_desc ⇒ Object
14
15
16
|
# File 'lib/skynet/skynet_manager.rb', line 14
# Label used by the logging mixin to tag this class's messages.
def self.debug_class_desc; 'MANAGER'; end
|
.get ⇒ Object
488
489
490
|
# File 'lib/skynet/skynet_manager.rb', line 488
# Returns a DRb proxy for the manager listening on this host.
def self.get
  uri = local_manager_uri
  DRbObject.new(nil, uri)
end
|
.local_manager_uri ⇒ Object
484
485
486
|
# File 'lib/skynet/skynet_manager.rb', line 484
# DRb URI of the manager on localhost, built from the configured port.
def self.local_manager_uri
  port = Skynet::CONFIG[:SKYNET_LOCAL_MANAGER_PORT]
  "druby://localhost:#{port}"
end
|
.read_pid_file ⇒ Object
693
694
695
696
|
# File 'lib/skynet/skynet_manager.rb', line 693
# Reads the manager PID recorded in the pidfile.
#
# @return [Integer, nil] the recorded PID, or nil when the pidfile is missing
#   or does not contain a positive integer.
def self.read_pid_file
  pidfile = Skynet::Config.pidfile_location
  return nil unless File.exist?(pidfile)
  pid = File.read(pidfile).to_i
  # An empty or corrupt pidfile parses to 0. Callers hand this value to
  # Process.kill, where pid 0 means "the caller's whole process group".
  pid > 0 ? pid : nil
rescue Errno::ENOENT
  # The pidfile disappeared between the existence check and the read.
  nil
end
|
.run_manager(options) ⇒ Object
664
665
666
667
668
669
670
|
# File 'lib/skynet/skynet_manager.rb', line 664
# Builds a manager, exposes it over DRb on the configured local port,
# forks its workers and then enters its monitoring loop (does not return
# under normal operation).
def self.run_manager(options)
  port = Skynet::CONFIG[:SKYNET_LOCAL_MANAGER_PORT]
  @manager = Skynet::Manager.new(options)
  @drb_manager = DRb.start_service("druby://:#{port}", @manager)
  @manager.start_workers
  info "MANAGER STARTED ON PORT: #{port}"
  @manager.run
end
|
.start(options = {}) ⇒ Object
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
|
# File 'lib/skynet/skynet_manager.rb', line 492
# Command-line entry point. Parses ARGV into +options+, loads required
# libraries and the config file, then either asks a running manager to
# add/remove workers or boots a new manager (optionally daemonized).
#
# @param options [Hash] option defaults; ARGV switches override them. Keys
#   read here include :workers, :queue, :queue_id, :required_libs,
#   :config_file, :add_workers, :remove_workers, :script_path, :pid_file
#   and :daemonize (the string key "daemonize" is also honoured).
# @raise [Skynet::Error] when both --queue and --queue_id are supplied.
# @raise [Skynet::ConnectionError] when no MessageQueue is reachable.
def self.start(options={})
  options[:add_workers] ||= nil
  options[:remove_workers] ||= nil
  options[:use_rails] ||= false
  options[:required_libs] ||= []
  config = Skynet::Config.new
  OptionParser.new do |opt|
    opt.banner = %{Usage:
> skynet [options]
OR to daemonize
> skynet [options] start
> skynet stop
You can also run:
> skynet console [options]
}
    opt.on('--restart-all-workers', 'Restart All Workers') do
      puts "Restarting ALL workers on ALL machines."
      begin
        manager = self.get
        manager.restart_all_workers
        exit
      rescue DRb::DRbConnError => e
        puts "No manager running at #{local_manager_uri} ERROR: #{e.inspect}"
        exit
      end
    end
    opt.on('--restart-workers', 'Restart Workers') do
      puts "Restarting workers on this machine."
      begin
        manager = self.get
        manager.restart_workers
        exit
      rescue DRb::DRbConnError => e
        puts "No manager running at #{local_manager_uri} ERROR: #{e.inspect}"
        exit
      end
    end
    opt.on('--increment-worker-version', 'Increment Worker Version') do
      ver = Skynet::MessageQueue.new.increment_worker_version
      puts "Incrementing Worker Version to #{ver}"
      exit
    end
    opt.on('--add-workers=WORKERS', 'Number of workers to add.') do |v|
      options[:add_workers] = v.to_i
    end
    opt.on('--remove-workers=WORKERS', 'Number of workers to remove.') do |v|
      options[:remove_workers] = v.to_i
    end
    opt.on('--workers=WORKERS', 'Number of workers to start.') do |v|
      options[:workers] = v.to_i
    end
    opt.on('-r', '--required LIBRARY', 'Require the specified libraries') do |v|
      options[:required_libs] << File.expand_path(v)
    end
    opt.on('--config=CONFIG_FILE', 'Where to find the skynet.rb config file') do |v|
      options[:config_file] = File.expand_path(v)
    end
    opt.on('--queue=QUEUE_NAME', 'Which queue should these workers use (default "default").') do |v|
      options[:queue] = v
    end
    opt.on('--queue_id=queue_id', 'Which queue should these workers use (default 0).') do |v|
      options[:queue_id] = v.to_i
    end
    opt.parse!(ARGV)
  end

  if options[:queue]
    if options[:queue_id]
      raise Skynet::Error.new("You may either provide a queue_id or a queue, but not both.")
    end
    options[:queue_id] = config.queue_id_by_name(options[:queue])
  else
    options[:queue_id] ||= 0
  end

  # Rescue LoadError rather than Rails' MissingSourceFile: that constant is a
  # LoadError subclass which only exists once ActiveSupport is loaded, so
  # naming it without ActiveSupport turns a missing library into a NameError.
  options[:required_libs].each do |adlib|
    begin
      require adlib
    rescue LoadError => e
      error "The included lib #{adlib} was not found: #{e.inspect}"
      exit
    end
  end

  options[:config_file] ||= Skynet::CONFIG[:CONFIG_FILE]
  if options[:config_file]
    begin
      require options[:config_file]
    rescue LoadError => e
      error "The config file at #{options[:config_file]} was not found: #{e.inspect}"
      exit
    end
  elsif Skynet::CONFIG[:SYSTEM_RUNNER]
    error "Config file missing. Please add a config/skynet_config.rb before starting."
  end

  options[:workers] ||= Skynet::CONFIG[:NUMBER_OF_WORKERS] || 4
  options[:pid_file] ||= Skynet::Config.pidfile_location
  options[:script_path] ||= Skynet::CONFIG[:LAUNCHER_PATH]

  if options[:add_workers] || options[:remove_workers]
    # Talk to an already-running manager instead of starting a new one.
    begin
      manager = self.get
      if options[:add_workers]
        pids = manager.add_worker(options[:add_workers])
        warn "ADDING #{options[:add_workers]} workers PIDS: #{pids.inspect}"
      elsif options[:remove_workers]
        pids = manager.remove_workers(options[:remove_workers])
        warn "REMOVING #{options[:remove_workers]} workers PIDS: #{pids.inspect}"
      end
    rescue DRb::DRbConnError => e
      warn "Couldnt add or remove workers. There are probably no workers running. At least I couldn't find a skynet_manager around at #{local_manager_uri} #{e.inspect}"
    rescue Exception => e
      warn "Couldnt add or remove workers #{e.inspect} #{e.backtrace.join("\n")}"
    end
    exit
  else
    begin
      debug "Making sure there's an available MessageQueue"
      # Constructed only to verify connectivity; the instance is not kept.
      Skynet::MessageQueue.new
    rescue Skynet::ConnectionError => e
      fatal "Couldn't get MessageQueue! #{e.message}"
      raise Skynet::ConnectionError.new("ERROR! Couldn't get MessageQueue! #{e.message}")
    end
    debug "CONTINUING TO START : There IS an available MessageQueue", options

    begin
      if (oldpid = read_pid_file)
        errmsg = nil
        if Skynet.process_alive?(oldpid)
          errmsg = "Another Skynet Manager is running at pid: #{oldpid}"
          warn errmsg
          stderr errmsg
          exit
        else
          errmsg = "Deleting stale pidfile #{Skynet::Config.pidfile_location}"
          warn errmsg
          stderr errmsg
          File.unlink(Skynet::Config.pidfile_location) if File.exist?(Skynet::Config.pidfile_location)
        end
      end

      printlog "STARTING THE MANAGER!!!!!!!!!!! port: #{Skynet::CONFIG[:SKYNET_LOCAL_MANAGER_PORT]}"
      puts "Starting Skynet..."
      # (A "Skynet Stopped" log line used to be emitted here, at startup.)
      if options[:daemonize] || options["daemonize"]
        Skynet.safefork do
          Process.setsid
          write_pid_file
          Skynet.close_console
          run_manager(options)
          exit!
        end
      else
        write_pid_file
        run_manager(options)
      end
    rescue SystemExit, Interrupt
      # Deliberate, quiet shutdown paths.
    rescue Exception => e
      fatal("Error in Manager. Manager Dying. #{e.inspect} #{e.backtrace}")
    end
  end
end
|
.stats_for_hosts(manager_hosts = nil) ⇒ Object
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
|
# File 'lib/skynet/skynet_manager.rb', line 393
# Collects stats from the managers on +manager_hosts+ and sums their
# integer counters.
#
# @param manager_hosts [Array<String>, nil] hosts to query; defaults to
#   CONFIG[:MANAGER_HOSTS] or ["localhost"].
# @return [Hash] aggregated counters plus :servers (host => raw stats),
#   :hosts (the queried host list) and :time (query timestamp).
def self.stats_for_hosts(manager_hosts=nil)
  manager_hosts ||= Skynet::CONFIG[:MANAGER_HOSTS] || ["localhost"]
  stats = {
    :servers => {},
    :processed => 0,
    :number_of_workers => 0,
    :active_workers => 0,
    :idle_workers => 0,
    :hosts => 0,
    :masters => 0,
    :taskworkers => 0,
    :time => Time.now.to_f
  }
  servers = {}
  manager_hosts.each do |manager_host|
    uri = "druby://#{manager_host}:#{Skynet::CONFIG[:SKYNET_LOCAL_MANAGER_PORT]}"
    begin
      manager_stats = DRbObject.new(nil, uri).stats
      servers[manager_host] = manager_stats
      manager_stats.each do |key, value|
        # Only integer counters are summed. Integer replaces Fixnum, which
        # was deprecated in Ruby 2.4 and removed in 3.2.
        next unless value.is_a?(Integer)
        stats[key] ||= 0
        stats[key] += value
      end
    rescue DRb::DRbConnError, Errno::ECONNREFUSED
      warn "Couldn't get stats from manager at #{uri}"
    end
  end
  stats[:servers] = servers
  stats[:hosts] = manager_hosts
  stats
end
|
.stop(options = {}) ⇒ Object
Stops the daemon, nicely at first and then forcefully if necessary.
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
|
# File 'lib/skynet/skynet_manager.rb', line 673
# Stops the running manager daemon. It sends TERM, waits up to 180s, sends a
# second TERM (which the manager's TERM trap treats as "terminate"), waits
# again, and finally sends KILL. The pidfile is always removed afterwards.
#
# @param options [Hash] currently unused.
def self.stop(options = {})
  pid = read_pid_file
  # An empty or corrupt pidfile can yield 0. Process.kill treats pid 0 as
  # "my own process group", so never signal anything but a positive pid.
  if !pid || pid <= 0
    puts "The Skynet Manager is not running. No PID found in #{Skynet::Config.pidfile_location}"
    exit
  end
  $stdout.puts "Stopping Skynet"
  printlog "Stopping Skynet"
  Process.kill("TERM", pid)
  # Signal 0 only probes for existence; ESRCH (process gone) ends the wait.
  180.times { Process.kill(0, pid); sleep(1) }
  Process.kill("TERM", pid)
  180.times { Process.kill(0, pid); sleep(1) }
  $stdout.puts("using kill -9 #{pid}")
  Process.kill("KILL", pid)
rescue Errno::ESRCH
  printlog "Skynet Stopped"
ensure
  File.unlink(Skynet::Config.pidfile_location) if File.exist?(Skynet::Config.pidfile_location)
end
|
.write_pid_file ⇒ Object
698
699
700
701
702
703
|
# File 'lib/skynet/skynet_manager.rb', line 698
# Records the current PID in the pidfile. It also registers an at_exit hook
# that removes the file, but only if it still holds this process's PID.
def self.write_pid_file
  pidfile = Skynet::Config.pidfile_location
  info "Writing PIDFILE to #{pidfile}"
  # File.open, not Kernel#open: Kernel#open runs a shell command when the
  # path starts with "|".
  File.open(pidfile, "w") { |f| f << Process.pid << "\n" }
  at_exit { File.unlink(pidfile) if read_pid_file == Process.pid }
end
|
Instance Method Details
#active_workers ⇒ Object
460
461
462
|
# File 'lib/skynet/skynet_manager.rb', line 460
# Worker statuses that still carry an integer process id.
# Integer replaces Fixnum (deprecated in Ruby 2.4, removed in 3.2).
#
# @return [Array] status objects from @worker_queue.
def active_workers
  @worker_queue.values.select { |status| status.process_id.is_a?(Integer) }
end
|
#add_worker(workers = 1) ⇒ Object
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
|
# File 'lib/skynet/skynet_manager.rb', line 215
# Forks +workers+ new worker processes. It splits them into master-only,
# task-only and "any" workers according to the configured percentages, then
# waits for them to report in via check_started_workers.
#
# @param workers [Integer] number of workers to launch.
# @return [Array<Integer>] PIDs of the launched workers. Manager.start prints
#   these after --add-workers. Previously this returned whatever
#   check_started_workers returned.
def add_worker(workers=1)
  num_task_only_workers = (workers * Skynet::CONFIG[:PERCENTAGE_OF_TASK_ONLY_WORKERS]).to_i
  num_master_only_workers = (workers * Skynet::CONFIG[:PERCENTAGE_OF_MASTER_ONLY_WORKERS]).to_i
  warn "Adding #{workers} WORKERS. Task Workers: #{num_task_only_workers}, Master Workers: #{num_master_only_workers} Master & Task Workers: #{workers - num_task_only_workers - num_master_only_workers}"
  @all_workers_started = false
  worker_types = {:task => 0, :master => 0, :any => 0}
  pids = (1..workers).collect do |ii|
    # The first slots become masters, the next ones task-only, the rest "any".
    worker_type = :any
    if ii <= num_master_only_workers
      worker_type = :master
      worker_types[:master] += 1
    elsif ii > num_master_only_workers && ii <= num_master_only_workers + num_task_only_workers
      worker_type = :task
      worker_types[:task] += 1
    else
      worker_types[:any] += 1
    end
    cmd = "#{@script_path} --worker_type=#{worker_type}"
    cmd << " --config='#{Skynet::CONFIG[:CONFIG_FILE]}'" if Skynet::CONFIG[:CONFIG_FILE]
    cmd << " --queue_id=#{queue_id}"
    cmd << " -r #{required_libs.join(' -r ')}" if required_libs && !required_libs.empty?
    wpid = Skynet.fork_and_exec(cmd)
    Skynet.close_console
    @workers_by_type[worker_type] ||= []
    @workers_by_type[worker_type] << wpid
    warn "Adding Worker ##{ii} PID: #{wpid} QUEUE: #{queue_id}, WORKER_TYPE?:#{worker_type}"
    @mutex.synchronize do
      @number_of_workers += 1
    end
    sleep 0.01
    wpid
  end
  info "Worker Distribution", worker_types
  check_started_workers
  pids
end
|
#add_workers(*args) ⇒ Object
211
212
213
|
# File 'lib/skynet/skynet_manager.rb', line 211
# Plural alias for #add_worker; forwards every argument unchanged.
def add_workers(*forwarded)
  add_worker(*forwarded)
end
|
#check_number_of_workers ⇒ Object
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
|
# File 'lib/skynet/skynet_manager.rb', line 150
# Reconciles the number of running workers with the expected count.
# - Shutting down (@shutdown): delegates to worker_shutdown and exits the
#   manager once no worker PIDs remain.
# - Restarts pending (@workers_restarting > 0): only logs progress and
#   recomputes how many requested workers are still missing.
# - Otherwise, on a count mismatch: resyncs @number_of_workers to the observed
#   count and, when fewer than 85% of @workers_requested are running, starts
#   enough workers to get back to @workers_requested.
# NOTE: worker_pids is recomputed from @worker_queue on every call below.
def check_number_of_workers
if @shutdown
worker_shutdown
if worker_pids.size < 1
exit
end
elsif @workers_restarting > 0
if @workers_requested - worker_pids.size != 0
restarting = @workers_requested - worker_pids.size
warn "RESTART MODE: Expected #{@number_of_workers} workers. #{worker_pids.size} running. #{restarting} are still restarting"
else
warn "RESTART MODE: Expected #{@number_of_workers} workers. #{worker_pids.size} running."
end
# NOTE(review): becomes <= 0 once enough workers are back (negative if more
# than requested are running), which ends restart mode on the next check.
@workers_restarting = @workers_requested - worker_pids.size
elsif worker_pids.size != @number_of_workers
starting = 0
# Only replenish when the shortfall exceeds 15% of the requested workers.
if worker_pids.size.to_f / @workers_requested.to_f < 0.85
starting = @workers_requested - worker_pids.size
error "Expected #{@number_of_workers} workers. #{worker_pids.size} running. Starting #{starting}"
@number_of_workers = worker_pids.size
add_worker(starting)
else
error "Expected #{@number_of_workers} workers. #{worker_pids.size} running."
@number_of_workers = worker_pids.size
end
end
end
|
#check_running_pids ⇒ Object
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
# File 'lib/skynet/skynet_manager.rb', line 135
# Removes workers whose processes are no longer alive from the worker queue
# and decrements the worker count for each.
#
# @return [Array<Integer>] PIDs of the workers still considered active.
def check_running_pids
  worker_pids.each do |pid|
    next if worker_alive?(pid)

    if @shutdown
      info "Worker #{pid} shut down gracefully. Removing from queue."
    else
      error "Worker #{pid} was in queue and but was not running. Removing from queue."
    end
    mark_worker_as_stopped(pid)
    @number_of_workers -= 1
  end
  worker_pids
end
|
#check_started_workers ⇒ Object
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
# File 'lib/skynet/skynet_manager.rb', line 92
# Waits (up to 100 polls) for the expected number of workers to report as
# active. It then marks startup complete and, if more workers are active
# than expected, adopts the higher count.
def check_started_workers
  begin
    100.times do |ii|
      running = active_workers.size
      warn "Checking started workers, #{running} out of #{@number_of_workers} after the #{(ii+1)}th try..."
      missing = @number_of_workers - running
      break if missing <= 0
      # Sleep one second per worker still missing. The count is taken once
      # per poll, so a concurrent status update can no longer make it
      # negative (sleep raises ArgumentError on negative durations).
      sleep missing
    end
  rescue StandardError => e
    # StandardError (not Exception) so SystemExit from #terminate, e.g.
    # triggered by a signal during startup, is not swallowed.
    fatal "Something bad happened #{e.inspect} #{e.backtrace.join("\n")}"
  end
  @all_workers_started = true
  printlog "FINISHED STARTING ALL #{active_workers.size} WORKERS"
  if active_workers.size > @number_of_workers
    warn "EXPECTED #{@number_of_workers}"
    @number_of_workers = active_workers.size
  end
end
|
#check_workers ⇒ Object
128
129
130
131
132
133
|
# File 'lib/skynet/skynet_manager.rb', line 128
# One monitoring pass: prune dead workers, then reconcile the worker count.
#
# @return [true]
def check_workers
  unless @shutdown
    debug "Checking on #{@number_of_workers} workers..."
  end
  check_running_pids
  check_number_of_workers
  true
end
|
#hard_restart_workers ⇒ Object
301
302
303
304
305
306
307
308
309
|
# File 'lib/skynet/skynet_manager.rb', line 301
# Forcefully restarts workers. It TERMs every active worker, flags a restart,
# INTs the master and "any" workers, waits one second per expected worker,
# then waits for workers to report back in.
def hard_restart_workers
  @all_workers_started = false
  signal_workers("TERM")
  @restart = true
  [:master, :any].each { |type| signal_workers("INT", type) }
  sleep @number_of_workers
  check_started_workers
end
|
#hostname ⇒ Object
476
477
478
|
# File 'lib/skynet/skynet_manager.rb', line 476
# This machine's hostname, looked up once and memoized.
def hostname
  @machine_name = Socket.gethostname unless @machine_name
  @machine_name
end
|
#inactive_workers ⇒ Object
464
465
466
|
# File 'lib/skynet/skynet_manager.rb', line 464
# Worker statuses whose process id is not an integer (e.g. cleared to nil).
# Integer replaces Fixnum (deprecated in Ruby 2.4, removed in 3.2).
#
# @return [Array] status objects from @worker_queue.
def inactive_workers
  @worker_queue.values.reject { |status| status.process_id.is_a?(Integer) }
end
|
#load_worker_queue_from_file ⇒ Object
373
374
375
376
377
378
379
380
381
382
383
384
385
386
|
# File 'lib/skynet/skynet_manager.rb', line 373
# Restores @worker_queue from the manager stat file, if one exists. If the
# file cannot be parsed or does not contain a Hash, the queue is reset to {}
# and the (now empty) queue is written back.
def load_worker_queue_from_file
  statfile = Skynet.config.manager_statfile_location
  # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
  return unless File.exist?(statfile)
  contents = File.read(statfile)
  begin
    # NOTE(review): YAML.load deserializes whatever the file contains; this is
    # only safe while the stat file is written solely by this manager.
    @worker_queue = YAML.load(contents)
    raise Error.new("Bad Manager File returned type #{@worker_queue.class}") unless @worker_queue.is_a?(Hash)
  rescue StandardError => e
    # Log the file path, not the inspected File object.
    error "Error loading manager stats file: #{statfile}", e
    @worker_queue = {}
    save_worker_queue_to_file
  end
end
|
#mark_worker_as_stopped(wpid) ⇒ Object
270
271
272
273
274
275
276
277
278
|
# File 'lib/skynet/skynet_manager.rb', line 270
# Removes the status entry for +wpid+ from @worker_queue, provided a matching
# entry exists and the process is no longer alive. Returns nil.
def mark_worker_as_stopped(wpid)
  status = @worker_queue.values.find { |s| s.process_id == wpid }
  return unless status
  return if worker_alive?(wpid)

  @worker_queue.delete_if { |_worker_id, s| s.process_id == wpid }
  # NOTE(review): worker_pids builds a fresh array, so this delete has no
  # lasting effect.
  worker_pids.delete(status.process_id)
  # NOTE(review): status was already removed from @worker_queue above; these
  # updates are only visible to other holders of the same object.
  status.started_at = Time.now.to_f
  status.process_id = nil
end
|
#parent_pid ⇒ Object
472
473
474
|
# File 'lib/skynet/skynet_manager.rb', line 472
# PID of the manager process itself.
def parent_pid
  Process.pid
end
|
#ping ⇒ Object
480
481
482
|
# File 'lib/skynet/skynet_manager.rb', line 480
# Liveness probe for DRb clients; always answers true.
def ping; true; end
|
#prune_inactive_worker_stats ⇒ Object
388
389
390
391
|
# File 'lib/skynet/skynet_manager.rb', line 388
# Drops status entries whose process id is not an integer, then returns fresh
# stats. Integer replaces Fixnum (deprecated in Ruby 2.4, removed in 3.2).
def prune_inactive_worker_stats
  @worker_queue.delete_if { |_worker_id, worker| !worker.process_id.is_a?(Integer) }
  stats
end
|
#remove_worker(pids = nil) ⇒ Object
257
258
259
260
261
262
263
264
265
266
267
268
|
# File 'lib/skynet/skynet_manager.rb', line 257
# Asks the given workers to stop gracefully (SIGINT) and drops them from the
# worker count.
#
# @param pids [Integer, Array<Integer>, nil] worker PID(s); nil means none.
# @return [Array<Integer>] the PIDs that were processed.
def remove_worker(pids = nil)
  # Array() turns nil into [] and a single pid into [pid]; previously the
  # nil default became [nil] and crashed Process.kill.
  pids = Array(pids)
  info "Removing workers #{pids.join(",")} from worker queue. They will die gracefully when they finish what they're doing."
  pids.each do |wpid|
    begin
      Process.kill("INT",wpid)
    rescue Errno::ESRCH
      # Already gone: keep going so the remaining PIDs are still handled
      # (matches signal_workers).
      warn "Tried to kill a process that didn't exist #{wpid}"
    end
    mark_worker_as_stopped(wpid)
    @number_of_workers -= 1
    warn "REMOVING WORKER #{wpid}"
    @signaled_workers << wpid
  end
  pids
end
|
#remove_workers(workers = 1) ⇒ Object
252
253
254
255
|
# File 'lib/skynet/skynet_manager.rb', line 252
# Gracefully removes the first +workers+ active workers.
#
# @return [Array<Integer>] the PIDs handed to #remove_worker.
def remove_workers(workers=1)
  victims = worker_pids[0...workers]
  remove_worker(victims)
end
|
#restart_worker(wpid) ⇒ Object
XXX: This is a horrible hack.
314
315
316
317
318
319
320
321
322
|
# File 'lib/skynet/skynet_manager.rb', line 314
# Sends SIGHUP to one worker and records it as restarting. The signal, the
# queue update and the counter bump happen under @mutex; afterwards it waits
# WORKER_CHECK_DELAY seconds.
def restart_worker(wpid)
  info "RESTARTING WORKER #{wpid}"
  @mutex.synchronize do
    Process.kill("HUP", wpid)
    mark_worker_as_stopped(wpid)
    @workers_restarting += 1
  end
  sleep(Skynet::CONFIG[:WORKER_CHECK_DELAY])
end
|
#restart_workers ⇒ Object
324
325
326
327
328
329
|
# File 'lib/skynet/skynet_manager.rb', line 324
# Soft restart. It HUPs every active worker, waits one second per expected
# worker, then waits for the workers to report back in.
def restart_workers
  @all_workers_started = false
  signal_workers("HUP")
  sleep(@number_of_workers)
  check_started_workers
end
|
#run ⇒ Object
the main application loop
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
# File 'lib/skynet/skynet_manager.rb', line 113
# The manager's main loop. Once all workers have started, it runs
# check_workers every WORKER_CHECK_DELAY seconds. SystemExit/Interrupt end
# the process; any other error is logged and the loop continues.
#
# @param idle_poll_interval [Numeric] seconds to wait between polls while
#   workers are still (re)starting.
def run(idle_poll_interval = 0.5)
  loop do
    unless @all_workers_started
      # Previously this was a bare `next`, which busy-spun a CPU core for the
      # whole (re)start window.
      sleep idle_poll_interval
      next
    end
    begin
      check_workers
      sleep Skynet::CONFIG[:WORKER_CHECK_DELAY]
    rescue SystemExit, Interrupt => e
      printlog "Manager Exiting!"
      exit
    rescue Exception => e
      fatal "Something bad happened #{e.inspect} #{e.backtrace.join("\n")}"
    end
  end
end
|
#save_worker_queue_to_file ⇒ Object
366
367
368
369
370
371
|
# File 'lib/skynet/skynet_manager.rb', line 366
# Serializes @worker_queue as YAML into the manager stat file.
def save_worker_queue_to_file
  statfile = Skynet.config.manager_statfile_location
  debug "Writing worker queue to file #{statfile}"
  File.open(statfile, "w") { |io| io.write(YAML.dump(@worker_queue)) }
end
|
#setup_signals ⇒ Object
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
|
# File 'lib/skynet/skynet_manager.rb', line 331
# Installs the manager's signal handlers:
# HUP restarts workers. The first TERM begins a graceful shutdown and a
# second TERM terminates. INT shuts down, or terminates if a shutdown is
# already in progress.
def setup_signals
  Signal.trap("HUP") { restart_workers }

  Signal.trap("TERM") do
    next terminate if @term
    @term = true
    shutdown
  end

  Signal.trap("INT") { @shutdown ? terminate : shutdown }
end
|
#shutdown ⇒ Object
353
354
355
356
357
|
# File 'lib/skynet/skynet_manager.rb', line 353
# Begins a graceful shutdown: sets @shutdown and TERMs idle, master and
# "any" workers.
def shutdown
  info(:shutdown)
  @shutdown = true
  targets = [:idle, :master, :any]
  signal_workers("TERM", targets)
end
|
#signal_workers(signal, worker_type = []) ⇒ Object
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
|
# File 'lib/skynet/skynet_manager.rb', line 280
# Sends +signal+ to every active worker that matches at least one of the
# requested worker types.
#
# @param signal [String, Integer] signal passed to Process.kill.
# @param worker_type [Symbol, Array<Symbol>, nil] any of :master, :task or
#   :any (as tracked in @workers_by_type), or :idle (no task_id). An empty
#   list (the default) or nil targets every active worker.
def signal_workers(signal,worker_type=[])
  worker_types = [worker_type].flatten.compact
  active_workers.each do |worker|
    unless worker_types.empty?
      # Skip the worker unless ANY requested type applies. Previously `next`
      # sat inside the per-type loop, so it only skipped to the next type and
      # every worker was signalled regardless of the filter.
      matches = worker_types.any? do |type|
        if type == :idle
          !worker.task_id
        else
          Array(@workers_by_type[type]).include?(worker.process_id)
        end
      end
      next unless matches
    end
    warn "SHUTTING DOWN #{worker.process_id} MR: #{worker.map_or_reduce} SIG: #{signal}"
    begin
      Process.kill(signal,worker.process_id)
    rescue Errno::ESRCH
      warn "Tried to kill a process that didn't exist #{worker.process_id}"
    end
    @signaled_workers << worker.process_id
  end
end
|
#start_worker_queue_thread ⇒ Object
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
# File 'lib/skynet/skynet_manager.rb', line 43
# Spawns a background thread that drains @wqts. It turns each notification
# into a Skynet::WorkerStatusMessage, stores it in @worker_queue (under
# @mutex), and writes the queue to disk at most once a minute.
#
# @return [Thread] the consumer thread.
def start_worker_queue_thread
  Thread.new do
    saved_at = Time.now
    loop do
      message = @wqts.pop
      begin
        status = Skynet::WorkerStatusMessage.new(message)
        status.started_at = status.started_at.to_i
        @mutex.synchronize { @worker_queue[status.worker_id] = status }
        if Time.now - 60 > saved_at
          save_worker_queue_to_file
          saved_at = Time.now
        end
      rescue Exception => e
        error "Error in worker queue thread #{e.inspect} #{e.backtrace.join("\n")}"
      end
    end
  end
end
|
#start_workers ⇒ Object
65
66
67
68
69
70
71
72
73
74
|
# File 'lib/skynet/skynet_manager.rb', line 65
# Boots the manager. It restores saved worker state, starts the status
# consumer thread, installs signal handlers, and forks however many workers
# are still missing.
def start_workers
  load_worker_queue_from_file
  start_worker_queue_thread
  setup_signals
  to_start = workers_to_start(@workers_requested)
  already_running = @workers_requested - to_start
  warn "Starting #{to_start} workers. QUEUE: #{config.queue_name_by_id(queue_id)} #{already_running} already running."
  add_worker(to_start)
end
|
#stats ⇒ Object
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
|
# File 'lib/skynet/skynet_manager.rb', line 426
# Summarizes this host's workers: update timestamps, processed totals, and
# worker counts broken down by state (busy/idle/shut down) and by task type
# (master / any / task).
#
# @return [Hash] stats keyed by symbols.
def stats
  all_workers  = @worker_queue.values
  live_workers = active_workers
  started_times        = all_workers.map { |w| w.started_at }.sort
  active_started_times = live_workers.map { |w| w.started_at }.sort
  # A worker with a map_or_reduce value is busy; the rest are idle.
  busy_workers, idle_workers = live_workers.partition { |w| w.map_or_reduce }
  of_type = lambda do |list, type|
    list.count { |w| w.tasktype.to_s == type }
  end

  result = {
    :hostname                    => hostname,
    :earliest_update             => started_times.first,
    :latest_update               => started_times.last,
    :active_earliest_update      => active_started_times.first,
    :active_latest_update        => active_started_times.last,
    :processed                   => all_workers.inject(0) { |sum, w| sum + w.processed },
    :processed_by_active_workers => live_workers.inject(0) { |sum, w| sum + w.processed },
    :number_of_workers           => live_workers.size,
    :idle_workers                => idle_workers.size,
    :shutdown_workers            => inactive_workers.size,
  }
  result[:active_workers] = busy_workers.size
  result[:masters]                       = of_type.call(live_workers, "master")
  result[:master_or_task_workers]        = of_type.call(live_workers, "any")
  result[:taskworkers]                   = of_type.call(live_workers, "task")
  result[:active_masters]                = of_type.call(busy_workers, "master")
  result[:active_master_or_task_workers] = of_type.call(busy_workers, "any")
  result[:active_taskworkers]            = of_type.call(busy_workers, "task")
  result[:idle_masters]                  = of_type.call(idle_workers, "master")
  result[:idle_master_or_task_workers]   = of_type.call(idle_workers, "any")
  result[:idle_taskworkers]              = of_type.call(idle_workers, "task")
  result
end
|
#terminate ⇒ Object
359
360
361
362
363
364
|
# File 'lib/skynet/skynet_manager.rb', line 359
# Hard stop: SIGKILLs every active worker, pauses one second, then exits the
# manager.
def terminate
  info(:terminate)
  signal_workers("KILL")
  sleep(1)
  exit
end
|
#worker_alive?(worker_pid) ⇒ Boolean
207
208
209
|
# File 'lib/skynet/skynet_manager.rb', line 207
# Whether the worker process +worker_pid+ is still running (delegates to
# Skynet.process_alive?).
def worker_alive?(worker_pid) Skynet.process_alive?(worker_pid) end
|
#worker_notify(item) ⇒ Object
39
40
41
|
# File 'lib/skynet/skynet_manager.rb', line 39
# Entry point for worker status notifications; enqueues +item+ for the
# worker queue thread.
def worker_notify(item)
  @wqts << item
end
|
#worker_pids ⇒ Object
468
469
470
|
# File 'lib/skynet/skynet_manager.rb', line 468
# PIDs of all currently active workers (a fresh array on every call).
def worker_pids
  active_workers.map(&:process_id)
end
|
#worker_shutdown ⇒ Object
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
|
# File 'lib/skynet/skynet_manager.rb', line 180
# One step of the staged shutdown, called on every check while @shutdown is
# set. Phase 1: gracefully remove live masters; once no master remains, TERM
# every worker and set @masters_dead. Afterwards it only logs how many
# workers are still running.
def worker_shutdown
  unless @masters_dead
    workers_to_kill = active_workers.select do |w|
      w.map_or_reduce == "master" && worker_alive?(w.process_id)
    end
    warn "Shutting down masters. #{worker_pids.size} workers still running." if worker_pids.size > 0
    worker_pids_to_kill = workers_to_kill.collect { |w| w.process_id }
    unless worker_pids_to_kill.empty?
      warn "FOUND MORE RUNNING MASTERS WE HAVEN'T KILLED:", worker_pids_to_kill
      remove_worker(worker_pids_to_kill)
    end
    if active_workers.detect { |w| w.map_or_reduce == "master" }
      # Masters are still winding down; the next check cycle comes back here.
      # This used to `return check_number_of_workers`, which calls back into
      # worker_shutdown while @shutdown is set. That recursed without bound
      # (until SystemStackError) for as long as a master stayed alive.
      return
    end
    signal_workers("TERM")
    @masters_dead = true
  end
  if worker_pids.size < 1
    info "No more workers running."
  else
    warn "Shutting down. #{worker_pids.size} workers still running."
  end
end
|
#workers_to_start(workers_to_start) ⇒ Object
maybe workers_to_start should be a method
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
# File 'lib/skynet/skynet_manager.rb', line 77
# Works out how many new workers are needed to reach +workers_to_start+.
# Each already-running worker counts toward the target and increments
# @number_of_workers; dead ones are marked stopped. The scan stops early,
# returning 0, once the target is covered.
#
# @return [Integer] number of workers still to launch.
def workers_to_start(workers_to_start)
  remaining = workers_to_start
  worker_pids.each do |pid|
    if worker_alive?(pid)
      @number_of_workers += 1
      remaining -= 1
    else
      mark_worker_as_stopped(pid)
    end
    return 0 if remaining < 1
  end
  remaining
end
|