Class: Threasy::Work
- Inherits:
-
Object
show all
- Defined in:
- lib/threasy/work.rb
Defined Under Namespace
Classes: TimeoutQueue, Worker
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize ⇒ Work
Returns a new instance of Work.
18
19
20
21
22
|
# File 'lib/threasy/work.rb', line 18
def initialize
@queue = TimeoutQueue.new
@pool = Set.new
@semaphore = Mutex.new
end
|
Instance Attribute Details
#pool ⇒ Object
Threasy::Work
Class to manage the work queue.
An instance of ‘Threasy::Work` will manage a pool of worker threads and a queue of jobs. As the job queue grows, new worker threads are created process the jobs. When the Queue remains empty, worker threads are culled.
Example
work = Threasy::Work.new
work.enqueue { puts "Hello from the background!" }
16
17
18
|
# File 'lib/threasy/work.rb', line 16
def pool
@pool
end
|
#queue ⇒ Object
Threasy::Work
Class to manage the work queue.
An instance of ‘Threasy::Work` will manage a pool of worker threads and a queue of jobs. As the job queue grows, new worker threads are created process the jobs. When the Queue remains empty, worker threads are culled.
Example
work = Threasy::Work.new
work.enqueue { puts "Hello from the background!" }
16
17
18
|
# File 'lib/threasy/work.rb', line 16
def queue
@queue
end
|
Instance Method Details
#add_worker(size) ⇒ Object
68
69
70
71
72
73
|
# File 'lib/threasy/work.rb', line 68
def add_worker(size)
log "Adding new worker to pool"
worker = Worker.new(self, size)
pool.add worker
worker.work
end
|
#check_workers ⇒ Object
57
58
59
60
61
62
63
64
65
66
|
# File 'lib/threasy/work.rb', line 57
def check_workers
sync do
pool_size = pool.size
queue_size = queue.size
log "Checking workers. Pool: #{pool_size}, Max: #{max_workers}, Queue: #{queue_size}"
if pool_size < max_workers
add_worker(pool_size) if pool_size == 0 || queue_size > max_workers
end
end
end
|
#clear ⇒ Object
75
76
77
|
# File 'lib/threasy/work.rb', line 75
def clear
queue.clear
end
|
#enqueue(job = nil, &block) ⇒ Object
Also known as:
enqueue_block
Enqueue a job into the work queue
Examples
work = Threasy::Work.new
work.enqueue { do_some_background_work }
work.enqueue BackgroundJob.new(some: data)
Threasy.enqueue("BackgroundJob.new")
39
40
41
|
# File 'lib/threasy/work.rb', line 39
def enqueue(job = nil, &block)
queue.push(block_given? ? block : job).tap { check_workers }
end
|
#grab ⇒ Object
49
50
51
|
# File 'lib/threasy/work.rb', line 49
def grab
queue.pop
end
|
#log(msg) ⇒ Object
79
80
81
|
# File 'lib/threasy/work.rb', line 79
def log(msg)
Threasy.logger.debug msg
end
|
#max_workers ⇒ Object
53
54
55
|
# File 'lib/threasy/work.rb', line 53
def max_workers
Threasy.config.max_workers
end
|
#sync(&block) ⇒ Object
45
46
47
|
# File 'lib/threasy/work.rb', line 45
def sync(&block)
@semaphore.synchronize &block
end
|