Class: Fairy::PWC::PPostFilter

Inherits:
Fairy::PIOFilter
Defined in:
lib/fairy/node/p-wc.rb

Constant Summary

# Status value assigned by basic_start once the real output file has been written.
ST_OUTPUT_FINISH =
:ST_OUTPUT_FINISH
# Host addresses that gen_real_file_name tries to reverse-resolve to a name:
# either an IPv4-mapped IPv6 form ("::ffff:a.b.c.d") or lowercase hex groups
# joined by colons (e.g. "fe80::1"). The pattern is unanchored, so it matches
# anywhere inside the string; note that a bare "::1" does NOT match.
IPADDR_REGEXP =
/::ffff:([0-9]+\.){3}[0-9]+|[0-9a-f]+:([0-9a-f]*:)[0-9a-f]*/

Constants inherited from Fairy::PIOFilter

Fairy::PIOFilter::ST_WAIT_IMPORT

Constants inherited from Fairy::PFilter

Fairy::PFilter::END_OF_STREAM, Fairy::PFilter::ST_ACTIVATE, Fairy::PFilter::ST_FINISH, Fairy::PFilter::ST_INIT

Instance Attribute Summary

Attributes inherited from Fairy::PFilter

#IGNORE_EXCEPTION, #id, #log_id, #ntask

Instance Method Summary

Methods inherited from Fairy::PFilter

#abort_running, #break_running, #each, #global_break, #global_break_from_other, #handle_exception, #key, #key=, #next, #no, #no=, #notice_status, #processor, #start, #start_export, #start_watch_status, #status=, #terminate, #terminate_proc

Constructor Details

#initialize(id, ntask, bjob, opt, vf) ⇒ PPostFilter

Returns a new instance of PPostFilter.



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/fairy/node/p-wc.rb', line 162

# Creates the word-count post-filter for one segment.
#
# id, ntask, bjob, opt - passed unchanged (implicit `super`) to the superclass
#                        initializer; bjob must also respond to #hash_seed.
# vf                   - virtual-file object, kept in @vfile; basic_start and
#                        gen_real_file_name use it to register/name the output.
#
# Each setting below is read from @opts (presumably populated from opt by the
# superclass -- confirm) and falls back to the matching CONF.GROUP_BY_* value
# when the option is missing, nil or false.
def initialize(id, ntask, bjob, opt, vf)
	super
	@vfile = vf

	@buffering_policy = @opts[:buffering_policy] || CONF.GROUP_BY_BUFFERING_POLICY
	@mod = @opts[:no_segment] || CONF.GROUP_BY_NO_SEGMENT

	# Load the configured hash module before building the generator;
	# presumably that module is what defines Fairy::HValueGenerator -- verify.
	require(@opts[:hash_module] || CONF.GROUP_BY_HASH_MODULE)
	@hash_generator = Fairy::HValueGenerator.new(bjob.hash_seed)

	# An explicit :grouping_optimize key wins even when its value is nil/false.
	optimize_default = CONF.GROUP_BY_GROUPING_OPTIMIZE
	@hash_optimize =
	  if @opts.key?(:grouping_optimize)
	    @opts[:grouping_optimize]
	  else
	    optimize_default
	  end
end

Instance Method Details

#basic_start(&block) ⇒ Object



190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/fairy/node/p-wc.rb', line 190

# Runs this post-filter. It groups every element read from @input with the
# configured key/value buffer, then writes one "<key> <count>" line per group
# to this segment's real output file. Finally it sets the status to
# ST_OUTPUT_FINISH.
#
# The output URI comes from gen_real_file_name and is registered with @vfile
# under this filter's number before any data is written. The block argument
# is accepted for interface compatibility and is not used here.
#
# Raises whatever URI.parse raises for a malformed output URI (after logging
# it), plus any error from creating the directory, the buffer, or writing.
def basic_start(&block)
	Log::debug(self, "START")
	output_uri = gen_real_file_name
	@vfile.set_real_file(no, output_uri)

	Log::debug(self, "write real file: #{output_uri}")
	begin
	  output_file = URI.parse(output_uri).path
	rescue
	  Log::debug_exception(self)
	  raise
	end

	output_dir = File.dirname(output_file)
	create_dir(output_dir) unless File.exist?(output_dir)

	# Look the buffer class up as a constant instead of eval-ing a string built
	# from configuration, so :buffering_class can only name a class and is
	# never executed as arbitrary Ruby code.
	buffer_class = PGroupBy.const_get(@buffering_policy[:buffering_class])
	@key_value_buffer = buffer_class.new(self, @buffering_policy)

	# Word count groups on the element itself; hash_key yields through this.
	# (@hash_optimize is not consulted: the identity key is always used.)
	@hash_proc = proc { |w| w }

	begin
	  @input.each { |e| @key_value_buffer.push(e) }

	  File.open(output_file, "w") do |io|
	    Log::debug(self, "start write real file: #{output_uri}")
	    @key_value_buffer.each do |values|
	      io.puts [values.key, values.size].join(" ")
	    end
	    Log::debug(self, "finish write real file: #{output_uri}")
	  end
	ensure
	  # Drop the (possibly large) buffer even when reading or writing fails.
	  @key_value_buffer = nil
	end

	self.status = ST_OUTPUT_FINISH
end

#create_dir(path) ⇒ Object



232
233
234
235
236
237
238
239
240
241
# File 'lib/fairy/node/p-wc.rb', line 232

# Creates directory +path+, first creating any missing ancestors (recursively,
# parent before child). A directory that already exists -- or any other
# Errno::EEXIST from mkdir -- is silently ignored.
#
# Returns Dir.mkdir's result (0) when this call created +path+, nil otherwise.
def create_dir(path)
	parent = File.dirname(path)
	create_dir(parent) unless File.exist?(parent)
	Dir.mkdir(path)
rescue Errno::EEXIST
	# Already there (possibly created concurrently); nothing to do.
end

#gen_real_file_name ⇒ Object



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/fairy/node/p-wc.rb', line 245

# Builds the URI of the real file that backs this filter's segment of the
# virtual output file:
#
#   file://<host><VF_ROOT>/<VF_PREFIX>/<base_name>-<NNN>
#
# <host> is this processor's address. NNN is @input.no, zero-padded to at
# least three digits. Addresses matching IPADDR_REGEXP are reverse-resolved to
# a host name. If that lookup raises, the address is kept but wrapped in
# brackets (URI syntax for an IPv6 literal).
def gen_real_file_name
	host = processor.addr
	root = CONF.VF_ROOT
	prefix = CONF.VF_PREFIX
	base_name = @vfile.base_name
	no = @input.no

	if IPADDR_REGEXP =~ host
	  begin
	    host = Resolv.getname(host)
	  rescue
	    # Host name unknown: use the IPv6 address as-is, bracketed.
	    host = "[#{host}]"
	  end
	end

	# Pass every component as an argument instead of interpolating it into the
	# format string, so a '%' in a path or name is not parsed as a directive.
	format("file://%s%s/%s/%s-%03d", host, root, prefix, base_name, no)
end

#hash_key(e) ⇒ Object



186
187
188
# File 'lib/fairy/node/p-wc.rb', line 186

# Returns the grouping key for element +e+ by passing it through @hash_proc
# (which basic_start sets to an identity proc).
def hash_key(e)
	@hash_proc.call(e)
end

#input=(input) ⇒ Object



181
182
183
184
# File 'lib/fairy/node/p-wc.rb', line 181

# Input setter: hands +input+ to the superclass setter, then immediately calls
# #start (inherited from Fairy::PFilter). Returns whatever #start returns.
def input=(input)
	super(input)
	start
end