Class: Fairy::CSegJoin
Defined Under Namespace
Classes: CPreSegJoinFilter, NoAllFilter
Instance Attribute Summary
Attributes inherited from CIOFilter
#input
Attributes included from CInputtable
#input
Instance Method Summary
collapse
Methods inherited from CIOFilter
#node_class, #output=
#inputtable?
Methods inherited from CFilter
#abort_create_node, #add_node, #assgin_number_of_nodes?, #bind_export, #break_create_node, #create_import, #create_node, #create_nodes, #def_job_pool_variable, #each_assigned_filter, #each_export_by, #each_node, #each_node_exist_only, #handle_exception, #input, #job_pool_dict, #job_pool_variable, #nodes, #number_of_nodes, #number_of_nodes=, #pool_dict, #postmapping_policy, #start_export, #start_watch_node_status, #update_status, watch_status, watch_status=, #watch_status?
Constructor Details
#initialize(controller, opts, others, block_source) ⇒ CSegJoin
DeepConnect.def_single_method_spec(self, "REF new(REF, VAL, VAL, REF)")
17
18
19
20
21
22
23
24
25
26
27
28
29
|
# File 'lib/fairy/master/c-seg-join.rb', line 17
# Sets up the controller-side state of a segment join.
#
# All four arguments are forwarded unchanged to the superclass initializer
# by the bare +super+ call.
#
# others::       the partner filters this join is combined with; iterated
#                by #start_create_nodes, #other_filter_of and #break_running
# block_source:: kept and later handed out by #njob_creation_params
def initialize(controller, opts, others, block_source)
  super
  @others = others
  @block_source = block_source
  # key => { partner filter => export }, filled in by #start_create_nodes.
  @exports = {}
  # partner filter => true once #start_create_nodes has walked all of its
  # assigned filters.
  @others_status = {}
  # Guards @exports and @others_status; the condition variable is
  # broadcast whenever either of them changes.
  @exports_mutex = Mutex.new
  @exports_cv = XThread::ConditionVariable.new
  # The original had a bare `@key_proc` expression here, which evaluates the
  # variable and discards it. Initialise it explicitly instead;
  # init_key_proc assigns the real proc (or raises).
  @key_proc = nil
  init_key_proc
end
|
Instance Method Details
#break_running ⇒ Object
121
122
123
124
|
# File 'lib/fairy/master/c-seg-join.rb', line 121
# Runs the superclass behaviour first, then asks every partner filter in
# @others to break as well. Each partner is called from its own thread, so
# this method does not wait for any of those calls to finish.
#
# Returns the value of @others.each.
def break_running
  super
  @others.each do |peer|
    Thread.start(peer) { |target| target.break_running }
  end
end
|
#create_and_add_node(ntask, mapper, opts = {}) ⇒ Object
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
# File 'lib/fairy/master/c-seg-join.rb', line 79
# Creates a node through create_node and wires it to the matching exports
# of the partner filters.
#
# Inside the creation block, in this order:
# 1. opts[:init_njob], when present, is called with the new node;
# 2. mapper.bind_input is called with the node;
# 3. the join key is computed with @key_proc;
# 4. the calling thread waits on @exports_cv (holding @exports_mutex) until
#    #other_filter_of returns a non-nil list for that key;
# 5. the list is stored in node.join_inputs, and each entry is paired with
#    the node's join_imports: the entry's output is set to the import and
#    the import's no_import is set to 1.
#
# Returns whatever create_node returned.
def create_and_add_node(ntask, mapper, opts={})
  created = create_node(ntask) do |njob|
    init_hook = opts[:init_njob]
    init_hook.call(njob) if init_hook

    mapper.bind_input(njob)
    join_key = @key_proc.call(njob)

    partners = nil
    @exports_mutex.synchronize do
      # Re-check after every wake-up; wait releases the mutex while asleep.
      until (partners = other_filter_of(join_key))
        @exports_cv.wait(@exports_mutex)
      end
    end

    njob.join_inputs = partners
    partners.zip(njob.join_imports) do |export, import|
      export.output = import
      import.no_import = 1
    end
  end
  created
