Class: Fairy::FileBufferdQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/fairy/share/port.rb

Instance Method Summary collapse

Constructor Details

#initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond) ⇒ FileBufferdQueue

Returns a new instance of FileBufferdQueue.



1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
# File 'lib/fairy/share/port.rb', line 1141

# Builds an empty queue.
#
# policy    - Hash-like object; :threshold is read here, and the object is
#             kept in @policy for later lookups.
# queue_mon - monitor guarding all queue state (a fresh XThread::Monitor
#             unless the caller supplies one).
# queue_cv  - condition variable consumers wait on (by default a new
#             condition created from queue_mon).
def initialize(policy, queue_mon = XThread::Monitor.new, queue_cv = queue_mon.new_cond)
  @policy = policy
  # An explicit policy threshold wins; otherwise use the configured default.
  @threshold = policy[:threshold] || CONF.FILEBUFFEREDQUEUE_THRESHOLD

  # Producer and consumer start out sharing one and the same array.
  @pop_queue = @push_queue = []
  # nil means the secondary (file) storage has not been set up.
  @buffers_queue = nil

  @queue_mon = queue_mon
  @queue_cv = queue_cv
end

Instance Method Details

#init_2ndmemory ⇒ Object



1211
1212
1213
1214
1215
1216
# File 'lib/fairy/share/port.rb', line 1211

# Prepares the secondary (file) storage: chooses the directory for spill
# files and creates the empty list that will hold them.
#
# Returns the new, empty buffers list.
def init_2ndmemory
  # A :buffer_dir given in the policy overrides the configured temp dir.
  @buffer_dir = @policy[:buffer_dir] || CONF.TMP_DIR
  @buffers_queue = []
end

#open_2ndmemory(&block) ⇒ Object



1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
# File 'lib/fairy/share/port.rb', line 1218

# Creates a new spill file, yields its IO to the caller's block so the
# block can fill it, closes the file, and appends it to the list of
# pending buffers.
#
# The block is invoked through +yield+; the +&block+ parameter itself is
# not otherwise used.
#
# Returns the FastTempfile that was created.
def open_2ndmemory(&block)
  # Secondary storage is set up lazily, on the first spill.
  init_2ndmemory unless @buffers_queue

  tempfile = FastTempfile.open("port-buffer-", @buffer_dir)
  begin
    yield tempfile.io
  ensure
    # Closed whether or not the block raised; the file is only registered
    # below when the block completed normally.
    tempfile.close
  end

  @buffers_queue << tempfile
  tempfile
end

#pop ⇒ Object



1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
# File 'lib/fairy/share/port.rb', line 1171

# Removes and returns the oldest element, waiting on the condition
# variable while nothing is available.
#
# Elements are consumed in this order: the current pop queue, then spill
# files in the order they were written, then the in-memory push queue.
def pop
  @queue_mon.synchronize do
    while @pop_queue.empty?
      if @push_queue.equal?(@pop_queue)
        # Producer and consumer share one empty array: nothing is buffered
        # anywhere, so wait (push broadcasts on this condition variable).
        @queue_cv.wait
        next
      end

      @pop_queue =
        if @buffers_queue.nil?
          # No spill storage in use: read straight from the push queue.
          @push_queue
        elsif @buffers_queue.empty?
          # Every spill file has been consumed: take over the push queue,
          # hand the producer a fresh array, and mark spill storage unused.
          taken = @push_queue
          @push_queue = []
          @buffers_queue = nil
          taken
        else
          # Load the next spilled chunk back into memory.
          restore_2ndmemory
        end
    end
    @pop_queue.shift
  end
end

#pop_all ⇒ Object



1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
# File 'lib/fairy/share/port.rb', line 1190

# Removes and returns, as a new Array, every element currently held in the
# pop queue, waiting on the condition variable while nothing is available.
#
# Only one in-memory chunk is returned per call; elements still sitting in
# spill files or in a separate push queue are left for later calls.
def pop_all
  @queue_mon.synchronize do
    while @pop_queue.empty?
      if @push_queue.equal?(@pop_queue)
        # Producer and consumer share one empty array: nothing is buffered
        # anywhere, so wait (push broadcasts on this condition variable).
        @queue_cv.wait
        next
      end

      @pop_queue =
        if @buffers_queue.nil?
          # No spill storage in use: read straight from the push queue.
          @push_queue
        elsif @buffers_queue.empty?
          # Every spill file has been consumed: take over the push queue,
          # hand the producer a fresh array, and mark spill storage unused.
          taken = @push_queue
          @push_queue = []
          @buffers_queue = nil
          taken
        else
          # Load the next spilled chunk back into memory.
          restore_2ndmemory
        end
    end
    # Empties the pop queue in place (it may be the very array the producer
    # pushes into) and returns the removed elements in a separate array.
    @pop_queue.shift(@pop_queue.size)
  end
end

#push(e) ⇒ Object



1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
# File 'lib/fairy/share/port.rb', line 1155

# Appends +e+ to the queue and wakes every thread waiting on the condition
# variable.
#
# Once the push queue holds @threshold or more elements it is retired and
# a fresh array takes its place:
# * if it is the same array the consumer reads from, it simply stays in
#   memory as the pop queue;
# * otherwise its contents are spilled through store_2ndmemory.
def push(e)
  @queue_mon.synchronize do
    @push_queue << e
    @queue_cv.broadcast

    if @push_queue.size >= @threshold
      # A full chunk the consumer is not reading from goes to a spill file.
      store_2ndmemory(@push_queue) unless @pop_queue.equal?(@push_queue)
      @push_queue = []
    end
  end
end

#restore_2ndmemory ⇒ Object



1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
# File 'lib/fairy/share/port.rb', line 1244

# Takes the oldest spill file off @buffers_queue, loads the array stored in
# it and disposes of the file.
#
# The file is expected to hold a single Marshal record containing the whole
# array, which is how store_2ndmemory writes it.
#
# Returns the object read back by Marshal.load.
# Any error raised by Marshal.load propagates to the caller; the spill file
# is still released in that case.
def restore_2ndmemory
  buf = @buffers_queue.shift
  io = buf.open
  begin
    Marshal.load(io)
  ensure
    # The buffer has already been removed from @buffers_queue, so nothing
    # else can release it: close!/discard it even when loading fails.
    buf.close!
  end
end

#store_2ndmemory(ary) ⇒ Object



1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
# File 'lib/fairy/share/port.rb', line 1232

# Spills +ary+ to a new file obtained from open_2ndmemory.
#
# The whole array is written as one Marshal record rather than element by
# element.
#
# Returns whatever open_2ndmemory returns.
def store_2ndmemory(ary)
  open_2ndmemory { |out| Marshal.dump(ary, out) }
end