Class: Kthxbye::Worker

Inherits:
Object
  • Object
show all
Extended by:
Helper
Includes:
Helper
Defined in:
lib/kthxbye/worker.rb

Overview

This is the workhorse loop of the gem. Does all the dequeuing and running of jobs. It mostly makes a bunch of calls to the job methods to run the job. It simply handles the spawning off of processes to run the job and retry if necessary.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Helper

decode, encode, log, redis

Constructor Details

#initialize(queues, sleep_for = 5) ⇒ Worker

Creates a worker for running jobs off of a given queue. Takes a queue or queues (CSV style, e.g. test,other,bad) or the wildcard (*) symbol to take in all queues in alphabetical order. Optionally takes an interval param specifying how long it waits before checking for new jobs once it has exhausted the queue(s).



16
17
18
19
# File 'lib/kthxbye/worker.rb', line 16

# Builds a worker bound to the given queue specification.
#
# @param queues [String, Array] handed unchanged to setup_queues, which
#   stores the resulting list in @queues
# @param sleep_for [Integer] stored in @sleep_for; the surrounding docs
#   describe it as the polling interval once the queues are exhausted
def initialize(queues, sleep_for=5)
  setup_queues(queues)
  @sleep_for = sleep_for
end

Instance Attribute Details

#current_queueObject

Returns the value of attribute current_queue.



9
10
11
# File 'lib/kthxbye/worker.rb', line 9

# Reader for @current_queue.
#
# @return [Object, nil] whatever was last stored in @current_queue
def current_queue
  @current_queue
end

#idObject Also known as: to_s

Returns a useful id with the hostname:pid:queues listing. Same return value as to_s.



281
282
283
# File 'lib/kthxbye/worker.rb', line 281

# Reader for @id.
#
# @return [Object, nil] the stored id; the surrounding docs describe it as a
#   "hostname:pid:queues" string
def id
  @id
end

#queuesObject

Returns the queues this worker is attached to in alphabetical order.



102
103
104
# File 'lib/kthxbye/worker.rb', line 102

# Reader for @queues.
#
# @return [Array, nil] the queue list as stored in @queues
def queues
  @queues
end

#sleep_forObject

Returns the value of attribute sleep_for.



9
10
11
# File 'lib/kthxbye/worker.rb', line 9

# Reader for @sleep_for.
#
# @return [Object, nil] the value stored in @sleep_for
def sleep_for
  @sleep_for
end

Class Method Details

.exists?(id) ⇒ Boolean

Checks if a worker is registered.

Returns:

  • (Boolean)


45
46
47
# File 'lib/kthxbye/worker.rb', line 45

# Asks Redis whether +id+ is a member of the :workers set.
#
# @param id [Object] worker id to look up
# @return [Object] whatever redis.sismember returns for the query
def self.exists?(id)
  redis.sismember( :workers, id )
end

.find(worker) ⇒ Object

Allows us to find a worker so that we can look at some of its internal data later on.



33
34
35
36
37
38
39
40
41
42
# File 'lib/kthxbye/worker.rb', line 33

# Looks up a registered worker by its id string and rebuilds a worker for it.
#
# The queue list is read from the text after the last ":" in the id and is
# handed to the constructor as ONE array argument. Splatting it would push
# the second queue name into the +sleep_for+ parameter and raise
# ArgumentError for ids that list three or more queues.
#
# @param worker [String] worker id whose last ":"-separated field is a
#   comma-separated queue list
# @return [Object, nil] the rebuilt worker with its id set to +worker+, or
#   nil when exists? reports the id as unregistered
def self.find(worker)
  return nil unless exists?(worker)

  queue_names = worker.split(':').last.split(',')
  found = new(queue_names)
  found.id = worker
  found
end

.working_on(id) ⇒ Object

Gets the job a given worker is working on. Returns a hash with the ‘job_id’ and the ‘started’ time.



51
52
53
# File 'lib/kthxbye/worker.rb', line 51

# Fetches the Redis entry stored under "worker:<id>" and decodes it.
#
# @param id [Object] worker id interpolated into the Redis key
# @return [Object] whatever decode returns for the stored value
def self.working_on(id)
  redis_key = "worker:#{id}"
  raw = redis.get(redis_key)
  decode(raw)
end

Instance Method Details

#<=>(other) ⇒ Object

:nodoc:



295
296
297
# File 'lib/kthxbye/worker.rb', line 295

# Orders workers by comparing their string forms.
#
# @param other [Object] anything responding to to_s
# @return [Integer, nil] result of String#<=> on the two string forms
def <=>(other) #:nodoc:
  mine = to_s
  theirs = other.to_s
  mine <=> theirs
