Class: Fairy::OnMemorySortedQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/fairy/share/port.rb

Direct Known Subclasses

SortedQueue1

Instance Method Summary collapse

Constructor Details

#initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond) ⇒ OnMemorySortedQueue

Returns a new instance of OnMemorySortedQueue.



1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
# File 'lib/fairy/share/port.rb', line 1553

# Sets up an in-memory queue that buffers pushed elements and exposes them
# in sorted order once :END_OF_STREAM has been pushed.
#
# @param policy [Hash] options; :pool_threshold and :sort_by are read here,
#   each falling back to the matching CONF setting when absent.
# @param queue_mon [#synchronize] monitor guarding both internal queues.
# @param queue_cv [#wait_while, #broadcast] condition variable that
#   consumers wait on until the sorted data is available.
def initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond)
  @policy = policy

  @pool_threshold = policy[:pool_threshold] || CONF.SORTEDQUEUE_POOL_THRESHOLD

  # Elements accumulate in @push_queue; @pop_queue stays nil until the
  # stream ends and the sorted copy is built.
  @push_queue = []
  @pop_queue = nil

  # pop, pop_all and push synchronize on @queue_mon, so it must be assigned
  # here. @queue_mutex is kept as an alias for any code that still reads
  # the old name.
  @queue_mon = queue_mon
  @queue_mutex = queue_mon
  @queue_cv = queue_cv

  @sort_by = policy[:sort_by] || CONF.SORTEDQUEUE_SORTBY

  # NOTE(review): a String sort key is eval'ed as the body of a proc, so it
  # must only ever come from trusted configuration.
  @sort_by = eval("proc{#{@sort_by}}") if @sort_by.kind_of?(String)
end

Instance Method Details

#pop ⇒ Object



1595
1596
1597
1598
1599
1600
# File 'lib/fairy/share/port.rb', line 1595

# Removes and returns the element at the head of the sorted queue.
#
# Holds @queue_mon for the duration and waits on @queue_cv for as long as
# @pop_queue is still nil (it is assigned by push_on_eos).
#
# @return [Object, nil] the shifted element; Array#shift yields nil once
#   the sorted queue is empty.
def pop
  @queue_mon.synchronize do
    @queue_cv.wait_while { @pop_queue.nil? }
    @pop_queue.shift
  end
end

#pop_all ⇒ Object



1602
1603
1604
1605
1606
1607
# File 'lib/fairy/share/port.rb', line 1602

# Removes and returns a batch from the head of the sorted queue.
#
# Despite the name, at most @pool_threshold elements are taken per call.
# Holds @queue_mon for the duration and waits on @queue_cv for as long as
# @pop_queue is still nil.
#
# @return [Array] the shifted elements, in queue order.
def pop_all
  @queue_mon.synchronize do
    @queue_cv.wait_while { @pop_queue.nil? }
    @pop_queue.shift(@pool_threshold)
  end
end

#push(e) ⇒ Object



1573
1574
1575
1576
1577
1578
1579
1580
1581
# File 'lib/fairy/share/port.rb', line 1573

# Appends an element to the pending (unsorted) buffer under @queue_mon.
#
# When the element is :END_OF_STREAM the marker is taken back off the
# buffer and push_on_eos is invoked instead.
#
# @param e [Object] the element to enqueue, or :END_OF_STREAM.
# @return [Object, nil] nil for an ordinary element; the result of
#   push_on_eos for the end-of-stream marker.
def push(e)
  @queue_mon.synchronize do
    @push_queue.push(e)
    next unless e == :END_OF_STREAM

    # The marker itself must not take part in the sort.
    @push_queue.pop
    push_on_eos
  end
end

#push_on_eos ⇒ Object



1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
# File 'lib/fairy/share/port.rb', line 1583

# Builds the sorted queue from everything pushed so far, appends the
# :END_OF_STREAM marker, drops the pending buffer and wakes all waiters
# on @queue_cv.
#
# A StandardError raised while building the queue is passed to
# Log::debug_exception and not re-raised; the broadcast still happens.
# NOTE(review): if the sort itself raises, @pop_queue is left nil, so
# consumers waiting for it will keep waiting. Confirm this is intended.
#
# @return [Object] whatever @queue_cv.broadcast returns.
def push_on_eos
  begin
    @pop_queue = @push_queue.sort_by { |item| @sort_by.call(item) }
    @pop_queue << :END_OF_STREAM
    @push_queue.clear
    @push_queue = nil
  rescue
    Log::debug_exception
  end

  @queue_cv.broadcast
end