Class: Fairy::PWC::PPostFilter
- Inherits:
-
Fairy::PIOFilter
- Object
- Fairy::PFilter
- Fairy::PIOFilter
- Fairy::PWC::PPostFilter
- Defined in:
- lib/fairy/node/p-wc.rb
Constant Summary collapse
- ST_OUTPUT_FINISH =
:ST_OUTPUT_FINISH
- IPADDR_REGEXP =
/::ffff:([0-9]+\.){3}[0-9]+|[0-9a-f]+:([0-9a-f]*:)[0-9a-f]*/
Constants inherited from Fairy::PIOFilter
Fairy::PIOFilter::ST_WAIT_IMPORT
Constants inherited from Fairy::PFilter
Fairy::PFilter::END_OF_STREAM, Fairy::PFilter::ST_ACTIVATE, Fairy::PFilter::ST_FINISH, Fairy::PFilter::ST_INIT
Instance Attribute Summary
Attributes inherited from Fairy::PFilter
#IGNORE_EXCEPTION, #id, #log_id, #ntask
Instance Method Summary collapse
- #basic_start(&block) ⇒ Object
- #create_dir(path) ⇒ Object
- #gen_real_file_name ⇒ Object
- #hash_key(e) ⇒ Object
-
#initialize(id, ntask, bjob, opt, vf) ⇒ PPostFilter
constructor
A new instance of PPostFilter.
- #input=(input) ⇒ Object
Methods inherited from Fairy::PFilter
#abort_running, #break_running, #each, #global_break, #global_break_from_other, #handle_exception, #key, #key=, #next, #no, #no=, #notice_status, #processor, #start, #start_export, #start_watch_status, #status=, #terminate, #terminate_proc
Constructor Details
#initialize(id, ntask, bjob, opt, vf) ⇒ PPostFilter
Returns a new instance of PPostFilter.
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/fairy/node/p-wc.rb', line 162

# Returns a new instance of PPostFilter.
#
# All five arguments are forwarded unchanged to the parent constructor via
# bare +super+. Afterwards the grouping configuration is resolved: each
# setting is read from +@opts+ and falls back to the matching +CONF+ entry.
# NOTE(review): +@opts+ is not assigned here; presumably the parent
# constructor sets it from +opt+ -- confirm.
#
# @param id    forwarded to the parent constructor
# @param ntask forwarded to the parent constructor
# @param bjob  must respond to +hash_seed+ (seeds the hash value generator)
# @param opt   forwarded to the parent constructor
# @param vf    virtual-file object stored in +@vfile+
def initialize(id, ntask, bjob, opt, vf)
  super

  @vfile = vf

  # Per-job option wins; otherwise use the configured default.
  @buffering_policy = @opts[:buffering_policy] || CONF.GROUP_BY_BUFFERING_POLICY
  @mod = @opts[:no_segment] || CONF.GROUP_BY_NO_SEGMENT

  # The hash implementation is loaded by name before the generator is built.
  hash_module = @opts[:hash_module] || CONF.GROUP_BY_HASH_MODULE
  require hash_module

  @hash_generator = Fairy::HValueGenerator.new(bjob.hash_seed)

  # An explicitly supplied :grouping_optimize (even false/nil) overrides
  # the configured default, hence key? rather than ||.
  default_optimize = CONF.GROUP_BY_GROUPING_OPTIMIZE
  @hash_optimize = @opts.key?(:grouping_optimize) ? @opts[:grouping_optimize] : default_optimize
end
Instance Method Details
#basic_start(&block) ⇒ Object
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
# File 'lib/fairy/node/p-wc.rb', line 190

