Class: Fairy::FileMarshaledQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/fairy/share/port-marshaled-queue.rb

Instance Method Summary collapse

Constructor Details

#initialize(policy, queues_mon = Monitor.new, queues_cv = queues_mon.new_cond) ⇒ FileMarshaledQueue

Returns a new instance of FileMarshaledQueue.



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/fairy/share/port-marshaled-queue.rb', line 184

def initialize(policy, queues_mon = Monitor.new, queues_cv = queues_mon.new_cond)
  @policy = policy

  @chunk_size = CONF.MARSHAL_QUEUE_CHUNK_SIZE
  @min_chunk_no = @policy[:min_chunk_no]
  @min_chunk_no ||= CONF.MARSHAL_QUEUE_MIN_CHUNK_NO

  @push_queue = []
  @push_queue_mutex = Mutex.new
  
  @buffers_queue = []
  @buffers_queue_mon = queues_mon
  @buffers_queue_cv = queues_cv

  @pop_queue = nil

  @buffer_dir = @policy[:buffer_dir]
  @buffer_dir ||= CONF.TMP_DIR
end

Instance Method Details

#open_2ndmemory(&block) ⇒ Object



310
311
312
313
314
315
316
317
318
319
# File 'lib/fairy/share/port-marshaled-queue.rb', line 310

def open_2ndmemory(&block)
  buffer = FastTempfile.open("port-buffer-", @buffer_dir)
  begin
	yield buffer.io
  ensure
	buffer.close
  end
  @buffers_queue.push buffer
  buffer
end

#popObject



256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/fairy/share/port-marshaled-queue.rb', line 256

def pop
  while !@pop_queue || @pop_queue.empty?
	@buffers_queue_mon.synchronize do
	  buf = nil
	  @buffers_queue_cv.wait_until{buf = @buffers_queue.shift}
	  
	  if buf == :END_OF_STREAM
 @pop_queue = [buf]
	  else
 @pop_queue = restore_2ndmemory(buf)
	  end
	end
  end
  e = @pop_queue.shift
  @pop_queue = nil if @pop_queue.empty?
  e
end

#pop_allObject



274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
# File 'lib/fairy/share/port-marshaled-queue.rb', line 274

def pop_all
  while !@pop_queue
	@buffers_queue_mon.synchronize do
	  buf = nil
	  @buffers_queue_cv.wait_until{buf = @buffers_queue.shift}
	  if buf == :END_OF_STREAM
 @pop_queue = [buf]
	  else
 @pop_queue = restore_2ndmemory(buf)
	  end
	end
  end
  buf, @pop_queue = @pop_queue, nil
  buf
end

#pop_rawObject



290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
# File 'lib/fairy/share/port-marshaled-queue.rb', line 290

def pop_raw
  if @pop_queue && !@pop_queue.empty?
	ERR::Raise ERR::INTERNAL::MarshalQueueNotEmpty
  end
  
  pop_raw = nil
  while !pop_raw
	@buffers_queue_mon.synchronize do
	  buf = nil
	  @buffers_queue_cv.wait_until{buf = @buffers_queue.shift}
	  if buf == :END_OF_STREAM
 pop_raw = buf
	  else
 pop_raw = restore_raw_2ndmemory(buf)
	  end
	end
  end
  pop_raw
end

#push(e) ⇒ Object



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/fairy/share/port-marshaled-queue.rb', line 204

def push(e)
  @push_queue_mutex.synchronize do
	@push_queue.push e
	if @push_queue.size >= @min_chunk_no || 
 e == :END_OF_STREAM || 
 e == Import::SET_NO_IMPORT
	  @buffers_queue_mon.synchronize do
 @push_queue.pop if e == :END_OF_STREAM
 store_2ndmemory(@push_queue)
 @buffers_queue.push e if e == :END_OF_STREAM

 @push_queue = []
 @buffers_queue_cv.broadcast
	  end
	end
  end
end

#push_all(buf) ⇒ Object



222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/fairy/share/port-marshaled-queue.rb', line 222

def push_all(buf)
  @push_queue_mutex.synchronize do
	@push_queue.concat buf
	if @push_queue.size > @min_chunk_no || 
 @push_queue.last == :END_OF_STREAM
	  @buffers_queue_mon.synchronize do
 @push_queue.pop if e == :END_OF_STREAM
 store_2ndmemory(@push_queue)
 @buffers_queue.push e if e == :END_OF_STREAM

 @push_queue = []
 @buffers_queue_cv.broadcast
	  end
	end
  end
end

#push_raw(raw) ⇒ Object



239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/fairy/share/port-marshaled-queue.rb', line 239

def push_raw(raw)
  @push_queue_mutex.synchronize do
	@buffers_queue_mon.synchronize do
	  unless @push_queue.empty?
 store_2ndmemory(@push_queue)
 @push_queue = []
	  end
	  if raw == :END_OF_STREAM
 @buffers_queue.push raw
	  else
 store_raw_2ndmemory(raw)
	  end
	  @buffers_queue_cv.broadcast
	end
  end
end

#restore_2ndmemory(buf) ⇒ Object



336
337
338
339
340
341
342
343
344
# File 'lib/fairy/share/port-marshaled-queue.rb', line 336

def restore_2ndmemory(buf)
# log解析で使いたいときはデコメントする
#      Log::debug(self, "START M.RESTORE")
  io = buf.open
  queue = Marshal.load(io)
  buf.close!
#      Log::debug(self, "FINISH M.RESTORE")
  queue
end

#restore_raw_2ndmemory(buf) ⇒ Object



346
347
348
349
350
351
# File 'lib/fairy/share/port-marshaled-queue.rb', line 346

def restore_raw_2ndmemory(buf)
  io = buf.open
  raw = io.read
  buf.close!
  raw
end

#store_2ndmemory(ary) ⇒ Object



321
322
323
324
325
326
327
328
# File 'lib/fairy/share/port-marshaled-queue.rb', line 321

def store_2ndmemory(ary)
  open_2ndmemory do |io|
# log解析で使いたいときはデコメントする
#	Log::debug(self, "START M.STORE")
	Marshal.dump(ary, io)
#	Log::debug(self, "FINISH M.STORE")
  end
end

#store_raw_2ndmemory(raw) ⇒ Object



330
331
332
333
334
# File 'lib/fairy/share/port-marshaled-queue.rb', line 330

def store_raw_2ndmemory(raw)
  open_2ndmemory do |io|
	io.write raw
  end
end