Class: MerbBackground::Job

Inherits:
ActiveRecord::Base
  • Object
show all
Includes:
DataMapper::ActiveRecordMethods, DataMapper::Resource
Defined in:
lib/merb_background/job.rb,
lib/merb_background/job/ar.rb,
lib/merb_background/job/dm.rb

Class Method Summary collapse

Instance Method Summary collapse

Methods included from DataMapper::ActiveRecordMethods

included

Class Method Details

.cleanup_finished_jobs ⇒ Object

Delete finished jobs that are more than a week old.



97
98
99
100
101
102
# File 'lib/merb_background/job.rb', line 97

# Delete finished jobs that are more than a week old.
#
# Writes an info log line, then calls +destroy!+ on the set of finished jobs
# whose +updated_at+ is earlier than <tt>1.week.ago</tt>.
def self.cleanup_finished_jobs
  logger.info "MerbBackground: Cleaning up finished jobs."

  # TODO: AR compats
  finished_jobs = Job.finished
  expired_jobs  = finished_jobs.all(:updated_at.lt => 1.week.ago)
  expired_jobs.destroy!
end

.enqueue!(worker_class, worker_method, *args) ⇒ Object

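Creates a job record for the given worker class, worker method and arguments, logs the enqueue, and returns the new job. (The source comment attached to this method reads: attr_readonly :worker_class, :worker_method, :args)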



14
15
16
17
18
19
20
21
22
23
24
# File 'lib/merb_background/job.rb', line 14

# Create a job for the given worker and return it.
#
# +worker_class+ and +worker_method+ are stored as strings (via +to_s+); any
# further arguments are stored as the +args+ array. After creation an info
# log line is written with the job id, worker, method and argument count.
def self.enqueue!(worker_class, worker_method, *args)
  create!(
    :worker_class  => worker_class.to_s,
    :worker_method => worker_method.to_s,
    :args          => args
  ).tap do |job|
    logger.info("MerbBackground: Job enqueued. Job(id: #{job.id}, worker: #{worker_class}, method: #{worker_method}, argc: #{args.size}).")
  end
end

.execute!(config = nil) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/merb_background/job.rb', line 26

# Main daemon loop: repeatedly pick the next job and run it.
#
# The given +config+ (or an empty hash when nil) is passed to
# MerbBackground::Config.merge and every resulting key/value pair is logged.
# Finished jobs are cleaned up once before the loop when
# <tt>:cleanup_interval</tt> is <tt>:at_start</tt>, or after every iteration
# when it is <tt>:continuous</tt>.
#
# Each iteration takes the first pending, ready job ordered by descending
# priority and ascending start_at and calls +get_done!+ on it; when there is
# none, it sleeps for <tt>config[:monitor_interval]</tt>. The loop itself
# contains no exit condition.
def self.execute!(config = nil)
  config = MerbBackground::Config.merge(config || {})
  cleanup_finished_jobs if config[:cleanup_interval] == :at_start

  config.each { |key, value| logger.info "MerbBackground: #{key}: #{value}" }

  loop do
    next_job = pending.ready.first(:order => [:priority.desc, :start_at.asc])

    if next_job
      next_job.get_done!
    else
      logger.info("MerbBackground: Waiting for jobs...")
      sleep config[:monitor_interval]
    end

    cleanup_finished_jobs if config[:cleanup_interval] == :continuous
  end
end

.generate_state_helpers ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/merb_background/job.rb', line 104

# Define per-state helpers for every name returned by +states+:
#
# * an instance predicate named <tt>"#{state_name}?"</tt> that is true when
#   +state+ equals that state name;
# * a class-level finder named after the state that returns
#   <tt>all(:state => state_name, :order => [:id.desc])</tt>.
def self.generate_state_helpers
  # The finders must be defined on this receiver's own singleton class.
  # Inside a class method +self.class+ is +Class+, so defining them there
  # would add a finder to every class in the process.
  metaclass = (class << self; self; end)

  states.each do |state_name|
    define_method("#{state_name}?") do
      state == state_name
    end

    # Job.running => array of running jobs, etc.
    metaclass.send(:define_method, state_name) do
      all(:state => state_name, :order => [:id.desc])
    end
  end
end

.ready ⇒ Object



30
31
32
# File 'lib/merb_background/job/dm.rb', line 30

# Jobs whose +start_at+ is less than or equal to the current time.
def self.ready
  conditions = { :start_at.lte => Time.now }
  all(conditions)
end

Instance Method Details

#ensure_worker ⇒ Object



87
88
89
90
91
92
93
94
# File 'lib/merb_background/job.rb', line 87

# Copy the worker's <tt>@progress</tt> instance variable into +progress+ and
# save the record.
#
# A StaleObjectError raised while doing so is logged and otherwise ignored.
def ensure_worker
  begin
    self.progress = @worker.instance_variable_get(:@progress)
    save!
  rescue StaleObjectError
    # Deliberately swallowed: this error exists only to keep several
    # daemons from executing the same job.
    logger.info("MerbBackground: Race condition handled (It's OK). Job(id: #{id}).")
  end
end

#get_done! ⇒ Object

Invoked by a background daemon.



47
48
49
50
51
52
53
54
# File 'lib/merb_background/job.rb', line 47

# Invoked by a background daemon.
#
# Runs +initialize_worker+ followed by +invoke_worker+. Any exception raised
# by either is handed to +rescue_worker+, and +ensure_worker+ always runs
# last.
#
# Ordinary failures are swallowed after being recorded. SystemExit and
# SignalException are recorded the same way but then re-raised, so that a
# signal or an +exit+ arriving while a job is running still reaches the
# caller instead of being silently absorbed.
def get_done!
  initialize_worker
  invoke_worker
rescue SystemExit, SignalException => e
  rescue_worker(e)
  raise
rescue Exception => e
  # Deliberately broad: a worker may raise errors outside StandardError
  # (e.g. ScriptError subclasses) and those must still be recorded.
  rescue_worker(e)
ensure
  ensure_worker
end

#initialize_worker ⇒ Object



69
70
71
72
73
# File 'lib/merb_background/job.rb', line 69

# Mark the job as running and build its worker.
#
# Sets +started_at+ to the current time and +state+ to "running" through
# +update_attributes!+, instantiates the class named by +worker_class+ into
# <tt>@worker</tt>, and logs the initialization.
def initialize_worker
  update_attributes!(:started_at => Time.now, :state => "running")

  worker_klass = worker_class.constantize
  @worker = worker_klass.new

  logger.info("MerbBackground: Job initialized. Job(id: #{id}).")
end

#invoke_worker ⇒ Object



75
76
77
78
79
# File 'lib/merb_background/job.rb', line 75

# Call +worker_method+ on <tt>@worker</tt> with the stored +args+, keep the
# return value in +result+, set +state+ to "finished", and log completion.
def invoke_worker
  outcome = @worker.send(worker_method, *args)

  self.result = outcome
  self.state  = "finished"

  logger.info("MerbBackground: Job finished. Job(id: #{id}).")
end

#rescue_worker(exception) ⇒ Object



81
82
83
84
85
# File 'lib/merb_background/job.rb', line 81

# Record a failure on the job.
#
# Stores the exception message and its backtrace (separated by a blank line)
# in +result+, sets +state+ to "failed", and logs the failure.
#
# exception - the error to record. Its backtrace may be nil (an exception
#             object that was never raised); that is treated as an empty
#             backtrace instead of raising NoMethodError.
def rescue_worker(exception)
  backtrace = Array(exception.backtrace).join("\n")

  self.result = [exception.message, backtrace].join("\n\n")
  self.state  = "failed"
  logger.info("MerbBackground: Job failed. Job(id: #{id}).")
end

#restart! ⇒ Object

Restart a failed job.



57
58
59
60
61
62
63
64
65
66
67
# File 'lib/merb_background/job.rb', line 57

# Restart a failed job.
#
# Does nothing unless +failed?+ is true. Otherwise clears +result+,
# +progress+ and +started_at+, puts +state+ back to "pending" via
# +update_attributes!+, and logs the restart.
def restart!
  return unless failed?

  update_attributes!(
    :result     => nil,
    :progress   => nil,
    :started_at => nil,
    :state      => "pending"
  )
  logger.info("MerbBackground: Job restarted. Job(id: #{id}).")
end

#setup_priority ⇒ Object

Default priority is 0. Jobs will be executed in descending priority order (negative priorities allowed).



125
126
127
128
129
# File 'lib/merb_background/job.rb', line 125

# Default priority is 0. Jobs will be executed in descending priority order
# (negative priorities allowed).
#
# Only assigns the default when +priority+ is blank; an existing value is
# left untouched.
def setup_priority
  self.priority = 0 if priority.blank?
end

#setup_start_at ⇒ Object

Job will be executed after this timestamp.



132
133
134
135
136
# File 'lib/merb_background/job.rb', line 132

# Job will be executed after this timestamp.
#
# Only assigns the current time when +start_at+ is blank; an existing value
# is left untouched.
def setup_start_at
  self.start_at = Time.now if start_at.blank?
end

#setup_state ⇒ Object



118
119
120
121
122
# File 'lib/merb_background/job.rb', line 118

# Default the job's +state+ to "pending".
#
# Only assigns the default when +state+ is blank; an existing value is left
# untouched.
def setup_state
  self.state = "pending" if state.blank?
end