Class: Fairy::Master
- Inherits:
-
Object
- Object
- Fairy::Master
- Defined in:
- lib/fairy/master.rb
Constant Summary collapse
- IPADDR_REGEXP =
IPv4(ipv6map) または IPv6アドレスか?
/(::ffff:)?([0-9]+\.){3}[0-9]+|[0-9a-f]+:([0-9a-f]*:)[0-9a-f]*/
Instance Attribute Summary collapse
-
#controllers ⇒ Object
readonly
Returns the value of attribute controllers.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#nodes ⇒ Object
readonly
Returns the value of attribute nodes.
Class Method Summary collapse
Instance Method Summary collapse
- #assgin_controller ⇒ Object
-
#controller_next_id ⇒ Object
Controller 関連メソッド.
-
#initialize ⇒ Master
constructor
A new instance of Master.
- #leisured_node(blocking = true) ⇒ Object
- #leisured_node_except_nodes(except_nodes = [], blocking = true) ⇒ Object
- #log_id ⇒ Object
- #node(host) ⇒ Object
- #node_in_reisured(host) ⇒ Object
- #node_in_reisured_without_block(host) ⇒ Object
- #register_controller(controller) ⇒ Object
-
#register_node(node) ⇒ Object
Node 関連メソッド.
- #set_no_of_active_processors(node, no) ⇒ Object
- #set_no_of_processors(node, no) ⇒ Object
- #start(service) ⇒ Object
- #terminate_controller(controller) ⇒ Object
- #unlimited_leisured_node ⇒ Object
- #when_disconnected(deepspace, opts) ⇒ Object
- #when_disconnected_node(addr, node, opts) ⇒ Object
Constructor Details
#initialize ⇒ Master
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/fairy/master.rb', line 24
#
# Builds a Master with empty controller/node registries and the locks and
# condition variables that guard them.
def initialize
  # Disabled client bookkeeping, kept for reference:
  #   @clients = {}
  #   @clients_mutex = Mutex.new
  #   @clients_cv = XThread::ConditionVariable.new
  #   @clientds2controller = {}

  # Controller id sequence; starts at -1 and is incremented before use.
  @controller_seq_mutex = Mutex.new
  @controller_seq = -1

  # Controller registry (id => controller) and its change notification.
  @controllers_mutex = Mutex.new
  @controllers_cv = XThread::ConditionVariable.new
  @controllers = {}

  # Node registry (address => node) and the node id sequence.
  @nodes_mutex = Mutex.new
  @nodes = {}
  @node_seq = -1

  # Per-node processor counts.
  @no_of_processors_mutex = Mutex.new
  @no_of_processors = {}

  # Per-node active processor counts and their change notification.
  @no_of_active_processors_mutex = Mutex.new
  @no_of_active_processors = {}
  @no_of_active_processors_cv = XThread::ConditionVariable.new
end
Instance Attribute Details
#controllers ⇒ Object (readonly)
Returns the value of attribute controllers.
52 53 54 |
# File 'lib/fairy/master.rb', line 52 def controllers @controllers end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
55 56 57 |
# File 'lib/fairy/master.rb', line 55 def logger @logger end |
#nodes ⇒ Object (readonly)
Returns the value of attribute nodes.
53 54 55 |
# File 'lib/fairy/master.rb', line 53 def nodes @nodes end |
Class Method Details
.start(service) ⇒ Object
335 336 337 338 |
# File 'lib/fairy/master.rb', line 335
#
# Convenience entry point: creates a Master and starts it on +service+.
# Returns whatever the instance-level #start returns.
def Master.start(service)
  Master.new.start(service)
