Class: Fairy::FileMarshaledQueue
- Inherits:
-
Object
- Object
- Fairy::FileMarshaledQueue
- Defined in:
- lib/fairy/share/port-marshaled-queue.rb
Instance Method Summary collapse
-
#initialize(policy, queues_mon = Monitor.new, queues_cv = queues_mon.new_cond) ⇒ FileMarshaledQueue
constructor
A new instance of FileMarshaledQueue.
- #open_2ndmemory(&block) ⇒ Object
- #pop ⇒ Object
- #pop_all ⇒ Object
- #pop_raw ⇒ Object
- #push(e) ⇒ Object
- #push_all(buf) ⇒ Object
- #push_raw(raw) ⇒ Object
- #restore_2ndmemory(buf) ⇒ Object
- #restore_raw_2ndmemory(buf) ⇒ Object
- #store_2ndmemory(ary) ⇒ Object
- #store_raw_2ndmemory(raw) ⇒ Object
Constructor Details
#initialize(policy, queues_mon = Monitor.new, queues_cv = queues_mon.new_cond) ⇒ FileMarshaledQueue
Returns a new instance of FileMarshaledQueue.
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/fairy/share/port-marshaled-queue.rb', line 184 def initialize(policy, queues_mon = Monitor.new, queues_cv = queues_mon.new_cond) @policy = policy @chunk_size = CONF.MARSHAL_QUEUE_CHUNK_SIZE @min_chunk_no = @policy[:min_chunk_no] @min_chunk_no ||= CONF.MARSHAL_QUEUE_MIN_CHUNK_NO @push_queue = [] @push_queue_mutex = Mutex.new @buffers_queue = [] @buffers_queue_mon = queues_mon @buffers_queue_cv = queues_cv @pop_queue = nil @buffer_dir = @policy[:buffer_dir] @buffer_dir ||= CONF.TMP_DIR end |
Instance Method Details
#open_2ndmemory(&block) ⇒ Object
310 311 312 313 314 315 316 317 318 319 |
# File 'lib/fairy/share/port-marshaled-queue.rb', line 310 def open_2ndmemory(&block) buffer = FastTempfile.open("port-buffer-", @buffer_dir) begin yield buffer.io ensure buffer.close end @buffers_queue.push buffer buffer end |
#pop ⇒ Object
256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 |
# File 'lib/fairy/share/port-marshaled-queue.rb', line 256 def pop while !@pop_queue || @pop_queue.empty? @buffers_queue_mon.synchronize do buf = nil @buffers_queue_cv.wait_until{buf = @buffers_queue.shift} if buf == :END_OF_STREAM @pop_queue = [buf] else @pop_queue = restore_2ndmemory(buf) end end end e = @pop_queue.shift @pop_queue = nil if @pop_queue.empty? e end |
#pop_all ⇒ Object
274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 |
# File 'lib/fairy/share/port-marshaled-queue.rb', line 274 def pop_all while !@pop_queue @buffers_queue_mon.synchronize do buf = nil @buffers_queue_cv.wait_until{buf = @buffers_queue.shift} if buf == :END_OF_STREAM @pop_queue = [buf] else @pop_queue = restore_2ndmemory(buf) end end end buf, @pop_queue = @pop_queue, nil buf end |
#pop_raw ⇒ Object
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 |
# File 'lib/fairy/share/port-marshaled-queue.rb', line 290 def pop_raw if @pop_queue && !@pop_queue.empty? ERR::Raise ERR::INTERNAL::MarshalQueueNotEmpty end pop_raw = nil while !pop_raw @buffers_queue_mon.synchronize do buf = nil @buffers_queue_cv.wait_until{buf = @buffers_queue.shift} if buf == :END_OF_STREAM pop_raw = buf else pop_raw = restore_raw_2ndmemory(buf) end end end pop_raw end |
#push(e) ⇒ Object
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
# File 'lib/fairy/share/port-marshaled-queue.rb', line 204 def push(e) @push_queue_mutex.synchronize do @push_queue.push e if @push_queue.size >= @min_chunk_no || e == :END_OF_STREAM || e == Import::SET_NO_IMPORT @buffers_queue_mon.synchronize do @push_queue.pop if e == :END_OF_STREAM store_2ndmemory(@push_queue) @buffers_queue.push e if e == :END_OF_STREAM @push_queue = [] @buffers_queue_cv.broadcast end end end end |
#push_all(buf) ⇒ Object
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 |
# File 'lib/fairy/share/port-marshaled-queue.rb', line 222 def push_all(buf) @push_queue_mutex.synchronize do @push_queue.concat buf if @push_queue.size > @min_chunk_no || @push_queue.last == :END_OF_STREAM @buffers_queue_mon.synchronize do @push_queue.pop if e == :END_OF_STREAM store_2ndmemory(@push_queue) @buffers_queue.push e if e == :END_OF_STREAM @push_queue = [] @buffers_queue_cv.broadcast end end end end |
#push_raw(raw) ⇒ Object
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 |
# File 'lib/fairy/share/port-marshaled-queue.rb', line 239 def push_raw(raw) @push_queue_mutex.synchronize do @buffers_queue_mon.synchronize do unless @push_queue.empty? store_2ndmemory(@push_queue) @push_queue = [] end if raw == :END_OF_STREAM @buffers_queue.push raw else store_raw_2ndmemory(raw) end @buffers_queue_cv.broadcast end end end |
#restore_2ndmemory(buf) ⇒ Object
336 337 338 339 340 341 342 343 344 |
# File 'lib/fairy/share/port-marshaled-queue.rb', line 336 def restore_2ndmemory(buf) # log解析で使いたいときはデコメントする # Log::debug(self, "START M.RESTORE") io = buf.open queue = Marshal.load(io) buf.close! # Log::debug(self, "FINISH M.RESTORE") queue end |
#restore_raw_2ndmemory(buf) ⇒ Object
346 347 348 349 350 351 |
# File 'lib/fairy/share/port-marshaled-queue.rb', line 346 def restore_raw_2ndmemory(buf) io = buf.open raw = io.read buf.close! raw end |
#store_2ndmemory(ary) ⇒ Object
321 322 323 324 325 326 327 328 |
# File 'lib/fairy/share/port-marshaled-queue.rb', line 321 def store_2ndmemory(ary) open_2ndmemory do |io| # log解析で使いたいときはデコメントする # Log::debug(self, "START M.STORE") Marshal.dump(ary, io) # Log::debug(self, "FINISH M.STORE") end end |
#store_raw_2ndmemory(raw) ⇒ Object
330 331 332 333 334 |
# File 'lib/fairy/share/port-marshaled-queue.rb', line 330 def store_raw_2ndmemory(raw) open_2ndmemory do |io| io.write raw end end |