Class: Fairy::CDirectProduct::CPreFilter

Inherits:
Fairy::CIOFilter show all
Defined in:
lib/fairy/master/c-direct-product.rb

Overview

呼ばれない

def start_create_nodes
  @main_prefilter.start_create_nodes
end

Instance Attribute Summary

Attributes inherited from Fairy::CIOFilter

#input

Attributes included from Fairy::CInputtable

#input

Instance Method Summary collapse

Methods inherited from Fairy::CIOFilter

#node_class, #output=

Methods included from Fairy::CInputtable

#break_running, #inputtable?

Methods inherited from Fairy::CFilter

#abort_create_node, #add_node, #assgin_number_of_nodes?, #break_create_node, #break_running, #create_and_add_node, #create_import, #create_node, #create_nodes, #def_job_pool_variable, #each_node, #each_node_exist_only, #handle_exception, #input, #job_pool_dict, #job_pool_variable, #nodes, #number_of_nodes, #pool_dict, #postmapping_policy, #start_export, #start_watch_node_status, #update_status, watch_status, watch_status=, #watch_status?

Constructor Details

#initialize(controller, opts, block_source) ⇒ CPreFilter

Returns a new instance of CPreFilter.



92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/fairy/master/c-direct-product.rb', line 92

def initialize(controller, opts, block_source)
	super
	@block_source = block_source

	@no = 0
	@exports = {}
	@exports_mutex = Mutex.new
#	@exports_cv = XThread::ConditionVariable.new

	@products = nil
	@products_mutex = Mutex.new
	@products_cv = XThread::ConditionVariable.new
end

Instance Method Details

#bind_export(exp, imp) ⇒ Object



183
184
185
# File 'lib/fairy/master/c-direct-product.rb', line 183

def bind_export(exp, imp)
	# do nothing
end

#each_assigned_filter(&block) ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/fairy/master/c-direct-product.rb', line 139

def each_assigned_filter(&block)
	Thread.start do
	  @main.other_prefilters.each do |p| 
	    p.each_node do |n| 
 @exports_mutex.synchronize do 
		@exports[n] = n.exports.dc_dup
#		@exports_cv.broadcast
 end
	    end
	  end
	  @products_mutex.synchronize do 
	    @products = nodes.product(*@main.other_prefilters.collect{|p| p.nodes})
	    @products_cv.broadcast
	  end
	end

	super
end

#each_export_by(njob, mapper, &block) ⇒ Object

main prefilter 用



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/fairy/master/c-direct-product.rb', line 159

def each_export_by(njob, mapper, &block)
	@exports_mutex.synchronize do
	  @exports[njob] = njob.exports.dc_dup
#	  @exports_cv.broadcast
	end
	@products_mutex.synchronize do
	  while !@products
	    @products_cv.wait(@products_mutex)
	  end
	  
	  post_njob_no = -1
	  @products.each do |main_njob, *others_njobs|
	    post_njob_no += 1
	    next if main_njob != njob
	    @others_njobs = others_njobs
	    
	    block.call(@exports[main_njob].shift,
   :init_njob => proc{|njob| 
			 njob.no = post_njob_no
			 njob.other_inputs = others_njobs.collect{|n| @exports[n].shift}})
	  end
	end
end

#main=(main) ⇒ Object



106
107
108
# File 'lib/fairy/master/c-direct-product.rb', line 106

def main=(main)
	@main = main
end

#njob_creation_paramsObject



114
115
116
# File 'lib/fairy/master/c-direct-product.rb', line 114

def njob_creation_params
	[@block_source]
end

#node_class_nameObject



110
111
112
# File 'lib/fairy/master/c-direct-product.rb', line 110

def node_class_name
	"PDirectProduct::PPreFilter"
end

#number_of_exportsObject



123
124
125
# File 'lib/fairy/master/c-direct-product.rb', line 123

def number_of_exports
	@main.no_of_exports_for_prefilter(self)
end

#number_of_nodes=(no) ⇒ Object



118
119
120
121
# File 'lib/fairy/master/c-direct-product.rb', line 118

def number_of_nodes=(no)
	super
	@main.update_prefilter_no_nodes(self)
end

#start_create_nodesObject



127
128
129
130
131
132
133
134
135
136
137
# File 'lib/fairy/master/c-direct-product.rb', line 127

def start_create_nodes
 	Log::debug self, "START_CREATE_NODES: #{self}"
 	@main.other_prefilters.each do |other|
 	  Thread.start do
other.each_assigned_filter do |input_filter|
  exp = input_filter.start_export
end
 	  end
 	end
	super
end