end
Instance Method Details
#assgin_controller ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/fairy/master.rb', line 116
#
# Launches a Controller subcommand and waits, holding @controllers_mutex,
# until a controller is stored in @controllers under the freshly allocated
# id (see #register_controller, which stores it and broadcasts on
# @controllers_cv).
#
# Returns the registered controller. If none shows up within
# CONF.SUBCMD_EXEC_TIMEOUT, logs a fatal message and calls
# ERR::Fail with ERR::CantExecSubcmd (presumably raising -- confirm in ERR).
def assgin_controller
  # @clients_mutex.synchronize do
  #   @clients[fairy.deep_space] = fairy
  # end

  Log::debug(self, "Assgin Controller")
  @controllers_mutex.synchronize do
    controller_id = controller_next_id
    MasterAPP.start_subcommand(CONF.RUBY_BIN,
                               CONF.CONTROLLER_BIN,
                               "--master", @deepconnect.local_id.to_s,
                               "--id", controller_id.to_s)
    begin
      # Use the module function: the bare Kernel#timeout form is deprecated
      # and is not available in newer versions of the timeout library.
      Timeout.timeout(CONF.SUBCMD_EXEC_TIMEOUT) do
        while !@controllers[controller_id]
          @controllers_cv.wait(@controllers_mutex)
        end
      end
    rescue Timeout::Error
      Log::fatal(self, "Can't exec Controller")
      ERR::Fail ERR::CantExecSubcmd, "controller"
    end
    # @clientds2controller[fairy.deep_space] = @controllers[controller_id]
    Log::debug(self, "Assgin Controller: Assgined")
    @controllers[controller_id]
  end
end
#controller_next_id ⇒ Object
Controller 関連メソッド
110 111 112 113 114 |
# File 'lib/fairy/master.rb', line 110
#
# Increments the controller sequence under its mutex and returns the new
# value, which serves as the next controller id.
def controller_next_id
  @controller_seq_mutex.synchronize { @controller_seq += 1 }
end
#leisured_node(blocking = true) ⇒ Object
210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 |
# File 'lib/fairy/master.rb', line 210
#
# Picks the registered node with the fewest active processors, provided that
# count does not exceed CONF.MASTER_MAX_ACTIVE_PROCESSORS.
#
# blocking - when true (default), waits on @no_of_active_processors_cv and
#            retries until a node qualifies; when false, returns nil
#            immediately if none qualifies.
#
# Returns the chosen node, or nil in non-blocking mode when no node
# qualifies (including when no node is registered at all).
def leisured_node(blocking = true)
  Log::debug(self, "LAISURED NODE S:")
  @no_of_active_processors_mutex.synchronize do
    loop do
      min_node = nil
      min_no_processor = nil
      # Iterate over a copy so concurrent registration cannot disturb the scan.
      @nodes.dup.each do |_uuid, node|
        no = @no_of_active_processors[node]
        if !min_no_processor or min_no_processor > no
          min_no_processor = no
          min_node = node
        end
      end
      # min_node is nil when no node is registered; without this guard the
      # comparison below would be attempted on nil and raise NoMethodError.
      if min_node && min_no_processor <= CONF.MASTER_MAX_ACTIVE_PROCESSORS
        Log::debug(self, "LAISURED NODE E:")
        return min_node
      end
      if blocking
        Log::debug(self, "LAISURED NODE 1 WAITING:")
        @no_of_active_processors_cv.wait(@no_of_active_processors_mutex)
        Log::debug(self, "LAISURED NODE 2 WAITING END:")
      else
        Log::debug(self, "LAISURED NODE E:")
        return nil
      end
    end
  end
end
#leisured_node_except_nodes(except_nodes = [], blocking = true) ⇒ Object
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 |
# File 'lib/fairy/master.rb', line 239
#
# Like a least-busy node lookup, but skips every node listed in
# +except_nodes+. A node qualifies when its active processor count does not
# exceed CONF.MASTER_MAX_ACTIVE_PROCESSORS.
#
# except_nodes - nodes to leave out of consideration (default: none).
# blocking     - when true (default), waits on the active-processor
#                condition variable and retries; when false, returns nil
#                if nothing qualifies.
def leisured_node_except_nodes(except_nodes = [], blocking = true)
  Log::debug(self, "LAISURED NODE S:")
  @no_of_active_processors_mutex.synchronize do
    loop do
      best_node = nil
      best_count = nil
      @nodes.dup.each do |_uuid, candidate|
        next if except_nodes.include?(candidate)

        count = @no_of_active_processors[candidate]
        next unless !best_count || best_count > count

        best_count = count
        best_node = candidate
      end

      if best_node && best_count <= CONF.MASTER_MAX_ACTIVE_PROCESSORS
        Log::debug(self, "LAISURED NODE E:")
        return best_node
      end

      return nil unless blocking

      Log::debug(self, "LAISURED NODE 1 WAITING:")
      @no_of_active_processors_cv.wait(@no_of_active_processors_mutex)
      Log::debug(self, "LAISURED NODE 2 WAITING END:")
    end
  end