end

#==(other) ⇒ Object

:nodoc:



291
292
293
# File 'lib/kthxbye/worker.rb', line 291

def ==(other) #:nodoc:
  to_s == other.to_s
end

#clean_workersObject

This method cleans Redis of all workers that no longer exist that may have been left over from a previous dirty shutdown (GC)



115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/kthxbye/worker.rb', line 115

# Unregisters workers recorded for this host whose pid is not in the list
# returned by worker_pids. Workers recorded for other hosts are left alone.
#
# @return [Object] the collection returned by Kthxbye.workers
def clean_workers
  registered = Kthxbye.workers
  live_pids = worker_pids

  registered.each do |worker|
    host, pid, _queues = worker.id.split(":")
    stale = host == hostname && !live_pids.include?(pid)
    next unless stale

    log "Pruning unknown worker: #{worker}"
    worker.unregister_worker
  end
end

#current_jobObject

Gets the current job this worker is working.



153
154
155
156
157
# File 'lib/kthxbye/worker.rb', line 153

# Returns the job this worker is currently processing.
#
# The result is memoized in @current_job. Otherwise the job id is read from
# the "worker:<self>" Redis entry and resolved through Job.find using
# @current_queue. A worker with no Redis entry is idle, so nil is returned
# instead of indexing into nil and raising NoMethodError.
#
# @return [Object, nil] the job from Job.find, or nil when nothing is
#   recorded for this worker
def current_job
  return @current_job if @current_job

  payload = redis.get("worker:#{self}")
  data = decode(payload) if payload
  return nil unless data

  @current_job = Job.find(data['job_id'], @current_queue)
end

#doneObject

job complete actions



178
179
180
181
182
183
184
185
186
# File 'lib/kthxbye/worker.rb', line 178

# Bookkeeping after a job finishes: removes this worker from the :working
# set, deletes its "worker:<self>" entry, publishes "job.completed" with the
# job id, bumps the global and per-worker "processed" counters, and clears
# @current_job.
def done #:nodoc:
  finished = @current_job

  redis.srem(:working, self)
  redis.del("worker:#{self}")
  log "Completed job #{finished}"
  redis.publish("job.completed", finished.id)

  ["processed", "processed:#{self}"].each { |counter| Stats.incr(counter) }
  @current_job = nil
end

#exists?Boolean

Checks if this worker is registered.

Returns:

  • (Boolean)


257
258
259
# File 'lib/kthxbye/worker.rb', line 257

# Asks Redis whether this worker is a member of the :workers set.
#
# @return [Object] whatever redis.sismember returns for the query
def exists? #:nodoc:
  redis.sismember( :workers, self )
end

#failedObject



192
193
194
# File 'lib/kthxbye/worker.rb', line 192

# Looks up this worker's failure counter.
#
# @return [Object] the value Stats holds under "failures:<self>"
def failed
  stat_key = "failures:#{self}"
  Stats[stat_key]
end

#grab_jobObject

Reserves a job off the queue



244
245
246
247
248
249
250
251
252
253
254
# File 'lib/kthxbye/worker.rb', line 244

# Polls each queue in @queues in order and stops at the first one where
# Kthxbye.salvage returns something other than nil. @current_queue is
# updated to every queue as it is checked, so it ends on the queue that
# produced the job (or on the last queue when nothing was found).
#
# @return [Object, false] the salvaged job, or false when none was found
def grab_job #:nodoc:
  found = nil

  @queues.each do |queue_name|
    @current_queue = queue_name
    log "Checking \"#{queue_name}\" queue for jobs"

    candidate = Kthxbye.salvage(queue_name)
    next if candidate.nil?

    found = candidate
    break
  end

  found || false
end

#hostnameObject

Returns the hostname of the machine this worker is running on



262
263
264
# File 'lib/kthxbye/worker.rb', line 262

# Name of the current machine, taken from the `hostname` shell command the
# first time it is needed and cached in @hostname afterwards.
#
# @return [String] command output with the trailing newline removed
def hostname
  return @hostname if @hostname

  @hostname = `hostname`.chomp
end

#inspectObject

nice inspect for the worker with the same info as #id



287
288
289
# File 'lib/kthxbye/worker.rb', line 287

def inspect
  "#<Worker: #{@id}>"
end

#kill_childObject

:nodoc:



231
232
233
234
235
236
237
238
239
240
241
# File 'lib/kthxbye/worker.rb', line 231

