Class: Fairy::OnMemorySortedQueue
- Inherits: Object
- Inheritance chain: Object → Fairy::OnMemorySortedQueue
- Defined in:
- lib/fairy/share/port.rb
Direct Known Subclasses
Instance Method Summary
-
#initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond) ⇒ OnMemorySortedQueue
constructor
A new instance of OnMemorySortedQueue.
- #pop ⇒ Object
- #pop_all ⇒ Object
- #push(e) ⇒ Object
- #push_on_eos ⇒ Object
Constructor Details
#initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond) ⇒ OnMemorySortedQueue
Returns a new instance of OnMemorySortedQueue.
1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 |
# File 'lib/fairy/share/port.rb', line 1553
#
# Builds an empty in-memory sorted queue.
#
# policy    - object read with [] for :pool_threshold and :sort_by; a missing
#             (nil/false) value falls back to CONF.SORTEDQUEUE_POOL_THRESHOLD
#             or CONF.SORTEDQUEUE_SORTBY respectively.
# queue_mon - monitor the queue methods synchronize on
#             (default: a fresh XThread::Monitor).
# queue_cv  - condition variable consumers wait on
#             (default: queue_mon.new_cond).
def initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond)
  @policy = policy

  @pool_threshold = policy[:pool_threshold]
  @pool_threshold ||= CONF.SORTEDQUEUE_POOL_THRESHOLD

  @push_queue = []
  # Stays nil until push_on_eos publishes the sorted elements; pop and
  # pop_all wait while it is nil.
  @pop_queue = nil

  # pop, pop_all and push synchronize on @queue_mon, which was previously
  # never assigned (only @queue_mutex was), so those calls hit nil.
  @queue_mon = queue_mon
  # Kept under the old name as well so code that reads @queue_mutex still works.
  @queue_mutex = queue_mon
  @queue_cv = queue_cv

  @sort_by = policy[:sort_by]
  @sort_by ||= CONF.SORTEDQUEUE_SORTBY
  if @sort_by.kind_of?(String)
    # NOTE(review): the string is eval'ed as the body of a proc, e.g.
    # "|e| e.key". Confirm that policy/CONF values can never carry
    # untrusted input, since this executes arbitrary Ruby.
    @sort_by = eval("proc{#{@sort_by}}")
  end
end
Instance Method Details
#pop ⇒ Object
1595 1596 1597 1598 1599 1600 |
# File 'lib/fairy/share/port.rb', line 1595
#
# Removes and returns the first element of the sorted (pop) queue.
# Blocks on the condition variable for as long as @pop_queue is nil,
# i.e. until sorted data has been published. Returns whatever Array#shift
# yields, which is nil once the queue has been drained.
def pop
  @queue_mon.synchronize {
    @queue_cv.wait_while { @pop_queue.nil? }
    head = @pop_queue.shift
    head
  }
end
#pop_all ⇒ Object
1602 1603 1604 1605 1606 1607 |
# File 'lib/fairy/share/port.rb', line 1602
#
# Removes and returns up to @pool_threshold elements from the front of the
# sorted (pop) queue as an array. Blocks on the condition variable for as
# long as @pop_queue is nil.
def pop_all
  @queue_mon.synchronize {
    @queue_cv.wait_while { @pop_queue.nil? }
    batch = @pop_queue.shift(@pool_threshold)
    batch
  }
end
#push(e) ⇒ Object
1573 1574 1575 1576 1577 1578 1579 1580 1581 |
# File 'lib/fairy/share/port.rb', line 1573
#
# Appends e to the unsorted push queue while holding the queue monitor.
# When e is the :END_OF_STREAM marker, the marker is taken back off the
# push queue and push_on_eos is invoked instead.
#
# Returns the result of push_on_eos for the marker, nil otherwise.
def push(e)
  @queue_mon.synchronize {
    @push_queue.push(e)
    next unless e == :END_OF_STREAM

    # The marker must not take part in sorting, so remove it again.
    @push_queue.pop
    push_on_eos
  }
end
#push_on_eos ⇒ Object
1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 |
# File 'lib/fairy/share/port.rb', line 1583
#
# Publishes the buffered elements: sorts @push_queue with the @sort_by
# proc, appends the :END_OF_STREAM marker, stores the result in @pop_queue,
# then empties and drops @push_queue. Any StandardError raised on the way
# is handed to Log::debug_exception. The condition variable is broadcast
# in either case.
#
# NOTE(review): if sorting raises, @pop_queue is left nil, so consumers
# woken by the broadcast appear to go straight back to waiting — confirm
# this is intended.
def push_on_eos
  begin
    @pop_queue = @push_queue.sort_by { |item| @sort_by.call(item) }.push(:END_OF_STREAM)
    @push_queue.clear
    @push_queue = nil
  rescue
    Log.debug_exception
  end
  @queue_cv.broadcast
end