Module: Kthxbye

Extended by:
Kthxbye
Includes:
Helper
Included in:
Kthxbye
Defined in:
lib/kthxbye.rb,
lib/kthxbye/job.rb,
lib/kthxbye/config.rb,
lib/kthxbye/helper.rb,
lib/kthxbye/worker.rb,
lib/kthxbye/failure.rb,
lib/kthxbye/railtie.rb,
lib/kthxbye/version.rb,
lib/kthxbye/exceptions.rb,
lib/kthxbye/web_interface.rb,
lib/generators/kthxbye/kthxbye_generator.rb

Overview

Rake tasks and Rails integration require ‘kthxbye’.

Defined Under Namespace

Modules: Config, Failure, Generators, Helper Classes: ActiveWorkerKilled, Job, Railtie, WebInterface, Worker

Constant Summary

Version =

Returns current version of Kthxbye

VERSION = "1.2.0"

Instance Method Summary

Methods included from Helper

#decode, #encode, #log

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(name, *args) ⇒ Object

:nodoc:



# File 'lib/kthxbye.rb', line 134

def method_missing(name, *args) #:nodoc:
  method_name = name.id2name
  if method_name =~ /^(data|result)_peek$/
    Kthxbye.send(:peek, $1, *args)
  else
    super
  end

end
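
The ghost-method dispatch above can be tried in isolation. The following is a minimal sketch, not Kthxbye's own code: `GhostPeek` and its in-memory `STORES` hash are hypothetical stand-ins for the module and for Redis, and `respond_to_missing?` is an addition that the original does not define.

```ruby
# Hypothetical stand-in for the module; STORES replaces Redis here.
module GhostPeek
  STORES = {
    "data-store:mail"   => { "1" => "payload" },
    "result-store:mail" => { "1" => "done" }
  }

  # Generic lookup that the ghost methods delegate to.
  def self.peek(store, queue, id = nil)
    hash = STORES.fetch("#{store}-store:#{queue}", {})
    id ? hash[id.to_s] : hash
  end

  # Route data_peek / result_peek to peek("data" | "result", ...).
  def self.method_missing(name, *args)
    if name.to_s =~ /\A(data|result)_peek\z/
      peek($1, *args)
    else
      super
    end
  end

  # Not in the original: keeps respond_to? consistent with method_missing.
  def self.respond_to_missing?(name, include_private = false)
    name.to_s.match?(/\A(data|result)_peek\z/) || super
  end
end

GhostPeek.data_peek("mail", 1)  # looks up id "1" in "data-store:mail"
GhostPeek.result_peek("mail")   # returns the whole "result-store:mail" hash
```

Any other method name falls through to `super` and raises NoMethodError as usual.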

Instance Method Details

#connect(redis_instance = nil) ⇒ Object

Calling this method is not required: any method that uses Redis makes an initial call to connect. It is useful if you want to connect to an existing Redis instance. Otherwise, if called without params, it simply connects a new instance of Redis using the configured :redis_server and :redis_port options.



# File 'lib/kthxbye.rb', line 68

def connect( redis_instance=nil )
  @redis = ( redis_instance || Redis.new( :host => Config.options[:redis_server], :port => Config.options[:redis_port] ) )
end

#delete_queue(queue) ⇒ Object

Completely removes a queue: unregisters it, then deletes it. Returns true in all cases, including when the queue does not exist. Note: this also deletes the data and result stores for the queue.



# File 'lib/kthxbye.rb', line 165

def delete_queue(queue)
  unregister_queue(queue)
  redis.del( "queue:#{queue}" ) 
  redis.del( "data-store:#{queue}" ) 
  redis.del( "result-store:#{queue}" )

  true
end

#enqueue(queue, klass, *args) ⇒ Object

Queues jobs. Takes at minimum two parameters:

1) A string representing a queue
2) The class of the job being queued.

You can optionally pass in additional params for the perform method within the class. The number of args must match the perform method's signature when you queue the job. Otherwise this will raise an exception.



# File 'lib/kthxbye.rb', line 92

def enqueue(queue, klass, *args)
  Job.create(queue, klass, *args)    
end
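
The argument-count requirement can be illustrated with plain Ruby. This is a sketch under assumptions: `ResizeJob` and the `args_match?` helper are hypothetical, and the text above does not confirm that `perform` is a class method, which is assumed here.

```ruby
# Hypothetical job class; assumes perform is a class method that
# receives the extra arguments passed to enqueue.
class ResizeJob
  def self.perform(width, height)
    width * height
  end
end

# Hypothetical pre-flight check. For a fixed signature the counts must be
# equal; for a negative arity (optional or splat args) only the minimum
# number of required arguments is checked.
def args_match?(klass, args)
  arity = klass.method(:perform).arity
  arity < 0 ? args.size >= -arity - 1 : args.size == arity
end

args_match?(ResizeJob, [640, 480])  # two args for a two-arg perform
args_match?(ResizeJob, [640])       # one argument short
```

A check like this could run before queuing, so that a mismatch surfaces at enqueue time rather than when the job is eventually performed.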

#inspect ⇒ Object

Returns a hash summarizing this instance of Kthxbye: the version plus counts of keys, workers, working workers, queues, failed jobs, and pending jobs.



# File 'lib/kthxbye.rb', line 209

def inspect
  {
    :version => Version,
    :keys => keys.size,
    :workers => workers.size,
    :working => working.size,
    :queues => queues.size,
    :failed => Failure.count,
    :pending => queues.inject(0) {|m,o| m + size(o)}
  }
end

#job_results(queue, id = nil) ⇒ Object

Returns the job results for a specific job if an id is specified. If no id is specified, it returns all the job results for the given queue.



# File 'lib/kthxbye.rb', line 200

def job_results(queue, id=nil)
  if id
    decode( redis.hget( "result-store:#{queue}", id ) )
  else
    Array( redis.hgetall( "result-store:#{queue}" ) )
  end
end
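
The two branches return differently shaped values: one decoded result versus an array of raw pairs. The sketch below mimics that with an in-memory hash. `RESULT_STORE` and `job_results_sketch` are hypothetical, and JSON is only an assumed encoding, since the format used by `decode` is not shown here.

```ruby
require "json"

# Stand-in for the "result-store:<queue>" hash; values are assumed to be JSON.
RESULT_STORE = {
  "1" => JSON.generate("sum" => 3),
  "2" => JSON.generate("sum" => 7)
}

def job_results_sketch(id = nil)
  if id
    JSON.parse(RESULT_STORE[id.to_s])  # a single decoded result
  else
    Array(RESULT_STORE)                # [[id, raw_string], ...], not decoded
  end
end

job_results_sketch(1)  # decoded hash for job 1
job_results_sketch     # array of [id, encoded string] pairs
```

Note that in the branch without an id the values stay encoded, so callers that want decoded results for a whole queue may prefer result_peek.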

#keys ⇒ Object

Returns a list of all existing Redis keys.



# File 'lib/kthxbye.rb', line 81

def keys
  redis.keys("*")
end

#peek(store, queue, id = nil) ⇒ Object

This is a generic queue peek method. It isn’t used directly but is the basis for the “ghost” methods “data_peek” and “result_peek”. It takes a string naming a Redis hash store (kthxbye has only two, “data-store” and “result-store”; pass “data” or “result”, since the method appends “-store” itself), a string representing a queue, and optionally a job id. If a job id is given, it returns the decoded data for that job only. Otherwise it returns a hash of the decoded data for all jobs/results in that store.



# File 'lib/kthxbye.rb', line 123

def peek(store, queue, id=nil)
  if id
    decode( redis.hget( "#{store}-store:#{queue}", id ) )
  else
    all = redis.hgetall( "#{store}-store:#{queue}" )
    results = {}
    all.each {|k,v| results[k] = decode( v ) }
    return results
  end
end

#queues ⇒ Object

Returns all the queues Kthxbye knows about



# File 'lib/kthxbye.rb', line 145

def queues
  redis.smembers( :queues ).sort
end

#redis ⇒ Object

Returns the Redis instance for direct calls to the Redis db



# File 'lib/kthxbye.rb', line 73

def redis
  return @redis if @redis
  Config.setup
  self.connect
  self.redis
end
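
The connect-on-first-use pattern shared by #connect and #redis can be sketched without a Redis server. Everything here is hypothetical: `FakeClient` stands in for a Redis client, `LazyConnection` for the module, and the `DEFAULTS` values are assumed rather than taken from Kthxbye's Config.

```ruby
# Hypothetical stand-in for a Redis client; it only records its settings.
FakeClient = Struct.new(:host, :port)

module LazyConnection
  DEFAULTS = { :host => "127.0.0.1", :port => 6379 } # assumed defaults

  # Use the supplied client, or build a new one from the defaults.
  def self.connect(client = nil)
    @client = client || FakeClient.new(DEFAULTS[:host], DEFAULTS[:port])
  end

  # Connect on first use, then reuse the memoized client.
  def self.client
    @client ||= connect
  end
end

LazyConnection.client                                  # builds a client lazily
LazyConnection.connect(FakeClient.new("10.0.0.5", 6380)) # swaps in an existing one
```

The explicit `connect` call is only needed in the second case, when an already configured client should be reused.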

#register_queue(queue) ⇒ Object

This method takes a string and registers it as a queue in our “known queues” list



# File 'lib/kthxbye.rb', line 151

def register_queue(queue)
  redis.sadd(:queues, queue) unless redis.sismember(:queues, queue)
end

#salvage(q) ⇒ Object

This method is mostly used internally to pop the next job off the given queue. It takes a string representing a queue and returns a Kthxbye::Job object if a job exists on the queue, or nil otherwise. This is destructive: it REMOVES the next job from the queue and returns the job object.



# File 'lib/kthxbye.rb', line 106

def salvage(q)
  id = redis.lpop( "queue:#{q}" ) 
  if id 
    payload = decode( redis.hget( "data-store:#{q}", id ) )
    return Job.new(id, q, payload)
  else
    log "No jobs found in #{q}"
    return nil
  end
end
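
The destructive pop can be modelled in memory to show what is and is not removed. This is a sketch only: `TinyQueue` and `JobStub` are hypothetical stand-ins for the Redis list and hash pair and for Kthxbye::Job.

```ruby
# Hypothetical stand-in for a job object.
JobStub = Struct.new(:id, :queue, :payload)

# In-memory model of "queue:<name>" (a list of ids) and
# "data-store:<name>" (id => payload).
class TinyQueue
  def initialize(name)
    @name = name
    @ids = []
    @data = {}
  end

  def push(id, payload)
    @ids << id
    @data[id] = payload
  end

  def size
    @ids.size
  end

  # Removes the id at the head of the list and wraps its payload in a job.
  # The payload itself stays in the data store.
  def salvage
    id = @ids.shift
    return nil unless id
    JobStub.new(id, @name, @data[id])
  end
end

queue = TinyQueue.new("mail")
queue.push("a", { "to" => "x" })
job = queue.salvage   # job "a" is returned and no longer counted by size
queue.salvage         # nothing left, so this returns nil
```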

#size(queue) ⇒ Object

Takes a string that represents a job queue. Returns the size of the given queue.



# File 'lib/kthxbye.rb', line 98

def size(queue)
  redis.llen("queue:#{queue}").to_i
end

#unregister_queue(queue) ⇒ Object

Removes the queue from the active queue listing but does not delete the queue itself, which leads to phantom queues. Use delete_queue for complete removal of a queue.



# File 'lib/kthxbye.rb', line 158

def unregister_queue(queue)
  redis.srem(:queues, queue)
end
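
The difference between unregistering and deleting, and how a phantom queue arises, can be shown with an in-memory model. `QueueRegistry` is a hypothetical sketch, not part of Kthxbye: its `registered` set mirrors the :queues set and its `keys` hash mirrors the per-queue Redis keys.

```ruby
require "set"

# Hypothetical in-memory model of the queue bookkeeping.
class QueueRegistry
  attr_reader :registered, :keys

  def initialize
    @registered = Set.new
    @keys = {}
  end

  def register(queue)
    @registered.add(queue)
    @keys["queue:#{queue}"] ||= []
  end

  # Only drops the name from the listing; stored keys stay behind.
  def unregister(queue)
    @registered.delete(queue)
  end

  # Unregisters and removes every key belonging to the queue.
  def delete(queue)
    unregister(queue)
    %w[queue data-store result-store].each do |prefix|
      @keys.delete("#{prefix}:#{queue}")
    end
    true
  end
end

registry = QueueRegistry.new
registry.register("mail")
registry.unregister("mail")  # "queue:mail" still exists: a phantom queue
registry.delete("mail")      # now the leftover key is gone as well
```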

#workers ⇒ Object

Returns all workers registered with Kthxbye by the Kthxbye::Worker class. Special note: Workers are only registered once you call #run on the worker. You may also run #register_worker on the worker to manually register it, but this also occurs once the worker is run so there is no need to run this manually.



# File 'lib/kthxbye.rb', line 179

def workers
  workers = redis.smembers( :workers )
  workers.map {|x| Worker.find( x ) }
end

#working ⇒ Object

Returns all of the workers that are currently working a job, as an array of pairs. Each pair holds the worker id and a hash with the job id and start time, as follows:

[worker_id, {:job_id, :started}]


# File 'lib/kthxbye.rb', line 188

def working
  workers = redis.smembers( :working ) 
  data = []
  workers.each do |w_id|
    data << [w_id, decode( redis.get("worker:#{w_id}") )]
  end
  return data
end
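
Because the return value is an array of pairs rather than a hash, callers usually destructure each entry. The sketch below uses made-up sample data in the documented shape. `jobs_by_worker` is a hypothetical helper, and the symbol keys follow the notation above, although the decoded keys may in practice be strings.

```ruby
# Hypothetical helper: maps each busy worker id to the job it is running.
def jobs_by_worker(working)
  working.each_with_object({}) do |(worker_id, info), map|
    map[worker_id] = info[:job_id]
  end
end

# Sample data in the documented shape; the ids and times are invented.
sample = [
  ["worker-1", { :job_id => 4, :started => "2011-01-01 10:00:00" }],
  ["worker-2", { :job_id => 9, :started => "2011-01-01 10:00:05" }]
]

jobs_by_worker(sample)  # one entry per busy worker
```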