Class: Fairy::PGroupBy::DirectKB2MergeSortBuffer
- Inherits: DirectKBMergeSortBuffer
  - Object
  - OnMemoryBuffer
  - CommandMergeSortBuffer
  - DirectKBMergeSortBuffer
  - Fairy::PGroupBy::DirectKB2MergeSortBuffer
- Defined in: lib/fairy/node/p-group-by.rb
Defined Under Namespace
Classes: CachedBuffer
Instance Attribute Summary
Attributes inherited from OnMemoryBuffer
Instance Method Summary
Methods inherited from CommandMergeSortBuffer
#each, #init_2ndmemory, #initialize, #open_buffer, #push
Methods inherited from OnMemoryBuffer
Constructor Details
This class inherits a constructor from Fairy::PGroupBy::CommandMergeSortBuffer
Instance Method Details
#each_2ndmemory(&block) ⇒ Object
# File 'lib/fairy/node/p-group-by.rb', line 1465
def each_2ndmemory(&block)
  unless @key_values.empty?
    store_2ndmemory(@key_values)
    @key_values = nil
  end
  Log::info(self, "Merge Start: #{@buffers.size} files")
  Log::debug(self, @buffers.collect{|b| b.path}.join(" "))
  m = DirectMergeSortBuffer::Merger.new(@njob, @buffers, CachedBuffer)
  m.each(&block)
end
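The listing for #each_2ndmemory hands the spilled buffers to DirectMergeSortBuffer::Merger, whose internals are not shown on this page. As a rough illustration of what merging several key-sorted runs involves, here is a minimal sketch. The helper name `merge_sorted_runs` and the `[key, value]` element shape are assumptions made for this example, and it returns an array rather than yielding to a block. It is not the Fairy implementation.

```ruby
# Hypothetical stand-in for a merge step: combines several runs, each already
# sorted by key, into one key-sorted array. Elements are assumed to be
# [key, value] pairs. This is an illustration, not Fairy's Merger.
def merge_sorted_runs(runs)
  positions = Array.new(runs.size, 0) # read position within each run
  merged = []
  loop do
    # Indexes of runs that still have unread elements.
    live = runs.each_index.select { |i| positions[i] < runs[i].size }
    break if live.empty?

    # Take the run whose next element has the smallest key.
    # On ties, min_by keeps the earliest run, so run order is preserved.
    i = live.min_by { |j| runs[j][positions[j]].first }
    merged << runs[i][positions[i]]
    positions[i] += 1
  end
  merged
end

runs = [
  [["a", 1], ["c", 3]],
  [["b", 2], ["c", 4], ["d", 5]],
]
merged = merge_sorted_runs(runs)
```

This scans all live runs for every element, which is fine for a handful of runs. With many spill files, a priority queue keyed on each run's next element would likely be the better choice.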
#store_2ndmemory(key_values) ⇒ Object
# File 'lib/fairy/node/p-group-by.rb', line 1438
def store_2ndmemory(key_values)
  Log::debug(self, "START STORE")
  sorted = key_values.sort_by{|e| e.first}
  open_buffer do |io|
    tmpary = []
    tmpary_sz = 0
    sorted.each do |key, vv|
      vv.each do |values|
        if tmpary_sz >= @CHUNK_SIZE
          Marshal.dump(tmpary, io)
          tmpary = []
          tmpary_sz = 0
        end
        tmpary.push values
        tmpary_sz += values.size
      end
    end
    if tmpary_sz > 0
      Marshal.dump(tmpary, io)
      tmpary = nil
    end
  end
  sorted = nil
  Log::debug(self, "FINISH STORE")
end
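The chunking in #store_2ndmemory can be tried in isolation with an in-memory IO. The sketch below follows the same loop, under several assumptions: `key_values` is taken to be a Hash mapping each key to an array of value arrays, the `chunk_size` parameter stands in for `@CHUNK_SIZE`, and the helper names `dump_in_chunks` and `load_chunks` are hypothetical. The read-back side is also an assumption, since this page does not show how CachedBuffer reads the file.

```ruby
require "stringio"

# Hypothetical helper that mirrors the chunking loop: entries are visited in
# key order and flushed to io as one Marshal record whenever the running
# element count has reached chunk_size.
def dump_in_chunks(key_values, io, chunk_size)
  chunk = []
  chunk_sz = 0
  key_values.sort_by { |key, _| key }.each do |_key, groups|
    groups.each do |values|
      if chunk_sz >= chunk_size
        Marshal.dump(chunk, io)
        chunk = []
        chunk_sz = 0
      end
      chunk.push(values)
      chunk_sz += values.size
    end
  end
  # Flush whatever is left over after the last full chunk.
  Marshal.dump(chunk, io) if chunk_sz > 0
end

# Reads Marshal records back one at a time until the stream is exhausted.
def load_chunks(io)
  chunks = []
  chunks << Marshal.load(io) until io.eof?
  chunks
end

key_values = {
  "b" => [["b", 1, 2], ["b", 3]],
  "a" => [["a", 9]],
}
io = StringIO.new(String.new) # binary, writable in-memory buffer
dump_in_chunks(key_values, io, 3)
io.rewind
chunks = load_chunks(io)
```

Because the size check runs before each push, a chunk can exceed `chunk_size` by up to one entry's worth of elements. With the sample data and a limit of 3, the first record holds the entries for "a" and the first entry for "b" (5 elements in total), and the second record holds the remaining "b" entry.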