Module: Mqjob::WorkerGroup

Defined in:
lib/mqjob/worker_group.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.configure(opts) ⇒ Object

设置线程数并返回新类型

opts:
  threads 设置线程池大小


46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/mqjob/worker_group.rb', line 46

# Builds a new anonymous worker-group module carrying the given pool size and
# worker classes, registers it as a constant under ::Mqjob and returns it.
#
# @param opts [Hash] configuration options
# @option opts [#to_i] :threads thread pool size; required, must not convert to 0
# @option opts [Class, Array<Class>] :clazz worker class(es); required
# @return [Module] the generated module (it includes the receiver)
# @raise [RuntimeError] when :threads converts to zero or :clazz is empty
def self.configure(opts)
  thread_size = opts[:threads].to_i
  raise "threads was required!" if thread_size.zero?
  classes = Array(opts[:clazz])
  raise "clazz was required!" if classes.empty?

  md = Module.new
  # object_id may be rendered with a leading '-', which is not valid in a
  # constant name, hence the tr.
  md_name = "WorkerGroup#{md.object_id}".tr('-', '_')

  md.include(self)
  # Capture the values in closures instead of interpolating them into eval'd
  # source: interpolation only worked when the classes' #inspect output was a
  # resolvable constant expression and broke for anonymous classes, classes
  # with a custom #inspect, or a non-numeric :threads string.
  md.class_eval do
    define_method(:threads) { thread_size }
    # Hand out a copy so callers cannot mutate the captured list.
    define_method(:worker_classes) { classes.dup }

    # `workers` is not defined here; it is looked up through the module
    # included above.
    # NOTE(review): worker_classes stays public - confirm that is intended.
    private :threads, :workers
  end

  ::Mqjob.const_set(md_name, md)
end

Instance Method Details

#after_start ⇒ Object



33
34
35
# File 'lib/mqjob/worker_group.rb', line 33

# Lifecycle callback; it only prints a trace line naming itself to stdout.
def after_start
  puts format('call %s', __method__)
end

#before_fork ⇒ Object



9
10
11
# File 'lib/mqjob/worker_group.rb', line 9

# Invokes the globally registered before_fork hook, if one is set.
#
# @return [Object, nil] the hook's return value, or nil when no hook is set
def before_fork
  hook = ::Mqjob.hooks.before_fork
  hook.call unless hook.nil?
end

#initialize ⇒ Object



3
4
5
6
7
# File 'lib/mqjob/worker_group.rb', line 3

# Sets up a not-yet-stopped group and builds its thread pool.
# `threads` is expected to come from the module generated by `.configure`.
def initialize
  @stoped = false
  # A single unified pool, so the database connection pool is not exhausted;
  # the recommended size is 10.
  pool_size = threads
  @pool = ::Mqjob::ThreadPool.new(pool_size)
end

#reload ⇒ Object



29
30
31
# File 'lib/mqjob/worker_group.rb', line 29

# Reload callback; it only prints a trace line naming itself to stdout.
def reload
  puts format('call %s', __method__)
end

#run ⇒ Object



13
14
15
16
17
18
19
20
21
# File 'lib/mqjob/worker_group.rb', line 13

# Fires the after_fork hook (when one is set) and then starts every worker
# in order. Does nothing once the group has been stopped.
#
# @return [Array, nil] the workers collection, or nil if already stopped
def run
  return if @stoped

  after_fork_hook = ::Mqjob.hooks.after_fork
  after_fork_hook.call unless after_fork_hook.nil?

  workers.each(&:run)
end

#stop ⇒ Object



23
24
25
26
27
# File 'lib/mqjob/worker_group.rb', line 23

# Asks every worker to stop, then flags the group as stopped so that a later
# #run becomes a no-op.
#
# @return [true]
def stop
  workers.each(&:stop)

  @stoped = true
end

#workers ⇒ Object



37
38
39
40
41
# File 'lib/mqjob/worker_group.rb', line 37

# Lazily instantiates one worker per configured class, handing each the
# group's thread pool, and memoizes the resulting list.
#
# @return [Array] the worker instances
def workers
  return @workers if @workers

  @workers = worker_classes.map { |klass| klass.new(pool: @pool) }
end