Class: Fairy::Processor
- Inherits:
-
Object
- Object
- Fairy::Processor
- Defined in:
- lib/fairy/processor.rb
Constant Summary collapse
- EXPORTS =
[]
- LIMIT_PROCESS_SIZE =
kbyte
100
- ACTIVE_STATUS =
{ :ST_INIT => true, :ST_WAIT_IMPORT => true, :ST_ACTIVATE => true }
- SEMIACTIVE_STATUS =
{ # :ST_INIT => true, # :ST_WAIT_IMPORT => true, :ST_ALL_IMPORTED => true, :ST_WAIT_EXPORT_FINISH => true, :ST_EXPORT_FINISH => true, :ST_OUTPUT_FINISH => true }
Instance Attribute Summary collapse
-
#addr ⇒ Object
Returns the value of attribute addr.
-
#deepconnect ⇒ Object
readonly
Returns the value of attribute deepconnect.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#njob_mon ⇒ Object
readonly
Returns the value of attribute njob_mon.
-
#node ⇒ Object
readonly
Returns the value of attribute node.
-
#ntasks ⇒ Object
readonly
Returns the value of attribute ntasks.
Class Method Summary collapse
Instance Method Summary collapse
- #all_ntasks_finished?(lock = :lock) ⇒ Boolean
- #all_ntasks_finished_no_lock? ⇒ Boolean
- #all_ntasks_semiactivated?(lock = :lock) ⇒ Boolean
- #all_ntasks_semiactivated_no_lock? ⇒ Boolean
- #connect_controller(controller, conf) ⇒ Object
-
#create_import(policy) ⇒ Object
(Commented-out code preceding this method in the source, picked up as its description:) def create_njob(njob_class_name, bjob, opts, *rests) klass = import(njob_class_name) njob = klass.new(self, bjob, opts, *rests) @njobs.push njob Log.debugf(self, "Njob number of %d", @njobs.size) njob end DeepConnect.def_method_spec(self, "REF create_njob(VAL, REF, VAL, *VAL)")
- #create_ntask ⇒ Object
- #deregister_varray_element_proc ⇒ Object
- #exist_varray_elements? ⇒ Boolean
- #export(service, obj) ⇒ Object
- #import(service) ⇒ Object
-
#init_ntask_status_feature ⇒ Object
Processor status management and ntask status management. Processor statuses: ST_WAIT, ST_ACTIVATE.
-
#init_varray_feature ⇒ Object
varray management.
-
#initialize(id) ⇒ Processor
constructor
A new instance of Processor.
- #inspectx ⇒ Object
- #life_out_life_span? ⇒ Boolean
- #log_id ⇒ Object
- #no_active_ntasks ⇒ Object
- #no_ntasks ⇒ Object
- #notice_status(st) ⇒ Object
- #ntask_next_id ⇒ Object
- #process_status_mon(inspect_p = CONF.PROCESSOR_MON_OBJECTSPACE_INSPECT_ON) ⇒ Object
- #register_varray_element(array) ⇒ Object
- #start(node_port, service = 0) ⇒ Object
-
#start_process_status_monitor ⇒ Object
Processor monitoring.
- #start_watch_status ⇒ Object
- #terminate ⇒ Object
- #terminate_all_ntasks ⇒ Object
- #to_s ⇒ Object
- #update_status(ntask, st) ⇒ Object
- #when_disconnected(deepspace, opts) ⇒ Object
Constructor Details
#initialize(id) ⇒ Processor
Returns a new instance of Processor.
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/fairy/processor.rb', line 57
# Builds a processor identified by +id+ and prepares its bookkeeping state.
#
# @param id [Object] identifier stored in @id (exposed through #id)
def initialize(id)
  @id, @reserve = id, 0

  # Service registry written by #export and read by #import.
  @services = {}

  # Ntask list and the mutex-guarded counter behind #ntask_next_id;
  # starting at -1 makes the first issued id 0.
  @ntasks, @ntask_seq = [], -1
  @ntask_seq_mutex = Mutex.new

  # Must exist before init_ntask_status_feature runs, because that method
  # derives its monitor from @njob_mon.
  @njob_mon = FiberMon.new

  init_varray_feature
  init_ntask_status_feature
end
Instance Attribute Details
#addr ⇒ Object
Returns the value of attribute addr.
191 192 193 |
# File 'lib/fairy/processor.rb', line 191
# Reader for @addr (reset to nil by #start).
#
# @return [Object] the current value of @addr
def addr
  @addr
end
#deepconnect ⇒ Object (readonly)
Returns the value of attribute deepconnect.
82 83 84 |
# File 'lib/fairy/processor.rb', line 82
# Reader for @deepconnect, which #start assigns from DeepConnect.start.
#
# @return [Object] the current value of @deepconnect
def deepconnect
  @deepconnect
end
#id ⇒ Object (readonly)
Returns the value of attribute id.
73 74 75 |
# File 'lib/fairy/processor.rb', line 73
# Reader for @id, the identifier given to the constructor.
#
# @return [Object] the current value of @id
def id
  @id
end
#njob_mon ⇒ Object (readonly)
Returns the value of attribute njob_mon.
76 77 78 |
# File 'lib/fairy/processor.rb', line 76
# Reader for @njob_mon, the FiberMon created in the constructor.
#
# @return [Object] the current value of @njob_mon
def njob_mon
  @njob_mon
end
#node ⇒ Object (readonly)
Returns the value of attribute node.
192 193 194 |
# File 'lib/fairy/processor.rb', line 192
# Reader for @node, the "Node" service imported by #start.
#
# @return [Object] the current value of @node
def node
  @node
end
#ntasks ⇒ Object (readonly)
Returns the value of attribute ntasks.
74 75 76 |
# File 'lib/fairy/processor.rb', line 74
# Reader for @ntasks, the list that #create_ntask appends to.
#
# @return [Array] the current value of @ntasks
def ntasks
  @ntasks
end
Class Method Details
.def_export(obj, name = nil) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/fairy/processor.rb', line 41
# Appends a [name, obj] pair to EXPORTS; #start later registers every pair
# through #export.
#
# When +name+ is omitted it is derived from the class name of +obj+, taking
# the part after "Fairy::" when that prefix is present.
#
# @param obj [Object] object to export; must be a Class when +name+ is omitted
# @param name [String, nil] service name to register +obj+ under
# @return [Array] EXPORTS after the push
def Processor.def_export(obj, name = nil)
  unless name
    if obj.kind_of?(Class)
      matched = /Fairy::(.*)$/.match(obj.name)
      name = matched ? matched[1] : obj.name
    else
      # A name cannot be derived from a non-class object.
      ERR::Raise ERR::INTERNAL::CantDefExport, obj.to_s
    end
  end

  EXPORTS.push [name, obj]
end
Instance Method Details
#all_ntasks_finished?(lock = :lock) ⇒ Boolean
335 336 337 338 339 340 341 342 343 |
# File 'lib/fairy/processor.rb', line 335
# Reports whether every tracked ntask has reached :ST_FINISH.
#
# @param lock [Symbol] :lock (the default) runs the check while holding
#   @status_mx; any other value (callers in this file pass :no_lock) skips
#   locking, for callers that already hold the monitor.
# @return [Boolean]
def all_ntasks_finished?(lock = :lock)
  return all_ntasks_finished_no_lock? unless lock == :lock

  # Lock the status monitor itself, as update_status and start_watch_status
  # do; @status_cv is otherwise only used for wait_while/broadcast.
  @status_mx.synchronize { all_ntasks_finished_no_lock? }
end
#all_ntasks_finished_no_lock? ⇒ Boolean
345 346 347 348 349 350 |
# File 'lib/fairy/processor.rb', line 345
# Checks, without taking any lock, that every status recorded in
# @ntask_status is :ST_FINISH. An empty table counts as finished.
#
# @return [Boolean]
def all_ntasks_finished_no_lock?
  # Only the values matter; the previous `for node, status in ...` loop bound
  # an unused local that also shadowed the #node reader.
  @ntask_status.each_value.all? { |status| status == :ST_FINISH }
end
#all_ntasks_semiactivated?(lock = :lock) ⇒ Boolean
352 353 354 355 356 357 358 359 360 |
# File 'lib/fairy/processor.rb', line 352
# Reports whether every tracked ntask is in one of the SEMIACTIVE_STATUS
# states.
#
# @param lock [Symbol] :lock (the default) runs the check while holding
#   @status_mx; any other value (callers in this file pass :no_lock) skips
#   locking, for callers that already hold the monitor.
# @return [Boolean]
def all_ntasks_semiactivated?(lock = :lock)
  return all_ntasks_semiactivated_no_lock? unless lock == :lock

  # Lock the status monitor itself, as update_status and start_watch_status
  # do; @status_cv is otherwise only used for wait_while/broadcast.
  @status_mx.synchronize { all_ntasks_semiactivated_no_lock? }
end
#all_ntasks_semiactivated_no_lock? ⇒ Boolean
362 363 364 365 366 367 |
# File 'lib/fairy/processor.rb', line 362
# Checks, without taking any lock, that every status recorded in
# @ntask_status is a key of SEMIACTIVE_STATUS. An empty table counts as
# semi-activated.
#
# @return [Boolean]
def all_ntasks_semiactivated_no_lock?
  # Only the values matter; the previous `for node, status in ...` loop bound
  # an unused local that also shadowed the #node reader.
  @ntask_status.each_value.all? { |status| SEMIACTIVE_STATUS[status] }
end
#connect_controller(controller, conf) ⇒ Object
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/fairy/processor.rb', line 140
# Attaches this processor to +controller+ and installs the configuration it
# supplies.
#
# Side effects, in order: stores the controller, chains +conf+ onto CONF and
# installs it via Fairy::REPLACE_CONF, optionally starts the status monitor
# thread, and redirects $stdout to a Stdout wrapping the controller.
#
# @param controller [Object] controller to report to
# @param conf [Object] configuration object; must accept #base_conf=
# @return [String] "<host> <type>#<pid>" built from the Log settings
def connect_controller(controller, conf)
  @controller = controller

  conf.base_conf = CONF
  Fairy::REPLACE_CONF(conf)

  # Log::set_local_output_dev  (disabled in the original source)
  if CONF.PROCESSOR_MON_ON
    Log.info(self, "Processor Status Monitoring: ON")
    start_process_status_monitor
  end

  $stdout = Stdout.new(controller)

  format("%s %s#%d", Log.host, Log.type, Log.pid)
end
#create_import(policy) ⇒ Object
def create_njob(njob_class_name, bjob, opts, *rests)
klass = import(njob_class_name)
njob = klass.new(self, bjob, opts, *rests)
@njobs.push njob
Log.debugf(self, "Njob number of %d", @njobs.size)
njob
end
DeepConnect.def_method_spec(self, "REF create_njob(VAL, REF, VAL, *VAL)")
236 237 238 239 240 241 242 |
# File 'lib/fairy/processor.rb', line 236
# Creates an Import for +policy+ whose log callback writes a verbose
# "IMPORT POP" line through Log.
#
# @param policy [Object] passed straight to Import.new
# @return [Import] the configured import
def create_import(policy)
  # Named `importer` so the local does not shadow the #import method.
  importer = Import.new(policy)
  importer.set_log_callback { |n, key|
    Log::verbose(self, "IMPORT POP key=#{key}: #{n}")
  }
  importer
end
#create_ntask ⇒ Object
220 221 222 223 224 |
# File 'lib/fairy/processor.rb', line 220
# Creates a PTask with the next sequential id, records it in @ntasks and
# returns it.
#
# @return [PTask] the newly created task
def create_ntask
  task = PTask.new(ntask_next_id, self)
  @ntasks << task
  task
end
#deregister_varray_element_proc ⇒ Object
289 290 291 292 293 294 295 |
# File 'lib/fairy/processor.rb', line 289
# Builds the proc that #register_varray_element installs as a finalizer.
# When called with an object id, the proc removes that id from
# @varray_elements while holding @varray_elements_mutex.
#
# @return [Proc] one-argument proc taking the object id to forget
def deregister_varray_element_proc
  proc { |oid|
    @varray_elements_mutex.synchronize { @varray_elements.delete(oid) }
  }
end
#exist_varray_elements? ⇒ Boolean
276 277 278 279 280 |
# File 'lib/fairy/processor.rb', line 276
# Reports whether any varray element is still registered, checked while
# holding @varray_elements_mutex.
#
# @return [Boolean] true when @varray_elements is non-empty
def exist_varray_elements?
  @varray_elements_mutex.synchronize { @varray_elements.any? }
end
#export(service, obj) ⇒ Object
198 199 200 |
# File 'lib/fairy/processor.rb', line 198
# Registers +obj+ under +service+ in @services, replacing any previous entry.
#
# @param service [Object] lookup key later passed to #import
# @param obj [Object] object to make available
# @return [Object] +obj+
def export(service, obj)
  @services.store(service, obj)
end
#import(service) ⇒ Object
202 203 204 205 206 207 208 |
# File 'lib/fairy/processor.rb', line 202
# Looks up the object registered under +service+ by #export.
#
# A missing entry (or one stored as nil/false) is reported through
# ERR::Raise with ERR::INTERNAL::NoRegisterService.
#
# @param service [Object] key used at registration time
# @return [Object] the registered object
def import(service)
  registered = @services[service]
  ERR::Raise ERR::INTERNAL::NoRegisterService, service unless registered
  registered
end
#init_ntask_status_feature ⇒ Object
Processor status management and ntask status management. Processor statuses:
ST_WAIT
ST_ACTIVATE
303 304 305 306 307 308 309 310 |
# File 'lib/fairy/processor.rb', line 303
# Initializes processor/ntask status tracking: the processor starts in
# :ST_WAIT with an empty per-ntask status table, plus a monitor obtained from
# @njob_mon and a condition variable obtained from that monitor.
#
# @return [Object] the new condition variable (value of the last assignment)
def init_ntask_status_feature
  @status = :ST_WAIT
  @ntask_status = Hash.new

  monitor = @njob_mon.new_mon
  @status_mx = monitor
  @status_cv = monitor.new_cv
end
#init_varray_feature ⇒ Object
varray management
271 272 273 274 |
# File 'lib/fairy/processor.rb', line 271
# Initializes varray bookkeeping: an empty table of registered elements and
# the mutex that guards it.
#
# @return [Mutex] the new mutex (value of the last assignment)
def init_varray_feature
  @varray_elements = Hash.new
  @varray_elements_mutex = Mutex.new
end
#inspectx ⇒ Object
500 501 502 |
# File 'lib/fairy/processor.rb', line 500
# Extended inspection string: the processor's class and id followed by the
# class name of every ntask it currently holds.
#
# @return [String] e.g. "#<Fairy::Processor: 3 [Fairy::PTask Fairy::PTask]>"
def inspectx
  # Reads @ntasks, the list set up in the constructor. The previous code read
  # @ntask, which is never assigned, so the call failed on nil.
  ntask_classes = @ntasks.map { |ntask| ntask.class.name }.join(" ")
  "#<#{self.class}: #{id} [#{ntask_classes}]>"
end
#life_out_life_span? ⇒ Boolean
247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 |
# File 'lib/fairy/processor.rb', line 247
# Decides whether this processor is idle and has grown past its size limit.
#
# Returns false while any ntask is unfinished or any varray element is still
# registered; otherwise compares the vsz value printed by `ps` for this
# process against LIMIT_PROCESS_SIZE.
#
# @return [Boolean] true only when idle and vsz exceeds LIMIT_PROCESS_SIZE
def life_out_life_span?
  # (The original carried disabled debug output here that printed inspectx,
  # per-njob status and the varray check.)
  if all_ntasks_finished? && !exist_varray_elements?
    # Stop-gap measure for now (translated from the original note).
    # An unparsable or empty ps result becomes 0 through to_i.
    vsz = `ps -ovsz h#{Process.pid}`.to_i
    LIMIT_PROCESS_SIZE < vsz
  else
    false
  end
end
#log_id ⇒ Object
78 79 80 |
# File 'lib/fairy/processor.rb', line 78
# Label for this processor, built from @id.
#
# @return [String] "Processor[<id>]"
def log_id
  "Processor[" + @id.to_s + "]"
end
#no_active_ntasks ⇒ Object
327 328 329 330 331 332 333 |
# File 'lib/fairy/processor.rb', line 327
# Counts the ntasks whose recorded status is a key of ACTIVE_STATUS.
# No lock is taken here.
#
# @return [Integer] number of active ntasks
def no_active_ntasks
  @ntask_status.each_value.count { |status| ACTIVE_STATUS[status] }
end
#no_ntasks ⇒ Object
216 217 218 |
# File 'lib/fairy/processor.rb', line 216
# Number of ntasks created on this processor so far.
#
# @return [Integer] length of @ntasks
def no_ntasks
  @ntasks.length
end
#notice_status(st) ⇒ Object
433 434 435 |
# File 'lib/fairy/processor.rb', line 433
# Forwards a processor status to the node via update_processor_status.
#
# @param st [Symbol] status to report (callers in this file pass @status)
# @return [Object] whatever @node.update_processor_status returns
def notice_status(st)
  target = @node
  target.update_processor_status(self, st)
end
#ntask_next_id ⇒ Object
210 211 212 213 214 |
# File 'lib/fairy/processor.rb', line 210
# Issues the next ntask id by incrementing @ntask_seq under
# @ntask_seq_mutex.
#
# @return [Integer] the incremented sequence value
def ntask_next_id
  @ntask_seq_mutex.synchronize { @ntask_seq += 1 }
end
#process_status_mon(inspect_p = CONF.PROCESSOR_MON_OBJECTSPACE_INSPECT_ON) ⇒ Object
455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 |
# File 'lib/fairy/processor.rb', line 455
# Writes one process-monitoring report through Log::info.
#
# The report always contains the `ps` line for this process, formatted with
# CONF.PROCESSOR_MON_PSFORMAT. When +inspect_p+ is truthy it also forces a GC
# and reports live-object counts per class, plus the number of DeepConnect
# export roots (per class) and import references over all deep spaces.
#
# @param inspect_p [Boolean] include the ObjectSpace/DeepConnect breakdown
# @return [Object] whatever Log::info returns
def process_status_mon(inspect_p = CONF.PROCESSOR_MON_OBJECTSPACE_INSPECT_ON)
  if inspect_p
    GC.start
    count = 0
    count_by_class = {}
    ObjectSpace.each_object do |o|
      count += 1
      klass = o.__deep_connect_real_class
      count_by_class[klass] = (count_by_class[klass] || 0) + 1
    end

    exp = 0
    exp_by_class = {}
    imp = 0
    # DeepConnect internals are read through instance_eval, as before.
    @deepconnect.instance_eval{@organizer}.deep_spaces.values.each do |ds|
      exp_roots = ds.instance_eval{@export_roots}
      exp += exp_roots.size
      exp_roots.each do |_key, root|
        klass = root.class
        exp_by_class[klass] = (exp_by_class[klass] || 0) + 1
      end
      imp += ds.instance_eval{@import_reference.size}
    end
  end

  # Named ps_format so the local does not shadow Kernel#format.
  ps_format = CONF.PROCESSOR_MON_PSFORMAT
  m = `ps -o#{ps_format} h#{Process.pid}`.chomp
  Log::info(self) do |sio|
    prefix = "#{Log.host} [P]\##{@id} MONITOR:"
    sio.puts("PROCESS MONITOR:")
    sio.puts("#{prefix} PS: #{m}")
    if inspect_p
      sio.puts("#{prefix} OBJECT: #{count}")
      # Anonymous classes have a nil name; sort on name.to_s so the sort does
      # not raise when comparing nil with a String.
      count_by_class.keys.sort_by { |k| k.name.to_s }.each do |klass|
        sio.puts("#{prefix} C: #{klass.name} => #{count_by_class[klass]}")
      end

      sio.puts("#{prefix} DEEP-CONNECT: exports: #{exp}")
      exp_by_class.keys.sort_by { |k| k.name.to_s }.each do |klass|
        sio.puts("#{prefix} C: #{klass.name} => #{exp_by_class[klass]}")
      end
      sio.puts("#{prefix} DEEP-CONNECT: imports: #{imp}")
    end
  end
end
#register_varray_element(array) ⇒ Object
282 283 284 285 286 287 |
# File 'lib/fairy/processor.rb', line 282
# Registers +array+ as a live varray element and arranges for it to be
# deregistered when it is garbage-collected.
#
# Only the object id is stored (as both key and value), so the table itself
# does not keep a reference to +array+.
#
# @param array [Object] the element to track
# @return [Object] the result of ObjectSpace.define_finalizer
def register_varray_element(array)
  oid = array.object_id
  @varray_elements_mutex.synchronize { @varray_elements[oid] = oid }
  ObjectSpace.define_finalizer(array, deregister_varray_element_proc)
end
#start(node_port, service = 0) ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/fairy/processor.rb', line 84
# Brings the processor service up and registers it with its node.
#
# Steps, in order: start DeepConnect and publish this object as "Processor";
# export an Inspector; connect to the node on localhost:+node_port+ and adopt
# its logger; try to load the optional fairy.so extension; register every
# EXPORTS entry; start the status watcher; register with the node.
#
# @param node_port [Object] port of the local node's DeepConnect service
# @param service [Object] passed to DeepConnect.start (default 0)
# @return [Object] whatever @node.register_processor returns
def start(node_port, service = 0)
  # (Disabled in the original: raising RLIMIT_STACK from
  # CONF.THREAD_STACK_SIZE.)
  @addr = nil

  @deepconnect = DeepConnect.start(service)
  @deepconnect.register_service("Processor", self)
  # (Disabled in the original: DeepConnect::Conf.DISPLAY_KEEP_ALIVE = true)

  @deepconnect.when_disconnected { |deepspace, opts|
    when_disconnected(deepspace, opts)
  }

  # (Disabled in the original: @njob_mon.start)

  require "fairy/share/inspector"
  @deepconnect.export("Inspector", Inspector.new(self))

  require "fairy/share/log"
  @node_deepspace = @deepconnect.open_deepspace("localhost", node_port)
  @node = @node_deepspace.import("Node")
  @logger = @node.logger
  Log.type = "[P]"
  Log.pid = id
  Log.logger = @logger

  Log.info(self, "Processor Service Start")
  Log.info(self, "\tfairy version: #{Version}")
  Log.info(self, "\t[Powered By #{RUBY_DESCRIPTION}]")

  # The native extension is optional; a failed load is only logged.
  begin
    require "fairy.so"
    Log.warn(self, "\t Load fairy.so")
  rescue LoadError
    Log.warn(self, "Can't load fairy.so. Can't use this feature")
  end

  EXPORTS.each { |name, obj| export(name, obj) }

  start_watch_status

  # (Disabled in the original: GC.disable plus a thread running GC.start
  # every 60 seconds.)

  @node.register_processor(self)
end
#start_process_status_monitor ⇒ Object
Processor monitoring
440 441 442 443 444 445 446 447 448 449 450 451 452 453 |
# File 'lib/fairy/processor.rb', line 440
# Starts a background thread that calls #process_status_mon every
# CONF.PROCESSOR_MON_INTERVAL seconds. The interval is read once, when the
# thread starts.
#
# An exception escaping the loop is logged via Log::debug_exception and then
# re-raised, which ends the thread.
#
# @return [Thread] the monitoring thread
def start_process_status_monitor
  Thread.start {
    begin
      interval = CONF.PROCESSOR_MON_INTERVAL
      loop {
        sleep interval
        process_status_mon
      }
    rescue
      Log::debug_exception
      raise
    end
  }
end
#start_watch_status ⇒ Object
407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 |
# File 'lib/fairy/processor.rb', line 407
# Reports the initial status, then enters a watcher into @njob_mon that
# forwards every later change.
#
# The watcher holds @status_mx and waits on @status_cv until either @status
# or the active-ntask count differs from the last value it reported. Count
# changes go to @controller.update_active_ntasks and status changes go to
# #notice_status. The loop has no exit condition.
#
# @return [nil]
def start_watch_status
  # Initial status notification.
  notice_status(@status)

  @njob_mon.entry do
    @status_mx.synchronize do
      seen_status = nil
      seen_actives = 0
      loop do
        @status_cv.wait_while do
          seen_status == @status && seen_actives == no_active_ntasks
        end

        actives = no_active_ntasks
        if seen_actives != actives
          seen_actives = actives
          @controller.update_active_ntasks(self, actives)
        end

        if seen_status != @status
          seen_status = @status
          notice_status(@status)
        end
      end
    end
  end
  nil
end
#terminate ⇒ Object
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/fairy/processor.rb', line 158
# Shutdown processing for when the client has terminated.
#
# The actual shutdown runs on a separate thread so this call can return
# first: after a short sleep it aborts every ntask, stops DeepConnect and
# exits the process with status 0. StandardErrors raised on the way are only
# logged at debug level.
#
# @return [nil]
def terminate
  Log.info(self, "terminate!!")
  Thread.start {
    begin
      # Wait until this method has returned to its caller.
      sleep 0.2
      @ntasks.each(&:abort_running)
      @deepconnect.stop
      Process.exit(0)
    rescue
      Log.debug(self, "Exception Rised in termination ntasks.")
      Log.debug_exception(self)
    end
  }
  nil
end
#terminate_all_ntasks ⇒ Object
177 178 179 180 181 182 183 184 185 |
# File 'lib/fairy/processor.rb', line 177
# Calls abort_running on every ntask. A StandardError raised while doing so
# is logged at debug level and not propagated, and the remaining ntasks are
# then skipped.
#
# @return [Object] @ntasks on success, otherwise the result of the final
#   logging call
def terminate_all_ntasks
  Log.debug(self, "Terminate all ntasks!!")
  begin
    @ntasks.each(&:abort_running)
  rescue
    Log.debug(self, "Exception Rised in termination ntasks.")
    Log.debug_exception(self)
  end
end
#to_s ⇒ Object
504 505 506 |
# File 'lib/fairy/processor.rb', line 504
# Short display form: the class and the processor id.
#
# @return [String] "#<ClassName: id>"
def to_s
  "#<" + self.class.to_s + ": " + id.to_s + ">"
end
#update_status(ntask, st) ⇒ Object
370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 |
# File 'lib/fairy/processor.rb', line 370
# Records the new status of +ntask+ and recomputes the processor status,
# then wakes every waiter on @status_cv. Runs entirely under @status_mx.
#
# Transition rules, as coded:
# * :ST_ACTIVATE always makes the processor :ST_ACTIVATE.
# * :ST_INIT, :ST_WAIT_IMPORT, :ST_ALL_IMPORTED, :ST_WAIT_EXPORT_FINISH,
#   :ST_EXPORT_FINISH and :ST_OUTPUT_FINISH make it :ST_SEMIACTIVATE when
#   every ntask is semi-activated.
# * :ST_FINISH makes it :ST_WAIT when every ntask has finished.
# * any other status moves a waiting processor to :ST_ACTIVATE.
#
# @param ntask [Object] the task reporting; used as the key in @ntask_status
# @param st [Symbol] the task's new status
# @return [Object] whatever @status_mx.synchronize returns
def update_status(ntask, st)
  Log.debug(self, "UPDATE_STATUS: #{ntask}, #{st}")

  @status_mx.synchronize do
    @ntask_status[ntask] = st

    case st
    when :ST_ACTIVATE
      @status = :ST_ACTIVATE
    when :ST_INIT, :ST_WAIT_IMPORT,
         :ST_ALL_IMPORTED, :ST_WAIT_EXPORT_FINISH,
         :ST_EXPORT_FINISH, :ST_OUTPUT_FINISH
      # The monitor is already held, so use the non-locking check.
      @status = :ST_SEMIACTIVATE if all_ntasks_semiactivated?(:no_lock)
    when :ST_FINISH
      @status = :ST_WAIT if all_ntasks_finished?(:no_lock)
    else
      @status = :ST_ACTIVATE if @status == :ST_WAIT
    end

    @status_cv.broadcast
  end
end
#when_disconnected(deepspace, opts) ⇒ Object
187 188 189 |
# File 'lib/fairy/processor.rb', line 187
# Disconnect hook registered in #start; it only logs the peer uuid of the
# deep space that went away.
#
# @param deepspace [Object] the disconnected deep space; must respond to
#   #peer_uuid
# @param opts [Object] unused here
# @return [Object] whatever Log.debug returns
def when_disconnected(deepspace, opts)
  peer = deepspace.peer_uuid
  Log.debug(self, "PROCESSOR: disconnected #{peer}")
end