end
|
#init_key_proc ⇒ Object
31
32
33
34
35
36
37
38
39
40
|
# File 'lib/fairy/master/c-seg-join.rb', line 31
# Chooses the proc stored in @key_proc according to #join_by:
#
# :ORDER:: the key is the input's +no+
# :KEY::   the key is the input's +key+
#
# Any other value is reported through ERR::Raise with
# ERR::UnrecoginizedOption and the offending value.
def init_key_proc
  mode = join_by
  if mode == :KEY
    @key_proc = proc { |segment| segment.key }
  elsif mode == :ORDER
    @key_proc = proc { |segment| segment.no }
  else
    ERR::Raise(ERR::UnrecoginizedOption, join_by)
  end
end
|
#join_by ⇒ Object
42
43
44
45
46
|
# File 'lib/fairy/master/c-seg-join.rb', line 42
# Returns the join mode as an upper-case symbol.
#
# Reads @opts[:by]; when that is nil or false the mode is :ORDER, otherwise
# the value is converted to a string, upper-cased and turned into a symbol.
def join_by
  requested = @opts[:by]
  requested ? requested.to_s.upcase.to_sym : :ORDER
end
|
#njob_creation_params ⇒ Object
52
53
54
|
# File 'lib/fairy/master/c-seg-join.rb', line 52
# Returns a new one-element array holding @block_source.
def njob_creation_params
  params = []
  params << @block_source
  params
end
|
#node_class_name ⇒ Object
48
49
50
|
# File 'lib/fairy/master/c-seg-join.rb', line 48
# Returns the node class name used by this filter, as a String.
def node_class_name
  class_name = "PSegJoin"
  class_name
end
|
#other_filter_of(key) ⇒ Object
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
# File 'lib/fairy/master/c-seg-join.rb', line 104
# Looks up, for every partner filter in @others, the export registered
# under +key+.
#
# Returns nil (meaning "not ready, keep waiting") when
# * nothing has been registered for +key+ yet, or
# * some partner has no export for +key+ and is not yet marked finished in
#   @others_status.
#
# Otherwise returns an array with one entry per partner, in @others order.
# An entry is nil when that partner is marked finished but never registered
# an export for +key+.
# NOTE(review): callers that dereference every entry must cope with nil
# entries — confirm against #create_and_add_node.
#
# Fix: the original read +@other_status+, a variable that is never assigned
# (the rest of the class uses +@others_status+). The resulting error on nil
# was hidden by the blanket rescue, so a missing export always meant "wait",
# even after the partner had finished.
def other_filter_of(key)
  exports_for_key = @exports[key]
  # No partner has registered this key yet.
  return nil unless exports_for_key

  collected = []
  @others.each do |other|
    export = exports_for_key[other]
    # Export missing while the partner is still registering: not ready.
    return nil unless export || @others_status[other]
    collected << export
  end
  collected
rescue StandardError
  # Preserved from the original: any unexpected lookup failure is treated
  # as "not ready" rather than propagated.
  nil
end
|
#start_create_nodes ⇒ Object
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
# File 'lib/fairy/master/c-seg-join.rb', line 56
# Starts one thread per partner filter that registers the partner's exports,
# then continues with the superclass implementation.
#
# Each thread walks the partner's assigned filters and, under
# @exports_mutex, stores the result of +start_export+ in
# @exports[key][partner], where key comes from @key_proc. When the walk is
# over the partner is marked finished in @others_status.
#
# Fix: the condition variable is now broadcast after every registration.
# The original broadcast only when the per-key hash was first created, so a
# thread waiting for a key was not woken when a later partner added its
# export to an already existing entry. Waiters re-check their condition in a
# loop, so extra wake-ups are harmless. The key is also computed once
# instead of twice.
def start_create_nodes
  Log::debug self, "START_CREATE_NODES: #{self}"
  @others.each do |other|
    Thread.start do
      other.each_assigned_filter do |input_filter|
        @exports_mutex.synchronize do
          key = @key_proc.call(input_filter)
          exps = (@exports[key] ||= {})
          exps[other] = input_filter.start_export
          @exports_cv.broadcast
        end
      end
      @exports_mutex.synchronize do
        @others_status[other] = true
        @exports_cv.broadcast
      end
    end
  end
  super
end
|