Class: Fairy::CFilter

Inherits:
Object
  • Object
show all
Defined in:
lib/fairy/master/c-filter.rb

Direct Known Subclasses

CIOFilter, CInput, COutput

Defined Under Namespace

Classes: Context

Constant Summary collapse

@@watch_status =
false

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(controller, opts, *rests) ⇒ CFilter

Returns a new instance of CFilter.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/fairy/master/c-filter.rb', line 23

def initialize(controller, opts, *rests)
  Log::info self, "CREATE BJOB: #{self.class}"
  @controller = controller

  @opts = opts
  @opts = {} unless @opts

  @job_pool_dict = PoolDictionary.new

  @number_of_nodes = nil
#      @number_of_nodes_mutex = Mutex.new
#      @number_of_nodes_cv = XThread::ConditionVariable.new

  @nodes = []
  @nodes_for_next_filters = []
  @nodes_mutex = Mutex.new
  @nodes_cv = XThread::ConditionVariable.new

  @nodes_status = {}
  @nodes_status_mutex = Mutex.new
  @nodes_status_cv = XThread::ConditionVariable.new

  @controller.register_bjob(self)

  @create_node_thread = nil
  # gbreakのときに安全に@create_node_threadスレッドをとめるため
  @create_node_mutex = Mutex.new

  @context = Context.new(self)

  start_watch_node_status if watch_status?
end

Class Method Details

.watch_statusObject



13
14
15
# File 'lib/fairy/master/c-filter.rb', line 13

def self.watch_status
  @@watch_status
end

.watch_status=(val) ⇒ Object



17
18
19
# File 'lib/fairy/master/c-filter.rb', line 17

def self.watch_status=(val)
  @@watch_status=val
end

Instance Method Details

#abort_create_nodeObject



343
344
345
346
347
348
349
350
351
352
353
354
# File 'lib/fairy/master/c-filter.rb', line 343

def abort_create_node
Log::debug(self, "ABORT_CREATE_NODE: S")
  @controller.create_processor_mutex.synchronize do
Log::debug(self, "ABORT_CREATE_NODE: 1")
  if @create_node_thread && @create_node_thread.alive?
Log::debug(self, "ABORT_CREATE_NODE: 2 ")
    @create_node_thread.raise AbortCreateNode
Log::debug(self, "ABORT_CREATE_NODE: 3")
  end
Log::debug(self, "ABORT_CREATE_NODE: E")
  end
end

#add_node(node) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/fairy/master/c-filter.rb', line 109

def add_node(node)
  @nodes_mutex.synchronize do
  unless node
    @nodes_for_next_filters.push nil
    @nodes_cv.broadcast
    return
  end

# node.no = node.input.no
  @nodes[node.no] = node
  @nodes_for_next_filters.push node
  @nodes_cv.broadcast
  end
end

#assgin_number_of_nodes?Boolean

Returns:

  • (Boolean)


263
264
265
# File 'lib/fairy/master/c-filter.rb', line 263

def assgin_number_of_nodes?
  @number_of_nodes
end

#bind_export(exp, imp) ⇒ Object



319
320
321
# File 'lib/fairy/master/c-filter.rb', line 319

def bind_export(exp, imp)
  imp.no_import = 1
end

#break_create_nodeObject



334
335
336
337
338
339
340
341
# File 'lib/fairy/master/c-filter.rb', line 334

def break_create_node
  # 作成中のものは完全に作成させるため
  @controller.create_processor_mutex.synchronize do
  if @create_node_thread && @create_node_thread.alive?
    @create_node_thread.raise BreakCreateNode
  end
  end
end

#break_running(njob = nil) ⇒ Object

job control



326
327
328
329
330
331
332
# File 'lib/fairy/master/c-filter.rb', line 326

def break_running(njob = nil)
  break_create_node
  
  each_node do |tasklet|
  tasklet.break_running unless tasklet.equal?(njob)
  end
end

#create_and_add_node(ntask, mapper, opts = {}) ⇒ Object



235
236
237
238
239
240
241
242
243
# File 'lib/fairy/master/c-filter.rb', line 235

def create_and_add_node(ntask, mapper, opts={})
  node = create_node(ntask) {|node|
  if opts[:init_njob]
    opts[:init_njob].call(node)
  end
  mapper.bind_input(node)
  }
  node
end

#create_import(processor) ⇒ Object



310
311
312
# File 'lib/fairy/master/c-filter.rb', line 310