end
#log_id ⇒ Object
57 58 59 |
# File 'lib/fairy/master.rb', line 57 def log_id "Master" end |
#node(host) ⇒ Object
303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 |
# File 'lib/fairy/master.rb', line 303
#
# Looks up a registered node by host.
#
# When +host+ matches IPADDR_REGEXP it is used directly as the key into
# @nodes. Otherwise every address Resolv yields for the host name is
# converted with IPAddr#native and tried in turn; the first registered
# match wins.
#
# Returns the node, or nil when nothing is registered for the host.
def node(host)
  if IPADDR_REGEXP =~ host
    return @nodes_mutex.synchronize { @nodes[host] }
  end

  Resolv.each_address(host) do |addr|
    # (An ipv4_mapped conversion for IPv4 addresses is disabled upstream.)
    resolved = IPAddr.new(addr).native.to_s
    @nodes_mutex.synchronize do
      found = @nodes[resolved]
      return found if found
    end
  end
  nil
end
#node_in_reisured(host) ⇒ Object
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
# File 'lib/fairy/master.rb', line 183
#
# Resolves +host+ to a registered node and waits until that node's active
# processor count is no longer above CONF.MASTER_MAX_ACTIVE_PROCESSORS.
#
# Returns the node, or nil when no node is registered for +host+.
def node_in_reisured(host)
  Log::debug(self, "NODE IN LAISURED S:")

  target = node(host)
  return nil unless target

  @no_of_active_processors_mutex.synchronize do
    while @no_of_active_processors[target] > CONF.MASTER_MAX_ACTIVE_PROCESSORS
      Log::debug(self, "NODE IN LAISURED 1: WAITING")
      # Woken by a broadcast whenever an active-processor count changes.
      @no_of_active_processors_cv.wait(@no_of_active_processors_mutex)
      Log::debug(self, "NODE IN LAISURED 2: WAITING END")
    end
    Log::debug(self, "NODE IN LAISURED E:")
    target
  end
end
#node_in_reisured_without_block(host) ⇒ Object
200 201 202 203 204 205 206 207 208 |
# File 'lib/fairy/master.rb', line 200
#
# Non-blocking counterpart of #node_in_reisured.
#
# Returns:
#   nil   - no node is registered for +host+
#   false - the node's active processor count exceeds
#           CONF.MASTER_MAX_ACTIVE_PROCESSORS
#   node  - otherwise
def node_in_reisured_without_block(host)
  node = node(host)
  # An unknown host yields nil; bail out before using nil as a lookup key,
  # which would end in a NoMethodError on the comparison below.
  return nil unless node

  @no_of_active_processors_mutex.synchronize do
    if @no_of_active_processors[node] > CONF.MASTER_MAX_ACTIVE_PROCESSORS
      return false
    end
  end
  node
end
#register_controller(controller) ⇒ Object
146 147 148 149 150 151 |
# File 'lib/fairy/master.rb', line 146
#
# Stores +controller+ in the registry under its id and wakes every thread
# waiting on the controllers condition variable.
def register_controller(controller)
  @controllers_mutex.synchronize do
    @controllers.store(controller.id, controller)
    @controllers_cv.broadcast
  end
end
#register_node(node) ⇒ Object
Node 関連メソッド
286 287 288 289 290 291 292 293 294 295 296 297 298 |
# File 'lib/fairy/master.rb', line 286
#
# Registers +node+ under the first element of its deep space's peer uuid,
# zeroes its processor counters, and assigns it the next node sequence
# number as id together with that address.
def register_node(node)
  @nodes_mutex.synchronize do
    @node_seq += 1

    addr = node.deep_space.peer_uuid[0]
    @no_of_processors[node] = 0
    @no_of_active_processors[node] = 0
    @nodes[addr] = node

    # Logged before id/addr are assigned, so the node is rendered in its
    # pre-registration state.
    Log::info(self, "Node added: #{addr}->#{node}##{@node_seq}")

    node.id = @node_seq
    node.addr = addr
  end
end
#set_no_of_active_processors(node, no) ⇒ Object
175 176 177 178 179 180 181 |
# File 'lib/fairy/master.rb', line 175
#
# Records +no+ as the active processor count for +node+ and wakes every
# thread waiting for a count to change.
def set_no_of_active_processors(node, no)
  @no_of_active_processors_mutex.synchronize do
    Log::debug(self, "CHANGE ACTIVE PROCESSORS: #{node}->#{no}")
    @no_of_active_processors.store(node, no)
    @no_of_active_processors_cv.broadcast
  end