# Drains the input into a grouping buffer and writes one "<key> <size>"
# line per group to this filter's real output file.
#
# Steps, in order:
# 1. Ask #gen_real_file_name for the output URI and register it on the
#    virtual file under this filter's +no+.
# 2. Extract the local path from the URI and create its directory if it
#    does not exist yet.
# 3. Instantiate the buffer class named by
#    <tt>@buffering_policy[:buffering_class]</tt> (looked up inside
#    +PGroupBy+) and push every input element into it.
# 4. Write the groups to the file and set the status to ST_OUTPUT_FINISH.
#
# The +block+ parameter is accepted for signature compatibility but is not
# used by this method.
#
# Raises whatever URI.parse raises for a malformed URI (after logging it).
def basic_start(&block)
  Log::debug(self, "START")

  output_uri = gen_real_file_name
  @vfile.set_real_file(no, output_uri)
  Log::debug(self, "write real file: #{output_uri}")

  begin
    output_file = URI.parse(output_uri).path
  rescue
    # Log the parse failure, then let the caller see the original error.
    Log::debug_exception(self)
    raise
  end

  output_dir = File.dirname(output_file)
  create_dir(output_dir) unless File.exist?(output_dir)

  # Resolve the buffer class by constant lookup instead of eval: the name
  # comes from configuration and must never be executed as code.
  buffer_class = PGroupBy.const_get(@buffering_policy[:buffering_class])
  @key_value_buffer = buffer_class.new(self, @buffering_policy)

  # Identity key function used by #hash_key. (The original source carried a
  # commented-out BBlock-based alternative selected by @hash_optimize.)
  @hash_proc = proc { |w| w }

  @input.each do |e|
    @key_value_buffer.push(e)
  end

  File.open(output_file, "w") do |io|
    Log::debug(self, "start write real file: #{output_uri}")
    @key_value_buffer.each do |values|
      io.puts [values.key, values.size].join(" ")
    end
    # Drop the reference so the buffer can be reclaimed once written out.
    @key_value_buffer = nil
    Log::debug(self, "finish write real file: #{output_uri}")
  end

  self.status = ST_OUTPUT_FINISH
end
#create_dir(path) ⇒ Object
232 233 234 235 236 237 238 239 240 241 |
# File 'lib/fairy/node/p-wc.rb', line 232

# Creates the directory +path+, including any missing parent directories.
#
# Uses FileUtils.mkdir_p rather than hand-rolled recursion. Directories
# that already exist are not an error. Errno::EEXIST is still swallowed,
# as in the previous implementation, so a conflicting existing entry is
# ignored rather than raised.
#
# @param path [String] directory to create
def create_dir(path)
  # Loaded here so the method does not depend on a file-level require.
  require "fileutils"

  FileUtils.mkdir_p(path)
rescue Errno::EEXIST
  # Ignored on purpose: the entry already exists.
end
#gen_real_file_name ⇒ Object
245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 |
# File 'lib/fairy/node/p-wc.rb', line 245

# Builds the file:// URI of the real file backing this filter's output.
#
# The URI has the form
#   file://<host><VF_ROOT>/<VF_PREFIX>/<base_name>-<NNN>
# where <host> comes from +processor.addr+, <base_name> from the virtual
# file and <NNN> is the input's +no+ zero-padded to three digits.
#
# When the address matches IPADDR_REGEXP it is reverse-resolved to a host
# name; if resolution fails the address itself is used, in brackets.
#
# Only the number goes through +format+. The other parts are interpolated
# as-is, so a literal "%" in any of them cannot be misread as a format
# directive.
#
# @return [String] the generated URI
def gen_real_file_name
  host = processor.addr
  root = CONF.VF_ROOT
  prefix = CONF.VF_PREFIX
  base_name = @vfile.base_name
  segment_no = @input.no

  if IPADDR_REGEXP =~ host
    begin
      host = Resolv.getname(host)
    rescue
      # Host name unknown: use the IPv6 address as-is, in brackets.
      host = "[#{host}]"
    end
  end

  "file://#{host}#{root}/#{prefix}/#{base_name}-#{format('%03d', segment_no)}"
end
#hash_key(e) ⇒ Object
186 187 188 |
# File 'lib/fairy/node/p-wc.rb', line 186

# Returns the grouping key for element +e+ by invoking the proc stored in
# +@hash_proc+ (assigned in #basic_start).
#
# @param e the element to derive a key for
# @return whatever +@hash_proc+ returns for +e+
def hash_key(e)
  @hash_proc.call(e)
end
#input=(input) ⇒ Object
181 182 183 184 |
# File 'lib/fairy/node/p-wc.rb', line 181

# Assigns the filter's input through the parent implementation and then
# immediately calls +start+, so setting the input also starts this filter.
#
# @param input the upstream input object, passed through to the parent setter
def input=(input)
  super(input)
  start
end