Class: Parallel::JobFactory

Inherits:
Object
  • Object
show all
Defined in:
lib/gpack/core/parallel.rb

Instance Method Summary collapse

Constructor Details

#initialize(source, mutex) ⇒ JobFactory

Returns a new instance of JobFactory.



179
180
181
182
183
184
185
# File 'lib/gpack/core/parallel.rb', line 179

# Builds a factory that hands out jobs from +source+.
#
# source - either something that responds to #call (used directly as the
#          producer), something queue_wrapper can turn into a producer, or
#          any other object that can be converted with #to_a.
# mutex  - lock used by #next to guard the shared index/stopped state.
def initialize(source, mutex)
  callable = source.respond_to?(:call) && source
  @lambda = callable || queue_wrapper(source)

  # No producer available: materialize Range and other Enumerable-s as an
  # Array so items can be looked up by index.
  unless @lambda
    @source = source.to_a
  end

  @stopped = false
  @index = -1
  @mutex = mutex
end

Instance Method Details

#next ⇒ Object



187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/gpack/core/parallel.rb', line 187

# Hands out the next job as an [item, index] pair, or nil when there is
# nothing more to hand out.
def next
  unless producer?
    # Array-backed source: claim the next index under the lock, then read
    # the item outside of it.
    position = @mutex.synchronize { @index += 1 }
    return if position >= size
    return [@source[position], position]
  end

  # Producer source: calling the lambda and bumping the index happen under
  # the same lock so index and item stay in sync. Once the lambda has
  # returned Stop it is never called again. A `return` inside the block
  # leaves the whole method with nil.
  @mutex.synchronize do
    return if @stopped
    produced = @lambda.call
    @stopped = (produced == Parallel::Stop)
    return if @stopped
    [produced, @index += 1]
  end
end

#pack(item, index) ⇒ Object

Generates the item that is sent to workers. Sending just the index is faster and less likely to blow up with unserializable errors.



216
217
218
# File 'lib/gpack/core/parallel.rb', line 216

# Builds the payload that is sent to workers for one job.
#
# With an array-backed source only the index is sent; with a producer the
# full [item, index] pair is sent.
def pack(item, index)
  return index unless producer?

  [item, index]
end

#size ⇒ Object



206
207
208
209
210
211
212
# File 'lib/gpack/core/parallel.rb', line 206

# Number of jobs this factory can hand out: Float::INFINITY for a producer,
# otherwise the size of the materialized source.
def size
  return Float::INFINITY if producer?

  @source.size
end

#unpack(data) ⇒ Object

unpack item that is sent to workers



221
222
223
# File 'lib/gpack/core/parallel.rb', line 221

# Turns a payload received from #pack back into an [item, index] pair.
#
# With a producer the data is returned unchanged; otherwise data is treated
# as an index into @source and the item is looked up from it.
def unpack(data)
  return data if producer?

  [@source[data], data]
end