Class: Mqjob::ThreadPool

Inherits:
Concurrent::FixedThreadPool
  • Object
show all
Defined in:
lib/mqjob/thread_pool.rb

Instance Method Summary collapse

Constructor Details

#initialize(num_threads, opts = {}) ⇒ ThreadPool

Returns a new instance of ThreadPool.



5
6
7
8
9
# File 'lib/mqjob/thread_pool.rb', line 5

def initialize(num_threads, opts = {})
  super
  @job_finish = ConditionVariable.new
  @job_mutex = Mutex.new
end

Instance Method Details

#killObject



30
31
32
33
34
# File 'lib/mqjob/thread_pool.rb', line 30

def kill
  super

  @job_finish.broadcast
end

#ns_executeObject



18
19
20
21
22
# File 'lib/mqjob/thread_pool.rb', line 18

def ns_execute
  super

  @job_finish.signal
end

#post(*args, &task) ⇒ Object

NOTE 使用非缓冲线程池,防止消息丢失



12
13
14
15
16
# File 'lib/mqjob/thread_pool.rb', line 12

def post(*args, &task)
  wait

  super
end

#shutdownObject



24
25
26
27
28
# File 'lib/mqjob/thread_pool.rb', line 24

def shutdown
  super

  @job_finish.broadcast
end

#waitObject



36
37
38
39
40
41
42
# File 'lib/mqjob/thread_pool.rb', line 36

def wait
  @job_mutex.synchronize do
    while running? && (scheduled_task_count - completed_task_count >= max_length)
      @job_finish.wait(@job_mutex, 0.05)
    end
  end
end