Module: Kthxbye
- Extended by:
- Kthxbye
- Includes:
- Helper
- Included in:
- Kthxbye
- Defined in:
- lib/kthxbye.rb,
lib/kthxbye/job.rb,
lib/kthxbye/stats.rb,
lib/kthxbye/config.rb,
lib/kthxbye/helper.rb,
lib/kthxbye/worker.rb,
lib/kthxbye/failure.rb,
lib/kthxbye/railtie.rb,
lib/kthxbye/version.rb,
lib/kthxbye/exceptions.rb,
lib/kthxbye/web_interface.rb,
lib/generators/kthxbye/kthxbye_generator.rb
Overview
Rake tasks and integration to Rails require ‘kthxbye’
Defined Under Namespace
Modules: Config, Failure, Generators, Helper, Stats
Classes: ActiveWorkerKilled, Job, Railtie, WebInterface, Worker
Constant Summary
- Version =
Returns current version of Kthxbye
VERSION = "1.3.2"
Instance Method Summary
-
#connect(redis_instance = nil) ⇒ Object
This is not necessary to call.
-
#delete_queue(queue) ⇒ Object
Completely removes a queue: unregisters it, then deletes it. Returns true in all cases, including when the queue does not exist.
-
#enqueue(queue, klass, *args) ⇒ Object
Queues jobs.
-
#inspect ⇒ Object
Returns a pretty inspect message about this instance of Kthxbye.
-
#job_results(queue, id = nil) ⇒ Object
Returns the job results for a specific job if an id is given; otherwise returns all results for the queue.
-
#keys ⇒ Object
Returns an array of all existing Redis keys.
-
#method_missing(name, *args) ⇒ Object
Dispatches the ghost methods data_peek and result_peek to #peek.
-
#peek(store, queue, id = nil) ⇒ Object
This is a generic queue peek method.
-
#queues ⇒ Object
Returns all the queues Kthxbye knows about.
-
#redis ⇒ Object
Returns the Redis instance for direct calls to the Redis db.
-
#register_queue(queue) ⇒ Object
This method takes a string and registers it as a queue in our “known queues” list.
-
#salvage(q) ⇒ Object
This method is used mostly internally to pop the next job off of the given queue.
-
#size(queue) ⇒ Object
Takes a string that represents a job queue.
-
#unregister_queue(queue) ⇒ Object
Removes the queue from the active queue listing; does not delete the queue.
-
#workers ⇒ Object
Returns all workers registered with Kthxbye by the Kthxbye::Worker class.
-
#working ⇒ Object
Returns all of the workers that are currently working a job.
Methods included from Helper
Dynamic Method Handling
This module handles dynamic methods through the method_missing method.
#method_missing(name, *args) ⇒ Object
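Dispatches the ghost methods data_peek and result_peek to #peek; any other unknown method falls through to super.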
# File 'lib/kthxbye.rb', line 135
def method_missing(name, *args) #:nodoc:
  method_name = name.id2name
  if method_name =~ /^(data|result)_peek$/
    Kthxbye.send(:peek, $1, *args)
  else
    super
  end
end
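The dispatch in the listing above can be tried in isolation. The following is a minimal sketch, not Kthxbye's code: GhostPeekDemo and its echoing peek are hypothetical stand-ins, and respond_to_missing? is an addition that the original listing does not have.

```ruby
# Hypothetical stand-in module; it only mimics the dispatch pattern.
module GhostPeekDemo
  class << self
    # Stand-in for Kthxbye.peek: echoes its arguments instead of reading Redis.
    def peek(store, queue, id = nil)
      [store, queue, id]
    end

    # Route data_peek / result_peek to peek, passing the captured prefix
    # ("data" or "result") as the store argument.
    def method_missing(name, *args)
      if name.to_s =~ /\A(data|result)_peek\z/
        send(:peek, $1, *args)
      else
        super
      end
    end

    # Not in the original listing: keeps respond_to? consistent
    # with the ghost methods.
    def respond_to_missing?(name, include_private = false)
      name.to_s.match?(/\A(data|result)_peek\z/) || super
    end
  end
end

GhostPeekDemo.data_peek("mail")       # routed to peek("data", "mail")
GhostPeekDemo.result_peek("mail", 7)  # routed to peek("result", "mail", 7)
```

Any other unknown method name still reaches super and raises NoMethodError as usual.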
Instance Method Details
#connect(redis_instance = nil) ⇒ Object
Calling this is not necessary: any method that uses Redis makes an initial call to connect. It is useful if you want to connect to an existing Redis instance. Otherwise, if called without parameters, it simply connects a new Redis instance.
# File 'lib/kthxbye.rb', line 69
def connect( redis_instance=nil )
  @redis = ( redis_instance || Redis.new( :host => Config.[:redis_server], :port => Config.[:redis_port] ) )
end
#delete_queue(queue) ⇒ Object
Completely removes a queue: unregisters it, then deletes it. Returns true in all cases, including when the queue does not exist. Note: this also deletes the data and result stores for the queue.
# File 'lib/kthxbye.rb', line 166
def delete_queue(queue)
  unregister_queue(queue)
  redis.del( "queue:#{queue}" )
  redis.del( "data-store:#{queue}" )
  redis.del( "result-store:#{queue}" )
  true
end
#enqueue(queue, klass, *args) ⇒ Object
Queues jobs. Takes at minimum two parameters:
1) A string representing a queue
2) The class of the job being queued.
You can optionally pass additional parameters, which are handed to the perform method of the class. The number of arguments must match the perform method's signature when you queue the job; otherwise an exception is thrown.
# File 'lib/kthxbye.rb', line 93
def enqueue(queue, klass, *args)
  Job.create(queue, klass, *args)
end
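To illustrate the argument-count requirement, here is a small sketch in plain Ruby. AdditionJob is a hypothetical job class, and the class-level perform is an assumption about how job classes are written. The sketch only shows Ruby's own arity check; the point at which Kthxbye itself raises is not stated on this page.

```ruby
# Hypothetical job class for illustration only.
# Assumption: jobs expose a class-level perform method.
class AdditionJob
  def self.perform(a, b)
    a + b
  end
end

# With the gem loaded and Redis reachable, queuing this job would look like:
#   Kthxbye.enqueue("math", AdditionJob, 1, 2)

# Two arguments, matching the signature of perform.
sum = AdditionJob.perform(1, 2)

# One argument too few: plain Ruby raises ArgumentError.
mismatch = nil
begin
  AdditionJob.perform(1)
rescue ArgumentError => e
  mismatch = e.message
end
```

The extra arguments given to enqueue are the ones that must line up with perform, so a job declared with two parameters should be queued with exactly two extra arguments.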
#inspect ⇒ Object
Returns a pretty inspect message about this instance of Kthxbye.
# File 'lib/kthxbye.rb', line 210
def inspect
  {
    :version => Version,
    :keys => keys.size,
    :workers => workers.size,
    :working => working.size,
    :queues => queues.size,
    :failed => Stats["failures"],
    :jobs_processed => Stats["processed"],
    :pending => queues.inject(0) {|m,o| m + size(o)}
  }
end
#job_results(queue, id = nil) ⇒ Object
Returns the decoded result for a specific job if an id is given. If no id is given, it returns all the job results for the given queue as an array; per the listing, these values are not passed through decode.
# File 'lib/kthxbye.rb', line 201
def job_results(queue, id=nil)
  if id
    decode( redis.hget( "result-store:#{queue}", id ) )
  else
    Array( redis.hgetall( "result-store:#{queue}" ) )
  end
end
#keys ⇒ Object
Returns an array of all existing Redis keys.
# File 'lib/kthxbye.rb', line 82
def keys
  redis.keys("*")
end
#peek(store, queue, id = nil) ⇒ Object
This is a generic queue peek method. It isn’t used directly but is the basis for the “ghost” methods “data_peek” and “result_peek”. It takes a string naming a Redis hash store, a string representing a queue, and optionally a job id. There are only two stores in kthxbye, “data” and “result”, kept under the “data-store” and “result-store” keys; the method appends the “-store” suffix itself. If a job id is given, it returns the data for that job only. Otherwise it returns a hash of the data for all jobs/results, keyed by job id.
# File 'lib/kthxbye.rb', line 124
def peek(store, queue, id=nil)
  if id
    decode( redis.hget( "#{store}-store:#{queue}", id ) )
  else
    all = redis.hgetall( "#{store}-store:#{queue}" )
    results = {}
    all.each {|k,v| results[k] = decode( v ) }
    return results
  end
end
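The two modes of peek can be sketched without Redis. In this sketch a plain Ruby hash stands in for the Redis hashes, the sample payloads are made up, and decode is assumed to parse JSON; Kthxbye's actual decode helper lives in Helper and may work differently.

```ruby
require "json"

# Hypothetical in-memory stand-in; nothing here talks to Redis.
module PeekSketch
  # Keys follow the "#{store}-store:#{queue}" pattern from the listing.
  # The values are made-up sample payloads.
  STORES = {
    "data-store:mail"   => { "1" => '{"to":"a@example.com"}' },
    "result-store:mail" => { "1" => '{"status":"sent"}' }
  }

  # Assumption: stored values are JSON strings.
  def self.decode(raw)
    raw.nil? ? nil : JSON.parse(raw)
  end

  def self.peek(store, queue, id = nil)
    hash = STORES.fetch("#{store}-store:#{queue}", {})
    if id
      # Single job: decode just that entry (nil if the id is unknown).
      decode(hash[id.to_s])
    else
      # Whole store: decode every entry, keyed by job id.
      hash.each_with_object({}) { |(k, v), out| out[k] = decode(v) }
    end
  end
end

PeekSketch.peek("data", "mail", 1)  # one job's data
PeekSketch.peek("result", "mail")   # all results for the queue
```

Note that the store argument is the bare prefix ("data" or "result"), which is what the ghost methods pass along.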
#queues ⇒ Object
Returns all the queues Kthxbye knows about, sorted by name.
# File 'lib/kthxbye.rb', line 146
def queues
  redis.smembers( :queues ).sort
end
#redis ⇒ Object
Returns the Redis instance for direct calls to the Redis db. If no connection exists yet, it runs Config.setup and connects first.
# File 'lib/kthxbye.rb', line 74
def redis
  return @redis if @redis
  Config.setup
  self.connect
  self.redis
end
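The interplay between connect and redis is a lazy, memoized connection. A minimal sketch of that pattern follows; FakeClient, LazyConnectionDemo, and the default host and port are hypothetical stand-ins rather than anything taken from Kthxbye's configuration.

```ruby
# Hypothetical stand-in for a Redis client object.
FakeClient = Struct.new(:host, :port)

module LazyConnectionDemo
  class << self
    # Use the client that was passed in, or build a default one.
    # The default host and port here are assumptions for the sketch.
    def connect(client = nil)
      @client = client || FakeClient.new("127.0.0.1", 6379)
    end

    # Memoized accessor: connects on first use, then reuses the client.
    def client
      @client ||= connect
    end
  end
end

first  = LazyConnectionDemo.client   # triggers the implicit connect
second = LazyConnectionDemo.client   # same object, no reconnect

# Supplying an existing client replaces the memoized one.
LazyConnectionDemo.connect(FakeClient.new("redis.internal", 6380))
```

This is why an explicit connect call is only needed when you want to hand over your own client.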
#register_queue(queue) ⇒ Object
This method takes a string and registers it as a queue in our “known queues” list, unless it is already registered.
# File 'lib/kthxbye.rb', line 152
def register_queue(queue)
  redis.sadd(:queues, queue) unless redis.sismember(:queues, queue)
end
#salvage(q) ⇒ Object
This method is used mostly internally to pop the next job off of the given queue. It takes a string representing a queue and returns a Kthxbye::Job object if a job exists on the queue, or nil if the queue is empty. This is destructive: it REMOVES the next job from the queue and returns the job object.
# File 'lib/kthxbye.rb', line 107
def salvage(q)
  id = redis.lpop( "queue:#{q}" )
  if id
    payload = decode( redis.hget( "data-store:#{q}", id ) )
    return Job.new(id, q, payload)
  else
    log "No jobs found in #{q}"
    return nil
  end
end
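The destructive behaviour is easy to see with in-memory stand-ins. In this sketch an array plays the Redis list, a hash plays the data store, and Job is a hypothetical Struct rather than Kthxbye::Job; decoding and logging are left out.

```ruby
# Hypothetical in-memory stand-in; nothing here talks to Redis.
module SalvageSketch
  # Stand-ins for the Redis list and hash, with made-up sample data.
  QUEUES = { "queue:mail" => ["1", "2"] }
  DATA   = { "data-store:mail" => { "1" => "first payload", "2" => "second payload" } }

  # Hypothetical replacement for Kthxbye::Job.
  Job = Struct.new(:id, :queue, :payload)

  def self.salvage(q)
    # Array#shift removes the head of the list, like LPOP.
    id = QUEUES.fetch("queue:#{q}", []).shift
    return nil unless id

    Job.new(id, q, DATA.fetch("data-store:#{q}", {})[id])
  end
end

job = SalvageSketch.salvage("mail")           # removes job "1" from the queue
remaining = SalvageSketch::QUEUES["queue:mail"].dup
```

As in the listing, only the queue entry is removed; the payload stays in the data store after the job has been popped.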
#size(queue) ⇒ Object
Takes a string that represents a job queue. Returns the size of the given queue.
# File 'lib/kthxbye.rb', line 99
def size(queue)
  redis.llen("queue:#{queue}").to_i
end
#unregister_queue(queue) ⇒ Object
Removes the queue from the active queue listing but does not delete the queue itself, which leaves a phantom queue behind. Use delete_queue for complete removal of the queue.
# File 'lib/kthxbye.rb', line 159
def unregister_queue(queue)
  redis.srem(:queues, queue)
end
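The difference between unregistering and deleting can be sketched with in-memory stand-ins. QueueRegistrySketch is hypothetical: a Set plays the "known queues" set, a hash plays the Redis keyspace, and the sample data is made up.

```ruby
require "set"

# Hypothetical in-memory stand-in; nothing here talks to Redis.
module QueueRegistrySketch
  @known = Set.new(["mail"])
  @keys  = {
    "queue:mail"        => ["1", "2"],
    "data-store:mail"   => { "1" => "a", "2" => "b" },
    "result-store:mail" => {}
  }

  class << self
    attr_reader :known, :keys

    # Only drops the name from the listing; the stored jobs stay behind.
    def unregister_queue(queue)
      @known.delete(queue)
    end

    # Unregisters, then removes the list and both stores.
    def delete_queue(queue)
      unregister_queue(queue)
      %w[queue data-store result-store].each { |prefix| @keys.delete("#{prefix}:#{queue}") }
      true
    end
  end
end

QueueRegistrySketch.unregister_queue("mail")
phantom = QueueRegistrySketch.keys.key?("queue:mail")  # jobs still stored

QueueRegistrySketch.delete_queue("mail")               # now fully removed
```

After unregister_queue alone the queue is no longer listed, yet its jobs are still stored, which is the phantom queue the description warns about.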
#workers ⇒ Object
Returns all workers registered with Kthxbye by the Kthxbye::Worker class. Special note: Workers are only registered once you call #run on the worker. You may also run #register_worker on the worker to manually register it, but this also occurs once the worker is run so there is no need to run this manually.
# File 'lib/kthxbye.rb', line 180
def workers
  workers = redis.smembers( :workers )
  workers.map {|x| Worker.find( x ) }
end
#working ⇒ Object
Returns all of the workers that are currently working a job. Also returns the job id and started time of the worker as a hash as follows:
[worker_id, {:job_id, :started}]
# File 'lib/kthxbye.rb', line 189
def working
  workers = redis.smembers( :working )
  data = []
  workers.each do |w_id|
    data << [w_id, decode( redis.get("worker:#{w_id}") )]
  end
  return data
end