def create_import(processor)
  processor.create_import(@opts[:prequeuing_policy])
end

#create_node(ntask, *params, &block) ⇒ Object



245
246
247
248
249
250
251
252
253
# File 'lib/fairy/master/c-filter.rb', line 245

def create_node(ntask, *params, &block)
  if params.empty?
  params = njob_creation_params
  end
  njob = ntask.create_njob(node_class_name, self, @opts, *params)
  block.call(njob)
  add_node(njob)
  njob
end

#create_nodesObject



182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/fairy/master/c-filter.rb', line 182

def create_nodes
  begin
  no = 0
  ret = nil
  @controller.assign_ntasks(self, @create_node_mutex) do 
    |ntask, mapper, opts={}|
    njob = create_and_add_node(ntask, mapper, opts)
    no += 1
    njob
  end
  add_node(nil)
  Log::debug self, "CREATE_NODES: #{self}.number_of_nodes=#{no}"
  self.number_of_nodes = no

  rescue BreakCreateNode
  Log::debug self, "BREAK CREATE NODE: #{self}" 
  add_node(nil)
  Log::debug self, "CREATE_NODES: #{self}.number_of_nodes=#{no}"
  self.number_of_nodes = no

  rescue AbortCreateNode
  Log::debug self, "Abort CREATE NODE: #{self}" 
  # do nothing

  rescue ERR::NodeNotArrived
  Log::debug self, "NODE NOT ARRIVED: #{self}"
  begin
    handle_exception($!)
  rescue
    Log::debug_exception(self)
  end
  Log::debug self, "NODE NOT ARRIVED2: #{self}"
  raise

  rescue ERR::CantExecSubcmd
  begin
    handle_exception($!)
  rescue
    Log::debug_exception(self)
  end
  Log::debug self, "CANT EXEC SUBCOMMAND: #{self}"
  raise

  rescue Exception
  Log::warn_exception(self)
  raise
#      ensure
# Log::debug self, "CREATE_NODES: #{self}.number_of_nodes=#{no}"
  #add_node(nil)
  #self.number_of_nodes = no
  end
end

#def_job_pool_variable(vname, value = nil) ⇒ Object



76
77
78
# File 'lib/fairy/master/c-filter.rb', line 76

def def_job_pool_variable(vname, value = nil)
  @job_pool_dict.def_variable(vname, value)
end

#each_assigned_filter(&block) ⇒ Object

def next_filter(mapper)

@nodes_mutex.synchronize do

while @nodes_for_next_filters.empty? @nodes_cv.wait(@nodes_mutex) end @nodes_for_next_filters.shift

  end
end


292
293
294
295
296
297
298
299
300
301
302
303
304
# File 'lib/fairy/master/c-filter.rb', line 292

def each_assigned_filter(&block)
  loop do 
  input_filter = nil
  @nodes_mutex.synchronize do
    while @nodes_for_next_filters.empty?
 @nodes_cv.wait(@nodes_mutex)
    end
    input_filter = @nodes_for_next_filters.shift
    return unless input_filter
  end
  block.call input_filter
  end 
end

#each_export_by(njob, mapper, &block) ⇒ Object



314
315
316
317
# File 'lib/fairy/master/c-filter.rb', line 314

def each_export_by(njob, mapper, &block)
#      block.call njob.export, :foo=>:bar
  block.call njob.export
end

#each_node(flag = nil, &block) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/fairy/master/c-filter.rb', line 124

def each_node(flag = nil, &block)
  if flag == :exist_only
  return each_node_exist_only &block
  end
  @nodes_mutex.synchronize do
  idx = 0
  while !@number_of_nodes || idx < @number_of_nodes
    unless @nodes[idx]
 @nodes_cv.wait(@nodes_mutex)
 next
    end
    begin
 @nodes_mutex.unlock
 block.call @nodes[idx] 
    ensure
 @nodes_mutex.lock
    end
    idx +=1
  end
  end
end

#each_node_exist_only(&block) ⇒ Object



146
147
148
149
# File 'lib/fairy/master/c-filter.rb', line 146

def each_node_exist_only(&block)
  nodes = @nodes_mutex.synchronize{@nodes.dup}
  nodes.each &block
end

#handle_exception(exp) ⇒ Object



392
393
394
# File 'lib/fairy/master/c-filter.rb', line 392

def handle_exception(exp)
  @controller.handle_exception(exp)
end

#inputObject



56
57
58
# File 'lib/fairy/master/c-filter.rb', line 56

