Class: ActiveRecord::Mapreduce

Inherits:
Object
  • Object
Defined in:
lib/skynet/active_record_extensions.rb

Constant Summary

BATCH_SIZE =
1000
MAX_BATCHES_PER_JOB =
1000

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(options = {}) ⇒ Mapreduce

Returns a new instance of Mapreduce.



90
91
92
93
94
# File 'lib/skynet/active_record_extensions.rb', line 90

# Build a mapreduce over an ActiveRecord model.
#
# options - Hash with optional keys:
#   :find_args   - find options (e.g. :conditions, :joins) scoping the records
#   :batch_size  - records per primary-key range; BATCH_SIZE when nil/false
#   :model_class - name of the model class (resolved later via constantize)
def initialize(options = {})
  find_args, size, klass = options.values_at(:find_args, :batch_size, :model_class)
  @find_args   = find_args
  @batch_size  = size || BATCH_SIZE
  @model_class = klass
end

Instance Attribute Details

#batch_size ⇒ Object

Returns the value of attribute batch_size.



84
85
86
# File 'lib/skynet/active_record_extensions.rb', line 84

# Number of records per primary-key range (set by the constructor).
def batch_size; @batch_size; end

#find_args ⇒ Object

Returns the value of attribute find_args.



84
85
86
# File 'lib/skynet/active_record_extensions.rb', line 84

# Find options used to scope the records (set by the constructor).
def find_args; @find_args; end

#model_class ⇒ Object

Returns the value of attribute model_class.



85
86
87
# File 'lib/skynet/active_record_extensions.rb', line 85

# Name of the model class being mapped (set by the constructor).
def model_class; @model_class; end

Class Method Details

.find(*args) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
# File 'lib/skynet/active_record_extensions.rb', line 104

# Build a Mapreduce from ActiveRecord-style find arguments.
#
# A leading non-Hash argument (e.g. :all) is discarded. The first remaining
# Hash supplies the find options. Its :batch_size and :model_class keys are
# pulled out and passed to the constructor separately; everything else
# becomes find_args. With no options Hash, find_args is {}.
#
# The options Hash is duplicated first so the deletes below never mutate the
# caller's Hash. The previous `args = *args` only unwrapped a one-element array
# on Ruby 1.8; on 1.9+ it left an Array, so the deletes silently did nothing.
def self.find(*args)
  args.shift unless args.first.is_a?(Hash)
  options = args.first.is_a?(Hash) ? args.first.dup : {}
  batch_size  = options.delete(:batch_size)
  model_class = options.delete(:model_class)
  new(:find_args => options, :batch_size => batch_size, :model_class => model_class)
end

.log ⇒ Object



242
243
244
# File 'lib/skynet/active_record_extensions.rb', line 242

# Class-level accessor for the Skynet logger.
def self.log; Skynet::Logger.get; end

.map(datas) ⇒ Object



246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/skynet/active_record_extensions.rb', line 246

