Class: Fairy::PGroupBy::PPostFilter
- Inherits: Fairy::PSingleExportFilter
  - Object
  - Fairy::PFilter
  - Fairy::PIOFilter
  - Fairy::PSingleExportFilter
  - Fairy::PGroupBy::PPostFilter
- Defined in: lib/fairy/node/p-group-by.rb
Constant Summary
Constants included from Fairy::PSingleExportable
Fairy::PSingleExportable::END_OF_STREAM, Fairy::PSingleExportable::ST_EXPORT_FINISH, Fairy::PSingleExportable::ST_WAIT_EXPORT_FINISH
Constants inherited from Fairy::PIOFilter
Fairy::PIOFilter::ST_WAIT_IMPORT
Constants inherited from Fairy::PFilter
Fairy::PFilter::END_OF_STREAM, Fairy::PFilter::ST_ACTIVATE, Fairy::PFilter::ST_FINISH, Fairy::PFilter::ST_INIT
Instance Attribute Summary
Attributes included from Fairy::PSingleExportable
Attributes inherited from Fairy::PFilter
#IGNORE_EXCEPTION, #id, #log_id, #ntask
Instance Method Summary
- #basic_each(&block) ⇒ Object
- #basic_each_0(&block) ⇒ Object
- #hash_key(e) ⇒ Object
- #init_key_proc ⇒ Object
- #initialize(id, ntask, bjob, opts, block_source) ⇒ PPostFilter (constructor): A new instance of PPostFilter.
Methods included from Fairy::PSingleExportable
#start, #start_export, #terminate, #wait_export_finish
Methods inherited from Fairy::PIOFilter
Methods inherited from Fairy::PFilter
#abort_running, #basic_start, #break_running, #each, #global_break, #global_break_from_other, #handle_exception, #key, #key=, #next, #no, #no=, #notice_status, #processor, #start, #start_export, #start_watch_status, #status=, #terminate, #terminate_proc
Constructor Details
#initialize(id, ntask, bjob, opts, block_source) ⇒ PPostFilter
Returns a new instance of PPostFilter.
# File 'lib/fairy/node/p-group-by.rb', line 51

def initialize(id, ntask, bjob, opts, block_source)
  super
  @block_source = block_source

  @buffering_policy = @opts[:buffering_policy]
  @buffering_policy ||= CONF.GROUP_BY_BUFFERING_POLICY

  unless CONF.BUG234
    @hash_optimize = CONF.GROUP_BY_GROUPING_OPTIMIZE
    @hash_optimize = opts[:grouping_optimize] if opts.key?(:grouping_optimize)
  end
end
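The constructor resolves its settings in two different ways: the buffering policy falls back to the configured default through `||=`, while the grouping optimization is overridden only when the caller actually passed the key. The following is a minimal sketch of that pattern, not Fairy's implementation; `DEFAULTS`, `resolve_options`, and the values inside them are hypothetical stand-ins for the `CONF` object.

```ruby
# Hypothetical stand-in for Fairy's CONF object; the values are made up.
DEFAULTS = {
  buffering_policy: { buffering_class: :InMemoryBuffer },
  grouping_optimize: true
}.freeze

# Hypothetical helper mirroring the two fallback styles used in #initialize.
def resolve_options(opts)
  # `||` style: a missing (or nil) option falls back to the default.
  policy = opts[:buffering_policy] || DEFAULTS[:buffering_policy]

  # `key?` style: an explicit false still overrides the default.
  optimize = DEFAULTS[:grouping_optimize]
  optimize = opts[:grouping_optimize] if opts.key?(:grouping_optimize)

  { buffering_policy: policy, grouping_optimize: optimize }
end

defaults_only = resolve_options({})
explicit_off  = resolve_options({ grouping_optimize: false })
```

Checking `key?` rather than truthiness matters for boolean options: with `||`, a caller who passes `false` would silently get the default back.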
Instance Method Details
#basic_each(&block) ⇒ Object
# File 'lib/fairy/node/p-group-by.rb', line 107

def basic_each(&block)
  @key_value_buffer =
    eval("#{@buffering_policy[:buffering_class]}").new(self, @buffering_policy)
  init_key_proc

  @input.each do |e|
    @key_value_buffer.push(e)
    e = nil
  end
  @key_value_buffer.each do |kvs|
    block.call kvs
  end
  @key_value_buffer = nil
end
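`#basic_each` works in two phases: every input element is pushed into a key-value buffer, and only after the input is exhausted are the buffered groups handed to the block. The sketch below illustrates that flow with a purely in-memory buffer. `SimpleKeyValueBuffer` and `grouped_each` are hypothetical names, and the `[key, values]` shape yielded per group is an assumption; Fairy's actual buffering classes are selected by the buffering policy and are not shown on this page.

```ruby
# Hypothetical in-memory buffer; not one of Fairy's buffering classes.
class SimpleKeyValueBuffer
  def initialize(key_proc)
    @key_proc = key_proc
    # Each new key starts with its own empty array.
    @groups = Hash.new { |hash, key| hash[key] = [] }
  end

  # Phase 1: file the element under its computed key.
  def push(element)
    @groups[@key_proc.call(element)] << element
  end

  # Phase 2: yield one [key, values] pair per group (assumed shape).
  def each
    @groups.each { |key, values| yield [key, values] }
  end
end

# Hypothetical driver with the same push-then-iterate structure.
def grouped_each(input, key_proc)
  buffer = SimpleKeyValueBuffer.new(key_proc)
  input.each { |element| buffer.push(element) }
  buffer.each { |kvs| yield kvs }
end

result = []
grouped_each(%w[apple avocado banana], ->(word) { word[0] }) { |kvs| result << kvs }
```

Because nothing is yielded until all input has been consumed, a buffer that keeps everything in memory, as this one does, is only suitable for inputs that fit in memory.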
#basic_each_0(&block) ⇒ Object
# File 'lib/fairy/node/p-group-by.rb', line 92

def basic_each_0(&block)
#      @key_value_buffer =
#        eval("#{@buffering_policy[:buffering_class]}").new(@buffering_policy)
  if @hash_optimize
    @hash_proc = eval("proc{#{@block_source.source}}")
  else
    @hash_proc = BBlock.new(@block_source, @context, self)
  end

  @input.group_by{|e| e}.each{|k, v|
    block.call [k, v]
  }
end
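The final statement of `#basic_each_0` calls `group_by` with the identity block `{|e| e}`, so elements are grouped by their own value rather than by the result of `@hash_proc`. Assuming `@input` behaves like a standard Ruby `Enumerable` (an assumption; the type of `@input` is not documented here), the effect can be illustrated with a plain array:

```ruby
# Plain array standing in for @input (assumption: Enumerable semantics).
words = %w[b a b c a]

# Grouping by identity collects equal elements under one key.
by_identity = words.group_by { |e| e }

# Same [key, values] hand-off as block.call [k, v] in the listing.
pairs = []
by_identity.each { |k, v| pairs << [k, v] }
```

Here `pairs` ends up with one entry per distinct element, in order of first appearance, each paired with every occurrence of that element.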
#hash_key(e) ⇒ Object
# File 'lib/fairy/node/p-group-by.rb', line 122

def hash_key(e)
  @hash_proc.yield(e)
end
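`#hash_key` invokes `@hash_proc` through `Proc#yield`, which behaves like `Proc#call`. When the grouping optimization is enabled, `#basic_each_0` builds that proc by evaluating the block's source text inside `proc{...}`. A minimal sketch of that mechanism follows; the `block_source` string is hypothetical, and it is an assumption that the source text carries its own `|e|` parameter list.

```ruby
# Hypothetical block source text, including its parameter list (assumption).
block_source = "|e| e % 3"

# Same construction as in the listing: wrap the text in proc{...} and eval it.
hash_proc = eval("proc{#{block_source}}")

# Proc#yield invokes the proc with the given argument, like Proc#call.
keys = [3, 4, 7].map { |e| hash_proc.yield(e) }
```

Evaluating source text this way executes arbitrary code, so the pattern is only appropriate when the block source comes from a trusted place.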