Class: Fairy::PGroupBy::DirectFBMergeSortBuffer::CachedBuffer

Inherits:
Fairy::PGroupBy::DirectMergeSortBuffer::CachedBuffer show all
Extended by:
Forwardable
Defined in:
lib/fairy/node/p-group-by.rb

Instance Method Summary collapse

Methods inherited from Fairy::PGroupBy::DirectMergeSortBuffer::CachedBuffer

#eof?, #key, #shift_values

Constructor Details

#initialize(njob, io) ⇒ CachedBuffer

Returns a new instance of CachedBuffer.



1208
1209
1210
1211
1212
# File 'lib/fairy/node/p-group-by.rb', line 1208

# Builds the buffer and prepares the fiber that drives per-key iteration.
#
# Both arguments are forwarded unchanged to the superclass constructor by
# the bare +super+. The fiber is only created here; it does not start
# running each_sub until it is first resumed.
#
# njob:: forwarded to the superclass (this class later calls
#        <tt>@njob.hash_key</tt>, presumably on the same object -- confirm
#        against the superclass).
# io::   forwarded to the superclass.
def initialize(njob, io)
  super

  @each_fb = Fiber.new do |blk|
    each_sub(blk)
  end
end

Instance Method Details

#each_by_same_key(&block) ⇒ Object

def key
  if @cache.empty?
    read_buffer
  end
  @key
end



1221
1222
1223
# File 'lib/fairy/node/p-group-by.rb', line 1221

# Resumes the fiber stored in @each_fb, handing it the given block.
#
# The block is passed as the resume value, so the fiber decides how to use
# it. Returns whatever Fiber#resume returns: the value given to the fiber's
# next Fiber.yield, or the fiber's final result once it finishes.
def each_by_same_key(&block)
  fiber = @each_fb
  fiber.resume(block)
end

#each_sub(block) ⇒ Object



1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
# File 'lib/fairy/node/p-group-by.rb', line 1225

# Body of the iteration fiber: feeds cached elements to +block+ and pauses
# at every key change.
#
# Loads a chunk through read_buffer when the cache is empty, then walks the
# cache chunk by chunk until read_buffer leaves it empty. For each element,
# <tt>@njob.hash_key(element)</tt> is compared with @key. On a mismatch the
# new key is stored in @key and the fiber suspends with Fiber.yield; the
# value supplied on the next resume replaces +block+. The element that
# triggered the switch is then delivered to that new block.
#
# block:: callable that receives each element via +call+.
def each_sub(block)
  read_buffer if @cache.empty?

  until @cache.empty?
    @cache.each do |elem|
      same_key = @njob.hash_key(elem) == @key
      unless same_key
        @key = @njob.hash_key(elem)
        # Suspend here; the resume value becomes the consumer for the new key.
        block = Fiber.yield
      end
      block.call(elem)
    end
    # Current chunk is exhausted; fetch the next one (empty once input ends).
    read_buffer
  end
end

#read_buffer ⇒ Object



1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
# File 'lib/fairy/node/p-group-by.rb', line 1243

# Loads the next marshalled chunk from <tt>@io.io</tt> into @cache.
#
# * On success @cache holds the object returned by Marshal.load.
# * On EOFError, @eof is set to true and @cache becomes an empty array.
# * On ArgumentError (raised by Marshal.load for malformed data), a debug
#   dump of the bytes around the failure point is logged and the original
#   ArgumentError is re-raised.
#
# The diagnostic dump is best-effort. The rewind is clamped to the start of
# the stream, because a relative seek of -1024 from a position below 1024
# raises Errno::EINVAL and would hide the real Marshal error. Any I/O
# failure while dumping is logged and otherwise ignored, so callers always
# see the original ArgumentError.
#
# Returns the new value of @cache.
def read_buffer
  io = @io.io
  begin
    @cache = Marshal.load(io)
  rescue EOFError
    @eof = true
    @cache = []
  rescue ArgumentError => e
    Log::debug(self, "MARSHAL ERROR OCCURED!!")
    begin
      # Step back up to 1024 bytes, but never before the start of the stream.
      io.seek([io.pos - 1024, 0].max, IO::SEEK_SET)
      buf = io.read(2048)
      Log::debugf(self, "File Contents: %s", buf)
    rescue SystemCallError, IOError => dump_error
      # e.g. a non-seekable stream: the dump is optional, the error below is not.
      Log::debugf(self, "File Contents: (unavailable: %s)", dump_error.message)
    end
    raise e
  end
  # Disabled in the original source; @key is not updated here:
  #	  @key = @njob.hash_key(@cache.first)
end