# Worker-side map step: process each batch descriptor produced by #map.
#
# Each element of +datas+ is expected to be an Array laid out as
#   [first_id, last_id, find_opts, model_class_name, handler, args]
# A last_id that is not greater than first_id means "no upper bound".
# Records of the model in that primary-key range, further narrowed by
# find_opts, are loaded, and each one is passed to one of:
#   * ar_object.<handler>(*args)          when handler names an instance method,
#   * handler.constantize.each(ar_object) when handler names a class,
#   * handler.call(ar_object)             when handler is not a String (a block).
# A StandardError raised for one record is logged, and processing continues
# with the next record. Non-Array and empty entries are skipped.
# Note: find_opts (data[2]) is modified in place. Always returns nil.
def self.map(datas)
  return unless datas && !datas.empty?
  datas.each do |data|
    next unless data.is_a?(Array)
    next if data.empty?

    first_id = data[0]
    last_id = data[1]
    model_class = data[3].constantize
    table_name = model_class.table_name
    pk_column = "#{table_name}.#{model_class.primary_key}"

    range = "#{pk_column} >= #{first_id}"
    range += " AND #{pk_column} <= #{last_id}" if last_id > first_id
    range = "(#{range})"

    find_opts = data[2]
    if !find_opts || (find_opts.is_a?(Hash) && find_opts.empty?)
      find_opts = data[2] = {:conditions => range}
    elsif find_opts.is_a?(Hash) && (!find_opts[:conditions] || find_opts[:conditions].empty?)
      find_opts[:conditions] = range
    else
      # Parenthesize the caller's conditions. AND binds tighter than OR in SQL,
      # so an unwrapped "a OR b" would let rows outside the id range through.
      find_opts[:conditions] = "(#{find_opts[:conditions]}) AND #{range}"
    end
    find_opts[:select] = "#{table_name}.*"

    handler = data[4]
    model_class.find(:all, find_opts).each do |ar_object|
      begin
        if handler.kind_of?(String)
          if ar_object.respond_to?(handler.to_sym)
            ar_object.send(handler.to_sym, *data[5])
          else
            # Only the constant lookup is guarded. A NameError/NoMethodError raised
            # inside the handler's .each must not be reported as "not a class".
            begin
              handler_class = handler.constantize
            rescue NameError
              raise NameError, "#{handler} is not a class or an instance method in #{model_class}"
            end
            handler_class.each(ar_object)
          end
        else
          handler.call(ar_object)
        end
      rescue StandardError => e
        # Best effort per record; signals and exits (non-StandardError) still propagate.
        if handler.kind_of?(String)
          log.error("Error in #{handler} #{e.inspect} #{e.backtrace.join("\n")}")
        else
          log.error("Error in #{self} with given block #{e.inspect} #{e.backtrace.join("\n")}")
        end
      end
    end
  end
  nil
end

.model_class(model_class) ⇒ Object



236
237
238
239
240
# File 'lib/skynet/active_record_extensions.rb', line 236

# Pin the model class at the class level by defining a singleton reader
# `model_class` that returns the value given here. The reader is defined on
# the same singleton class as this method, so it replaces this one-argument
# form after the first call.
def self.model_class(model_class)
  eigen = class << self; self; end
  eigen.send(:define_method, :model_class) { model_class }
end

Instance Method Details

#chunk_query(opts = {}) ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/skynet/active_record_extensions.rb', line 153

# Fetch batch boundary rows for the model table using MySQL user variables.
#
# Scans rows whose primary key is greater than opts[:id], in primary-key
# order, and keeps every batch_size-th row. Each row returned by select_all has:
#   "cnt"   - 0-based index of the boundary within this query
#   "first" - previous boundary id + 1 (opts[:id] + 1 for the first row)
#   "last"  - primary key of the boundary row
# opts[:conditions] (a SQL string) and opts[:joins] further restrict the rows.
# opts[:limit], if given, caps how many boundary rows come back.
#
# NOTE(review): this depends on MySQL evaluating @t1..@t4 row by row in
# ORDER BY order, which MySQL does not formally guarantee.
def chunk_query(opts={})
  # to_i: the id is interpolated into SQL and may arrive as a String from a
  # previous result row.
  start_id = opts[:id].to_i
  boundary = "#{table_name}.#{primary_key} > #{start_id} AND ((@t1:=(@t1+1) % #{batch_size})=0)"
  opts = opts.clone
  if opts[:conditions].nil? || opts[:conditions].empty?
    opts[:conditions] = boundary
  else
    # Wrap the caller's conditions so an OR inside them cannot bypass the boundary filter.
    opts[:conditions] = "(#{opts[:conditions]}) AND #{boundary}"
  end
  limit = opts[:limit] ? "LIMIT #{opts[:limit]}" : nil

  # Seed @t4 with the starting id. On follow-up calls, the first range then
  # continues at opts[:id] + 1 instead of restarting at 1.
  execute("select @t1:=0, @t2:=-1, @t3:=0, @t4:=#{start_id}")
  sql = "select @t2:=(@t2+1) as cnt, ((@t3:=@t4)+1) as first, @t4:=#{table_name}.#{primary_key} as last from #{table_name} #{opts[:joins]} where #{opts[:conditions]} ORDER BY #{table_name}.#{primary_key} #{limit}"
  select_all(sql)
