Class: Kthxbye::Job
- Inherits: Object
  - Object
  - Kthxbye::Job
- Extended by: Helper
- Includes: Helper
- Defined in: lib/kthxbye/job.rb
Overview
This class is the main job runner. It also handles job instantiation and most of the job queuing and retrieval.
Instance Attribute Summary
-
#data ⇒ Object
Returns the value of attribute data.
-
#failed_attempts ⇒ Object
readonly
Returns the value of attribute failed_attempts.
-
#id ⇒ Object
Returns the value of attribute id.
-
#queue ⇒ Object
Returns the value of attribute queue.
-
#worker ⇒ Object
Returns the value of attribute worker.
Class Method Summary
-
.add_to_queue(id, queue) ⇒ Object
Adds a job to the queue from a given job id and queue.
-
.create(queue, klass, *args) ⇒ Object
The bulk of the job queuing method.
-
.destroy(id, queue) ⇒ Object
Removes all existence of this job and its data. Returns the last known status of the job.
-
.find(id, queue) ⇒ Object
Returns a job object for a given job id off of a given queue.
Instance Method Summary
-
#==(obj) ⇒ Object
:nodoc:
-
#active ⇒ Object
Removes the job from the inactive queue.
- #active? ⇒ Boolean
-
#dequeue ⇒ Object
Simply removes this job from the active queue and places it on the inactive list.
- #fail(ex) ⇒ Object
-
#inactive ⇒ Object
Places the job on the inactive list.
-
#initialize(id, queue, data) ⇒ Job
constructor
Instantiates a job from a job id, a queue, and the job data.
-
#perform ⇒ Object
Does all the heavy lifting of performing the job and storing the results.
-
#rerun ⇒ Object
Simply requeues the job to be rerun.
-
#result ⇒ Object
Returns the job’s result once it has been run.
-
#status ⇒ Object
Returns the job’s status.
Methods included from Helper
Constructor Details
#initialize(id, queue, data) ⇒ Job
Instantiates a job from a job id, a queue, and the job data. Most often used in the ::find method and for the worker to recreate the job for running.
    # File 'lib/kthxbye/job.rb', line 68
    def initialize(id, queue, data)
      @id = id.to_i
      @queue = queue
      @data = data
      @failed_attempts = Failure.fails_for_job(@id) # local tracking only, for rerun purposes
    end
Instance Attribute Details
#data ⇒ Object
Returns the value of attribute data.
    # File 'lib/kthxbye/job.rb', line 9
    def data
      @data
    end
#failed_attempts ⇒ Object (readonly)
Returns the value of attribute failed_attempts.
    # File 'lib/kthxbye/job.rb', line 10
    def failed_attempts
      @failed_attempts
    end
#id ⇒ Object
Returns the value of attribute id.
    # File 'lib/kthxbye/job.rb', line 9
    def id
      @id
    end
#queue ⇒ Object
Returns the value of attribute queue.
    # File 'lib/kthxbye/job.rb', line 9
    def queue
      @queue
    end
#worker ⇒ Object
Returns the value of attribute worker.
    # File 'lib/kthxbye/job.rb', line 9
    def worker
      @worker
    end
Class Method Details
.add_to_queue(id, queue) ⇒ Object
Adds a job to the queue from a given job id and queue. Useful for switching a job to another queue or adding a defined job to multiple queues.
    # File 'lib/kthxbye/job.rb', line 15
    def self.add_to_queue(id, queue)
      redis.rpush( "queue:#{queue}", id )
    end
.create(queue, klass, *args) ⇒ Object
The bulk of the job queuing method. Takes a string representing a queue name, a job class, and arguments to pass to the perform method of the job class. Queues the job id in the given queue, marks the job inactive, and stores the job's class name and arguments in the data-store hash for that queue. Returns a unique integer id for the job, taken from the redis “uniq_id” key, which is a simple incremented counter.
    # File 'lib/kthxbye/job.rb', line 24
    def self.create(queue, klass, *args)
      raise "Need a queue to store job in" if queue.to_s.empty?
      raise "No class to reference job type by" if klass.nil?

      redis.incr :uniq_id
      id = redis.get :uniq_id

      Job.add_to_queue( id, queue )
      Kthxbye.register_queue( queue )

      # mark job as inactive currently. will mark active when job is getting run
      redis.sadd("jobs:inactive", id)
      redis.hset( "data-store:#{queue}", id, encode( {:klass => klass.to_s, :payload => args} ) )
      log "Created job in queue #{queue} with an unique key of #{id}"

      return id.to_i
    end
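The record that .create writes to the data-store hash can be sketched in isolation. This is a minimal illustration, not the library's code: it assumes that Helper's encode and decode are thin JSON wrappers (the Helper source is not shown on this page), and AddJob is a hypothetical job class. It shows why #perform reads the stored data with string keys such as 'klass' rather than symbols.

```ruby
require 'json'

# Hypothetical job class; any class with a class-level perform would do.
class AddJob
  def self.perform(a, b)
    a + b
  end
end

# Stand-ins for Helper#encode / Helper#decode, ASSUMED here to be JSON-based.
def encode(obj)
  JSON.generate(obj)
end

def decode(str)
  JSON.parse(str)
end

# Same hash shape that .create stores under "data-store:<queue>".
stored = encode({ :klass => AddJob.to_s, :payload => [2, 3] })
data = decode(stored)

# Symbol keys do not survive a JSON round trip; they come back as strings.
puts data.inspect
```

Under that JSON assumption, the class is stored by name only, so the job class must be loadable in the worker process for the name to resolve again.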
.destroy(id, queue) ⇒ Object
Removes all existence of this job and its data. Returns the last known status of the job.
    # File 'lib/kthxbye/job.rb', line 50
    def self.destroy(id, queue)
      ret = Job.find(id, queue).status

      # remove the element from the active queue
      redis.lrem("queue:#{queue}", 0, id)
      # be sure we also remove it from the inactive queue
      redis.srem("queue:#{queue}:inactive", id)
      # remove the job's data as well
      redis.hdel("data-store:#{queue}", id)
      redis.hdel("result-store:#{queue}", id)
      redis.hdel( :failure, id )

      return ret
    end
Instance Method Details
#==(obj) ⇒ Object
:nodoc:
    # File 'lib/kthxbye/job.rb', line 154
    def ==(obj) #:nodoc:
      return false if obj.nil?
      @data == obj.data &&
        @id == obj.id &&
        @queue == obj.queue
    end
#active ⇒ Object
Removes the job from the inactive queue.
    # File 'lib/kthxbye/job.rb', line 141
    def active
      redis.srem("jobs:inactive", @id)
    end
#active? ⇒ Boolean
    # File 'lib/kthxbye/job.rb', line 145
    def active?
      !redis.sismember("jobs:inactive", @id)
    end
#dequeue ⇒ Object
Simply removes this job from the active queue and places it on the inactive list. Does not remove job payload from storage. It just removes its id from the actively run job queue.
    # File 'lib/kthxbye/job.rb', line 105
    def dequeue
      redis.lrem("queue:#{@queue}", 0, @id)
      inactive
    end
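The effect of #dequeue on the stored data can be modelled without Redis. The sketch below uses plain Ruby collections as stand-ins for the Redis structures; the variable names and the sample ids are hypothetical and exist only for this illustration. It shows that every occurrence of the id leaves the queue list, the id lands in the inactive set, and the payload stays in storage.

```ruby
require 'set'

# In-memory stand-ins for the Redis structures (illustration only):
#   queue         ~ the list at "queue:<name>"
#   jobs_inactive ~ the set at "jobs:inactive"
#   data_store    ~ the hash at "data-store:<name>"
queue = [7, 8, 7] # nothing stops the same id from being pushed twice
jobs_inactive = Set.new
data_store = {
  7 => '{"klass":"SomeJob","payload":[]}',
  8 => '{"klass":"OtherJob","payload":[]}'
}

job_id = 7

# LREM with a count of 0 removes every occurrence of the value;
# Array#delete behaves the same way for this model.
queue.delete(job_id)

# SADD records the id in the inactive set.
jobs_inactive.add(job_id)

puts queue.inspect
puts jobs_inactive.include?(job_id)
puts data_store.key?(job_id)
```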
#fail(ex) ⇒ Object
    # File 'lib/kthxbye/job.rb', line 132
    def fail(ex)
      @failed_attempts += 1
      log "Error occured: #{ex.}. Try: #{@failed_attempts}/#{Kthxbye::Config.[:attempts]}"
      redis.publish("job.failed", @id)
      Stats.incr("failures")
      Kthxbye::Failure.create( self, ex )
    end
#inactive ⇒ Object
Places the job on the inactive list.
    # File 'lib/kthxbye/job.rb', line 150
    def inactive
      redis.sadd("jobs:inactive", @id)
    end
#perform ⇒ Object
Does all the heavy lifting of performing the job and storing the results. It resolves the job's class and payload, marks the job active, runs it, and stores the result in the result store once complete. Exceptions are re-raised for the worker to handle. Failure reporting, which stores the job in the failure listing and publishes the id of the failed job on the job.failed channel (Redis PUBSUB), is done by #fail.
    # File 'lib/kthxbye/job.rb', line 116
    def perform
      begin
        @klass = Object.const_get(@data['klass'])
        @payload = @data['payload']
        #set job active, getting ready to run
        self.active

        result = @klass.send(:perform, *@payload)
        redis.hset( "result-store:#{@queue}", @id, encode( result ) )
        return result
      rescue Object => ex
        # handled by worker
        raise ex
      end
    end
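The dispatch step inside #perform can be tried on its own, without Redis or a worker. In this minimal sketch GreetJob is a hypothetical job class, and the data hash is written by hand in the shape that #perform reads. It shows the class being resolved from its stored name and the payload array being splatted into the arguments of the class-level perform method.

```ruby
# Hypothetical job class used only for this illustration.
class GreetJob
  def self.perform(name, greeting)
    "#{greeting}, #{name}!"
  end
end

# Shape of the decoded job data, as read by #perform.
data = { 'klass' => 'GreetJob', 'payload' => ['world', 'Hello'] }

# Resolve the class from its name, then splat the payload into the call.
klass = Object.const_get(data['klass'])
result = klass.send(:perform, *data['payload'])

puts result
```

Because the lookup uses Object.const_get, a class name that is not defined in the running process raises NameError, which #perform would re-raise like any other exception.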
#rerun ⇒ Object
Simply requeues the job to be rerun.
    # File 'lib/kthxbye/job.rb', line 76
    def rerun
      Job.add_to_queue( @id, @queue )
    end
#result ⇒ Object
Returns the job’s result once it has been run.
    # File 'lib/kthxbye/job.rb', line 98
    def result
      decode( redis.hget("result-store:#{@queue}", @id) )
    end
#status ⇒ Object
Returns the job’s status. Will be one of four values, checked in this order:
1) :succeeded - the job ran and has a result
2) :failed - the job failed and reported an error
3) :active - job is being run.
4) :inactive - job is waiting to be run.
    # File 'lib/kthxbye/job.rb', line 85
    def status
      if result
        :succeeded
      elsif Failure.find(@id)
        :failed
      elsif active?
        :active
      else
        :inactive
      end
    end
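The order of the checks in #status matters, and it can be restated as a pure function. The helper below is hypothetical (job_status and its keyword arguments are not part of Kthxbye); it takes the three lookups as plain values so the precedence can be exercised without Redis.

```ruby
# Hypothetical stand-alone helper mirroring the precedence in Job#status.
# result:  the decoded result, or nil if none is stored
# failure: the failure record, or nil if none is stored
# active:  whether the job id is absent from the inactive set
def job_status(result:, failure:, active:)
  if result
    :succeeded
  elsif failure
    :failed
  elsif active
    :active
  else
    :inactive
  end
end

# A stored result wins even if a failure record also exists.
puts job_status(result: 42, failure: { 'error' => 'boom' }, active: true)

# With no result and no failure, the inactive set decides.
puts job_status(result: nil, failure: nil, active: false)
```

One consequence of the truthiness test on result: a job whose stored result decodes to nil or false would not be reported as :succeeded and would fall through to the later checks.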