Class: Kthxbye::Worker

Inherits:
Object
  • Object
show all
Extended by:
Helper
Includes:
Helper
Defined in:
lib/kthxbye/worker.rb

Overview

This is the workhorse loop of the gem. Does all the dequeuing and running of jobs. It mostly makes a bunch of calls to the job methods to run the job. It simply handles the spawning off of processes to run the job and retry if necessary.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Helper

decode, encode, log, redis

Constructor Details

#initialize(queues, sleep_for = 5) ⇒ Worker

Creates a worker for running jobs off of a given queue. Takes a queue or queues (csv style, e.g. test,other,bad) or the woldcard (*) symbol to take in all queues in alphabetical order. Optionally takes in an interval param on how long it waits to check the queue for new jobs once it has exhausted the queue(s).



16
17
18
19
# File 'lib/kthxbye/worker.rb', line 16

def initialize(queues, sleep_for=5)
  setup_queues(queues)
  @sleep_for = sleep_for
end

Instance Attribute Details

#current_queueObject

Returns the value of attribute current_queue.



9
10
11
# File 'lib/kthxbye/worker.rb', line 9

def current_queue
  @current_queue
end

#idObject Also known as: to_s

Returns a useful id with the hostname:pid:queues listing Same return as to_s



250
251
252
# File 'lib/kthxbye/worker.rb', line 250

def id
  @id
end

#queuesObject

Returns the queues this worker is attached toin alphabetical order.



90
91
92
# File 'lib/kthxbye/worker.rb', line 90

def queues
  @queues
end

#sleep_forObject

Returns the value of attribute sleep_for.



9
10
11
# File 'lib/kthxbye/worker.rb', line 9

def sleep_for
  @sleep_for
end

Class Method Details

.exists?(id) ⇒ Boolean

Checks if a worker is registered.

Returns:

  • (Boolean)


45
46
47
# File 'lib/kthxbye/worker.rb', line 45

def self.exists?(id)
  redis.sismember( :workers, id )
end

.find(worker) ⇒ Object

Allows us to find a worker so that we can look at some of its internal data later on.



33
34
35
36
37
38
39
40
41
42
# File 'lib/kthxbye/worker.rb', line 33

def self.find(worker)
  if exists? worker
    qs = worker.split(':')[-1].split(",")
    new_worker = new(*qs)
    new_worker.id = worker
    return new_worker
  else
    nil
  end
end

.working_on(id) ⇒ Object

Gets the job a given worker is working on Returns a hash with the ‘job_id’ and the ‘started’ time



51
52
53
# File 'lib/kthxbye/worker.rb', line 51

def self.working_on(id)
  decode( redis.get( "worker:#{id}" ) )
end

Instance Method Details

#<=>(other) ⇒ Object

:nodoc:



264
265
266
# File 'lib/kthxbye/worker.rb', line 264

def <=>(other) #:nodoc:
  to_s <=> other.to_s
end

#==(other) ⇒ Object

:nodoc:



260
261
262
# File 'lib/kthxbye/worker.rb', line 260

def ==(other) #:nodoc:
  to_s == other.to_s
end

#clean_workersObject

This method cleans Redis of all workers that no longer exist that may have been left over from a previous dirty shutdown (GC)



103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/kthxbye/worker.rb', line 103

def clean_workers
  workers = Kthxbye.workers 
  known = worker_pids
  workers.each do |worker|
    host,pid,queues = worker.id.split(":")
    next unless host == hostname
    next if known.include?(pid)
    log "Pruning unknown worker: #{worker}"
    worker.unregister_worker
  end

end

#current_jobObject

Gets the current job this worker is working.



136
137
138
139
140
# File 'lib/kthxbye/worker.rb', line 136

def current_job
  return @current_job if @current_job
  data = decode( redis.get("worker:#{self}") )
  @current_job = Job.find( data['job_id'], @current_queue )
end

#doneObject

job complete actions



161
162
163
164
165
166
167
# File 'lib/kthxbye/worker.rb', line 161

def done #:nodoc:
  redis.srem( :working, self )
  redis.del( "worker:#{self}" )
  log "Completed job #{@current_job}"
  redis.publish("job.completed", @current_job.id)
  @current_job = nil
end

#exists?Boolean

Checks if this worker is registered.

Returns:

  • (Boolean)


226
227
228
# File 'lib/kthxbye/worker.rb', line 226

def exists? #:nodoc:
  redis.sismember( :workers, self )
end

#grab_jobObject

Reserves a job off the queue



213
214
215
216
217
218
219
220
221
222
223
# File 'lib/kthxbye/worker.rb', line 213

def grab_job #:nodoc:
  job = nil
  @queues.each do |q|
    @current_queue = q
    log "Checking \"#{q}\" queue for jobs"
    job = Kthxbye.salvage(q)
    break unless job.nil?
  end

  return job || false
end

#hostnameObject

Returns the hostname of the machine this worker is running on