# Sends KILL to the forked child recorded in @child, if any.
#
# `ps` is used to check that the pid still exists. When it does not, a
# message is logged and shutdown is called instead. Errors raised by
# Process.kill (any StandardError) are swallowed and yield nil.
#
# @return [Object, nil] nil when there is no child or the kill failed,
#   otherwise the result of Process.kill or of shutdown
def kill_child #:nodoc:
  return unless @child

  log "Killing child at #{@child}"
  child_present = system("ps -o pid,state -p #{@child}")

  unless child_present
    log "Child #{@child} not found, restarting."
    return shutdown
  end

  begin
    Process.kill("KILL", @child)
  rescue StandardError
    nil
  end
end

#perform(job) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/kthxbye/worker.rb', line 88

# Runs +job+ and retries it after a failure.
#
# Anything raised by job.perform is passed to job.fail, logged, and counted
# under "failures:<self>". The job is then run again while its
# failed_attempts is below Kthxbye::Config.options[:attempts]. Retries are
# started without the block, so the block is yielded once, from the
# outermost call, whether or not the job succeeded.
#
# @param job [Object] must respond to perform, fail and failed_attempts
# @yield [job] invoked after the attempt(s) when a block is given
# @return [Object, nil] the job's result, or nil once retries are exhausted
def perform(job)
  job.perform
rescue Object => error
  job.fail(error)
  log "Error running job #{job}"
  Stats.incr("failures:#{self}")

  attempts_so_far = job.failed_attempts
  retry_allowed = attempts_so_far < Kthxbye::Config.options[:attempts]
  perform(job) if retry_allowed
ensure
  yield job if block_given?
end

#pidObject

Returns the process id of this worker.



267
268
269
# File 'lib/kthxbye/worker.rb', line 267

# Process id of the current Ruby process.
#
# @return [Integer] value of Process.pid
def pid
  Process.pid
end

#processedObject



188
189
190
# File 'lib/kthxbye/worker.rb', line 188

# Looks up this worker's processed-jobs counter.
#
# @return [Object] the value Stats holds under "processed:<self>"
def processed
  stat_key = "processed:#{self}"
  Stats[stat_key]
end

#register_signalsObject



203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/kthxbye/worker.rb', line 203

# Installs the worker's signal handlers.
#
#   TERM, INT  -> shutdown! (kill the child, then stop)
#   QUIT, USR1 -> shutdown  (stop after the current job)
#   USR2       -> pause  (sets @paused to true)
#   CONT       -> resume (sets @paused to false)
#
# An ArgumentError raised while trapping the second group is reported with
# a warning; handlers listed after the failing signal stay unregistered.
def register_signals #:nodoc:
  %w[TERM INT].each { |signal| trap(signal) { shutdown! } }

  begin
    %w[QUIT USR1].each { |signal| trap(signal) { shutdown } }
    trap('USR2') { log "Paused"; @paused = true }
    trap('CONT') { log "Unpaused"; @paused = false }
  rescue ArgumentError
    warn "Signals QUIT, USR1, USR2, and/or CONT not supported."
  end

  log "Registered signals"
end

#register_workerObject

Adds this worker to the worker registry



129
130
131
132
133
# File 'lib/kthxbye/worker.rb', line 129

# Records this worker in Redis: adds it to the :workers set unless exists?
# already reports it there, and stores the current time (as a string) under
# "worker:<self>:started".
def register_worker
  log "Registered worker #{self}"
  redis.sadd(:workers, self) unless exists?

  started_key = "worker:#{self}:started"
  redis.set(started_key, Time.now.to_s)
end

#run(&block) ⇒ Object

This is the major run loop. Workhorse of a worker… sort of. In the end, this loop simply runs the jobs in separate processes by forking out the process then waiting for it to return. Only one job is processed at a time. Can optionally take in a block to run after the job has run.



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/kthxbye/worker.rb', line 59

# Main worker loop.
#
# After startup it repeatedly grabs a job (unless paused), marks it as being
# worked on, forks a child that performs it, waits for that child, and then
# runs the completion bookkeeping. When no job is available it sleeps for
# sleep_for seconds, or leaves the loop when @sleep_for is 0. The loop also
# ends once @terminate is set. unregister_worker always runs on the way out.
#
# The block is forwarded to perform so it actually runs after the job, in
# the forked child; previously it was accepted but never used. The parent
# waits on the pid it just forked rather than on any child.
#
# @yield [job] passed through to perform
def run(&block)
  log "Starting Kthxbye::Worker #{self}"
  startup

  loop do
    break if @terminate

    job = @paused ? nil : grab_job
    if job
      log "Found job #{job}"
      working(job)

      @child = fork do
        log "Forking..."
        perform(job, &block)
        exit!
      end

      Process.wait(@child)
      done
    else
      break if @sleep_for == 0
      log "No jobs on #{@queues} - sleeping for #{@sleep_for}"
      sleep sleep_for.to_i
    end
  end
