Module: Kthxbye

Extended by:
Kthxbye
Includes:
Helper
Included in:
Kthxbye
Defined in:
lib/kthxbye.rb,
lib/kthxbye/job.rb,
lib/kthxbye/stats.rb,
lib/kthxbye/config.rb,
lib/kthxbye/helper.rb,
lib/kthxbye/worker.rb,
lib/kthxbye/failure.rb,
lib/kthxbye/railtie.rb,
lib/kthxbye/version.rb,
lib/kthxbye/exceptions.rb,
lib/kthxbye/web_interface.rb,
lib/generators/kthxbye/kthxbye_generator.rb

Overview

Rake tasks and integration to Rails require ‘kthxbye’

Defined Under Namespace

Modules: Config, Failure, Generators, Helper, Stats
Classes: ActiveWorkerKilled, Job, Railtie, WebInterface, Worker

Constant Summary

Version =

Returns current version of Kthxbye

VERSION = "1.3.2"

Instance Method Summary

Methods included from Helper

#decode, #encode, #log

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(name, *args) ⇒ Object




# File 'lib/kthxbye.rb', line 135

def method_missing(name, *args) #:nodoc:
  method_name = name.id2name
  if method_name =~ /^(data|result)_peek$/
    Kthxbye.send(:peek, $1, *args)
  else
    super
  end

end
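
The dispatch above can be sketched without Redis. This is a minimal stand-alone illustration, not the library's code: the module name GhostPeek, the STORES constant, and the sample data are hypothetical, and a plain Hash stands in for the Redis hashes. It also adds respond_to_missing?, which the original does not define.

```ruby
# Stand-alone sketch of the ghost-method dispatch. GhostPeek and STORES are
# hypothetical names; a plain Hash stands in for Redis.
module GhostPeek
  extend self

  STORES = {
    "data-store:mail"   => { "1" => "payload-1" },
    "result-store:mail" => { "1" => "result-1" }
  }

  # Same shape as #peek: one entry when an id is given, otherwise everything.
  def peek(store, queue, id = nil)
    hash = STORES.fetch("#{store}-store:#{queue}", {})
    id ? hash[id.to_s] : hash.dup
  end

  # data_peek and result_peek are never defined; they are routed to #peek.
  def method_missing(name, *args)
    if name.to_s =~ /\A(data|result)_peek\z/
      peek($1, *args)
    else
      super
    end
  end

  # Keeps respond_to? consistent with the ghost methods (an addition, not in
  # the original source).
  def respond_to_missing?(name, include_private = false)
    name.to_s.match?(/\A(data|result)_peek\z/) || super
  end
end

GhostPeek.data_peek("mail", 1)   # looks up id "1" in "data-store:mail"
GhostPeek.result_peek("mail")    # returns every entry in "result-store:mail"
```

Any other unknown method name falls through to super and raises NoMethodError as usual.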

Instance Method Details

#connect(redis_instance = nil) ⇒ Object

Calling this method is optional: any method that uses Redis makes an initial call to connect on its own. It is useful if you want to connect to an existing Redis instance. Otherwise, if called without params, it simply connects to a new instance of Redis using the host and port from Config.options.



# File 'lib/kthxbye.rb', line 69

def connect( redis_instance=nil )
  @redis = ( redis_instance || Redis.new( :host => Config.options[:redis_server], :port => Config.options[:redis_port] ) )
end
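
The pattern here is a lazily created connection that can be replaced by an injected one. A minimal sketch under stated assumptions: FakeConnection, LazyConnectionSketch, and DEFAULTS are hypothetical stand-ins for Redis, Kthxbye, and Config.options, so the snippet runs without a Redis server.

```ruby
# Hypothetical stand-in for a Redis client; it only records its settings.
class FakeConnection
  attr_reader :host, :port

  def initialize(host:, port:)
    @host = host
    @port = port
  end
end

module LazyConnectionSketch
  extend self

  # Stand-in for Config.options (values are illustrative).
  DEFAULTS = { :redis_server => "127.0.0.1", :redis_port => 6379 }

  # Use the given connection, or build one from the configured defaults.
  def connect(instance = nil)
    @conn = instance || FakeConnection.new(host: DEFAULTS[:redis_server],
                                           port: DEFAULTS[:redis_port])
  end

  # Connect on first use, then keep returning the same object.
  def conn
    @conn ||= connect
  end
end

LazyConnectionSketch.conn                      # created on demand
shared = FakeConnection.new(host: "10.0.0.5", port: 6380)
LazyConnectionSketch.connect(shared)           # reuse an existing connection
```

With the real module, the injected case corresponds to passing an already built Redis object to connect.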

#delete_queue(queue) ⇒ Object

Completely removes a queue: unregisters it, then deletes it. Returns true in all cases, including when the queue does not exist. Note: this also deletes the data and result stores for the queue.



# File 'lib/kthxbye.rb', line 166

def delete_queue(queue)
  unregister_queue(queue)
  redis.del( "queue:#{queue}" ) 
  redis.del( "data-store:#{queue}" ) 
  redis.del( "result-store:#{queue}" )

  true
end

#enqueue(queue, klass, *args) ⇒ Object

Queues jobs. Takes at minimum two parameters:

1) A string representing a queue
2) The class of the job being queued.

You can optionally pass additional params, which are handed to the perform method of the class. The number of args you queue must match the signature of that perform method; otherwise an exception is raised.



# File 'lib/kthxbye.rb', line 93

def enqueue(queue, klass, *args)
  Job.create(queue, klass, *args)    
end
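
The argument-count rule can be illustrated without the library. In this sketch AddJob and run_job are hypothetical, and the class-level perform is an assumption about how a job class is written; run_job only mimics a worker handing the stored args to perform.

```ruby
# Hypothetical job class. A class-level perform is an assumption here.
class AddJob
  def self.perform(a, b)
    a + b
  end
end

# Stand-in for the step where a worker calls perform with the queued args.
def run_job(klass, *args)
  klass.perform(*args)
end

run_job(AddJob, 2, 3)   # two args, matching perform(a, b)

begin
  run_job(AddJob, 2)    # one arg short: Ruby raises ArgumentError
rescue ArgumentError => e
  warn "job failed: #{e.message}"
end
```

With the real API the matching call would look like Kthxbye.enqueue("math", AddJob, 2, 3), where "math" is an illustrative queue name.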

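#inspect ⇒ Object

Returns a hash summarizing this instance of Kthxbye: the version, the number of Redis keys, workers, busy workers, and queues, the failed and processed job counts, and the total number of pending jobs across all queues.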



# File 'lib/kthxbye.rb', line 210

def inspect
  {
    :version => Version,
    :keys => keys.size,
    :workers => workers.size,
    :working => working.size,
    :queues => queues.size,
    :failed => Stats["failures"],
    :jobs_processed => Stats["processed"],
    :pending => queues.inject(0) {|m,o| m + size(o)}
  }
end

#job_results(queue, id = nil) ⇒ Object

Returns the decoded result for a specific job if an id is given. If no id is given, it returns all the job results for the given queue as an array of [id, result] pairs, with the results left undecoded.



# File 'lib/kthxbye.rb', line 201

def job_results(queue, id=nil)
  if id
    decode( redis.hget( "result-store:#{queue}", id ) )
  else
    Array( redis.hgetall( "result-store:#{queue}" ) )
  end
end
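
The two return shapes differ, which is easy to miss. The sketch below shows what Array() does to a hash of stored results and how the values could be decoded afterwards. It assumes the stored values are JSON strings, which the source does not confirm (it only calls the Helper #decode method); the constant names and sample data are hypothetical.

```ruby
require "json"

# Raw hash as HGETALL might return it: job id => encoded result.
# JSON encoding is an assumption made for this sketch.
RAW_RESULTS = {
  "1" => JSON.generate({ "ok" => true }),
  "2" => JSON.generate([1, 2])
}

# Array() on a Hash yields [key, value] pairs; the values stay encoded.
RESULT_PAIRS = Array(RAW_RESULTS)

# Decoding each value gives a Hash of id => result.
DECODED_RESULTS = RESULT_PAIRS.map { |id, raw| [id, JSON.parse(raw)] }.to_h
```

If decoded values for a whole queue are needed, the result_peek ghost method built on #peek already returns them as a hash.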

#keys ⇒ Object

Returns an array of all existing Redis keys.



# File 'lib/kthxbye.rb', line 82

def keys
  redis.keys("*")
end

#peek(store, queue, id = nil) ⇒ Object

This is a generic queue peek method. It isn’t used directly but is the basis for the “ghost” methods “data_peek” and “result_peek”. It takes a string naming a store (“data” or “result”, which map to the only two Redis hash stores in kthxbye: “data-store” and “result-store”), a string representing a queue, and optionally a job id. If a job id is given, it returns the decoded data for that job only. Otherwise it returns a hash of the decoded data for all jobs/results, keyed by job id.



# File 'lib/kthxbye.rb', line 124

def peek(store, queue, id=nil)
  if id
    decode( redis.hget( "#{store}-store:#{queue}", id ) )
  else
    all = redis.hgetall( "#{store}-store:#{queue}" )
    results = {}
    all.each {|k,v| results[k] = decode( v ) }
    return results
  end
end

#queues ⇒ Object

Returns all the queues Kthxbye knows about, sorted by name.



# File 'lib/kthxbye.rb', line 146

def queues
  redis.smembers( :queues ).sort
end

#redis ⇒ Object

Returns the Redis instance for direct calls to the Redis db



# File 'lib/kthxbye.rb', line 74

def redis
  return @redis if @redis
  Config.setup
  self.connect
  self.redis
end

#register_queue(queue) ⇒ Object

This method takes a string and registers it as a queue in our “known queues” list. It does nothing if the queue is already registered.



# File 'lib/kthxbye.rb', line 152

def register_queue(queue)
  redis.sadd(:queues, queue) unless redis.sismember(:queues, queue)
end

#salvage(q) ⇒ Object

This method is used mostly internally to pop the next job off the given queue. It takes a string representing a queue and returns a Kthxbye::Job object if a job exists on the queue, or nil if the queue is empty. It is destructive: the next job is removed from the queue before the job object is returned.



# File 'lib/kthxbye.rb', line 107

def salvage(q)
  id = redis.lpop( "queue:#{q}" ) 
  if id 
    payload = decode( redis.hget( "data-store:#{q}", id ) )
    return Job.new(id, q, payload)
  else
    log "No jobs found in #{q}"
    return nil
  end
end
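
The destructive pop can be sketched with plain Ruby collections. Everything named here is hypothetical: SketchJob stands in for Kthxbye::Job, an Array stands in for the Redis list, a Hash for the data store, and the JSON payload format is an assumption.

```ruby
require "json"

# Hypothetical stand-in for Kthxbye::Job.
SketchJob = Struct.new(:id, :queue, :payload)

# An Array per queue plays the Redis list; a Hash plays the data store.
QUEUE_LISTS = { "mail" => ["1", "2"] }
DATA_STORE  = {
  "mail" => {
    "1" => '{"to":"a@example.com"}',
    "2" => '{"to":"b@example.com"}'
  }
}

def salvage_sketch(queue)
  # Array#shift removes the head of the list, like LPOP.
  id = QUEUE_LISTS.fetch(queue, []).shift
  return nil unless id

  # The payload is only read here; the data store entry stays in place.
  SketchJob.new(id, queue, JSON.parse(DATA_STORE[queue][id]))
end

job = salvage_sketch("mail")   # takes job "1" off the list
```

As in the original method, only the queue list shrinks; the stored payload is read but not removed.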

#size(queue) ⇒ Object

Takes a string that represents a job queue. Returns the size of the given queue.



# File 'lib/kthxbye.rb', line 99

def size(queue)
  redis.llen("queue:#{queue}").to_i
end

#unregister_queue(queue) ⇒ Object

Removes the queue from the active queue listing but does not delete the queue itself, which leaves a phantom queue behind. Use delete_queue for complete removal of a queue.



# File 'lib/kthxbye.rb', line 159

def unregister_queue(queue)
  redis.srem(:queues, queue)
end
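
The difference between unregistering and deleting is easier to see with an in-memory model. This is a sketch, not the library's implementation: QueueRegistrySketch is a hypothetical class in which a Set plays the “known queues” set and a Hash plays the Redis keyspace.

```ruby
require "set"

# Hypothetical in-memory model of the queue bookkeeping.
class QueueRegistrySketch
  attr_reader :known, :keys

  def initialize
    @known = Set.new   # the "known queues" set
    @keys  = {}        # every other key, e.g. "queue:mail"
  end

  # Register the queue and append a job id to its list.
  def push(queue, id)
    @known << queue
    (@keys["queue:#{queue}"] ||= []) << id
  end

  # Only forgets the name; the list itself is left behind.
  def unregister(queue)
    @known.delete(queue)
  end

  # Forgets the name and removes the list and both stores.
  def delete(queue)
    unregister(queue)
    %w[queue data-store result-store].each do |prefix|
      @keys.delete("#{prefix}:#{queue}")
    end
    true
  end
end

registry = QueueRegistrySketch.new
registry.push("mail", "1")
registry.unregister("mail")   # "queue:mail" still exists: a phantom queue
registry.delete("mail")       # now the list is gone as well
```

Deleting a queue that was never created also returns true, which matches the behaviour described for delete_queue.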

#workers ⇒ Object

Returns all workers registered with Kthxbye by the Kthxbye::Worker class. Special note: Workers are only registered once you call #run on the worker. You may also run #register_worker on the worker to manually register it, but this also occurs once the worker is run so there is no need to run this manually.



# File 'lib/kthxbye.rb', line 180

def workers
  workers = redis.smembers( :workers )
  workers.map {|x| Worker.find( x ) }
end

#working ⇒ Object

Returns all of the workers that are currently working a job, as an array of pairs. Each pair holds the worker id and a hash with the job id and start time of that worker's current job, as follows:

[worker_id, {:job_id, :started}]


# File 'lib/kthxbye.rb', line 189

def working
  workers = redis.smembers( :working ) 
  data = []
  workers.each do |w_id|
    data << [w_id, decode( redis.get("worker:#{w_id}") )]
  end
  return data
end
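
Because the result is an array of pairs rather than a hash, it is usually consumed by destructuring each pair. A small sketch with made-up data: SAMPLE_WORKING, busy_summary, the worker id format, and the string keys (symbols are also possible, depending on how #decode builds the hash) are all assumptions.

```ruby
# Made-up data in the [worker_id, info] shape described above.
SAMPLE_WORKING = [
  ["worker-a", { "job_id" => 7, "started" => "2011-01-01 12:00:00" }],
  ["worker-b", { "job_id" => 9, "started" => "2011-01-01 12:05:00" }]
]

# Destructure each pair into the worker id and its info hash.
def busy_summary(entries)
  entries.map do |worker_id, info|
    "#{worker_id} -> job #{info["job_id"]} (since #{info["started"]})"
  end
end

puts busy_summary(SAMPLE_WORKING)
```

Calling to_h on the array gives a lookup by worker id when that is more convenient.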