Class: Threadz::Batch
- Inherits:
-
Object
- Object
- Threadz::Batch
- Defined in:
- lib/threadz/batch.rb
Overview
A batch is a collection of jobs you care about that gets pushed off to the attached thread pool. The calling thread can be signaled when the batch has completed executing, or a block can be executed.
Instance Method Summary collapse
-
#completed? ⇒ Boolean
Returns true iff there are no unfinished jobs in the queue.
-
#initialize(threadpool, opts = {}) ⇒ Batch
constructor
Creates a new batch attached to the given threadpool.
-
#push(job) ⇒ Object
(also: #<<)
Add a new job to the batch.
-
#start ⇒ Object
If this is a latent batch, start processing all of the jobs in the queue.
-
#wait_until_done(opts = {}) ⇒ Object
Put the current thread to sleep until the batch is done processing.
-
#when_done(&block) ⇒ Object
Execute a given block when the batch has finished processing.
Constructor Details
#initialize(threadpool, opts = {}) ⇒ Batch
Creates a new batch attached to the given threadpool. A number of options are available:
:latent
-
If latent, none of the jobs in the batch will actually start
executing until the +start+ method is called.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/threadz/batch.rb', line 12 def initialize(threadpool, opts={}) @threadpool = threadpool @waiting_threads = [] @job_lock = Mutex.new @jobs_count = AtomicInteger.new(0) @when_done_blocks = [] @sleeper = ::Threadz::Sleeper.new ## Options #latent @latent = opts.key?(:latent) ? opts[:latent] : false if(@latent) @started = false else @started = true end @job_queue = Queue.new if @latent end |
Instance Method Details
#completed? ⇒ Boolean
Returns true iff there are no unfinished jobs in the queue.
69 70 71 |
# File 'lib/threadz/batch.rb', line 69 def completed? return @jobs_count.value == 0 end |
#push(job) ⇒ Object Also known as: <<
Add a new job to the batch. If this is a latent batch, the job can’t be scheduled until the batch is #start’ed; otherwise it may start immediately. The job can be anything that responds to call
or an array of objects that respond to call
.
36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/threadz/batch.rb', line 36 def push(job) if job.is_a? Array job.each {|j| self << j} elsif job.respond_to? :call @jobs_count.increment if @latent && !@started @job_queue << job else send_to_threadpool job end else raise "Not a valid job: needs to support #call" end end |
#start ⇒ Object
If this is a latent batch, start processing all of the jobs in the queue.
74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/threadz/batch.rb', line 74 def start Thread.exclusive do # in case another thread tries to push new jobs onto the queue while we're starting if @latent @started = true until @job_queue.empty? send_to_threadpool @job_queue.pop end return true else return false end end end |
#wait_until_done(opts = {}) ⇒ Object
Put the current thread to sleep until the batch is done processing. There are options available:
:timeout
-
If specified, will only wait for at least this many seconds for the batch to finish. Typically used with #completed?
57 58 59 60 61 62 63 64 65 66 |
# File 'lib/threadz/batch.rb', line 57 def wait_until_done(opts={}) return if completed? raise "Threadz: thread deadlocked because batch job was never started" if @latent && !@started timeout = opts.key?(:timeout) ? opts[:timeout] : 0 #raise "Timeout not supported at the moment" if timeout @sleeper.wait(timeout) end |
#when_done(&block) ⇒ Object
Execute a given block when the batch has finished processing. If the batch has already finished executing, execute immediately.
90 91 92 93 94 95 96 97 98 |
# File 'lib/threadz/batch.rb', line 90 def when_done(&block) @job_lock.lock if completed? block.call else @when_done_blocks << block end @job_lock.unlock end |