Module: Kthxbye
- Extended by:
- Kthxbye
- Includes:
- Helper
- Included in:
- Kthxbye
- Defined in:
- lib/kthxbye.rb,
lib/kthxbye/job.rb,
lib/kthxbye/config.rb,
lib/kthxbye/helper.rb,
lib/kthxbye/worker.rb,
lib/kthxbye/failure.rb,
lib/kthxbye/railtie.rb,
lib/kthxbye/version.rb,
lib/kthxbye/exceptions.rb,
lib/kthxbye/web_interface.rb,
lib/generators/kthxbye/kthxbye_generator.rb
Overview
Rake tasks and integration to Rails require ‘kthxbye’
Defined Under Namespace
Modules: Config, Failure, Generators, Helper Classes: ActiveWorkerKilled, Job, Railtie, WebInterface, Worker
Constant Summary collapse
- Version =
Returns current version of Kthxbye
VERSION = "1.2.0"
Instance Method Summary collapse
-
#connect(redis_instance = nil) ⇒ Object
This is not necessary to call.
-
#delete_queue(queue) ⇒ Object
Completely removes queue: Unregisters it then deletes it should return true in all cases (including if we try to delete a non-existent queue).
-
#enqueue(queue, klass, *args) ⇒ Object
Queues jobs.
-
#inspect ⇒ Object
Returns a pretty inspect message about this instance of Kthxbye.
-
#job_results(queue, id = nil) ⇒ Object
Returns either the job results for a specific job (if id specified).
-
#keys ⇒ Object
Returns a hash of all existing Redis keys.
-
#method_missing(name, *args) ⇒ Object
:nodoc:.
-
#peek(store, queue, id = nil) ⇒ Object
This is a generic queue peek method.
-
#queues ⇒ Object
Returns all the queues Kthxbye knows about.
-
#redis ⇒ Object
Returns the Redis instance for direct calls to the Redis db.
-
#register_queue(queue) ⇒ Object
This method takes a string and registers it as a queue in our “known queues” list.
-
#salvage(q) ⇒ Object
This method is used mostly internally to pop the next job off of the given queue.
-
#size(queue) ⇒ Object
Takes a string that represents a job queue.
-
#unregister_queue(queue) ⇒ Object
Removes the queue from the active queue listing, does not delete queue.
-
#workers ⇒ Object
Returns all workers registered with Kthxbye by the Kthxbye::Worker class.
-
#working ⇒ Object
Returns all of the workers that are currently working a job.
Methods included from Helper
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(name, *args) ⇒ Object
:nodoc:
134 135 136 137 138 139 140 141 142 |
# File 'lib/kthxbye.rb', line 134 def method_missing(name, *args) #:nodoc: method_name = name.id2name if method_name =~ /^(data|result)_peek$/ Kthxbye.send(:peek, $1, *args) else super end end |
Instance Method Details
#connect(redis_instance = nil) ⇒ Object
This is not necessary to call. Any of the methods that use redis Will make an inital call to connect to redis. Useful if you want to connect to an existing redis instance. Othewise, if called without params, it simply connects a new instance of redis.
68 69 70 |
# File 'lib/kthxbye.rb', line 68 def connect( redis_instance=nil ) @redis = ( redis_instance || Redis.new( :host => Config.[:redis_server], :port => Config.[:redis_port] ) ) end |
#delete_queue(queue) ⇒ Object
Completely removes queue: Unregisters it then deletes it should return true in all cases (including if we try to delete a non-existent queue). Note: also deletes the data and result stores for this queue.
165 166 167 168 169 170 171 172 |
# File 'lib/kthxbye.rb', line 165 def delete_queue(queue) unregister_queue(queue) redis.del( "queue:#{queue}" ) redis.del( "data-store:#{queue}" ) redis.del( "result-store:#{queue}" ) true end |
#enqueue(queue, klass, *args) ⇒ Object
Queues jobs. Takes at minimum two paramters
1) A string representing a queue
2) The class of the job being queued.
You can optionally pass in additional params to the perform method within the class. You will need to match the number of args in the perform method when you queue the job. Otherwise this will throw an exception.
92 93 94 |
# File 'lib/kthxbye.rb', line 92 def enqueue(queue, klass, *args) Job.create(queue, klass, *args) end |
#inspect ⇒ Object
Returns a pretty inspect message about this instance of Kthxbye.
209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/kthxbye.rb', line 209 def inspect { :version => Version, :keys => keys.size, :workers => workers.size, :working => working.size, :queues => queues.size, :failed => Failure.count, :pending => queues.inject(0) {|m,o| m + size(o)} } end |
#job_results(queue, id = nil) ⇒ Object
Returns either the job results for a specific job (if id specified). If a job is not specified, it returns all the job results for the given queue.
200 201 202 203 204 205 206 |
# File 'lib/kthxbye.rb', line 200 def job_results(queue, id=nil) if id decode( redis.hget( "result-store:#{queue}", id ) ) else Array( redis.hgetall( "result-store:#{queue}" ) ) end end |
#keys ⇒ Object
Returns a hash of all existing Redis keys.
81 82 83 |
# File 'lib/kthxbye.rb', line 81 def keys redis.keys("*") end |
#peek(store, queue, id = nil) ⇒ Object
This is a generic queue peek method. It isn’t used directly but is the basis for the “ghost” methods “data_peek” and “result_peek”. This method takes in a string representing a redis hash store (only two in kthxbye: “data-store” and “result-store”), a string representing a queue, and optionally a job id. If a job id is given, it will return the data for that job only. Otherwise it returns all the data for all jobs/results.
123 124 125 126 127 128 129 130 131 132 |
# File 'lib/kthxbye.rb', line 123 def peek(store, queue, id=nil) if id decode( redis.hget( "#{store}-store:#{queue}", id ) ) else all = redis.hgetall( "#{store}-store:#{queue}" ) results = {} all.each {|k,v| results[k] = decode( v ) } return results end end |
#queues ⇒ Object
Returns all the queues Kthxbye knows about
145 146 147 |
# File 'lib/kthxbye.rb', line 145 def queues redis.smembers( :queues ).sort end |
#redis ⇒ Object
Returns the Redis instance for direct calls to the Redis db
73 74 75 76 77 78 |
# File 'lib/kthxbye.rb', line 73 def redis return @redis if @redis Config.setup self.connect self.redis end |
#register_queue(queue) ⇒ Object
This method takes a string and registers it as a queue in our “known queues” list
151 152 153 |
# File 'lib/kthxbye.rb', line 151 def register_queue(queue) redis.sadd(:queues, queue) unless redis.sismember(:queues, queue) end |
#salvage(q) ⇒ Object
This method is used mostly internally to pop the next job off of the given queue. It takes in a string representing a queue and will return a Kthxbye::Job object if a job exists on the queue. This is destructive on the queue as it will REMOVE the next job off queue and return the job object.
106 107 108 109 110 111 112 113 114 115 |
# File 'lib/kthxbye.rb', line 106 def salvage(q) id = redis.lpop( "queue:#{q}" ) if id payload = decode( redis.hget( "data-store:#{q}", id ) ) return Job.new(id, q, payload) else log "No jobs found in #{q}" return nil end end |
#size(queue) ⇒ Object
Takes a string that represents a job queue. Returns the size of the given queue.
98 99 100 |
# File 'lib/kthxbye.rb', line 98 def size(queue) redis.llen("queue:#{queue}").to_i end |
#unregister_queue(queue) ⇒ Object
Removes the queue from the active queue listing, does not delete queue. This will lead to phantom queues. use delete_queue for complete removal of queue.
158 159 160 |
# File 'lib/kthxbye.rb', line 158 def unregister_queue(queue) redis.srem(:queues, queue) end |
#workers ⇒ Object
Returns all workers registered with Kthxbye by the Kthxbye::Worker class. Special note: Workers are only registered once you call #run on the worker. You may also run #register_worker on the worker to manually register it, but this also occurs once the worker is run so there is no need to run this manually.
179 180 181 182 |
# File 'lib/kthxbye.rb', line 179 def workers workers = redis.smembers( :workers ) workers.map {|x| Worker.find( x ) } end |
#working ⇒ Object
Returns all of the workers that are currently working a job. Also returns the job id and started time of the worker as a hash as follows:
[worker_id, {:job_id, :started}]
188 189 190 191 192 193 194 195 |
# File 'lib/kthxbye.rb', line 188 def working workers = redis.smembers( :working ) data = [] workers.each do |w_id| data << [w_id, decode( redis.get("worker:#{w_id}") )] end return data end |