Class: Fairy::PDirectProduct::PPreFilter

Inherits:
Fairy::PIOFilter show all
Defined in:
lib/fairy/node/p-direct-product.rb

Constant Summary

Constants inherited from Fairy::PIOFilter

Fairy::PIOFilter::ST_WAIT_IMPORT

Constants inherited from Fairy::PFilter

Fairy::PFilter::END_OF_STREAM, Fairy::PFilter::ST_ACTIVATE, Fairy::PFilter::ST_FINISH, Fairy::PFilter::ST_INIT

Instance Attribute Summary

Attributes inherited from Fairy::PFilter

#IGNORE_EXCEPTION, #id, #log_id, #ntask

Instance Method Summary collapse

Methods inherited from Fairy::PFilter

#abort_running, #basic_start, #break_running, #each, #global_break, #global_break_from_other, #handle_exception, #key, #key=, #next, #no, #no=, #notice_status, #processor, #start, #start_watch_status, #status=, #terminate, #terminate_proc

Constructor Details

#initialize(id, ntask, bjob, opts, block_source) ⇒ PPreFilter

Returns a new instance of PPreFilter.



16
17
18
19
20
21
22
23
# File 'lib/fairy/node/p-direct-product.rb', line 16

def initialize(id, ntask, bjob, opts, block_source)
	super
	@block_source = block_source

	@exports = nil
	@exports_mutex = Mutex.new
	@exports_cv = XThread::ConditionVariable.new
end

Instance Method Details

#exportsObject



60
61
62
63
64
65
66
67
# File 'lib/fairy/node/p-direct-product.rb', line 60

def exports
	@exports_mutex.synchronize do
	  while @exports.nil?
	    @exports_cv.wait(@exports_mutex)
	  end
	end
	@exports
end

#input=(input) ⇒ Object



25
26
27
28
# File 'lib/fairy/node/p-direct-product.rb', line 25

def input=(input)
	super
	start_watch_exports
end

#start_exportObject



69
70
71
72
73
74
75
76
77
78
79
# File 'lib/fairy/node/p-direct-product.rb', line 69

def start_export
	start do
	  begin
	    @input.each do |e|
exports.each{|exp| exp.push e}
	    end
	  ensure
	    exports.each{|exp| exp.push :END_OF_STREAM}
	  end
 	end
end

#start_watch_exportsObject

def number_of_exports=(n) @exports_mutex.synchronize do @exports = [] n.timesExport.new end @exports_cv.broadcast

end


38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/fairy/node/p-direct-product.rb', line 38

def start_watch_exports
	Thread.start do
	  n = @bjob.number_of_exports
	  @exports_mutex.synchronize do
	    @exports = []
	    n.times do

# 後で検討する
#	      policy = @opts[:postqueuing_policy]
#	      exp = Export.new(policy)
 exp = Export.new
 exp.njob_id = @id
 exp.no = @input.no
 exp.add_key(@input.key)
 exp.output_no_import = 1
 @exports.push exp
	    end
	  end
	  @exports_cv.broadcast
	end
end