Class: TaskJuggler::BatchProcessor

Inherits:
Object
Defined in:
lib/taskjuggler/BatchProcessor.rb

Overview

The BatchProcessor class can be used to run code blocks of the program in separate processes. Multiple pieces of code can be submitted to be executed in parallel. The number of CPU cores to use is limited at object creation time. The submitted jobs are queued and scheduled onto the given number of CPU cores. The usage model is simple: create a BatchProcessor object, use BatchProcessor#queue to submit all the jobs, and then use BatchProcessor#wait to wait for completion and to process the results.
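The create, queue, wait model described above can be sketched with a small stand-in class. This is not the TaskJuggler implementation: `MiniBatch`, its `Job` struct and the `ret_val` field are hypothetical names chosen for illustration, and the sketch assumes a platform where `fork` is available and that each block returns a small integer that can serve as the exit status.

```ruby
# Hypothetical stand-in that mimics the queue/wait usage model with plain
# fork and Process.wait2. It is NOT TaskJuggler::BatchProcessor.
class MiniBatch
  Job = Struct.new(:tag, :block, :ret_val)

  def initialize(max_procs)
    @max_procs = max_procs
    @pending = []
  end

  # Submit a block to be run in a child process.
  def queue(tag = nil, &block)
    @pending << Job.new(tag, block, nil)
  end

  # Run all queued jobs, at most @max_procs at a time, and yield each
  # finished job to the caller's post-processing block.
  def wait
    running = {}
    until @pending.empty? && running.empty?
      # Fork new children while there is a free slot.
      while running.size < @max_procs && !@pending.empty?
        job = @pending.shift
        # exit! skips at_exit handlers inherited from the parent.
        pid = fork { exit!(job.block.call.to_i) }
        running[pid] = job
      end
      # Block until any child terminates, then hand its job to the caller.
      pid, status = Process.wait2
      job = running.delete(pid)
      next unless job
      job.ret_val = status.exitstatus
      yield(job)
    end
  end
end

bp = MiniBatch.new(2)
4.times { |i| bp.queue("job-#{i}") { i * 10 } }
results = {}
bp.wait { |job| results[job.tag] = job.ret_val }
```

The caller only sees `queue` and `wait`; the cap on concurrent children is fixed once, at construction time, as in the class documented here.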

Constructor Details

#initialize(maxCpuCores) ⇒ BatchProcessor

Create a BatchProcessor object. maxCpuCores limits the number of simultaneously spawned processes.



# File 'lib/taskjuggler/BatchProcessor.rb', line 69

def initialize(maxCpuCores)
  @maxCpuCores = maxCpuCores
  # Jobs submitted by calling queue() are put in the @toRunQueue. The
  # pusher Thread will pick them up and fork them off into another
  # process.
  @toRunQueue = Queue.new
  # A hash that maps PIDs to the JobInfo objects of the running jobs.
  @runningJobs = { }
  # A list of jobs that wait to complete their writing.
  @spoolingJobs = [ ]
  # The wait() method drains the @toDropQueue, executes the post-processing
  # block and removes all JobInfo related objects.
  @toDropQueue = Queue.new

  # A monitor to guard accesses to @runningJobs, @spoolingJobs and the
  # following shared data structures.
  @lock = Monitor.new
  # We count the submitted and completed jobs. The @jobsIn counter also
  # doubles as a unique job ID.
  @jobsIn = @jobsOut = 0
  # An Array that holds all the IO objects to receive data from.
  @pipes = []
  # A hash that maps IO objects to JobInfo objects
  @pipeToJob = {}

  # This global flag is set to true to signal the threads to terminate.
  @terminate = false
  # Sleep time of the threads when no data is pending. This value must be
  # large enough to allow for a context switch between the sending
  # (forked-off) process and this process. If it's too large, throughput
  # will suffer.
  @timeout = 0.02

  Thread.abort_on_exception = true
end

Instance Method Details

#queue(tag = nil, &block) ⇒ Object

Add a new job to the job queue. tag is some data that the caller can use to identify the job upon completion. block is a Ruby code block to be executed in a separate process.



# File 'lib/taskjuggler/BatchProcessor.rb', line 108

def queue(tag = nil, &block)
  raise 'You cannot call queue() while wait() is running!' if @jobsOut > 0

  # If this is the first queued job for this run, we have to start the
  # helper threads.
  if @jobsIn == 0
    # The JobInfo objects in the @toRunQueue are processed by the pusher
    # thread. It forks off processes to execute the code block associated
    # with the JobInfo.
    @pusher = Thread.new { pusher }
    # The popper thread waits for terminated child processes and picks up
    # the results.
    @popper = Thread.new { popper }
    # The grabber thread collects $stdout and $stderr data from each child
    # process and stores them in the corresponding JobInfo.
    @grabber = Thread.new { grabber }
  end

  # Create a new JobInfo object for the job and push it to the @toRunQueue.
  job = JobInfo.new(@jobsIn, block, tag)
  # Increase job counter
  @lock.synchronize { @jobsIn += 1 }
  @toRunQueue.push(job)
end
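The pattern used by queue (start the helper thread lazily on the first job, use the running job counter as the job ID, then push onto a thread-safe queue) can be sketched in isolation. `LazyWorker`, `finish` and the `:stop` sentinel are hypothetical names for this illustration; the sketch runs the blocks in a single worker thread rather than forking processes.

```ruby
require 'monitor'

# Hypothetical illustration of lazy helper-thread start-up; it runs jobs in
# one worker thread instead of forking, unlike the class documented here.
class LazyWorker
  attr_reader :processed

  def initialize
    @to_run = Queue.new
    @lock = Monitor.new
    @jobs_in = 0
    @processed = []
    @worker = nil
  end

  def queue(tag = nil, &block)
    # Start the worker only when the first job of a run arrives.
    if @jobs_in == 0
      @worker = Thread.new do
        while (job = @to_run.pop) != :stop
          job_id, job_tag, job_block = job
          result = job_block.call
          @lock.synchronize { @processed << [job_id, job_tag, result] }
        end
      end
    end
    # The submission counter doubles as a unique job ID.
    job_id = @jobs_in
    @lock.synchronize { @jobs_in += 1 }
    @to_run.push([job_id, tag, block])
  end

  # Tell the worker to stop after the jobs already queued, then join it.
  def finish
    return unless @worker
    @to_run.push(:stop)
    @worker.join
  end
end

worker = LazyWorker.new
worker.queue(:a) { 1 + 1 }
worker.queue(:b) { 3 }
worker.finish
processed = worker.processed
```

With a single worker and a FIFO queue, the jobs are processed in submission order.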

#wait ⇒ Object

Wait for all jobs to complete. The code block passed to wait is called with the JobInfo object of each completed job so that it can pick up the results.



# File 'lib/taskjuggler/BatchProcessor.rb', line 135

def wait
  # Don't wait if there are no jobs.
  return if @jobsIn == 0

  # When we have received as many jobs in the @toDropQueue as we have
  # started, we're done.
  while !@lock.synchronize { @jobsIn == @jobsOut }
    if @toDropQueue.empty?
      sleep(@timeout)
    else
      # We have completed jobs.
      while !@toDropQueue.empty?
        # Pop a job from the @toDropQueue and call the block with it.
        job = @toDropQueue.pop
        # Count the job as completed.
        @lock.synchronize { @jobsOut += 1 }

        # Call the post-processing block that was passed to wait() with
        # the JobInfo object as argument.
        yield(job)
      end
    end
  end

  # Signal threads to stop
  @terminate = true
  # Wait for threads to finish
  @pusher.join
  @popper.join
  @grabber.join

  # Reset some variables so we can reuse the object for further job runs.
  @jobsIn = @jobsOut = 0
  @terminate = false

  # Make sure all data structures are empty and clean.
  check
end
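The polling loop in wait can be illustrated on its own with a producer thread standing in for the helper threads that deliver finished jobs. This is a minimal sketch under that assumption; `done_queue`, `producer` and `collected` are hypothetical names, and integers stand in for JobInfo objects.

```ruby
require 'monitor'

lock = Monitor.new
done_queue = Queue.new   # plays the role of @toDropQueue
jobs_in = 5              # number of jobs submitted
jobs_out = 0             # number of jobs already post-processed
timeout = 0.02           # sleep time when nothing is pending

# Stand-in for the helper threads: delivers one finished "job" at a time.
producer = Thread.new do
  jobs_in.times do |i|
    sleep(0.01)
    done_queue.push(i)
  end
end

collected = []
# Loop until every submitted job has been handed to the post-processing step.
until lock.synchronize { jobs_in == jobs_out }
  if done_queue.empty?
    # Nothing finished yet; give the producer time to make progress.
    sleep(timeout)
  else
    until done_queue.empty?
      item = done_queue.pop
      lock.synchronize { jobs_out += 1 }
      collected << item   # corresponds to yield(job) in wait
    end
  end
end
producer.join
```

The sleep interval trades latency against idle wake-ups, which matches the note on @timeout in the constructor: too small and the loop spins, too large and throughput suffers.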