def input
  ERR::Raise ERR::INTERNAL::ShouldDefineSubclass
end

#job_pool_dictObject



72
73
74
# File 'lib/fairy/master/c-filter.rb', line 72

def job_pool_dict
  @job_pool_dict
end

#job_pool_variable(vname, *value) ⇒ Object



80
81
82
83
84
85
86
# File 'lib/fairy/master/c-filter.rb', line 80

def job_pool_variable(vname, *value)
  if value.empty?
  @job_pool_dict[vname]
  else
  @job_pool_dict[vname] = value
  end
end

#njob_creation_paramsObject



259
260
261
# File 'lib/fairy/master/c-filter.rb', line 259

def njob_creation_params
  []
end

#node_class_nameObject



255
256
257
# File 'lib/fairy/master/c-filter.rb', line 255

def node_class_name
  ERR::Raise ERR::INTERNAL::NoRegisterService, self.class
end

#nodesObject



103
104
105
106
107
# File 'lib/fairy/master/c-filter.rb', line 103

def nodes
  @nodes_mutex.synchronize do
  @nodes
  end
end

#number_of_nodesObject

def each_export(&block)

each_node do |node|

exp = node.export block.call exp, node node.export.output_no_import = 1

  end
end


159
160
161
162
163
164
165
166
167
168
# File 'lib/fairy/master/c-filter.rb', line 159

def number_of_nodes
#      @number_of_nodes_mutex.synchronize do
  @nodes_mutex.synchronize do
  while !@number_of_nodes
#   @number_of_nodes_cv.wait(@number_of_nodes_mutex)
    @nodes_cv.wait(@nodes_mutex)
  end
  @number_of_nodes
  end
end

#number_of_nodes=(no) ⇒ Object

Njob Methods:



92
93
94
95
96
97
98
99
100
101
# File 'lib/fairy/master/c-filter.rb', line 92

def number_of_nodes=(no)
#puts "#{self}.number_of_nodes=#{no}"
#      @number_of_nodes_mutex.synchronize do
  @nodes_mutex.synchronize do
  @number_of_nodes = no
# @number_of_nodes_cv.broadcast
  @nodes_cv.broadcast
  @nodes_status_cv.broadcast
  end
end

#pool_dictObject

Pool Variables: (JP: プール変数)



68
69
70
# File 'lib/fairy/master/c-filter.rb', line 68

def pool_dict
  @controller.pool_dict
end

#postmapping_policyObject



60
61
62
# File 'lib/fairy/master/c-filter.rb', line 60

def postmapping_policy
  @opts[:postmapping_policy] || CONF.POSTMAPPING_POLICY
end

#start_create_nodesObject

Njob creation methods



173
174
175
176
177
178
179
180
# File 'lib/fairy/master/c-filter.rb', line 173

def start_create_nodes
  @create_node_thread = Thread.start{
  Log::debug self, "START_CREATE_NODES: START #{self}"
  create_nodes
  Log::debug self, "START_CREATE_NODES: END #{self}"
  }
  nil
end

#start_export(njob) ⇒ Object



306
307
308
# File 'lib/fairy/master/c-filter.rb', line 306

def start_export(njob)
  export = njob.start_export
end

#start_watch_node_statusObject



367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
# File 'lib/fairy/master/c-filter.rb', line 367

def start_watch_node_status
  Thread.start do

  all_finished = false
  while !@number_of_nodes || !all_finished
    @nodes_status_mutex.synchronize do
 @nodes_status_cv.wait(@nodes_status_mutex)
    end

    all_finished = @number_of_nodes
    Log::info(self) do |sio|
 sio.puts "Status Changed: BEGIN #{self}"
 each_node(:exist_only) do |node|
   st = @nodes_status[node]
   sio.puts "  node: #{node} status: #{st.id2name}" if st
   all_finished &&= st==:ST_FINISH
 end
 sio.puts "Status Changed: END #{self}"
    end
  end
  Log::info self, "Monitoring finish: ALL NJOB finished"
  end
  nil
end

#update_status(node, st) ⇒ Object



356
357
358
359
360
361
# File 'lib/fairy/master/c-filter.rb', line 356

def update_status(node, st)
  @nodes_status_mutex.synchronize do
  @nodes_status[node] = st
  @nodes_status_cv.broadcast
  end
end

#watch_status?Boolean

Returns:

  • (Boolean)


363
364
365
# File 'lib/fairy/master/c-filter.rb', line 363

def watch_status?
  @@watch_status
end