Class: Fairy::PGroupBy::DirectMergeSortBuffer
Defined Under Namespace
Classes: CachedBuffer, KeyValueStream, Merger
Instance Attribute Summary
#log_id
Instance Method Summary
collapse
Constructor Details
Returns a new instance of DirectMergeSortBuffer.
895
896
897
898
899
900
901
902
|
# File 'lib/fairy/node/p-group-by.rb', line 895
# Builds the buffer. Spill-to-disk state (@buffers) starts out nil and is
# only created when the first run is written to disk (see open_buffer).
#
# @param njob   forwarded unchanged to the superclass constructor
# @param policy hash-like; +:threshold+ (when truthy) overrides
#               CONF.GROUP_BY_CMSB_THRESHOLD as the spill threshold
def initialize(njob, policy)
  super
  @threshold = policy[:threshold] || CONF.GROUP_BY_CMSB_THRESHOLD
  @buffers = nil
end
|
Instance Method Details
#each(&block) ⇒ Object
960
961
962
963
964
965
966
|
# File 'lib/fairy/node/p-group-by.rb', line 960
# Iterates over the grouped key/values.
#
# Once anything has been spilled to disk (@buffers is set), iteration goes
# through the on-disk merge; otherwise the superclass's iteration is used.
def each(&block)
  return super unless @buffers
  each_2ndmemory(&block)
end
|
#each_2ndmemory(&block) ⇒ Object
968
969
970
971
972
973
974
975
976
977
978
|
# File 'lib/fairy/node/p-group-by.rb', line 968
# Merges every sorted run written to disk and yields the merged stream.
#
# Key/values still held in memory are first written out as a final run, and
# the in-memory list is then released (set to nil). The nil guard lets a
# repeated call proceed straight to the merge. Previously it raised
# NoMethodError on nil.empty?.
#
# @yield each element produced by Merger#each
def each_2ndmemory(&block)
  if @key_values && !@key_values.empty?
    store_2ndmemory(@key_values)
    @key_values = nil
  end
  Log::info(self, "Merge Start: #{@buffers.size} files")
  Log::debug(self, @buffers.map(&:path).join(" "))
  Merger.new(@njob, @buffers).each(&block)
end
|
#init_2ndmemory ⇒ Object
904
905
906
907
908
909
910
911
|
# File 'lib/fairy/node/p-group-by.rb', line 904
# Prepares spill-to-disk state. open_buffer calls this when @buffers is
# still unset.
#
# Loads FastTempfile lazily, then picks the directory for run files
# (policy :buffer_dir when truthy, else CONF.TMP_DIR). Starts an empty list
# of buffers and returns that list.
def init_2ndmemory
  require "fairy/share/fast-tempfile"
  @buffer_dir = @policy[:buffer_dir] || CONF.TMP_DIR
  @buffers = []
end
|
#open_buffer(&block) ⇒ Object
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
|
# File 'lib/fairy/node/p-group-by.rb', line 913
# Creates a new temp-file buffer in @buffer_dir and appends it to @buffers,
# initialising the spill-to-disk state first if needed.
#
# With a block, the buffer's io is yielded. The buffer is closed afterwards
# even if the block raises, and the block's value is returned. Without a
# block, the still-open buffer is returned.
def open_buffer(&block)
  init_2ndmemory unless @buffers
  buf = FastTempfile.open("mod-group-by-buffer-#{@njob.no}-", @buffer_dir)
  @buffers.push(buf)
  return buf unless block_given?

  begin
    yield buf.io
  ensure
    buf.close
  end
end
|
#push(value) ⇒ Object
932
933
934
935
936
937
938
939
940
941
942
943
944
945
|
# File 'lib/fairy/node/p-group-by.rb', line 932
# Adds +value+ via the superclass. If the in-memory list has then grown past
# @threshold, it is swapped for an empty list and the full list is spilled
# to disk.
#
# NOTE(review): presumably super appends +value+ to @key_values; confirm
# against the superclass. The spill (sort + write) runs while
# @key_values_mutex is held, so concurrent pushers block until it finishes.
def push(value)
  super
  @key_values_mutex.synchronize do
    next unless @key_values.size > @threshold

    overflow = @key_values
    @key_values = []
    store_2ndmemory(overflow)
  end
end
|
#store_2ndmemory(key_values) ⇒ Object
947
948
949
950
951
952
953
954
955
956
957
958
|
# File 'lib/fairy/node/p-group-by.rb', line 947
# Spills one batch of key/values to disk as a sorted run.
#
# The batch is sorted by @njob.hash_key into a new array, leaving the
# caller's array untouched. It is written to a fresh buffer (open_buffer) as
# consecutive Marshal.dump records of at most @CHUNK_SIZE elements each.
# NOTE(review): @CHUNK_SIZE is not assigned in this class as shown;
# presumably the superclass sets it. Confirm.
#
# The former trailing `sorted = nil` assigned to an otherwise unused local
# and released nothing, so it has been removed. The sorted copy is
# unreachable once the method returns.
#
# @param key_values [Array] entries to spill
def store_2ndmemory(key_values)
  Log::debug(self, "START STORE")
  sorted = key_values.sort_by { |e| @njob.hash_key(e) }
  open_buffer do |io|
    sorted.each_slice(@CHUNK_SIZE) { |chunk| Marshal.dump(chunk, io) }
  end
  Log::debug(self, "FINISH STORE")
end
|