Module: Mqjob::WorkerGroup
- Defined in: lib/mqjob/worker_group.rb
Class Method Summary
- .configure(opts) ⇒ Object
Sets the thread count and returns a new type (a generated module). The threads option sets the thread pool size.
Instance Method Summary
- #after_start ⇒ Object
- #before_fork ⇒ Object
- #initialize ⇒ Object
- #reload ⇒ Object
- #run ⇒ Object
- #stop ⇒ Object
- #workers ⇒ Object
Class Method Details
.configure(opts) ⇒ Object
Sets the thread count and returns a new type.
opts:
- threads: sets the thread pool size
    # File 'lib/mqjob/worker_group.rb', line 46

    def self.configure(opts)
      thread_size = opts[:threads]
      raise "threads was required!" if thread_size.to_i.zero?

      workers = Array(opts[:clazz])
      raise "clazz was required!" if workers.empty?

      md = Module.new
      md_name = "WorkerGroup#{md.object_id}".tr('-', '_')
      md.include(self)
      md.class_eval <<-RUBY, __FILE__, __LINE__+1
        def threads
          #{thread_size}
        end

        def worker_classes
          #{workers}
        end

        private :threads, :workers
      RUBY

      ::Mqjob.const_set(md_name, md)
    end
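The pattern behind .configure, an anonymous module that carries the configured values as private methods, can be sketched in isolation. This is a minimal sketch and not the library's code: ConfigDemoGroup, ConfigDemoWorker and ConfigDemoServer are hypothetical names, define_method is swapped in for the string class_eval used in the listing above, the const_set registration is left out, and mixing the result into a class is only an assumption about how the returned module might be consumed.

```ruby
# Hypothetical stand-in for Mqjob::WorkerGroup; only the configure
# pattern is reproduced, not the real library behaviour.
module ConfigDemoGroup
  def self.configure(opts)
    thread_size = opts[:threads]
    raise ArgumentError, "threads is required" if thread_size.to_i.zero?

    worker_classes = Array(opts[:clazz])
    raise ArgumentError, "clazz is required" if worker_classes.empty?

    base = self
    # Build a fresh module that includes the base module and closes
    # over the configured values.
    Module.new do
      include base
      define_method(:threads) { thread_size }
      define_method(:worker_classes) { worker_classes }
      private :threads, :worker_classes
    end
  end

  # Instance method that reads the configured values.
  def describe
    "#{worker_classes.map(&:name).join(', ')} on #{threads} threads"
  end
end

# Hypothetical worker class, used only as a value for :clazz.
class ConfigDemoWorker; end

# Assumed usage: mix the generated module into a host class.
class ConfigDemoServer
  include ConfigDemoGroup.configure(threads: 10, clazz: ConfigDemoWorker)
end

CONFIG_DEMO_DESCRIPTION = ConfigDemoServer.new.describe
```

Using a closure instead of interpolating values into source text avoids depending on how the worker classes print themselves, at the cost of differing from the original implementation.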
Instance Method Details
#after_start ⇒ Object
    # File 'lib/mqjob/worker_group.rb', line 33

    def after_start
      puts "call #{__method__}"
    end
#before_fork ⇒ Object
    # File 'lib/mqjob/worker_group.rb', line 9

    def before_fork
      ::Mqjob.hooks.before_fork&.call
    end
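The `&.call` in #before_fork makes the hook optional: when no hook is registered, the safe navigation operator short-circuits on nil and nothing runs. The sketch below illustrates that behaviour with a hypothetical DemoHooks struct; the real object returned by ::Mqjob.hooks is not shown in this page and may look different.

```ruby
# Hypothetical hooks container; an assumption, not the Mqjob API.
DemoHooks = Struct.new(:before_fork, :after_fork)

HOOKS = DemoHooks.new
HOOK_CALLS = []

# No hook registered yet: before_fork is nil, so &.call does nothing
# and the whole expression evaluates to nil.
RESULT_WITHOUT_HOOK = HOOKS.before_fork&.call

# Register a callable; the same expression now invokes it.
HOOKS.before_fork = -> { HOOK_CALLS << :before_fork }
HOOKS.before_fork&.call
```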
#initialize ⇒ Object
    # File 'lib/mqjob/worker_group.rb', line 3

    def initialize
      @stoped = false
      # Shared thread pool, so the database connection pool is not exhausted; a size of 10 is recommended
      @pool = ::Mqjob::ThreadPool.new(threads)
    end
#reload ⇒ Object
    # File 'lib/mqjob/worker_group.rb', line 29

    def reload
      puts "call #{__method__}"
    end
#run ⇒ Object
    # File 'lib/mqjob/worker_group.rb', line 13

    def run
      return if @stoped

      ::Mqjob.hooks.after_fork&.call

      workers.each do |worker|
        worker.run
      end
    end
#stop ⇒ Object
    # File 'lib/mqjob/worker_group.rb', line 23

    def stop
      workers.each{|wc| wc.stop}

      @stoped = true
    end
#workers ⇒ Object
    # File 'lib/mqjob/worker_group.rb', line 37

    def workers
      @workers ||= worker_classes.map do |wc|
        wc.new(pool: @pool)
      end
    end
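Taken together, #workers, #run and #stop describe a small lifecycle: workers are built lazily from the configured classes and share one pool, #run starts each of them unless the group has been stopped, and #stop stops them and sets the flag. The following is a rough sketch of that flow under stated assumptions: LifecycleWorker and LifecycleGroup are hypothetical, the pool is a placeholder symbol rather than a ::Mqjob::ThreadPool, and the after_fork hook is omitted.

```ruby
# Hypothetical worker that records calls instead of consuming a queue.
class LifecycleWorker
  attr_reader :events, :pool

  def initialize(pool:)
    @pool = pool
    @events = []
  end

  def run
    @events << :run
  end

  def stop
    @events << :stop
  end
end

# Hypothetical group mirroring the run/stop/workers methods listed above.
class LifecycleGroup
  def initialize(worker_classes, pool)
    @worker_classes = worker_classes
    @pool = pool
    @stopped = false
  end

  # Built on first access and memoized, so every later call returns
  # the same worker instances sharing the same pool.
  def workers
    @workers ||= @worker_classes.map { |wc| wc.new(pool: @pool) }
  end

  def run
    return if @stopped

    workers.each(&:run)
  end

  def stop
    workers.each(&:stop)
    @stopped = true
  end
end

LIFECYCLE_GROUP = LifecycleGroup.new([LifecycleWorker], :shared_pool)
LIFECYCLE_GROUP.run
LIFECYCLE_GROUP.stop
LIFECYCLE_GROUP.run # no-op: the group has already been stopped
LIFECYCLE_EVENTS = LIFECYCLE_GROUP.workers.first.events
```

Because the stop flag is checked only at the top of run, a second run after stop leaves the workers untouched, which matches the guard in the listing for #run.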