Class: Workhorse::Pool
- Inherits: Object
  - Object
  - Workhorse::Pool
- Defined in: lib/workhorse/pool.rb
Overview
Abstraction layer of a simple thread pool implementation used by the worker.
Instance Attribute Summary
- #active_threads ⇒ Object (readonly)
  Returns the value of attribute active_threads.
- #mutex ⇒ Object (readonly)
  Returns the value of attribute mutex.
Instance Method Summary
- #idle ⇒ Object
  Returns the number of idle threads.
- #initialize(size) ⇒ Pool (constructor)
  A new instance of Pool.
- #on_idle(&block) ⇒ Object
- #post ⇒ Object
  Posts a new work unit to the pool.
- #shutdown ⇒ Object
  Shuts down the pool and waits for termination.
- #wait ⇒ Object
  Waits until the pool is shut down.
Constructor Details
#initialize(size) ⇒ Pool
Returns a new instance of Pool.
# File 'lib/workhorse/pool.rb', line 7
def initialize(size)
  @size = size
  @executor = Concurrent::ThreadPoolExecutor.new(
    min_threads: 0,
    max_threads: @size,
    max_queue: 0,
    fallback_policy: :abort,
    auto_terminate: false
  )
  @mutex = Mutex.new
  @active_threads = Concurrent::AtomicFixnum.new(0)
  @on_idle = nil
end
Instance Attribute Details
#active_threads ⇒ Object (readonly)
Returns the value of attribute active_threads.
# File 'lib/workhorse/pool.rb', line 5
def active_threads
  @active_threads
end
#mutex ⇒ Object (readonly)
Returns the value of attribute mutex.
# File 'lib/workhorse/pool.rb', line 4
def mutex
  @mutex
end
Instance Method Details
#idle ⇒ Object
Returns the number of idle threads.
# File 'lib/workhorse/pool.rb', line 48
def idle
  @size - @active_threads.value
end
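#on_idle(&block) ⇒ Object
Registers a block that is called each time a posted work unit finishes. Only one block is stored; a later call replaces the earlier one.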
# File 'lib/workhorse/pool.rb', line 21
def on_idle(&block)
  @on_idle = block
end
#post ⇒ Object
Posts a new work unit to the pool.
# File 'lib/workhorse/pool.rb', line 26
def post
  mutex.synchronize do
    if idle.zero?
      fail 'All threads are busy.'
    end

    active_threads = @active_threads

    active_threads.increment

    @executor.post do
      begin # rubocop:disable Style/RedundantBegin
        yield
      ensure
        active_threads.decrement
        @on_idle.try(:call)
      end
    end
  end
end
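The reject-when-busy behaviour of #post can be tried out without the gem. What follows is a minimal sketch using only Ruby's standard library, not Workhorse's implementation: `TinyPool` and its `join` method are hypothetical names introduced for illustration, and the sketch spawns one plain thread per work unit instead of delegating to `Concurrent::ThreadPoolExecutor`.

```ruby
# Hypothetical stand-in for illustration only; not part of Workhorse.
class TinyPool
  def initialize(size)
    @size = size
    @active = 0
    @mutex = Mutex.new
    @on_idle = nil
    @threads = []
  end

  # Stores the callback that runs after each work unit finishes.
  def on_idle(&block)
    @on_idle = block
  end

  # Number of free slots.
  def idle
    @size - @active
  end

  # Runs the block in a new thread, or raises if every slot is taken.
  def post(&work)
    @mutex.synchronize do
      raise 'All threads are busy.' if idle.zero?

      @active += 1
      @threads << Thread.new do
        begin
          work.call
        ensure
          # Free the slot first, then notify the listener.
          @mutex.synchronize { @active -= 1 }
          @on_idle&.call
        end
      end
    end
  end

  # Waits for all work units posted so far (assumed helper for the demo).
  def join
    @mutex.synchronize { @threads.dup }.each(&:join)
  end
end

pool = TinyPool.new(1)
gate = Queue.new
idle_calls = Queue.new
pool.on_idle { idle_calls << :idle }

pool.post { gate.pop } # occupies the only slot until released

rejected = begin
  pool.post { :never_runs }
  false
rescue RuntimeError
  true
end

gate << :go # release the first work unit
pool.join
```

With a pool of size 1, the second `post` is rejected while the first work unit is still blocked, and the `on_idle` block fires once that unit completes. A caller that wants to avoid the exception can check `idle` before posting, or post again from inside the `on_idle` callback.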
#shutdown ⇒ Object
Shuts down the pool and waits for termination.
# File 'lib/workhorse/pool.rb', line 66
def shutdown
  @executor.shutdown
  wait
end
#wait ⇒ Object
Waits until the pool is shut down. This will wait forever unless you eventually call `shutdown` (either before calling `wait` or after it in another thread).
# File 'lib/workhorse/pool.rb', line 55
def wait
  # Here we use a loop-sleep combination instead of using
  # ThreadPoolExecutor's `wait_for_termination`. See issue #21 for more
  # information.
  loop do
    break if @executor.shutdown?
    sleep 0.1
  end
end
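The loop-sleep pattern, and the note that `wait` only returns once something else triggers the shutdown, can be sketched with the standard library alone. The helper `wait_until` and the `stopped` flag below are hypothetical; Workhorse itself polls `@executor.shutdown?` as shown above.

```ruby
# Hypothetical helper for illustration; polls the given block until it is truthy.
def wait_until(interval: 0.1)
  loop do
    break if yield

    sleep interval
  end
end

stopped = false
lock = Mutex.new

# Another thread flips the flag a little later, playing the role of `shutdown`.
stopper = Thread.new do
  sleep 0.05
  lock.synchronize { stopped = true }
end

# Blocks here until the stopper thread has run; without it, this would never return.
wait_until(interval: 0.01) { lock.synchronize { stopped } }
stopper.join
```

Polling trades a small delay (at most one interval) for simplicity; the source comment points to issue #21 for why this was preferred over `wait_for_termination`.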