Class: Lifter::ThreadPool
- Inherits:
-
Object
- Object
- Lifter::ThreadPool
- Defined in:
- lib/lifter/thread_pool.rb
Instance Method Summary collapse
-
#clear(job_tag) ⇒ Object
For a given job_tag, prevents any future pending jobs from running.
-
#initialize(pool_size) ⇒ ThreadPool
constructor
A new instance of ThreadPool.
-
#push(job_tag, &job) ⇒ Object
Add a job closure to the thread pool, tagged with a given job_tag to allow for consistent execution ordering.
Constructor Details
#initialize(pool_size) ⇒ ThreadPool
Returns a new instance of ThreadPool.
5 6 7 8 9 10 11 12 13 14 15 16 17 |
# File 'lib/lifter/thread_pool.rb', line 5 def initialize(pool_size) @pool_size = pool_size @monitor = Monitor.new @queues = {} @workers = {} @pending = {} @cleared = [] spawn_workers end |
Instance Method Details
#clear(job_tag) ⇒ Object
For a given job_tag, prevents any future pending jobs from running.
36 37 38 39 40 |
# File 'lib/lifter/thread_pool.rb', line 36 def clear(job_tag) @monitor.synchronize do @cleared << job_tag if !@cleared.include?(job_tag) end end |
#push(job_tag, &job) ⇒ Object
Add a job closure to the thread pool, tagged with a given job_tag to allow for consistent execution ordering.
21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/lifter/thread_pool.rb', line 21 def push(job_tag, &job) job_tag = job_tag.to_s raise ArgumentError.new('job_tag must be defined') if job_tag.empty? job_hash = Zlib.crc32(job_tag) worker_id = job_hash % @pool_size queue = @queues[worker_id] queue.push([job_tag, job]) add_pending(job_tag) end |