end

#each ⇒ Object



229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# File 'lib/skynet/active_record_extensions.rb', line 229

# Split the scoped records into primary-key ranges and submit them as Skynet jobs.
#
# klass_or_method - name of an instance method on each record, or of a class
#                   responding to .each(record). Defaults to model_class and
#                   is ignored when a block is given.
# arguments       - extra arguments for the instance-method call.
# block           - optional. Stored in each batch in place of the name.
#
# Ranges come from each_range(find_args). Each range becomes one batch entry:
#   [first_id, last_id, find_args copy, model_class, name_or_block(, arguments)]
# For an open-ended range, last_id is 0, because nil.to_i == 0.
# Batches are submitted every MAX_BATCHES_PER_JOB entries, and once more at the
# end for any remainder. Returns the result of the last job run, or nil if no
# job ran.
def map(klass_or_method=nil, *arguments, &block)
  klass_or_method ||= model_class
  batches = []
  result = nil
  each_range(find_args) do |ids, _chunk_index|
    batch_item = [ids['first'].to_i, ids['last'].to_i, find_args.clone, model_class]
    if block_given?
      batch_item << block
    else
      batch_item << "#{klass_or_method}"
      batch_item << arguments
    end
    batches << batch_item
    if batches.size >= MAX_BATCHES_PER_JOB
      log.error "MAX BATCH SIZE EXCEEDED RUNNING: #{batches.size}"
      result = run_job_for_batch(batches)
      batches = []
    end
  end
  # If the last flush already drained every batch, skip submitting an empty job.
  result = run_job_for_batch(batches) unless batches.empty?
  result
end

#each_range(opts = {}) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/skynet/active_record_extensions.rb', line 120

# Yield primary-key ranges covering the records matched by +opts+.
#
# Yields (row, chunk_index). row is a Hash with "first", "last" and "cnt" keys,
# with values as returned by chunk_query (possibly Strings). A "last" of nil
# marks an open-ended range (everything from "first" up).
#
# * If at most batch_size records match, one open range from 0 is yielded.
# * Otherwise the boundary rows from chunk_query are yielded. The query is
#   repeated from the last boundary until no boundaries come back. Finally an
#   open-ended catch-all range starting just after the last boundary is
#   yielded, so the trailing (< batch_size) records are mapped too.
def each_range(opts={})
  opts = opts.clone
  opts[:id] ||= 0
  count = model_klass.count(:all, :conditions => opts[:conditions], :joins => opts[:joins])
  return yield({"first" => 0, "last" => nil, "cnt" => 0}, 0) if count <= batch_size

  ii = 0
  rows = chunk_query(opts)
  rows = [{"first" => 0, "last" => nil, "cnt" => ii}] if rows.empty?
  last_row = nil
  while rows.any?
    rows.each do |record|
      last_row = record
      yield record, ii
    end
    ii += 1
    return if last_row["last"].nil?
    rows = chunk_query(opts.merge(:id => last_row["last"]))
  end

  # chunk_query only reports every batch_size-th row, so the records after the
  # final boundary never appear as a boundary; always hand them out here. The
  # former `last - first >= batch_size` guard was false for dense ids
  # (span == batch_size - 1), which dropped those trailing records.
  yield({"first" => last_row["last"].to_i + 1, "last" => nil, "cnt" => ii}, ii)
end

#log ⇒ Object



116
117
118
# File 'lib/skynet/active_record_extensions.rb', line 116

# Instance-level accessor for the Skynet logger.
def log; Skynet::Logger.get; end

#map(klass_or_method = nil, *arguments, &block) ⇒ Object



201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/skynet/active_record_extensions.rb', line 201

