Class: Fairy::PGroupBy::DirectMergeSortBuffer

Inherits:
DirectOnMemoryBuffer
Defined in:
lib/fairy/node/p-group-by.rb

Defined Under Namespace

Classes: CachedBuffer, KeyValueStream, Merger

Instance Attribute Summary

Attributes inherited from DirectOnMemoryBuffer

#log_id

Instance Method Summary

Constructor Details

#initialize(njob, policy) ⇒ DirectMergeSortBuffer

Returns a new instance of DirectMergeSortBuffer.



895
896
897
898
899
900
901
902
# File 'lib/fairy/node/p-group-by.rb', line 895

def initialize(njob, policy)
	# Let the parent buffer initialize first; the bare (zsuper) call forwards
	# njob and policy unchanged.
	super

	# Number of buffered key/value pairs tolerated in memory before #push spills
	# them to a sorted temp file. A policy-supplied :threshold takes precedence
	# over the global CONF.GROUP_BY_CMSB_THRESHOLD default.
	@threshold = policy[:threshold] || CONF.GROUP_BY_CMSB_THRESHOLD

	# No spill files yet: #open_buffer creates the list on the first spill, and
	# #each uses its presence to choose the on-disk merge path.
	@buffers = nil
end

Instance Method Details

#each(&block) ⇒ Object



960
961
962
963
964
965
966
# File 'lib/fairy/node/p-group-by.rb', line 960

def each(&block)
	# Once any data has been spilled to temp files (@buffers is set), iterate
	# through the on-disk merge; otherwise defer to the parent's in-memory
	# iteration. Returns whatever the chosen path returns.
	return each_2ndmemory(&block) if @buffers

	super
end

#each_2ndmemory(&block) ⇒ Object



968
969
970
971
972
973
974
975
976
977
978
# File 'lib/fairy/node/p-group-by.rb', line 968

def each_2ndmemory(&block)
	# Iterate over all key/value pairs by merging the sorted spill files.
	#
	# Any pairs still held in memory are spilled first, so the Merger sees every
	# pair on disk. After that spill @key_values is set to nil. A repeated call
	# (e.g. #each invoked twice) must therefore tolerate a nil @key_values rather
	# than calling #empty? on it.
	if @key_values && !@key_values.empty?
	  store_2ndmemory(@key_values)
	  @key_values = nil
	end
	Log::info(self, "Merge Start: #{@buffers.size} files")
	Log::debug(self, @buffers.map(&:path).join(" "))

	# Returns whatever Merger#each returns.
	m = Merger.new(@njob, @buffers)
	m.each(&block)
end

#init_2ndmemory ⇒ Object



904
905
906
907
908
909
910
911
# File 'lib/fairy/node/p-group-by.rb', line 904

def init_2ndmemory
	# Loaded lazily: FastTempfile is only needed once data spills to disk.
	require "fairy/share/fast-tempfile"

	# Directory for spill files: the policy's :buffer_dir if given, otherwise
	# CONF.TMP_DIR.
	@buffer_dir = @policy[:buffer_dir] || CONF.TMP_DIR

	# Spill files created by #open_buffer are collected here (returned as well).
	@buffers = []
end

#open_buffer(&block) ⇒ Object



913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
# File 'lib/fairy/node/p-group-by.rb', line 913

def open_buffer(&block)
	# Create a new spill temp file and register it in @buffers.
	#
	# Without a block, the FastTempfile itself is returned and the caller owns
	# closing it. With a block, the file's IO is yielded, the file is closed
	# afterwards (even if the block raises), and the block's value is returned.
	init_2ndmemory unless @buffers

	tmp = FastTempfile.open("mod-group-by-buffer-#{@njob.no}-", @buffer_dir)
	@buffers << tmp
	return tmp unless block_given?

	begin
	  # Workaround for ruby BUG#2390: yield the underlying IO instead of the
	  # FastTempfile wrapper (i.e. not `yield tmp`).
	  yield tmp.io
	ensure
	  tmp.close
	end
end

#push(value) ⇒ Object



932
933
934
935
936
937
938
939
940
941
942
943
944
945
# File 'lib/fairy/node/p-group-by.rb', line 932

def push(value)
	# Add the value via the parent buffer, then spill to disk if the in-memory
	# key/value list has grown past @threshold.
	super

	@key_values_mutex.synchronize do
	  # Below the threshold there is nothing to spill; the call returns nil.
	  next nil unless @key_values.size > @threshold

	  # Swap in a fresh list and spill the full one. The spill runs while the
	  # mutex is still held, so concurrent pushes wait until it finishes.
	  spilled = @key_values
	  @key_values = []
	  store_2ndmemory(spilled)
	end
end

#store_2ndmemory(key_values) ⇒ Object



947
948
949
950
951
952
953
954
955
956
957
958
# File 'lib/fairy/node/p-group-by.rb', line 947

def store_2ndmemory(key_values)
	# Spill key_values to a new temp file, sorted by @njob.hash_key.
	#
	# The file holds a sequence of Marshal-dumped arrays of up to @CHUNK_SIZE
	# elements each, in sorted order. The caller's array is not mutated
	# (sort_by returns a new array).
	#
	# Removed: a dead `sorted = nil` local that was never read.
	Log::debug(self, "START STORE")
	key_values = key_values.sort_by{|e| @njob.hash_key(e)}

	open_buffer do |io|
	  key_values.each_slice(@CHUNK_SIZE) do |ary|
	    Marshal.dump(ary, io)
	  end
	end
	Log::debug(self, "FINISH STORE")
end