Class: Fairy::SortedQueue1

Inherits:
OnMemorySortedQueue show all
Defined in:
lib/fairy/share/port.rb

Instance Method Summary collapse

Methods inherited from OnMemorySortedQueue

#push

Constructor Details

#initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond) ⇒ SortedQueue1

Returns a new instance of SortedQueue1.



1611
1612
1613
1614
1615
1616
1617
1618
# File 'lib/fairy/share/port.rb', line 1611

def initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond)
  super

  @threshold = policy[:threshold]
  @threshold ||= CONF.SORTEDQUEUE_THRESHOLD

  @buffers = nil
end

Instance Method Details

#init_2ndmemoryObject



1656
1657
1658
1659
1660
1661
1662
1663
# File 'lib/fairy/share/port.rb', line 1656

def init_2ndmemory
  require "tempfile"

  @buffer_dir = @policy[:buffer_dir]
  @buffer_dir ||= CONF.TMP_DIR

  @buffers = []
end

#open_2ndmemory(&block) ⇒ Object



1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
# File 'lib/fairy/share/port.rb', line 1665

def open_2ndmemory(&block)
  unless @buffers
	init_2ndmemory
  end
  io = Tempfile.open("port-buffer-", @buffer_dir)
  @buffers.push io
  begin
	# ruby BUG#2390の対応のため.
#	yield io
	yield io.instance_eval{@tmpfile}
  ensure
	io.close
  end
  @buffers
end

#popObject



1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
# File 'lib/fairy/share/port.rb', line 1631

def pop
  @queue_mon.synchronize do
	@queue_cv.wait_while{@pop_queue.nil? && @buffers.nil?}

	if @pop_queue.nil? && @buffers
	  @pop_queue = restore_2ndmemory
	end

	@pop_queue.shift
  end
end

#pop_allObject



1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
# File 'lib/fairy/share/port.rb', line 1643

def pop_all
  @queue_mon.synchronize do
	@queue_cv.wait_while{@pop_queue.nil? && @buffers.nil?}

	if @pop_queue.nil? || @pop_queue.empty?
	  @pop_queue = restore_2ndmemory
	end
	pops = @pop_queue
	@pop_queue = nil
	pops
  end
end

#push_on_eosObject



1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
# File 'lib/fairy/share/port.rb', line 1620

def push_on_eos
  if @push_queue.size <= @threshold
	super
  else
	store_2ndmemory(@push_queue)
	@push_queue.clear
	@push_queue = nil
	@queue_cv.broadcast
  end
end

#restore_2ndmemoryObject



1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
# File 'lib/fairy/share/port.rb', line 1694

def restore_2ndmemory
  io = @buffers.shift
  io.open
  buf = Marshal.load(io)
  if @buffers.empty?
	buf.push :END_OF_STREAM 
  end
  io.close!
  buf
end

#store_2ndmemory(ary) ⇒ Object



1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
# File 'lib/fairy/share/port.rb', line 1681

def store_2ndmemory(ary)
  Log::debug(self, "start store: ")
  ary = ary.sort_by{|e| @sort_by.call(e)}
  
  while !ary.empty?
	open_2ndmemory do |io|
	  buf = ary.shift(@pool_threshold)  
	  Marshal.dump(buf, io)
	end
  end
  Log::debug(self, "end store")
end