Class: Kthxbye::Worker

Inherits:
Object
  • Object
Extended by:
Helper
Includes:
Helper
Defined in:
lib/kthxbye/worker.rb

Overview

This is the workhorse loop of the gem. It handles all dequeuing and running of jobs, mostly by calling the job's own methods. Its main responsibilities are forking off a process to run each job and retrying the job if necessary.

Methods included from Helper

decode, encode, log, redis

Constructor Details

#initialize(queues, sleep_for = 5) ⇒ Worker

Creates a worker for running jobs off of a given queue. Takes a single queue, a comma-separated list of queues (e.g. test,other,bad), or the wildcard (*) symbol to take in all queues in alphabetical order. Optionally takes an interval in seconds (sleep_for, default 5) that sets how long the worker waits before checking for new jobs once it has exhausted the queue(s).



# File 'lib/kthxbye/worker.rb', line 16

def initialize(queues, sleep_for=5)
  setup_queues(queues)
  @sleep_for = sleep_for
end

Instance Attribute Details

#current_queue ⇒ Object

Returns the value of attribute current_queue.



# File 'lib/kthxbye/worker.rb', line 9

def current_queue
  @current_queue
end

#id ⇒ Object Also known as: to_s

Returns a useful id in the form hostname:pid:queues. Same return value as to_s.



# File 'lib/kthxbye/worker.rb', line 280

def id
  @id
end

#queues ⇒ Object

Returns the queues this worker is attached to, in alphabetical order.



# File 'lib/kthxbye/worker.rb', line 102

def queues
  @queues
end

#sleep_for ⇒ Object

Returns the value of attribute sleep_for.



# File 'lib/kthxbye/worker.rb', line 9

def sleep_for
  @sleep_for
end

Class Method Details

.exists?(id) ⇒ Boolean

Checks if a worker is registered.

Returns:

  • (Boolean)


# File 'lib/kthxbye/worker.rb', line 45

def self.exists?(id)
  redis.sismember( :workers, id )
end

.find(worker) ⇒ Object

Allows us to find a worker so that we can look at some of its internal data later on.



# File 'lib/kthxbye/worker.rb', line 33

def self.find(worker)
  if exists? worker
    qs = worker.split(':')[-1].split(",")
    new_worker = new(*qs)
    new_worker.id = worker
    return new_worker
  else
    nil
  end
end
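
The worker id passed to find follows the hostname:pid:queues layout described under #id. As a minimal, self-contained sketch of how such an id breaks apart (the helper name parse_worker_id and the sample id are hypothetical and not part of Kthxbye):

```ruby
# Hypothetical helper, not part of Kthxbye: splits an id of the form
# "hostname:pid:queue1,queue2" into its three parts.
def parse_worker_id(id)
  host, pid, queue_list = id.split(":")
  { host: host, pid: pid, queues: queue_list.to_s.split(",") }
end

# Made-up id for illustration.
parsed = parse_worker_id("web01:4242:test,other")
puts parsed[:queues].inspect
```

Note that the pid comes back as a string, which is how it is compared elsewhere in this class.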

.working_on(id) ⇒ Object

Gets the job a given worker is working on. Returns a hash with the 'job_id' and the 'started' time.



# File 'lib/kthxbye/worker.rb', line 51

def self.working_on(id)
  decode( redis.get( "worker:#{id}" ) )
end

Instance Method Details

#<=>(other) ⇒ Object

Compares this worker with another by their to_s values.



# File 'lib/kthxbye/worker.rb', line 294

def <=>(other) #:nodoc:
  to_s <=> other.to_s
end

#==(other) ⇒ Object

Two workers are equal when their to_s values match.



# File 'lib/kthxbye/worker.rb', line 290

def ==(other) #:nodoc:
  to_s == other.to_s
end

#clean_workers ⇒ Object

Removes from Redis any workers registered on this host whose process no longer exists, such as those left over from a previous dirty shutdown (a garbage-collection pass).



# File 'lib/kthxbye/worker.rb', line 115

def clean_workers
  workers = Kthxbye.workers 
  known = worker_pids
  workers.each do |worker|
    host,pid,queues = worker.id.split(":")
    next unless host == hostname
    next if known.include?(pid)
    log "Pruning unknown worker: #{worker}"
    worker.unregister_worker
  end

end
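
The pruning rule above can be sketched without Redis or ps. This is an illustration only: stale_worker_ids is a hypothetical helper, and the ids, hostname, and pid list are made-up inputs standing in for Kthxbye.workers, hostname, and worker_pids.

```ruby
# Hypothetical helper, not part of Kthxbye: returns the ids that a
# pruning pass like clean_workers would unregister.
def stale_worker_ids(worker_ids, local_host, live_pids)
  worker_ids.reject do |id|
    host, pid, _queues = id.split(":")
    # Workers on other hosts are left alone, and so are workers
    # whose pid still shows up in the live process list.
    host != local_host || live_pids.include?(pid)
  end
end

ids = ["web01:100:test", "web01:200:test", "web02:300:other"]
stale = stale_worker_ids(ids, "web01", ["100"])
puts stale.inspect
```

Only the local worker whose pid is missing from the live list is selected; the worker on the other host is never touched.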

#current_job ⇒ Object

Gets the current job this worker is working on.



# File 'lib/kthxbye/worker.rb', line 153

def current_job
  return @current_job if @current_job
  data = decode( redis.get("worker:#{self}") )
  @current_job = Job.find( data['job_id'], @current_queue )
end

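#done ⇒ Object

Runs the job-complete actions: removes this worker from the working set, clears its current-job record, and increments the processed counters.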



# File 'lib/kthxbye/worker.rb', line 178

def done #:nodoc:
  redis.srem( :working, self )
  redis.del( "worker:#{self}" )
  log "Completed job #{@current_job}"
  Stats.incr("processed")
  Stats.incr("processed:#{self}")
  @current_job = nil
end

#exists? ⇒ Boolean

Checks if this worker is registered.

Returns:

  • (Boolean)


# File 'lib/kthxbye/worker.rb', line 256

def exists? #:nodoc:
  redis.sismember( :workers, self )
end

#failed ⇒ Object

Returns the failure count recorded for this worker in Stats.



# File 'lib/kthxbye/worker.rb', line 191

def failed
  Stats["failures:#{self}"]
end

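#grab_job ⇒ Object

Reserves a job off the queue. Checks each of the worker's queues in order and returns the first job found, or false if every queue is empty.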



# File 'lib/kthxbye/worker.rb', line 243

def grab_job #:nodoc:
  job = nil
  @queues.each do |q|
    @current_queue = q
    log "Checking \"#{q}\" queue for jobs"
    job = Kthxbye.salvage(q)
    break unless job.nil?
  end

  return job || false
end
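
The queue-scanning order can be illustrated with plain Ruby data. This is a sketch under stated assumptions: first_available_job is a hypothetical helper, the hash stands in for the Redis-backed queues, and Array#shift stands in for Kthxbye.salvage.

```ruby
# Hypothetical helper, not part of Kthxbye: checks each queue in the
# given order and returns [queue_name, job] for the first job found,
# or false when every queue is empty.
def first_available_job(queue_names, store)
  queue_names.each do |name|
    job = store.fetch(name, []).shift
    return [name, job] unless job.nil?
  end
  false
end

# Made-up queue contents for illustration.
store = { "bad" => [], "other" => ["job-7"], "test" => ["job-1", "job-2"] }
order = %w[bad other test]

first  = first_available_job(order, store)
second = first_available_job(order, store)
puts first.inspect
puts second.inspect
```

Because scanning always restarts from the first queue, earlier queues in the list are drained before later ones are touched.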

#hostname ⇒ Object

Returns the hostname of the machine this worker is running on



# File 'lib/kthxbye/worker.rb', line 261

def hostname
  @hostname ||= `hostname`.chomp
end

#inspect ⇒ Object

Nice inspect for the worker with the same info as #id.



# File 'lib/kthxbye/worker.rb', line 286

def inspect
  "#<Worker: #{@id}>"
end

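#kill_child ⇒ Object

Kills the forked child process if one is running. If the child cannot be found, requests a graceful shutdown instead.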



# File 'lib/kthxbye/worker.rb', line 230

def kill_child #:nodoc:
  if @child
    log "Killing child at #{@child}"
    if system("ps -o pid,state -p #{@child}")
      Process.kill("KILL", @child) rescue nil
    else
      log "Child #{@child} not found, restarting."
      shutdown
    end
  end
end

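#perform(job) ⇒ Object

Runs the job. On failure, records the error on the job, increments this worker's failure count, and retries while the job's failed attempts are below the configured :attempts limit. Yields the job to the block, if one is given, once the attempt finishes.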



# File 'lib/kthxbye/worker.rb', line 88

def perform(job)
  begin
    result = job.perform
  rescue Object => ex
    job.fail(ex)
    log "Error running job #{job}"
    Stats.incr("failures:#{self}")
    perform(job) if job.failed_attempts < Kthxbye::Config.options[:attempts]
  ensure
    yield job if block_given?
  end
end
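
The retry behaviour can be sketched in isolation. The example below is an assumption-laden illustration, not the gem's code: FlakyJob and perform_with_retry are hypothetical names, max_attempts stands in for Kthxbye::Config.options[:attempts], and the sketch uses Ruby's retry keyword and rescues StandardError where the method above recurses and rescues Object.

```ruby
# Made-up job that fails a fixed number of times before succeeding.
class FlakyJob
  attr_reader :failed_attempts, :runs

  def initialize(failures_before_success)
    @remaining_failures = failures_before_success
    @failed_attempts = 0
    @runs = 0
  end

  def perform
    @runs += 1
    if @remaining_failures > 0
      @remaining_failures -= 1
      raise "transient failure"
    end
    :ok
  end

  # Records a failed attempt, like job.fail(ex) in the method above.
  def fail(_error)
    @failed_attempts += 1
  end
end

# Re-runs the job until it succeeds or max_attempts failures are recorded.
def perform_with_retry(job, max_attempts)
  job.perform
rescue StandardError => e
  job.fail(e)
  retry if job.failed_attempts < max_attempts
  nil
end

flaky = FlakyJob.new(2)
flaky_result = perform_with_retry(flaky, 3)

hopeless = FlakyJob.new(10)
hopeless_result = perform_with_retry(hopeless, 3)
puts [flaky_result, flaky.runs, hopeless_result, hopeless.runs].inspect
```

With a limit of 3, a job that fails twice succeeds on its third run, while a job that keeps failing is abandoned after three recorded failures.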

#pid ⇒ Object

Returns the process id of this worker.



# File 'lib/kthxbye/worker.rb', line 266

def pid
  Process.pid
end

#processed ⇒ Object

Returns the processed-job count recorded for this worker in Stats.



# File 'lib/kthxbye/worker.rb', line 187

def processed
  Stats["processed:#{self}"]
end

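#register_signals ⇒ Object

Registers the signal handlers: TERM and INT shut down immediately (killing the child), QUIT and USR1 shut down gracefully, USR2 pauses the worker, and CONT unpauses it.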



# File 'lib/kthxbye/worker.rb', line 202

def register_signals #:nodoc:
  trap('TERM') { shutdown! }
  trap('INT')  { shutdown! }

  begin
    trap('QUIT') { shutdown }   
    trap('USR1') { shutdown }
    trap('USR2') { log "Paused"; @paused = true }
    trap('CONT') { log "Unpaused"; @paused = false }
  rescue ArgumentError
    warn "Signals QUIT, USR1, USR2, and/or CONT not supported."
  end 

  log "Registered signals"
end
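
The pause and unpause handlers can be tried out in a stand-alone script. This is a minimal sketch, assuming a POSIX platform where USR2 and CONT are available; PausableLoop is a hypothetical stand-in that only tracks the paused flag and is not part of Kthxbye.

```ruby
# Hypothetical stand-in for a worker that only tracks its paused flag.
class PausableLoop
  attr_reader :paused

  def initialize
    @paused = false
  end

  def register_signals
    trap("USR2") { @paused = true }
    trap("CONT") { @paused = false }
  rescue ArgumentError
    warn "Signals USR2 and/or CONT not supported on this platform."
  end
end

worker = PausableLoop.new
worker.register_signals

# Send the signals to this very process and give Ruby a moment
# to run the handlers.
Process.kill("USR2", Process.pid)
sleep 0.1
paused_after_usr2 = worker.paused

Process.kill("CONT", Process.pid)
sleep 0.1
paused_after_cont = worker.paused

puts [paused_after_usr2, paused_after_cont].inspect
```

In practice the same signals would be sent from outside the process, for example with the kill command and the worker's pid.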

#register_worker ⇒ Object

Adds this worker to the worker registry



# File 'lib/kthxbye/worker.rb', line 129

def register_worker
  log "Registered worker #{self}"
  redis.sadd( :workers, self ) if !exists?
  redis.set( "worker:#{self}:started", Time.now.to_s )
end

#run(&block) ⇒ Object

This is the main run loop, the workhorse of a worker (sort of). In the end, the loop simply runs each job in a separate process: it forks a child, then waits for it to return, so only one job is processed at a time. Can optionally take a block to run after the job has run.



# File 'lib/kthxbye/worker.rb', line 59

def run(&block)
  log "Starting Kthxbye::Worker #{self}"
  startup
  
  loop do
    break if @terminate

    if !@paused and job = grab_job
      log "Found job #{job}"
      working(job)

      @child = fork {
        log "Forking..."
        perform(job)
        exit!
      }

      Process.wait
      done
    else
      break if @sleep_for == 0
      log "No jobs on #{@queues} - sleeping for #{@sleep_for}"
      sleep sleep_for.to_i
    end
  end 
ensure
  unregister_worker
end
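
The fork-and-wait pattern at the heart of this loop can be shown on its own. The sketch below is an illustration under assumptions, not the gem's implementation: run_jobs_in_children is a hypothetical helper, the jobs are plain lambdas, and the child reports success through its exit status, which the method above does not do. It requires a platform where Kernel#fork is available (not Windows).

```ruby
# Hypothetical helper: runs each job in its own child process, one at
# a time, and collects the children's exit statuses.
def run_jobs_in_children(jobs)
  statuses = []
  jobs.each do |job|
    pid = fork do
      ok = begin
        job.call
        true
      rescue StandardError
        false
      end
      # exit! skips at_exit handlers, as in the run loop above.
      exit!(ok ? 0 : 1)
    end
    # Block until this child finishes before starting the next job.
    Process.wait(pid)
    statuses << $?.exitstatus
  end
  statuses
end

jobs = [-> { 1 + 1 }, -> { raise "boom" }]
statuses = run_jobs_in_children(jobs)
puts statuses.inspect
```

Running each job in a child keeps a crashing job from taking the parent loop down with it; the parent only ever sees an exit status.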

#setup_queues(queues) ⇒ Object

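Resolves the queues argument into the @queues list: "*" selects all known queues in alphabetical order, a comma-separated string is split into its parts, and anything else is treated as a single queue.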



# File 'lib/kthxbye/worker.rb', line 21

def setup_queues(queues) # :nodoc:
  if queues == "*"
    @queues = Kthxbye.queues.sort
  elsif queues.include? ?,
    @queues = queues.split(",").compact
  else
    @queues = *queues
  end
end
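
The three cases can be exercised without Redis. In this sketch resolve_queues is a hypothetical helper and known_queues stands in for Kthxbye.queues; unlike the method above, it also strips whitespace and drops empty entries, which is an addition made for the illustration.

```ruby
# Hypothetical helper, not part of Kthxbye: turns a queue spec string
# into the list of queue names a worker would watch.
def resolve_queues(spec, known_queues)
  if spec == "*"
    # Wildcard: every known queue, in alphabetical order.
    known_queues.sort
  elsif spec.include?(",")
    # Comma-separated list: keep the order the caller gave.
    spec.split(",").map(&:strip).reject(&:empty?)
  else
    # Single queue name.
    [spec]
  end
end

known = %w[test other bad]
puts resolve_queues("*", known).inspect
puts resolve_queues("test,other,bad", known).inspect
puts resolve_queues("test", known).inspect
```

Only the wildcard form sorts the queues; an explicit list is kept in the order it was written.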

#shutdown ⇒ Object

Shuts down the worker gracefully (once the current process has completed).



# File 'lib/kthxbye/worker.rb', line 219

def shutdown #:nodoc:
  log "Shutting down worker #{self}"
  @terminate = true
end

#shutdown! ⇒ Object

Hard kills the worker by killing the process.



# File 'lib/kthxbye/worker.rb', line 225

def shutdown! #:nodoc:
  kill_child
  shutdown
end

#started ⇒ Object

Returns the time this worker was registered, as stored in Redis.



# File 'lib/kthxbye/worker.rb', line 195

def started
  redis.get( "worker:#{self}:started" )
end

#startup ⇒ Object

Run startup actions



# File 'lib/kthxbye/worker.rb', line 107

def startup #:nodoc:
  clean_workers
  register_worker
  register_signals
end

#unregister_worker ⇒ Object

Removes the worker from our worker registry



# File 'lib/kthxbye/worker.rb', line 136

def unregister_worker
  log "Unregistered worker #{self}"
  if working?
    log "Was active. Reporting and rerunning"
    Failure.create(current_job, ActiveWorkerKilled.new) 
    current_job.rerun
  end

  redis.del "worker:#{self}"
  redis.srem :workers, self
  redis.del "worker:#{self}:started"

  Stats.reset("processed:#{self}")
  Stats.reset("failures:#{self}")
end

#worker_pids ⇒ Object

Returns an array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.



# File 'lib/kthxbye/worker.rb', line 272

def worker_pids
  `ps -A -o pid,command | grep kthxbye`.split("\n").map do |line|
    line.split(' ')[0]
  end
end
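
The parsing step can be checked against canned text instead of a live ps call. This is a sketch only: pids_from_ps is a hypothetical helper, and the sample output (including the bin/kthxbye command lines) is made up for illustration.

```ruby
# Hypothetical helper, not part of Kthxbye: extracts the first column
# (the pid) from each line of `ps -A -o pid,command` style output.
def pids_from_ps(output)
  output.split("\n").map { |line| line.split(" ")[0] }.compact
end

# Made-up ps output; real output varies by platform.
sample = " 4242 ruby bin/kthxbye test\n 4310 ruby bin/kthxbye other\n"
pids = pids_from_ps(sample)
puts pids.inspect
```

The pids stay strings, which matches the string pid taken from a worker id when the two are compared with include?.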

#working(job) ⇒ Object

Run when the job starts running



# File 'lib/kthxbye/worker.rb', line 160

def working(job) #:nodoc:
  redis.sadd( :working, self )

  data = encode( {:job_id => job.id, :started => Time.now.to_s} )
  redis.set("worker:#{self}", data)
  @current_job = job

  # activates job
  job.active
  redis.publish("job.started", job.id)
end
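
The record written here is what .working_on later reads back. A minimal round-trip sketch follows, with loud assumptions: a plain Hash stands in for Redis, JSON stands in for the Helper encode and decode methods (their actual format is not shown on this page), and record_working and read_working_on are hypothetical names.

```ruby
require "json"
require "time"

# Hypothetical helper: stores the current-job record under the same
# "worker:<id>" key layout used above.
def record_working(store, worker_id, job_id, started_at)
  payload = { "job_id" => job_id, "started" => started_at.to_s }
  store["worker:#{worker_id}"] = JSON.generate(payload)
end

# Hypothetical helper: reads the record back, or returns nil when the
# worker has no current job.
def read_working_on(store, worker_id)
  raw = store["worker:#{worker_id}"]
  raw && JSON.parse(raw)
end

store = {} # stands in for Redis
worker_id = "web01:4242:test" # made-up id
record_working(store, worker_id, 12, Time.now)

current = read_working_on(store, worker_id)
puts current["job_id"]
```

Storing the start time as a string keeps the record portable, at the cost of the reader having to parse it again if it needs a Time.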

#working? ⇒ Boolean

Is this worker working?

Returns:

  • (Boolean)


# File 'lib/kthxbye/worker.rb', line 173

def working? 
  redis.sismember( :working, self )
end