Module: Job::BonusFeatures
- Defined in:
- lib/merb_background/job/bonus_features.rb
Class Method Summary collapse
Instance Method Summary collapse
-
#cleanup_after_threads ⇒ Object
Closes database connections left after finished threads.
- #elapsed ⇒ Object
- #ensure_worker_with_threads ⇒ Object
-
#estimated ⇒ Object
seconds to go, based on estimated and progress.
- #invoke_worker_with_threads ⇒ Object
-
#monitor_worker ⇒ Object
Monitors the worker and updates the job progress.
-
#restart_with_threads! ⇒ Object
Overridden because of new “stopped” state.
-
#stop! ⇒ Object
The record_progress() method becomes available when your worker class includes Background::WorkerMonitoring.
Class Method Details
.included(base) ⇒ Object
3 4 5 6 7 8 9 10 |
# File 'lib/merb_background/job/bonus_features.rb', line 3 def self.included(base) base.states += %w(stopping stopped) base.generate_state_helpers base.alias_method_chain :invoke_worker, :threads base.alias_method_chain :ensure_worker, :threads base.alias_method_chain :restart!, :threads end |
Instance Method Details
#cleanup_after_threads ⇒ Object
Closes database connections left after finished threads.
85 86 87 |
# File 'lib/merb_background/job/bonus_features.rb', line 85 def cleanup_after_threads ActiveRecord::Base.verify_active_connections! end |
#elapsed ⇒ Object
89 90 91 |
# File 'lib/merb_background/job/bonus_features.rb', line 89 def elapsed (updated_at.to_f - started_at.to_f).to_i if !pending? end |
#ensure_worker_with_threads ⇒ Object
23 24 25 26 |
# File 'lib/merb_background/job/bonus_features.rb', line 23 def ensure_worker_with_threads ensure_worker_without_threads cleanup_after_threads end |
#estimated ⇒ Object
seconds to go, based on estimated and progress
94 95 96 |
# File 'lib/merb_background/job/bonus_features.rb', line 94 def estimated ((elapsed * 100) / progress) - elapsed if running? && (1..99).include?(progress.to_i) end |
#invoke_worker_with_threads ⇒ Object
12 13 14 15 16 17 18 19 20 21 |
# File 'lib/merb_background/job/bonus_features.rb', line 12 def invoke_worker_with_threads monitor_worker res = catch(:stopping) do invoke_worker_without_threads; nil end self.reload self.state = res ? "stopped" : "finished" end |
#monitor_worker ⇒ Object
Monitors the worker and updates the job progress. If the job’s status is changed to ‘stopping’, the worker is requested to stop.
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/merb_background/job/bonus_features.rb', line 63 def monitor_worker Thread.new do while running? && !Job.find(id).stopping? current_progress = @worker.instance_variable_get("@progress") if current_progress == progress sleep 5 else update_attribute(:progress, current_progress) sleep 1 end end if Job.find(id).stopping? @worker.instance_variable_set("@stopping", true) end end logger.info("BackgroundFu: Job monitoring started. Job(id: #{id}).") end |
#restart_with_threads! ⇒ Object
Overridden because of new “stopped” state.
49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/merb_background/job/bonus_features.rb', line 49 def restart_with_threads! if stopped? || failed? update_attributes!( :result => nil, :progress => nil, :started_at => nil, :state => "pending" ) logger.info("BackgroundFu: Restarting job. Job(id: #{id}).") end end |
#stop! ⇒ Object
The record_progress() method becomes available when your worker class includes Background::WorkerMonitoring.
Every time worker invokes record_progress() is a possible stopping place.
How it works:
-
invoke job.stop! to set a state (stopping) in a db
-
Monitoring thread picks up the state change from db and sets @stopping to true in the worker.
-
The worker invokes a register_progress() somewhere during execution.
-
The record_progress() method throws :stopping symbol if @stopping == true
-
The job catches the :stopping symbol and reacts upon it.
-
The job is stopped in a merciful way. No one gets harmed.
41 42 43 44 45 46 |
# File 'lib/merb_background/job/bonus_features.rb', line 41 def stop! if running? update_attribute(:state, "stopping") logger.info("BackgroundFu: Stopping job. Job(id: #{id}).") end end |