Class: Fairy::PGroupBy::SimpleFileByKeyBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/fairy/node/p-group-by.rb

Instance Method Summary collapse

Constructor Details

#initialize(njob, policy) ⇒ SimpleFileByKeyBuffer

Returns a new instance of SimpleFileByKeyBuffer.



216
217
218
219
220
221
222
223
224
225
226
# File 'lib/fairy/node/p-group-by.rb', line 216

def initialize(njob, policy)
  require "tempfile"

  @njob = njob
  @policy = policy

  @key_file = {}
  @key_file_mutex = Mutex.new
  @buffer_dir = policy[:buffer_dir]
  @buffer_dir ||= CONF.TMP_DIR
end

Instance Method Details

#each(&block) ⇒ Object



242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/fairy/node/p-group-by.rb', line 242

def each(&block)
  @key_file.each do |key, file|
    values = KeyValueStream.new(key, nil)
    file.rewind
    while !file.eof?
      values.push Marshal.load(file)
    end
    values.push_eos
#   file.close
    
    yield values
  end
end

#push(value) ⇒ Object



228
229
230
231
232
233
234
235
236
237
238
239
240
# File 'lib/fairy/node/p-group-by.rb', line 228

def push(value)
  key = @njob.hash_key(value)

  @key_file_mutex.synchronize do
    unless @key_file.key?(key)
      @key_file[key] = Tempfile.open("mod-group-by-buffer-#{@njob.no}-", @buffer_dir)
    end
  
    # ruby BUG#2390の対応のため.
    # Marshal.dump(value, @key_file[key])
    Marshal.dump(value, @key_file[key].instance_eval{@tmpfile})
  end
end