Class: Fairy::ChunkedPoolQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/fairy/share/port.rb

Direct Known Subclasses

ChunkedSizedPoolQueue

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(policy, queues_mon = XThread::Monitor.new, queues_cv = queues_mon.new_cond) ⇒ ChunkedPoolQueue

Designed for multiple pushing threads and a single popping thread.



1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
# File 'lib/fairy/share/port.rb', line 1020

# Sets up an empty chunked queue.
#
# Elements are first collected in a push-side staging buffer (@push_queue,
# guarded by @push_queue_mutex). Completed chunks are handed over through
# @queues, which is guarded by queues_mon and signalled via queues_cv.
#
# @param policy stored in @policy; not otherwise used by this method
# @param queues_mon monitor guarding the list of completed chunks
#   (defaults to a fresh XThread::Monitor)
# @param queues_cv condition variable used to wake the popping side
#   (defaults to a new condition created from queues_mon)
def initialize(policy, queues_mon = XThread::Monitor.new, queues_cv = queues_mon.new_cond)
  @policy = policy

  # Limits read from the global configuration object.
  # NOTE(review): @queue_max is not read by any method visible here;
  # presumably a subclass uses it -- confirm.
  @queue_threshold = CONF.POOLQUEUE_POOL_THRESHOLD
  @queue_max = CONF.POSTQUEUE_MAX_TRANSFER_SIZE

  # Push side: staging buffer for the chunk currently being filled.
  @push_queue_mutex = Mutex.new
  @push_queue = []

  # Hand-over area: completed chunks waiting to be popped.
  @queues_mon = queues_mon
  @queues_cv = queues_cv
  @queues = []

  # Pop side: the chunk currently being drained. No dedicated mutex is
  # created for it (the original had one commented out).
  @pop_queue = nil
end

Instance Attribute Details

#fib_cv ⇒ Object

Returns the value of attribute fib_cv.



1037
1038
1039
# File 'lib/fairy/share/port.rb', line 1037

# Reader for the @fib_cv instance variable.
#
# NOTE(review): the constructor visible alongside this method does not assign
# @fib_cv, so this returns nil unless it is set elsewhere -- confirm where
# (and whether) it is written.
def fib_cv
  @fib_cv
end

Instance Method Details

#pop ⇒ Object



1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
# File 'lib/fairy/share/port.rb', line 1068

# Removes and returns the next element.
#
# If there is no current chunk, or it is empty, the next chunk is taken
# from @queues; the caller blocks on @queues_cv (under @queues_mon) until one
# is available. Empty chunks are skipped. Once the current chunk has been
# fully drained, @pop_queue is reset to nil.
#
# The pop-side state (@pop_queue) is read and modified here without a lock.
#
# @return the element at the front of the current chunk
def pop
  until @pop_queue && !@pop_queue.empty?
    @queues_mon.synchronize do
      # The assignment doubles as the wait condition: a successful shift
      # yields a (truthy) chunk, an empty @queues yields nil.
      @queues_cv.wait_until { @pop_queue = @queues.shift }
    end
  end

  head = @pop_queue.shift
  @pop_queue = nil if @pop_queue.empty?
  head
end

#pop_all ⇒ Object



1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
# File 'lib/fairy/share/port.rb', line 1080

# Removes and returns a whole chunk at once.
#
# If a chunk is already being drained, whatever is left of it is returned
# as-is. Otherwise the next chunk is taken from @queues; the caller blocks on
# @queues_cv (under @queues_mon) until one is available. @pop_queue is nil
# afterwards.
#
# @return [Array] the chunk object itself (not a copy)
def pop_all
  until @pop_queue
    @queues_mon.synchronize do
      # The assignment doubles as the wait condition (nil while @queues is empty).
      @queues_cv.wait_until { @pop_queue = @queues.shift }
    end
  end

  chunk = @pop_queue
  @pop_queue = nil
  chunk
end

#push(e) ⇒ Object



1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
# File 'lib/fairy/share/port.rb', line 1039

# Appends one element to the staging buffer, flushing it as a chunk when due.
#
# The buffer is flushed to @queues (and waiters on @queues_cv are woken) when
# it has reached @queue_threshold elements, or when the pushed element is one
# of the markers :END_OF_STREAM or Import::SET_NO_IMPORT. After a flush a
# fresh, empty staging buffer is started.
#
# The whole operation runs under @push_queue_mutex; the hand-over itself
# additionally takes @queues_mon.
#
# @param e the element to enqueue
def push(e)
  @push_queue_mutex.synchronize do
    @push_queue << e

    chunk_complete = @push_queue.size >= @queue_threshold ||
                     e == :END_OF_STREAM ||
                     e == Import::SET_NO_IMPORT
    next unless chunk_complete

    @queues_mon.synchronize do
      @queues << @push_queue
      @push_queue = []
      @queues_cv.broadcast
    end
  end
end

#push_all(buf) ⇒ Object



1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
# File 'lib/fairy/share/port.rb', line 1054

# Appends a batch of elements to the staging buffer, flushing it as a chunk
# when due.
#
# The buffer is flushed to @queues (and waiters on @queues_cv are woken) when
# it has reached @queue_threshold elements, or when its last element is one
# of the markers :END_OF_STREAM or Import::SET_NO_IMPORT. These are the same
# flush conditions push applies to a single element; previously this method
# used a strict `>` comparison and ignored Import::SET_NO_IMPORT, so a batch
# ending in that marker could sit in the staging buffer unseen by the
# popping side.
#
# The whole operation runs under @push_queue_mutex; the hand-over itself
# additionally takes @queues_mon.
#
# @param buf [Array] elements to enqueue, in order
def push_all(buf)
  @push_queue_mutex.synchronize do
    @push_queue.concat buf

    # Only the final element is inspected for a marker, as before.
    tail = @push_queue.last
    if @push_queue.size >= @queue_threshold ||
       tail == :END_OF_STREAM ||
       tail == Import::SET_NO_IMPORT
      @queues_mon.synchronize do
        @queues.push @push_queue
        @push_queue = []
        @queues_cv.broadcast
      end
    end
  end
end