Class: Fairy::PDirectProduct::PPostFilter

Inherits:
Fairy::PSingleExportFilter show all
Defined in:
lib/fairy/node/p-direct-product.rb

Constant Summary

Constants included from Fairy::PSingleExportable

Fairy::PSingleExportable::END_OF_STREAM, Fairy::PSingleExportable::ST_EXPORT_FINISH, Fairy::PSingleExportable::ST_WAIT_EXPORT_FINISH

Constants inherited from Fairy::PIOFilter

Fairy::PIOFilter::ST_WAIT_IMPORT

Constants inherited from Fairy::PFilter

Fairy::PFilter::END_OF_STREAM, Fairy::PFilter::ST_ACTIVATE, Fairy::PFilter::ST_FINISH, Fairy::PFilter::ST_INIT

Instance Attribute Summary

Attributes included from Fairy::PSingleExportable

#export

Attributes inherited from Fairy::PFilter

#IGNORE_EXCEPTION, #id, #log_id, #ntask

Instance Method Summary collapse

Methods included from Fairy::PSingleExportable

#start, #start_export, #terminate, #wait_export_finish

Methods inherited from Fairy::PFilter

#abort_running, #basic_start, #break_running, #each, #global_break, #global_break_from_other, #handle_exception, #key, #key=, #next, #no, #no=, #notice_status, #processor, #start, #start_export, #start_watch_status, #status=, #terminate, #terminate_proc

Constructor Details

#initialize(id, processor, bjob, opts, block_source) ⇒ PPostFilter

Returns a new instance of PPostFilter.



85
86
87
88
89
90
91
92
# File 'lib/fairy/node/p-direct-product.rb', line 85

# Creates the post filter.
#
# All five arguments are forwarded unchanged to the superclass initializer
# (bare +super+); +block_source+ is additionally kept on this instance.
#
# @param id           passed through to the superclass
# @param processor    passed through to the superclass
# @param bjob         passed through to the superclass
# @param opts         passed through to the superclass
# @param block_source stored in @block_source for later use
def initialize(id, processor, bjob, opts, block_source)
  super

  # State shared between #other_inputs= (writer) and #other_imports (reader):
  # the list stays nil until it has been assigned, and the mutex / condition
  # variable pair lets the reader wait for that assignment.
  @other_imports_cv = XThread::ConditionVariable.new
  @other_imports_mutex = Mutex.new
  @other_imports = nil

  @block_source = block_source
end

Instance Method Details

#basic_each(&block) ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/fairy/node/p-direct-product.rb', line 130

# Yields the mapped direct (Cartesian) product of this filter's inputs.
#
# The first factor is @input.to_a; the remaining factors are the arrays
# obtained from each entry of #other_imports (in order). Every combination
# is passed, splatted, to @map_proc.yield and the result is handed to +block+.
# Combinations are generated with the last factor varying fastest.
#
# If any factor is empty the product is empty and +block+ is never called
# (the previous index-based loop called it with nil-padded tuples instead).
#
# @yield [Object] the value returned by @map_proc.yield for one combination
# @return [nil]
def basic_each(&block)
  @map_proc = BBlock.new(@block_source, @context, self)

  # Drain the primary input first, then the other imports; #other_imports
  # may block until they have been assigned.
  primary = @input.to_a
  others = other_imports.collect { |imp| imp.to_a }

  # Array#product with a block streams each combination without building
  # the full result array.
  primary.product(*others) do |tuple|
    block.call @map_proc.yield(*tuple)
  end

  nil
end

#input=(input) ⇒ Object



94
95
96
97
98
# File 'lib/fairy/node/p-direct-product.rb', line 94

# Attaches the primary input and adopts its key.
#
# @param input object stored in @input; must respond to #key
def input=(input)
  @input = input
  # Copying input.no as well was left disabled in the original source:
  # self.no = input.no
  inherited_key = input.key
  self.key = inherited_key
end

#other_imports ⇒ Object



121
122
123
124
125
126
127
128
# File 'lib/fairy/node/p-direct-product.rb', line 121

# Returns @other_imports, waiting on the condition variable (under
# @other_imports_mutex) for as long as it is still nil/false.
#
# @return the value of @other_imports once it is set
def other_imports
  @other_imports_mutex.synchronize do
    # Loop rather than a single wait so a wake-up without a value re-checks.
    @other_imports_cv.wait(@other_imports_mutex) until @other_imports
    @other_imports
  end
end

#other_inputs=(exports) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/fairy/node/p-direct-product.rb', line 100

# Wires one new Import to each of the given exports, stores the list in
# @other_imports and wakes every thread waiting in #other_imports.
#
# For each export the Import receives the export's +no+ and +key+, has
# +no_import+ set to 1, gets a verbose-logging callback, and is assigned
# as the export's +output+.
#
# @param exports collection of objects responding to #no, #key and #output=
def other_inputs=(exports)
  @other_imports_mutex.synchronize do
    @other_imports = exports.collect do |export|
      # To be considered later (disabled in the original source):
      #   policy = @opts[:prequeuing_policy]
      #   import = Import.new(policy)
      import = Import.new
      import.no = export.no
      import.add_key(export.key)
      import.no_import = 1
      import.set_log_callback do |n, key|
        Log::verbose(self, "IMPORT POP key=#{key}: #{n}")
      end

      export.output = import
      import
    end
    @other_imports_cv.broadcast
  end
end