Class: Fairy::PGroupBy::PQMergeSortBuffer2::StSt

Inherits:
MergeSortBuffer::StSt show all
Defined in:
lib/fairy/node/p-group-by.rb

Instance Method Summary collapse

Methods inherited from MergeSortBuffer::StSt

#read_line

Constructor Details

#initialize(buffers) ⇒ StSt

Returns a new instance of StSt.



805
806
807
808
809
810
811
812
813
814
815
816
817
# File 'lib/fairy/node/p-group-by.rb', line 805

# Builds the priority queue that drives the merge.
#
# Every buffer is opened and its first record is read with +read_line+.
# Each buffer that produced a record is queued as a +[record, buffer]+ pair
# whose priority is the record's first element.
#
# A buffer that yields no first record is closed right away with +close!+
# and left out of the queue; previously such a buffer stayed open because
# nothing else ever gets a chance to close it.
#
# @param buffers [Enumerable] objects responding to +open+, +io+ and +close!+
def initialize(buffers)
  # Loaded lazily so the dependency is only needed when this class is used.
  require "priority_queue"

  @buffers = PriorityQueue.new
  buffers.each{|buf|
    buf.open
    kv = read_line(buf.io)
    unless kv
      # Nothing to merge from this buffer: release it instead of leaking it.
      buf.close!
      next
    end
    @buffers.push [kv, buf], kv.first
  }

  @fiber = nil
end

Instance Method Details

#each(&block) ⇒ Object



819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
# File 'lib/fairy/node/p-group-by.rb', line 819

# Groups the queued records by key and hands each group to the caller's block.
#
# Repeatedly takes the entry returned by +@buffers.min_key+. As long as the
# record's key equals the current key, its value is appended to the current
# KeyValueStream; when the key changes, the current stream is terminated with
# +push_eos+ and a new stream (and a new Fiber running the block) is started.
# The fiber is resumed after every appended value and after every +push_eos+.
#
# After a record is consumed, the next record is read from the same buffer.
# An exhausted buffer is closed with +close!+ and removed from the queue;
# otherwise the queue entry is updated in place and re-prioritized.
#
# When the queue holds no entries at all, the method returns +nil+ without
# calling the block (previously this raised NoMethodError on +nil+).
#
# @yieldparam values [KeyValueStream] stream of the values for one key
# @return [Object, nil] result of the final fiber resume, or +nil+ when empty
def each(&block)
  # Guard against an empty queue: min_key has nothing to return in that case.
  first_entry = @buffers.min_key
  return unless first_entry

  key = first_entry.first.first
  values = KeyValueStream.new(key, self)
  @fiber = Fiber.new{yield values}
  while buf_min = @buffers.min_key
    kv, buf = buf_min
    unless key == kv[0]
      # Key changed: finish the current group before starting the next one.
      values.push_eos
      @fiber.resume
      key = kv[0]
      values = KeyValueStream.new(key, self)
      @fiber = Fiber.new{yield values}
    end
    values.concat kv[1]
    @fiber.resume

    unless line = read_line(buf.io)
      # This buffer is exhausted: release it and drop its queue entry.
      buf.close!
      @buffers.delete_min
      next
    end
    # Reuse the queue entry for the buffer's next record and re-prioritize it.
    buf_min[0] = line
    @buffers.change_priority buf_min, line[0]
  end
  # Terminate the last group.
  values.push_eos
  @fiber.resume
end