Class: Fairy::CSegJoin

Inherits:
CIOFilter show all
Defined in:
lib/fairy/master/c-seg-join.rb

Defined Under Namespace

Classes: CPreSegJoinFilter, NoAllFilter

Instance Attribute Summary

Attributes inherited from CIOFilter

#input

Attributes included from CInputtable

#input

Instance Method Summary collapse

Methods inherited from CIOFilter

#node_class, #output=

Methods included from CInputtable

#inputtable?

Methods inherited from CFilter

#abort_create_node, #add_node, #assgin_number_of_nodes?, #bind_export, #break_create_node, #create_import, #create_node, #create_nodes, #def_job_pool_variable, #each_assigned_filter, #each_export_by, #each_node, #each_node_exist_only, #handle_exception, #input, #job_pool_dict, #job_pool_variable, #nodes, #number_of_nodes, #number_of_nodes=, #pool_dict, #postmapping_policy, #start_export, #start_watch_node_status, #update_status, watch_status, watch_status=, #watch_status?

Constructor Details

#initialize(controller, opts, others, block_source) ⇒ CSegJoin

DeepConnect.def_single_method_spec(self, “REF new(REF, VAL, VAL, REF)”)



17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/fairy/master/c-seg-join.rb', line 17

def initialize(controller, opts, others, block_source)
  super
  @others = others
  @block_source = block_source

  @exports = {}
  @others_status = {}
  @exports_mutex = Mutex.new
  @exports_cv = XThread::ConditionVariable.new

  @key_proc
  init_key_proc
end

Instance Method Details

#break_runningObject



121
122
123
124
# File 'lib/fairy/master/c-seg-join.rb', line 121

def break_running
  super
  @others.each{|other| Thread.start{other.break_running}}
end

#create_and_add_node(ntask, mapper, opts = {}) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/fairy/master/c-seg-join.rb', line 79

def create_and_add_node(ntask, mapper, opts={})
  node = create_node(ntask) {|node|
  if opts[:init_njob]
    opts[:init_njob].call(node)
  end
  mapper.bind_input(node)

  key = @key_proc.call(node)
  exps = nil
  @exports_mutex.synchronize do
    while !(exps = other_filter_of(key))
 @exports_cv.wait(@exports_mutex)
    end
  end
  node.join_inputs = exps
  exps.zip(node.join_imports) do |other, import|
    other.output = import
    import.no_import = 1
  end
  }
  node
end

#init_key_procObject



31
32
33
34
35
36
37
38
39
40
# File 'lib/fairy/master/c-seg-join.rb', line 31

def init_key_proc
  case join_by
  when :ORDER
  @key_proc = proc{|input| input.no}
  when :KEY
  @key_proc = proc{|input| input.key}
  else
  ERR::Raise ERR::UnrecoginizedOption, join_by
  end
end

#join_byObject



42
43
44
45
46
# File 'lib/fairy/master/c-seg-join.rb', line 42

def join_by
  by = @opts[:by]
  return :ORDER unless by
  by.to_s.upcase.intern
end

#njob_creation_paramsObject



52
53
54
# File 'lib/fairy/master/c-seg-join.rb', line 52

def njob_creation_params
  [@block_source]
end

#node_class_nameObject



48
49
50
# File 'lib/fairy/master/c-seg-join.rb', line 48

def node_class_name
  "PSegJoin"
end

#other_filter_of(key) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/fairy/master/c-seg-join.rb', line 104

def other_filter_of(key)
  begin
  @others.collect do |o| 
    unless exp = @exports[key][o]
 unless @other_status[o]
   raise NoAllFilter
 end
    end
    exp
  end
  rescue NoAllFilter
  return nil
  rescue
  return nil
  end
end

#start_create_nodesObject



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/fairy/master/c-seg-join.rb', line 56

def start_create_nodes
  Log::debug self, "START_CREATE_NODES: #{self}"
  @others.each do |other|
  Thread.start do
    other.each_assigned_filter do |input_filter|
 @exports_mutex.synchronize do
   unless exps = @exports[@key_proc.call(input_filter)]
    exps = @exports[@key_proc.call(input_filter)] = {}
    @exports_cv.broadcast
   end
   exp = input_filter.start_export
   exps[other] = exp
 end
    end
    @exports_mutex.synchronize do
 @others_status[other] = true
 @exports_cv.broadcast
    end
  end
  end
  super
end