Class: Fairy::PGroupBy::DirectKB2MergeSortBuffer

Inherits:
DirectKBMergeSortBuffer
Defined in:
lib/fairy/node/p-group-by.rb

Defined Under Namespace

Classes: CachedBuffer

Instance Attribute Summary

Attributes inherited from OnMemoryBuffer

#log_id

Instance Method Summary

Methods inherited from CommandMergeSortBuffer

#each, #init_2ndmemory, #initialize, #open_buffer, #push

Methods inherited from OnMemoryBuffer

#each, #initialize, #push

Constructor Details

This class inherits a constructor from Fairy::PGroupBy::CommandMergeSortBuffer

Instance Method Details

#each_2ndmemory(&block) ⇒ Object



1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
# File 'lib/fairy/node/p-group-by.rb', line 1465

# Flushes any entries still held in @key_values through store_2ndmemory,
# then iterates over @buffers with a DirectMergeSortBuffer::Merger,
# forwarding the caller's block to Merger#each.
#
# @key_values is set to nil once it has been flushed, so the guard below
# must tolerate nil: otherwise a second call would raise NoMethodError
# from nil.empty?.
#
# Returns whatever Merger#each returns.
def each_2ndmemory(&block)
  if @key_values && !@key_values.empty?
    store_2ndmemory(@key_values)
    # Drop the reference so the flushed data can be garbage-collected.
    @key_values = nil
  end
  Log::info(self, "Merge Start: #{@buffers.size} files")
  Log::debug(self, @buffers.collect{|b| b.path}.join(" "))

  # CachedBuffer is handed to the merger as its third argument
  # (presumably the class used to read each buffer -- confirm in Merger).
  m = DirectMergeSortBuffer::Merger.new(@njob, @buffers, CachedBuffer)
  m.each(&block)
end

#store_2ndmemory(key_values) ⇒ Object



1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
# File 'lib/fairy/node/p-group-by.rb', line 1438

# Writes key_values to the IO yielded by open_buffer as a sequence of
# Marshal-dumped arrays ("chunks").
#
# key_values is enumerated as [key, groups] pairs and processed in
# ascending key order. Each element of groups is appended to the current
# chunk; the chunk is dumped and restarted once the running total of
# element sizes has reached @CHUNK_SIZE. The key itself is not written
# separately -- only the group elements are dumped.
def store_2ndmemory(key_values)
  Log::debug(self, "START STORE")
  ordered = key_values.sort_by { |pair| pair.first }

  open_buffer do |io|
    chunk = []
    chunk_size = 0

    ordered.each do |_key, groups|
      groups.each do |values|
        # The size check happens before the append, so a chunk may
        # exceed @CHUNK_SIZE by the size of its last group.
        if chunk_size >= @CHUNK_SIZE
          Marshal.dump(chunk, io)
          chunk = []
          chunk_size = 0
        end
        chunk << values
        chunk_size += values.size
      end
    end

    # Trailing partial chunk; skipped when the accumulated size is zero.
    Marshal.dump(chunk, io) if chunk_size > 0
  end

  Log::debug(self, "FINISH STORE")
end