Class: Fairy::PDirectProduct::PPostFilter
- Inherits:
-
Fairy::PSingleExportFilter
- Object
- Fairy::PFilter
- Fairy::PIOFilter
- Fairy::PSingleExportFilter
- Fairy::PDirectProduct::PPostFilter
- Defined in:
- lib/fairy/node/p-direct-product.rb
Constant Summary
Constants included from Fairy::PSingleExportable
Fairy::PSingleExportable::END_OF_STREAM, Fairy::PSingleExportable::ST_EXPORT_FINISH, Fairy::PSingleExportable::ST_WAIT_EXPORT_FINISH
Constants inherited from Fairy::PIOFilter
Fairy::PIOFilter::ST_WAIT_IMPORT
Constants inherited from Fairy::PFilter
Fairy::PFilter::END_OF_STREAM, Fairy::PFilter::ST_ACTIVATE, Fairy::PFilter::ST_FINISH, Fairy::PFilter::ST_INIT
Instance Attribute Summary
Attributes included from Fairy::PSingleExportable
Attributes inherited from Fairy::PFilter
#IGNORE_EXCEPTION, #id, #log_id, #ntask
Instance Method Summary collapse
- #basic_each(&block) ⇒ Object
-
#initialize(id, processor, bjob, opts, block_source) ⇒ PPostFilter
constructor
A new instance of PPostFilter.
- #input=(input) ⇒ Object
- #other_imports ⇒ Object
- #other_inputs=(exports) ⇒ Object
Methods included from Fairy::PSingleExportable
#start, #start_export, #terminate, #wait_export_finish
Methods inherited from Fairy::PFilter
#abort_running, #basic_start, #break_running, #each, #global_break, #global_break_from_other, #handle_exception, #key, #key=, #next, #no, #no=, #notice_status, #processor, #start, #start_export, #start_watch_status, #status=, #terminate, #terminate_proc
Constructor Details
#initialize(id, processor, bjob, opts, block_source) ⇒ PPostFilter
Returns a new instance of PPostFilter.
85 86 87 88 89 90 91 92 |
# File 'lib/fairy/node/p-direct-product.rb', line 85 def initialize(id, processor, bjob, opts, block_source) super @block_source = block_source @other_imports = nil @other_imports_mutex = Mutex.new @other_imports_cv = XThread::ConditionVariable.new end |
Instance Method Details
#basic_each(&block) ⇒ Object
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/fairy/node/p-direct-product.rb', line 130 def basic_each(&block) @map_proc = BBlock.new(@block_source, @context, self) elements = [] elements.push @input.to_a elements.push *other_imports.collect{|i| i.to_a} idxs = elements.collect{|e| 0} max_idxs = elements.collect{|e| e.size} cont = true while cont e = elements.zip(idxs).collect{|ary, idx| ary[idx]} block.call @map_proc.yield *e (idxs.size-1).downto(0) do |idx| idxs[idx] += 1 break if idxs[idx] < max_idxs[idx] idxs[idx] = 0 cont = false if idx == 0 && idxs[idx] == 0 end end end |
#input=(input) ⇒ Object
94 95 96 97 98 |
# File 'lib/fairy/node/p-direct-product.rb', line 94 def input=(input) @input = input # self.no = input.no self.key = input.key end |
#other_imports ⇒ Object
121 122 123 124 125 126 127 128 |
# File 'lib/fairy/node/p-direct-product.rb', line 121 def other_imports @other_imports_mutex.synchronize do while !@other_imports @other_imports_cv.wait(@other_imports_mutex) end @other_imports end end |
#other_inputs=(exports) ⇒ Object
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/fairy/node/p-direct-product.rb', line 100 def other_inputs=(exports) @other_imports_mutex.synchronize do @other_imports = exports.collect{|exp| # 後で検討する # policy = @opts[:prequeuing_policy] # imp = Import.new(policy) imp = Import.new imp.no = exp.no imp.add_key(exp.key) imp.no_import = 1 imp.set_log_callback do |n, key| Log::verbose(self, "IMPORT POP key=#{key}: #{n}") end exp.output = imp imp } @other_imports_cv.broadcast end end |