Class: Fairy::PGroupBy::ExtMergeSortBuffer

Inherits:
MergeSortBuffer show all
Defined in:
lib/fairy/node/p-group-by.rb

Instance Attribute Summary

Attributes inherited from OnMemoryBuffer

#log_id

Instance Method Summary collapse

Methods inherited from MergeSortBuffer

#store_2ndmemory

Methods inherited from CommandMergeSortBuffer

#each, #init_2ndmemory, #initialize, #open_buffer, #push, #store_2ndmemory

Methods inherited from OnMemoryBuffer

#each, #initialize, #push

Constructor Details

This class inherits a constructor from Fairy::PGroupBy::CommandMergeSortBuffer

Instance Method Details

#each_2ndmemory(&block) ⇒ Object



525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
# File 'lib/fairy/node/p-group-by.rb', line 525

# Iterates over the grouped values held in the secondary-storage buffers,
# delegating the merge itself to a "Sorter" object obtained through a
# DeepConnect fork.
#
# Steps, in order:
# 1. Any pairs still held in @key_values are written out via store_2ndmemory.
# 2. DeepConnect::DeepFork.fork is called; its block exports +self+ under the
#    name "Sorter" and then blocks in finish_wait.
# 3. The "Sorter" is imported from the peer deep space and its sub_each is
#    run; for every group only the values are passed on to +block+.
# 4. The sorter is released with finish, every buffer is closed with close!,
#    and the peer process is reaped with Process.waitpid.
#
# block:: called with the values of each group (the key is not passed on).
def each_2ndmemory(&block)
	# Loaded lazily: only this method needs DeepFork.
	require "deep-connect/deep-fork"
	
	# Flush whatever is still held in memory so that it is part of the merge.
	unless @key_values.empty?
	  store_2ndmemory(@key_values)
	end

	# Debug log: the paths of all buffers that take part in the merge.
	Log::debug(self, @buffers.collect{|b| b.path}.join(" "))

	# NOTE(review): the block presumably runs in the forked peer process
	# (dc/ds look like its DeepConnect instance and deep space) -- confirm
	# against DeepConnect::DeepFork.
	df = DeepConnect::DeepFork.fork(@njob.processor.deepconnect){|dc, ds|
	  # Process title, so the sorter can be told apart in process listings.
	  $0 = "fairy processor sorter"

	  # Publish this buffer object under the name the other side imports.
	  dc.export("Sorter", self)

	  # Block here until #finish signals the condition variable.
	  finish_wait
#	  ds.close
#	  dc.stop
	  # NOTE(review): the reason for the fixed one-second pause is not
	  # visible here -- presumably lets pending communication drain; confirm.
	  sleep 1
	}
	sorter = df.peer_deep_space.import("Sorter", true)
	sorter.sub_each {|key, values|
#	sorter.sub_each {|bigstr|
# 	  values = bigstr.split("\t").collect{|e| 
# 	    e.gsub(/(\\t|\\\\)/){|v| v == "\\t" ? "\t" : "\\"}
# 	  }
# 	  key = values.shift
	  # Only the values are handed to the caller's block; key is unused here.
	  block.call values
	  nil  # makes sure no reference is returned
	}
	# Wake up finish_wait on the exported sorter so the fork block can end.
	sorter.finish
#	df.peer_deep_space.close
	@buffers.each{|buf| buf.close!}
	# Reap the peer process.
	Process.waitpid(df.peer_pid)
end

#finish ⇒ Object



604
605
606
# File 'lib/fairy/node/p-group-by.rb', line 604

# Wakes up the thread blocked in #finish_wait by signalling the condition
# variable stored in @cv.
#
# @cv is only assigned inside #finish_wait, so this must not be called
# before #finish_wait has started waiting.
#
# Returns whatever @cv.signal returns.
def finish
  @cv.signal
end

#finish_wait ⇒ Object

DeepConnect.def_method_spec(self, "REF sub_each()DVAL")



596
597
598
599
600
601
602
# File 'lib/fairy/node/p-group-by.rb', line 596

# Parks the calling thread until #finish signals the condition variable.
#
# A fresh Mutex (@mx) and XThread::ConditionVariable (@cv) are created on
# every call; the thread then waits on @cv while holding @mx.
#
# Returns the result of @cv.wait.
def finish_wait
  @mx = Mutex.new
  @cv = XThread::ConditionVariable.new
  @mx.synchronize { @cv.wait(@mx) }
end

#sub_each(&block) ⇒ Object



560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
# File 'lib/fairy/node/p-group-by.rb', line 560

# Merges the buffers in @buffers and yields every distinct key once, together
# with the concatenation of all value lists stored under that key.
#
# Each buffer is opened and read through read_line(buf.io). A record is used
# as a [key, values] pair (record[0] / record[1]); nil means the buffer is
# exhausted. The current record of every non-exhausted buffer is kept in a
# list ordered by key, so the head of the list always carries the smallest
# outstanding key.
#
# NOTE(review): the merge is only correct if each buffer is itself sorted by
# key -- presumably guaranteed by store_2ndmemory; confirm.
#
# Yields:: key, values -- once per run of equal keys, in ascending key order.
#          The last group is yielded with the same two arguments as all the
#          others (it used to be yielded as a single array, which a
#          |key, values| block silently destructured into the wrong values).
# Returns:: nil, always, so that no reference is handed back to the caller.
def sub_each(&block)
  # Prime the merge: first record of every buffer, empty buffers dropped.
  heads = @buffers.collect { |buf|
    buf.open
    [read_line(buf.io), buf]
  }.select { |record, _| !record.nil? }.sort_by { |record, _| record[0] }

  key = nil
  values = []
  while (head = heads.shift)
    kv, buf = head

    if key == kv[0]
      # Same key as the group being built: append this buffer's values.
      values.concat kv[1]
    else
      # Key changed: emit the finished group and start a new one.
      yield key, values unless values.empty?
      key = kv[0]
      values = kv[1]
    end

    # Refill from the buffer just consumed and keep the list ordered by key;
    # the new record goes after the last entry whose key is <= its own.
    next unless (line = read_line(buf.io))
    idx = heads.rindex { |record, _| record[0] <= line[0] }
    idx ? heads.insert(idx + 1, [line, buf]) : heads.unshift([line, buf])
  end

  # Flush the final group with the same (key, values) arity as above.
  yield key, values unless values.empty?
  nil
end