Class: Fairy::CDirectProduct::CPreFilter

Inherits:
Fairy::CIOFilter show all
Defined in:
lib/fairy/master/c-direct-product.rb

Overview

Not called (呼ばれない):

# Variant of start_create_nodes quoted in the class overview, which marks it
# as not called. It only forwards the call to @main_prefilter.
def start_create_nodes
  @main_prefilter.start_create_nodes
end

Instance Attribute Summary

Attributes inherited from Fairy::CIOFilter

#input

Attributes included from Fairy::CInputtable

#input

Instance Method Summary collapse

Methods inherited from Fairy::CIOFilter

#node_class, #output=

Methods included from Fairy::CInputtable

#break_running, #inputtable?

Methods inherited from Fairy::CFilter

#abort_create_node, #add_node, #assgin_number_of_nodes?, #break_create_node, #break_running, #create_and_add_node, #create_import, #create_node, #create_nodes, #def_job_pool_variable, #each_node, #each_node_exist_only, #handle_exception, #input, #job_pool_dict, #job_pool_variable, #nodes, #number_of_nodes, #pool_dict, #postmapping_policy, #start_export, #start_watch_node_status, #update_status, watch_status, watch_status=, #watch_status?

Constructor Details

#initialize(controller, opts, block_source) ⇒ CPreFilter

Returns a new instance of CPreFilter.



92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/fairy/master/c-direct-product.rb', line 92

# Builds the pre-filter and prepares the state it shares between threads.
#
# The bare +super+ forwards all three arguments unchanged to the superclass
# initializer.
#
# @param controller forwarded to the superclass
# @param opts forwarded to the superclass
# @param block_source kept in @block_source (handed back by
#   njob_creation_params)
def initialize(controller, opts, block_source)
  super
  @block_source = block_source
  @no = 0

  # Export table, and the lock taken whenever an entry is written to it.
  @exports_mutex = Mutex.new
  @exports = {}

  # Product list (nil until computed), with its lock and the condition
  # variable that is broadcast once the list has been filled in.
  @products_mutex = Mutex.new
  @products_cv = XThread::ConditionVariable.new
  @products = nil
end

Instance Method Details

#bind_export(exp, imp) ⇒ Object



183
184
185
# File 'lib/fairy/master/c-direct-product.rb', line 183

# Deliberate no-op: both arguments are ignored and nil is returned.
#
# @param exp ignored
# @param imp ignored
# @return [nil]
def bind_export(exp, imp)
  nil
end

#each_assigned_filter(&block) ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/fairy/master/c-direct-product.rb', line 139

# Starts a background thread that gathers the other pre-filters' exports and
# builds the node product list, then continues with the superclass
# implementation (the bare +super+ forwards the caller's block).
#
# The background thread
# 1. stores, for every node of every pre-filter in @main.other_prefilters,
#    a dc_dup copy of that node's exports in @exports (under
#    @exports_mutex), and
# 2. sets @products to the product of this filter's nodes with each of
#    those pre-filters' nodes and broadcasts on @products_cv.
#
# The thread is not joined here.
def each_assigned_filter(&block)
  Thread.start do
    @main.other_prefilters.each do |prefilter|
      prefilter.each_node do |node|
        @exports_mutex.synchronize { @exports[node] = node.exports.dc_dup }
      end
    end

    @products_mutex.synchronize do
      own_nodes = nodes
      other_node_lists = @main.other_prefilters.collect { |prefilter| prefilter.nodes }
      @products = own_nodes.product(*other_node_lists)
      @products_cv.broadcast
    end
  end

  super
end

#each_export_by(njob, mapper, &block) ⇒ Object

For the main prefilter (main prefilter 用).



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/fairy/master/c-direct-product.rb', line 159

# For the main prefilter.
#
# Records a dc_dup copy of +njob+'s exports, waits until @products has been
# set, and then calls +block+ once for every product entry whose first
# element is +njob+.
#
# Each call receives the next export shifted off this job's copied export
# list, plus an options hash whose :init_njob proc sets +no+ (the entry's
# position in @products) and +other_inputs+ (one export shifted from each of
# the entry's other jobs) on the job passed to it.
#
# The position comes from each_with_index, so every proc captures its own
# value. With one counter shared by all iterations, a proc invoked after the
# loop had advanced would assign a later entry's position.
#
# @param njob the job whose exports are handed out
# @param mapper not used by this method
# @yield [export, options] once per matching product entry
# @return the value of @products
def each_export_by(njob, mapper, &block)
  @exports_mutex.synchronize do
    @exports[njob] = njob.exports.dc_dup
  end

  @products_mutex.synchronize do
    # Block until the thread started by each_assigned_filter sets @products.
    @products_cv.wait(@products_mutex) until @products

    @products.each_with_index do |(main_njob, *others_njobs), post_njob_no|
      next if main_njob != njob
      @others_njobs = others_njobs

      # The parameter is named post_njob so it does not shadow the method's
      # own njob argument.
      init_njob = proc do |post_njob|
        post_njob.no = post_njob_no
        post_njob.other_inputs = others_njobs.collect { |other| @exports[other].shift }
      end

      block.call(@exports[main_njob].shift, :init_njob => init_njob)
    end
  end
end

#main=(main) ⇒ Object



106
107
108
# File 'lib/fairy/master/c-direct-product.rb', line 106

# Stores +main+ in @main. The other methods of this class call
# other_prefilters, no_of_exports_for_prefilter and
# update_prefilter_no_nodes on that object.
#
# @param main the object to keep in @main
def main=(main)
  @main = main
end

#njob_creation_params ⇒ Object



114
115
116
# File 'lib/fairy/master/c-direct-product.rb', line 114

# Returns a new one-element array holding the block source that was given
# to the constructor.
#
# @return [Array] [@block_source]
def njob_creation_params
  [@block_source]
end

#node_class_name ⇒ Object



110
111
112
# File 'lib/fairy/master/c-direct-product.rb', line 110

# Returns the fixed class-name string for this filter.
#
# @return [String] always "PDirectProduct::PPreFilter"
def node_class_name
  'PDirectProduct::PPreFilter'
end

#number_of_exports ⇒ Object



123
124
125
# File 'lib/fairy/master/c-direct-product.rb', line 123

# Asks @main for this pre-filter's export count by passing itself to
# no_of_exports_for_prefilter, and returns whatever that call returns.
def number_of_exports
  @main.no_of_exports_for_prefilter(self)
end

#number_of_nodes=(no) ⇒ Object



118
119
120
121
# File 'lib/fairy/master/c-direct-product.rb', line 118

# Sets the node count through the superclass writer, then notifies @main by
# passing this pre-filter to update_prefilter_no_nodes.
#
# @param no the new number of nodes, handed to the superclass writer
def number_of_nodes=(no)
  super(no)
  @main.update_prefilter_no_nodes(self)
end

#start_create_nodes ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
# File 'lib/fairy/master/c-direct-product.rb', line 127

# Logs the call, starts export on the inputs of every other pre-filter, and
# then continues with the superclass implementation.
#
# One thread is started per entry of @main.other_prefilters. Each thread
# walks that pre-filter's assigned filters and calls start_export on every
# one of them. The threads are not joined here.
def start_create_nodes
  Log::debug(self, "START_CREATE_NODES: #{self}")

  @main.other_prefilters.each do |prefilter|
    Thread.start(prefilter) do |other|
      other.each_assigned_filter { |input_filter| input_filter.start_export }
    end
  end

  super
end