Class: Fairy::PGroupBy::PQMergeSortBuffer2::StSt
- Inherits:
-
MergeSortBuffer::StSt
- Object
- MergeSortBuffer::StSt
- Fairy::PGroupBy::PQMergeSortBuffer2::StSt
- Defined in:
- lib/fairy/node/p-group-by.rb
Instance Method Summary
- #each(&block) ⇒ Object
-
#initialize(buffers) ⇒ StSt
constructor
A new instance of StSt.
Methods inherited from MergeSortBuffer::StSt
Constructor Details
#initialize(buffers) ⇒ StSt
Returns a new instance of StSt.
805 806 807 808 809 810 811 812 813 814 815 816 817 |
# File 'lib/fairy/node/p-group-by.rb', line 805
#
# Builds the priority queue that drives the merge.
#
# Every buffer is opened and its first record is read with +read_line+
# (defined outside this block). A buffer that yields a record is queued as
# <tt>[record, buffer]</tt>, prioritized by the record's first element.
# A buffer that yields no record is closed immediately instead of being
# left open.
#
# @param buffers [#each] buffers responding to +open+, +io+ and +close!+
def initialize(buffers)
  # Required at construction time, as in the original (presumably deliberate
  # lazy loading of the priority_queue dependency).
  require "priority_queue"

  @buffers = PriorityQueue.new
  buffers.each do |buf|
    buf.open
    kv = read_line(buf.io)
    unless kv
      # Nothing to merge from this buffer: release it now. It is never
      # queued, so no later code path would get a chance to close it.
      buf.close!
      next
    end
    @buffers.push [kv, buf], kv.first
  end
  @fiber = nil
end
Instance Method Details
#each(&block) ⇒ Object
819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 |
# File 'lib/fairy/node/p-group-by.rb', line 819
#
# Merges the queued buffers in key order and yields one KeyValueStream per
# run of equal keys.
#
# The caller's block runs inside a Fiber. This method feeds the current
# stream with +concat+ and resumes the fiber after every record; when the
# key changes (and once more at the very end) it signals end-of-stream with
# +push_eos+ and resumes again before moving on.
#
# When the queue is empty (no buffer produced a record) the method returns
# without yielding.
#
# @yield [KeyValueStream] the stream for the current key
def each(&block)
  head = @buffers.min_key
  # Empty queue: there is no first key to start a group with.
  return unless head

  key = head.first.first
  values = KeyValueStream.new(key, self)
  @fiber = Fiber.new { yield values }

  while (buf_min = @buffers.min_key)
    kv, buf = buf_min

    unless key == kv[0]
      # Key changed: finish the current group, then start the next one.
      values.push_eos
      @fiber.resume
      key = kv[0]
      values = KeyValueStream.new(key, self)
      @fiber = Fiber.new { yield values }
    end
    values.concat kv[1]
    @fiber.resume

    # Advance the buffer that supplied the record just consumed.
    unless (line = read_line(buf.io))
      buf.close!
      @buffers.delete_min
      next
    end

    # NOTE(review): relies on min_key returning the very array that was
    # pushed, so the in-place update is visible to the queue — confirm
    # against the priority_queue gem.
    buf_min[0] = line
    @buffers.change_priority buf_min, line[0]
  end

  values.push_eos
  @fiber.resume
end