Class: MerbBackground::Job
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- MerbBackground::Job
- Includes:
- DataMapper::ActiveRecordMethods, DataMapper::Resource
- Defined in:
- lib/merb_background/job.rb,
lib/merb_background/job/ar.rb,
lib/merb_background/job/dm.rb
Class Method Summary collapse
-
.cleanup_finished_jobs ⇒ Object
Delete finished jobs that are more than a week old.
-
.enqueue!(worker_class, worker_method, *args) ⇒ Object
attr_readonly :worker_class, :worker_method, :args.
- .execute!(config = nil) ⇒ Object
- .generate_state_helpers ⇒ Object
- .ready ⇒ Object
Instance Method Summary collapse
- #ensure_worker ⇒ Object
-
#get_done! ⇒ Object
Invoked by a background daemon.
- #initialize_worker ⇒ Object
- #invoke_worker ⇒ Object
- #rescue_worker(exception) ⇒ Object
-
#restart! ⇒ Object
Restart a failed job.
-
#setup_priority ⇒ Object
Default priority is 0.
-
#setup_start_at ⇒ Object
Job will be executed after this timestamp.
- #setup_state ⇒ Object
Methods included from DataMapper::ActiveRecordMethods
Class Method Details
.cleanup_finished_jobs ⇒ Object
Delete finished jobs that are more than a week old.
97 98 99 100 101 102 |
# File 'lib/merb_background/job.rb', line 97 def self.cleanup_finished_jobs logger.info "MerbBackground: Cleaning up finished jobs." # TODO: AR compats Job.finished.all(:updated_at.lt => 1.week.ago).destroy! end |
.enqueue!(worker_class, worker_method, *args) ⇒ Object
attr_readonly :worker_class, :worker_method, :args
14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/merb_background/job.rb', line 14 def self.enqueue!(worker_class, worker_method, *args) job = create!( :worker_class => worker_class.to_s, :worker_method => worker_method.to_s, :args => args ) logger.info("MerbBackground: Job enqueued. Job(id: #{job.id}, worker: #{worker_class}, method: #{worker_method}, argc: #{args.size}).") job end |
.execute!(config = nil) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/merb_background/job.rb', line 26 def self.execute!(config = nil) config = MerbBackground::Config.merge(config || {}) cleanup_finished_jobs if config[:cleanup_interval] == :at_start config.each do |k,v| logger.info "MerbBackground: #{k}: #{v}" end loop do if job = pending.ready.first(:order => [:priority.desc, :start_at.asc]) job.get_done! else logger.info("MerbBackground: Waiting for jobs...") sleep config[:monitor_interval] end cleanup_finished_jobs if config[:cleanup_interval] == :continuous end end |
.generate_state_helpers ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/merb_background/job.rb', line 104 def self.generate_state_helpers states.each do |state_name| define_method("#{state_name}?") do state == state_name end # Job.running => array of running jobs, etc. self.class.send(:define_method, state_name) do all(:state => state_name, :order => [:id.desc]) end end end |
.ready ⇒ Object
30 31 32 |
# File 'lib/merb_background/job/dm.rb', line 30 def self.ready all(:start_at.lte => Time.now) end |
Instance Method Details
#ensure_worker ⇒ Object
87 88 89 90 91 92 93 94 |
# File 'lib/merb_background/job.rb', line 87 def ensure_worker self.progress = @worker.instance_variable_get("@progress") save! rescue StaleObjectError # Ignore this exception as its only purpose is # not allowing multiple daemons execute the same job. logger.info("MerbBackground: Race condition handled (It's OK). Job(id: #{id}).") end |
#get_done! ⇒ Object
Invoked by a background daemon.
47 48 49 50 51 52 53 54 |
# File 'lib/merb_background/job.rb', line 47 def get_done! initialize_worker invoke_worker rescue Exception => e rescue_worker(e) ensure ensure_worker end |
#initialize_worker ⇒ Object
69 70 71 72 73 |
# File 'lib/merb_background/job.rb', line 69 def initialize_worker update_attributes!(:started_at => Time.now, :state => "running") @worker = worker_class.constantize.new logger.info("MerbBackground: Job initialized. Job(id: #{id}).") end |
#invoke_worker ⇒ Object
75 76 77 78 79 |
# File 'lib/merb_background/job.rb', line 75 def invoke_worker self.result = @worker.send(worker_method, *args) self.state = "finished" logger.info("MerbBackground: Job finished. Job(id: #{id}).") end |
#rescue_worker(exception) ⇒ Object
81 82 83 84 85 |
# File 'lib/merb_background/job.rb', line 81 def rescue_worker(exception) self.result = [exception., exception.backtrace.join("\n")].join("\n\n") self.state = "failed" logger.info("MerbBackground: Job failed. Job(id: #{id}).") end |
#restart! ⇒ Object
Restart a failed job.
57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/merb_background/job.rb', line 57 def restart! if failed? update_attributes!( :result => nil, :progress => nil, :started_at => nil, :state => "pending" ) logger.info("MerbBackground: Job restarted. Job(id: #{id}).") end end |
#setup_priority ⇒ Object
Default priority is 0. Jobs will be executed in descending priority order (negative priorities allowed).
125 126 127 128 129 |
# File 'lib/merb_background/job.rb', line 125 def setup_priority return unless priority.blank? self.priority = 0 end |
#setup_start_at ⇒ Object
Job will be executed after this timestamp.
132 133 134 135 136 |
# File 'lib/merb_background/job.rb', line 132 def setup_start_at return unless start_at.blank? self.start_at = Time.now end |
#setup_state ⇒ Object
118 119 120 121 122 |
# File 'lib/merb_background/job.rb', line 118 def setup_state return unless state.blank? self.state = "pending" end |