Class: Fairy::CDirectProduct::CPreFilter
Overview
Not called (呼ばれない):
# Forwards the request to start creating nodes to @main_prefilter.
def start_create_nodes
@main_prefilter.start_create_nodes
end
Instance Attribute Summary
#input
#input
Instance Method Summary
collapse
#node_class, #output=
#break_running, #inputtable?
#abort_create_node, #add_node, #assgin_number_of_nodes?, #break_create_node, #break_running, #create_and_add_node, #create_import, #create_node, #create_nodes, #def_job_pool_variable, #each_node, #each_node_exist_only, #handle_exception, #input, #job_pool_dict, #job_pool_variable, #nodes, #number_of_nodes, #pool_dict, #postmapping_policy, #start_export, #start_watch_node_status, #update_status, watch_status, watch_status=, #watch_status?
Constructor Details
#initialize(controller, opts, block_source) ⇒ CPreFilter
Returns a new instance of CPreFilter.
92
93
94
95
96
97
98
99
100
101
102
103
104
|
# File 'lib/fairy/master/c-direct-product.rb', line 92
# Builds the prefilter's bookkeeping state on top of the superclass
# constructor.
#
# The bare `super` forwards controller, opts and block_source unchanged;
# block_source is additionally retained here because
# njob_creation_params returns it.
def initialize(controller, opts, block_source)
  super

  @block_source = block_source
  @no = 0

  # Export lists keyed by node, with the mutex that guards the hash.
  @exports_mutex = Mutex.new
  @exports = {}

  # Product tuples start out as nil; the mutex and condition variable let
  # readers wait until they have been assigned.
  @products_mutex = Mutex.new
  @products_cv = XThread::ConditionVariable.new
  @products = nil
end
|
Instance Method Details
#bind_export(exp, imp) ⇒ Object
183
184
185
|
# File 'lib/fairy/master/c-direct-product.rb', line 183
# No-op for this filter: both arguments are ignored and nil is returned.
def bind_export(exp, imp)
  nil
end
|
#each_assigned_filter(&block) ⇒ Object
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
|
# File 'lib/fairy/master/c-direct-product.rb', line 139
# Starts a background thread that prepares the data each_export_by relies
# on, then continues with the superclass implementation (the bare `super`
# forwards the caller's block).
#
# The background thread does two things, in this order:
# 1. For every node of every prefilter in @main.other_prefilters it stores
#    n.exports.dc_dup in @exports (presumably a duplicate of the node's
#    export list -- confirm against dc_dup), holding @exports_mutex for
#    each write.
# 2. Under @products_mutex it assigns @products the result of
#    nodes.product(...) over the node lists of the other prefilters, then
#    broadcasts on @products_cv to wake threads waiting for @products.
#
# NOTE(review): nothing in this method rescues errors inside the thread; if
# it raises before @products is assigned, threads waiting on @products_cv
# in each_export_by are not woken from here -- confirm this is handled
# elsewhere.
def each_assigned_filter(&block)
Thread.start do
@main.other_prefilters.each do |p|
p.each_node do |n|
@exports_mutex.synchronize do
@exports[n] = n.exports.dc_dup
end
end
end
@products_mutex.synchronize do
# One tuple per combination: [own node, node of other prefilter 1, ...].
@products = nodes.product(*@main.other_prefilters.collect{|p| p.nodes})
@products_cv.broadcast
end
end
super
end
|
#each_export_by(njob, mapper, &block) ⇒ Object
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
|
# File 'lib/fairy/master/c-direct-product.rb', line 159
# Yields one export of +njob+ for every product tuple whose first element
# is +njob+.
#
# njob   - node whose exports are handed out; njob.exports.dc_dup is stored
#          in @exports before anything is yielded.
# mapper - accepted for interface compatibility; not used in this method.
# block  - called as block.call(export, :init_njob => initializer).
#          +initializer+ is a proc taking the downstream node; it sets that
#          node's +no+ to the tuple's position in @products and its
#          +other_inputs+ to one export shifted from each other node of the
#          tuple.
#
# Blocks on @products_cv until @products has been assigned, and keeps
# @products_mutex held while the block runs.
#
# Returns @products (the value of the iteration).
def each_export_by(njob, mapper, &block)
  @exports_mutex.synchronize do
    @exports[njob] = njob.exports.dc_dup
  end

  @products_mutex.synchronize do
    @products_cv.wait(@products_mutex) until @products

    # each_with_index gives every iteration its own index binding. A single
    # counter shared across iterations would be seen at a later value by any
    # initializer proc that the receiver runs after the loop has moved on.
    @products.each_with_index do |(main_njob, *others_njobs), post_njob_no|
      next if main_njob != njob

      @others_njobs = others_njobs
      initializer = proc do |post_njob|
        post_njob.no = post_njob_no
        post_njob.other_inputs = others_njobs.collect { |n| @exports[n].shift }
      end
      block.call(@exports[main_njob].shift, :init_njob => initializer)
    end
  end
end
|
#main=(main) ⇒ Object
106
107
108
|
# File 'lib/fairy/master/c-direct-product.rb', line 106
# Stores the object this prefilter reports to in @main; the other methods
# of this class query and notify it through that variable.
def main=(main)
  @main = main
end
|
#njob_creation_params ⇒ Object
114
115
116
|
# File 'lib/fairy/master/c-direct-product.rb', line 114
# Returns a one-element Array holding the block source given at
# construction time.
def njob_creation_params
  [@block_source]
end
|
#node_class_name ⇒ Object
110
111
112
|
# File 'lib/fairy/master/c-direct-product.rb', line 110
# Returns the name of the node class associated with this filter, as a
# String.
def node_class_name
  "PDirectProduct::PPreFilter"
end
|
#number_of_exports ⇒ Object
123
124
125
|
# File 'lib/fairy/master/c-direct-product.rb', line 123
# Asks @main for the export count that applies to this prefilter and
# returns whatever @main answers.
def number_of_exports
  @main.no_of_exports_for_prefilter(self)
end
|
#number_of_nodes=(no) ⇒ Object
118
119
120
121
|
# File 'lib/fairy/master/c-direct-product.rb', line 118
# Sets the node count through the superclass writer (the bare `super`
# forwards +no+), then tells @main that this prefilter's node count changed.
def number_of_nodes=(no)
  super
  @main.update_prefilter_no_nodes(self)
end
|
#start_create_nodes ⇒ Object
127
128
129
130
131
132
133
134
135
136
137
|
# File 'lib/fairy/master/c-direct-product.rb', line 127
# Logs the start of node creation, then starts one thread per prefilter in
# @main.other_prefilters; each thread calls start_export on every filter
# that prefilter yields from each_assigned_filter. Finally continues with
# the superclass implementation and returns its result.
def start_create_nodes
  Log::debug self, "START_CREATE_NODES: #{self}"

  @main.other_prefilters.each do |prefilter|
    Thread.start do
      prefilter.each_assigned_filter { |filter| filter.start_export }
    end
  end

  super
end
|