Class: Fairy::Master

Inherits:
Object
  • Object
show all
Defined in:
lib/fairy/master.rb

Constant Summary collapse

IPADDR_REGEXP =

IPv4(ipv6map) または IPv6アドレスか?

/(::ffff:)?([0-9]+\.){3}[0-9]+|[0-9a-f]+:([0-9a-f]*:)[0-9a-f]*/

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeMaster



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/fairy/master.rb', line 24

def initialize

#       @clients = {}
#       @clients_mutex = Mutex.new
#       @clients_cv = XThread::ConditionVariable.new

  @controller_seq = -1
  @controller_seq_mutex = Mutex.new

  @controllers = {}
  @controllers_mutex = Mutex.new
  @controllers_cv = XThread::ConditionVariable.new

#      @clientds2controller = {}

  @nodes = {}
  @nodes_mutex = Mutex.new
  @node_seq = -1

  @no_of_processors = {}
  @no_of_processors_mutex = Mutex.new

  @no_of_active_processors = {}
  @no_of_active_processors_mutex = Mutex.new
  @no_of_active_processors_cv = XThread::ConditionVariable.new

end

Instance Attribute Details

#controllersObject (readonly)

Returns the value of attribute controllers.



52
53
54
# File 'lib/fairy/master.rb', line 52

def controllers
  @controllers
end

#loggerObject (readonly)

Returns the value of attribute logger.



55
56
57
# File 'lib/fairy/master.rb', line 55

def logger
  @logger
end

#nodesObject (readonly)

Returns the value of attribute nodes.



53
54
55
# File 'lib/fairy/master.rb', line 53

def nodes
  @nodes
end

Class Method Details

.start(service) ⇒ Object



335
336
337
338
# File 'lib/fairy/master.rb', line 335

def Master.start(service)
  master = Master.new
  master.start(service)
end

Instance Method Details

#assgin_controllerObject



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/fairy/master.rb', line 116

def assgin_controller

#       @clients_mutex.synchronize do
#   @clients[fairy.deep_space] = fairy
#       end

  Log::debug(self, "Assgin Controller")
  @controllers_mutex.synchronize do
  controller_id = controller_next_id
  MasterAPP.start_subcommand(CONF.RUBY_BIN, 
   CONF.CONTROLLER_BIN,
   "--master", @deepconnect.local_id.to_s, 
   "--id", controller_id.to_s)
  begin
    timeout(CONF.SUBCMD_EXEC_TIMEOUT) do
 while !@controllers[controller_id]
   @controllers_cv.wait(@controllers_mutex)
 end
    end
  rescue Timeout::Error
    Log::fatal(self, "Can't exec Controller")
    ERR::Fail ERR::CantExecSubcmd, "controller"
  end

# @clientds2controller[fairy.deep_space] = @controllers[controller_id]
  Log::debug(self, "Assgin Controller: Assgined")
  @controllers[controller_id]
  end
end

#controller_next_idObject

Controller 関連メソッド



110
111
112
113
114
# File 'lib/fairy/master.rb', line 110

def controller_next_id
  @controller_seq_mutex.synchronize do
  @controller_seq += 1
  end
end

#leisured_node(blocking = true) ⇒ Object



210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/fairy/master.rb', line 210

def leisured_node(blocking = true)
Log::debug(self, "LAISURED NODE S:")
  @no_of_active_processors_mutex.synchronize do
  loop do
    min_node = nil
    min_no_processor = nil
    for uuid, node in @nodes.dup
 no = @no_of_active_processors[node]
 if !min_no_processor or min_no_processor > no
   min_no_processor = no
   min_node = node
 end
    end
    if min_no_processor <= CONF.MASTER_MAX_ACTIVE_PROCESSORS
Log::debug(self, "LAISURED NODE E:")
 return min_node 
    end
      if blocking
        Log::debug(self, "LAISURED NODE 1 WAITING:")
        @no_of_active_processors_cv.wait(@no_of_active_processors_mutex)
        Log::debug(self, "LAISURED NODE 2 WAITING END:")
      else
Log::debug(self, "LAISURED NODE E:")
        return nil
      end
  end
  end
end

#leisured_node_except_nodes(except_nodes = [], blocking = true) ⇒ Object



239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
# File 'lib/fairy/master.rb', line 239

def leisured_node_except_nodes(except_nodes = [], blocking = true)
Log::debug(self, "LAISURED NODE S:")
  @no_of_active_processors_mutex.synchronize do
  loop do
    min_node = nil
    min_no_processor = nil
    for uuid, node in @nodes.dup
 next if except_nodes.include?(node)

 no = @no_of_active_processors[node]
 if !min_no_processor or min_no_processor > no
   min_no_processor = no
   min_node = node
 end
    end
    if min_node && min_no_processor <= CONF.MASTER_MAX_ACTIVE_PROCESSORS
Log::debug(self, "LAISURED NODE E:")
 return min_node 
    end
    if blocking
        Log::debug(self, "LAISURED NODE 1 WAITING:")
        @no_of_active_processors_cv.wait(@no_of_active_processors_mutex)
        Log::debug(self, "LAISURED NODE 2 WAITING END:")
    else
 return nil
    end
  end
  end
end

#log_idObject



57
58
59
# File 'lib/fairy/master.rb', line 57

def log_id
  "Master"
end

#node(host) ⇒ Object



303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
# File 'lib/fairy/master.rb', line 303

def node(host)
#puts "HOST: #{host}"
  unless IPADDR_REGEXP =~ host
  Resolv.each_address(host) do |addr|
    ipaddr = IPAddr.new(addr)
