Class: Fairy::SortedQueue
- Inherits:
-
Object
- Object
- Fairy::SortedQueue
- Defined in:
- lib/fairy/share/port.rb
Instance Method Summary collapse
- #init_2ndmemory ⇒ Object
-
#initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond) ⇒ SortedQueue
constructor
A new instance of SortedQueue.
- #open_2ndmemory(&block) ⇒ Object
- #pop ⇒ Object
- #pop_2ndmemory ⇒ Object
- #pop_all ⇒ Object
- #push(e) ⇒ Object
- #store_2ndmemory(ary) ⇒ Object
Constructor Details
#initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond) ⇒ SortedQueue
Returns a new instance of SortedQueue.
1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 |
# File 'lib/fairy/share/port.rb', line 1392 def initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond) @policy = policy @pool_threshold = policy[:pool_threshold] @pool_threshold ||= CONF.SORTEDQUEUE_POOL_THRESHOLD @threshold = policy[:threshold] @threshold ||= CONF.SORTEDQUEUE_THRESHOLD @push_queue = [] @pop_queue = nil @buffers = nil @queue_mon = queue_mon @queue_cv = queue_cv @sort_by = policy[:sort_by] @sort_by ||= CONF.SORTEDQUEUE_SORTBY if @sort_by.kind_of?(String) @sort_by = eval("proc{#{@sort_by}}") end end |
Instance Method Details
#init_2ndmemory ⇒ Object
1486 1487 1488 1489 1490 1491 1492 1493 1494 |
# File 'lib/fairy/share/port.rb', line 1486 def init_2ndmemory require "tempfile" @buffer_dir = @policy[:buffer_dir] @buffer_dir ||= CONF.TMP_DIR @buffers = [] @merge_io = nil end |
#open_2ndmemory(&block) ⇒ Object
1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 |
# File 'lib/fairy/share/port.rb', line 1496 def open_2ndmemory(&block) unless @buffers init_2ndmemory end buffer = Tempfile.open("port-buffer-", @buffer_dir) begin # ruby BUG#2390の対応のため. # yield buffer yield buffer.instance_eval{@tmpfile} ensure buffer.close end @buffers.push buffer buffer end |
#pop ⇒ Object
1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 |
# File 'lib/fairy/share/port.rb', line 1442 def pop @queue_mon.synchronize do @queue_cv.wait_while{@pop_queue.nil?} if @buffers.nil? #Log::debug(self, @pop_queue.inspect) return @pop_queue.shift else pop_2ndmemory end end end |
#pop_2ndmemory ⇒ Object
1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 |
# File 'lib/fairy/share/port.rb', line 1524 def pop_2ndmemory unless @merge_io @buffers.each{|tf| tf.open} @merge_io = @buffers.map{|io| e = nil begin e = Marshal.load(io) rescue EOFError io.close! end [io, e]}.select{|io, v| !v.nil?}.sort_by{|io, v| @sort_by.call(v)} end unless io_min = @merge_io.shift return :END_OF_STREAM end io, min = io_min begin e = Marshal.load(io) @merge_io.push [io, e] @merge_io = @merge_io.sort_by{|io, e| @sort_by.call(e)} rescue EOFError io.close! end min end |
#pop_all ⇒ Object
1455 1456 1457 1458 1459 1460 1461 1462 |
# File 'lib/fairy/share/port.rb', line 1455 def pop_all buf = [] while e = pop buf.push e return buf if buf.size > @pool_threshold end buf end |
#push(e) ⇒ Object
1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 |
# File 'lib/fairy/share/port.rb', line 1416 def push(e) @queue_mon.synchronize do @push_queue.push e if e == :END_OF_STREAM @push_queue.pop if @buffers store_2ndmemory(@push_queue) @push_queue = [] @pop_queue = [] else begin @pop_queue = @push_queue.sort_by{|e| @sort_by.call(e)} @pop_queue.push :END_OF_STREAM rescue Log::debug_exception end end @queue_cv.broadcast end if @push_queue.size >= @threshold store_2ndmemory(@push_queue) @push_queue = [] end end end |
#store_2ndmemory(ary) ⇒ Object
1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 |
# File 'lib/fairy/share/port.rb', line 1512 def store_2ndmemory(ary) Log::debug(self, "start store: ") open_2ndmemory do |io| ary = ary.sort_by{|e| @sort_by.call(e)} while !ary.empty? e = ary.shift Marshal.dump(e, io) end end Log::debug(self, "end store") end |