Class: Fairy::PGroupBy

Inherits:
PBasicGroupBy show all
Defined in:
lib/fairy/node/p-group-by.rb

Defined Under Namespace

Classes: CommandMergeSortBuffer, DepqMergeSortBuffer, DepqMergeSortBuffer2, DirectFBMergeSortBuffer, DirectKB2MergeSortBuffer, DirectKBMergeSortBuffer, DirectMergeSortBuffer, DirectOnMemoryBuffer, DirectPQMergeSortBuffer, ExtMergeSortBuffer, KeyValueStream, MergeSortBuffer, OnMemoryBuffer, PPostFilter, PQMergeSortBuffer, PQMergeSortBuffer2, SimpleCommandSortBuffer, SimpleFileByKeyBuffer

Constant Summary

Constants inherited from PBasicGroupBy

Fairy::PBasicGroupBy::ST_ALL_IMPORTED, Fairy::PBasicGroupBy::ST_EXPORT_FINISH, Fairy::PBasicGroupBy::ST_WAIT_EXPORT_FINISH

Constants inherited from PIOFilter

Fairy::PIOFilter::ST_WAIT_IMPORT

Constants inherited from PFilter

Fairy::PFilter::END_OF_STREAM, Fairy::PFilter::ST_ACTIVATE, Fairy::PFilter::ST_FINISH, Fairy::PFilter::ST_INIT

Instance Attribute Summary

Attributes inherited from PFilter

#IGNORE_EXCEPTION, #id, #log_id, #ntask

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from PBasicGroupBy

#add_export, #init_key_proc, #start_export, #terminate, #wait_export_finish

Methods inherited from PIOFilter

#input=

Methods inherited from PFilter

#abort_running, #basic_start, #break_running, #each, #global_break, #global_break_from_other, #handle_exception, #key, #key=, #next, #no, #no=, #notice_status, #processor, #start, #start_export, #start_watch_status, #status=, #terminate, #terminate_proc

Constructor Details

#initialize(id, ntask, bjob, opts, block_source) ⇒ PGroupBy

Returns a new instance of PGroupBy.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/fairy/node/p-group-by.rb', line 15

# Sets up the group-by node: forwards every argument to the superclass
# constructor, prepares the export table, and reads the segmenting and
# hashing settings from +opts+, falling back to the values in CONF.
#
# opts keys read here:
#   :no_segment        - stored in @mod (the modulus applied in #hash_key)
#   :hash_module       - library path that is +require+d before the
#                        hash generator is built
#   :grouping_optimize - overrides CONF.GROUP_BY_GROUPING_OPTIMIZE when
#                        the key is present (even if its value is falsy)
def initialize(id, ntask, bjob, opts, block_source)
  super

  @exports = []
  # Give the export array a hash-like iterator: yields (index, export)
  # for each filled slot and logs a debug line for each empty slot.
  def @exports.each_pair(&block)
    each_with_index do |export, index|
      if export
        block.call(index, export)
      else
        Log::debug(self, "No assgined Export")
      end
    end
  end

  @mod = opts[:no_segment] || CONF.GROUP_BY_NO_SEGMENT
  Log::debug(self, "NO_SEGMENT: #{@mod}")

  # The hash module is loaded lazily so that it can be chosen per job.
  hash_module = opts[:hash_module] || CONF.GROUP_BY_HASH_MODULE
  require hash_module
  @hash_generator = Fairy::HValueGenerator.new(bjob.hash_seed)

  default_optimize = CONF.GROUP_BY_GROUPING_OPTIMIZE
  @hash_optimize =
    opts.key?(:grouping_optimize) ? opts[:grouping_optimize] : default_optimize
end

Class Method Details

.each_pair(&block) ⇒ Object



19
20
21
22
23
24
25
26
# File 'lib/fairy/node/p-group-by.rb', line 19

# Singleton iterator on the @exports array: calls +block+ with
# (index, export) for each filled slot and writes a debug log line for
# each slot that is nil/false.
def @exports.each_pair(&block)
  each_with_index do |export, index|
    if export
      block.call(index, export)
    else
      Log::debug(self, "No assgined Export")
    end
  end
end

Instance Method Details

#hash_key(e) ⇒ Object



41
42
43
44
45
46
# File 'lib/fairy/node/p-group-by.rb', line 41

# Computes the grouping key for element +e+.
#
# The superclass implementation supplies the raw key. If that key matches
# Import::CTLTOKEN_NULLVALUE it is returned untouched; otherwise the key is
# run through @hash_generator and reduced modulo @mod.
#
# The raw key is computed once and reused, so the superclass key
# computation is not run twice per element.
def hash_key(e)
  key = super
  # Null-value control token: hand it back as-is, never hash it.
  return key if Import::CTLTOKEN_NULLVALUE === key

  @hash_generator.value(key) % @mod
end