Class: Threadz::Batch
- Inherits:
-
Object
- Object
- Threadz::Batch
- Defined in:
- lib/threadz/batch.rb
Overview
A batch is a collection of jobs you care about that gets pushed off to the attached thread pool. The calling thread can be signaled when the batch has completed executing, or a block can be executed.
Instance Method Summary collapse
-
#completed? ⇒ Boolean
Returns true iff there are no jobs outstanding.
-
#initialize(threadpool, opts = {}) ⇒ Batch
constructor
Creates a new batch attached to the given threadpool.
-
#push(job) ⇒ Object
(also: #<<)
Add a new job to the batch.
-
#start ⇒ Object
If this is a latent batch, start processing all of the jobs in the queue.
-
#wait_until_done(opts = {}) ⇒ Object
Put the current thread to sleep until the batch is done processing.
-
#when_done(&block) ⇒ Object
Execute a given block when the batch has finished processing.
Constructor Details
#initialize(threadpool, opts = {}) ⇒ Batch
Creates a new batch attached to the given threadpool. A number of options are available:
:latent
-
If latent, none of the jobs in the batch will actually start
executing until the +start+ method is called.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/threadz/batch.rb', line 12 def initialize(threadpool, opts={}) @threadpool = threadpool @waiting_threads = [] @job_lock = Mutex.new @jobs_count = AtomicInteger.new(0) @when_done_blocks = [] @sleeper = ::Threadz::Sleeper.new ## Options #latent @latent = opts.key?(:latent) ? opts[:latent] : false if(@latent) @started = false else @started = true end @job_queue = Queue.new if @latent end |
Instance Method Details
#completed? ⇒ Boolean
Returns true iff there are no jobs outstanding.
65 66 67 |
# File 'lib/threadz/batch.rb', line 65 def completed? return @jobs_count.value == 0 end |
#push(job) ⇒ Object Also known as: <<
Add a new job to the batch. If this is a latent batch, the job can’t be scheduled until the batch is #start’ed; otherwise it may start immediately. The job can be anything that responds to call
or an array of objects that respond to call
.
36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/threadz/batch.rb', line 36 def push(job) if job.is_a? Array job.each {|j| self << j} elsif job.respond_to? :call @jobs_count.increment if @latent && !@started @job_queue << job else send_to_threadpool job end else raise "Not a valid job: needs to support #call" end end |
#start ⇒ Object
If this is a latent batch, start processing all of the jobs in the queue.
70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/threadz/batch.rb', line 70 def start @job_lock.synchronize { # in case another thread tries to push new jobs onto the queue while we're starting if @latent @started = true until @job_queue.empty? send_to_threadpool @job_queue.pop end return true else return false end } end |
#wait_until_done(opts = {}) ⇒ Object
Put the current thread to sleep until the batch is done processing. There are options available:
:timeout
-
If specified, will only wait for at least this many seconds for the batch to finish. Typically used with #completed?
57 58 59 60 61 62 |
# File 'lib/threadz/batch.rb', line 57 def wait_until_done(opts={}) raise "Threadz: thread deadlocked because batch job was never started" if @latent && !@started timeout = opts.key?(:timeout) ? opts[:timeout] : 0 @sleeper.wait(timeout) unless completed? end |
#when_done(&block) ⇒ Object
Execute a given block when the batch has finished processing. If the batch has already finished executing, execute immediately.
86 87 88 |
# File 'lib/threadz/batch.rb', line 86 def when_done(&block) @job_lock.synchronize { completed? ? block.call : @when_done_blocks << block } end |