Class: Fairy::PDirectProduct::PPostFilter

Inherits:
Fairy::PSingleExportFilter show all
Defined in:
lib/fairy/node/p-direct-product.rb

Constant Summary

Constants included from Fairy::PSingleExportable

Fairy::PSingleExportable::END_OF_STREAM, Fairy::PSingleExportable::ST_EXPORT_FINISH, Fairy::PSingleExportable::ST_WAIT_EXPORT_FINISH

Constants inherited from Fairy::PIOFilter

Fairy::PIOFilter::ST_WAIT_IMPORT

Constants inherited from Fairy::PFilter

Fairy::PFilter::END_OF_STREAM, Fairy::PFilter::ST_ACTIVATE, Fairy::PFilter::ST_FINISH, Fairy::PFilter::ST_INIT

Instance Attribute Summary

Attributes included from Fairy::PSingleExportable

#export

Attributes inherited from Fairy::PFilter

#IGNORE_EXCEPTION, #id, #log_id, #ntask

Instance Method Summary collapse

Methods included from Fairy::PSingleExportable

#start, #start_export, #terminate, #wait_export_finish

Methods inherited from Fairy::PFilter

#abort_running, #basic_start, #break_running, #each, #global_break, #global_break_from_other, #handle_exception, #key, #key=, #next, #no, #no=, #notice_status, #processor, #start, #start_export, #start_watch_status, #status=, #terminate, #terminate_proc

Constructor Details

#initialize(id, processor, bjob, opts, block_source) ⇒ PPostFilter

Returns a new instance of PPostFilter.



85
86
87
88
89
90
91
92
# File 'lib/fairy/node/p-direct-product.rb', line 85

def initialize(id, processor, bjob, opts, block_source)
	super
	@block_source = block_source
	
	@other_imports = nil
	@other_imports_mutex = Mutex.new
	@other_imports_cv = XThread::ConditionVariable.new
end

Instance Method Details

#basic_each(&block) ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/fairy/node/p-direct-product.rb', line 130

def basic_each(&block)
	@map_proc = BBlock.new(@block_source, @context, self)

	elements = []
	elements.push @input.to_a
	elements.push *other_imports.collect{|i| i.to_a}

	idxs = elements.collect{|e| 0}
	max_idxs = elements.collect{|e| e.size}

	cont = true
	while cont
	  e = elements.zip(idxs).collect{|ary, idx| ary[idx]}
	  block.call @map_proc.yield *e

	  (idxs.size-1).downto(0) do |idx|
	    idxs[idx] += 1
	    break if idxs[idx] < max_idxs[idx]
	    idxs[idx] = 0
	    cont = false if idx == 0 && idxs[idx] == 0
	  end
	end
end

#input=(input) ⇒ Object



94
95
96
97
98
# File 'lib/fairy/node/p-direct-product.rb', line 94

def input=(input)
	@input = input
#	self.no = input.no
	self.key = input.key
end

#other_importsObject



121
122
123
124
125
126
127
128
# File 'lib/fairy/node/p-direct-product.rb', line 121

def other_imports
	@other_imports_mutex.synchronize do
	  while !@other_imports
	    @other_imports_cv.wait(@other_imports_mutex)
	  end
	  @other_imports
	end
end

#other_inputs=(exports) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/fairy/node/p-direct-product.rb', line 100

def other_inputs=(exports)
	@other_imports_mutex.synchronize do
	  @other_imports = exports.collect{|exp|
# 後で検討する
#           policy = @opts[:prequeuing_policy]
#	    imp = Import.new(policy)
	    imp = Import.new
	    imp.no = exp.no
	    imp.add_key(exp.key)
	    imp.no_import = 1
	    imp.set_log_callback do |n, key| 
 Log::verbose(self, "IMPORT POP key=#{key}: #{n}")
	    end

	    exp.output = imp
	    imp
	  }
	  @other_imports_cv.broadcast
	end
end