Class: Kthxbye::Worker
- Inherits:
-
Object
- Object
- Kthxbye::Worker
- Extended by:
- Helper
- Includes:
- Helper
- Defined in:
- lib/kthxbye/worker.rb
Overview
This is the workhorse loop of the gem. Does all the dequeuing and running of jobs. It mostly makes a bunch of calls to the job methods to run the job. It simply handles the spawning off of processes to run the job and retry if necessary.
Instance Attribute Summary collapse
-
#current_queue ⇒ Object
Returns the value of attribute current_queue.
-
#id ⇒ Object
(also: #to_s)
Returns a useful id with the hostname:pid:queues listing Same return as to_s.
-
#queues ⇒ Object
Returns the queues this worker is attached toin alphabetical order.
-
#sleep_for ⇒ Object
Returns the value of attribute sleep_for.
Class Method Summary collapse
-
.exists?(id) ⇒ Boolean
Checks if a worker is registered.
-
.find(worker) ⇒ Object
Allows us to find a worker so that we can look at some of its internal data later on.
-
.working_on(id) ⇒ Object
Gets the job a given worker is working on Returns a hash with the ‘job_id’ and the ‘started’ time.
Instance Method Summary collapse
-
#<=>(other) ⇒ Object
:nodoc:.
-
#==(other) ⇒ Object
:nodoc:.
-
#clean_workers ⇒ Object
This method cleans Redis of all workers that no longer exist that may have been left over from a previous dirty shutdown (GC).
-
#current_job ⇒ Object
Gets the current job this worker is working.
-
#done ⇒ Object
job complete actions.
-
#exists? ⇒ Boolean
Checks if this worker is registered.
-
#grab_job ⇒ Object
Reserves a job off the queue.
-
#hostname ⇒ Object
Returns the hostname of the machine this worker is running on.
-
#initialize(queues, sleep_for = 5) ⇒ Worker
constructor
Creates a worker for running jobs off of a given queue.
-
#inspect ⇒ Object
nice inspect for the worker with the same info as #id.
-
#kill_child ⇒ Object
:nodoc:.
-
#pid ⇒ Object
Returns the process id of this worker.
-
#register_signals ⇒ Object
thanks to github.com/defunkt/resque/blob/master/lib/resque/worker.rb for these signals.
-
#register_worker ⇒ Object
Adds this worker to the worker registry.
-
#run(&block) ⇒ Object
This is the major run loop.
-
#setup_queues(queues) ⇒ Object
:nodoc:.
-
#shutdown ⇒ Object
Shuts down the worker gracefully (once process has completed.
-
#shutdown! ⇒ Object
Hard kills the worker by killing the process.
-
#startup ⇒ Object
Run startup actions.
-
#unregister_worker ⇒ Object
Removes the worker from our worker registry.
-
#worker_pids ⇒ Object
Returns an array of string pids of all the other workers on this machine.
-
#working(job) ⇒ Object
Run when the job starts running.
-
#working? ⇒ Boolean
Is this job working?.
Methods included from Helper
Constructor Details
#initialize(queues, sleep_for = 5) ⇒ Worker
Creates a worker for running jobs off of a given queue. Takes a queue or queues (csv style, e.g. test,other,bad) or the woldcard (*) symbol to take in all queues in alphabetical order. Optionally takes in an interval param on how long it waits to check the queue for new jobs once it has exhausted the queue(s).
16 17 18 19 |
# File 'lib/kthxbye/worker.rb', line 16 def initialize(queues, sleep_for=5) setup_queues(queues) @sleep_for = sleep_for end |
Instance Attribute Details
#current_queue ⇒ Object
Returns the value of attribute current_queue.
9 10 11 |
# File 'lib/kthxbye/worker.rb', line 9 def current_queue @current_queue end |
#id ⇒ Object Also known as: to_s
Returns a useful id with the hostname:pid:queues listing Same return as to_s
250 251 252 |
# File 'lib/kthxbye/worker.rb', line 250 def id @id end |
#queues ⇒ Object
Returns the queues this worker is attached toin alphabetical order.
90 91 92 |
# File 'lib/kthxbye/worker.rb', line 90 def queues @queues end |
#sleep_for ⇒ Object
Returns the value of attribute sleep_for.
9 10 11 |
# File 'lib/kthxbye/worker.rb', line 9 def sleep_for @sleep_for end |
Class Method Details
.exists?(id) ⇒ Boolean
Checks if a worker is registered.
45 46 47 |
# File 'lib/kthxbye/worker.rb', line 45 def self.exists?(id) redis.sismember( :workers, id ) end |
.find(worker) ⇒ Object
Allows us to find a worker so that we can look at some of its internal data later on.
33 34 35 36 37 38 39 40 41 42 |
# File 'lib/kthxbye/worker.rb', line 33 def self.find(worker) if exists? worker qs = worker.split(':')[-1].split(",") new_worker = new(*qs) new_worker.id = worker return new_worker else nil end end |
.working_on(id) ⇒ Object
Gets the job a given worker is working on Returns a hash with the ‘job_id’ and the ‘started’ time
51 52 53 |
# File 'lib/kthxbye/worker.rb', line 51 def self.working_on(id) decode( redis.get( "worker:#{id}" ) ) end |
Instance Method Details
#<=>(other) ⇒ Object
:nodoc:
264 265 266 |
# File 'lib/kthxbye/worker.rb', line 264 def <=>(other) #:nodoc: to_s <=> other.to_s end |
#==(other) ⇒ Object
:nodoc:
260 261 262 |
# File 'lib/kthxbye/worker.rb', line 260 def ==(other) #:nodoc: to_s == other.to_s end |
#clean_workers ⇒ Object
This method cleans Redis of all workers that no longer exist that may have been left over from a previous dirty shutdown (GC)
103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/kthxbye/worker.rb', line 103 def clean_workers workers = Kthxbye.workers known = worker_pids workers.each do |worker| host,pid,queues = worker.id.split(":") next unless host == hostname next if known.include?(pid) log "Pruning unknown worker: #{worker}" worker.unregister_worker end end |
#current_job ⇒ Object
Gets the current job this worker is working.
136 137 138 139 140 |
# File 'lib/kthxbye/worker.rb', line 136 def current_job return @current_job if @current_job data = decode( redis.get("worker:#{self}") ) @current_job = Job.find( data['job_id'], @current_queue ) end |
#done ⇒ Object
job complete actions
161 162 163 164 165 166 167 |
# File 'lib/kthxbye/worker.rb', line 161 def done #:nodoc: redis.srem( :working, self ) redis.del( "worker:#{self}" ) log "Completed job #{@current_job}" redis.publish("job.completed", @current_job.id) @current_job = nil end |
#exists? ⇒ Boolean
Checks if this worker is registered.
226 227 228 |
# File 'lib/kthxbye/worker.rb', line 226 def exists? #:nodoc: redis.sismember( :workers, self ) end |
#grab_job ⇒ Object
Reserves a job off the queue
213 214 215 216 217 218 219 220 221 222 223 |
# File 'lib/kthxbye/worker.rb', line 213 def grab_job #:nodoc: job = nil @queues.each do |q| @current_queue = q log "Checking \"#{q}\" queue for jobs" job = Kthxbye.salvage(q) break unless job.nil? end return job || false end |
#hostname ⇒ Object
Returns the hostname of the machine this worker is running on
231 232 233 |
# File 'lib/kthxbye/worker.rb', line 231 def hostname @hostname ||= `hostname`.chomp end |
#inspect ⇒ Object
nice inspect for the worker with the same info as #id
256 257 258 |
# File 'lib/kthxbye/worker.rb', line 256 def inspect "#<Worker: #{@id}>" end |
#kill_child ⇒ Object
:nodoc:
200 201 202 203 204 205 206 207 208 209 210 |
# File 'lib/kthxbye/worker.rb', line 200 def kill_child #:nodoc: if @child log "Killing child at #{@child}" if system("ps -o pid,state -p #{@child}") Process.kill("KILL", @child) rescue nil else log "Child #{@child} not found, restarting." shutdown end end end |
#pid ⇒ Object
Returns the process id of this worker.
236 237 238 |
# File 'lib/kthxbye/worker.rb', line 236 def pid Process.pid end |
#register_signals ⇒ Object
thanks to github.com/defunkt/resque/blob/master/lib/resque/worker.rb for these signals
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/kthxbye/worker.rb', line 172 def register_signals #:nordoc: trap('TERM') { shutdown! } trap('INT') { shutdown! } begin trap('QUIT') { shutdown } trap('USR1') { shutdown } trap('USR2') { log "Paused"; @paused = true } trap('CONT') { log "Unpaused"; @paused = false } rescue ArgumentError warn "Signals QUIT, USR1, USR2, and/or CONT not supported." end log "Registered signals" end |
#register_worker ⇒ Object
Adds this worker to the worker registry
117 118 119 120 |
# File 'lib/kthxbye/worker.rb', line 117 def register_worker log "Registered worker #{self}" redis.sadd( :workers, self ) if !exists? end |
#run(&block) ⇒ Object
This is the major run loop. Workhorse of a worker… sort of. In the end, this loop simply runs the jobs in separate processes by forking out the process then waiting for it to return. we only process one. Can optionally take in a block to run after the job has run.
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/kthxbye/worker.rb', line 59 def run(&block) log "Starting Kthxbye::Worker #{self}" startup loop do break if @terminate if !@paused and job = grab_job log "Found job #{job}" working(job) @child = fork { log "Forking..." result = job.perform yield job if block_given? exit! } Process.wait done else break if @sleep_for == 0 log "No jobs on #{@queues} - sleeping for #{@sleep_for}" sleep sleep_for.to_i end end ensure unregister_worker end |
#setup_queues(queues) ⇒ Object
:nodoc:
21 22 23 24 25 26 27 28 29 |
# File 'lib/kthxbye/worker.rb', line 21 def setup_queues(queues) # :nodoc: if queues == "*" @queues = Kthxbye.queues.sort elsif queues.include? ?, @queues = queues.split(",").compact else @queues = *queues end end |
#shutdown ⇒ Object
Shuts down the worker gracefully (once process has completed
189 190 191 192 |
# File 'lib/kthxbye/worker.rb', line 189 def shutdown #:nodoc: log "Shutting down worker #{self}" @terminate = true end |
#shutdown! ⇒ Object
Hard kills the worker by killing the process.
195 196 197 198 |
# File 'lib/kthxbye/worker.rb', line 195 def shutdown! #:nodoc: kill_child shutdown end |
#startup ⇒ Object
Run startup actions
95 96 97 98 99 |
# File 'lib/kthxbye/worker.rb', line 95 def startup #:nodoc: clean_workers register_worker register_signals end |
#unregister_worker ⇒ Object
Removes the worker from our worker registry
123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/kthxbye/worker.rb', line 123 def unregister_worker log "Unregistered worker #{self}" if working? log "Was active. Reporting and rerunning" Failure.create(current_job, ActiveWorkerKilled.new) current_job.rerun end redis.del "worker:#{self}" redis.srem :workers, self end |
#worker_pids ⇒ Object
Returns an array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.
242 243 244 245 246 |
# File 'lib/kthxbye/worker.rb', line 242 def worker_pids `ps -A -o pid,command | grep kthxbye`.split("\n").map do |line| line.split(' ')[0] end end |
#working(job) ⇒ Object
Run when the job starts running
143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/kthxbye/worker.rb', line 143 def working(job) #:nodoc: redis.sadd( :working, self ) data = encode( {:job_id => job.id, :started => Time.now.to_s} ) redis.set("worker:#{self}", data) @current_job = job # activates job job.active redis.publish("job.started", job.id) end |
#working? ⇒ Boolean
Is this job working?
156 157 158 |
# File 'lib/kthxbye/worker.rb', line 156 def working? redis.sismember( :working, self ) end |