Class: Fairy::CFilter
- Inherits:
-
Object
- Object
- Fairy::CFilter
- Defined in:
- lib/fairy/master/c-filter.rb
Defined Under Namespace
Classes: Context
Constant Summary collapse
- @@watch_status =
false
Class Method Summary collapse
Instance Method Summary collapse
- #abort_create_node ⇒ Object
- #add_node(node) ⇒ Object
- #assgin_number_of_nodes? ⇒ Boolean
- #bind_export(exp, imp) ⇒ Object
- #break_create_node ⇒ Object
-
#break_running(njob = nil) ⇒ Object
job control.
- #create_and_add_node(ntask, mapper, opts = {}) ⇒ Object
- #create_import(processor) ⇒ Object
- #create_node(ntask, *params, &block) ⇒ Object
- #create_nodes ⇒ Object
- #def_job_pool_variable(vname, value = nil) ⇒ Object
-
#each_assigned_filter(&block) ⇒ Object
def next_filter(mapper) @nodes_mutex.synchronize do while @nodes_for_next_filters.empty? @nodes_cv.wait(@nodes_mutex) end @nodes_for_next_filters.shift end end.
- #each_export_by(njob, mapper, &block) ⇒ Object
- #each_node(flag = nil, &block) ⇒ Object
- #each_node_exist_only(&block) ⇒ Object
- #handle_exception(exp) ⇒ Object
-
#initialize(controller, opts, *rests) ⇒ CFilter
constructor
A new instance of CFilter.
- #input ⇒ Object
- #job_pool_dict ⇒ Object
- #job_pool_variable(vname, *value) ⇒ Object
- #njob_creation_params ⇒ Object
- #node_class_name ⇒ Object
- #nodes ⇒ Object
-
#number_of_nodes ⇒ Object
def each_export(&block) each_node do |node| exp = node.export block.call exp, node node.export.output_no_import = 1 end end.
-
#number_of_nodes=(no) ⇒ Object
Njob Methods:.
-
#pool_dict ⇒ Object
Pool Variables: (JP: プール変数).
- #postmapping_policy ⇒ Object
-
#start_create_nodes ⇒ Object
Njob creation methods.
- #start_export(njob) ⇒ Object
- #start_watch_node_status ⇒ Object
- #update_status(node, st) ⇒ Object
- #watch_status? ⇒ Boolean
Constructor Details
#initialize(controller, opts, *rests) ⇒ CFilter
Returns a new instance of CFilter.
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/fairy/master/c-filter.rb', line 23 def initialize(controller, opts, *rests) Log::info self, "CREATE BJOB: #{self.class}" @controller = controller @opts = opts @opts = {} unless @opts @job_pool_dict = PoolDictionary.new @number_of_nodes = nil # @number_of_nodes_mutex = Mutex.new # @number_of_nodes_cv = XThread::ConditionVariable.new @nodes = [] @nodes_for_next_filters = [] @nodes_mutex = Mutex.new @nodes_cv = XThread::ConditionVariable.new @nodes_status = {} @nodes_status_mutex = Mutex.new @nodes_status_cv = XThread::ConditionVariable.new @controller.register_bjob(self) @create_node_thread = nil # gbreakのときに安全に@create_node_threadスレッドをとめるため @create_node_mutex = Mutex.new @context = Context.new(self) start_watch_node_status if watch_status? end |
Class Method Details
.watch_status ⇒ Object
13 14 15 |
# File 'lib/fairy/master/c-filter.rb', line 13 def self.watch_status @@watch_status end |
.watch_status=(val) ⇒ Object
17 18 19 |
# File 'lib/fairy/master/c-filter.rb', line 17 def self.watch_status=(val) @@watch_status=val end |
Instance Method Details
#abort_create_node ⇒ Object
343 344 345 346 347 348 349 350 351 352 353 354 |
# File 'lib/fairy/master/c-filter.rb', line 343 def abort_create_node Log::debug(self, "ABORT_CREATE_NODE: S") @controller.create_processor_mutex.synchronize do Log::debug(self, "ABORT_CREATE_NODE: 1") if @create_node_thread && @create_node_thread.alive? Log::debug(self, "ABORT_CREATE_NODE: 2 ") @create_node_thread.raise AbortCreateNode Log::debug(self, "ABORT_CREATE_NODE: 3") end Log::debug(self, "ABORT_CREATE_NODE: E") end end |
#add_node(node) ⇒ Object
109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/fairy/master/c-filter.rb', line 109 def add_node(node) @nodes_mutex.synchronize do unless node @nodes_for_next_filters.push nil @nodes_cv.broadcast return end # node.no = node.input.no @nodes[node.no] = node @nodes_for_next_filters.push node @nodes_cv.broadcast end end |
#assgin_number_of_nodes? ⇒ Boolean
263 264 265 |
# File 'lib/fairy/master/c-filter.rb', line 263 def assgin_number_of_nodes? @number_of_nodes end |
#bind_export(exp, imp) ⇒ Object
319 320 321 |
# File 'lib/fairy/master/c-filter.rb', line 319 def bind_export(exp, imp) imp.no_import = 1 end |
#break_create_node ⇒ Object
334 335 336 337 338 339 340 341 |
# File 'lib/fairy/master/c-filter.rb', line 334 def break_create_node # 作成中のものは完全に作成させるため @controller.create_processor_mutex.synchronize do if @create_node_thread && @create_node_thread.alive? @create_node_thread.raise BreakCreateNode end end end |
#break_running(njob = nil) ⇒ Object
job control
326 327 328 329 330 331 332 |
# File 'lib/fairy/master/c-filter.rb', line 326 def break_running(njob = nil) break_create_node each_node do |tasklet| tasklet.break_running unless tasklet.equal?(njob) end end |
#create_and_add_node(ntask, mapper, opts = {}) ⇒ Object
235 236 237 238 239 240 241 242 243 |
# File 'lib/fairy/master/c-filter.rb', line 235 def create_and_add_node(ntask, mapper, opts={}) node = create_node(ntask) {|node| if opts[:init_njob] opts[:init_njob].call(node) end mapper.bind_input(node) } node end |
#create_import(processor) ⇒ Object
310 311 312 |
# File 'lib/fairy/master/c-filter.rb', line 310 def create_import(processor) processor.create_import(@opts[:prequeuing_policy]) end |
#create_node(ntask, *params, &block) ⇒ Object
245 246 247 248 249 250 251 252 253 |
# File 'lib/fairy/master/c-filter.rb', line 245 def create_node(ntask, *params, &block) if params.empty? params = njob_creation_params end njob = ntask.create_njob(node_class_name, self, @opts, *params) block.call(njob) add_node(njob) njob end |
#create_nodes ⇒ Object
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/fairy/master/c-filter.rb', line 182 def create_nodes begin no = 0 ret = nil @controller.assign_ntasks(self, @create_node_mutex) do |ntask, mapper, opts={}| njob = create_and_add_node(ntask, mapper, opts) no += 1 njob end add_node(nil) Log::debug self, "CREATE_NODES: #{self}.number_of_nodes=#{no}" self.number_of_nodes = no rescue BreakCreateNode Log::debug self, "BREAK CREATE NODE: #{self}" add_node(nil) Log::debug self, "CREATE_NODES: #{self}.number_of_nodes=#{no}" self.number_of_nodes = no rescue AbortCreateNode Log::debug self, "Abort CREATE NODE: #{self}" # do nothing rescue ERR::NodeNotArrived Log::debug self, "NODE NOT ARRIVED: #{self}" begin handle_exception($!) rescue Log::debug_exception(self) end Log::debug self, "NODE NOT ARRIVED2: #{self}" raise rescue ERR::CantExecSubcmd begin handle_exception($!) rescue Log::debug_exception(self) end Log::debug self, "CANT EXEC SUBCOMMAND: #{self}" raise rescue Exception Log::warn_exception(self) raise # ensure # Log::debug self, "CREATE_NODES: #{self}.number_of_nodes=#{no}" #add_node(nil) #self.number_of_nodes = no end end |
#def_job_pool_variable(vname, value = nil) ⇒ Object
76 77 78 |
# File 'lib/fairy/master/c-filter.rb', line 76 def def_job_pool_variable(vname, value = nil) @job_pool_dict.def_variable(vname, value) end |
#each_assigned_filter(&block) ⇒ Object
def next_filter(mapper)
@nodes_mutex.synchronize do
while @nodes_for_next_filters.empty? @nodes_cv.wait(@nodes_mutex) end @nodes_for_next_filters.shift
end
end
292 293 294 295 296 297 298 299 300 301 302 303 304 |
# File 'lib/fairy/master/c-filter.rb', line 292 def each_assigned_filter(&block) loop do input_filter = nil @nodes_mutex.synchronize do while @nodes_for_next_filters.empty? @nodes_cv.wait(@nodes_mutex) end input_filter = @nodes_for_next_filters.shift return unless input_filter end block.call input_filter end end |
#each_export_by(njob, mapper, &block) ⇒ Object
314 315 316 317 |
# File 'lib/fairy/master/c-filter.rb', line 314 def each_export_by(njob, mapper, &block) # block.call njob.export, :foo=>:bar block.call njob.export end |
#each_node(flag = nil, &block) ⇒ Object
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/fairy/master/c-filter.rb', line 124 def each_node(flag = nil, &block) if flag == :exist_only return each_node_exist_only &block end @nodes_mutex.synchronize do idx = 0 while !@number_of_nodes || idx < @number_of_nodes unless @nodes[idx] @nodes_cv.wait(@nodes_mutex) next end begin @nodes_mutex.unlock block.call @nodes[idx] ensure @nodes_mutex.lock end idx +=1 end end end |
#each_node_exist_only(&block) ⇒ Object
146 147 148 149 |
# File 'lib/fairy/master/c-filter.rb', line 146 def each_node_exist_only(&block) nodes = @nodes_mutex.synchronize{@nodes.dup} nodes.each &block end |
#handle_exception(exp) ⇒ Object
392 393 394 |
# File 'lib/fairy/master/c-filter.rb', line 392 def handle_exception(exp) @controller.handle_exception(exp) end |
#input ⇒ Object
56 57 58 |
# File 'lib/fairy/master/c-filter.rb', line 56 def input ERR::Raise ERR::INTERNAL::ShouldDefineSubclass end |
#job_pool_dict ⇒ Object
72 73 74 |
# File 'lib/fairy/master/c-filter.rb', line 72 def job_pool_dict @job_pool_dict end |
#job_pool_variable(vname, *value) ⇒ Object
80 81 82 83 84 85 86 |
# File 'lib/fairy/master/c-filter.rb', line 80 def job_pool_variable(vname, *value) if value.empty? @job_pool_dict[vname] else @job_pool_dict[vname] = value end end |
#njob_creation_params ⇒ Object
259 260 261 |
# File 'lib/fairy/master/c-filter.rb', line 259 def njob_creation_params [] end |
#node_class_name ⇒ Object
255 256 257 |
# File 'lib/fairy/master/c-filter.rb', line 255 def node_class_name ERR::Raise ERR::INTERNAL::NoRegisterService, self.class end |
#nodes ⇒ Object
103 104 105 106 107 |
# File 'lib/fairy/master/c-filter.rb', line 103 def nodes @nodes_mutex.synchronize do @nodes end end |
#number_of_nodes ⇒ Object
def each_export(&block)
each_node do |node|
exp = node.export block.call exp, node node.export.output_no_import = 1
end
end
159 160 161 162 163 164 165 166 167 168 |
# File 'lib/fairy/master/c-filter.rb', line 159 def number_of_nodes # @number_of_nodes_mutex.synchronize do @nodes_mutex.synchronize do while !@number_of_nodes # @number_of_nodes_cv.wait(@number_of_nodes_mutex) @nodes_cv.wait(@nodes_mutex) end @number_of_nodes end end |
#number_of_nodes=(no) ⇒ Object
Njob Methods:
92 93 94 95 96 97 98 99 100 101 |
# File 'lib/fairy/master/c-filter.rb', line 92 def number_of_nodes=(no) #puts "#{self}.number_of_nodes=#{no}" # @number_of_nodes_mutex.synchronize do @nodes_mutex.synchronize do @number_of_nodes = no # @number_of_nodes_cv.broadcast @nodes_cv.broadcast @nodes_status_cv.broadcast end end |
#pool_dict ⇒ Object
Pool Variables: (JP: プール変数)
68 69 70 |
# File 'lib/fairy/master/c-filter.rb', line 68 def pool_dict @controller.pool_dict end |
#postmapping_policy ⇒ Object
60 61 62 |
# File 'lib/fairy/master/c-filter.rb', line 60 def postmapping_policy @opts[:postmapping_policy] || CONF.POSTMAPPING_POLICY end |
#start_create_nodes ⇒ Object
Njob creation methods
173 174 175 176 177 178 179 180 |
# File 'lib/fairy/master/c-filter.rb', line 173 def start_create_nodes @create_node_thread = Thread.start{ Log::debug self, "START_CREATE_NODES: START #{self}" create_nodes Log::debug self, "START_CREATE_NODES: END #{self}" } nil end |
#start_export(njob) ⇒ Object
306 307 308 |
# File 'lib/fairy/master/c-filter.rb', line 306 def start_export(njob) export = njob.start_export end |
#start_watch_node_status ⇒ Object
367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 |
# File 'lib/fairy/master/c-filter.rb', line 367 def start_watch_node_status Thread.start do all_finished = false while !@number_of_nodes || !all_finished @nodes_status_mutex.synchronize do @nodes_status_cv.wait(@nodes_status_mutex) end all_finished = @number_of_nodes Log::info(self) do |sio| sio.puts "Status Changed: BEGIN #{self}" each_node(:exist_only) do |node| st = @nodes_status[node] sio.puts " node: #{node} status: #{st.id2name}" if st all_finished &&= st==:ST_FINISH end sio.puts "Status Changed: END #{self}" end end Log::info self, "Monitoring finish: ALL NJOB finished" end nil end |
#update_status(node, st) ⇒ Object
356 357 358 359 360 361 |
# File 'lib/fairy/master/c-filter.rb', line 356 def update_status(node, st) @nodes_status_mutex.synchronize do @nodes_status[node] = st @nodes_status_cv.broadcast end end |
#watch_status? ⇒ Boolean
363 364 365 |
# File 'lib/fairy/master/c-filter.rb', line 363 def watch_status? @@watch_status end |