Class: Fairy::FileBufferdQueue
- Inherits: Object
- Inheritance chain: Object → Fairy::FileBufferdQueue
- Defined in:
- lib/fairy/share/port.rb
Instance Method Summary
- #init_2ndmemory ⇒ Object
- #initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond) ⇒ FileBufferdQueue (constructor): returns a new instance of FileBufferdQueue.
- #open_2ndmemory(&block) ⇒ Object
- #pop ⇒ Object
- #pop_all ⇒ Object
- #push(e) ⇒ Object
- #restore_2ndmemory ⇒ Object
- #store_2ndmemory(ary) ⇒ Object
Constructor Details
#initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond) ⇒ FileBufferdQueue
Returns a new instance of FileBufferdQueue.
1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 |
# File 'lib/fairy/share/port.rb', line 1141
#
# Builds an empty queue whose push side and pop side start out as the very
# same array.
#
# policy    - indexed with :threshold here and kept in @policy for later use.
# queue_mon - stored as @queue_mon; defaults to a new XThread::Monitor.
# queue_cv  - stored as @queue_cv; defaults to queue_mon.new_cond.
def initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond)
  @policy = policy
  @queue_mon = queue_mon
  @queue_cv = queue_cv

  # A threshold supplied by the policy wins; otherwise fall back to CONF.
  @threshold = policy[:threshold] || CONF.FILEBUFFEREDQUEUE_THRESHOLD

  # One shared array serves both ends until other methods split them apart.
  @pop_queue = @push_queue = []

  # nil means no second-memory buffer list has been set up yet.
  @buffers_queue = nil
end
Instance Method Details
#init_2ndmemory ⇒ Object
1211 1212 1213 1214 1215 1216 |
# File 'lib/fairy/share/port.rb', line 1211
#
# Prepares the state used for spilling to files: picks the buffer directory
# and creates the (initially empty) list of buffers.
#
# Returns the new, empty @buffers_queue array.
def init_2ndmemory
  # Prefer the directory named in the policy; otherwise use CONF.TMP_DIR.
  @buffer_dir = @policy[:buffer_dir] || CONF.TMP_DIR
  @buffers_queue = []
end
#open_2ndmemory(&block) ⇒ Object
1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 |
# File 'lib/fairy/share/port.rb', line 1218
#
# Opens a fresh temp-file buffer, yields its IO to the caller's block so the
# block can fill it, then closes the buffer and appends it to @buffers_queue.
# The buffer list is initialised on first use via init_2ndmemory.
#
# If the block does not complete normally (it raises, or exits non-locally),
# the buffer is never queued, so nothing would ever call close! on it later.
# In that case it is released with close! right here instead of close, so the
# half-written file is not left behind. The original exception still
# propagates.
#
# Yields the buffer's io object.
# Returns the buffer that was queued.
def open_2ndmemory(&block)
  init_2ndmemory unless @buffers_queue

  buffer = FastTempfile.open("port-buffer-", @buffer_dir)
  filled = false
  begin
    yield buffer.io
    filled = true
  ensure
    if filled
      buffer.close
    else
      # Not going to be queued: discard it now, mirroring what
      # restore_2ndmemory does once a buffer has been consumed.
      buffer.close!
    end
  end

  @buffers_queue.push buffer
  buffer
end
#pop ⇒ Object
1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 |
# File 'lib/fairy/share/port.rb', line 1171
#
# Removes and returns the element at the front of the queue.
#
# Runs inside @queue_mon.synchronize. While the pop-side array is empty it is
# refilled from, in order of preference: the next file buffer, or the
# push-side array. When the pop side *is* the push side and is empty, the
# call waits on @queue_cv.
def pop
  @queue_mon.synchronize do
    while @pop_queue.empty?
      # Both ends are the same empty array: nothing to take until the
      # condition variable is signalled.
      if @pop_queue.equal?(@push_queue)
        @queue_cv.wait
        next
      end

      if @buffers_queue.nil?
        # No file buffers in play: read directly from the push side.
        @pop_queue = @push_queue
      elsif @buffers_queue.empty?
        # File buffers drained: take over the current push array, give the
        # push side a new one, and drop the buffer list.
        @pop_queue, @push_queue = @push_queue, []
        @buffers_queue = nil
      else
        # Oldest file buffer becomes the new pop-side array.
        @pop_queue = restore_2ndmemory
      end
    end

    @pop_queue.shift
  end
end
#pop_all ⇒ Object
1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 |
# File 'lib/fairy/share/port.rb', line 1190
#
# Removes and returns, as a new array, everything currently held by the
# pop-side array.
#
# Runs inside @queue_mon.synchronize and uses the same refill rules as a
# single-element pop: wait on @queue_cv when both ends are the same empty
# array, otherwise refill from the push side or from the next file buffer.
def pop_all
  @queue_mon.synchronize do
    while @pop_queue.empty?
      # Both ends are the same empty array: wait to be signalled.
      if @pop_queue.equal?(@push_queue)
        @queue_cv.wait
        next
      end

      if @buffers_queue.nil?
        # No file buffers in play: read directly from the push side.
        @pop_queue = @push_queue
      elsif @buffers_queue.empty?
        # File buffers drained: adopt the push array and start a new one.
        @pop_queue, @push_queue = @push_queue, []
        @buffers_queue = nil
      else
        # Oldest file buffer becomes the new pop-side array.
        @pop_queue = restore_2ndmemory
      end
    end

    # Hand back a copy and empty the original in place, so the array object
    # (which may also be the push side) keeps its identity.
    @pop_queue.dup.tap { @pop_queue.clear }
  end
end
#push(e) ⇒ Object
1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 |
# File 'lib/fairy/share/port.rb', line 1155
#
# Appends e to the push-side array and broadcasts on @queue_cv, all inside
# @queue_mon.synchronize.
#
# Once the push-side array has reached @threshold elements it is replaced by
# a fresh array. If the full array is not also serving as the pop side, it is
# first written out through store_2ndmemory; if it is the pop side, it simply
# stays in memory for the reader.
#
# Returns nil below the threshold, otherwise the new empty push-side array.
def push(e)
  @queue_mon.synchronize do
    @push_queue.push e
    @queue_cv.broadcast

    if @push_queue.size >= @threshold
      # Only spill arrays the pop side is not currently reading from.
      store_2ndmemory(@push_queue) unless @push_queue.equal?(@pop_queue)
      @push_queue = []
    end
  end
end
#restore_2ndmemory ⇒ Object
1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 |
# File 'lib/fairy/share/port.rb', line 1244
#
# Takes the oldest buffer off @buffers_queue, reads back the array stored in
# it, and discards the buffer.
#
# The buffer holds one Marshal dump of a whole array (store_2ndmemory writes
# it with a single Marshal.dump), so a single Marshal.load recovers it.
#
# The buffer has already been removed from @buffers_queue by the time it is
# read, so nothing else would release it if reading failed. close! therefore
# runs in an ensure clause; any exception raised by open or Marshal.load
# still propagates to the caller.
#
# Returns the object loaded from the buffer.
def restore_2ndmemory
  buf = @buffers_queue.shift
  begin
    Marshal.load(buf.open)
  ensure
    buf.close!
  end
end
#store_2ndmemory(ary) ⇒ Object
1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 |
# File 'lib/fairy/share/port.rb', line 1232
#
# Writes ary into a new buffer obtained from open_2ndmemory.
#
# The whole array is serialised with one Marshal.dump call rather than
# element by element.
#
# Returns whatever open_2ndmemory returns.
def store_2ndmemory(ary)
  open_2ndmemory { |io| Marshal.dump(ary, io) }
end