# Split the scoped records into primary-key ranges and submit them as Skynet jobs.
#
# klass_or_method - name of an instance method on each record, or of a class
#                   responding to .each(record). Defaults to model_class and
#                   is ignored when a block is given.
# arguments       - extra arguments for the instance-method call.
# block           - optional. Stored in each batch in place of the name.
#
# Ranges come from each_range(find_args). Each range becomes one batch entry:
#   [first_id, last_id, find_args copy, model_class, name_or_block(, arguments)]
# For an open-ended range, last_id is 0, because nil.to_i == 0.
# Batches are submitted every MAX_BATCHES_PER_JOB entries, and once more at the
# end for any remainder. Returns the result of the last job run, or nil if no
# job ran.
def map(klass_or_method=nil, *arguments, &block)
  klass_or_method ||= model_class
  batches = []
  result = nil
  each_range(find_args) do |ids, _chunk_index|
    batch_item = [ids['first'].to_i, ids['last'].to_i, find_args.clone, model_class]
    if block_given?
      batch_item << block
    else
      batch_item << "#{klass_or_method}"
      batch_item << arguments
    end
    batches << batch_item
    if batches.size >= MAX_BATCHES_PER_JOB
      log.error "MAX BATCH SIZE EXCEEDED RUNNING: #{batches.size}"
      result = run_job_for_batch(batches)
      batches = []
    end
  end
  # If the last flush already drained every batch, skip submitting an empty job.
  result = run_job_for_batch(batches) unless batches.empty?
  result
end

#mapreduce ⇒ Object



230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
# File 'lib/skynet/active_record_extensions.rb', line 230

# Split the scoped records into primary-key ranges and submit them as Skynet jobs.
#
# klass_or_method - name of an instance method on each record, or of a class
#                   responding to .each(record). Defaults to model_class and
#                   is ignored when a block is given.
# arguments       - extra arguments for the instance-method call.
# block           - optional. Stored in each batch in place of the name.
#
# Ranges come from each_range(find_args). Each range becomes one batch entry:
#   [first_id, last_id, find_args copy, model_class, name_or_block(, arguments)]
# For an open-ended range, last_id is 0, because nil.to_i == 0.
# Batches are submitted every MAX_BATCHES_PER_JOB entries, and once more at the
# end for any remainder. Returns the result of the last job run, or nil if no
# job ran.
def map(klass_or_method=nil, *arguments, &block)
  klass_or_method ||= model_class
  batches = []
  result = nil
  each_range(find_args) do |ids, _chunk_index|
    batch_item = [ids['first'].to_i, ids['last'].to_i, find_args.clone, model_class]
    if block_given?
      batch_item << block
    else
      batch_item << "#{klass_or_method}"
      batch_item << arguments
    end
    batches << batch_item
    if batches.size >= MAX_BATCHES_PER_JOB
      log.error "MAX BATCH SIZE EXCEEDED RUNNING: #{batches.size}"
      result = run_job_for_batch(batches)
      batches = []
    end
  end
  # If the last flush already drained every batch, skip submitting an empty job.
  result = run_job_for_batch(batches) unless batches.empty?
  result
end

#model_klass ⇒ Object



96
97
98
# File 'lib/skynet/active_record_extensions.rb', line 96

# Resolve model_class (a class-name string) to the class object.
# The result is memoized after the first truthy lookup.
def model_klass
  return @model_klass if @model_klass
  @model_klass = model_class.constantize
end

#run_job_for_batch(batches, &block) ⇒ Object



179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/skynet/active_record_extensions.rb', line 179

# Submit one Skynet job whose map data is +batches+.
#
# With a block, a Skynet::Job is built with the block as :map and
# :local_master => true. Without one, a Skynet::AsyncJob is built that names
# this object's class as :map_reduce_class. Returns the job's #run result.
def run_job_for_batch(batches,&block)
  label = "each #{model_class}"
  jobopts = {
    :mappers               => 20000,
    :map_data              => batches,
    :name                  => "#{label} MASTER",
    :map_name              => "#{label} MAP",
    :map_timeout           => 2.minutes,
    :master_timeout        => 12.hours,
    :master_result_timeout => 2.minutes,
    :master_retry          => 0,
    :map_retry             => 0
  }
  job =
    if block_given?
      Skynet::Job.new(jobopts.merge(:map => block), :local_master => true)
    else
      Skynet::AsyncJob.new(jobopts.merge(:map_reduce_class => "#{self.class}"))
    end
  job.run
end