231
232
233
# File 'lib/kthxbye/worker.rb', line 231

def hostname
  @hostname ||= `hostname`.chomp
end

#inspectObject

nice inspect for the worker with the same info as #id



256
257
258
# File 'lib/kthxbye/worker.rb', line 256

def inspect
  "#<Worker: #{@id}>"
end

#kill_childObject

:nodoc:



200
201
202
203
204
205
206
207
208
209
210
# File 'lib/kthxbye/worker.rb', line 200

def kill_child #:nodoc:
  if @child
    log "Killing child at #{@child}"
    if system("ps -o pid,state -p #{@child}")
      Process.kill("KILL", @child) rescue nil
    else
      log "Child #{@child} not found, restarting."
      shutdown
    end
  end
end

#pidObject

Returns the process id of this worker.



236
237
238
# File 'lib/kthxbye/worker.rb', line 236

def pid
  Process.pid
end

#register_signalsObject



172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/kthxbye/worker.rb', line 172

def register_signals #:nordoc:
  trap('TERM') { shutdown! }
  trap('INT')  { shutdown! }

  begin
    trap('QUIT') { shutdown }   
    trap('USR1') { shutdown }
    trap('USR2') { log "Paused"; @paused = true }
    trap('CONT') { log "Unpaused"; @paused = false }
  rescue ArgumentError
    warn "Signals QUIT, USR1, USR2, and/or CONT not supported."
  end 

  log "Registered signals"
end

#register_workerObject

Adds this worker to the worker registry



117
118
119
120
# File 'lib/kthxbye/worker.rb', line 117

def register_worker
  log "Registered worker #{self}"
  redis.sadd( :workers, self ) if !exists?
end

#run(&block) ⇒ Object

This is the major run loop. Workhorse of a worker… sort of. In the end, this loop simply runs the jobs in separate processes by forking out the process then waiting for it to return. we only process one. Can optionally take in a block to run after the job has run.



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/kthxbye/worker.rb', line 59

def run(&block)
  log "Starting Kthxbye::Worker #{self}"
  startup
  
  loop do
    break if @terminate

    if !@paused and job = grab_job
      log "Found job #{job}"
      working(job)

      @child = fork {
        log "Forking..."
        result = job.perform
        yield job if block_given?
        exit!
      }

      Process.wait
      done
    else
      break if @sleep_for == 0
      log "No jobs on #{@queues} - sleeping for #{@sleep_for}"
      sleep sleep_for.to_i
    end
  end 
ensure
  unregister_worker
end

#setup_queues(queues) ⇒ Object

:nodoc:



21
22
23
24
25
26
27
28
29
# File 'lib/kthxbye/worker.rb', line 21

def setup_queues(queues) # :nodoc:
  if queues == "*"
    @queues = Kthxbye.queues.sort
  elsif queues.include? ?,
    @queues = queues.split(",").compact
  else
    @queues = *queues
  end
end

#shutdownObject

Shuts down the worker gracefully (once process has completed



189
190
191
192
# File 'lib/kthxbye/worker.rb', line 189

def shutdown #:nodoc:
  log "Shutting down worker #{self}"
  @terminate = true
end

#shutdown!Object

Hard kills the worker by killing the process.



195
196
197
198
# File 'lib/kthxbye/worker.rb', line 195

def shutdown! #:nodoc:
  kill_child
  shutdown
end

#startupObject

Run startup actions



95
96
97
98
99
# File 'lib/kthxbye/worker.rb', line 95

def startup #:nodoc:
  clean_workers
  register_worker
  register_signals
end

#unregister_workerObject

Removes the worker from our worker registry



123
124
125
126
127
128
129
130
131
132
133
# File 'lib/kthxbye/worker.rb', line 123

def unregister_worker
  log "Unregistered worker #{self}"
  if working?
    log "Was active. Reporting and rerunning"
    Failure.create(current_job, ActiveWorkerKilled.new) 
    current_job.rerun
  end

  redis.del "worker:#{self}"
  redis.srem :workers, self
end

#worker_pidsObject

Returns an array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.



242
243
244
245
246
# File 'lib/kthxbye/worker.rb', line 242

def worker_pids
  `ps -A -o pid,command | grep kthxbye`.split("\n").map do |line|
    line.split(' ')[0]
  end
end

#working(job) ⇒ Object

Run when the job starts running



143
144
145
146
147
148
149
150
151
152
153
# File 'lib/kthxbye/worker.rb', line 143

def working(job) #:nodoc:
  redis.sadd( :working, self )

  data = encode( {:job_id => job.id, :started => Time.now.to_s} )
  redis.set("worker:#{self}", data)
  @current_job = job

  # activates job
  job.active
  redis.publish("job.started", job.id)
end

#working?Boolean

Is this job working?

Returns:

  • (Boolean)


156
157
158
# File 'lib/kthxbye/worker.rb', line 156

def working? 
  redis.sismember( :working, self )
end