ensure
  unregister_worker
end

#setup_queues(queues) ⇒ Object

:nodoc:



21
22
23
24
25
26
27
28
29
# File 'lib/kthxbye/worker.rb', line 21

# Normalizes the queue specification into the array stored in @queues.
#
# * "*" selects every queue from Kthxbye.queues, sorted
# * a comma-separated string is split into names; surrounding whitespace is
#   stripped and blank entries are dropped. The previous `compact` could
#   not remove them because String#split never yields nil.
# * anything else (a single name or an Array) is wrapped or kept as an array
#
# @param queues [String, Array<String>] queue specification
# @return [Array] the list assigned to @queues
def setup_queues(queues) # :nodoc:
  @queues =
    if queues == "*"
      Kthxbye.queues.sort
    elsif queues.respond_to?(:split) && queues.include?(",")
      queues.split(",").map(&:strip).reject(&:empty?)
    else
      [*queues]
    end
end

#shutdownObject

Shuts down the worker gracefully (once the current process has completed).



220
221
222
223
# File 'lib/kthxbye/worker.rb', line 220

# Requests a graceful stop: logs the shutdown and sets @terminate, which the
# run loop checks at the top of each iteration.
#
# @return [true]
def shutdown #:nodoc:
  log "Shutting down worker #{self}"
  @terminate = true
end

#shutdown!Object

Hard kills the worker by killing the process.



226
227
228
229
# File 'lib/kthxbye/worker.rb', line 226

# Hard stop: kills the forked child (if any) first, then requests the same
# termination as shutdown.
def shutdown! #:nodoc:
  kill_child
  shutdown
end

#startedObject



196
197
198
# File 'lib/kthxbye/worker.rb', line 196

# Reads the start timestamp that register_worker stored for this worker.
#
# @return [Object, nil] the Redis value under "worker:<self>:started"
def started
  started_key = "worker:#{self}:started"
  redis.get(started_key)
end

#startupObject

Run startup actions



107
108
109
110
111
# File 'lib/kthxbye/worker.rb', line 107

# Startup sequence, in order: prune stale workers, register this worker,
# then install the signal handlers.
def startup #:nodoc:
  clean_workers
  register_worker
  register_signals
end

#unregister_workerObject

Removes the worker from our worker registry



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/kthxbye/worker.rb', line 136

# Removes this worker from the registry.
#
# If the worker is still marked as working, the interrupted job is recorded
# as a failure (ActiveWorkerKilled) and rerun. Afterwards the worker's Redis
# entries ("worker:<self>", membership in :workers, "worker:<self>:started")
# are deleted and its processed/failures counters are reset.
def unregister_worker
  log "Unregistered worker #{self}"

  if working?
    log "Was active. Reporting and rerunning"
    Failure.create(current_job, ActiveWorkerKilled.new)
    current_job.rerun
  end

  redis.del("worker:#{self}")
  redis.srem(:workers, self)
  redis.del("worker:#{self}:started")

  ["processed:#{self}", "failures:#{self}"].each { |counter| Stats.reset(counter) }
end

#worker_pidsObject

Returns an array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.



273
274
275
276
277
# File 'lib/kthxbye/worker.rb', line 273

# Lists the pids of processes whose `ps` line mentions "kthxbye".
#
# @return [Array<String>] first whitespace-separated field of every matching
#   line, i.e. the pid as a string
def worker_pids
  listing = `ps -A -o pid,command | grep kthxbye`
  listing.split("\n").map { |row| row.split(' ').first }
end

#working(job) ⇒ Object

Run when the job starts running



160
161
162
163
164
165
166
167
168
169
170
# File 'lib/kthxbye/worker.rb', line 160

# Bookkeeping when a job starts: adds this worker to the :working set,
# stores the encoded job id and start time under "worker:<self>", remembers
# the job in @current_job, activates the job, and publishes "job.started".
#
# @param job [Object] must respond to id and active
def working(job) #:nodoc:
  redis.sadd(:working, self)

  job_id = job.id
  started_at = Time.now.to_s
  payload = encode({ :job_id => job_id, :started => started_at })
  redis.set("worker:#{self}", payload)
  @current_job = job

  # flag the job itself as active before announcing it
  job.active
  redis.publish("job.started", job.id)
end

#working?Boolean

Is this worker currently working on a job?

Returns:

  • (Boolean)


173
174
175
# File 'lib/kthxbye/worker.rb', line 173

# Asks Redis whether this worker is a member of the :working set.
#
# @return [Object] whatever redis.sismember returns for the query
def working? 
  redis.sismember( :working, self )
end