Class: Lifter::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/lifter/thread_pool.rb

Instance Method Summary collapse

Constructor Details

#initialize(pool_size) ⇒ ThreadPool

Returns a new instance of ThreadPool.



5
6
7
8
9
10
11
12
13
14
15
16
17
# File 'lib/lifter/thread_pool.rb', line 5

def initialize(pool_size)
  @pool_size = pool_size

  @monitor = Monitor.new

  @queues = {}
  @workers = {}

  @pending = {}
  @cleared = []

  spawn_workers
end

Instance Method Details

#clear(job_tag) ⇒ Object

For a given job_tag, prevents any future pending jobs from running.



36
37
38
39
40
# File 'lib/lifter/thread_pool.rb', line 36

def clear(job_tag)
  @monitor.synchronize do
    @cleared << job_tag if !@cleared.include?(job_tag)
  end
end

#push(job_tag, &job) ⇒ Object

Add a job closure to the thread pool, tagged with a given job_tag to allow for consistent execution ordering.

Raises:

  • (ArgumentError)


21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/lifter/thread_pool.rb', line 21

def push(job_tag, &job)
  job_tag = job_tag.to_s

  raise ArgumentError.new('job_tag must be defined') if job_tag.empty?

  job_hash = Zlib.crc32(job_tag)
  worker_id = job_hash % @pool_size

  queue = @queues[worker_id]
  queue.push([job_tag, job])

  add_pending(job_tag)
end