Class: Fairy::PWC::PPostFilter

Inherits:
Fairy::PIOFilter
Defined in:
lib/fairy/node/p-wc.rb

Constant Summary

ST_OUTPUT_FINISH =
:ST_OUTPUT_FINISH
IPADDR_REGEXP =
/::ffff:([0-9]+\.){3}[0-9]+|[0-9a-f]+:([0-9a-f]*:)[0-9a-f]*/

Constants inherited from Fairy::PIOFilter

Fairy::PIOFilter::ST_WAIT_IMPORT

Constants inherited from Fairy::PFilter

Fairy::PFilter::END_OF_STREAM, Fairy::PFilter::ST_ACTIVATE, Fairy::PFilter::ST_FINISH, Fairy::PFilter::ST_INIT

Instance Attribute Summary

Attributes inherited from Fairy::PFilter

#IGNORE_EXCEPTION, #id, #log_id, #ntask

Instance Method Summary

Methods inherited from Fairy::PFilter

#abort_running, #break_running, #each, #global_break, #global_break_from_other, #handle_exception, #key, #key=, #next, #no, #no=, #notice_status, #processor, #start, #start_export, #start_watch_status, #status=, #terminate, #terminate_proc

Constructor Details

#initialize(id, ntask, bjob, opt, vf) ⇒ PPostFilter



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/fairy/node/p-wc.rb', line 162

# Builds the word-count post filter.
#
# All arguments are forwarded unchanged to the superclass initializer; +vf+
# is additionally kept in @vfile as the virtual file this filter writes to.
#
# Options are read from @opts (presumably populated by the superclass from
# +opt+ -- TODO confirm), each falling back to a CONF default:
#   :buffering_policy  -> @buffering_policy (CONF.GROUP_BY_BUFFERING_POLICY)
#   :no_segment        -> @mod              (CONF.GROUP_BY_NO_SEGMENT)
#   :hash_module       -> library passed to require (CONF.GROUP_BY_HASH_MODULE)
#   :grouping_optimize -> @hash_optimize    (CONF.GROUP_BY_GROUPING_OPTIMIZE)
def initialize(id, ntask, bjob, opt, vf)
  super
  @vfile = vf

  @buffering_policy = @opts[:buffering_policy] || CONF.GROUP_BY_BUFFERING_POLICY
  @mod = @opts[:no_segment] || CONF.GROUP_BY_NO_SEGMENT

  # Load the configured hashing library before building the generator;
  # presumably that library defines Fairy::HValueGenerator -- verify.
  require(@opts[:hash_module] || CONF.GROUP_BY_HASH_MODULE)
  @hash_generator = Fairy::HValueGenerator.new(bjob.hash_seed)

  # An explicitly given :grouping_optimize wins even when it is false/nil,
  # so test for key presence rather than truthiness.
  optimize_default = CONF.GROUP_BY_GROUPING_OPTIMIZE
  @hash_optimize =
    @opts.key?(:grouping_optimize) ? @opts[:grouping_optimize] : optimize_default
end

Instance Method Details

#basic_start(&block) ⇒ Object



190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/fairy/node/p-wc.rb', line 190

# Runs this task's word-count output phase.
#
# Steps:
# 1. Registers the generated output URI with the virtual file.
# 2. Ensures the output directory exists.
# 3. Pushes every input element into a PGroupBy buffering object.
# 4. Writes one "<key> <size>" line per buffered group to the real file.
# 5. Sets the status to ST_OUTPUT_FINISH.
#
# @param block [Proc] accepted for interface compatibility; not used.
# @return [Object] the result of the final status assignment.
# @raise re-raises any error from URI.parse after logging it.
def basic_start(&block)
  Log::debug(self, "START")
  output_uri = gen_real_file_name
  @vfile.set_real_file(no, output_uri)

  Log::debug(self, "write real file: #{output_uri}")
  begin
    output_file = URI.parse(output_uri).path
  rescue
    Log::debug_exception(self)
    raise
  end

  output_dir = File.dirname(output_file)
  create_dir(output_dir) unless File.exist?(output_dir)

  # Resolve the buffering class by constant lookup rather than eval: the name
  # comes from options/configuration and must never be executed as code.
  # Walking "A::B" segment by segment keeps nested names working.
  buffer_class = @buffering_policy[:buffering_class].to_s.split("::").inject(PGroupBy) do |scope, const_name|
    scope.const_get(const_name)
  end
  @key_value_buffer = buffer_class.new(self, @buffering_policy)

  # Identity key function, read by #hash_key.
  # NOTE(review): @hash_optimize (set in initialize) is not consulted here.
  @hash_proc = proc { |w| w }

  @input.each do |e|
    @key_value_buffer.push(e)
  end

  begin
    File.open(output_file, "w") do |io|
      Log::debug(self, "start write real file: #{output_uri}")
      @key_value_buffer.each do |values|
        io.puts([values.key, values.size].join(" "))
      end
      Log::debug(self, "finish write real file: #{output_uri}")
    end
  ensure
    # Drop the buffer even when writing fails so its contents can be reclaimed.
    @key_value_buffer = nil
  end

  self.status = ST_OUTPUT_FINISH
end

#create_dir(path) ⇒ Object



232
233
234
235
236
237
238
239
240
241
# File 'lib/fairy/node/p-wc.rb', line 232

# Creates directory +path+, creating any missing ancestors first
# (similar to `mkdir -p`).
#
# @param path [String] the directory to create.
# @return [Integer, nil] 0 from Dir.mkdir when +path+ was created, or nil
#   when it already existed.
def create_dir(path)
  parent = File.dirname(path)
  create_dir(parent) unless File.exist?(parent)
  Dir.mkdir(path)
rescue Errno::EEXIST
  # Ignore: the directory already exists.
  nil
end

#gen_real_file_name ⇒ Object



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/fairy/node/p-wc.rb', line 245

# Builds the file:// URI of the real file backing this task's output.
#
# The URI has the form file://<host><VF_ROOT>/<VF_PREFIX>/<base_name>-<NNN>,
# where NNN is @input.no zero-padded to three digits. If the processor
# address matches IPADDR_REGEXP (an IPv6 form), it is reverse-resolved to a
# host name; if resolution fails, the address is kept and wrapped in
# brackets.
#
# @return [String] the output file URI.
def gen_real_file_name
  host = processor.addr
  root = CONF.VF_ROOT
  prefix = CONF.VF_PREFIX
  base_name = @vfile.base_name
  no = @input.no

  if IPADDR_REGEXP =~ host
    begin
      host = Resolv.getname(host)
    rescue
      # Host name unknown: keep the raw IPv6 address, bracketed for URI syntax.
      host = "[#{host}]"
    end
  end

  # Pass every component as an argument instead of interpolating it into the
  # format string, so a '%' in a path or name is not parsed as a directive.
  format("file://%s%s/%s/%s-%03d", host, root, prefix, base_name, no)
end

#hash_key(e) ⇒ Object



186
187
188
# File 'lib/fairy/node/p-wc.rb', line 186

# Returns the grouping key for element +e+ by applying @hash_proc
# (assigned in basic_start).
def hash_key(e)
  @hash_proc.call(e)
end

#input=(input) ⇒ Object



181
182
183
184
# File 'lib/fairy/node/p-wc.rb', line 181

# Attaches the upstream input through the superclass setter, then calls
# #start so processing begins as soon as an input is connected.
def input=(input)
  super(input)
  start
end