Class: Fairy::PDirectProduct::PPreFilter
- Inherits:
-
Fairy::PIOFilter
- Object
- Fairy::PFilter
- Fairy::PIOFilter
- Fairy::PDirectProduct::PPreFilter
- Defined in:
- lib/fairy/node/p-direct-product.rb
Constant Summary
Constants inherited from Fairy::PIOFilter
Fairy::PIOFilter::ST_WAIT_IMPORT
Constants inherited from Fairy::PFilter
Fairy::PFilter::END_OF_STREAM, Fairy::PFilter::ST_ACTIVATE, Fairy::PFilter::ST_FINISH, Fairy::PFilter::ST_INIT
Instance Attribute Summary
Attributes inherited from Fairy::PFilter
#IGNORE_EXCEPTION, #id, #log_id, #ntask
Instance Method Summary collapse
- #exports ⇒ Object
-
#initialize(id, ntask, bjob, opts, block_source) ⇒ PPreFilter
constructor
A new instance of PPreFilter.
- #input=(input) ⇒ Object
- #start_export ⇒ Object
-
#start_watch_exports ⇒ Object
def number_of_exports=(n) @exports_mutex.synchronize do @exports = [] n.times{@exports.push Export.new} end @exports_cv.broadcast end.
Methods inherited from Fairy::PFilter
#abort_running, #basic_start, #break_running, #each, #global_break, #global_break_from_other, #handle_exception, #key, #key=, #next, #no, #no=, #notice_status, #processor, #start, #start_watch_status, #status=, #terminate, #terminate_proc
Constructor Details
#initialize(id, ntask, bjob, opts, block_source) ⇒ PPreFilter
Returns a new instance of PPreFilter.
16 17 18 19 20 21 22 23 |
# File 'lib/fairy/node/p-direct-product.rb', line 16
#
# Builds a new PPreFilter.
#
# All five arguments are handed unchanged to the superclass constructor;
# block_source is additionally stored on this instance. The export list
# starts out as nil (it is assigned later by #start_watch_exports), and the
# mutex / condition-variable pair created here is what #exports uses to wait
# for that assignment.
def initialize(id, ntask, bjob, opts, block_source)
  super(id, ntask, bjob, opts, block_source)

  @exports_mutex = Mutex.new
  @exports_cv = XThread::ConditionVariable.new
  @exports = nil
  @block_source = block_source
end
Instance Method Details
#exports ⇒ Object
60 61 62 63 64 65 66 67 |
# File 'lib/fairy/node/p-direct-product.rb', line 60
#
# Returns the export list, blocking the calling thread until @exports is no
# longer nil.
def exports
  @exports_mutex.synchronize do
    # The nil check is repeated after every wake-up on the condition variable.
    @exports_cv.wait(@exports_mutex) while @exports.nil?
  end
  @exports
end
#input=(input) ⇒ Object
25 26 27 28 |
# File 'lib/fairy/node/p-direct-product.rb', line 25
#
# Runs the inherited input= with the given input, then calls
# #start_watch_exports, which builds the export list on a separate thread.
# NOTE(review): #start_watch_exports reads @input; presumably the inherited
# setter is what stores it, so the order of these two calls matters - confirm.
def input=(input)
  super(input)
  start_watch_exports
end
#start_export ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/fairy/node/p-direct-product.rb', line 69
#
# Passes a block to #start that pushes every element of @input to every
# export, so each export receives the complete input.
def start_export
  start do
    begin
      @input.each do |element|
        exports.each { |export| export.push(element) }
      end
    ensure
      # Runs whether the loop above finished or raised: every export is
      # always sent the end-of-stream marker.
      exports.each { |export| export.push(:END_OF_STREAM) }
    end
  end
end
#start_watch_exports ⇒ Object
def number_of_exports=(n)
  @exports_mutex.synchronize do
    @exports = []
    n.times{@exports.push Export.new}
  end
  @exports_cv.broadcast
end
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/fairy/node/p-direct-product.rb', line 38
#
# Starts a thread that builds the export list and returns that thread.
#
# The thread asks @bjob for number_of_exports, then, while holding
# @exports_mutex, replaces @exports with a fresh array and fills it with that
# many configured Export objects. After releasing the mutex it broadcasts on
# @exports_cv, which wakes any caller waiting in #exports.
def start_watch_exports
  Thread.start do
    export_count = @bjob.number_of_exports
    @exports_mutex.synchronize do
      @exports = []
      export_count.times do
        # To be revisited later:
        #   policy = @opts[:postqueuing_policy]
        #   export = Export.new(policy)
        export = Export.new
        export.njob_id = @id
        export.no = @input.no
        export.add_key(@input.key)
        export.output_no_import = 1
        @exports << export
      end
    end
    @exports_cv.broadcast
  end
end