Class: Threadz::Batch

Inherits:
Object
  • Object
show all
Defined in:
lib/threadz/batch.rb

Overview

A batch is a collection of jobs you care about that gets pushed off to the attached thread pool. The calling thread can be signaled when the batch has completed executing, or a block can be executed.

Instance Method Summary collapse

Constructor Details

#initialize(threadpool, opts = {}) ⇒ Batch

Creates a new batch attached to the given threadpool. A number of options are available:

:latent

If latent, none of the jobs in the batch will actually start

executing until the +start+ method is called.


12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/threadz/batch.rb', line 12

def initialize(threadpool, opts={})
  @threadpool = threadpool
  @waiting_threads = []
  @job_lock = Mutex.new
  @jobs_count = AtomicInteger.new(0)
  @when_done_blocks = []
  @sleeper = ::Threadz::Sleeper.new

  ## Options

  #latent
  @latent = opts.key?(:latent) ? opts[:latent] : false
  if(@latent)
    @started = false
  else
    @started = true
  end
  @job_queue = Queue.new if @latent
end

Instance Method Details

#completed?Boolean

Returns true iff there are no unfinished jobs in the queue.

Returns:

  • (Boolean)


69
70
71
# File 'lib/threadz/batch.rb', line 69

def completed?
  return @jobs_count.value == 0
end

#push(job) ⇒ Object Also known as: <<

Add a new job to the batch. If this is a latent batch, the job can’t be scheduled until the batch is #start’ed; otherwise it may start immediately. The job can be anything that responds to call or an array of objects that respond to call.



36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/threadz/batch.rb', line 36

def push(job)
  if job.is_a? Array
    job.each {|j| self << j}
  elsif job.respond_to? :call
    @jobs_count.increment
    if @latent && !@started
      @job_queue << job
    else
      send_to_threadpool job
    end
  else
    raise "Not a valid job: needs to support #call"
  end
end

#startObject

If this is a latent batch, start processing all of the jobs in the queue.



74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/threadz/batch.rb', line 74

def start
  Thread.exclusive do # in case another thread tries to push new jobs onto the queue while we're starting
    if @latent
      @started = true
      until @job_queue.empty?
        send_to_threadpool @job_queue.pop
      end
      return true
    else
      return false
    end
  end
end

#wait_until_done(opts = {}) ⇒ Object

Put the current thread to sleep until the batch is done processing. There are options available:

:timeout

If specified, will only wait for at least this many seconds for the batch to finish. Typically used with #completed?



57
58
59
60
61
62
63
64
65
66
# File 'lib/threadz/batch.rb', line 57

def wait_until_done(opts={})
  return if completed?

  raise "Threadz: thread deadlocked because batch job was never started" if @latent && !@started

  timeout = opts.key?(:timeout) ? opts[:timeout] : 0
  #raise "Timeout not supported at the moment" if timeout

  @sleeper.wait(timeout)
end

#when_done(&block) ⇒ Object

Execute a given block when the batch has finished processing. If the batch has already finished executing, execute immediately.



90
91
92
93
94
95
96
97
98
# File 'lib/threadz/batch.rb', line 90

def when_done(&block)
  @job_lock.lock
  if completed?
    block.call
  else
    @when_done_blocks << block
  end
  @job_lock.unlock
end