end
#set_no_of_processors(node, no) ⇒ Object
168 169 170 171 172 |
# File 'lib/fairy/master.rb', line 168
#
# Records +no+ as the processor count for +node+, under the processor-count
# mutex. Returns +no+.
def set_no_of_processors(node, no)
  @no_of_processors_mutex.synchronize { @no_of_processors[node] = no }
end
#start(service) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/fairy/master.rb', line 61
#
# Brings the Master service up on +service+: starts DeepConnect, exports
# this object as "Master" and an Inspector as "Inspector", installs the
# logger, hooks disconnect handling, and finally tries to load the optional
# fairy.so extension.
def start(service)
  @deepconnect = DeepConnect.start(service)
  @deepconnect.export("Master", self)

  # Loaded lazily, right before first use.
  require "fairy/share/inspector"
  @deepconnect.export("Inspector", Inspector.new(self))

  require "fairy/share/log"
  @logger = Logger.new
  Log.logger = @logger
  Log.type = "[M]"

  @deepconnect.when_disconnected { |deepspace, opts| when_disconnected(deepspace, opts) }

  Log::info(self, "Master Service Start")
  Log::info(self, "\tfairy version: #{Version}")
  Log::info(self, "\t[Powerd By #{RUBY_DESCRIPTION}]")

  # The extension is optional; its absence is only reported.
  begin
    require "fairy.so"
    Log::warn(self, "\t Load fairy.so")
  rescue LoadError
    Log::warn(self, "Can't load fairy.so. Can't use this feature")
  end
end
#terminate_controller(controller) ⇒ Object
153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/fairy/master.rb', line 153
#
# Removes +controller+ from the registry (waking any waiters), then asks it
# to terminate and waits for a child process to exit. Any StandardError
# raised during termination is printed together with its backtrace rather
# than propagated.
def terminate_controller(controller)
  @controllers_mutex.synchronize do
    @controllers.delete(controller.id)
    @controllers_cv.broadcast
  end

  begin
    controller.terminate
    Process.wait
  rescue => e
    p e, e.backtrace
  end
end
#unlimited_leisured_node ⇒ Object
269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 |
# File 'lib/fairy/master.rb', line 269
#
# Returns the registered node with the fewest active processors, with no
# upper limit applied and without taking any lock. On ties the node seen
# first wins. Returns nil when no node is registered.
def unlimited_leisured_node
  best_node = nil
  lowest = nil
  @nodes.dup.each do |_uuid, candidate|
    active = @no_of_active_processors[candidate]
    next unless !lowest || lowest > active

    lowest = active
    best_node = candidate
  end
  best_node
end
#when_disconnected(deepspace, opts) ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/fairy/master.rb', line 90
#
# Disconnect hook. Finds the registered node whose deep space is
# +deepspace+ and, if there is one, hands it to #when_disconnected_node
# while holding the nodes mutex.
def when_disconnected(deepspace, opts)
  Log::debug(self, "MASTER: disconnected: Start termination")

  # Controller handling is disabled upstream:
  #   @controllers_mutex.synchronize do
  #     if c = @controllers.find{|c| c.deep_space == deepspace}
  #       when_disconnected_controller(c, deepspace, opts)
  #     end
  #   end

  @nodes_mutex.synchronize do
    entry = @nodes.find { |_addr, registered| registered.deep_space == deepspace }
    if entry
      Log::info(self, "MASTER: disconnected NODE: start termination")
      addr, lost_node = entry
      when_disconnected_node(addr, lost_node, opts)
    end
  end
end
#when_disconnected_node(addr, node, opts) ⇒ Object
329 330 331 332 333 |
# File 'lib/fairy/master.rb', line 329
#
# Logs the disconnect and drops the entry for +addr+ from the node
# registry. Returns the removed node, or nil if +addr+ was not registered.
# (+node+ and +opts+ are accepted but not used here.)
def when_disconnected_node(addr, node, opts)
  # addr = deep_space.peer_uuid[0]
  Log::info(self, "NODE: disconnected(#{addr})")
  @nodes.delete(addr)
end