Class: Fairy::PGroupBy::DirectFBMergeSortBuffer::CachedBuffer
- Inherits:
- Fairy::PGroupBy::DirectMergeSortBuffer::CachedBuffer
- Object
- Fairy::PGroupBy::DirectMergeSortBuffer::CachedBuffer
- Fairy::PGroupBy::DirectFBMergeSortBuffer::CachedBuffer
- Extended by:
- Forwardable
- Defined in:
- lib/fairy/node/p-group-by.rb
Instance Method Summary
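- #each_by_same_key(&block) ⇒ Object
  Resumes the internal fiber with block. The docstring extracted from the source is not a description but what appears to be a commented-out method: def key; read_buffer if @cache.empty?; @key; end.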
- #each_sub(block) ⇒ Object
- #initialize(njob, io) ⇒ CachedBuffer (constructor)
  A new instance of CachedBuffer.
- #read_buffer ⇒ Object
Methods inherited from Fairy::PGroupBy::DirectMergeSortBuffer::CachedBuffer
Constructor Details
#initialize(njob, io) ⇒ CachedBuffer
Returns a new instance of CachedBuffer.
    # File 'lib/fairy/node/p-group-by.rb', line 1208
    def initialize(njob, io)
      super
      @each_fb = Fiber.new{|block| each_sub(block)}
    end
Instance Method Details
#each_by_same_key(&block) ⇒ Object
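Resumes the fiber created in #initialize, handing it block. The fiber runs #each_sub, which calls block with each cached entry whose hash key matches the current key and suspends as soon as the key changes, so the next call continues with the next run of entries. The docstring extracted from the source is not a description but what appears to be a commented-out method:

    # def key
    #   if @cache.empty?
    #     read_buffer
    #   end
    #   @key
    # end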
    # File 'lib/fairy/node/p-group-by.rb', line 1221
    def each_by_same_key(&block)
      @each_fb.resume(block)
    end
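The method above relies on how Ruby fibers pass values: the argument of the first resume becomes the fiber block's parameter, and the argument of every later resume becomes the return value of the pending Fiber.yield. The following stand-alone sketch illustrates just that mechanism; the names received, fb, and result are hypothetical and nothing here is taken from Fairy.

```ruby
# Stand-alone illustration of value passing through Fiber#resume.
# All names are hypothetical; this is not Fairy code.
received = []

fb = Fiber.new do |first_block|
  block = first_block      # argument of the first resume
  block.call(:first)
  block = Fiber.yield      # suspend; the next resume's argument lands here
  block.call(:second)
  :done                    # return value of the final resume
end

fb.resume(->(x) { received << [:a, x] })
result = fb.resume(->(x) { received << [:b, x] })
```

Each caller therefore supplies a fresh block per call, which is how #each_sub can swap its block at every key boundary.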
#each_sub(block) ⇒ Object
    # File 'lib/fairy/node/p-group-by.rb', line 1225
    def each_sub(block)
      if @cache.empty?
        read_buffer
        return if @cache.empty?
      end

      while !@cache.empty?
        @cache.each do |e|
          unless @njob.hash_key(e) == @key
            @key = @njob.hash_key(e)
            block = Fiber.yield
          end
          block.call e
        end
        read_buffer
      end
    end
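The key-boundary logic in #each_sub can be tried in isolation with a simplified sketch. It makes several assumptions that are not confirmed by this page: the class name KeyRunReader, the key_of block, and the done? helper are hypothetical; the input is an in-memory array already sorted by key rather than a stream of marshalled chunks; and @key is seeded with the first item's key so that the first call delivers the first run (where Fairy initializes @key is not shown here).

```ruby
# Simplified, hypothetical analogue of the fiber-based grouping above.
class KeyRunReader
  def initialize(sorted_items, &key_of)
    @items = sorted_items
    @key_of = key_of
    # Assumption: start with the first item's key as the current key.
    @key = sorted_items.empty? ? nil : key_of.call(sorted_items.first)
    @fiber = Fiber.new { |block| run(block) }
  end

  # True once the fiber has consumed every item.
  def done?
    !@fiber.alive?
  end

  # Feeds one run of same-key items to the given block.
  def each_by_same_key(&block)
    @fiber.resume(block)
  end

  private

  def run(block)
    @items.each do |e|
      k = @key_of.call(e)
      unless k == @key
        @key = k
        block = Fiber.yield   # key changed: wait for the next caller's block
      end
      block.call(e)
    end
  end
end

pairs = [["a", 1], ["a", 2], ["b", 3], ["c", 4], ["c", 5]]
reader = KeyRunReader.new(pairs) { |pair| pair.first }

groups = []
until reader.done?
  run = []
  reader.each_by_same_key { |e| run << e }
  groups << run
end
```

In this sketch the fiber finishes during the call that delivers the last run, so the caller checks done? before resuming again; resuming a finished fiber raises FiberError.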
#read_buffer ⇒ Object
    # File 'lib/fairy/node/p-group-by.rb', line 1243
    def read_buffer
      io = @io.io
      begin
        @cache = Marshal.load(io)
      rescue EOFError
        @eof = true
        @cache = []
      rescue ArgumentError
        Log::debug(self, "MARSHAL ERROR OCCURED!!")
        io.seek(-1024, IO::SEEK_CUR)
        buf = io.read(2048)
        Log::debugf(self, "File Contents: %s", buf)
        raise
      end
      # @key = @njob.hash_key(@cache.first)
    end
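#read_buffer treats its IO as a sequence of marshalled chunks and uses EOFError as the end-of-data signal. A minimal sketch of that reading pattern follows, assuming an in-memory StringIO in place of the object returned by @io.io; the variable names are hypothetical, and the ArgumentError diagnostics from the method above are left out.

```ruby
require "stringio"

# Write two marshalled chunks back to back, as a producer might.
io = StringIO.new
Marshal.dump([1, 2], io)
Marshal.dump([3], io)
io.rewind

# Read chunk after chunk; Marshal.load raises EOFError once the
# stream is exhausted, which ends the loop.
chunks = []
loop do
  begin
    chunks << Marshal.load(io)
  rescue EOFError
    break
  end
end
```

The method above differs in that it replaces @cache with an empty array on EOFError instead of breaking out of a loop, which is what ends the while loop in #each_sub.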