Class: Fairy::PGroupBy
- Inherits: PBasicGroupBy
- Hierarchy: Fairy::PGroupBy < PBasicGroupBy < PIOFilter < PFilter < Object
- Defined in: lib/fairy/node/p-group-by.rb
Defined Under Namespace
Classes: CommandMergeSortBuffer, DepqMergeSortBuffer, DepqMergeSortBuffer2, DirectFBMergeSortBuffer, DirectKB2MergeSortBuffer, DirectKBMergeSortBuffer, DirectMergeSortBuffer, DirectOnMemoryBuffer, DirectPQMergeSortBuffer, ExtMergeSortBuffer, KeyValueStream, MergeSortBuffer, OnMemoryBuffer, PPostFilter, PQMergeSortBuffer, PQMergeSortBuffer2, SimpleCommandSortBuffer, SimpleFileByKeyBuffer
Constant Summary
Constants inherited from PBasicGroupBy
Fairy::PBasicGroupBy::ST_ALL_IMPORTED, Fairy::PBasicGroupBy::ST_EXPORT_FINISH, Fairy::PBasicGroupBy::ST_WAIT_EXPORT_FINISH
Constants inherited from PIOFilter
Fairy::PIOFilter::ST_WAIT_IMPORT
Constants inherited from PFilter
Fairy::PFilter::END_OF_STREAM, Fairy::PFilter::ST_ACTIVATE, Fairy::PFilter::ST_FINISH, Fairy::PFilter::ST_INIT
Instance Attribute Summary
Attributes inherited from PFilter
#IGNORE_EXCEPTION, #id, #log_id, #ntask
Class Method Summary
Instance Method Summary
- #hash_key(e) ⇒ Object
- #initialize(id, ntask, bjob, opts, block_source) ⇒ PGroupBy (constructor): A new instance of PGroupBy.
Methods inherited from PBasicGroupBy
#add_export, #init_key_proc, #start_export, #terminate, #wait_export_finish
Methods inherited from PIOFilter
Methods inherited from PFilter
#abort_running, #basic_start, #break_running, #each, #global_break, #global_break_from_other, #handle_exception, #key, #key=, #next, #no, #no=, #notice_status, #processor, #start, #start_export, #start_watch_status, #status=, #terminate, #terminate_proc
Constructor Details
#initialize(id, ntask, bjob, opts, block_source) ⇒ PGroupBy
Returns a new instance of PGroupBy.
# File 'lib/fairy/node/p-group-by.rb', line 15

def initialize(id, ntask, bjob, opts, block_source)
  super
  @exports = []
  def @exports.each_pair(&block)
    each_with_index do |item, idx|
      block.call(idx, item) if item
      unless item
        Log::debug(self, "No assgined Export")
      end
    end
  end

  @mod = opts[:no_segment]
  @mod ||= CONF.GROUP_BY_NO_SEGMENT
  Log::debug(self, "NO_SEGMENT: #{@mod}")

  mod = opts[:hash_module]
  mod ||= CONF.GROUP_BY_HASH_MODULE
  require mod
  @hash_generator = Fairy::HValueGenerator.new(bjob.hash_seed)

  @hash_optimize = CONF.GROUP_BY_GROUPING_OPTIMIZE
  @hash_optimize = opts[:grouping_optimize] if opts.key?(:grouping_optimize)
end
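The constructor resolves its options in two different ways: `:no_segment` and `:hash_module` fall back to the configured default with `||=`, while `:grouping_optimize` is overridden only when the key is present, so an explicit `false` survives. The following is a minimal sketch of that difference, not Fairy's code. `DEFAULTS` and `resolve_options` are hypothetical names standing in for `CONF` and the constructor body, and the default values are invented for illustration.

```ruby
# Hypothetical stand-in for Fairy's CONF object; the values are made up.
DEFAULTS = {
  no_segment: 10,
  grouping_optimize: true
}.freeze

# Resolve per-call options against the defaults.
def resolve_options(opts)
  # `||` falls back whenever the option is missing, nil, or false.
  no_segment = opts[:no_segment] || DEFAULTS[:no_segment]

  # `key?` lets the caller pass an explicit `false` and have it respected.
  grouping_optimize = DEFAULTS[:grouping_optimize]
  grouping_optimize = opts[:grouping_optimize] if opts.key?(:grouping_optimize)

  { no_segment: no_segment, grouping_optimize: grouping_optimize }
end
```

Under these assumptions, `resolve_options({})` yields the defaults, and `resolve_options(grouping_optimize: false)` keeps the `false`, which a plain `||` fallback would have replaced with `true`.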
Class Method Details
.each_pair(&block) ⇒ Object
# File 'lib/fairy/node/p-group-by.rb', line 19

def @exports.each_pair(&block)
  each_with_index do |item, idx|
    block.call(idx, item) if item
    unless item
      Log::debug(self, "No assgined Export")
    end
  end
end
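Although this method appears under Class Method Details, the listing shows it is defined on the `@exports` array itself, inside `#initialize`, so it is a singleton method of that one array. A minimal sketch of the same pattern follows. `with_each_pair` is a hypothetical helper, and the sketch silently skips empty slots where the source logs a debug message.

```ruby
# Attach an each_pair singleton method to one specific array.
# It yields (index, item) for every non-nil slot.
def with_each_pair(array)
  def array.each_pair
    each_with_index do |item, idx|
      # Empty slots are skipped; the listed source logs them instead.
      yield idx, item if item
    end
  end
  array
end

exports = with_each_pair([:export_a, nil, :export_c])
exports.each_pair { |idx, item| puts "#{idx}: #{item}" }
```

Other arrays are unaffected: only the object passed to `with_each_pair` responds to `each_pair`.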
Instance Method Details
#hash_key(e) ⇒ Object
# File 'lib/fairy/node/p-group-by.rb', line 41

def hash_key(e)
  if Import::CTLTOKEN_NULLVALUE === (key = super)
    return key
  end
  @hash_generator.value(super) % @mod
end
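`#hash_key` passes the null control token through unchanged and maps every other key to a segment number by hashing it and taking the result modulo `@mod`. The sketch below illustrates that idea under stated assumptions: `Zlib.crc32` stands in for `Fairy::HValueGenerator`, whose hash function is not shown here, and `NULL_TOKEN` and `segment_for` are hypothetical names.

```ruby
require "zlib"

# Hypothetical stand-in for Import::CTLTOKEN_NULLVALUE.
NULL_TOKEN = Object.new.freeze

# Map a key to a segment index in 0...segments.
# Assumption: CRC32 seeded with `seed` replaces Fairy's hash generator.
def segment_for(key, seed:, segments:)
  # Control tokens are returned as-is and never hashed.
  return key if NULL_TOKEN.equal?(key)

  Zlib.crc32(key.to_s, seed) % segments
end
```

Because the hash is deterministic for a given seed, equal keys always land in the same segment, which is what allows grouping to be done per segment. The sketch takes the key as an argument and uses it once, whereas the listed source obtains the key by calling `super` and calls it a second time for the hashed value.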