Class: Fairy::SortedQueue1
- Inherits:
-
OnMemorySortedQueue
- Object
- OnMemorySortedQueue
- Fairy::SortedQueue1
- Defined in:
- lib/fairy/share/port.rb
Instance Method Summary collapse
- #init_2ndmemory ⇒ Object
-
#initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond) ⇒ SortedQueue1
constructor
A new instance of SortedQueue1.
- #open_2ndmemory(&block) ⇒ Object
- #pop ⇒ Object
- #pop_all ⇒ Object
- #push_on_eos ⇒ Object
- #restore_2ndmemory ⇒ Object
- #store_2ndmemory(ary) ⇒ Object
Methods inherited from OnMemorySortedQueue
Constructor Details
#initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond) ⇒ SortedQueue1
Returns a new instance of SortedQueue1.
1611 1612 1613 1614 1615 1616 1617 1618 |
# File 'lib/fairy/share/port.rb', line 1611
# Creates the queue; the shared setup is delegated to the superclass, which
# receives the same three arguments.
#
# @param policy [#[]] options; only +:threshold+ is read in this method
# @param queue_mon monitor handed on to the superclass
# @param queue_cv condition variable handed on to the superclass
def initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond)
  super
  # Use the configured default when the policy carries no threshold.
  @threshold = policy[:threshold] || CONF.SORTEDQUEUE_THRESHOLD
  # Remains nil until init_2ndmemory assigns an array.
  @buffers = nil
end
Instance Method Details
#init_2ndmemory ⇒ Object
1656 1657 1658 1659 1660 1661 1662 1663 |
# File 'lib/fairy/share/port.rb', line 1656
# Sets up the state needed before temporary files can be created:
# loads the tempfile library, picks the directory and starts an empty
# list of buffers.
#
# @return [Array] the new, empty buffer list
def init_2ndmemory
  require "tempfile"

  # The policy's :buffer_dir wins; otherwise use the configured TMP_DIR.
  @buffer_dir = @policy[:buffer_dir] || CONF.TMP_DIR
  @buffers = []
end
#open_2ndmemory(&block) ⇒ Object
1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 |
# File 'lib/fairy/share/port.rb', line 1665
# Creates a new temporary file, appends it to @buffers, and yields it for
# writing. The temporary file is closed once the block finishes, even
# when the block raises.
#
# @yield the value of the Tempfile's @tmpfile instance variable
# @return [Array] @buffers, including the newly added Tempfile
def open_2ndmemory(&block)
  init_2ndmemory unless @buffers

  tempfile = Tempfile.open("port-buffer-", @buffer_dir)
  @buffers << tempfile
  begin
    # Workaround for ruby BUG#2390: yield the Tempfile's @tmpfile instead
    # of the Tempfile itself.
    # yield tempfile
    yield tempfile.instance_eval { @tmpfile }
  ensure
    tempfile.close
  end
  @buffers
end
#pop ⇒ Object
1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 |
# File 'lib/fairy/share/port.rb', line 1631
# Removes and returns the next element of the queue.
#
# Blocks on the condition variable while neither an in-memory pop queue
# nor a list of spilled buffers is available. Whenever the in-memory pop
# queue is missing or has been drained and spilled buffers remain, the
# next buffer is loaded with restore_2ndmemory before shifting. The
# refill check mirrors the one in pop_all; testing only for nil would
# stop after the first restored chunk and silently drop the rest.
#
# @return [Object, nil] the next element; nil once the pop queue is empty
#   and no spilled buffers are left
def pop
  @queue_mon.synchronize do
    @queue_cv.wait_while { @pop_queue.nil? && @buffers.nil? }

    drained = @pop_queue.nil? || @pop_queue.empty?
    # Only refill when a buffer is actually left to restore.
    if drained && @buffers && !@buffers.empty?
      @pop_queue = restore_2ndmemory
    end
    @pop_queue.shift
  end
end
#pop_all ⇒ Object
1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 |
# File 'lib/fairy/share/port.rb', line 1643
# Removes and returns everything currently held in the pop queue.
#
# Blocks on the condition variable while neither an in-memory pop queue
# nor a list of spilled buffers is available. If the pop queue is missing
# or empty, the next buffer is loaded with restore_2ndmemory first. The
# pop queue is reset to nil before returning.
#
# @return [Array] the elements that were in the pop queue
def pop_all
  @queue_mon.synchronize do
    @queue_cv.wait_while { @pop_queue.nil? && @buffers.nil? }

    @pop_queue = restore_2ndmemory if @pop_queue.nil? || @pop_queue.empty?

    # Hand the whole array to the caller and forget it locally.
    taken, @pop_queue = @pop_queue, nil
    taken
  end
end
#push_on_eos ⇒ Object
1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 |
# File 'lib/fairy/share/port.rb', line 1620
# Handles the end of the pushed stream.
#
# When the push queue holds at most @threshold elements the superclass
# implementation is used. Otherwise the push queue is written out with
# store_2ndmemory, emptied and dropped, and waiters on the condition
# variable are woken.
def push_on_eos
  return super if @push_queue.size <= @threshold

  pending = @push_queue
  store_2ndmemory(pending)
  pending.clear
  @push_queue = nil
  @queue_cv.broadcast
end
#restore_2ndmemory ⇒ Object
1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 |
# File 'lib/fairy/share/port.rb', line 1694
# Loads the oldest spilled buffer back into memory.
#
# Takes the first Tempfile off @buffers, reopens it and unmarshals its
# contents. When that was the last buffer, :END_OF_STREAM is appended to
# the result. The temporary file is closed and unlinked in every case,
# including when reopening or unmarshalling raises, so a failed load does
# not leave the file behind.
#
# @return [Array] the restored elements
# @raise whatever Tempfile#open or Marshal.load raises
def restore_2ndmemory
  io = @buffers.shift
  begin
    io.open
    buf = Marshal.load(io)
    buf.push :END_OF_STREAM if @buffers.empty?
    buf
  ensure
    # close! both closes and unlinks the temporary file.
    io.close!
  end
end
#store_2ndmemory(ary) ⇒ Object
1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 |
# File 'lib/fairy/share/port.rb', line 1681
# Sorts the given elements and writes them out in chunks.
#
# The elements are ordered by the key computed by @sort_by, then split
# into runs of at most @pool_threshold elements; each run is marshalled
# into its own file obtained from open_2ndmemory. The argument itself is
# left untouched because sorting produces a new array.
#
# @param ary [Array] elements to store
# @raise [ArgumentError] if @pool_threshold is not positive (raised by
#   each_slice; a shift-based loop would never terminate for 0)
def store_2ndmemory(ary)
  Log::debug(self, "start store: ")

  sorted = ary.sort_by { |e| @sort_by.call(e) }
  sorted.each_slice(@pool_threshold) do |chunk|
    open_2ndmemory { |io| Marshal.dump(chunk, io) }
  end

  Log::debug(self, "end store")
end