Class: Fairy::PGroupBy::DirectMergeSortBuffer::Merger
- Inherits:
-
Object
- Object
- Fairy::PGroupBy::DirectMergeSortBuffer::Merger
show all
- Defined in:
- lib/fairy/node/p-group-by.rb
Instance Method Summary
collapse
Constructor Details
#initialize(njob, buffers, cached_buffer_class = CachedBuffer) ⇒ Merger
Returns a new instance of Merger.
981
982
983
984
985
986
|
# File 'lib/fairy/node/p-group-by.rb', line 981
# Sets up the merger's working list of buffers.
#
# Every element of +buffers+ is wrapped with
# <tt>cached_buffer_class.new(njob, element)</tt>; wrappers that already
# report +eof?+ are dropped, and the survivors are ordered by their
# current +key+ so that the smallest key sits at the front of @buffers.
#
# @param njob passed through unchanged as the first argument of every wrapper
# @param buffers collection of raw buffers to merge
# @param cached_buffer_class wrapper class; instances must respond to
#   +eof?+ and +key+
def initialize(njob, buffers, cached_buffer_class = CachedBuffer)
  @njob = njob

  wrapped = buffers.map { |raw| cached_buffer_class.new(@njob, raw) }
  pending = wrapped.reject { |cached| cached.eof? }
  @buffers = pending.sort_by { |cached| cached.key }

  # No key group is being read until #each selects one.
  @key = nil
end
|
Instance Method Details
#each(&block) ⇒ Object
988
989
990
991
992
993
994
|
# File 'lib/fairy/node/p-group-by.rb', line 988
# Hands the block one KeyValueStream per pass for as long as @buffers is
# non-empty.
#
# Each pass records the key of the front buffer in @key and calls the
# block with <tt>KeyValueStream.new(@key, self)</tt>. This method never
# removes anything from @buffers itself; the loop only ends once other
# code (see #get_buf and #each_by_key) has emptied it.
def each(&block)
  until @buffers.empty?
    @key = @buffers.first.key
    block.call(KeyValueStream.new(@key, self))
  end
end
|
#each_by_key(&block) ⇒ Object
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
|
# File 'lib/fairy/node/p-group-by.rb', line 996
# Streams values belonging to the current key (@key) to the block.
#
# Buffers are taken from the front of @buffers one at a time. As soon as
# the front buffer's key differs from @key, it is put back and the method
# returns. Otherwise the buffer's +each_by_same_key+ is run with the
# block, after which the buffer is either closed (if it reports +eof?+)
# or returned to @buffers at a position that keeps the list ordered by key.
def each_by_key(&block)
  while (current = @buffers.shift)
    key_before = current.key

    # Front buffer is not on the current key: restore it and stop.
    unless @key == key_before
      @buffers.unshift(current)
      return
    end

    current.each_by_same_key(&block)

    if current.eof?
      # Exhausted buffers are closed and not put back.
      current.close!
    elsif key_before == current.key
      # Still on the same key: keep it at the front for the next pass.
      @buffers.unshift(current)
    else
      # The buffer moved on to another key: place it right after the last
      # buffer whose key is <= its new key, or at the front if there is none.
      pos = @buffers.rindex { |other| other.key <= current.key }
      if pos
        @buffers.insert(pos + 1, current)
      else
        @buffers.unshift(current)
      end
    end
  end
end
|
#get_buf(values) ⇒ Object
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
|
# File 'lib/fairy/node/p-group-by.rb', line 1020
# Feeds +values+ with the next batch of values for the current key (@key),
# or signals end-of-stream on it.
#
# +values.push_eos+ is called when @buffers is empty or when the front
# buffer's key differs from @key (in the latter case the buffer is put
# back at the front). Otherwise the front buffer's +shift_values+ result,
# if any, is appended with +values.concat+; the buffer is then closed if
# it reports +eof?+, or re-inserted into @buffers in key order.
#
# @param values receiver of +concat+ and +push_eos+
# @return @buffers when the buffer was re-inserted, +nil+ otherwise
def get_buf(values)
  head = @buffers.shift

  # Nothing left at all, or the front buffer belongs to a different key:
  # this key's stream is finished.
  unless head && @key == head.key
    values.push_eos
    @buffers.unshift(head) if head
    return
  end

  batch = head.shift_values
  values.concat(batch) if batch

  if head.eof?
    head.close!
    return
  end

  # Put the buffer back right after the last buffer whose key is <= its
  # current key, or at the front when there is no such buffer.
  pos = @buffers.rindex { |other| other.key <= head.key }
  if pos
    @buffers.insert(pos + 1, head)
  else
    @buffers.unshift(head)
  end
end
|