Class: Fairy::PGroupBy::DirectOnMemoryBuffer
- Inherits: Object
- Inheritance hierarchy:
  - Object
  - Fairy::PGroupBy::DirectOnMemoryBuffer
- Defined in:
- lib/fairy/node/p-group-by.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#log_id ⇒ Object
Returns the value of attribute log_id.
Instance Method Summary collapse
- #each(&block) ⇒ Object
-
#initialize(njob, policy) ⇒ DirectOnMemoryBuffer
constructor
A new instance of DirectOnMemoryBuffer.
- #push(value) ⇒ Object
Constructor Details
#initialize(njob, policy) ⇒ DirectOnMemoryBuffer
Returns a new instance of DirectOnMemoryBuffer.
866 867 868 869 870 871 872 873 874 875 876 877 |
# File 'lib/fairy/node/p-group-by.rb', line 866

# Builds an empty in-memory buffer for a group-by job.
#
# @param njob [Object] owning job; its +id+ is embedded in the log id
# @param policy [Hash] options; a truthy +policy[:chunk_size]+ takes
#   precedence over the configured default chunk size
def initialize(njob, policy)
  @njob = njob
  @policy = policy

  # Pushed values accumulate in @key_values; #push takes the mutex around
  # every append.
  @key_values_mutex = Mutex.new
  @key_values = []

  # The configuration is only consulted when the policy gives no chunk size.
  @CHUNK_SIZE = policy[:chunk_size] || CONF.GROUP_BY_CMSB_CHUNK_SIZE

  # Class name without the first "Fairy::" occurrence, followed by the job
  # id in brackets.
  short_name = self.class.name.sub(/Fairy::/, '')
  @log_id = format("%s[%s]", short_name, @njob.id)
end
Instance Attribute Details
#log_id ⇒ Object
Returns the value of attribute log_id.
879 880 881 |
# File 'lib/fairy/node/p-group-by.rb', line 879

# Reader for the +log_id+ attribute.
#
# @return [Object] whatever is currently stored in +@log_id+ (+nil+ when
#   it has not been set)
def log_id
  @log_id
end
Instance Method Details
#each(&block) ⇒ Object
887 888 889 890 891 |
# File 'lib/fairy/node/p-group-by.rb', line 887

# Groups the buffered values by +@njob.hash_key+ and passes one
# KeyValueStream per distinct key to the block, in ascending key order.
#
# Side effect: +@key_values+ is replaced by the array of streams, so the
# raw pushed values are no longer referenced by this buffer afterwards.
# NOTE(review): a second call would feed those streams back through
# +hash_key+ -- presumably callers iterate only once; confirm.
#
# @yield [stream] a KeyValueStream built with the key, filled with every
#   value that hashed to it, and closed with +push_eos+
# @return [Array, Enumerator] the array of streams when a block is given;
#   the result of +Array#each+ without a block otherwise
def each(&block)
  grouped = @key_values.group_by { |value| @njob.hash_key(value) }

  @key_values = grouped.sort_by { |key, _values| key }.map do |key, values|
    stream = KeyValueStream.new(key, nil)
    stream.concat(values)
    stream.push_eos
    stream
  end

  # Parenthesised so `&` is unambiguously the block-pass operator.
  @key_values.each(&block)
end
#push(value) ⇒ Object
881 882 883 884 885 |
# File 'lib/fairy/node/p-group-by.rb', line 881

# Appends one value to the buffer while holding +@key_values_mutex+.
#
# @param value [Object] the value to buffer
# @return [Object] the result of +@key_values.push+
def push(value)
  @key_values_mutex.synchronize { @key_values.push(value) }
end