Module: Job::BonusFeatures

Defined in:
lib/merb_background/job/bonus_features.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(base) ⇒ Object



3
4
5
6
7
8
9
10
# File 'lib/merb_background/job/bonus_features.rb', line 3

def self.included(base)
  base.states += %w(stopping stopped)
  base.generate_state_helpers

  base.alias_method_chain :invoke_worker, :threads
  base.alias_method_chain :ensure_worker, :threads
  base.alias_method_chain :restart!,      :threads
end

Instance Method Details

#cleanup_after_threadsObject

Closes database connections left after finished threads.



85
86
87
# File 'lib/merb_background/job/bonus_features.rb', line 85

def cleanup_after_threads
  ActiveRecord::Base.verify_active_connections!
end

#elapsedObject



89
90
91
# File 'lib/merb_background/job/bonus_features.rb', line 89

def elapsed
  (updated_at.to_f - started_at.to_f).to_i if !pending?
end

#ensure_worker_with_threadsObject



23
24
25
26
# File 'lib/merb_background/job/bonus_features.rb', line 23

def ensure_worker_with_threads
  ensure_worker_without_threads
  cleanup_after_threads
end

#estimatedObject

seconds to go, based on estimated and progress



94
95
96
# File 'lib/merb_background/job/bonus_features.rb', line 94

def estimated
  ((elapsed * 100) / progress) - elapsed if running? && (1..99).include?(progress.to_i)
end

#invoke_worker_with_threadsObject



12
13
14
15
16
17
18
19
20
21
# File 'lib/merb_background/job/bonus_features.rb', line 12

def invoke_worker_with_threads
  monitor_worker
  
  res = catch(:stopping) do
    invoke_worker_without_threads; nil
  end
  
  self.reload
  self.state = res ? "stopped" : "finished"
end

#monitor_workerObject

Monitors the worker and updates the job progress. If the job’s status is changed to ‘stopping’, the worker is requested to stop.



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/merb_background/job/bonus_features.rb', line 63

def monitor_worker
  Thread.new do
    while running? && !Job.find(id).stopping?
      current_progress = @worker.instance_variable_get("@progress")

      if current_progress == progress
        sleep 5
      else
        update_attribute(:progress, current_progress)
        sleep 1
      end
    end

    if Job.find(id).stopping?
      @worker.instance_variable_set("@stopping", true)
    end
  end
  
  logger.info("BackgroundFu: Job monitoring started. Job(id: #{id}).")
end

#restart_with_threads!Object

Overridden because of new “stopped” state.



49
50
51
52
53
54
55
56
57
58
59
# File 'lib/merb_background/job/bonus_features.rb', line 49

def restart_with_threads!
  if stopped? || failed?
    update_attributes!(
      :result     => nil, 
      :progress   => nil, 
      :started_at => nil, 
      :state      => "pending"
    ) 
    logger.info("BackgroundFu: Restarting job. Job(id: #{id}).")
  end
end

#stop!Object

The record_progress() method becomes available when your worker class includes Background::WorkerMonitoring.

Every time worker invokes record_progress() is a possible stopping place.

How it works:

  1. invoke job.stop! to set a state (stopping) in a db

  2. Monitoring thread picks up the state change from db and sets @stopping to true in the worker.

  3. The worker invokes a register_progress() somewhere during execution.

  4. The record_progress() method throws :stopping symbol if @stopping == true

  5. The job catches the :stopping symbol and reacts upon it.

  6. The job is stopped in a merciful way. No one gets harmed.



41
42
43
44
45
46
# File 'lib/merb_background/job/bonus_features.rb', line 41

def stop!
  if running?
    update_attribute(:state, "stopping")
    logger.info("BackgroundFu: Stopping job. Job(id: #{id}).")
  end
end