#   ipaddr = ipaddr.ipv4_mapped if ipaddr.ipv4?
    ipaddr = ipaddr.native
    host = ipaddr.to_s

    @nodes_mutex.synchronize do
 if n = @nodes[host] 
   return n 
 end
    end
  end
  
  return nil
  end

  node = nil
  @nodes_mutex.synchronize do
  node = @nodes[host]
  end
  node
end

#node_in_reisured(host) ⇒ Object



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/fairy/master.rb', line 183

def node_in_reisured(host)
Log::debug(self, "NODE IN LAISURED S:")
  node = node(host)
  
  return nil unless node

  @no_of_active_processors_mutex.synchronize do
  while @no_of_active_processors[node] > CONF.MASTER_MAX_ACTIVE_PROCESSORS
Log::debug(self, "NODE IN LAISURED 1: WAITING")
    @no_of_active_processors_cv.wait(@no_of_active_processors_mutex)
Log::debug(self, "NODE IN LAISURED 2: WAITING END")
  end
Log::debug(self, "NODE IN LAISURED E:")
  node
  end
end

#node_in_reisured_without_block(host) ⇒ Object



200
201
202
203
204
205
206
207
208
# File 'lib/fairy/master.rb', line 200

def node_in_reisured_without_block(host)
  node = node(host)
  @no_of_active_processors_mutex.synchronize do
  if @no_of_active_processors[node] > CONF.MASTER_MAX_ACTIVE_PROCESSORS
      return false
  end
  end
  node
end

#register_controller(controller) ⇒ Object



146
147
148
149
150
151
# File 'lib/fairy/master.rb', line 146

def register_controller(controller)
  @controllers_mutex.synchronize do
  @controllers[controller.id] = controller
  @controllers_cv.broadcast
  end
end

#register_node(node) ⇒ Object

Node 関連メソッド



286
287
288
289
290
291
292
293
294
295
296
297
298
# File 'lib/fairy/master.rb', line 286

def register_node(node)
  @nodes_mutex.synchronize do
  @node_seq += 1
  @no_of_processors[node] = 0
  @no_of_active_processors[node] = 0
  
  addr = node.deep_space.peer_uuid[0]
  @nodes[addr] = node
  Log::info self, "Node added: #{addr}->#{node}##{@node_seq}"
  node.id = @node_seq
  node.addr = addr
  end
end

#set_no_of_active_processors(node, no) ⇒ Object



175
176
177
178
179
180
181
# File 'lib/fairy/master.rb', line 175

def set_no_of_active_processors(node, no)
  @no_of_active_processors_mutex.synchronize do
  Log::debug(self, "CHANGE ACTIVE PROCESSORS: #{node}->#{no}")
  @no_of_active_processors[node] = no
  @no_of_active_processors_cv.broadcast
  end
end

#set_no_of_processors(node, no) ⇒ Object



168
169
170
171
172
# File 'lib/fairy/master.rb', line 168

def set_no_of_processors(node, no)
  @no_of_processors_mutex.synchronize do
  @no_of_processors[node] = no
  end
end

#start(service) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/fairy/master.rb', line 61

def start(service)
  @deepconnect = DeepConnect.start(service)
  @deepconnect.export("Master", self)

  require "fairy/share/inspector"
  @deepconnect.export("Inspector", Inspector.new(self))
  

  require "fairy/share/log"
  @logger = Logger.new
  Log.logger = @logger
  Log.type = "[M]"

  @deepconnect.when_disconnected do |deepspace, opts|
  when_disconnected(deepspace, opts)
  end

  Log.info(self, "Master Service Start")
  Log::info(self, "\tfairy version: #{Version}")
  Log::info(self, "\t[Powerd By #{RUBY_DESCRIPTION}]") 

  begin
  require "fairy.so"
  Log::warn self, "\t Load fairy.so"
  rescue LoadError
  Log::warn self, "Can't load fairy.so. Can't use this feature"
  end
end

#terminate_controller(controller) ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/fairy/master.rb', line 153

def terminate_controller(controller)
  @controllers_mutex.synchronize do
  @controllers.delete(controller.id)
  @controllers_cv.broadcast
  end
  
  begin
  controller.terminate
  Process.wait
  rescue
  p $!, $@
  end
end

#unlimited_leisured_nodeObject



269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# File 'lib/fairy/master.rb', line 269

def unlimited_leisured_node
  min_node = nil
  min_no_processor = nil
  for uuid, node in @nodes.dup
# no = nil
# @no_of_processors_mutex.synchronize do
  no = @no_of_active_processors[node]
# end
  if !min_no_processor or min_no_processor > no
    min_no_processor = no
    min_node = node
  end
  end
  min_node
end

#when_disconnected(deepspace, opts) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/fairy/master.rb', line 90

def when_disconnected(deepspace, opts)
  Log::debug self, "MASTER: disconnected: Start termination"
#       @controllers_mutex.synchronize do
#   if c = @controllers.find{|c| c.deep_space == deepspace}
#     when_disconnected_controller(c, deepspace, opts)
#   end
#       end

  # node

  @nodes_mutex.synchronize do
  if addr_node= @nodes.find{|addr, node| node.deep_space == deepspace}
    Log::info self, "MASTER: disconnected NODE: start termination"
    when_disconnected_node(addr_node[0], addr_node[1], opts)
  end
  end
end

#when_disconnected_node(addr, node, opts) ⇒ Object



329
330
331
332
333
# File 'lib/fairy/master.rb', line 329

def when_disconnected_node(addr, node, opts)
#      addr = deep_space.peer_uuid[0]
  Log::info(self, "NODE: disconnected(#{addr})")
  @nodes.delete(addr)
end