Module: MapreduceHelper

Defined in:
lib/skynet/mapreduce_helper.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(base) ⇒ Object



33
34
35
# File 'lib/skynet/mapreduce_helper.rb', line 33

# Hook Ruby invokes when this module is included into +base+.
# Extends +base+ with MapreduceHelper as well, so the helper's methods are
# callable directly on +base+ and not only on its instances.
def self.included(base)
  base.extend(MapreduceHelper)
end

Instance Method Details

#map(map_data_array) ⇒ Object

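Takes an array of map_data and calls self.map_each(item) for each item in that array, returning an array of the results. Errors raised while processing an item are caught and logged, that item is left out of the results, and processing continues with the next item. If map_data_array is not an array, it is passed to self.map_each as a single item and that result is returned.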



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/skynet/mapreduce_helper.rb', line 39

# Runs the map step over +map_data_array+.
#
# When given an Array, calls map_each on every element and returns an Array
# of the results. An element whose map_each call raises is reported through
# +error+ and left out of the results, so one bad element does not stop the
# rest of the batch. Any other input is handed to map_each once and that
# call's result is returned directly; nothing is rescued on that path.
#
# SystemExit, SignalException (e.g. Interrupt) and NoMemoryError are
# re-raised instead of being logged, so a shutdown request arriving in the
# middle of a batch is not mistaken for a bad element.
#
# Raises Skynet::Job::BadMapOrReduceError when the receiver does not respond
# to map_each.
def map(map_data_array)
  unless respond_to?(:map_each)
    raise Skynet::Job::BadMapOrReduceError, "#{self.class} has no self.map_each method."
  end
  return map_each(map_data_array) unless map_data_array.is_a?(Array)

  map_data_array.each_with_object([]) do |data, results|
    begin
      results << map_each(data)
    rescue SystemExit, SignalException, NoMemoryError
      # Process-level conditions must reach the caller untouched.
      raise
    rescue Exception => e
      # Array() guards against an exception that carries no backtrace.
      error "ERROR IN #{self} [#{e.class} #{e.message}] #{Array(e.backtrace).join("\n")}"
    end
  end
end

#reduce(reduce_partitioned_data_array) ⇒ Object

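Takes an array of reduce-partitioned data and calls self.reduce_each(item) for each item in that array, returning an array of the results. Errors raised while processing an item are caught and logged, that item is left out of the results, and processing continues with the next item. If self.reduce_each is not defined, the input is returned unchanged; if the input is not an array, it is passed to self.reduce_each as a single item.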



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/skynet/mapreduce_helper.rb', line 58

# Runs the reduce step over +reduce_partitioned_data_array+.
#
# When the receiver does not respond to reduce_each, the argument is
# returned unchanged. Otherwise, when given an Array, calls reduce_each on
# every element and returns an Array of the results. An element whose
# reduce_each call raises is reported through +error+ and left out of the
# results, so one bad element does not stop the rest of the batch. Any other
# input is handed to reduce_each once and that call's result is returned
# directly; nothing is rescued on that path.
#
# SystemExit, SignalException (e.g. Interrupt) and NoMemoryError are
# re-raised instead of being logged, so a shutdown request arriving in the
# middle of a batch is not mistaken for a bad element.
def reduce(reduce_partitioned_data_array)
  return reduce_partitioned_data_array unless respond_to?(:reduce_each)
  return reduce_each(reduce_partitioned_data_array) unless reduce_partitioned_data_array.is_a?(Array)

  reduce_partitioned_data_array.each_with_object([]) do |data, results|
    begin
      results << reduce_each(data)
    rescue SystemExit, SignalException, NoMemoryError
      # Process-level conditions must reach the caller untouched.
      raise
    rescue Exception => e
      # Array() guards against an exception that carries no backtrace.
      error "ERROR IN #{self} [#{e.class} #{e.message}] #{Array(e.backtrace).join("\n")}"
    end
  end
end