Class: Kthxbye::Job

Inherits:
Object
  • Object
show all
Extended by:
Helper
Includes:
Helper
Defined in:
lib/kthxbye/job.rb

Overview

This class is our main job runner. It also handles the instantiation and the meat of the job queuing and retreival.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Helper

decode, encode, log, redis

Constructor Details

#initialize(id, queue, data) ⇒ Job

Instantiates a job from a job id, a queue, and the job data. Most often used in the ::find method and for the worker to recreate the job for running.



68
69
70
71
72
73
# File 'lib/kthxbye/job.rb', line 68

def initialize(id, queue, data)
  @id = id.to_i
  @queue = queue
  @data = data
  @failed_attempts = Failure.fails_for_job(@id) # local tracking only, for rerun purposes
end

Instance Attribute Details

#dataObject

Returns the value of attribute data.



9
10
11
# File 'lib/kthxbye/job.rb', line 9

def data
  @data
end

#failed_attemptsObject (readonly)

Returns the value of attribute failed_attempts.



10
11
12
# File 'lib/kthxbye/job.rb', line 10

def failed_attempts
  @failed_attempts
end

#idObject

Returns the value of attribute id.



9
10
11
# File 'lib/kthxbye/job.rb', line 9

def id
  @id
end

#queueObject

Returns the value of attribute queue.



9
10
11
# File 'lib/kthxbye/job.rb', line 9

def queue
  @queue
end

#workerObject

Returns the value of attribute worker.



9
10
11
# File 'lib/kthxbye/job.rb', line 9

def worker
  @worker
end

Class Method Details

.add_to_queue(id, queue) ⇒ Object

Adds a job to the queue from a given job id and queue. useful for switching a job to another queue or adding a defined job to multiple queues.



15
16
17
# File 'lib/kthxbye/job.rb', line 15

def self.add_to_queue(id, queue)
  redis.rpush( "queue:#{queue}", id )
end

.create(queue, klass, *args) ⇒ Object

The bulk of the job queuing method. Takes a string representing a queue name, a job class, and arguments to pass to the perform method of the job class. Returns a unique id for the job based on a redis “uniq_id” key (int) which is a simple incremented value. Queues the job in the given queue and the job in the data-store hash.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/kthxbye/job.rb', line 24

def self.create(queue, klass, *args)
  raise "Need a queue to store job in" if queue.to_s.empty?
  raise "No class to reference job type by" if klass.nil?

  redis.incr :uniq_id
  id = redis.get :uniq_id

  Job.add_to_queue( id, queue )
  Kthxbye.register_queue( queue )
 
  # mark job as inactive currently. will mark active when job is getting run
  redis.sadd("jobs:inactive", id)
  redis.hset( "data-store:#{queue}", id, encode( {:klass => klass.to_s, :payload => args} ) )
  log "Created job in queue #{queue} with an unique key of #{id}"

  return id.to_i
end

.destroy(id, queue) ⇒ Object

Removes all existence of this job and its data Returns the last known status of the job



50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/kthxbye/job.rb', line 50

def self.destroy(id, queue)
  ret = Job.find(id, queue).status

  # remove the element from the active queue
  redis.lrem("queue:#{queue}", 0, id) 
  # be sure we also remove it from the inactive queue
  redis.srem("queue:#{queue}:inactive", id)
  # remove the job's data as well
  redis.hdel("data-store:#{queue}", id) 
  redis.hdel("result-store:#{queue}", id)
  redis.hdel( :failure, id )
  
  return ret
end

.find(id, queue) ⇒ Object

Returns a job object for a given job id off of a given queue.



43
44
45
46
# File 'lib/kthxbye/job.rb', line 43

def self.find(id, queue)
  data = decode( redis.hget( "data-store:#{queue}", id ) )
  data ? Job.new(id, queue, data) : nil
end

Instance Method Details

#==(obj) ⇒ Object

:nodoc:



154
155
156
157
158
159
# File 'lib/kthxbye/job.rb', line 154

def ==(obj) #:nodoc:
  return false if obj.nil?
  @data == obj.data &&
    @id == obj.id &&
    @queue == obj.queue
end

#activeObject

Removes the job from the inactive queue.



141
142
143
# File 'lib/kthxbye/job.rb', line 141

def active
  redis.srem("jobs:inactive", @id)
end

#active?Boolean

Returns:

  • (Boolean)


145
146
147
# File 'lib/kthxbye/job.rb', line 145

def active?
  !redis.sismember("jobs:inactive", @id)
end

#dequeueObject

Simply removes this job from the active queue and places it on the inactive list. Does not remove job payload from storage. It just removes its id from the actively run job queue.



105
106
107
108
# File 'lib/kthxbye/job.rb', line 105

def dequeue
  redis.lrem("queue:#{@queue}", 0, @id) 
  inactive
end

#fail(ex) ⇒ Object



132
133
134
135
136
137
138
# File 'lib/kthxbye/job.rb', line 132

def fail(ex)
  @failed_attempts += 1
  log "Error occured: #{ex.message}. Try: #{@failed_attempts}/#{Kthxbye::Config.options[:attempts]}"
  redis.publish("job.failed", @id)
  Stats.incr("failures")
  Kthxbye::Failure.create( self, ex ) 
end

#inactiveObject

Places the job on the active queue



150
151
152
# File 'lib/kthxbye/job.rb', line 150

def inactive
  redis.sadd("jobs:inactive", @id)
end

#performObject

Does all the heavy lifting of performing the job and storing the results. It will get the jobs class, payload and then run the job, storing the result in the result’s store once complete. Also responsible for reporting errors and storing the job in the failure listing if an exception occurs. Will also publish a message on the job.failed channel (Redis PUBSUB) with the id of the failed job



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/kthxbye/job.rb', line 116

def perform
  begin
    @klass = Object.const_get(@data['klass'])
    @payload = @data['payload']
    #set job active, getting ready to run
    self.active

    result = @klass.send(:perform, *@payload)
    redis.hset( "result-store:#{@queue}", @id, encode( result ) )
    return result
  rescue Object => ex
    # handled by worker
    raise ex
  end
end

#rerunObject

Simply requeues the job to be rerun.



76
77
78
# File 'lib/kthxbye/job.rb', line 76

def rerun
  Job.add_to_queue( @id, @queue )
end

#resultObject

Returns the job’s result once it has been run.



98
99
100
# File 'lib/kthxbye/job.rb', line 98

def result
  decode( redis.hget("result-store:#{@queue}", @id) )
end

#statusObject

Returns the job’s status. Will be one of 4 things.

1) :succeeded - the job ran and has a result
2) :failed - the job failed and reported an error
3) :active - job is being run.
4) :inactive - job is waiting to be run.


85
86
87
88
89
90
91
92
93
94
95
# File 'lib/kthxbye/job.rb', line 85

def status
  if result
    :succeeded
  elsif Failure.find(@id)
    :failed
  elsif active?
    :active
  else
    :inactive
  end
end