Class: Parallel::JobFactory
- Inherits:
-
Object
- Object
- Parallel::JobFactory
- Defined in:
- lib/gpack/core/parallel.rb
Instance Method Summary collapse
-
#initialize(source, mutex) ⇒ JobFactory
constructor
A new instance of JobFactory.
- #next ⇒ Object
-
#pack(item, index) ⇒ Object
Generates the item that is sent to workers; sending just the index is faster and less likely to blow up with unserializable errors.
- #size ⇒ Object
-
#unpack(data) ⇒ Object
unpack item that is sent to workers.
Constructor Details
#initialize(source, mutex) ⇒ JobFactory
Returns a new instance of JobFactory.
179 180 181 182 183 184 185 |
# File 'lib/gpack/core/parallel.rb', line 179
# Sets up the job source and the iteration state.
#
# @param source [Object] a callable (used directly as the item producer) or
#   any other object, which is handed to queue_wrapper first
# @param mutex [#synchronize] lock stored for guarding the shared state
def initialize(source, mutex)
  # A callable source is kept as-is; everything else goes through
  # queue_wrapper (defined outside this listing).
  # NOTE(review): presumably queue_wrapper returns a producer or a falsy value - confirm.
  callable = source.respond_to?(:call) && source
  @lambda = callable || queue_wrapper(source)

  unless @lambda
    # turn Range and other Enumerable-s into an Array
    @source = source.to_a
  end

  @mutex = mutex
  @index = -1
  @stopped = false
end
Instance Method Details
#next ⇒ Object
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/gpack/core/parallel.rb', line 187
# Hands out the next job as an [item, index] pair.
#
# @return [Array, nil] [item, index]; nil once the array source is exhausted
#   or the producer has returned Parallel::Stop
def next
  unless producer?
    # Only the index bump needs the lock; the lookup happens outside of it.
    position = @mutex.synchronize { @index += 1 }
    return if position >= size

    return [@source[position], position]
  end

  # - index and item stay in sync
  # - do not call lambda after it has returned Stop
  @mutex.synchronize do
    return if @stopped

    produced = @lambda.call
    @stopped = (produced == Parallel::Stop)
    return if @stopped

    [produced, @index += 1]
  end
end
#pack(item, index) ⇒ Object
Generates the item that is sent to workers; sending just the index is faster and less likely to blow up with unserializable errors.
216 217 218 |
# File 'lib/gpack/core/parallel.rb', line 216
# Builds the payload that is sent to workers.
#
# @param item [Object] the job item
# @param index [Integer] the job's index
# @return [Array, Integer] [item, index] for a producer, otherwise only the
#   index (just index is faster + less likely to blow up with unserializable errors)
def pack(item, index)
  return index unless producer?

  [item, index]
end
#size ⇒ Object
206 207 208 209 210 211 212 |
# File 'lib/gpack/core/parallel.rb', line 206
# Number of jobs this factory can hand out.
#
# @return [Float, Integer] Float::INFINITY for a producer, otherwise the
#   size of the stored source array
def size
  producer? ? Float::INFINITY : @source.size
end
#unpack(data) ⇒ Object
unpack item that is sent to workers
221 222 223 |
# File 'lib/gpack/core/parallel.rb', line 221
# Reverses #pack: turns the payload sent to workers back into [item, index].
#
# @param data [Array, Integer] the packed payload; for a producer it is
#   returned untouched, otherwise it is used as an index into the source array
# @return [Array, Object] data itself for a producer, else [@source[data], data]
def unpack(data)
  return data if producer?

  [@source[data], data]
end